NWEN303 Concurrent Programming
4 Parallelism using libraries (Java)
Marco Servetto VUW
Writing parallel programs is bad
Sometimes, it is what we have to do anyway.
In this lecture and the next one (and sort of the rest of the course) we will focus on how to survive and suffer as little as possible.
Fork Join using Futures
//To solve a problem using fork join:
if the problem is small enough {
  solve problem directly //sequential algorithm
} else {
  divide the problem in sub-parts
  fork sub-task to solve each sub-part //it may be a loop
  join all sub-tasks spawned in previous loop //it would be a different loop
  combine results from sub-tasks
}
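The pseudocode above can be sketched in Java with futures. This is only an illustration under stated assumptions: the problem (summing an array), the class name ChunkedSum and the THRESHOLD value are hypothetical and do not come from the lecture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedSum {
  static final int THRESHOLD = 1_000; // hypothetical "small enough" size

  // Sequential algorithm, used for small problems and for each sub-part.
  static long sumRange(int[] data, int from, int to) {
    long total = 0;
    for (int i = from; i < to; i++) { total += data[i]; }
    return total;
  }

  static long sum(int[] data, ExecutorService pool)
      throws InterruptedException, ExecutionException {
    if (data.length <= THRESHOLD) { return sumRange(data, 0, data.length); }
    // Fork: one sub-task per chunk of the array.
    List<Future<Long>> parts = new ArrayList<>();
    for (int from = 0; from < data.length; from += THRESHOLD) {
      int start = from;
      int end = Math.min(from + THRESHOLD, data.length);
      parts.add(pool.submit(() -> sumRange(data, start, end)));
    }
    // Join in a different loop, combining the results as they arrive.
    long total = 0;
    for (Future<Long> part : parts) { total += part.get(); }
    return total;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      int[] data = new int[10_000];
      for (int i = 0; i < data.length; i++) { data[i] = i; }
      System.out.println(sum(data, pool)); // prints 49995000
    } finally {
      pool.shutdown(); // let the JVM exit
    }
  }
}
```

Forking in one loop and joining in another matters: calling get() inside the forking loop would wait for each sub-task before starting the next one.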
In Java today you have many options to use the fork-join pattern:
(1) Use Futures (the focus of today)
(2) Use CompletableFutures (next lecture)
(3) Use the (now old) ForkJoin library (next lecture)
(4) Use parallelStream(), which uses the ForkJoin library internally
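Futures – Proxy pattern
public class ParallelTask {
  public static boolean isPrime(int number){/*…*/}
  private static final ExecutorService pool =
    Executors.newFixedThreadPool(10);
    //often should be around 2-3 times the number of processors?
  public static void main(String[] args)throws Throwable{
    List<Future<Boolean>> results=new ArrayList<>();
    for(int i=0;i<1000;i++){
      int n=i;//a lambda can only capture effectively final variables
      results.add(pool.submit(()->isPrime(n)));
    }
    for(int i=0;i<1000;i++){ if(results.get(i).get()){System.out.println(i);} }
    pool.shutdown();//otherwise the worker threads keep the JVM alive
  }
}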
Learn how to see the code as behavior + annotations
Callable –> Pool –> Future
● Callable<T>: an object that represents a functionality, is ready to fire, and produces a value of type T. You give your callables to an ExecutorService.
● ExecutorService: an object that provides workers and abstracts the concurrency away. It takes one of its free threads and gives it the task. Concurrency starts here.
● Future<T>: an object that is NOT a T, but may eventually have a T, and can return such T ("My result!!"). It can give you the result or the exception that stopped the execution. Concurrency continues here: the thread knows the future, so when the task is completed, the future is informed.
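A minimal sketch of the Callable, pool and Future roles described here. The class CallableToFuture and its run method are hypothetical names chosen for illustration; the calls on ExecutorService and Future are the standard java.util.concurrent ones.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableToFuture {
  // Returns the task's result, or the message of the exception that stopped it.
  static String run(Callable<String> task) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Future<String> future = pool.submit(task); // concurrency starts here
      return future.get(); // blocks until the worker has informed the future
    } catch (ExecutionException e) {
      // The exception thrown inside the task is stored as the cause.
      return "failed: " + e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(run(() -> "My result !!")); // prints My result !!
    System.out.println(run(() -> { throw new IllegalStateException("boom"); })); // prints failed: boom
  }
}
```

The future is not the String itself: the value only becomes available through get(), and a failure of the task surfaces there as an ExecutionException.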
Using Futures to fork in 2
private static final ExecutorService pool=Executors.newFixedThreadPool(10);
Future
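One possible way to fork in two with a single future is sketched below: one half is submitted to the pool while the current thread computes the other half. The names ForkInTwo, left, right and both are hypothetical and are not taken from the slide.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ForkInTwo {
  static String left() { return "left"; }   // hypothetical first half of the work
  static String right() { return "right"; } // hypothetical second half of the work

  static String both(ExecutorService pool) {
    Future<String> l = pool.submit(() -> left()); // fork: a worker computes left()
    String r = right();                           // meanwhile, this thread computes right()
    try {
      return l.get() + "+" + r;                   // join: wait for the worker
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(10);
    try {
      System.out.println(both(pool)); // prints left+right
    } finally {
      pool.shutdown();
    }
  }
}
```

Only one future is needed for two parallel branches, because the submitting thread does useful work instead of idling until the join.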
Let's go back to our Exercise
public static String sequential() {
  String a=doA();
  String b=doB();
  String c=doC();
  String d=doD();
  String ab=doAB(a,b);
  String cd=doCD(c,d);
  return ab+cd;
}
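Visual representation (diagram): Start forks into two branches. One branch forks A and B and joins them into AB(a,b); the other forks C and D and joins them into CD(c,d). A final join combines both into res(ab,cd).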
Let's go back to our Exercise
private static final ExecutorService pool=Executors.newFixedThreadPool(10);
public static String parallel4WrongExceptionHandling()
    throws InterruptedException, ExecutionException {
  Future<String> ab=pool.submit(()->{
    Future<String> a=pool.submit(()->doA());
    String b=doB();
    return a.get()+b;
  });
  Future<String> c=pool.submit(()->doC());
  String d=doD();
  String cd=c.get()+d;
  return ab.get()+cd;
}
Let's go back to our Exercise
private static final ExecutorService pool=Executors.newFixedThreadPool(10);
public static String parallel4WrongExceptionHandling()
    throws InterruptedException, ExecutionException {
  Future<String> ab=pool.submit(()->{
    Future<String> a=pool.submit(()->doA());
    String b=doB();
    return doAB(a.get(),b);
  });
  Future<String> c=pool.submit(()->doC());
  String d=doD();
  String cd=doCD(c.get(),d);
  return doAll(ab.get(),cd);
}
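Let's go back to our Exercise
private static final ExecutorService pool=Executors.newFixedThreadPool(10);
public static String parallel4(){
  Future<String> ab=pool.submit(()->{
    Future<String> a=pool.submit(()->doA());
    String b=doB();
    return doAB(get(a),b);
  });
  Future<String> c=pool.submit(()->doC());
  String d=doD();
  String cd=doCD(get(c),d);
  return doAll(get(ab),cd);
}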
We define our get method, encoding how we wish to handle exceptions.
Let's go back to our Exercise
public static <T> T get(Future<T> f) {
  try {return f.get();}
  catch (InterruptedException e) {//we do not expect it
    Thread.currentThread().interrupt();//just do it 🙁
    throw new Error(e);//turn it into an error
  }
  catch (ExecutionException e) {
    Throwable t=e.getCause();//propagate unchecked exceptions
    if(t instanceof RuntimeException) {
      throw (RuntimeException)t;
    }//note: CancellationException is a RuntimeException
    if(t instanceof Error) {
      throw (Error)t;
    }
    throw new Error("Unexpected Checked Exception",t);
    //our callable/closure did throw a checked exception
  }
}
Example of personalized get method
Let's discuss this code and compare it with the other one. It is much trickier than it looks!
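To see what such a personalized get changes for the caller, here is a small sketch. The class UnwrapDemo and the method outcome are hypothetical; the helper is a compact copy of the idea above, included only so that the example is self-contained.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class UnwrapDemo {
  // Compact copy of a personalized get: unchecked failures pass through unwrapped.
  static <T> T get(Future<T> f) {
    try { return f.get(); }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new Error(e);
    }
    catch (ExecutionException e) {
      Throwable t = e.getCause();
      if (t instanceof RuntimeException) { throw (RuntimeException) t; }
      if (t instanceof Error) { throw (Error) t; }
      throw new Error("Unexpected Checked Exception", t);
    }
  }

  // Runs the task on a worker and reports what the caller observes.
  static String outcome(Callable<String> task) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try { return get(pool.submit(task)); }
    catch (IllegalArgumentException e) { return "caught " + e.getMessage(); }
    catch (Error e) { return "error: " + e.getMessage(); }
    finally { pool.shutdown(); }
  }

  public static void main(String[] args) {
    System.out.println(outcome(() -> "ok"));
    // prints ok
    System.out.println(outcome(() -> { throw new IllegalArgumentException("bad input"); }));
    // prints caught bad input
    System.out.println(outcome(() -> { throw new IOException("disk"); }));
    // prints error: Unexpected Checked Exception
  }
}
```

With the helper, the caller can catch the original IllegalArgumentException directly, as if the task had run in the calling thread, instead of digging it out of an ExecutionException.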
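What about the other way to fork?
Diagram (task:cost): Start forks A:1, B:10, C:2 and D:5; all four are joined; a second fork runs AB(a,b):1 and CD(c,d):10; their join feeds res(ab,cd):1.
public static String parallel5() {
  Future<String> a=pool.submit(()->doA());
  Future<String> b=pool.submit(()->doB());
  Future<String> c=pool.submit(()->doC());
  Future<String> d=pool.submit(()->doD());
  Future<String> ab=pool.submit(()->doAB(get(a),get(b)));
  String cd=doCD(get(c),get(d));
  return doAll(get(ab),cd); //abcd
}
● 5 futures instead of 3
● Note: this code does not behave like the diagram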
What is that lambda doing?
● Wrong interpretation: the lambda calls doAB. Since that lambda depends on a and b, line ab will wait for futures a and b to be completed before submitting the task.
● Correct interpretation: the lambda calls doAB(get(a),get(b)). Task submission is instantaneous, and the whole expression is executed later: the worker handling the task created in line ab will wait for futures a and b to be completed.
Thus:
● doAB will start "as soon" as a and b complete.
● doCD will start "as soon" as c and d complete.
● doAll will start "as soon" as ab and cd complete.
● Thus, parallel5 still behaves like the more optimized case, parallel4.
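The point that submitting a task does not wait for the futures used inside its lambda can be checked with a small experiment. This is a sketch only: the class SubmitReturnsAtOnce and the latch used to hold the first task back are assumptions made for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitReturnsAtOnce {
  static String observe() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch gate = new CountDownLatch(1); // keeps task a unfinished
    try {
      Future<String> a = pool.submit(() -> { gate.await(); return "a"; });
      // submit only hands the lambda to the pool; a.get() is evaluated
      // later, by the worker that picks up the task.
      Future<String> ab = pool.submit(() -> a.get() + "b");
      boolean doneAtSubmit = ab.isDone(); // false: a cannot finish before the gate opens
      gate.countDown();                   // now a completes, then ab
      return doneAtSubmit + ":" + ab.get();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(observe()); // prints false:ab
  }
}
```

If submit waited for a before returning, this program would never reach gate.countDown() and would hang; instead it is the worker running ab that waits.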
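What about the other way to fork?
To actually get the less optimized version, one needs to call get on all of a, b, c and d before forking again.
  Future<String> fa=pool.submit(()->doA());
  Future<String> fb=pool.submit(()->doB());
  Future<String> fc=pool.submit(()->doC());
  Future<String> fd=pool.submit(()->doD());
  String a=get(fa);
  String b=get(fb);
  String c=get(fc);
  String d=get(fd);
  Future<String> fab=pool.submit(()->doAB(a,b));
  String cd=doCD(c,d);
  return doAll(get(fab),cd);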
Futures and circular dependencies
If you try hard enough, you can get blocked forever by waiting on a future…
It happens rarely, but it does happen in real life.
public class FutureLoop {
  private static final ExecutorService
    pool=Executors.newFixedThreadPool(10);
  static Future<String> a;
  public static void main(String[]arg)throws Exception{
    System.out.println("A");//Ok, starting
    a=pool.submit(()->a.get());
    System.out.println("B");//Worker is blocked on itself here
    a.get();
    System.out.println("C");//Will never happen!
  }
}
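One defensive option, sketched here under assumptions, is to wait with a timeout so that a circular dependency shows up as a TimeoutException instead of a program that hangs forever. The class SelfWait, the 200 ms limit, and the latch that publishes the future before the task reads it are all choices made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class SelfWait {
  static String probe() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicReference<Future<String>> self = new AtomicReference<>();
    CountDownLatch published = new CountDownLatch(1);
    try {
      Future<String> a = pool.submit(() -> {
        published.await();       // wait until the reference below has been set
        return self.get().get(); // the task now waits on its own future
      });
      self.set(a);
      published.countDown();
      try {
        return a.get(200, TimeUnit.MILLISECONDS); // bounded wait instead of get()
      } catch (TimeoutException e) {
        return "timed out";
      } catch (InterruptedException | ExecutionException e) {
        return "failed: " + e;
      }
    } finally {
      pool.shutdownNow(); // interrupts the worker that is stuck on itself
    }
  }

  public static void main(String[] args) {
    System.out.println(probe()); // prints timed out
  }
}
```

The timeout does not fix the circular dependency; it only turns a silent hang into a visible failure that can be logged and investigated.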
Discussion: Futures or .parallelStream()
Futures: more control on the detail of the execution. Benefits, with costs:
● Fork and join explicitly
– cost: Fork/Join/Cancel calls make the code more verbose
● Cancel operations that are taking too long or are not needed any more
– cost: operations need to be stored in reachable variables
● Easier to create a deeply nested fork-join with the desired shape, and to control what operations every task will perform
– cost: the shape will be more rigid; ".parallelStream()" has flexible workload division
● Control on return type and control of exceptional behavior
● Allowing more freedom in the computation behavior leads to more complicated models of computation
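The trade-off can be made concrete by writing the same filter both ways. This is a sketch: the class PrimesTwoWays, its isPrime body and the method names are hypothetical, chosen to echo the prime example used earlier in the lecture.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

class PrimesTwoWays {
  static boolean isPrime(int n) {
    if (n < 2) { return false; }
    for (int d = 2; (long) d * d <= n; d++) {
      if (n % d == 0) { return false; }
    }
    return true;
  }

  // The library decides how to split the work; no explicit fork or join.
  static List<Integer> withStream(List<Integer> candidates) {
    return candidates.parallelStream()
        .filter(PrimesTwoWays::isPrime)
        .collect(Collectors.toList());
  }

  // Explicit fork and join: one task per candidate, a shape fixed by us.
  static List<Integer> withFutures(List<Integer> candidates, ExecutorService pool)
      throws InterruptedException, ExecutionException {
    List<Future<Boolean>> answers = new ArrayList<>();
    for (Integer n : candidates) { answers.add(pool.submit(() -> isPrime(n))); }
    List<Integer> primes = new ArrayList<>();
    for (int i = 0; i < candidates.size(); i++) {
      if (answers.get(i).get()) { primes.add(candidates.get(i)); }
    }
    return primes;
  }

  public static void main(String[] args) throws Exception {
    List<Integer> candidates = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      System.out.println(withStream(candidates));        // prints [2, 3, 5, 7, 11]
      System.out.println(withFutures(candidates, pool)); // prints [2, 3, 5, 7, 11]
    } finally {
      pool.shutdown();
    }
  }
}
```

The futures version is longer, but each Future<Boolean> stays reachable, so an individual task could be cancelled or have its exception handled separately.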
Futures – When, How?
How to design an operation with futures?
Simple case: you have to do a set of “atomic” actions. Just follow the example before, create a collection of futures and then iterate to get all the results.
Inductive case: If your actions are complex enough, you may end up creating subfutures in that action. That will work just fine, you will use the same pool but every nested level of sub-operations will have its own collections of futures.
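The inductive case can be sketched as follows: each outer task creates its own collection of sub-futures on the same pool. The names NestedFutures, await, sumOfSquares and total are hypothetical. The sketch deliberately keeps the number of outer tasks (3) below the pool size (10): outer tasks block a worker while they wait, so with a fixed pool enough threads must remain free to run the inner tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class NestedFutures {
  // Hypothetical helper: waits for a future, rethrowing failures unchecked.
  static int await(Future<Integer> f) {
    try { return f.get(); }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    catch (ExecutionException e) { throw new IllegalStateException(e.getCause()); }
  }

  static int square(int x) { return x * x; } // the "atomic" action

  // A complex action: it forks sub-futures and keeps its own collection of them.
  static int sumOfSquares(List<Integer> xs, ExecutorService pool) {
    List<Future<Integer>> subs = new ArrayList<>();
    for (int x : xs) { subs.add(pool.submit(() -> square(x))); }
    int total = 0;
    for (Future<Integer> s : subs) { total += await(s); }
    return total;
  }

  // Outer level: one future per group, all on the same pool.
  static int total(List<List<Integer>> groups, ExecutorService pool) {
    List<Future<Integer>> outer = new ArrayList<>();
    for (List<Integer> g : groups) { outer.add(pool.submit(() -> sumOfSquares(g, pool))); }
    int total = 0;
    for (Future<Integer> f : outer) { total += await(f); }
    return total;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(10);
    try {
      List<List<Integer>> groups = Arrays.asList(
          Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6));
      System.out.println(total(groups, pool)); // prints 91
    } finally {
      pool.shutdown();
    }
  }
}
```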
Fork–join: a way of setting up and executing parallel programs, so that:
– execution forks (branches off) in parallel at designated points in the program,
– the branches "join" (merge) at a later point, once the sub-tasks have completed.
Parallel sections may fork recursively until a certain task granularity is reached.
java.util.concurrent.Future<V>
● boolean cancel(boolean mayInterruptIfRunning)
– Attempts to cancel execution of this task. Returns false if the task could not be cancelled, typically because it has already completed normally.
● V get()
– Waits if necessary for the computation to complete, and then retrieves its result.
– Throws CancellationException, ExecutionException and InterruptedException.
● V get(long timeout, TimeUnit unit)
– Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available. In addition to the exceptions of the former method, throws TimeoutException.
● boolean isCancelled()
– Returns true if this task was cancelled before it completed normally.
● boolean isDone()
– Returns true if this task completed.
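A short sketch of how these methods fit together: a timed get followed by cancel for an operation that is taking too long. The class CancelDemo, the 60 second sleep standing in for slow work, and the 100 ms limit are assumptions chosen for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancelDemo {
  static String demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // Stand-in for an operation that takes far too long.
      Future<String> slow = pool.submit(() -> { Thread.sleep(60_000); return "done"; });
      try {
        return slow.get(100, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        boolean cancelled = slow.cancel(true); // true: interrupt the running worker
        return "cancel=" + cancelled
            + " isCancelled=" + slow.isCancelled()
            + " isDone=" + slow.isDone();
      } catch (InterruptedException | ExecutionException e) {
        return "failed: " + e;
      }
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints cancel=true isCancelled=true isDone=true
  }
}
```

Note that isDone() is true after a successful cancel: "done" means the task will not produce anything further, not that it produced a result. Calling get() on the cancelled future would throw CancellationException.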
Executors/Threadpools
Executors.newCachedThreadPool() –> ExecutorService
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.
Executors.newFixedThreadPool(int) –> ExecutorService
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.
Executors/Threadpools
Executors.newScheduledThreadPool(int) –> ScheduledExecutorService
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.SECONDS;

class Beeper{
  private final ScheduledExecutorService scheduler =
    Executors.newScheduledThreadPool(1);
  public void beepForAnHour() {
    final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(
      ()->System.out.println("beep"), /*initialDelay:*/10, /*period:*/10, SECONDS);
    //after one hour, cancel the periodic beeping
    scheduler.schedule(
      ()->beeperHandle.cancel(true), /*delay:*/ 60 * 60, SECONDS);
  }
}
Executors/Threadpools
Executors.newSingleThreadExecutor() –> ExecutorService
Creates an Executor that uses a single worker thread operating off an unbounded queue. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.) Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
ExecutorService useful methods
ExecutorService.shutdown()
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.
This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.
ExecutorService.shutdownNow()
Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
This method does not wait for actively executing tasks to terminate. Use awaitTermination to do that. There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.
ExecutorService.isTerminated()
Returns true if all tasks have completed following shut down. Note that isTerminated is never true unless either shutdown or shutdownNow was called first.
ExecutorService.awaitTermination(long timeout,TimeUnit unit)
Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
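These methods are commonly combined into a shutdown sequence like the one sketched below. The class ShutdownDemo, the counter tasks and the 5 second limit are assumptions for illustration; the rejection of new tasks after shutdown() relies on the default behavior of pools created through Executors.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
  static String runAndStop() throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger ran = new AtomicInteger();
    for (int i = 0; i < 4; i++) { pool.submit(() -> ran.incrementAndGet()); }

    pool.shutdown(); // orderly: submitted tasks still run, new ones are refused
    boolean rejected = false;
    try { pool.submit(() -> ran.incrementAndGet()); }
    catch (RejectedExecutionException e) { rejected = true; }

    // shutdown() does not wait, so wait explicitly, with an upper bound.
    boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS);
    if (!finished) { pool.shutdownNow(); } // best effort: interrupts the workers

    return "ran=" + ran.get() + " rejected=" + rejected
        + " terminated=" + pool.isTerminated();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runAndStop()); // prints ran=4 rejected=true terminated=true
  }
}
```

Without the shutdown call, the non-daemon worker threads of a fixed pool keep the JVM alive even after main returns.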