BGGA closures prototype update: nonlocal transfers

Mark Mahieu mark at twistedbanana.demon.co.uk
Tue Aug 5 17:13:47 PDT 2008


On 5 Aug 2008, at 17:00, Neal Gafter wrote:

> Mark-
>
> Excellent; thanks!  I hope you publish the code for others to use.
>
> Regards,
> Neal

Yeah, maybe I should write a blog entry or something once I've cleaned
up the code and commented it.

One thing I can't seem to get around, though, is the need for an
unchecked cast in order to get the exceptions out. I've pasted the
code below, and the cast is right at the end. If anyone can suggest
a way to improve that bit of nastiness, it'd be most welcome...

Mark



     public static for <T, throws X> void eachConcurrently(
             Iterable<T> items, Executor executor,
             {T ==> void throws X} block) throws X {
         new EachConcurrently().run(items, executor, block);
     }

     private static class EachConcurrently {

         private volatile boolean cancelled = false;

         <T, throws X> void run(Iterable<T> items, Executor executor,
                 {T ==> void throws X} block) throws X {

             CompletionService<Throwable> completionService =
                 new ExecutorCompletionService<Throwable>(executor);

             List<T> rejectedItems = new ArrayList<T>();

             int count = 0;
             for (Iterator<T> iter = items.iterator();
                 !cancelled && iter.hasNext();) {

                 T item = iter.next();

                 try {
                     Callable<Throwable> callable =
                         {=>
                             Throwable result = null;
                             if (!cancelled) {
                                 try {
                                     block.invoke(item);
                                 }
                                 catch (Throwable ex) {
                                     cancelled = true;
                                     result = ex;
                                 }
                             }
                             result
                         };

                     completionService.submit(callable);
                     count++;
                 }
                 catch (RejectedExecutionException ex) {
                     rejectedItems.add(item);
                 }
             }

             Throwable throwable = null;
             try {
                 // Wait for each task to complete.
                 // Store the first result encountered.
                 while (count-- > 0) {
                     try {
                         Throwable result =
                             completionService.take().get();
                         if (throwable == null)
                             throwable = result;
                     }
                     catch (ExecutionException ex) {
                         throw new AssertionError(
                             "Should never get here");
                     }
                     catch (InterruptedException ex) {
                         Thread.currentThread().interrupt();
                     }
                 }

                 // Process any rejected items synchronously
                 for (Iterator<T> iter = rejectedItems.iterator();
                     throwable == null && iter.hasNext();) {

                     try {
                         block.invoke(iter.next());
                     }
                     catch (Throwable ex) {
                         throwable = ex;
                     }
                 }
             }
             finally {
                 if (throwable != null) {
                     // Handle nonlocal transfers
                     if (throwable instanceof UnmatchedTransfer)
                         throw ((UnmatchedTransfer) throwable)
                             .transfer();

                     // Handle RuntimeExceptions
                     if (throwable instanceof RuntimeException)
                         throw (RuntimeException) throwable;

                     // Look away now
                     @SuppressWarnings("unchecked")
                     X x = (X) throwable;
                     throw x;
                 }
             }
         }
     }
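
For what it's worth, here is a minimal sketch of one way the cast at the
end could be confined to a single helper. It is written in plain Java
rather than BGGA syntax, so an ordinary type parameter
<X extends Exception> stands in for "throws X". The names Rethrow,
rethrow and failWith are hypothetical and not part of the prototype. As
far as I can tell this doesn't remove the unchecked cast, since the
failure is still held as a plain Throwable; it only keeps the
@SuppressWarnings in one place.

```java
import java.io.IOException;

// Hypothetical helper class, not part of the prototype.
final class Rethrow {

    // Confines the unchecked cast to one method. Because of erasure the
    // cast is not checked at run time, so the caller has to be sure the
    // throwable really is an X (or an unchecked exception).
    @SuppressWarnings("unchecked")
    static <X extends Throwable> void rethrow(Throwable t) throws X {
        throw (X) t;
    }

    // Stand-in for the end of run(): rethrows a captured failure as X,
    // or returns normally if nothing was captured.
    static <X extends Exception> void failWith(Throwable captured) throws X {
        if (captured != null) {
            Rethrow.<X>rethrow(captured);
        }
    }

    public static void main(String[] args) {
        try {
            Rethrow.<IOException>failWith(new IOException("from worker"));
        } catch (IOException ex) {
            System.out.println("caught " + ex.getMessage());
        }
    }
}
```

With a helper like this, the finally block would still need its checks
for UnmatchedTransfer and RuntimeException before the last rethrow.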





