Communication Patterns
CS511
Client Server Architecture
A Generic Server
Concurrency Patterns
Client-Server Architecture
Common asynchronous communication pattern
For example: a web server handles requests for web pages from clients (web browsers)
[Figure: several clients send {request,Data} messages to the server; the server replies to each with {result,Data}.]
Example: Factorial Server
    -module(mserver).
    -export([start/0, compute_factorial/2]).
    -import(fact, [fact/1]).

    loop(Count) ->
        receive
            {get_count, From, Ref} ->
                From ! {result, Ref, Count},
                loop(Count);
            {factorial, From, Ref, N} ->
                Result = fact(N),
                From ! {result, Ref, Result},
                loop(Count+1);
            stop ->
                true
        end.

    % starting server with initial state 0
    start() ->
        spawn(fun() -> loop(0) end).

Note how the server state is a parameter of loop
Example: Factorial Server
Client

    compute_factorial(Pid, N) ->
        Ref = make_ref(),
        Pid ! {factorial, self(), Ref, N},
        receive
            {result, Ref, Result} -> Result
        end.

Test

    > c(mserver).
    {ok,mserver}
    > P = mserver:start().
    <0.40.0>
    > mserver:compute_factorial(P,10).
    3628800
Example: Factorial Server
What if the server crashes or stops?
    > P ! stop.
    > mserver:compute_factorial(P,10).
    …no response…
Why do we get no response?
Can you modify the code so that we receive a timeout?
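One possible answer to the timeout question is a receive with an after clause. The following is only a sketch: the module name mclient, the extra Timeout argument (in milliseconds) and the {ok, _} / {error, timeout} return values are assumptions made for illustration, not part of the original mserver.

```erlang
-module(mclient).
-export([compute_factorial/3]).

%% Hypothetical variant of mserver:compute_factorial/2 with a timeout.
%% Timeout is given in milliseconds.
compute_factorial(Pid, N, Timeout) ->
    Ref = make_ref(),
    Pid ! {factorial, self(), Ref, N},
    receive
        {result, Ref, Result} ->
            {ok, Result}
    after Timeout ->
        %% No reply arrived in time: the server may have stopped or crashed.
        {error, timeout}
    end.
```

If the reply arrives after the timeout has fired, it stays in the caller's mailbox. Because it carries the old Ref, no later call will mistake it for its own answer.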
Registered Processes – Recap
As seen in class, Erlang has a mechanism for publishing a process identifier under a name
Any other process can then communicate with it through that name
BIF register

    % starting server with initial state 0
    start() ->
        Pid = spawn(fun() -> loop(0) end),
        register(server, Pid).

Unregister with unregister(name)
Registration lookup with whereis(name)
Registered Processes – Recap
The atom server can be used instead of a concrete process ID
    > mserver2:start().
    true
    > mserver2:compute_factorial(server, 10).
    3628800
    > server ! stop.
    stop
    > mserver2:compute_factorial(server, 10).
    ** exception error: bad argument
         in function mserver2:compute_factorial/2 (mserver2.erl, line
    > mserver2:start().
    true
    > mserver2:compute_factorial(server, 10).
    3628800
Distributed Environments
Message passing abstractions extend easily to distributed environments
Erlang nodes
An instance of an Erlang runtime system
Nodes can easily communicate with each other
Creating a node

    erl -name 'nodeS@127.0.0.1' -setcookie lecture

The cookie provides basic access control (only nodes that share the same cookie can connect)
The name reflects the node’s IP address
Distributed Environments
Creating two nodes (for simplicity on the same machine)

    erl -name 'nodeS@127.0.0.1' -setcookie lecture
    erl -name 'nodeC@127.0.0.1' -setcookie lecture

Connecting nodes
From nodeC@127.0.0.1

    (nodeC@127.0.0.1)> net_adm:ping('nodeS@127.0.0.1').
    pong
    (nodeC@127.0.0.1)> nodes().
    ['nodeS@127.0.0.1']
Distributed Factorial Server – Running Your Code
Send the compiled version of your code to the connected nodes

    (nodeC@127.0.0.1)> nl(fact).
    abcast
    (nodeC@127.0.0.1)> nl(mserver2).
    abcast

The server gets started on the nodeS node

    (nodeS@127.0.0.1)> mserver2:start().
    true

The client communicates with the server

    (nodeC@127.0.0.1)> mserver2:compute_factorial({server, 'nodeS@127.0.0.1'}, 10).
    3628800

Use of {registered_name, node@IP} instead of the pid or only the registered name
Code has not been changed for running in a distributed setting!
A Generic Server
The code for a generic server takes care of the communication, faults, and upgrades
Programmers then only focus on writing the engine (i.e. what the server does)
No communication primitives are required in the engine
A Generic Server
The code must exhibit the following features:
Correct
It implements a proper server/client request/reply interaction
Parametrized
It is parametric on the engine
Robust
It does not crash if the engine goes wrong
Upgradable
It allows the engine of the server to be upgraded without shutting it down
A Generic Server
    loop(State, F) ->
        receive
            {update, From, Ref, NewF} ->
                From ! {ok, Ref},
                loop(State, NewF);
            {request, From, Ref, Data} ->
                {R, NS} = F(State, Data),
                From ! {result, Ref, R},
                loop(NS, F);
            stop ->
                true
        end.
How can the server go wrong when evaluating F(State,Data)?
Exceptions – The evaluation of expressions can fail
Arithmetic error

    > 1/0.
    ** exception error: bad argument in an arithmetic expression
         in operator '/'/2
            called as 1 / 0

Bad pattern matching

    > [] = [1].
    ** exception error: no match of right hand side value [1]

Undefined functions

    > net_adm:ping(1,2).
    ** exception error: undefined function net_adm:ping/2
Exceptions
    > catch (1/0).
    {'EXIT',{badarith,[{erlang,'/',[1,0]},
                       {erl_eval,do_apply,5},
                       {erl_eval,expr,5},
                       {shell,exprs,7},
                       {shell,eval_exprs,7},
                       {shell,eval_loop,3}]}}
    > catch([] = [1]).
    {'EXIT',{{badmatch,[1]},[{erl_eval,expr,3}]}}
    > catch(net_adm:ping(1,2)).
    {'EXIT',{undef,[{net_adm,ping,[1,2]},
                    {erl_eval,do_apply,5},
                    {erl_eval,expr,5},
                    {shell,exprs,7},
                    {shell,eval_exprs,7},
                    {shell,eval_loop,3}]}}
Exceptions
    loop(State, F) ->
        receive
            {update, From, Ref, NewF} ->
                From ! {ok, Ref},
                loop(State, NewF);
            {request, From, Ref, Data} ->
                case catch(F(State, Data)) of
                    {'EXIT', Reason} ->
                        From ! {exit, Ref, Reason},
                        loop(State, F);
                    {R, NewState} ->
                        From ! {result, Ref, R},
                        loop(NewState, F)
                end;
            stop ->
                true
        end.

It propagates the exception from the server to the client
Starting the Generic Server
    start(Name, State, F) ->
        Pid = spawn(fun() -> loop(State, F) end),
        register(Name, Pid),
        Pid.
Generic Client
Requests

    request(Pid, Data) ->
        Ref = make_ref(),
        Pid ! {request, self(), Ref, Data},
        receive
            {result, Ref, Result} -> Result;
            {exit, Ref, Reason} -> exit(Reason)
        end.
Generic Client
Upgrading the server’s engine

    update(Pid, Fun) ->
        Ref = make_ref(),
        Pid ! {update, self(), Ref, Fun},
        receive
            {ok, Ref} -> ok
        end.
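To see an upgrade in action, here is a small self-contained sketch. It repeats the generic server loop and the two client functions in one module so that it can run on its own; the module name upgrade_demo and the two engines add_engine and double_engine are hypothetical and exist only for this illustration.

```erlang
-module(upgrade_demo).
-export([run/0]).

%% Compact copy of the generic server loop shown earlier.
loop(State, F) ->
    receive
        {update, From, Ref, NewF} ->
            From ! {ok, Ref},
            loop(State, NewF);
        {request, From, Ref, Data} ->
            case catch F(State, Data) of
                {'EXIT', Reason} ->
                    From ! {exit, Ref, Reason},
                    loop(State, F);
                {R, NewState} ->
                    From ! {result, Ref, R},
                    loop(NewState, F)
            end;
        stop ->
            true
    end.

request(Pid, Data) ->
    Ref = make_ref(),
    Pid ! {request, self(), Ref, Data},
    receive
        {result, Ref, Result} -> Result;
        {exit, Ref, Reason} -> exit(Reason)
    end.

update(Pid, Fun) ->
    Ref = make_ref(),
    Pid ! {update, self(), Ref, Fun},
    receive
        {ok, Ref} -> ok
    end.

%% Hypothetical engines: both keep a running total as the state.
add_engine(Total, N) -> {Total + N, Total + N}.
double_engine(Total, N) -> {Total + 2 * N, Total + 2 * N}.

run() ->
    Pid = spawn(fun() -> loop(0, fun add_engine/2) end),
    R1 = request(Pid, 5),                   % add_engine: 0 + 5
    ok = update(Pid, fun double_engine/2),  % swap the engine, keep the state
    R2 = request(Pid, 5),                   % double_engine: 5 + 2*5
    Pid ! stop,
    {R1, R2}.
```

Calling upgrade_demo:run() returns {5, 15}: the second request is served by the new engine, but it starts from the total left behind by the old one, so the state survives the upgrade.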
Factorial Server Revisited
    -module(factServer).
    -export([start/0, compute_factorial/1]).
    -import(fact, [fact/1]).

    engine(Count, {factorial, N}) ->
        Result = fact(N),
        {Result, Count+1};
    engine(Count, get_count) ->
        {Count, Count}.

    start() ->
        genserver:start(server, 0, fun engine/2).

    compute_factorial(N) ->
        genserver:request(server, {factorial, N}).

Observe that there are no message passing primitives!
Factorial Server Revisited
    4> factServer:start().
    <0.69.0>
    5> factServer:compute_factorial(23).
    25852016738884976640000
Concurrency Examples and Patterns Revisited
Revisiting the following using message passing:
A semaphore (already seen last class)
Barrier synchronisation
Resource allocation
Readers and writers
Barrier Synchronization Revisited
N processes must wait for the slowest before continuing with the next activity
Widely used in parallel programming
[Figure: Process 1, Process 2, ..., Process N each reach the barrier and wait for the others to finish; once all have arrived, every process continues with its next activity.]
Barrier Synchronization Revisited
    start(N) ->
        Pid = spawn(fun() -> coordinator(N,N,[]) end),
        register(coordinator, Pid).

    coordinator(N,0,Ps) ->
        [ From ! {ack, Ref} || {From, Ref} <- Ps ],
        coordinator(N,N,[]);

    coordinator(N,M,Ps) ->
        receive
            {reach, From, Ref} ->
                coordinator(N,M-1, [ {From,Ref} | Ps])
        end.
Barrier Synchronization Revisited
Using the barrier

    reach_wait(Server) ->
        Ref = make_ref(),
        Server ! {reach, self(), Ref},
        receive
            {ack, Ref} -> true
        end.
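The sketch below shows the barrier holding processes back until the last one arrives. It repeats coordinator and reach_wait so that it is self-contained; the module name barrier_demo, the function run/1 and the passed message are assumptions made for this illustration. The coordinator is left unregistered here and addressed by pid.

```erlang
-module(barrier_demo).
-export([run/1]).

%% Coordinator and reach_wait as shown above.
coordinator(N, 0, Ps) ->
    [From ! {ack, Ref} || {From, Ref} <- Ps],
    coordinator(N, N, []);
coordinator(N, M, Ps) ->
    receive
        {reach, From, Ref} ->
            coordinator(N, M - 1, [{From, Ref} | Ps])
    end.

reach_wait(Server) ->
    Ref = make_ref(),
    Server ! {reach, self(), Ref},
    receive
        {ack, Ref} -> true
    end.

%% N-1 hypothetical workers reach the barrier; the caller is the Nth.
run(N) when N >= 1 ->
    Parent = self(),
    Barrier = spawn(fun() -> coordinator(N, N, []) end),
    [spawn(fun() -> reach_wait(Barrier), Parent ! passed end)
     || _ <- lists:seq(1, N - 1)],
    %% Nobody can pass yet: the coordinator is still waiting for the caller.
    Early = receive passed -> true after 100 -> false end,
    reach_wait(Barrier),
    %% Now every worker has been released.
    [receive passed -> ok end || _ <- lists:seq(1, N - 1)],
    exit(Barrier, kill),
    {early_pass, Early}.
```

barrier_demo:run(4) returns {early_pass, false}: no worker gets past the barrier while the coordinator has seen fewer than N reach messages.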
Resource Allocation
A controller controls access to copies of some resources (of the same kind)
Clients requiring multiple resources should not ask for resources one at a time
Clients make requests to take or return any number of the resources
A request should only succeed if there are sufficiently many resources available
Otherwise the request must block
Resource Allocation
    > c(ralloc).
    {ok,ralloc}
    > ralloc:start([1,1,1,1]).
    true
    > ralloc:request(3).
    [1,1,1]
    > ralloc:release([1]).
    ok
    > ralloc:request(2).
    [1,1]
    > ralloc:request(10).
In the last line, the process blocks
Resource Allocation
    loop(Resources) ->
        Available = length(Resources),
        receive
            {req, From, Ref, Number} when Number =< Available ->
                From ! {res, Ref, lists:sublist(Resources, Number)},
                loop(lists:sublist(Resources, Number+1, Available));
            {ret, List} ->
                loop(lists:append(Resources, List))
        end.
    % continues…

Function lists:sublist returns a slice of a list; Examples

    > lists:sublist([1,2,3,4], 2).
    [1,2]
    > lists:sublist([1,2,3,4], 2, 2).
    [2,3]
    > lists:sublist([1,2,3,4], 2, 5).
    [2,3,4]
    > lists:sublist([1,2,3,4], 5, 2).
    []
Resource Allocation
    start(Init) ->
        Pid = spawn(fun() -> loop(Init) end),
        register(rserver, Pid).

    request(N) ->
        Ref = make_ref(),
        rserver ! {req, self(), Ref, N},
        receive
            {res, Ref, List} -> List
        end.

    release(List) ->
        rserver ! {ret, List},
        ok.
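The blocking behaviour comes from the guard on the req clause: a request that cannot be satisfied simply stays in the server's mailbox until a later ret makes the guard true. The sketch below illustrates this. It repeats the server loop so that it runs on its own; the module name ralloc_demo, the function run/0, the atoms a, b, c used as resources and the granted message are assumptions made for this illustration. The server is addressed by pid instead of the registered name rserver.

```erlang
-module(ralloc_demo).
-export([run/0]).

%% Server loop as shown above.
loop(Resources) ->
    Available = length(Resources),
    receive
        {req, From, Ref, Number} when Number =< Available ->
            From ! {res, Ref, lists:sublist(Resources, Number)},
            loop(lists:sublist(Resources, Number + 1, Available));
        {ret, List} ->
            loop(lists:append(Resources, List))
    end.

request(Server, N) ->
    Ref = make_ref(),
    Server ! {req, self(), Ref, N},
    receive
        {res, Ref, List} -> List
    end.

run() ->
    Parent = self(),
    Server = spawn(fun() -> loop([a, b]) end),
    %% A hypothetical client asks for 3 resources; only 2 exist.
    spawn(fun() -> Parent ! {granted, request(Server, 3)} end),
    %% The request stays in the server's mailbox because its guard fails.
    Blocked = receive {granted, _} -> false after 100 -> true end,
    %% Returning one resource lets the pending request match.
    Server ! {ret, [c]},
    Granted = receive {granted, L} -> L end,
    exit(Server, kill),
    {Blocked, Granted}.
```

ralloc_demo:run() returns {true, [a,b,c]}: the request blocks while only two resources are available and is granted as soon as the third one is returned.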
Readers and Writers Revisited
Two kinds of processes share access to a “database”
Readers examine the contents
Multiple readers allowed concurrently
Writers examine and modify data
A writer must have mutual exclusion (mutex)
Readers and writers in a few lines
Readers and Writers Revisited
    loop(Rs, Ws) ->
        receive
            {start_read, From, Ref} when Ws =:= 0 ->
                From ! {ok_to_read, Ref},
                loop(Rs+1, Ws);
            {start_write, From, Ref} when Ws =:= 0 andalso Rs =:= 0 ->
                From ! {ok_to_write, Ref},
                loop(Rs, Ws+1);
            end_read ->
                loop(Rs-1, Ws);
            end_write ->
                loop(Rs, Ws-1)
        end.
Is it a fair solution?
Unfair for writers: as long as readers keep arriving, Rs stays above 0 and a pending start_write is never served
Fair Readers and Writers
    loop() ->
        receive
            {start_read, From, Ref} ->
                From ! {ok_to_read, Ref},
                loop_read(1),
                loop();
            {start_write, From, Ref} ->
                From ! {ok_to_write, Ref},
                receive
                    end_write -> loop()
                end
        end.
Fair Readers and Writers
    loop_read(0) -> ok;
    loop_read(Rs) ->
        receive
            {start_read, From, Ref} ->
                From ! {ok_to_read, Ref},
                loop_read(Rs+1);
            end_read ->
                loop_read(Rs-1);
            {start_write, From, Ref} ->
                [ receive end_read -> ok end || _ <- lists:seq(1,Rs) ],
                From ! {ok_to_write, Ref},
                receive
                    end_write -> ok
                end
        end.

At top level, loop relies on the fairness property of Erlang (i.e. the oldest message that matches any guard is processed)
Function loop_read implements fairness: once a start_write is received, no further readers are admitted until that writer is done
The line [ receive end_read -> ok end || _ <- lists:seq(1,Rs) ] performs as many receives as there are active readers (Rs)
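The slides show only the server side of the protocol. A client could wrap the start/end messages as sketched below. The module name rw_demo and the functions start/0, read/2 and write/2 are assumptions made for this illustration; loop/0 and loop_read/1 are repeated from above so that the module runs on its own.

```erlang
-module(rw_demo).
-export([start/0, read/2, write/2]).

%% Server side: loop/0 and loop_read/1 as shown above.
start() ->
    spawn(fun() -> loop() end).

loop() ->
    receive
        {start_read, From, Ref} ->
            From ! {ok_to_read, Ref},
            loop_read(1),
            loop();
        {start_write, From, Ref} ->
            From ! {ok_to_write, Ref},
            receive
                end_write -> loop()
            end
    end.

loop_read(0) -> ok;
loop_read(Rs) ->
    receive
        {start_read, From, Ref} ->
            From ! {ok_to_read, Ref},
            loop_read(Rs + 1);
        end_read ->
            loop_read(Rs - 1);
        {start_write, From, Ref} ->
            [receive end_read -> ok end || _ <- lists:seq(1, Rs)],
            From ! {ok_to_write, Ref},
            receive
                end_write -> ok
            end
    end.

%% Hypothetical client wrappers: Fun is the work done while holding access.
read(Server, Fun) ->
    Ref = make_ref(),
    Server ! {start_read, self(), Ref},
    receive {ok_to_read, Ref} -> ok end,
    try Fun()
    after Server ! end_read       % always release, even if Fun() fails
    end.

write(Server, Fun) ->
    Ref = make_ref(),
    Server ! {start_write, self(), Ref},
    receive {ok_to_write, Ref} -> ok end,
    try Fun()
    after Server ! end_write      % always release, even if Fun() fails
    end.
```

Each wrapper returns whatever Fun returns. The try ... after form is a design choice of this sketch: it ensures that a failing Fun still sends end_read or end_write, so a crashed client body does not leave the server waiting forever.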
Fair Readers and Writers
An FSM that describes its behavior
Format of events: