CS计算机代考程序代写 Java algorithm NWEN303 Concurrent Programming

NWEN303 Concurrent Programming
15: Actors, advantages and pitfalls Marco Servetto
VUW

● ●


Actors exchange messages Two components: Actor and ActorRef
The actor object is very ‘private’, only the actor itself should be able to ever see it.
An actor processes one message at a time, sequentially: this reduce the need of synchronization.
Any action of an actor start from receiving a message, any communication happen by inserting new messages in the mailbox of actors.
The whole process can be non-blocking!
Threads are never blocked, actor thread pools do not get exhausted (if blocking features are not used by the library user).
● ●


The implementation of the actor mailbox is the crucial point: the messages are pushed to one end, and are took from another end. In this way, with a smart implementation the mailbox do not need locks; just volatile.
See for example:
Lindsay Groves: ”Verifying Michael and Scott’s lock-free queue algorithm using trace reduction”
https://dl.acm.org/citation.cfm?id=1379385
● ●
Mailboxes


Thanks to those non blocking algorithms, millions of actors can live on the same machine and only keep a few dozen threads busy.
An actor with no messages to process is NOT blocking a thread.
For example, an actor based web server could easily keep one running actor for each open session, and all the data relative to that session will be naturally encapsulated under the actor.


How many actors

Too many messages
MailBox overflow

Too many messages
class Alice extends AbstractActor{//may be on another machine public Receive createReceive() {
return receiveBuilder() .match(ActorRef.class,r->{
r.tell(new Wheat(), sender());
//dear Charles, Tim asked to give you wheat self().tell(r,sender());//let’s do more wheat })
.build();}}


Alice would jut make a ton of Wheat and send every unit of Wheat as a message to Charles.
Charles may be a little slow to process the Wheat. this would cause an unbound accumulation of messages in the mailbox of Charles. It is a problem called
Mailbox overflow

Too many messages
class Alice extends AbstractActor{//may be on another machine public Receive createReceive() {
return receiveBuilder() .match(ActorRef.class,r->{
r.tell(new Wheat(), sender());
//dear Charles, Tim asked to give you wheat self().tell(r,sender());//let’s do more wheat })
.build();}}
new Wheat() new Wheat()
new Wheat()
new Wheat() new Wheat()




– –
MailBox overflow
To avoid mailbox overflow you must design your application better.
Naive application design using Actors tend to go in mailbox overflow
General principles:
do not be scared to send more messages around
do not use mailboxes as data containers, but only as a todo list.

MailBox overflow or blocking?
public static Gift computeGift(int hunger) throws InterruptedException{ BlockingQueue ws=new LinkedBlockingQueue<>(10); BlockingQueue ss=new LinkedBlockingQueue<>(10); BlockingQueue cs=new LinkedBlockingQueue<>(10);
AtomicInteger timHunger=new AtomicInteger(hunger); ConcurrentRunner runner=new ConcurrentRunner<>(); runner.run(10,()->ws.put(wheat()));//alice, duplicated 10 times runner.run(10,()->ss.put(sugar()));//bob, duplicated 10 times runner.run(1,()->cs.put(cake(ss.take(),ws.take())));//charles, 1 time runner.run(5,()->{//tim, somehow he can eat 5 cakes at the same time
if(timHunger.decrementAndGet()>0){return;}
runner.setResult(gift());}); return runner.result();
}
Bounded buffers: blocking when size >=10
That is, the Wheat producer stops producing, since its thread is blocked.
We block a thread!! Thus we can not have so many nodes! Shown program can block up to 26 threads.

● ●


● ●

How to solve Mailbox overflow with our Cakes example, without ever blocking a thread?
Welcome to Ass 4!
For Ass 4, you need to adapt the code of our Cakes example.
You need to implement a very precise specification.
Solving Mailbox overflow

Solving Mailbox overflow
Just one? ok…. I’m working on it!
Here is your Wheat!
Can I have a Wheat
Cool!


A Producer can make future products.
Since Actors need non blocking computation, what kind of Future should it be?
The (long?) computation actually making the product should ideally be asynchronous with respect to the Producer actor,
so that they will be able to keep answering other messages.
Alice and Bob will just make Wheat and Sugar.
Charles will ask Alice and Bob for Wheat and Sugar, he will then
combine the ingredients to produce a future Cake.
Tim now need to ask Charles for cakes
The overall process should keep the same behavior as before.





Solving Mailbox overflow

