}
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
nextin = 0;
V(occupied);
nextout = 0;
V(empty);
return(item);
}
Note: this is not the first procedures of producer and consumer threads
this is synchronization code (producer and consumer
Operating Systems – CSCI 402
threads calls them to synchronize with each other) Copyright ý . Cheng
321 0
25
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
8
0
0
0
empty
occupied nextin nextout
nextout = 0;
V(empty);
return(item);
}
Producer
Copyright ý . Cheng
01234567
321 0
26
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
8
0
0
0
empty
occupied nextin nextout
nextout = 0;
V(empty);
return(item);
}
Producer
Copyright ý . Cheng
01234567
321 0
27
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
0
0
empty
occupied nextin nextout
nextout = 0;
V(empty);
return(item);
}
Producer
Copyright ý . Cheng
01234567
321 0
28
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
0
0
empty
occupied nextin nextout
nextout = 0;
V(empty);
return(item);
}
Producer
Copyright ý . Cheng
01234567
321 0
29
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
1
0
empty
occupied nextin nextout
nextout = 0;
V(empty);
return(item);
}
Producer
Copyright ý . Cheng
01234567
321 0
30
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
1
0
empty
occupied nextin nextout
nextout = 0;
V(empty);
return(item);
}
Producer
Copyright ý . Cheng
01234567
321 0
31
empty
occupied nextin nextout
}
Producer
note: producer continue to produce
321 0
nextout = 0;
V(empty);
return(item);
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
1
1
0
Copyright ý . Cheng
01234567
32
empty
occupied nextin nextout
}
Producer
note: producer continue to produce
321 0
nextout = 0;
V(empty);
return(item);
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
1
0
Copyright ý . Cheng
01234567
33
empty
occupied nextin nextout
}
Producer
note: producer continue to produce
321 0
nextout = 0;
V(empty);
return(item);
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
Consumer
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
1
0
Copyright ý . Cheng
01234567
34
}
nextout = 0;
V(empty);
return(item);
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
1
1
empty
occupied nextin nextout
Consumer
}
Producer
note: producer continue to produce
321 0
Copyright ý . Cheng
01234567
35
}
nextout = 0;
V(empty);
return(item);
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
7
0
1
1
empty
occupied nextin nextout
Consumer
}
Producer
note: producer continue to produce
321 0
Copyright ý . Cheng
01234567
36
}
nextout = 0;
V(empty);
return(item);
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
8
0
1
1
empty
occupied nextin nextout
Consumer
}
Producer
note: producer continue to produce
321 0
Copyright ý . Cheng
01234567
37
}
nextout = 0;
V(empty);
return(item);
Operating Systems – CSCI 402
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
8
0
1
1
empty
occupied nextin nextout
Consumer
}
Producer
note: producer continue to produce
321 0
Copyright ý . Cheng
01234567
38
}
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
nextin = 0;
V(occupied);
nextout = 0;
V(empty);
return(item);
}
if produce and consume at same rate, no one may ever wait
parallelism of 2
if producer is fast and consumer slow, producer may wait if consumer is fast and producer slow, consumer may wait
321 0
Operating Systems – CSCI 402
39
Copyright ý . Cheng
}
Producer/Consumer with Semaphores
Semaphore empty = B;
Semaphore occupied = 0;
int nextin =0;
int nextout = 0;
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
nextin = 0;
V(occupied);
nextout = 0;
V(empty);
return(item);
}
Mutex by itself is more “coarse grain”
you may use one mutex to control access to the number of empty and occupied cells, nextin, and nextout
Operating Systems – CSCI 402
Semaphore gives more “fine grain parallelism”
321 0
40
but not general enough Copyright ý .
Semaphore has limited use
pretty much the only place it¡¯s really good for is producer-consumer
although it¡¯s a very important application of semaphores
T1 T2 T3 T4 T5
Operating Systems – CSCI 402
Copyright ý . Cheng
the queues above are queues with bounded buffer space recall that the “producer-consumer problem” is also known as the “bounded-buffer problem”
“pipelined parallelism”
41
321 0
Operating Systems – CSCI 402
POSIX Semaphores
#include
sem_t semaphore;
int err;
int pshared = 0; // not shared among processes
int init_value = B; // initial value
err = sem_init(&semaphore, pshared, init_value);
err = sem_destroy(&semaphore);
err = sem_wait(&semaphore); /* P operation */
err = sem_trywait(&semaphore); /* conditional P
err = sem_post(&semaphore);
operation */
/* V operation */
42
321 0
Copyright ý . -Consumer with POSIX Semaphores
void Produce(char item) {
P(empty);
buf[nextin] = item;
nextin = nextin + 1;
if (nextin == B)
nextin = 0;
V(occupied);
}
void Produce(char item) {
sem_wait(&empty);
buf[nextin++] = item;
if (nextin >= B)
nextin = 0;
sem_post(&occupied);
}
char Consume( ) {
char item;
P(occupied);
item = buf[nextout];
nextout = nextout + 1;
if (nextout == B)
nextout = 0;
V(empty);
return(item);
}
Operating Systems – CSCI 402
char Consume() {
char item;
sem_wait(&occupied);
item = buf[nextout++];
if (nextout >= B)
nextout = 0;
sem_post(&empty);
return(item);
}
321 0
43
Copyright ý . Systems – CSCI 402
Implementing Semaphore With Mutex
Semaphore operation
POSIX implementation
when (S > 0) [
S = S – 1;
]
while(1) {
pthread_mutex_lock(&m);
if (S > 0) {
S = S – 1;
pthread_mutex_unlock(&m);
break;
}
pthread_mutex_unlock(&m);
}
[S = S + 1;]
pthread_mutex_lock(&m);
S = S + 1;
pthread_mutex_unlock(&m);
44
321 0
Copyright ý . Systems – CSCI 402
Implementing Semaphore With Mutex
Semaphore operation
POSIX implementation
when (S > 0) [
S = S – 1;
]
while(1) {
pthread_mutex_lock(&m);
if (S > 0) {
S = S – 1;
pthread_mutex_unlock(&m);
break;
}
pthread_mutex_unlock(&m);
}
Implementation of P(S)above works but not good – inefficient what if guard is false (in this case, S = 0) and no other thread is waiting for the mutex?
busy waiting: your thread will not give up CPU while waiting for the guard to become true
it does not do anythnig “useful”
right way to wait is to give up the CPU when waiting Copyright ý . Cheng
321 0
45
Operating Systems – CSCI 402
Implementation Of General Guarded Commands
when (guard) [
/* command sequence */
…
]
In general, the guard can be complicated and involving the evaluation of several variables (e.g., a > 3 && f(b) <= c)
the guard (which evaluates to either true or false) keeps changing its value, continuously and by multiple threads
in multiple CPUs simultaneously
how can we "capture" the instance of time when it evaluates to true so we can execute the command sequence atomically?
we have to "sample" it, i.e., take snap shot of all the variables that are involved and then evaluate it
a mutex is involved, but how?
46
321 0
Copyright ý . Systems - CSCI 402
Implementation Of General Guarded Commands
when (guard) [
/* command sequence */
...
]
In general, the guard can be complicated m and involving the evaluation of several variables (e.g., a > 3 && f(b) <= c)
the guard (which evaluates to either true or false) keeps changing its value, continuously and by multiple threads
in multiple CPUs simultaneously
"Serialization Box"
If you have the mutex locked, you can do:
read/write
execute
execute
a, b, c
CS1
CS2
...
how can we "capture" the instance of time when it evaluates to true so we can execute the command sequence atomically?
we have to "sample" it, i.e., take snap shot of all the variables that are involved and then evaluate it
a mutex is involved, but how?
this would work, but can lead to busy-waiting
321 0
47
Copyright ý . Of General Guarded Commands
when (guard) [
/* command sequence */
...
]
In general, the guard can be complicated m and involving the evaluation of several variables (e.g., a > 3 && f(b) <= c)
the guard (which evaluates to either true or false) keeps changing its value, continuously and by multiple threads
in multiple CPUs simultaneously
Need something else (known as condition variables) and a bunch of rules to follow
Operating Systems - CSCI 402
"Serialization Box"
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
48
321 0
Copyright ý . Of General Guarded Commands
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded commands
a condition variable is a queue of threads waiting for some sort of notification
(an "event" or "condition")
threads, waiting for a guard to become true, go to sleep in such a queue
they wait for a specific condition to be signaled they wait for the right time to re-evaluate the guard
Operating Systems - CSCI 402
"Serialization Box"
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
49
321 0
Copyright ý . Of General Guarded Commands
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded commands
unlike notifications on your cellphone, an "event" (signaling/broadcasting of a condition) happens in an instance of time (duration of this "event" is zero)
if you are not waiting for it, you¡¯ll miss it
how do you make sure you won¡¯t miss an event?
you have to follow the rules/protocol (for multiple interacting threads to follow) described here
Operating Systems - CSCI 402
"Serialization Box"
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
50
321 0
Copyright ý . Of General Guarded Commands
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded commands
threads that do something to potentially change the truth value of the guard can then wake up the threads that were sleeping in the queue
"Serialization Box"
they can signal (wake up one thread sleeping there) or broadcast (wake up all threads sleeping there) the condition
wake up thread(s) by moving it into the mutex queue no guarantee that the guard will be true when it¡¯s
time for another thread to evaluate the guard again
Operating Systems - CSCI 402
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
51
321 0
Copyright ý . Of General Guarded Commands
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded commands
1) pthread_cond_wait( pthread_cond_t *cv,
pthread_mutex_t *mutex)
must only call pthread_cond_wait() if you have the mutex locked
atomically unlocks mutex and wait for the "event" when the event is signaled/broadcasted, pthread_cond_wait()returns with the mutex locked
Operating Systems - CSCI 402
"Serialization Box"
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
52
321 0
Copyright ý . Cheng
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded commands
1) pthread_cond_wait(cv, mutex) atomically unlocks mutex and wait for the "event"
with respect to the operation of the mutex
Time
unlocks mutex wait for event
"Serialization Box"
Operating Systems - CSCI 402
Implementation Of General Guarded Commands
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
ATOMIC
321 0
53
Copyright ý . Cheng
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded commands
1) pthread_cond_wait(cv, mutex) atomically unlocks mutex and wait for the "event"
with respect to the operation of the mutex signal/broadcast
"Serialization Box"
Operating Systems - CSCI 402
Implementation Of General Guarded Commands
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
? OK
unlocks mutex
Time wait for event
ATOMIC
321 0
54
Copyright ý . Cheng
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded
"Serialization Box"
Operating Systems - CSCI 402
Implementation Of General Guarded Commands
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
commands
unlocks mutex
? OK
ATOMIC
signal/broadcast
Time wait for event
2) pthread_cond_broadcast(pthread_cond_t *cv) pthread_cond_signal(pthread_cond_t *cv)
must only call pthread_cond_wait(), pthread_cond_broadcast() or pthread_cond_signal() if you have the corresponding mutex locked
321 0
55
Copyright ý . Cheng
when (guard) [
/* command sequence */
...
]
POSIX provides condition variables (CV) m for programmers to implement guarded
"Serialization Box"
Operating Systems - CSCI 402
Implementation Of General Guarded Commands
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
commands
signal/broadcast
Time unlocks mutex wait for event
ATOMIC
what if pthread_cond_wait() it not atomic?
your thread may miss the event and sleep forever in the CV queue
is this a deadlock?
no, this is a race condition (i.e., bad timing-dependent
behavior)
321 0
56
Copyright ý . Systems - CSCI 402
Set Up
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
If a condition variable cannot be initialized statically, do:
int pthread_cond_init(
pthread_cond_t *cvp,
pthread_condattr_t *attrp)
int pthread_cond_destroy(
pthread_cond_t *cvp)
Usually, condition variable attributes are not used
57
321 0
Copyright ý . Of General Guarded Commands
Synchronization: mutex, condition variables, guards,
critical sections
with respect to a mutex, a thread can be
"Serialization Box"
Operating Systems - CSCI 402
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
waiting in the mutex queue got the lock and inside the "serialization box"
only one thread can be inside
the "serialization box" waiting in the CV queue or outside
with respect to a mutex, a, b, c are variables that can affect the value of the guard
m
can only access (i.e., read/write) them if a thread is inside the "serialization box" (i.e., has the mutex locked)
58
321 0
Copyright ý . Of General Guarded Commands
Synchronization: mutex, condition variables, guards,
critical sections
when you signal CV
one thread in the CV queue gets moved to the mutex queue
when you broadcast CV
all threads in the CV queue get moved to the mutex queue
"Serialization Box"
you can only get added to the CV queue if you have the mutex locked you can only modify the variables in the guard if you have the mutex locked you can only read the variables in the guard (i.e., evaluate the guard) if you have the mutex locked
you can only execute critical section
code if you have the mutex locked
321 0
m
Operating Systems - CSCI 402
If you have the mutex locked, you can do:
read/write
execute
execute
cond_wait signal/broadcast
a, b, c
...
CV
CS1
CS2
59
Copyright ý . Systems - CSCI 402
POSIX Condition Variables
Guarded command
POSIX implementation
when (guard) [
statement 1;
...
statement n;
]
pthread_mutex_lock(&mutex);
while(!guard)
pthread_cond_wait(
&cv,
&mutex);
statement 1;
...
statement n;
pthread_mutex_unlock(&mutex);
[ /* code
* modifying
* the guard
*/
]
pthread_mutex_lock(&mutex);
/*code modifying the guard:*/
...
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&mutex);
if you don¡¯t follow these rules, your code will have
race conditions (i.e., bad timing-dependent behavior) Copyright ý . Cheng
321 0
60
don¡¯t believe that pthread_cond_signal/broadcast()
can be called without locking the mutex
321 0
Operating Systems - CSCI 402
POSIX Condition Variables
Guarded command
POSIX implementation
when (guard) [
statement 1;
...
statement n;
]
pthread_mutex_lock(&mutex);
while(!guard)
pthread_cond_wait(
&cv,
&mutex);
statement 1;
...
statement n;
pthread_mutex_unlock(&mutex);
[ /* code
* modifying
* the guard
*/
]
pthread_mutex_lock(&mutex);
/*code modifying the guard:*/
...
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&mutex);
61
Copyright ý . Cheng