/hg/icedtea6: Liveconnect message processing design changes.
dbhole at icedtea.classpath.org
dbhole at icedtea.classpath.org
Mon Mar 15 10:35:29 PDT 2010
changeset 3aa674c581d7 in /hg/icedtea6
details: http://icedtea.classpath.org/hg/icedtea6?cmd=changeset;node=3aa674c581d7
author: Deepak Bhole <dbhole at redhat.com>
date: Mon Mar 15 13:35:18 2010 -0400
Liveconnect message processing design changes.
- Redesigned message consumption/processing model to allow arbitrary
number of applets to load without increasing MAX_WORKERS count.
- Implemented priority queuing to allow blocking threads to get
responses first.
- Made message consumption non-blocking and fixed thread-safety
issues therein
diffstat:
5 files changed, 297 insertions(+), 80 deletions(-)
ChangeLog | 43 +
plugin/icedteanp/java/sun/applet/PluginAppletViewer.java | 2
plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java | 267 ++++++++--
plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java | 38 +
plugin/icedteanp/java/sun/applet/PluginStreamHandler.java | 27 -
diffs (485 lines):
diff -r 7fd598f9cf42 -r 3aa674c581d7 ChangeLog
--- a/ChangeLog Fri Mar 12 16:46:26 2010 +0000
+++ b/ChangeLog Mon Mar 15 13:35:18 2010 -0400
@@ -1,3 +1,46 @@ 2010-03-12 Andrew John Hughes <ahughes
+2010-03-13 Deepak Bhole <dbhole at redhat.com>
+
+ * plugin/icedteanp/java/sun/applet/PluginAppletViewer.java
+ (requestPluginCookieInfo): Register cookie info as a priority message.
+ (requestPluginProxyInfo): Register proxy info as a priority message.
+ * plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java: Re-designed
+ message consumption to implement priority queuing and parallel
+ initialization limits so that an arbitrary number of applets can load
+ without blocking one another, all in a thread-safe manner.
+ (registerPriorityWait): New method. Registers a string that is
+ considered a "priority" string, which gets delegated to dedicated worker
+ threads.
+ (unRegisterPriorityWait): Unregisters a string so that it is no longer
+ a priority.
+ (PluginMessageConsumer): Do not create any workers when starting.
+ (consume): Remove method. Consumption is now done in a separate dedicated
+ thread to prevent blocking.
+ (getPriorityStrIfPriority): New method. If the given message is priority,
+ return why (i.e. the 'priority string' it matched).
+ (isInInit): New method. Returns if the plugin is currently initializing an
+ applet.
+ (addToInitWorkers): New method. Adds given worker to list of workers
+ currently initializing applets.
+ (okayToProcess): New method. Returns whether or not it is okay to process
+ the given applet.
+ (notifyWorkerIsFree): New method. Notifies this class that a worker has
+ just become free.
+ (queue): Queues the given message for consumption.
+ (ConsumerThread): New protected inner (thread) class. Responsible for
+ consuming messages in the queue, and requeuing them if consumption is not
+ possible.
+ (getFreeWorker): Changed to be non-blocking, and return either a priority
+ worker or a normal worker depending on what is requested.
+ * plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java
+ (PluginMessageHandlerWorker): Set new priority and consumer variables.
+ (busy): Make thread-safe by waiting on same property that free() waits on.
+ (free): Make thread-safe by waiting on same property that busy() waits on.
+ (isPriority): New method. Returns if worker is a priority worker.
+ (isFree): Made thread-safe, and accounts for priority.
+ * plugin/icedteanp/java/sun/applet/PluginStreamHandler.java
+ (startProcessing): Call consumer.queue() instead of consumer.consume().
+ (postMessage): Remove unused method.
+
2010-03-12 Andrew John Hughes <ahughes at redhat.com>
NetX/plugin build sync (1/3):
diff -r 7fd598f9cf42 -r 3aa674c581d7 plugin/icedteanp/java/sun/applet/PluginAppletViewer.java
--- a/plugin/icedteanp/java/sun/applet/PluginAppletViewer.java Fri Mar 12 16:46:26 2010 +0000
+++ b/plugin/icedteanp/java/sun/applet/PluginAppletViewer.java Mon Mar 15 13:35:18 2010 -0400
@@ -1281,6 +1281,7 @@ import com.sun.jndi.toolkit.url.UrlUtil;
return null;
}
+ PluginMessageConsumer.registerPriorityWait(reference);
streamhandler.postCallRequest(request);
streamhandler.write(request.getMessage());
try {
@@ -1326,6 +1327,7 @@ import com.sun.jndi.toolkit.url.UrlUtil;
"plugin PluginProxyInfo reference " + reference + " " +
requestURI, reference);
+ PluginMessageConsumer.registerPriorityWait(reference);
streamhandler.postCallRequest(request);
streamhandler.write(request.getMessage());
try {
diff -r 7fd598f9cf42 -r 3aa674c581d7 plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java
--- a/plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java Fri Mar 12 16:46:26 2010 +0000
+++ b/plugin/icedteanp/java/sun/applet/PluginMessageConsumer.java Mon Mar 15 13:35:18 2010 -0400
@@ -38,82 +38,257 @@ package sun.applet;
package sun.applet;
import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
import java.util.LinkedList;
-
-import sun.applet.AppletSecurity;
+import java.util.Set;
class PluginMessageConsumer {
- int MAX_WORKERS = 20;
+ private static int MAX_PARALLEL_INITS = 1;
+
+ // Each initialization requires 5 responses (tag, handle, width, proxy, cookie)
+ // before the message stack unlocks/collapses. This works out well because we
+ // want to allow up to 5 parallel tasks anyway
+ private static int MAX_WORKERS = MAX_PARALLEL_INITS*5;
+ private static int PRIORITY_WORKERS = MAX_PARALLEL_INITS*2;
+
+ private static Hashtable<Integer, PluginMessageHandlerWorker> initWorkers = new Hashtable<Integer, PluginMessageHandlerWorker>(2);
+
LinkedList<String> readQueue = new LinkedList<String>();
+ private static LinkedList<String> priorityWaitQueue = new LinkedList<String>();
ArrayList<PluginMessageHandlerWorker> workers = new ArrayList<PluginMessageHandlerWorker>();
PluginStreamHandler streamHandler = null;
AppletSecurity as;
+ ConsumerThread consumerThread = new ConsumerThread();
+ /**
+ * Registers a reference to wait for. Responses to registered priority
+ * references get handled by priority worker if normal workers are busy.
+ *
+ * @param reference The reference to give priority to
+ */
+ public static void registerPriorityWait(Long reference) {
+ PluginDebug.debug("Registering priority for reference " + reference);
+ registerPriorityWait("reference " + reference.toString());
+ }
+
+ /**
+ * Registers a string to wait for.
+ *
+ * @param searchString the string to look for in a response
+ */
+ public static void registerPriorityWait(String searchString) {
+ PluginDebug.debug("Registering priority for string " + searchString);
+ synchronized (priorityWaitQueue) {
+ if (!priorityWaitQueue.contains(searchString))
+ priorityWaitQueue.add(searchString);
+ }
+ }
+
+ /**
+ * Unregisters a priority reference to wait for.
+ *
+ * @param reference The reference to remove
+ */
+ public static void unRegisterPriorityWait(Long reference) {
+ unRegisterPriorityWait(reference.toString());
+ }
+
+ /**
+ * Unregisters a priority string to wait for.
+ *
+ * @param searchString The string to unregister from the priority list
+ */
+ public static void unRegisterPriorityWait(String searchString) {
+ synchronized (priorityWaitQueue) {
+ priorityWaitQueue.remove(searchString);
+ }
+ }
+
+ /**
+ * Returns the reference for this message. This method assumes that
+ * the message has a reference number.
+ *
+ * @param The message
+ * @return the reference number
+ */
+ private Long getReference(String[] msgParts) {
+ return Long.parseLong(msgParts[3]);
+ }
+
public PluginMessageConsumer(PluginStreamHandler streamHandler) {
as = new AppletSecurity();
this.streamHandler = streamHandler;
-
- // create some workers at the start...
- for (int i=0; i < 3; i++) {
- PluginDebug.debug("Creating worker " + i);
- PluginMessageHandlerWorker worker = new PluginMessageHandlerWorker(streamHandler, i, as);
- worker.start();
- workers.add(worker);
- }
+ this.consumerThread.start();
}
- public void consume(String message) {
-
- PluginDebug.debug("Consumer received message " + message);
-
- synchronized(readQueue) {
- readQueue.add(message);
- }
+ private String getPriorityStrIfPriority(String message) {
- PluginDebug.debug("Message " + message + " added to queue. Looking for free worker...");
- final PluginMessageHandlerWorker worker = getFreeWorker();
+ synchronized (priorityWaitQueue) {
+ Iterator<String> it = priorityWaitQueue.iterator();
- synchronized(readQueue) {
- if (readQueue.size() > 0) {
- worker.setmessage(readQueue.poll());
- }
- }
+ while (it.hasNext()) {
+ String priorityStr = it.next();
+ if (message.indexOf(priorityStr) > 0)
+ return priorityStr;
+ }
+ }
- worker.interrupt();
+ return null;
}
- private PluginMessageHandlerWorker getFreeWorker() {
+ private boolean isInInit(Integer instanceNum) {
+ return initWorkers.containsKey(instanceNum);
+ }
+
+ private void addToInitWorkers(Integer instanceNum, PluginMessageHandlerWorker worker) {
+ synchronized(initWorkers) {
+ initWorkers.put(instanceNum, worker);
+ }
+ }
+
+ private boolean okayToProcess(String[] msgParts) {
+
+ if (msgParts[2].equals("tag")) {
+
+ Integer instanceNum = new Integer(msgParts[1]);
+
+ synchronized(initWorkers) {
+ if (initWorkers.size() >= MAX_PARALLEL_INITS) {
+ return false;
+ }
+ }
+
+ registerPriorityWait("instance " + instanceNum + " handle");
+ registerPriorityWait("instance " + instanceNum + " width");
+
+ } else if (msgParts[2].equals("handle") || msgParts[2].equals("width")) {
+ Integer instanceNum = new Integer(msgParts[1]);
+
+ // If this instance is not in init, return false immediately.
+ // Handle/Width messages should NEVER go before tag messages
+ if (!isInInit(instanceNum))
+ return false;
+ }
+
+ return true;
+ }
+
+ public void notifyWorkerIsFree(PluginMessageHandlerWorker worker) {
+ synchronized (initWorkers) {
+ Iterator<Integer> i = initWorkers.keySet().iterator();
+ while (i.hasNext()) {
+ Integer key = i.next();
+ if (initWorkers.get(key).equals(worker))
+ initWorkers.remove(key);
+ }
+ }
+
+ consumerThread.interrupt();
+ }
+
+ public void queue(String message) {
+ synchronized(readQueue) {
+ readQueue.addLast(message);
+ }
+
+ // Wake that lazy consumer thread
+ consumerThread.interrupt();
+ }
+
+ protected class ConsumerThread extends Thread {
+ public void run() {
+
+ while (true) {
+
+ String message = null;
+
+ synchronized(readQueue) {
+ message = readQueue.poll();
+ }
+
+ if (message != null) {
+
+ String[] msgParts = message.split(" ");
+
+ // if it is not okay to process just yet, push it back and re-loop
+ if (!okayToProcess(msgParts)) {
+ synchronized(readQueue) {
+ readQueue.addLast(message);
+ }
+
+ continue; // re-loop to try next msg
+ }
+
+ String priorityStr = getPriorityStrIfPriority(message);
+ boolean isPriorityResponse = (priorityStr != null);
- // FIXME: Can be made more efficient by having an idle worker pool
-
- while (true) {
+ //PluginDebug.debug("Message " + message + " (priority=" + isPriorityResponse + ") ready to be processed. Looking for free worker...");
+ final PluginMessageHandlerWorker worker = getFreeWorker(isPriorityResponse);
+
+ if (worker == null) {
+ synchronized(readQueue) {
+ readQueue.addLast(message);
+ }
+
+ continue; // re-loop to try next msg
+ }
+
+ if (msgParts[2].equals("tag"))
+ addToInitWorkers((new Integer(msgParts[1])), worker);
+
+ if (isPriorityResponse) {
+ unRegisterPriorityWait(priorityStr);
+ }
+
+ worker.setmessage(message);
+ worker.interrupt();
+
+ } else {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {}
+ }
+ }
+ }
+ }
+
+ private PluginMessageHandlerWorker getFreeWorker(boolean prioritized) {
+
for (PluginMessageHandlerWorker worker: workers) {
- if (worker.isFree()) {
- PluginDebug.debug("Found free worker with id " + worker.getWorkerId());
+ if (worker.isFree(prioritized)) {
+ PluginDebug.debug("Found free worker (" + worker.isPriority() + ") with id " + worker.getWorkerId());
// mark it busy before returning
worker.busy();
return worker;
}
}
-
+
// If we have less than MAX_WORKERS, create a new worker
if (workers.size() <= MAX_WORKERS) {
- PluginDebug.debug("Cannot find free worker, creating worker " + workers.size());
- PluginMessageHandlerWorker worker = new PluginMessageHandlerWorker(streamHandler, workers.size(), as);
- worker.start();
- workers.add(worker);
- worker.busy();
- return worker;
- } else {
- // else wait
+ PluginMessageHandlerWorker worker = null;
+
+ if (workers.size() <= (MAX_WORKERS - PRIORITY_WORKERS)) {
+ PluginDebug.debug("Cannot find free worker, creating worker " + workers.size());
+ worker = new PluginMessageHandlerWorker(this, streamHandler, workers.size(), as, false);
+ } else if (prioritized) {
+ PluginDebug.debug("Cannot find free worker, creating priority worker " + workers.size());
+ worker = new PluginMessageHandlerWorker(this, streamHandler, workers.size(), as, true);
+ } else {
+ return null;
+ }
+
+ worker.start();
+ worker.busy();
+ workers.add(worker);
+ return worker;
+
}
-
- Thread.yield();
- }
-
- //throw new RuntimeException("Out of message handler workers");
+
+ // No workers available. Better luck next time!
+ return null;
}
}
diff -r 7fd598f9cf42 -r 3aa674c581d7 plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java
--- a/plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java Fri Mar 12 16:46:26 2010 +0000
+++ b/plugin/icedteanp/java/sun/applet/PluginMessageHandlerWorker.java Mon Mar 15 13:35:18 2010 -0400
@@ -42,15 +42,25 @@ class PluginMessageHandlerWorker extends
class PluginMessageHandlerWorker extends Thread {
private boolean free = true;
+ private boolean isPriorityWorker = false;
private int id;
private String message = null;
private SecurityManager sm;
PluginStreamHandler streamHandler = null;
+ PluginMessageConsumer consumer = null;
- public PluginMessageHandlerWorker(PluginStreamHandler streamHandler, int id, SecurityManager sm) {
+ public PluginMessageHandlerWorker(
+ PluginMessageConsumer consumer,
+ PluginStreamHandler streamHandler, int id,
+ SecurityManager sm, boolean isPriorityWorker) {
+
this.id = id;
this.streamHandler = streamHandler;
this.sm = sm;
+ this.isPriorityWorker = isPriorityWorker;
+ this.consumer = consumer;
+
+ PluginDebug.debug("Worker " + this.id + " (priority=" + isPriorityWorker + ") created.");
}
public void setmessage(String message) {
@@ -107,15 +117,27 @@ class PluginMessageHandlerWorker extends
}
public void busy() {
- this.free = false;
+ synchronized (this) {
+ this.free = false;
+ }
+ }
+
+ public void free() {
+ synchronized (this) {
+ this.free = true;
+
+ // Signal the consumer that we are done in case it was waiting
+ consumer.notifyWorkerIsFree(this);
+ }
}
-
- public void free() {
- this.free = true;
+ public boolean isPriority() {
+ return isPriorityWorker;
}
-
- public boolean isFree() {
- return free;
+
+ public boolean isFree(boolean prioritized) {
+ synchronized (this) {
+ return free && (prioritized == isPriorityWorker);
+ }
}
}
diff -r 7fd598f9cf42 -r 3aa674c581d7 plugin/icedteanp/java/sun/applet/PluginStreamHandler.java
--- a/plugin/icedteanp/java/sun/applet/PluginStreamHandler.java Fri Mar 12 16:46:26 2010 +0000
+++ b/plugin/icedteanp/java/sun/applet/PluginStreamHandler.java Mon Mar 15 13:35:18 2010 -0400
@@ -141,7 +141,7 @@ public class PluginStreamHandler {
//System.err.println("Total wait time: " + totalWait);
if (s != null) {
- consumer.consume(s);
+ consumer.queue(s);
} else {
try {
// Close input/output channels to plugin.
@@ -189,31 +189,6 @@ public class PluginStreamHandler {
};
listenerThread.start();
- }
-
- public void postMessage(String s) {
-
- if (s == null || s.equals("shutdown")) {
- try {
- // Close input/output channels to plugin.
- pluginInputReader.close();
- pluginOutputWriter.close();
- } catch (IOException exception) {
- // Deliberately ignore IOException caused by broken
- // pipe since plugin may have already detached.
- }
- AppletSecurityContextManager.dumpStore(0);
- PluginDebug.debug("APPLETVIEWER: exiting appletviewer");
- System.exit(0);
- }
-
- //PluginAppletSecurityContext.contexts.get(0).store.dump();
- PluginDebug.debug("Plugin posted: " + s);
-
- PluginDebug.debug("Consuming " + s);
- consumer.consume(s);
-
- PluginDebug.debug("Added to queue");
}
public void handleMessage(String message) throws PluginException {
More information about the distro-pkg-dev
mailing list