The ask pattern
CompletableFuture result = Patterns.ask(actorRef,message,Duration.ofMillis(time)) .toCompletableFuture();
result.join();//blocking operation. Not good while handling a msg
Akka make it quite verbose. The reason is that Akka work on both Java and Scala, and Scala have its own futures. Using it as shown avoids ending up mixing Java and Scala futures.
From the point of view of the receiver, it is just a message, and it may reply as usual.
However, the sender is “special”, it is like a dedicated actor just waiting to get the response.
No way to not provide a timeout. If you want to be able to wait for a long time,
just provide a LONG LONG timeout.



//Alice code
The ask pattern
.match(String.class,s->{
CompletableFuture sugg1 = Patterns.ask(bob, new WhatShouldIDo(),
Duration.ofMillis(/*..*/)).toCompletableFuture(); CompletableFuture sugg2 = Patterns.ask(tim, new WhatShouldIDo(),
Duration.ofMillis(/*..*/)).toCompletableFuture(); String r1 = (String) sugg1.join();//Blocking call!!!!!
String r2 = (String) sugg2.join();//How to avoid blocking?? bob.tell(“thanks, “+r1+ ” and “+r2+ “are great ideas”, self());
tim.tell(“thanks, “+r1+ ” and “+r2+ “are great ideas”, self());
//Bob and Tim code
.match(WhatShouldIDo.class,w->{ sender().tell(“Just ignore it!”,self()); })

//Alice code
The ask pattern
.match(String.class,s->{
CompletableFuture sugg1 = Patterns.ask(bob, new WhatShouldIDo(),
Duration.ofMillis(/*..*/)).toCompletableFuture(); CompletableFuture sugg2 = Patterns.ask(tim, new WhatShouldIDo(),
Duration.ofMillis(/*..*/)).toCompletableFuture();
CompletableFuture thanks = CompletableFuture.allOf(sugg1, sugg2) .thenApplyAsync(v -> {
String r1 = (String) sugg1.join();//certain sugg1/2 are completed String r2 = (String) sugg2.join();//thus it is non-blocking here return “thanks, “+r1+ ” and “+r2+ “are great ideas”;
});
thanks.thenAcceptAsync(obj->bob.tell(obj,self())); thanks.thenAcceptAsync(obj->tim.tell(obj,self()));
//when ‘thanks’ is ready, it will be sent to both Bob and Tim.
//In this way, no actor is ever blocked.
//It is like continuation passing style.

//Alice code
The ask pattern
.match(String.class,s->{
CompletableFuture sugg1 = Patterns.ask(bob, new WhatShouldIDo(),
Duration.ofMillis(/*..*/)).toCompletableFuture(); CompletableFuture sugg2 = Patterns.ask(tim, new WhatShouldIDo(),
Duration.ofMillis(/*..*/)).toCompletableFuture();
CompletableFuture thanks = CompletableFuture.allOf(sugg1, sugg2) .thenApplyAsync(v -> {
String r1 = (String) sugg1.join();//certain sugg1/2 are completed String r2 = (String) sugg2.join();//thus it is non-blocking here return “thanks, “+r1+ ” and “+r2+ “are great ideas”;
});
pipe(thanks, context().dispatcher()).to(bob);//Better with pipe pipe(thanks, context().dispatcher()).to(tim);//Better with pipe
//when ‘thanks’ is ready, it will be sent to both Bob and Tim.
//In this way, no actor is ever blocked.
//It is like continuation passing style.
//pipe is “more elegant” and sightly more efficient than using //thenAcceptAsync over yet another lambda

The ask pattern
//pipe is “more elegant” and sightly more efficient than using //yet another lambda
//also
pipe(doSomething(), context().dispatcher()).to(sender()); //is near equivalent to
ActorRef s = sender(); doSomething().thenAcceptAsync(obj->s.tell(obj,self()));
//but the following is different:
doSomething().thenAcceptAsync(obj->sender().tell(obj,self())); //since now sender() is evaluated later, thus we have lost track
//of the original sender…
//note: when an actor receives a ‘piped’ message, the ‘sender()’ //will not be the actor initiating the ‘piping’

Reading files, or other blocking ops
When reading a file, or doing a blocking operation in response of a message, do not block the actor:
String res=new String(Files.readAllBytes(Paths.get(s)));//blocking! bob.tell(res,self());
Instead, use CompletableFuture and pipes!
CompletableFuture fRes=CompletableFuture.supplyAsync(()->{ try {return new String(Files.readAllBytes(Paths.get(s)));} catch (IOException e) {throw new CompletionException(e);} //CompletionException: use it with CompletableFutures },myExecutorWhereThreadsCanGetBlocked);
pipe(fRes, context().dispatcher()).to(bob);
Note: using futures may break the synchronization advantages of the actor model.
Let’s discuss why computation in futures should not access the actor mutable state!