CONCURRENCY: SEMAPHORES
Andrea Arpaci-Dusseau CS 537, Fall 019
ADMINISTRIVIA
– Project 4 due today at 5:00 pm (or 7:00 am…)
– Project 5 available Wed morning (xv6 Memory)
– Requestnewprojectpartnerifdesired
– Midterm 2: Nov 11/6 (Wed) from 7:30-9:30pm
– Notifybytomorrowifconflict
– Two”quizzes”onraceconditionsinCanvas
– Office Hours Changed Today: 4-5 pm
LEARNING OUTCOMES: Semaphores
Semaphores (vs. condition variables?)
How to implement a lock with semaphores?
How to implement semaphores with locks and condition variables? How to implement join and producer/consumer with semaphores? How to synchronize dining philosophers?
How to implement reader/writer locks with semaphores?
RECAP
Concurrency Objectives
Mutual exclusion (e.g.,A and B don’t run at same time) solved with locks
Ordering (e.g., B runs after A does something) solved with condition variables and semaphores
Condition Variables
wait(cond_t *cv, mutex_t *lock)
– assumes the lock is held when wait() is called
– puts caller to sleep + releases the lock (atomically) – when awoken, reacquires lock before returning
signal(cond_t *cv)
– wake a single waiting thread (if >= 1 thread is waiting) – if there is no waiting thread, just return, doing nothing
cv A B C D lock D A
signal(cv) – what happens? release(lock) – what happens?
Ordering Example: Join
pthread_t p1, p2;
Pthread_create(&p1, NULL, mythread, “A”);
Pthread_create(&p2, NULL, mythread, “B”);
// join waits for the threads to finish (call exit()) Pthread_join(p1, NULL);
Pthread_join(p2, NULL);
printf(“main: done\n [balance: %d]\n [should: %d]\n”,
balance, max*2);
return 0;
how to implement join()?
Join Implementation: COrrect
Parent: Child:
void thread_join() { Mutex_lock(&m); // w
}
if (done == 0) // x Cond_wait(&c, &m); // y
Mutex_unlock(&m); // z
Parent: w x y z
Child:
a b c
Use mutex to ensure no race between interacting with state and wait/signal
void thread_exit() {
Mutex_lock(&m);
// a
// b
// c
// d
}
done = 1;
Cond_signal(&c);
Mutex_unlock(&m);
Producer/Consumer: Two CVs and WHILE
void *producer(void *arg) {
while (1) {
Mutex_lock(&m); // p1
while (numfull == max) // p2
Cond_wait(&empty, &m); // p3
do_fill(); // p4
Cond_signal(&fill); // p5
Mutex_unlock(&m); //p6
} }}
}
void *consumer(void *arg) {
while (1) {
Mutex_lock(&m);
while (numfull == 0)
Cond_wait(&fill, &m);
int tmp = do_get();
Cond_signal(&empty);
Mutex_unlock(&m);
do_work(tmp);
Summary: rules of thumb for CVs
1. Keep state in addition to CV’s
2. Always do wait/signal with lock held
3. Whenever thread wakes from waiting, recheck state (while loop)
INTRODUCING Semaphores
Condition variables have no state (other than waiting queue) – Programmer must track additional state
Semaphores have state: track integer value
– State cannot be directly accessed by user program,
but state determines behavior of semaphore operations
Equivalence Claim
Semaphores are equally powerful to Locks+CVs – what does this mean?
One might be more convenient, but that’s not relevant Equivalence means each can be built from the other
Locks
Semaphores
CV’s
Semaphores
Semaphores
Locks
CV’s
Semaphore Operations
Allocate and Initialize
sem_t sem;
sem_init(sem_t *s, int initval) {
s->value = initval;
}
User cannot read or write value directly after initialization
Wait or Test (sometime P() for Dutch) sem_wait(sem_t*) Waits until value of sem is > 0; Decrements sem value,
Signal or Post (sometime V() for Dutch) sem_post(sem_t*) Increment sem value, if value > 0, wake a single waiter
wait and post are atomic
Build Lock from Semaphore
typedef struct __lock_t { sem_t sem;
} lock_t;
void init(lock_t *lock) {
}
void acquire(lock_t *lock) { }
void release(lock_t *lock) { }
sem_init(sem_t*, int initial) sem_wait(sem_t*):Waituntilvalue> 0;dec sem_post(sem_t*): Increment; if > 0, wake single waiter
Locks
Semaphores
Build Lock from Semaphore
typedef struct __lock_t { sem_t sem;
} lock_t;
void init(lock_t *lock) {
sem_init(&lock->sem, 1);
}
void acquire(lock_t *lock) { }
void release(lock_t *lock) { }
sem_init(sem_t*, int initial) sem_wait(sem_t*):Waituntilvalue> 0;dec sem_post(sem_t*): Increment; if > 0, wake single waiter
Locks
Semaphores
sem_wait(&lock->sem);
sem_post(&lock->sem);
Building CV’s over Semaphores
Possible, but really hard to do right
CV’s
Semaphores
Read about Microsoft Research’s attempts:
http: //research. microsoft. com/pubs/64242/ImplementingC Vs. pdf
void thread_join() {
Mutex_lock(&m);
}
// w
void thread_exit() {
Mutex_lock(&m);
// a
// b
// c
// d
Join with CV vs Semaphores
// x Mutex_unlock(&m); // z
done = 1;
Cond_signal(&c);
Mutex_unlock(&m);
if (done == 0)
Cond_wait(&c, &m); // y
}
sem_t s;
sem_init(&s,
void thread_join() {
sem_wait(&s);
sem_wait(sem_t*):Waituntilvalue> 0;dec sem_post(sem_t*): Increment; if > 0, wake single waiter
void thread_exit() {
sem_post(&s)
0);
}}
Build Semaphore from Lock and CV
Typedef struct { int value;
cond_t cond;
lock_t lock; } sem_t;
Sem_wait(): Waits until value > 0, then decrement Sem_post(): Increment value, then wake a single waiter
Void sem_init(sem_t *s, int value) { s->value = value; cond_init(&s->cond); lock_init(&s->lock);
}
Semaphores
Locks
CV’s
Build Semaphore from Lock and CV
Sem_wait{sem_t *s) { lock_acquire(&s->lock); while (s->value <= 0)
cond_wait(&s->cond); s->value–;
lock_release(&s->lock); } }
Sem_wait(): Waits until value > 0, then decrement Sem_post(): Increment value, then wake a single waiter
Sem_post{sem_t *s) { lock_acquire(&s->lock);
s->value++; cond_signal(&s->cond); lock_release(&s->lock);
Semaphores
Locks
CV’s
Producer/Consumer: Semaphores #1
Single producer thread, single consumer thread
Single shared buffer between producer and consumer
Use 2 semaphores
– emptyBuffer: Initialize to:
– fullBuffer: Initialize to:
Producer
while (1) {
sem_wait(&emptyBuffer);
Fill(&buffer);
sem_signal(&fullBuffer); }}
1 à 1 empty buffer; producer can run 1 time first
0 à 0 full buffers; consumer can run 0 times first
Consumer
while (1) {
sem_wait(&fullBuffer);
Use(&buffer);
sem_signal(&emptyBuffer);
Producer/Consumer: Semaphores #2
Single producer thread, single consumer thread
Shared buffer with N elements between producer and consumer Use 2 semaphores
– emptyBuffer: Initialize to: N à N empty buffers; producer can run N times first
– fullBuffer: Initialize to:
Producer
i = 0;
while (1) {
sem_wait(&emptyBuffer);
Fill(&buffer[i]);
i = (i+1)%N;
sem_signal(&fullBuffer);
}}
0 à 0 full buffers; consumer can run 0 times first
Consumer
j = 0;
While (1) {
sem_wait(&fullBuffer);
Use(&buffer[j]);
j = (j+1)%N;
sem_signal(&emptyBuffer);
Final case:
Producer/Consumer: Semaphore #3
– Multiple producer threads, multiple consumer threads
– Shared buffer with N elements between producer and consumer
Requirements
– Each consumer must grab unique filled element
– Each producer must grab unique empty element
– Why will previous code (shown below) not work???
Producer/Consumer: Multiple Threads
Does this code work correctly?
Producer
while (1) {
sem_wait(&emptyBuffer);
my_i = findempty(&buffer);
Fill(&buffer[my_i]);
sem_signal(&fullBuffer); }}
Are my_i and my_j private or shared? Where is mutual exclusion needed???
Consumer
while (1) {
sem_wait(&fullBuffer);
my_j = findfull(&buffer);
Use(&buffer[my_j]);
sem_signal(&emptyBuffer);
Producer/Consumer: Multiple Threads
Consider three possible locations for mutual exclusion Which work??? Which is best???
Consumer #1
sem_wait(&mutex);
sem_wait(&fullBuffer);
my_j = findfull(&buffer);
Use(&buffer[my_j]);
sem_signal(&emptyBuffer);
sem_signal(&mutex);
Producer #1
sem_wait(&mutex);
sem_wait(&emptyBuffer);
my_i = findempty(&buffer);
Fill(&buffer[my_i]);
sem_signal(&fullBuffer);
sem_signal(&mutex);
Problem: Deadlock at mutex (e.g., consumer runs first; won’t release mutex)
Producer/Consumer: Multiple Threads
Producer #2
sem_wait(&emptyBuffer);
sem_wait(&mutex);
myi = findempty(&buffer);
Fill(&buffer[myi]);
sem_signal(&mutex);
sem_signal(&fullBuffer);
Consumer #2
sem_wait(&fullBuffer);
sem_wait(&mutex);
myj = findfull(&buffer);
Use(&buffer[myj]);
sem_signal(&mutex);
sem_signal(&emptyBuffer);
Works, but limits concurrency:
Only 1 thread at a time can be using or filling different buffers
Producer/Consumer: Multiple Threads
Producer #3
sem_wait(&emptyBuffer);
sem_wait(&mutex);
myi = findempty(&buffer);
sem_signal(&mutex);
Fill(&buffer[myi]);
sem_signal(&fullBuffer);
Consumer #3
sem_wait(&fullBuffer);
sem_wait(&mutex);
myj = findfull(&buffer);
sem_signal(&mutex);
Use(&buffer[myj]);
sem_signal(&emptyBuffer);
Works and increases concurrency; only finding a buffer is protected by mutex; Filling or Using different buffers can proceed concurrently
Dining Philosophers
Problem Statement
– N Philosophers sitting at a round table
– Each philosopher shares a chopstick (or fork) with neighbor
– Each philosopher must have both chopsticks to eat
– Neighbors can’t eat simultaneously
– Philosophers alternate between thinking and eating
Each philosopher/thread i runs : while (1) {
think();
take_chopsticks(i);
eat();
put_chopsticks(i);
0
}
0 4
1
1
4 3
2
3
2
Dining Philosophers: Attempt #1
Two neighbors can’t use chopstick at same time Must test if chopstick is there and grab it atomically
Represent each chopstick with a semaphore Grab right chopstick then left chopstick
Code for 5 philosophers:
sem_t chopstick[5]; // Initialize each to 1 take_chopsticks(int i) {
wait(&chopstick[i]);
wait(&chopstick[(i+1)%5]); 4 }
0 0
4 3
put_chopsticks(int i) { signal(&chopstick[i]); signal(&chopstick[(i+1)%5]);
}
1
1
2 3
2
Dining Philosophers: Attempt #1
Two neighbors can’t use chopstick at same time Must test if chopstick is there and grab it atomically
Represent each chopstick with a semaphore Grab right chopstick then left chopstick
Code for 5 philosophers:
sem_t chopstick[5]; // Initialize each to 1 take_chopsticks(int i) {
wait(&chopstick[i]);
wait(&chopstick[(i+1)%5]); 4 }
Deadlocked!
0 0
4 3
put_chopsticks(int i) { signal(&chopstick[i]); signal(&chopstick[(i+1)%5]);
}
1
1
2 3
2
Dining Philosophers: Attempt #2
Grab lower-numbered chopstick first, then higher-numbered
sem_t chopstick[5]; // Initialize to 1 take_chopsticks(int i) {
if (i < 4) {
wait(&chopstick[i]);
wait(&chopstick[i+1]);
} else {
wait(&chopstick[0]);
wait(&chopstick[4]);
}
Philosopher 3 finishes take_chopsticks() and eventually calls put_chopsticks();
Who can run then?
What is wrong with this solution???
0
0 4
4 3
1
1
2 3
2
Dining Philosophers: How to Approach
Guarantee two goals
– Safety: Ensure nothing bad happens (don’t violate constraints of problem)
– Liveness: Ensure something good happens when it can (make as much progress as possible)
Introduce state variable for each philosopher i
state[i] = THINKING, HUNGRY, or EATING
Safety:
No two adjacent philosophers eat simultaneously
for all i: !(state[i]==EATING && state[i+1%5]==EATING)
Liveness:
Not the case that a philosopher is hungry and his neighbors are not eating
for all i: !(state[i]==HUNGRY && (state[i+4%5]!=EATING && state[i+1%5]!=EATING))
sem_t mayEat[5]; // how to initialize? sem_t mutex; // how to init?
int state[5] = {THINKING}; take_chopsticks(int i) {
wait(&mutex); // enter critical section state[i] = HUNGRY;
testSafetyAndLiveness(i); // check if I can run signal(&mutex); // exit critical section wait(&mayEat[i]);
}
put_chopsticks(int i) {
wait(&mutex); // enter critical section state[i] = THINKING;
test(i+1 %5); // check if neighbor can run now test(i+4 %5);
signal(&mutex); // exit critical section }
testSafetyAndLiveness(int i) { if(state[i]==HUNGRY&&state[i+4%5]!=EATING&&state[i+1%5]!=EATING) {
state[i] = EATING;
signal(&mayEat[i]); }}
Dining Philosophers: Example Execution
Take_chopsticks(0) Take_chopsticks(1) Take_chopsticks(2) Take_chopsticks(3) Take_chopsticks(4) Put_chopsticks(0) Put_chopsticks(2)
0 H4
4
3
H
E0 T
2 3
E
E
1
1H
2
sem_t mayEat[5]; // how to initialize? sem_t mutex; // how to init?
int state[5] = {THINKING}; take_chopsticks(int i) {
wait(&mutex); // enter critical section state[i] = HUNGRY;
testSafetyAndLiveness(i); // check if I can run signal(&mutex); // exit critical section wait(&mayEat[i]);
}
put_chopsticks(int i) {
wait(&mutex); // enter critical section state[i] = THINKING;
test(i+1 %5); // check if neighbor can run now test(i+4 %5);
signal(&mutex); // exit critical section }
Take_chopsticks(0) Take_chopsticks(1) Take_chopsticks(2) Take_chopsticks(3) Take_chopsticks(4) Put_chopsticks(0) Put_chopsticks(2)
testSafetyAndLiveness(int i) { if(state[i]==HUNGRY&&state[i+4%5]!=EATING&&state[i+1%5]!=EATING) {
state[i] = EATING;
signal(&mayEat[i]); }}
Reader/Writer Locks
Protect shared data structure; Goal:
Let multiple reader threads grab lock with other readers (shared) Only one writer thread can grab lock (exclusive)
– No reader threads
– No other writer threads
Two possibilities for priorities – different implementations
1) No reader waits unless writer in critical section • How can writers starve?
2) No writer waits longer than absolute minimum • How can readers starve?
Let us see if we can understand code...
Readers have priority
Version 1
Reader/Writer Locks
1 typedef struct _rwlock_t {
2 sem_t lock;
3 sem_t writelock;
4 int readers;
5 } rwlock_t;
6
7
8
9
10 11 }
void rwlock_init(rwlock_t *rw) { rw->readers = 0; sem_init(&rw->lock, 1);
sem_init(&rw->writelock, 1);
Reader/Writer Locks
13 void rwlock_acquire_readlock(rwlock_t *rw) {
14 sem_wait(&rw->lock);
15 rw->readers++;
16 if (rw->readers == 1)
17 sem_wait(&rw->writelock);
18 sem_post(&rw->lock);
19 }
21 void rwlock_release_readlock(rwlock_t *rw) {
22 sem_wait(&rw->lock);
23 rw->readers–;
24 if (rw->readers == 0)
25 sem_post(&rw->writelock);
26 sem_post(&rw->lock);
27 }
29 rwlock_acquire_writelock(rwlock_t *rw) { sem_wait(&rw->writelock); } 31 rwlock_release_writelock(rwlock_t *rw) { sem_post(&rw->writelock); }
T1: acquire_readlock() T2: acquire_readlock() T3: acquire_writelock() T2: release_readlock() T1: release_readlock()
// who runs?
T4: acquire_readlock()
// what happens? T5: acquire_readlock() // where blocked?
T3: release_writelock()
// what happens next?
READER/Writer Locks
13 void rwlock_acquire_readlock(rwlock_t *rw) {
14 sem_wait(&rw->lock);
15 rw->readers++;
16 if (rw->readers == 1)
17 sem_wait(&rw->writelock);
18 sem_post(&rw->lock);
19 }
21 void rwlock_release_readlock(rwlock_t *rw) { 22 sem_wait(&rw->lock);
T1: acquire_readlock() T2: acquire_readlock() T3: acquire_writelock() T4: acquire_readlock()
// what happens?
23 rw->readers–;
24 if (rw->readers == 0)
25 sem_post(&rw->writelock);
26 sem_post(&rw->lock);
27 }
29 rwlock_acquire_writelock(rwlock_t *rw) { sem_wait(&rw->writelock); } 31 rwlock_release_writelock(rwlock_t *rw) { sem_post(&rw->writelock); }
Writers have priority Three semaphores
Version 2
– Mutex
– OKToRead (siimilar to myEat[] in Dining Philosphers, but 1 for all readers) – OKToWrite
How to initialize? Shared variables
Waiting Readers, ActiveReaders Waiting Writers Active Writers
Acquire_readlock() {
Sem_wait(&mutex);
If (ActiveWriters +
WaitingWriters==0) {
sem_post(OKToRead);
ActiveReaders++;
} else WaitingReaders++;
Sem_post(&mutex);
Sem_wait(OKToRead);
Release_readlock() { Sem_wait(&mutex);
ActiveReaders–;
If (ActiveReaders==0 &&
WaitingWriters > 0) {
Sem_post(OKToWrite);
ActiveWriters++;
WaitingWriters–;
}
Sem_post(&mutex);
Acquire_writelock
()
{
Sem If (
);
ActiveReaders
(&mutex
+
_wait
ActiveWriters
ActiveWriters
+
Release_writelock() {
Sem_wait(&mutex);
ActiveWriters–;
If (WaitingWriters > 0) {
ActiveWriters++;
WaitingWriters–;
Sem_post(OKToWrite);
WaitingWriters
==0) {
++;
OKToWrite
sem else
( (&mutex);
); ++;
_post
WaitingWriters
} Sem Sem
_post _wait
OKToWrite
);
}
( }
T1: acquire_readlock() T2: acquire_readlock() T3: acquire_writelock() T4: acquire_readlock()
// what happens?
} else while(WaitingReaders>0) {
ActiveReaders++;
WaitingReaders–;
sem_post(OKToRead);
}
Sem_post(&mutex);
}
Semaphores
Semaphores are equivalent to locks + condition variables
– Can be used for both mutual exclusion and ordering
Semaphores contain state
– How they are initialized depends on how they will be used – Init to 0: Join (1 thread must arrive first, then other)
– Init to N: Number of available resources
Sem_wait(): Waits until value > 0, then decrement (atomic)
Sem_post(): Increment value, then wake a single waiter (atomic)
Can use semaphores in producer/consumer and for reader/writer locks
Review: Processes vs Threads
int a = 0;
int main() {
fork();
a++;
fork();
a++;
if (fork() == 0) { printf(“Hello!\n”);
} else { printf(“Goodbye!\n”);
How many times will “Hello!\n” be displayed?
4
}
a++; the final line of the program?
printf(“a is %d\n”, a); }
What will be the final value of “a” as displayed by 3