NWEN303 Concurrent Programming
11: Common parallel patterns: Producer Consumer
Marco Servetto VUW
●
– –
Producer consumer
Producer consumer:
pipeline instead of fork-join
factory metaphor, where objects are produced in steps, and each step can be made in parallel with the others, and specialized workers can act better and faster on a specialized part of the manufacturing.
Good for instruction caching
Good when manufacturing requires access to limited resources (for example, reading a file)
– –
Producer Node
Producer Node
Node
Node
Node
Consumer Node
Producer consumer
●
●
●
●
●
●
● ●
●
Alice produce wheat,
Bob produce sugar,
Charles takes wheat and sugar and produce cakes.
Finally the little Tim eat the cakes.
When enough cakes are eaten, we get a `Gift’.
For now, we assume those tasks need to be done sequentially; for example the oven can only bake 1 cake at a time.
Alice, Bob, Charles and Tim will communicate list of products .
Alice and Charles shares list1, Bob and Charles shares list2 and finally Charles and Tim shares list3.
Lets try to get it right with some code!
Producer/Consumer
Cakes! (attempt 1)
public static Gift computeGift(int hunger){ List
int[] timHunger=new int[]{hunger}; Gift[] res=new Gift[]{null};
Thread alice=new Thread(()->{while(true){ws.add(wheat());}});
Thread bob=new Thread(()->{while(true){ ss.add(sugar());}});
Thread charles=new Thread(()->{while(true){ Sugar s=ss.remove(0);
Wheat w=ws.remove(0); cs.add(cake(s,w));}});
Thread tim=new Thread(()->{while(true){
cs.remove(0);
if(timHunger[0]>0){timHunger[0]-=1; res[0]=gift();}}});
alice.start();bob.start();charles.start();tim.start();
return res[0]; }
Cakes! (attempt 1-failed)
public static Gift computeGift(int hunger){ List
int[] timHunger=new int[]{hunger}; Gift[] res=new Gift[]{null};
Thread alice=new Thread(()->{while(true){ws.add(wheat());}});
Thread bob=new Thread(()->{while(true){ ss.add(sugar());}});
Thread charles=new Thread(()->{while(true){
Sugar s=ss.remove(0);//what if there is no sugar/wheat to remove? Wheat w=ws.remove(0);//even if there is the sugar/wheat, cs.add(cake(s,w));}});//we have a race condition on ss and ws
Thread tim=new Thread(()->{while(true){ cs.remove(0);//same as above if(timHunger[0]>0){timHunger[0]-=1; res[0]=gift();}}});
alice.start();bob.start();charles.start();tim.start(); //the threads are never stopped
return res[0];//we are not waiting for the result to be there }//even if we somehow wait, thanks to caching, we may not see it!
Cakes! (attempt 2-failed)
List
Thread alice=new Thread(()->{while(true){ws.add(produceWheat());}}); Thread bob=new Thread(()->{while(true){ ss.add(produceSugar());}});
Thread charles=new Thread(()->{while(true){
Sugar s=ss.remove(0);//still… what if the list is empty? Wheat w=ws.remove(0);
cs.add(produceCake(s,w));}});
●
How to take the sugar?
if (ss.isEmpty()){Thread.sleep(100);} Sugar s=ss.remove(0);
Wrong! may be is still empty
while (ss.isEmpty()){Thread.sleep(100);} Sugar s=ss.remove(0);
Does it works? yes, but only in the current example, since there is only one thread taking the sugar…
Would be better to use wait/notify and have a the sugar producer awaking the sugar user!
●
●
●
How to take the sugar? When we take the sugar
synchronized(ss){ while(ss.isEmpty()){ss.wait();} Sugar s=ss.remove(0);}
When we put the sugar
synchronized(ss){ ss.add(produceSugar()); ss.notifyAll();}
Correct but unsatisfactory. This defies the purpose of synchronized collections, requires all the producer/consumer to properly follow the pattern and calls notify all much more often that what would be actually needed.
It seams so much of a reasonably obvious and common thing to do….
●
●
●
●
It seams so much of a reasonably obvious and common thing to do….
So, someone have done it already and put it into a decent library. It is so common indeed that that library is even in the standard Java.
Search for BlockingQueue!
The main lesson here is that before going head on searching to forge a solution, we should spend at least 4-10 hours searching for a good library that is doing already what we need.
● ●
Cakes!
BlockingQueue
Thread alice=new Thread(()->{while(true){ // add may work, but… try{ws.put(wheat());}catch(InterruptedException e){return;}}}); //thread dies: not “swallowed”
Thread bob=new Thread(()->{while(true){ try{ss.put(sugar());}catch(InterruptedException e){return;}}});
Thread charles=new Thread(()->{while(true){ try{cs.put(cake(ss.take(),ws.take()));}catch(..){return;}}});
●
docs.oracle.com/en/java/javase/15/docs/api/java.base/java/util/concurrent/ BlockingQueue.html
In Particular, try to understand what is the best scenario to use the methods in the first table:
Reading!
Insert: Remove: Examine:
add(e) remove() element()
offer(e) put(e) poll() take() peek() n/a
offer(e, time, unit) poll(time, unit) n/a
Such a nice library!
Cakes, using CompletableFuture
public static Gift computeGift(int hunger){
BlockingQueue
int[] timHunger=new int[]{hunger};
CompletableFuture
Thread alice=new Thread(()->{while(true){ //add may work, but… try{ws.put(wheat());}catch(InterruptedException e){return;}}}); //thread dies: not “swallowed”
Thread bob=new Thread(()->{while(true){ try{ss.put(sugar());}catch(InterruptedException e){return;}}});
Thread charles=new Thread(()->{while(true){ try{cs.put(cake(ss.take(),ws.take()));}catch(..){return;}}});
Thread tim=new Thread(()->{while(true){cs.take(); //setting result try{if(timHunger[0]>0){timHunger[0]-=1;return;}res.complete(gift());} catch(..){return;} }});
alice.start();bob.start();charles.start();tim.start();
try{return res.join();}//waiting for result; join() is like get() but.. finally{alice.interrupt();..bob…charles..;tim.interrupt();} }//remember to interrupt your threads
●
This is a very coarse grained parallelism, every problem would lent to a specific graph of producer/consumers, that have nothing to to with the specific resource of the underling machine.
Not too bad, we still can use future/fork join inside some of the nodes.
The example before assumes that would be impossible/wrong/hard to produce more then one computation for any kind at the same time.
That is, if you can produce/eat more then one cake at the same time, probably just parallelise the process as a whole.
●
●
●
Producer/Consumer
The proposed “correct code” is horrible
Can we do something better?
while/try-catch logic is repeated between nodes
We create threads manually – slow if this is just a part of a bigger program
Can we make an abstraction to run actions in parallel?
●
●
●
●
Ideal code:
public static Gift computeGift(int hunger) throws InterruptedException{ BlockingQueue
AtomicInteger timHunger=new AtomicInteger(hunger); ConcurrentRunner
runner.run(5,()->{//tim, somehow he can eat 5 cakes at the same time cs.take();
if(timHunger.decrementAndGet()>0){return;} runner.setResult(gift());
});
return runner.result(); }
ConcurrentRunner
public interface Action{ void run() throws InterruptedException; } public class ConcurrentRunner
private static ExecutorService exe=Executors.newCachedThreadPool(r->{ Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t;});//deamons allows the JVM to terminate
private List
private CompletableFuture
public void run(int repeats,Action a) { //to add a task/action Runnable r = ()->{ while(runOnce(a)){} };
for(int i=0;i
return !end.isDone() && !Thread.interrupted();}//if false we stop
public void setResult(T res){//result is available! end.complete(res);for(Future> fi:tasks){fi.cancel(true);}}
public T result(){ return end.join(); } } //wait for the result
ConcurrentRunner
public interface Action{ void run() throws InterruptedException; } public class ConcurrentRunner
private static ExecutorService exe=Executors.newCachedThreadPool(r->{ Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t;});//deamons allows the JVM to terminate
private List
private CompletableFuture
public void run(int repeats,Action a) { //to add a task/action Runnable r = ()->{ while(runOnce(a)){} };
for(int i=0;i
return !end.isDone() && !Thread.interrupted();}//if false we stop
public void setResult(T res){//result is available! end.complete(res);for(Future> fi:tasks){fi.cancel(true);}}
public T result(){ return end.join(); } } //wait for the result
ConcurrentRunner
public interface Action{ void run() throws InterruptedException; } public class ConcurrentRunner
private static ExecutorService exe=Executors.newCachedThreadPool(r->{ Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t;});//deamons allows the JVM to terminate
private List
private CompletableFuture
public void run(int repeats,Action a) { //to add a task/action Runnable r = ()->{ while(runOnce(a)){} };
for(int i=0;i
return !end.isDone() && !Thread.interrupted();}//if false we stop
public void setResult(T res){//result is available! end.complete(res);for(Future> fi:tasks){fi.cancel(true);}}
public T result(){ return end.join(); } } //wait for the result
ConcurrentRunner
public interface Action{ void run() throws InterruptedException; } public class ConcurrentRunner
private static ExecutorService exe=Executors.newCachedThreadPool(r->{ Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t;});//deamons allows the JVM to terminate
private List
private CompletableFuture
public void run(int repeats,Action a) { //to add a task/action Runnable r=()->{ while(runOnce(a)){} };
for(int i=0;i
return !end.isDone() && !Thread.interrupted();}//if false we stop
public void setResult(T res){//result is available! end.complete(res);for(Future> fi:tasks){fi.cancel(true);}}
public T result(){ return end.join(); } } //wait for the result
ConcurrentRunner
public interface Action{ void run() throws InterruptedException; } public class ConcurrentRunner
private static ExecutorService exe=Executors.newCachedThreadPool(r->{ Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t;});//deamons allows the JVM to terminate
private List
private CompletableFuture
public void run(int repeats,Action a) { //to add a task/action Runnable r = ()->{ while(runOnce(a)){} };
for(int i=0;i
return !end.isDone() && !Thread.interrupted();}//if false we stop
public void setResult(T res){//result is available! end.complete(res);for(Future> fi:tasks){fi.cancel(true);}}
public T result(){ return end.join(); } } //wait for the result
ConcurrentRunner
public interface Action{ void run() throws InterruptedException; } public class ConcurrentRunner
private static ExecutorService exe=Executors.newCachedThreadPool(r->{ Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t;});//deamons allows the JVM to terminate
private List
private CompletableFuture
public void run(int repeats,Action a) { //to add a task/action Runnable r = ()->{ while(runOnce(a)){} };
for(int i=0;i
return !end.isDone() && !Thread.interrupted();}//if false we stop
public void setResult(T res){//result is available! end.complete(res);for(Future> fi:tasks){fi.cancel(true);}}
public T result(){ return end.join(); } } //wait for the result
● ●
– – –
●
●
●
Then, look at the result and try to abstract your process.
Finally, make an abstraction supporting your code, and rewrite your code using your abstraction.
The last step saves you some maintenance hell.
What is the takeaway?
Start with a prototype using available features
Make it work:
in the good case
when it is called multiple times in a functional sense when failure is involved