NWEN303 Concurrent Programming
13: Actors and Message Passing
Marco Servetto VUW
Actors
● An actor is a worker that communicates with other workers strictly by message passing.
● What is a message? Every approach has its own idea.
● Most common implementations for messages:
  – an event in a state machine (theoretical approach)
  – a method call on an object in an OO language (typed approach)
  – an object in an OO language (flexible approach)
● Computation can run on a single machine with multi-threading, or on multiple machines in a distributed setting, just by changing the configuration.
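To make the idea of "a worker that only communicates by messages" concrete, here is a minimal sketch in plain Java. It is not Akka: MiniActor, tell and stopAndWait are hypothetical names chosen for this illustration, and the mailbox is simply a blocking queue drained by one thread. Messages are plain objects, as in the "flexible approach" above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical mini-actor, NOT Akka: one mailbox, one thread, messages are plain objects.
final class MiniActor {
  private static final Object STOP = new Object(); // private sentinel that ends the loop
  private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
  private final Thread worker;

  MiniActor(Consumer<Object> handler) {
    worker = new Thread(() -> {
      try {
        while (true) {
          Object msg = mailbox.take(); // block until a message is available
          if (msg == STOP) { return; }
          handler.accept(msg);         // handle exactly one message at a time
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.start();
  }

  // Asynchronous send: enqueue the message and return immediately.
  void tell(Object msg) { mailbox.add(msg); }

  // Ask the actor to stop after the messages already queued, then wait for it.
  void stopAndWait() {
    mailbox.add(STOP);
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}

final class MiniActorDemo {
  static List<String> run() {
    List<String> log = new ArrayList<>(); // touched only by the actor thread while it runs
    MiniActor a = new MiniActor(msg -> {
      if (msg instanceof String) { log.add("string:" + msg); }
      else if (msg instanceof Integer) { log.add("int:" + msg); }
      else { log.add("unhandled"); }
    });
    a.tell("hello");
    a.tell(42);
    a.stopAndWait(); // join() makes the actor's writes visible to the caller
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [string:hello, int:42]
  }
}
```

The caller never touches the actor's state directly; it only enqueues objects, and the single worker thread decides what to do with each one.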
Actors — AKKA
We will use the popular Java actor library AKKA.
The library is very large and we cannot learn it all! We only show ActorSystems with a fixed topology:
– all actors are created and initialized before any message is sent;
– all actors stop at the end, when the result is ready;
– the whole actor system is encapsulated in a method execution.
Actors — AKKA
We will not see the following (you can try to learn them on your own after the course):
● Hierarchies of actors: in Akka any actor is the descendant of some other actor, which is its supervisor. Organizing the actors in trees allows sub-computations to be performed in a flexible manner.
● Fault tolerance: how to continue your distributed computation even if some process/machine goes down and is restarted after a while.
● Clustering: efficient and flexible ways to handle many machines and to automatically distribute the computation load coming from external requests.
● Details of the actor life cycle: actor objects can be restarted outside of the control of the programmer. This allows, for example, moving them to another physical machine.
● Logging, testing and debugging: Akka comes with its own ways to log, test and debug actor-based applications. They are mostly useful to track the other aspects that we are not covering in this course.
Actors: first look/ first shock!
class A extends AbstractActor{//the Akka class to create actors
  void onString(String s){//my own method, not an @Override
    System.out.println("'A' handles the message '"+s+"'");
    this.sender().tell("'A' has received '"+s+"'", self());
  }
  void onFoo(Foo s){..}
  @Override public Receive createReceive(){//Akka way to dispatch
    return receiveBuilder()
      .match(String.class,this::onString)//register your methods …
      .match(Foo.class,this::onFoo)
      .build();//Type based: messages are objects!
  }
}
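The type-based dispatch above can feel like magic at first. The following plain-Java sketch shows one way such a dispatcher could work in principle: handlers are registered per message class and the first matching one is run. TypeDispatch and its methods are hypothetical names for this illustration; this is not how Akka implements receiveBuilder().

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the idea behind receiveBuilder().match(...); NOT Akka's code.
final class TypeDispatch {
  // LinkedHashMap keeps registration order, so earlier matches win.
  private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

  // Register a handler for messages that are instances of 'type'.
  <T> TypeDispatch match(Class<T> type, Consumer<? super T> handler) {
    handlers.put(type, msg -> handler.accept(type.cast(msg)));
    return this;
  }

  // Run the first registered handler whose type matches; report whether one did.
  boolean receive(Object msg) {
    for (Map.Entry<Class<?>, Consumer<Object>> e : handlers.entrySet()) {
      if (e.getKey().isInstance(msg)) {
        e.getValue().accept(msg);
        return true;
      }
    }
    return false; // no handler registered for this kind of message
  }
}

final class TypeDispatchDemo {
  static String run() {
    StringBuilder out = new StringBuilder();
    TypeDispatch d = new TypeDispatch()
        .match(String.class, s -> out.append("String(").append(s).append(") "))
        .match(Integer.class, i -> out.append("Integer(").append(i + 1).append(") "));
    d.receive("hi");
    d.receive(41);
    boolean handled = d.receive(3.14); // no handler registered for Double
    return out + "handled=" + handled;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints String(hi) Integer(42) handled=false
  }
}
```

Because the handler is chosen from the runtime class of the message, adding a new kind of message only requires a new class and one more match line.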
Cakes with actors
public static Gift computeGift() throws InterruptedException{
  ActorSystem s=AkkaConfig.newSystem("Cakes",2501, Map.of());
  ActorRef alice=//makes wheat
    s.actorOf(Props.create(Alice.class,()->new Alice()),"Alice");
  ActorRef bob=//makes sugar
    s.actorOf(Props.create(Bob.class,()->new Bob()),"Bob");
  ActorRef charles=//makes cakes with wheat and sugar
    s.actorOf(Props.create(Charles.class,()->new Charles()),"Charles");
  ActorRef tim=//wants to eat 1000 cakes
    s.actorOf(Props.create(Tim.class,()->new Tim()),"Tim");
  //sending init messages
  alice.tell(charles,tim);//from tim: send wheat to charles
  bob.tell(charles,tim);//from tim: send sugar to charles
  CompletableFuture
Cakes with actors
class Alice extends AbstractActor{//may be on another machine
  public Receive createReceive() {
    return receiveBuilder()
      .match(ActorRef.class,r->{
        r.tell(new Wheat(), sender());
        //dear Charles, Tim asked to give you wheat
        self().tell(r,sender());//let's do more wheat
        })
      .build();}}
class Bob extends AbstractActor{//may be on another machine
  public Receive createReceive() {
    return receiveBuilder()
      .match(ActorRef.class,r->{
        r.tell(new Sugar(), sender());
        //dear Charles, Tim asked to give you sugar
        self().tell(r,sender());//let's do more sugar
        })
      .build();}}
//Actors are killed from outside: AVOID the self-termination ANTIPATTERN, like
//while(!akka.isTerminating()) {…}
//It would not work on multiple machines
Cakes with actors
class Charles extends AbstractActor{//may be on another machine
  //ws and ss: assumed declarations, inferred from how they are used below
  List<Wheat> ws=new ArrayList<>();
  List<Sugar> ss=new ArrayList<>();
  public Receive createReceive() {
    return receiveBuilder()
      .match(Wheat.class,w->{
        if(ss.isEmpty()) {ws.add(w);return;}//no sugar? store the wheat
        Sugar s=ss.remove(ss.size()-1);//if there is sugar take it,
        sender().tell(new Cake(s,w), self());//and mix it with the wheat
        })
      .match(Sugar.class,s->{
        if(ws.isEmpty()) {ss.add(s);return;}//same but opposite
        Wheat w=ws.remove(ws.size()-1);
        sender().tell(new Cake(s,w), self());
        //hi Tim, here is your cake!
        })
      .build();}}
//Tim is the sender at this stage, since Alice and Bob set the
//sender as Tim. This is a common pattern: Alice and Bob are transparent
//to Charles, who only needs to know about Tim.
//The sender does not need to represent the actual concrete sender,
//but the 'abstract' sender.
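Charles's two handlers follow a simple "store or combine" rule: an ingredient is stored when its partner is missing, and is turned into a cake otherwise. The sketch below isolates that rule in plain Java using counters instead of lists. Pairing and its method names are hypothetical, and no actor library is involved.

```java
// Plain-Java sketch of the store-or-combine rule; Pairing is a hypothetical name.
final class Pairing {
  private int storedWheat = 0;
  private int storedSugar = 0;
  private int cakes = 0;

  void onWheat() {
    if (storedSugar == 0) { storedWheat++; return; } // no sugar? store the wheat
    storedSugar--;                                   // otherwise use one sugar
    cakes++;                                         // and make a cake
  }

  void onSugar() {
    if (storedWheat == 0) { storedSugar++; return; } // same but opposite
    storedWheat--;
    cakes++;
  }

  int cakes() { return cakes; }
  int storedWheat() { return storedWheat; }
  int storedSugar() { return storedSugar; }
}

final class PairingDemo {
  static String run() {
    Pairing p = new Pairing();
    // Arrival order: wheat, wheat, sugar, sugar, sugar, wheat
    p.onWheat(); p.onWheat(); p.onSugar(); p.onSugar(); p.onSugar(); p.onWheat();
    return "cakes=" + p.cakes() + " wheat=" + p.storedWheat() + " sugar=" + p.storedSugar();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints cakes=3 wheat=0 sugar=0
  }
}
```

At most one of the two stores is non-empty at any time, and the code has no locks: inside an actor this is safe because only one message is handled at a time.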
Cakes with actors
class Tim extends AbstractActor{
  int hunger=1000;
  boolean running=true;
  ActorRef originalSender=null;
  public Receive createReceive() {
    return receiveBuilder()
      .match(GiftRequest.class,()->originalSender==null,gr->{
        originalSender=sender();//save the sender to reply later
        })
      .match(Cake.class,()->running,c->{
        hunger-=1;
        System.out.println("YEA! but I'm still hungry "+hunger);
        if(hunger>0) {return;}
        running=false;
        originalSender.tell(new Gift(),self());//finally reply
        })
      .build();}}
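The guards in Tim's handlers (originalSender==null and running) make sure that the final reply is sent exactly once, even if more cakes keep arriving afterwards. A plain-Java sketch of that guard logic, with the hypothetical class HungryEater and a counter standing in for the reply message:

```java
// Plain-Java sketch of a guarded handler; HungryEater is a hypothetical name.
final class HungryEater {
  private int hunger;
  private boolean running = true;
  private int replies = 0; // stands in for originalSender.tell(new Gift(), self())
  private int ignored = 0; // cakes that arrive after the guard became false

  HungryEater(int hunger) { this.hunger = hunger; }

  void onCake() {
    if (!running) { ignored++; return; } // guard is false: the message is not handled
    hunger -= 1;
    if (hunger > 0) { return; }          // still hungry, wait for more cakes
    running = false;                     // close the guard before replying
    replies++;
  }

  int replies() { return replies; }
  int ignored() { return ignored; }
}

final class HungryEaterDemo {
  static String run() {
    HungryEater tim = new HungryEater(3);
    for (int i = 0; i < 5; i++) { tim.onCake(); } // two cakes more than needed
    return "replies=" + tim.replies() + " ignored=" + tim.ignored();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints replies=1 ignored=2
  }
}
```

This matters in the Cakes example because Alice and Bob never stop producing on their own, so extra cakes are likely to reach Tim after he is full.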
How to run it on multiple machines
AkkaConfig.newSystem("Cakes",2501,Map.of(
  "Tim","130.195.6.192",
  "Bob","130.195.6.176",
  "Charles","130.195.6.135"
  //Alice stays local
  ));
…//all the other code as before. Just change that call
● Make an executable Jar with
  – all the code for your actors
  – OpenAkka.java as the main
● On machines 192, 176 and 135, run that Jar:
  – java -jar OpenAkka.jar
● Run Cakes on any machine.
Cakes!
● The code is longer and more involved than in the non-actor solution.
● As we can see from Charles's code, computation inside a message handler does not need to care about synchronization.
● It is easy to run the code on a single machine or on multiple machines.
● It uses AkkaConfig, a utility class I wrote to manage Akka's complexity.
● OpenAkka is a utility program I wrote to accept any actors and make them work.
Cakes!
● Bad: the code is more involved, and probably has more overhead.
● Good: if you need multiple machines, it is the simplest solution that scales well.
● Good: it can be tested on a single machine/process and deployed on many.
● If the computation logic is complex and relies on state mutation, the actor model dramatically simplifies synchronization issues by avoiding them in the first place.
● Thus, if you cannot go fully functional, you may settle on actors.
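The claim that a message handler can mutate state without any synchronization can be tried out in plain Java. In this sketch a single-threaded executor plays the role of the mailbox, so the plain int field is only ever touched by one thread, while several sender threads "tell" concurrently. CounterActor, tellIncrement and askCount are hypothetical names, and this is an analogy to the actor model rather than Akka code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical actor-like counter: a single-threaded executor acts as the mailbox.
final class CounterActor {
  private int count = 0; // plain field: no lock, no volatile, no AtomicInteger
  private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

  // Fire-and-forget message: the increment runs later, on the mailbox thread.
  void tellIncrement() { mailbox.execute(() -> count++); }

  // Request/reply message: the read also runs on the mailbox thread, after earlier messages.
  int askCount() {
    try {
      return mailbox.submit(() -> count).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  void stop() { mailbox.shutdown(); }
}

final class CounterDemo {
  static int run(int senders, int perSender) {
    CounterActor actor = new CounterActor();
    Thread[] threads = new Thread[senders];
    for (int i = 0; i < senders; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < perSender; j++) { actor.tellIncrement(); }
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) { t.join(); } // all messages are queued after this
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    int result = actor.askCount();
    actor.stop();
    return result;
  }

  public static void main(String[] args) {
    System.out.println(run(4, 1000)); // prints 4000
  }
}
```

With the same plain int shared directly between four threads, updates could be lost; here none are, because every mutation is serialized through the mailbox.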
Computation model: step0
Alice
  Mailbox: Tim:Charles
Bob
  Mailbox: Tim:Charles
Charles ws:0 ss:0
  Mailbox: EMPTY
Tim hunger=1000
  Mailbox: EMPTY

Computation model: step1
Alice
  Mailbox: EMPTY
  processing Tim:Charles
  Let's do wheat;
  send to Charles: Tim:wheat
  send to Alice: Tim:Charles
Bob
  Mailbox: Tim:Charles
Charles ws:0 ss:0
  Mailbox: EMPTY
Tim hunger=1000
  Mailbox: EMPTY

Computation model: step2
Alice
  Mailbox: Tim:Charles
Bob
  Mailbox: Tim:Charles
Charles ws:0 ss:0
  Mailbox: Tim:wheat
Tim hunger=1000
  Mailbox: EMPTY

Computation model: step3
Alice
  Mailbox: Tim:Charles
Bob
  Mailbox: EMPTY
  processing Tim:Charles
  Let's do sugar;
  send to Charles: Tim:sugar
  send to Bob: Tim:Charles
Charles ws:1 ss:0
  Mailbox: EMPTY
  processing Tim:wheat
  Let's store it!
Tim hunger=1000
  Mailbox: EMPTY

Computation model: step4
Alice
  Mailbox: Tim:Charles
Bob
  Mailbox: Tim:Charles
Charles ws:1 ss:0
  Mailbox: Tim:sugar
Tim hunger=1000
  Mailbox: EMPTY

Computation model: step5
Alice
  Mailbox: EMPTY
  processing Tim:Charles
  Let's do wheat;
  send to Charles: Tim:wheat
  send to Alice: Tim:Charles
Bob
  Mailbox: EMPTY
  processing Tim:Charles
  Let's do sugar;
  send to Charles: Tim:sugar
  send to Bob: Tim:Charles
Charles ws:1 ss:0
  Mailbox: EMPTY
  processing Tim:sugar
  Let's make a cake!
  send to Tim: Charles:cake
Tim hunger=1000
  Mailbox: EMPTY

Computation model: step6
Alice
  Mailbox: Tim:Charles
Bob
  Mailbox: Tim:Charles
Charles ws:0 ss:0
  Mailbox: Tim:sugar, Tim:wheat
Tim hunger=1000
  Mailbox: Charles:cake

Computation model: step7
Alice
  Mailbox: Tim:Charles
Bob
  Mailbox: Tim:Charles
Charles ws:0 ss:1
  Mailbox: Tim:wheat
  processing Tim:sugar
  Let's store it!
Tim hunger=1000
  Mailbox: Charles:cake
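The steps above can also be replayed by a small deterministic simulation. This sketch models the four mailboxes as queues and uses one particular schedule, which is an assumption of the sketch and differs from the interleaving in the slides: in every round each actor takes at most one message, and everything sent during a round is only visible from the next round. CakeTrace and Msg are hypothetical names, and real actors would of course interleave unpredictably.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Deterministic, single-threaded replay of the Cakes system under one assumed schedule.
final class CakeTrace {
  enum Msg { MAKE, WHEAT, SUGAR, CAKE }

  // Returns the number of rounds needed until Tim's hunger reaches zero.
  static int run(int hunger) {
    Deque<Msg> alice = new ArrayDeque<>();
    Deque<Msg> bob = new ArrayDeque<>();
    Deque<Msg> charles = new ArrayDeque<>();
    Deque<Msg> tim = new ArrayDeque<>();
    alice.add(Msg.MAKE); // step0: the two init messages
    bob.add(Msg.MAKE);
    int ws = 0, ss = 0, rounds = 0;
    while (hunger > 0) {
      rounds++;
      // Every actor takes at most one message (null means the mailbox was empty).
      Msg a = alice.poll(), b = bob.poll(), c = charles.poll(), t = tim.poll();
      if (a != null) { charles.add(Msg.WHEAT); alice.add(Msg.MAKE); } // make wheat, then loop
      if (b != null) { charles.add(Msg.SUGAR); bob.add(Msg.MAKE); }   // make sugar, then loop
      if (c == Msg.WHEAT) {
        if (ss == 0) { ws++; } else { ss--; tim.add(Msg.CAKE); }      // store or combine
      }
      if (c == Msg.SUGAR) {
        if (ws == 0) { ss++; } else { ws--; tim.add(Msg.CAKE); }
      }
      if (t == Msg.CAKE) { hunger--; }
    }
    return rounds;
  }

  public static void main(String[] args) {
    System.out.println(run(3)); // prints 8
  }
}
```

Under this schedule Charles receives two ingredients per round but handles only one, so his mailbox keeps growing: two producers feed a single consumer.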
Computation model
● Actors have a mailbox, storing unprocessed messages, and an ActorRef, an abstraction over their name/address.
● An actor handles one message at a time.
● By default, the mailbox is a FIFO queue.
● The time a message takes to reach the mailbox is unknown and unpredictable.
● The reachable object graph of an actor should be encapsulated: the actor and all the objects in its reachable object graph are either
  – deeply immutable (this includes ActorRefs), or
  – not reachable by any other running worker (not just other actors).
● If this holds, then actors do not need to worry about synchronization: their state is owned by them, and they behave like a monitor over that state.
● The Akka implementation assumes that this holds, and thus may break otherwise.
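As a small illustration of what "deeply immutable" can look like for a message, here is a plain-Java sketch. Order is a hypothetical message class invented for this example: all its fields are final, and the list it carries is an unmodifiable copy, so neither the sender nor the receiver can change it after it is sent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical deeply immutable message: final fields, immutable contents.
final class Order {
  private final String customer;
  private final List<String> items;

  Order(String customer, List<String> items) {
    this.customer = customer;
    this.items = List.copyOf(items); // unmodifiable copy, detached from the caller's list
  }

  String customer() { return customer; }
  List<String> items() { return items; }
}

final class OrderDemo {
  static String run() {
    List<String> draft = new ArrayList<>();
    draft.add("cake");
    Order msg = new Order("Tim", draft);
    draft.add("pie"); // the sender keeps mutating its own list: the message is unaffected
    boolean rejected = false;
    try {
      msg.items().add("bread"); // a receiver trying to mutate the message
    } catch (UnsupportedOperationException e) {
      rejected = true;
    }
    return msg.items() + " " + rejected;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [cake] true
  }
}
```

If the constructor stored the caller's list directly, the sender and the receiving actor would share mutable state, which is exactly the situation the encapsulation rule is meant to exclude.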
Ass3
● Actors should only send encapsulated or deeply immutable objects as messages.
  – What does it really mean?
● Read some explanations and some examples.
● Answer 18 YES/NO questions and motivate your answers.
Next time
Let's understand the rest:
● The code of AkkaConfig and OpenAkka.
● More on the ask pattern.
● More foundational ideas on actors.