
NWEN303 Concurrent Programming
Raw Threads, Work balancing
and ForkJoin library
Marco Servetto VUW


Single thread programs
No programs have zero threads.
Single-threaded programs have one flow of control.
Operations are executed one after the other, from left to right, top down.

Single thread programs
class Main{
  public static int print(int num){
    System.out.println(num);
    return num;
  }
  public static void main(String[]a){
    print(1+print(2)+print(3));
  }
}

If I run this code, I will get 2, 3 and 6. Every time.

Multi-threaded programs: Timeslicing
Many flows of control, or threads.
Each thread executes its operations one after another.
The executions of the threads are interleaved with each other.
Timeslicing!
A thread runs for a certain amount of time (a timeslice).
When a timeslice is over, another thread gets to run.
So, on a single processor, threads do not actually run at the same time!
There is no guarantee about the order in which threads are run.
On multiprocessor machines, each processor can execute a different thread at the same time.

Example multi-thread code
Extend java.lang.Thread and implement run()
class SimpleThread extends Thread {
  private String msg;
  SimpleThread(String m) { msg = m; }
  public void run() {
    for (int i = 0; i < 1000; i++) { System.out.println(msg); }
  }
  public static void main(String args[]) {
    Thread t1 = new SimpleThread("Hello");
    Thread t2 = new SimpleThread("Goodbye");
    t1.start(); // start first thread
    t2.start(); // start second thread
  }
}
So, how many threads run in this program?

Results
Every run of the program might be different!
Run #1: Hello Goodbye Hello Goodbye Goodbye Hello Goodbye Hello Hello Goodbye Hello Goodbye Goodbye ...
Run #2, Run #3: other interleavings of Hello and Goodbye

Granularity?
However, "Hello HeGoodbye Hello ..." never happens.
System.out is a global variable, and println behaves as an atomic operation over the variable/resource out.
Can we combine atomic operations together to get a composed atomic operation?

This does not work atomically
public class Ex2 extends Thread {
  private String a;
  private String b;
  Ex2(String a,String b) { this.a=a; this.b=b; }
  public void printAB(){
    System.out.print(a);
    System.out.print(b);
  }
  public void run() {
    for (int i = 0; i < 1000; i++) { printAB(); }
  }
  public static void main(String args[]) {
    Thread t1 = new Ex2("Hello ","World");
    Thread t2 = new Ex2("Goodbye ","Friends");
    t1.start(); // start first thread
    t2.start(); // start second thread
  }
}

This does not work atomically
If we run it, we can obtain something like this:
"...Hello WorldHello FriendsGoodbye Friends..."
Umm... We fail: we cannot simply perform atomic operations in one method and magically get a composed atomic operation.
So, as an iconic example:
System.out.print(a); System.out.print(b);
is not the same as
System.out.print(a+b);
We will learn how to create atomic operations later on.
For now, notice how compacting more information in a structured manner and committing it all together is a simple and safe solution.
Note: the same behavior will arise if we use print while running under the control of a .parallelStream()

Thread States
Threads, including new and dead ones, are still valid “objects” with methods and fields to work with.
A thread can be in four possible states: new, runnable, blocked, dead.
– Runnable: either running or waiting for a timeslice
– Blocked: the thread cannot run for some reason
  • It will not run until some event occurs
Transitions:
new --start()--> runnable
runnable --block--> blocked
blocked --unblock--> runnable
runnable --run() exits--> dead

Failure
What happens if the run method of a thread fails to complete and ends up throwing an exception?
The thread dies, and the exception stack trace is printed out.
Only that thread dies. The other threads will (try to) go on... somehow.
Futures showed us how to handle exceptions in a better way!
If you think that it is a waste of time to 'properly reason on what happens when the unpredictable / unexpected / unsuccessful happens', then you are going to waste a lot of time.

Example application using Threads
class PrimeTest extends Thread{
  int number;
  boolean result=true;
  PrimeTest(int number){this.number=number;}
  public void run(){//check code
    for(int i=2;i…
…{ do stuff here; });
Anyway, it is very rare to have to use threads directly. Futures work better in nearly every case.
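The difference between a failing raw thread and a failing future can be sketched as follows. This is a minimal illustration under stated assumptions, not code from the lecture: the class FailureDemo, the method names and the always-failing task are all hypothetical. With the raw thread the stack trace goes to standard error and the caller only sees a terminated thread; with the future the exception is stored and handed to the caller, wrapped in an ExecutionException, when get() is called.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailureDemo {
  // Hypothetical task that always fails.
  static int failingTask() {
    throw new IllegalStateException("boom");
  }

  // Raw thread: run() throws, the stack trace is printed, only that thread dies.
  // join() returns normally, so the caller gets no exception to handle.
  static Thread.State runInRawThread() {
    Thread t = new Thread(() -> failingTask());
    t.start();
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return t.getState();
  }

  // Future: the failure travels to the caller through get().
  static String runInFuture() {
    ExecutorService exe = Executors.newSingleThreadExecutor();
    try {
      Future<Integer> f = exe.submit(() -> failingTask());
      return "result: " + f.get();
    } catch (ExecutionException e) {
      return "failed: " + e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    } finally {
      exe.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runInRawThread());
    System.out.println(runInFuture());
  }
}
```

In the future-based version the caller decides what to do with the failure, which is the "better way" the slide refers to.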







Threads, Workers and Tasks
A Thread is a “heavy resource”:
Expensive to create (>2KB metadata, 1MB stack)
Expensive to start (depends, but up to 100 ms)
Limited: we cannot create too many threads at the same time!
A thread can have any kind of behavior.
A Worker is a Thread that is instructed to accept many tasks one after another, and knows how to rest quietly when it has no task assigned.
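The idea of a few reusable workers serving many tasks can be sketched with a fixed thread pool from java.util.concurrent. This is only an illustration: the class WorkerPoolDemo and the method distinctThreadsUsed are hypothetical names, and the numbers 4 and 1000 are arbitrary. Each task records the name of the thread that ran it, so the result shows that far fewer threads than tasks were involved.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WorkerPoolDemo {
  // Runs `tasks` small tasks on a pool of `workers` reusable threads and
  // returns how many distinct threads actually executed them.
  static int distinctThreadsUsed(int workers, int tasks) {
    Set<String> names = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> names.add(Thread.currentThread().getName()));
    }
    pool.shutdown(); // accept no new tasks; the submitted ones still run
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return names.size();
  }

  public static void main(String[] args) {
    // 1000 tasks, but never more than 4 threads are created.
    System.out.println(distinctThreadsUsed(4, 1000));
  }
}
```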

How many workers?
List<Consumer<Object>> l=new ArrayList<>();
for(int i=0;i<10000;i++) {
  String me="["+i+"]";
  l.add(o->System.out.println(me));
}
l.parallelStream().forEach(c->c.accept(c));
What happens when we run this code?
8 Threads (on my machine) get a portion of the 10,000 elements each.
The number of Workers depends (mostly) on the number of cores on the machine.
A single Task is handling MANY elements of the stream.
[6562]
[6563]
[6564]
[6565]
[8125]
[6566]
[6567]
[8126]
[6568]
[8127]
[6569]
[8128]
[6570]
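One way to observe how many workers a parallel stream uses on a given machine is to record the name of the thread that handles each element. The sketch below assumes nothing beyond the standard library; StreamWorkersDemo and workerNames are hypothetical names. The set of names it returns varies between machines and runs, and typically includes the calling thread as well as worker threads of the common ForkJoin pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class StreamWorkersDemo {
  // Processes `elements` stream elements in parallel and returns the names
  // of all threads that handled at least one element.
  static Set<String> workerNames(int elements) {
    Set<String> names = ConcurrentHashMap.newKeySet();
    IntStream.range(0, elements).parallel()
        .forEach(i -> names.add(Thread.currentThread().getName()));
    return names;
  }

  public static void main(String[] args) {
    Set<String> names = workerNames(10000);
    System.out.println("cores:   " + Runtime.getRuntime().availableProcessors());
    System.out.println("threads: " + names.size() + " " + names);
  }
}
```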










How big is a Task?
Maximum locality: the list is split into as many tasks as there are workers, and each worker gets exactly 1 task.
Bad: a worker may be slower; then all the other workers will just wait for it to finish!
Maximum flexibility: each task is connected to exactly one element of the list.
Bad: this would have a huge fixed cost: at least as many task objects as list elements, plus a high cost of indirection.
To balance efficiency, .parallelStream() creates middle-sized tasks.
For example, 10,000 elements can be divided into 100 tasks of 100 elements each (example numbers; we cannot assume any specific behavior).
Note: when we use Futures, we take control of the size of the tasks.
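Taking control of the task size with futures can be sketched as follows. This is a minimal example under assumptions: ChunkedSum, sum and the chunkSize parameter are hypothetical, and summing is just a stand-in for real work. A chunkSize equal to the list size gives one big task, a chunkSize of 1 gives one task per element, and values in between give middle-sized tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChunkedSum {
  // Splits ns into tasks of at most chunkSize elements (chunkSize > 0),
  // sums each chunk as a separate task, then adds up the partial sums.
  static long sum(List<Long> ns, int chunkSize) {
    List<CompletableFuture<Long>> parts = new ArrayList<>();
    for (int from = 0; from < ns.size(); from += chunkSize) {
      List<Long> chunk = ns.subList(from, Math.min(from + chunkSize, ns.size()));
      parts.add(CompletableFuture.supplyAsync(() -> {
        long s = 0;
        for (long n : chunk) { s += n; }
        return s;
      }));
    }
    long total = 0;
    for (CompletableFuture<Long> p : parts) { total += p.join(); }
    return total;
  }

  public static void main(String[] args) {
    List<Long> ns = new ArrayList<>();
    for (long i = 1; i <= 10000; i++) { ns.add(i); }
    System.out.println(sum(ns, 100)); // 100 tasks of 100 elements each
  }
}
```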










How to assign Tasks to workers?
First attempt: fixed division at the start:
If we have 10 workers and 100 tasks, each worker will do 10 tasks.
Bad: a worker may be slower; then all the other workers will just wait for it to finish! This is identical to the ‘Maximum locality’ option from before, but with more indirection costs.
Second attempt: common Task queue:
All the 100 tasks go to a common queue, and when a worker is free, it takes a new task from the queue.
Better, but the common queue becomes a bottleneck!
Third attempt: Work Stealing
More details on the next slide!
Fourth attempt? Research is wide open here; after 3 solutions, there may be space for more…







Work Stealing!
Each Worker has its own task queue.
When a worker forks a new task, the task goes in its own queue.
When a worker finishes a task, it first checks whether there is work in its own queue:
– If there is work: take the task and start working
– If there is no work: go look in the queues of the other workers in your working group and try to steal some of their work!
Benefit 1: No single bottleneck
Benefit 2: improved locality; a worker is likely to do the tasks it spawns
Special (super secret): if, while performing a task, the result of a future is needed (.get()) and the task connected with that future is still unclaimed/queued: start the (sub-)task sequentially!
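The per-worker queues and the stealing rule can be illustrated with a toy sketch. It is a deliberately simplified model, not how the Java library is implemented: StealingSketch and run are hypothetical names, the tasks never fork new tasks (so a worker may stop as soon as every queue is empty), and idle workers exit instead of resting. Taking from the tail of the own queue and stealing from the head of another queue is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

class StealingSketch {
  // Runs `tasks` trivial tasks on `workers` threads. All tasks start in the
  // queue of worker 0, so every other worker can only get work by stealing.
  // Returns {tasks completed, tasks taken from another worker's queue}.
  static int[] run(int workers, int tasks) {
    List<ConcurrentLinkedDeque<Runnable>> queues = new ArrayList<>();
    for (int w = 0; w < workers; w++) { queues.add(new ConcurrentLinkedDeque<>()); }
    AtomicInteger done = new AtomicInteger();
    AtomicInteger stolen = new AtomicInteger();
    for (int t = 0; t < tasks; t++) { queues.get(0).addLast(done::incrementAndGet); }

    List<Thread> threads = new ArrayList<>();
    for (int w = 0; w < workers; w++) {
      final int me = w;
      Thread th = new Thread(() -> {
        while (true) {
          Runnable task = queues.get(me).pollLast(); // own queue first
          for (int other = 0; task == null && other < workers; other++) {
            if (other == me) { continue; }
            task = queues.get(other).pollFirst(); // try to steal
            if (task != null) { stolen.incrementAndGet(); }
          }
          if (task == null) { return; } // no work left anywhere
          task.run();
        }
      });
      threads.add(th);
      th.start();
    }
    for (Thread th : threads) {
      try {
        th.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return new int[] { done.get(), stolen.get() };
  }

  public static void main(String[] args) {
    int[] r = run(4, 1000);
    System.out.println("done=" + r[0] + " stolen=" + r[1]);
  }
}
```

The number of stolen tasks changes from run to run; only the total number of completed tasks is fixed.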



CompletableFutures
Java8 introduced CompletableFuture, a kind of future designed to work with lock-free algorithms and work stealing.
Calling Future.get(), CompletableFuture.get() and CompletableFuture.join() blocks the current thread until the computation is completed.
Note: it is not only the “worker” or the “working process” that is blocked. The underlying thread object is in the blocked state. Since creating threads is very expensive, and we only have a limited number of them anyway, blocking a thread while waiting is bad for performance, and can cause “thread pool exhaustion”: all the threads are blocked waiting for a task that cannot be completed, because no thread is free to take up the task, because all threads are blocked waiting…
Calling myCompletable.thenXXX(…) or CompletableFuture.allOf(…) sets up a task for a delayed start: it will start when certain futures are completed.
When using recursion/fork-joins, instead of waiting for a result and returning the value, just return a CompletableFuture of the result.
In Lab 2 we will experiment with various programming patterns to use Future and CompletableFuture.
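The advice to return a CompletableFuture from a recursive method, rather than waiting inside it, can be sketched like this. The example is an assumption-laden illustration: AsyncMax, max, sequentialMax and the THRESHOLD value are hypothetical, and the list is assumed to be non-empty. The recursion only builds a tree of futures; no task ever calls get() or join() on another task, so no worker is blocked waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncMax {
  static final int THRESHOLD = 1000; // hypothetical cut-off for sequential work

  static long sequentialMax(List<Long> ns) {
    long r = ns.get(0);
    for (long n : ns) { if (r < n) { r = n; } }
    return r;
  }

  // Returns a future of the maximum instead of the maximum itself.
  static CompletableFuture<Long> max(List<Long> ns) {
    if (ns.size() <= THRESHOLD) {
      return CompletableFuture.supplyAsync(() -> sequentialMax(ns));
    }
    int mid = ns.size() / 2;
    CompletableFuture<Long> left = max(ns.subList(0, mid));
    CompletableFuture<Long> right = max(ns.subList(mid, ns.size()));
    // Combined only once both halves are complete; nothing waits in between.
    return left.thenCombine(right, (a, b) -> Math.max(a, b));
  }

  public static void main(String[] args) {
    List<Long> ns = new ArrayList<>();
    for (long i = 0; i < 100000; i++) { ns.add(i); }
    System.out.println(max(ns).join()); // only the main thread blocks, at the very end
  }
}
```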

CompletableFutures
Abstract Code View:
script = writeStory()
dollars = borrowMoney()
movie = makeMovie(script, dollars) //waiting for script and dollars
playVideogames(); //wait
m = movie //wait for the movie

public static void main1()throws InterruptedException,ExecutionException{//With futures
  Future<…> script=exe.submit(()->writeStory());
  Future<…> dollars=exe.submit(()->borrowMoney());
  Future<Movie> movie=exe.submit(()->makeMovie(script.get(),dollars.get()));
  playVideogames();//while you wait, the ‘movie’ worker is blocked waiting
                   //for script and dollars. A wasted resource
  Movie m=movie.get();//the main thread now is waiting for the movie
}
public static void main2(){//With completable futures
  CompletableFuture<…> script=CompletableFuture.supplyAsync(()->writeStory());
  CompletableFuture<…> dollars=CompletableFuture.supplyAsync(()->borrowMoney());
  CompletableFuture<Movie> movie=script.thenCombine(dollars,(s,d)->makeMovie(s,d));
  playVideogames();//while you wait, the ‘movie’ worker is not blocked waiting.
                   //Only when both script and dollars complete the task will be submitted
  Movie m=movie.join();//the main thread now is waiting for the movie
}
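A runnable variant of the completable-future version might look as follows. The stub bodies and the types are assumptions made only so that the sketch compiles: here the script and the movie are plain strings and the money is an int, which is not necessarily what the lecture code uses. MovieDemo and produce are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class MovieDemo {
  // Hypothetical stand-ins for writeStory, borrowMoney and makeMovie.
  static String writeStory() { return "script"; }
  static int borrowMoney() { return 100; }
  static String makeMovie(String script, int dollars) {
    return "movie(" + script + ", " + dollars + ")";
  }

  static String produce() {
    CompletableFuture<String> script = CompletableFuture.supplyAsync(() -> writeStory());
    CompletableFuture<Integer> dollars = CompletableFuture.supplyAsync(() -> borrowMoney());
    // makeMovie is scheduled only when both inputs are complete;
    // no thread sits blocked waiting for script and dollars.
    CompletableFuture<String> movie = script.thenCombine(dollars, (s, d) -> makeMovie(s, d));
    return movie.join(); // only the caller waits here
  }

  public static void main(String[] args) {
    System.out.println(produce());
  }
}
```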




The Java ForkJoin library is sometimes still useful: it balances the freedom of futures with the structure of streams. It is particularly good for “naturally” recursive tasks.
Let's see an example where we want to find the max in a list of numbers. To do it sequentially, we can simply do the following:

List<Long> ns;
long sequentialSearch(){
  Long r=ns.get(0);
  for(Long ni:ns){if(r…
…{
  static final int threshold=200;
  List<Long> ns;
  Max(List<Long> ns){this.ns=ns;}
  long sequentialSearch(){…}
  @Override protected Long compute() {
    int size=ns.size();
    if(size…
… numbers=new ArrayList<>();
    for(int i=0;i<10000000;i++){numbers.add(r.nextLong());}
    //Note: only the main thread writes output
    System.out.println("Data created");
    long start = System.currentTimeMillis();
    long max=mainPool.invoke(new Max(numbers));
    long end = System.currentTimeMillis();
    System.out.printf("Bigger: %d\nTime=%d\n",max,end-start);
    //To better measure the time, run the function many times
    //and discard the first few thousand results
  }
}

Another ForkJoin Example
public class Fibonacci extends RecursiveTask<Integer> {
  final int n;
  Fibonacci(int n) { this.n = n; }
  public Integer compute() {
    if (n <= 1) {return n;}
    Fibonacci f1 = new Fibonacci(n - 1);
    f1.fork();//fork or invokeAll(f1,f2);
    Fibonacci f2 = new Fibonacci(n - 2);
    return f2.compute() + f1.join();//or f1.join()+f2.join();
  }
}
When spawning just 2 tasks, we can use fork/join for one task and compute for the other. Still, invokeAll does more “good things” under the hood, so I prefer to just use invokeAll.

Assignment 1
Do you know merge sort?
Implement merge sort
a) sequentially
b) with futures
c) with completable futures
d) with fork-join
Requirements: Minimal familiarity with Generics and JUnit
Next Lab we will do some targeted exercises
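For reference, a complete fork-join max search in the spirit of the Max example above could be sketched as follows. This is a reconstruction under assumptions, not the lecture's code: the class is named MaxTask to keep it apart from the original, the split into two halves with invokeAll is one possible choice, the common pool stands in for the pool used in the slides, and the list is assumed to be non-empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MaxTask extends RecursiveTask<Long> {
  static final int THRESHOLD = 200; // below this size, search sequentially
  final List<Long> ns;
  MaxTask(List<Long> ns) { this.ns = ns; }

  long sequentialSearch() {
    long r = ns.get(0);
    for (long n : ns) { if (r < n) { r = n; } }
    return r;
  }

  @Override protected Long compute() {
    int size = ns.size();
    if (size < THRESHOLD) { return sequentialSearch(); }
    // Assumed strategy: split in two halves and let the pool balance them.
    MaxTask left = new MaxTask(ns.subList(0, size / 2));
    MaxTask right = new MaxTask(ns.subList(size / 2, size));
    invokeAll(left, right); // returns when both halves are done
    return Math.max(left.join(), right.join());
  }

  public static void main(String[] args) {
    Random r = new Random();
    List<Long> numbers = new ArrayList<>();
    for (int i = 0; i < 1_000_000; i++) { numbers.add(r.nextLong()); }
    long max = ForkJoinPool.commonPool().invoke(new MaxTask(numbers));
    System.out.println("Bigger: " + max);
  }
}
```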