NWEN303 Concurrent Programming
16: Actors and Message Passing 3
Marco Servetto VUW
package akkaUtils;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import akka.actor.ActorSystem;
import akka.actor.Terminated;
public class OpenAkka{
  public static void main(String[]args) throws InterruptedException {
    ActorSystem s = AkkaConfig.newSystem("OpenAkka",2500,Map.of());
    String ip=""+s.settings().config().getAnyRef("akka.remote.netty.tcp.hostname");
    System.out.println("Chosen IP is "+ip);
    keybordClose(s);
  }
  public static void keybordClose(ActorSystem s) throws InterruptedException{
CompletableFuture
}}
catch (IOException ioe) {s.terminate();}
}; }
Very little code: just an ActorSystem with the default AkkaConfig configuration.
It allows other ActorSystems to create actors on it, and those actors can perform any computation.
Instead of starting OpenAkka manually, you may want to run an ssh script from your Java code, so that it is started and closed programmatically.
Open Akka
AkkaConfig
public static ActorSystem newSystem(String name,int port,Map
if(ips.size()!=1){throw new Error("Unable to detect ip .."+ips);}
Config config = ConfigFactory.parseString(
    "akka.actor.guardian-supervisor-strategy = "
    +TerminatorSupervisor.class.getCanonicalName()
  ).withFallback(ConfigFactory.parseString(
    "akka.actor.provider = remote"
  )).withFallback(ConfigFactory.parseString(
    "akka.remote.enabled-transports = [\"akka.remote.netty.tcp\"]"
  )).withFallback(ConfigFactory.parseString(
    "akka.remote.netty.tcp.hostname = \""+ips.get(0)+"\""
  )).withFallback(ConfigFactory.parseString(
    "akka.remote.netty.tcp.port = "+port
  ));
for(Entry
    "akka.actor.deployment.\"/"+e.getKey()
    +"\".remote = \"akka.tcp://OpenAkka@"+e.getValue()+":2500\"");
  config=config.withFallback(c);
}
config=config.withFallback(ConfigFactory.load());
return ActorSystem.create(name,config);
}
AkkaConfig
akka.actor.guardian-supervisor-strategy = TerminatorSupervisor
akka.actor.provider = remote
akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
akka.remote.netty.tcp.hostname = MyIp
akka.remote.netty.tcp.port = port
akka.actor.deployment."/Actor1.Name".remote = "akka.tcp://OpenAkka@Actor1.Ip:2500"
…
akka.actor.deployment."/Actorn.Name".remote = "akka.tcp://OpenAkka@Actorn.Ip:2500"
And fallback on default configuration
Akka is heavily configurable, and configuration is hard.
You can have configuration files, but you will nearly always need some way to edit the configuration programmatically before the ActorSystem starts.
After the ActorSystem has started, the configuration cannot be changed or updated.
AkkaConfig, should we care?
akka.actor.deployment."/Alice".remote = "akka.tcp://OpenAkka@130.195.6.192:2500"
The configuration line above makes any actor named exactly Alice run on another machine.
This is not the only way of doing things.
Akka Clustering is more expressive, but also more involved.
Do not just use AkkaConfig as it is. Tame it, understand it and make it yours. To use Akka properly you need to be able to write your own versions of AkkaConfig, and to understand how to configure Akka to fit your needs.
An actor is a worker that communicates with other workers strictly by message passing.
We focus on messages as objects.
Actors have a mailbox, storing unprocessed message objects.
Actors have an ActorRef, an abstraction over their name/address.
It is always safe to share ActorRefs.
An actor handles one message at a time.
By default, the mailbox is a FIFO queue.
A message takes an unknown, unpredictable time to reach the mailbox.
Actors behave like monitors, but without the need for any synchronization/locks.
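The properties listed above can be illustrated with a minimal sketch in plain Java. This is not the Akka API: the class ToyCounterActor and its methods tell, stopAndGetTotal and demo are hypothetical names chosen only for this illustration. It assumes one thread per actor and a LinkedBlockingQueue as the FIFO mailbox.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical toy actor, NOT the Akka API: one mailbox, one worker thread.
final class ToyCounterActor {
  private static final Object STOP = new Object();   // special "stop" message
  private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>(); // FIFO mailbox
  private int total = 0;                             // actor state, touched only by the worker thread
  private final Thread worker = new Thread(this::run);

  ToyCounterActor() { worker.start(); }

  // Asynchronous send: enqueue the message and return immediately.
  void tell(int amount) { mailbox.add(amount); }

  // Queues the stop message, waits for the worker to finish, then reads the final state.
  int stopAndGetTotal() {
    mailbox.add(STOP);
    join(worker);
    return total; // safe to read: join() orders it after all the worker's writes
  }

  // The message loop: exactly one message is handled at a time, with no locks around 'total'.
  private void run() {
    try {
      while (true) {
        Object msg = mailbox.take();
        if (msg == STOP) { return; }
        total += (Integer) msg;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void join(Thread t) {
    try { t.join(); }
    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
  }

  // Four senders each send 1000 messages concurrently; no increment is lost.
  static int demo() {
    ToyCounterActor actor = new ToyCounterActor();
    Thread[] senders = new Thread[4];
    for (int i = 0; i < senders.length; i++) {
      senders[i] = new Thread(() -> {
        for (int j = 0; j < 1000; j++) { actor.tell(1); }
      });
      senders[i].start();
    }
    for (Thread t : senders) { join(t); }
    return actor.stopAndGetTotal();
  }
}
```

Calling ToyCounterActor.demo() should return 4000: the senders race with each other only on the thread-safe mailbox, never on the actor state.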
Actors
Actors are encapsulated
The actor object must encapsulate all the objects in its reachable object graph (ROG), that is:
– No other actor/worker has a reference to the actor object.
– No other actor/worker has a reference to any mutable object in the ROG of the actor object.
Deeply immutable objects can be freely shared.
The message objects must be either:
– deeply immutable (this includes ActorRefs), OR
– encapsulated: their mutable ROG is not reachable by any running worker.
Actors are encapsulated
When actors are properly encapsulated:
– Local reasoning about synchronization: if the actor does not have internal parallelism, there are no race conditions.
– It makes no difference whether the actors are on a single machine or on multiple machines (if serialization is correctly implemented).
– It is simpler to move actors around to different machines.
Some new languages may be able to ENFORCE that actors are used correctly, see Pony and 42 (BUT NOT RUST).
● Alice has just refs
● Charles has Lists of Wheat and Sugar
● Alice can make Wheats
[Diagram1: Alice (CharlesRef, TimRef), a Wheat, and Charles (List: Wheat1, Wheat2, .. Wheatn)]
● When Alice creates a Wheat, it is encapsulated.
● When Alice gives the Wheat to Charles, she does not retain any alias to the Wheat.
● Thus, the Wheat Charles gets is still encapsulated. Thus, he can put it in the list, and the whole ROG of Charles is still encapsulated.
[Diagram2: Alice (CharlesRef, TimRef), a Wheat, and Charles (List: Wheat1, Wheat2, .. Wheatn)]
● There is a moment when neither Alice nor Charles holds the Wheat.
● There is no moment when both hold the Wheat.
[Diagram3: Alice (CharlesRef, TimRef), a Wheat, and Charles (List: Wheat1, Wheat2, .. Wheatn)]
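The hand-over of a Wheat from Alice to Charles can be sketched in plain Java as follows. This is only an illustration under assumptions: the classes Wheat and WheatHandOff, the field grams and the method demo are hypothetical, the calling thread plays the role of Alice, and a LinkedBlockingQueue stands in for Charles's mailbox. The point is that Alice never stores the Wheat in a variable, so once it is in the mailbox no alias to it remains on her side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical mutable payload; the name follows the diagrams.
final class Wheat {
  int grams;
  Wheat(int grams) { this.grams = grams; }
}

final class WheatHandOff {
  static int demo() {
    BlockingQueue<Wheat> charlesMailbox = new LinkedBlockingQueue<>();
    AtomicInteger report = new AtomicInteger(); // only used to get the result out of the demo

    Thread charles = new Thread(() -> {
      List<Wheat> list = new ArrayList<>();     // Charles's private list, never leaves this thread
      try {
        for (int i = 0; i < 3; i++) {
          Wheat w = charlesMailbox.take();      // from here on, only Charles holds w
          w.grams += 1;                         // unsynchronized mutation is safe: no other alias exists
          list.add(w);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      int total = 0;
      for (Wheat w : list) { total += w.grams; }
      report.set(total);
    });
    charles.start();

    // Alice: creates each Wheat and sends it at once, keeping no variable that refers to it.
    for (int i = 0; i < 3; i++) {
      charlesMailbox.add(new Wheat(100));
    }

    try { charles.join(); }
    catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    return report.get();
  }
}
```

WheatHandOff.demo() should return 303 (three Wheats of 100 grams, each incremented once by Charles). While a Wheat sits in the queue, neither side holds it, which matches the moment described above.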
Actors as lock-free monitors
An actor can serve as a monitor for its ROG:
● Sending a message to an actor is similar to calling a method on a monitor object.
● The actor can respond by sending another message, as if it were the return value of that method. (Akka supports this with “the ask pattern”)
● Since sending messages is an asynchronous operation, an actor is never ‘just waiting’, unless:
– it has an empty mailbox (non-blocking)
– the message handler uses some blocking operation, like waiting for a future (BAD, do not do it!)
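The idea of an actor as a lock-free monitor with a reply can be sketched without Akka. The following is an assumption-laden illustration, not Akka's ask pattern itself: ToyAccountActor, deposit, askBalance, shutdown and demo are hypothetical names, and a single-thread executor stands in for the mailbox plus the one worker that drains it. The reply arrives as a CompletableFuture, and the caller attaches a continuation instead of blocking inside a handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy actor, NOT the Akka API.
final class ToyAccountActor {
  // A single-thread executor: a FIFO queue of tasks served by exactly one thread.
  private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
  private int balance = 0; // touched only by the mailbox thread, so no lock is needed

  // Fire-and-forget message: like calling a void method on a monitor.
  void deposit(int amount) { mailbox.execute(() -> balance += amount); }

  // Request with a reply: the future plays the role of the returned value.
  CompletableFuture<Integer> askBalance() {
    return CompletableFuture.supplyAsync(() -> balance, mailbox);
  }

  void shutdown() { mailbox.shutdown(); }

  static String demo() {
    ToyAccountActor account = new ToyAccountActor();
    account.deposit(10);
    account.deposit(32);
    // Non-blocking style: describe what to do with the reply when it arrives.
    CompletableFuture<String> reply = account.askBalance().thenApply(b -> "balance=" + b);
    // join() blocks, but only here at the edge of the demo, never inside a message handler.
    String result = reply.join();
    account.shutdown();
    return result;
  }
}
```

ToyAccountActor.demo() should return "balance=42", because the executor runs the two deposits and the balance request in the order they were submitted.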
Example: Groups of Persons
- A Person has a String name and many Person friends.
- A Group of friends represents a set of friendships.
We want to enforce the following:
- A Person belongs to only a single Group, and has friends only in that Group.
public final class Group{//facade pattern
  private List<Person> ps=new ArrayList<>();
  public List<Person> getPersons(){
    return Collections.unmodifiableList(ps);}
  public void addPerson(String name){ps.add(new Person(name));}
  private void checkInGroup(Person p) {
    if(!ps.contains(p)) {throw new Error("");}
  }
  public void connect(Person p1,Person p2) {
    checkInGroup(p1);
    checkInGroup(p2);
    if(p1==p2) {return;}
    if(p1.friends.contains(p2)) {return;}
    p1.friends.add(p2);
    p2.friends.add(p1);
  }
  public final static class Person{//important: nested
    private String name;
    public String getName() {return this.name;}
    private Person(String name) {this.name=name;}
    private List<Person> friends=new ArrayList<>();
    public List<Person> getFriends(){
      return Collections.unmodifiableList(friends);}
    public String toString() {return …;}
  }}
Private means: private to the whole set of nested classes
How is it going so far?
● Success?
● Can the user modify Person directly?
● No, all mutation operations are private, so only Groups can modify them.
● The code logic checks that persons of different groups are never connected.
● Persons are created by the Group, and not directly by the user.
● A delicate balance! Change one thing, and it all collapses!
● Of course, we are assuming to run under a SecurityManager preventing nasty reflection tricks (not like the default one)
Groups of Persons
A more usable Group
● How to expose only a read view of a group:
ReadGroup is an interface, Group will implement it, and there will be a method to wrap a Group in a ReadGroup.
Note how there is no way to extract the inner group from the result of a ReadGroup.of(..)
public class Group implements ReadGroup{…}
public interface ReadGroup {
  public List<Group.Person> getPersons();
  public static ReadGroup of(Group g){
    return new ReadGroup(){
      public List<Group.Person> getPersons(){
        return g.getPersons();
      }};
  }}
Note: a read-only wrapper is NOT deeply immutable if any mutable alias to the wrapped object exists; thus it is not always safe to send one as a message.
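The difference between a read-only wrapper and an immutable value can be seen with standard library lists alone. The sketch below is an illustration with a hypothetical class ReadOnlyViewDemo; it uses Collections.unmodifiableList as the wrapper and List.copyOf as the independent snapshot, and it does not use Group or ReadGroup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class: read-only view versus independent immutable copy.
final class ReadOnlyViewDemo {
  static String demo() {
    List<String> names = new ArrayList<>(List.of("Bob", "Alice"));
    List<String> view = Collections.unmodifiableList(names); // read-only wrapper over 'names'
    List<String> snapshot = List.copyOf(names);              // independent unmodifiable copy
    names.add("Wally"); // a mutable alias still exists, so the view changes under the reader
    return view + " " + snapshot;
  }
}
```

ReadOnlyViewDemo.demo() should return "[Bob, Alice, Wally] [Bob, Alice]": the wrapper cannot be used to mutate the list, but it still observes mutations made through the original reference, so sending it to another worker would share mutable state.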
A more usable Group
● How to offer a deepClone() operation:
Hard to do by hand: circular graph of friends! Java idiomatic way: use serialization!
public class Group implements ReadGroup,Serializable{…
  public final static class Person implements Serializable{..}
  public Group deepClone() {return DeepClone.copy(this);}
}
class DeepClone{
  @SuppressWarnings("unchecked")
  public static <T> T copy(T orig){
    ByteArrayOutputStream aux=new ByteArrayOutputStream();
    try(ObjectOutputStream out=new ObjectOutputStream(aux)){
      out.writeObject(orig);
      out.flush();
    }
    catch(IOException e) {throw new Error(e);}
    try(ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(aux.toByteArray()))){
      return (T)in.readObject();
    }
    catch(IOException|ClassNotFoundException e) { throw new Error(e); }
  }}
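Why serialization copes with a circular graph of friends can be checked on a tiny example. The sketch below is self-contained and uses hypothetical names (Node, CyclicCloneDemo, roundTrip, demo); it repeats the serialize-then-deserialize technique on two objects that point at each other, and checks that the copy is a new graph with the same cycle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical two-way linked node, standing in for a Person with one friend.
final class Node implements Serializable {
  private static final long serialVersionUID = 1L;
  final String name;
  Node friend;
  Node(String name) { this.name = name; }
}

final class CyclicCloneDemo {
  // Serializes the object graph to bytes and reads it back as a fresh graph.
  static Node roundTrip(Node orig) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(orig);
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Node) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  static boolean demo() {
    Node bob = new Node("Bob");
    Node alice = new Node("Alice");
    bob.friend = alice;
    alice.friend = bob;                 // cycle: Bob -> Alice -> Bob
    Node bobCopy = roundTrip(bob);
    return bobCopy != bob               // a new object, not an alias
        && bobCopy.friend != alice      // the friend was copied too
        && bobCopy.friend.name.equals("Alice")
        && bobCopy.friend.friend == bobCopy; // the cycle is preserved inside the copy
  }
}
```

CyclicCloneDemo.demo() should return true: Java serialization writes each object once and uses back-references for repeated occurrences, so the cycle does not cause infinite recursion and is rebuilt on reading.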
Group: testing/example usage
String s(ReadGroup g) {return g.getPersons().toString();}
@Test public void test() {
  Group g=new Group();
  g.addPerson("Bob");//persons created by the group
  g.addPerson("Alice");
  //modification handled by the group
  g.connect(g.getPersons().get(0), g.getPersons().get(1));
  assertEquals("[Bob[Alice], Alice[Bob]]",s(g));
  ReadGroup wrapper=ReadGroup.of(g);//read only view
  ReadGroup imm=ReadGroup.of(g.deepClone());//immutable datatype
  assertEquals("[Bob[Alice], Alice[Bob]]",s(wrapper));
  assertEquals("[Bob[Alice], Alice[Bob]]",s(imm));
  g.addPerson("Wally");//modifies only wrapper
  assertEquals("[Bob[Alice], Alice[Bob], Wally[]]",s(wrapper));
  assertEquals("[Bob[Alice], Alice[Bob]]",s(imm));
}