More on Thread-Safe and Synchronization Strategies and Problems
Shuaiwen Leon Song
Clarification from the previous lectures
Waitpid()
What is the output of the program below?
A. acbc
B. bcac
C. abcc
D. bacc
E. A or C or D

void main() {
    if (fork() == 0) {
        printf("a");
    }
    else {
        printf("b");
        waitpid(-1, NULL, 0);
    }
    printf("c");
    exit(0);
}
Outline
■ We must synchronize the execution of the threads so that their critical sections can never overlap.
▪ i.e., we need to guarantee mutually exclusive access to each critical section.
■ Solutions:
▪ Semaphores
▪ Mutex locks
▪ Condition variables
▪ Atomic operations
▪ Barriers
▪ Spinlocks
▪ Read-write locks
▪ Deadlocks
Semaphores
■ Semaphore: a non-negative global integer synchronization variable.
■ Unlike a mutex (a locking mechanism), a semaphore is a signaling mechanism: it guarantees both ordering and atomicity.
■ Unlike a mutex, any thread can release a semaphore, not only the one that acquired it in the first place.
A semaphore controls access to a shared resource: the counter determines the maximum number of threads that can access it simultaneously. When the semaphore is initialized at the beginning of your program, you choose that number according to your needs. A thread that wants to access the shared resource then calls acquire:
• If the counter is greater than zero, the thread can proceed. The counter is decremented by one right away, and the current thread starts doing its job. When done, it calls release, which in turn increments the counter by one.
• If the counter is equal to zero, the thread cannot proceed: other threads have already filled up the available space. The operating system puts the current thread to sleep; it wakes up when the semaphore counter becomes greater than zero again (that is, when some other thread calls release once its job is done).
• Unlike a mutex, any thread can release a semaphore, not only the one that acquired it in the first place.
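These counter mechanics can be sketched with POSIX semaphores. A minimal single-threaded illustration (the function name is illustrative; the comments assume Linux, where sem_init and sem_getvalue are available):

```c
#include <assert.h>
#include <semaphore.h>

/* Sketch: a counting semaphore with 2 slots. Both waits succeed without
 * blocking, one release returns a slot, and sem_getvalue reads the counter. */
int semaphore_counter_demo(void) {
    sem_t s;
    int v;
    sem_init(&s, 0, 2);   /* counter starts at 2 */
    sem_wait(&s);         /* acquire: counter 2 -> 1 */
    sem_wait(&s);         /* acquire: counter 1 -> 0 */
    sem_post(&s);         /* release: counter 0 -> 1 */
    sem_getvalue(&s, &v); /* read the current counter */
    sem_destroy(&s);
    return v;
}
```

With multiple threads, a sem_wait on a zero counter would instead put the caller to sleep until some other thread posts.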
Semaphores
■ Semaphore: non-negative global integer synchronization variable.
[Figure: the semaphore's counter steps through 2 → 1 → 0 → 1 as threads acquire and release it]
Semaphores
■ Semaphore: non-negative global integer synchronization variable. Manipulated by P and V operations.
■ P(s) — sem_wait()
▪ If s is nonzero, decrement s by 1 and return immediately.
▪ The test and decrement occur atomically (indivisibly).
▪ If s is zero, suspend the thread until s becomes nonzero and the thread is restarted by a V operation.
▪ After restarting, the P operation decrements s and returns control to the caller.
■ V(s) — sem_post()
▪ Increment s by 1.
▪ The increment occurs atomically.
▪ If any threads are blocked in a P operation waiting for s to become nonzero, restart exactly one of them; it then completes its P operation by decrementing s.
■ Semaphore invariant: (s >= 0)
C Semaphore Operations
Pthreads functions:

#include <semaphore.h>

int sem_init(sem_t *s, int pshared, unsigned int val); /* s = val; pshared = 0 for threads */
int sem_wait(sem_t *s); /* P(s) */
int sem_post(sem_t *s); /* V(s) */
Improper Synchronization

/* Global shared variable */
long cnt = 0; /* Counter */

int main(int argc, char **argv)
{
    long niters;
    pthread_t tid1, tid2;

    niters = atoi(argv[1]);
    Pthread_create(&tid1, NULL, thread, &niters);
    Pthread_create(&tid2, NULL, thread, &niters);
    Pthread_join(tid1, NULL);
    Pthread_join(tid2, NULL);

    /* Check result */
    if (cnt != (2 * niters))
        printf("BOOM! cnt=%ld\n", cnt);
    else
        printf("OK cnt=%ld\n", cnt);
    exit(0);
}

/* Thread routine */
void *thread(void *vargp)
{
    long i, niters = *((long *)vargp);
    for (i = 0; i < niters; i++)
        cnt++;
    return NULL;
}

linux> ./goodcnt 1000000
OK cnt=1000000
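The goodcnt run above can be reproduced by wrapping the increment in P/V operations on a semaphore initialized to 1. A minimal sketch (the names run_goodcnt and count_thread and the iteration count are illustrative, not the slide's actual code):

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define NITERS 100000

static sem_t mutex_sem;
static volatile long cnt;

/* Each thread increments cnt NITERS times inside a P/V-protected section. */
static void *count_thread(void *arg) {
    for (long i = 0; i < NITERS; i++) {
        sem_wait(&mutex_sem);  /* P(s): enter the critical section */
        cnt++;
        sem_post(&mutex_sem);  /* V(s): leave the critical section */
    }
    return NULL;
}

long run_goodcnt(void) {
    pthread_t t1, t2;
    cnt = 0;
    sem_init(&mutex_sem, 0, 1);  /* binary semaphore, initially 1 */
    pthread_create(&t1, NULL, count_thread, NULL);
    pthread_create(&t2, NULL, count_thread, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    sem_destroy(&mutex_sem);
    return cnt;                  /* always 2 * NITERS */
}
```

Because the increment is now mutually exclusive, the result is deterministic regardless of how the two threads interleave.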
Binary Semaphore
› A semaphore whose counter is restricted to the values 0 and 1 is called a binary semaphore: only one thread at a time can access the shared resource. Wait: this is basically a mutex protecting a critical section! You can indeed replicate mutex behavior with a binary semaphore. However, there are two important points to keep in mind:
› A mutex can be unlocked only by the thread that locked it first, while a semaphore can be released from any other thread. This can lead to confusion and subtle bugs if what you want is just a locking mechanism.
› Semaphores are signaling mechanisms that orchestrate threads, while mutexes are locking mechanisms that protect shared resources. You should not use semaphores to protect shared resources, nor mutexes for signaling: your intent will be clearer to you and to anyone who reads your code.
Producer-Consumer with Semaphore
› The producer-consumer scenario imposes the following constraints:
– The producer thread must not overwrite the shared buffer when the previous task has not been picked up by a consumer thread.
– The consumer threads must not pick up tasks until there is something present in the shared buffer.
– Individual consumer threads should pick up tasks one at a time.
[Figure: multiple producer and consumer threads sharing a buffer queue]
Producer-Consumer with Semaphore

sem_t empty;    // counts empty slots in the buffer
sem_t occupied; // counts occupied slots in the buffer
sem_t p_lock;   // mutex for the producer function
sem_t c_lock;   // mutex for the consumer function

void *producer(void *producer_thread_data) {
    while (!done()) {
        sem_wait(&empty);
        sem_wait(&p_lock);
        insert_into_queue();
        sem_post(&p_lock);
        sem_post(&occupied);
    }
}

void *consumer(void *consumer_thread_data) {
    while (!done()) {
        sem_wait(&occupied);
        sem_wait(&c_lock);
        my_task = extract_from_queue();
        sem_post(&c_lock);
        sem_post(&empty);
        process_task(my_task);
    }
}

void main() {
    /* declarations and initializations */
    sem_init(&p_lock, 0, 1);          /* binary semaphore, initially 1 */
    sem_init(&c_lock, 0, 1);          /* binary semaphore, initially 1 */
    sem_init(&empty, 0, BUFFER_SIZE); /* all slots start empty */
    sem_init(&occupied, 0, 0);        /* no slot occupied yet */
    ……
}
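The scheme above can be exercised with a concrete ring buffer. This sketch uses a single producer and a single consumer, so the p_lock/c_lock semaphores (needed only when several producers or consumers run concurrently) are omitted; the buffer size, item count, and function names are illustrative choices:

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define BUF_SIZE 4
#define NITEMS   64

static int buf[BUF_SIZE];
static int in_pos, out_pos;
static sem_t empty_slots, occupied_slots;
static long consumed_sum;

static void *producer(void *arg) {
    for (int i = 1; i <= NITEMS; i++) {
        sem_wait(&empty_slots);          /* wait for a free slot */
        buf[in_pos] = i;
        in_pos = (in_pos + 1) % BUF_SIZE;
        sem_post(&occupied_slots);       /* signal a filled slot */
    }
    return NULL;
}

static void *consumer(void *arg) {
    for (int i = 0; i < NITEMS; i++) {
        sem_wait(&occupied_slots);       /* wait for a filled slot */
        consumed_sum += buf[out_pos];
        out_pos = (out_pos + 1) % BUF_SIZE;
        sem_post(&empty_slots);          /* free the slot again */
    }
    return NULL;
}

long run_prodcons(void) {
    pthread_t p, c;
    in_pos = out_pos = 0;
    consumed_sum = 0;
    sem_init(&empty_slots, 0, BUF_SIZE); /* all slots start empty */
    sem_init(&occupied_slots, 0, 0);     /* no slot filled yet */
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return consumed_sum;                 /* 1 + 2 + ... + NITEMS */
}
```

The two counting semaphores enforce exactly the constraints listed above: the producer blocks when the buffer is full, the consumer blocks when it is empty.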
Examples

sem_t s;
sem_init(&s, 0, 4);

void *wait() {
    ……
    sem_wait(&s);
}

void *post() {
    sem_post(&s);
    ……
}

Are these execution orders possible? (W = a completed sem_wait, P = a sem_post)
WWWWWPPWW
WPWPWWWWW
PPWPWPWWWWWW
Exercise
Consider two threads A and B that perform two operations each. Let the operations of thread A be A1 and A2; let the operations of thread B be B1 and B2. We require that threads A and B each perform their first operation before either can proceed to its second operation. That is, we require that A1 run before B2 and B1 before A2. Consider the following solution based on semaphores (the code run by threads A and B is shown in two columns next to each other). Explain whether the solution is correct or not. If it is incorrect, you must also point out why. (The wait operation waits on a semaphore; the post operation increases a semaphore.)

sem A1Done = 0;
sem B1Done = 0;

// Thread A          // Thread B
A1                   B1
wait(B1Done)         wait(A1Done)
post(A1Done)         post(B1Done)
A2                   B2
Deadlock
› Deadlock is when two or more tasks never make progress because each is waiting for some resource held by another process/thread.
Deadlock
› Deadlock can arise if four conditions hold simultaneously:
› Mutual exclusion
– Only one process at a time can use a resource.
› Hold and wait
– A process holds at least one resource while waiting to acquire additional resources held by other processes.
› No preemption
– A resource can be released only voluntarily by the process holding it, after that process has completed its task.
› Circular wait
– There is a set {P0, P1, …, Pn} of waiting processes/threads such that
– P0 → P1, P1 → P2, …, Pn–1 → Pn, and Pn → P0.
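One standard way to prevent deadlock is to break the circular-wait condition by imposing a global lock order. In this sketch (all names are illustrative), both threads always take lock_a before lock_b, so a wait cycle can never form; if one thread took the locks in the opposite order, the two could deadlock:

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t lock_a = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lock_b = PTHREAD_MUTEX_INITIALIZER;
static long shared = 0;

/* Both threads respect the same lock order: lock_a, then lock_b. */
static void *worker(void *arg) {
    for (int i = 0; i < 10000; i++) {
        pthread_mutex_lock(&lock_a);   /* always acquired first */
        pthread_mutex_lock(&lock_b);   /* always acquired second */
        shared++;
        pthread_mutex_unlock(&lock_b);
        pthread_mutex_unlock(&lock_a);
    }
    return NULL;
}

long run_ordered_locks(void) {
    pthread_t t1, t2;
    shared = 0;
    pthread_create(&t1, NULL, worker, NULL);
    pthread_create(&t2, NULL, worker, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return shared;
}
```

Because no thread ever holds lock_b while waiting for lock_a, the circular-wait condition cannot hold and the program always terminates.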
Handling Deadlocks
› Deadlock Prevention
– Ensure that at least one of the four necessary conditions cannot hold.
› Deadlock Avoidance
– Do not allow a resource request that could potentially lead to a deadlock.
– Requires advance information about all requests.
› Deadlock Detection
– Always allow resource requests.
– Periodically check for deadlocks.
– If a deadlock exists, recover from it.
› Ignore
– Makes sense if the likelihood is very low, say once per year.
– Cheaper than prevention, avoidance, or detection.
– Used by most common operating systems.
Summary
■ Programmers need a clear model of how variables are shared by threads.
■ Variables shared by multiple threads must be protected to ensure mutually exclusive access.
■ Semaphores are a fundamental mechanism for enforcing mutual exclusion.
Outline
■ Solutions:
▪ Semaphores
▪ Mutex locks
▪ Condition variables
▪ Atomic operations
▪ Barriers
▪ Spinlocks
▪ Read-write locks
▪ Deadlocks
Mutex Locks
› Critical sections in Pthreads can be accessed in a mutually exclusive manner using mutex locks.
› Mutex locks have two states: locked and unlocked. At any point in time, only one thread can hold a mutex lock. Locking is an atomic operation.
› A thread entering a critical section first tries to acquire the lock, and proceeds once the lock is granted.
› A thread releases the lock when leaving the critical section.
› The API provides the following functions for handling mutex locks:

int pthread_mutex_init(pthread_mutex_t *mutex_lock, const pthread_mutexattr_t *lock_attr);
int pthread_mutex_lock(pthread_mutex_t *mutex_lock);
int pthread_mutex_unlock(pthread_mutex_t *mutex_lock);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

› Hint: Attempting to initialize an already initialized mutex results in undefined behavior.
Types of Mutexes
› Pthreads supports three types of mutexes.
– A normal mutex deadlocks if a thread that already holds the lock tries to lock it a second time.
– A recursive mutex allows a single thread to lock a mutex as many times as it wants; it simply increments a count of the number of locks. The lock is relinquished when the count drops back to zero.
– An error-check mutex reports an error when a thread holding the lock tries to lock it again (as opposed to deadlocking, as in the first case, or granting the lock, as in the second).
› The type of the mutex is set in the attributes object before it is passed at initialization time.
› Initializing a recursive mutex:

pthread_mutex_t Mutex;
pthread_mutexattr_t Attr;

pthread_mutexattr_init(&Attr);
pthread_mutexattr_settype(&Attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&Mutex, &Attr);
Attributes Objects for Mutexes
› Initialize the attributes object using:
– int pthread_mutexattr_init(pthread_mutexattr_t *attr);
› The function pthread_mutexattr_settype sets the type of mutex specified by the mutex attributes object:
– int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
› Here, type specifies the type of the mutex and can take one of:
– PTHREAD_MUTEX_NORMAL
– PTHREAD_MUTEX_RECURSIVE
– PTHREAD_MUTEX_ERRORCHECK
Proper Synchronization Using a Mutex Lock

pthread_mutex_t lock;
volatile long cnt = 0; /* Counter */

int main(int argc, char **argv)
{
    long niters;
    pthread_t tid1, tid2;

    if (pthread_mutex_init(&lock, NULL) != 0) {
        printf("\n mutex init has failed\n");
        return 1;
    }
    niters = atoi(argv[1]);
    pthread_create(&tid1, NULL, thread, &niters);
    pthread_create(&tid2, NULL, thread, &niters);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    /* Check result */
    if (cnt != (2 * niters))
        printf("BOOM! cnt=%ld\n", cnt);
    else
        printf("OK cnt=%ld\n", cnt);
    pthread_mutex_destroy(&lock);
    return 0;
}

void *thread(void *vargp)
{
    long i, niters = *((long *)vargp);
    pthread_mutex_lock(&lock);
    for (i = 0; i < niters; i++)
        cnt++;
    pthread_mutex_unlock(&lock);
    return NULL;
}
Overheads of Locking
› Locks represent serialization points, since critical sections must be executed by threads one after the other.
› Encapsulating large segments of the program within locks can lead to significant performance degradation.
› It is often possible to reduce the idling overhead associated with locks using
– int pthread_mutex_trylock(pthread_mutex_t *mutex_lock);
which attempts to lock mutex_lock but, if unsuccessful, returns immediately with a "busy" error code.
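A sketch of that behavior: because the mutex is already held, pthread_mutex_trylock returns the EBUSY error code immediately instead of blocking (the demo function, whose name is illustrative, returns 1 when it observes EBUSY):

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Sketch: trylock on a held mutex fails fast with EBUSY.
 * Returns 1 if EBUSY was observed, 0 otherwise. */
int trylock_demo(void) {
    pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    int rc;
    pthread_mutex_lock(&m);          /* hold the lock */
    rc = pthread_mutex_trylock(&m);  /* does not block: returns EBUSY */
    pthread_mutex_unlock(&m);
    pthread_mutex_destroy(&m);
    return rc == EBUSY;
}
```

A thread that receives EBUSY can go do other useful work and retry later, rather than idling inside pthread_mutex_lock.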
Condition Variables
› A condition variable allows a thread to block itself until a specified condition becomes true.
› When a thread executes pthread_cond_wait(condition_variable), it is blocked until another thread executes one of the following:
– pthread_cond_signal(condition_variable), or
– pthread_cond_broadcast(condition_variable)
› pthread_cond_signal() unblocks one of the threads blocked waiting on the condition variable.
› pthread_cond_broadcast() unblocks all the threads blocked waiting on the condition variable.
› If no threads are waiting on the condition variable, pthread_cond_signal() or pthread_cond_broadcast() has no effect.
Condition Variables
› A condition variable always has a mutex associated with it. A thread locks this mutex before issuing a wait, a signal, or a broadcast on the condition variable.
› While the thread is waiting on a condition variable, the mutex is automatically unlocked; when the thread is signaled, the mutex is automatically locked again.
› Pthreads provides the following functions for condition variables:
– int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
– int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
– int pthread_cond_signal(pthread_cond_t *cond);
– int pthread_cond_broadcast(pthread_cond_t *cond);
– int pthread_cond_destroy(pthread_cond_t *cond);
Condition Variables
› Condition variables should be used as a place to wait and be notified.
› General usage pattern:

// safely examine the condition, preventing other threads from altering it
pthread_mutex_lock(&lock);
while (SOME-CONDITION is false)
    pthread_cond_wait(&cond, &lock);
// do whatever you need to do when the condition becomes true
do_stuff();
pthread_mutex_unlock(&lock);

› A thread signaling the condition variable, on the other hand, typically looks like:

// ensure we have exclusive access to whatever comprises the condition
pthread_mutex_lock(&lock);
ALTER-CONDITION
// wake up at least one of the threads waiting on the condition (if any)
pthread_cond_signal(&cond);
// allow others to proceed
pthread_mutex_unlock(&lock);
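Both halves of the pattern combine into a minimal runnable sketch: one thread waits for a ready flag, the other alters the condition and signals (all names here are illustrative):

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;
static int observed = 0;

/* Waiting side: block until the condition ("ready" is set) becomes true. */
static void *waiter(void *arg) {
    pthread_mutex_lock(&lock);
    while (!ready)                        /* loop guards against spurious wakeups */
        pthread_cond_wait(&cond, &lock);  /* atomically unlocks, sleeps, relocks */
    observed = ready;
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Signaling side: alter the condition under the lock, then wake the waiter. */
int condvar_demo(void) {
    pthread_t t;
    pthread_create(&t, NULL, waiter, NULL);
    pthread_mutex_lock(&lock);
    ready = 1;                  /* ALTER-CONDITION */
    pthread_cond_signal(&cond); /* wake up a waiting thread (if any) */
    pthread_mutex_unlock(&lock);
    pthread_join(t, NULL);
    return observed;
}
```

Note the result is the same whichever thread reaches the lock first: if the signal happens before the waiter starts waiting, the waiter simply sees ready == 1 and skips the wait.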
Producer-Consumer Using Condition Variables
Main function:

pthread_cond_t queue_not_full, queue_not_empty;
pthread_mutex_t queue_cond_lock;
pthread_t p, c;
/* other data structures here */

void main() {
    /* declarations and initializations */
    task_available = 0;
    pthread_mutex_init(&queue_cond_lock, NULL);
    pthread_cond_init(&queue_not_empty, NULL);
    pthread_cond_init(&queue_not_full, NULL);
    /* create and join producer and consumer threads */
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL); /* note: &c, not &p */
    pthread_join(p, NULL);
    pthread_join(c, NULL);
}
Producer-Consumer Using Condition Variables
void *producer(void *producer_thread_data) { while (!done()) {
pthread_mutex_lock(&queue_cond_lock);
if (task_available == BUFSIZE)
pthread_cond_wait(&queue_not_full, &queue_cond_lock);
} }
task_available++;
insert_into_queue(); pthread_cond_signal(&queue_not_empty); pthread_mutex_unlock(&queue_cond_lock);
void *consumer(void *consumer_thread_data) { while (!done()) {
pthread_mutex_lock(&queue_cond_lock);
if (task_available == 0)
pthread_cond_wait(&queue_not_empty, &queue_cond_lock);
} }
task_available--;
my_task = extract_from_queue(); pthread_cond_signal(&queue_not_full); pthread_mutex_unlock(&queue_cond_lock); process_task(my_task);
37
Synchronization (Barriers)
› What is the output of the following Pthreads program?

int main(int argc, char *argv) {
    double A[5], B[5], C[5]; /* global, shared variables */
    for (i = 0; i < 5; i++)
        A[i] = B[i] = 1;
    for (i = 0; i < 4; i++)
        pthread_create(..., DoStuff, int i);
    ...
    for (i = 0; i < 4; i++)
        pthread_join(..., DoStuff, ...);
    Print the values of C;
}

void DoStuff(int threadID) {
    int k;
    B[threadID+1] = 2 * A[threadID];
    Barrier
    C[threadID] = 2 * B[threadID];
}
Barriers: A Composite Synchronization Construct
› By design, Pthreads provides support for a basic set of operations.
› Higher-level constructs can be built using these basic synchronization constructs.
› We discuss one such construct: barriers.
› A barrier holds a thread until all threads participating in the barrier have reached it.
› Barriers can be implemented using a counter, a mutex, and a condition variable.
› A single integer (the counter) keeps track of the number of threads that have reached the barrier.
› If the count is less than the total number of threads, the threads execute a condition wait.
› The last thread entering (and setting the count to the number of threads) wakes up all the threads with a condition broadcast and resets the count to zero (to prepare for the next barrier).
Defining Your Own Barrier Construct

typedef struct {
    pthread_mutex_t count_lock;
    pthread_cond_t ok_to_proceed;
    int count;
} mylib_barrier_t;

void mylib_init_barrier(mylib_barrier_t *b) {
    b->count = 0;
    pthread_mutex_init(&(b->count_lock), NULL);
    pthread_cond_init(&(b->ok_to_proceed), NULL);
}

void mylib_barrier(mylib_barrier_t *b, int thread_count) {
    pthread_mutex_lock(&(b->count_lock));
    b->count++;
    if (b->count == thread_count) {
        b->count = 0;
        pthread_cond_broadcast(&(b->ok_to_proceed));
    }
    else
        pthread_cond_wait(&(b->ok_to_proceed), &(b->count_lock));
    pthread_mutex_unlock(&(b->count_lock));
}
Using the Defined Barrier

mylib_barrier_t my_barrier; /* declare a barrier */

int main(int argc, char *argv) {
    mylib_init_barrier(my_barrier); /* initialize the barrier */
    double A[5], B[5], C[5]; /* global, shared variables */
    for (i = 0; i < 5; i++)
        A[i] = B[i] = 1;
    for (i = 0; i < 4; i++)
        pthread_create(..., DoStuff, int i);
    ...
    for (i = 0; i < 4; i++)
        pthread_join(..., DoStuff, ...);
    Print the values of C;
}

void DoStuff(int threadID) {
    int k;
    B[threadID+1] = 2 * A[threadID];
    mylib_barrier(my_barrier, 4); /* call the barrier */
    C[threadID] = 2 * B[threadID];
}
Atomic Operations in C
› Recall the cnt++ operation in Lecture04:

movq cnt(%rip), %rdx
addq $1, %rdx
movq %rdx, cnt(%rip)

› We need a lock to ensure exclusive access to the critical section.
› What if the cnt++ operation were atomic?
– Cannot be divided
– No half-done state
– All or nothing
› Can "critical sections" still overlap? Do we still need locks to prevent race conditions?
Atomic Operations in C
› Concurrent access using atomic operations is guaranteed not to cause data races,
› allowing for lock-free concurrent programming.
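C11's <stdatomic.h> is one way to get such operations. A sketch of a lock-free increment built on compare-and-swap (the function name is illustrative): the loop retries until the CAS succeeds, which is how lockless algorithms make progress under contention (here there is none, so it succeeds at once).

```c
#include <assert.h>
#include <stdatomic.h>

/* Sketch: increment an atomic long via compare-and-swap. */
long cas_increment(void) {
    atomic_long v = 37;
    long expected = atomic_load(&v);
    /* On failure, "expected" is reloaded with the current value and we retry. */
    while (!atomic_compare_exchange_weak(&v, &expected, expected + 1))
        ;
    return atomic_load(&v);
}
```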
Atomic Operations in C
› Let's revisit the cnt program (improper synchronization):

volatile long cnt = 0; /* Counter */

int main(int argc, char **argv)
{
    long niters;
    pthread_t tid1, tid2;

    niters = atoi(argv[1]);
    pthread_create(&tid1, NULL, thread, &niters);
    pthread_create(&tid2, NULL, thread, &niters);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    /* Check result */
    if (cnt != (2 * niters))
        printf("BOOM! cnt=%ld\n", cnt);
    else
        printf("OK cnt=%ld\n", cnt);
    exit(0);
}

void *thread(void *vargp) {
    long i, niters = *((long *)vargp);
    for (i = 0; i < niters; i++)
        cnt++;
    return NULL;
}

donglin@ubuntu:~/ctest$ ./main 10000000
BOOM! cnt=13739449

How can we fix this program using atomic operations?
Atomic Operations in C
› The cnt program fixed with an atomic operation:

#include <stdatomic.h>

volatile long cnt = 0; /* Counter */

int main(int argc, char **argv) {
    long niters;
    pthread_t tid1, tid2;

    niters = atoi(argv[1]);
    pthread_create(&tid1, NULL, thread, &niters);
    pthread_create(&tid2, NULL, thread, &niters);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    /* Check result */
    if (cnt != (2 * niters))
        printf("BOOM! cnt=%ld\n", cnt);
    else
        printf("OK cnt=%ld\n", cnt);
    exit(0);
}
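A complete, runnable version of the atomic fix, sketched with C11 atomics (this is one plausible completion of the slide, not its exact code: the counter is declared _Atomic so each cnt++ is a single indivisible read-modify-write, and the helper names and iteration count are illustrative):

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define NITERS 100000

static atomic_long cnt;   /* _Atomic: cnt++ is an atomic read-modify-write */

static void *thread_fn(void *arg) {
    for (long i = 0; i < NITERS; i++)
        cnt++;            /* atomic increment: no lock needed */
    return NULL;
}

long run_atomic_cnt(void) {
    pthread_t t1, t2;
    atomic_store(&cnt, 0);
    pthread_create(&t1, NULL, thread_fn, NULL);
    pthread_create(&t2, NULL, thread_fn, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return atomic_load(&cnt);  /* always 2 * NITERS: no lost updates */
}
```

Unlike the volatile version, the two threads can no longer interleave between the load, add, and store of an increment, so the "BOOM" outcome is impossible.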