#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "uthread.h"
#include "uthread_sem.h"
/* Compile-time defaults for the two verbosity flags defined below. */
#define VERBOSE 0
#define VVERBOSE 0
/* Number of messages each producer sends and each consumer receives. */
#define LIMIT 100L
/*
 * Exclusive upper bound, in microseconds, of the random delay taken
 * before every send() and receive().
 */
#define STALL 5000
/* NOTE(review): not referenced in the visible code; confirm it is still needed. */
#define LITTLESTALL 100
/* When non-zero, send() and receive() print each buffer access; set by -v. */
int verbose = VERBOSE;
/* Set by -vv. NOTE(review): never read in the visible code; confirm. */
int vverbose = VVERBOSE;
/*
 * Capacity of the bounded buffer, in messages.
 */
#define N 4
/*
 * The maximum number of producers and of consumers (each).
 */
#define MAXTHREADS 100000
/*
 * The number of producers and of consumers (each). Defaults to 99;
 * main() overwrites it when a numeric command-line argument is given.
 */
volatile long nthreads = 99;
/*
 * The type of the values produced and consumed; producers fill it with
 * the result of random().
 */
typedef long Message;
/*
 * The bounded buffer: N message slots used as a ring.
 *
 * in and out are running totals that are only ever incremented; the slot
 * actually touched is in % N (for send) or out % N (for receive).
 */
typedef struct {
Message message[N]; /* slot storage */
long in; /* number of messages stored so far */
long out; /* number of messages retrieved so far */
} Buffer;
/*
 * The three semaphores used to control access to the buffer.
 * All three are created in main() before any thread is started.
 */
uthread_sem_t lock; /* Created with value 1; held around every access to the buffer's fields. */
uthread_sem_t data; /* Created with value 0; its value is the amount of data available in the buffer. */
uthread_sem_t space; /* Created with value N; its value is the amount of space available in the buffer. */
/*
 * Store one message in the bounded buffer (producer side).
 *
 * buf: the shared buffer to write into.
 * msg: the value to store.
 *
 * Sleeps for a random delay below STALL microseconds, blocks until a slot
 * is free, stores msg in slot in % N and advances in. When verbose is set
 * the store is also reported on stdout.
 */
void send(volatile Buffer *buf, Message msg)
{
    /* Wait a random amount of time before producing a value. */
    long stall = random() % STALL;
    usleep(stall);

    /*
     * Reserve a free slot before taking the lock. Waiting on space while
     * holding lock would stop receive() from ever freeing a slot, because
     * receive() needs lock as well.
     */
    uthread_sem_wait(space);
    uthread_sem_wait(lock);

    /* Add the value to the buffer. */
    if (verbose)
    {
        printf("Buffer[%ld] = %ld\n", buf->in % N, msg);
    }
    buf->message[buf->in % N] = msg;
    buf->in++;

    /* Release the lock, then announce that one more message is available. */
    uthread_sem_signal(lock);
    uthread_sem_signal(data);
}
/*
 * Retrieve one message from the bounded buffer (consumer side).
 *
 * buf: the shared buffer to read from.
 *
 * Sleeps for a random delay below STALL microseconds, blocks until a
 * message is available, reads slot out % N and advances out. When verbose
 * is set the read is also reported on stdout.
 *
 * Returns the message that was read.
 */
Message receive(Buffer *buf)
{
    /* Wait a random amount of time before requesting a value. */
    long stall = random() % STALL;
    usleep(stall);

    /*
     * Claim an available message before taking the lock. Waiting on data
     * while holding lock would stop send() from ever storing a message,
     * because send() needs lock as well.
     */
    uthread_sem_wait(data);
    uthread_sem_wait(lock);

    /* Get a value from the buffer. */
    Message msg = buf->message[buf->out % N];
    if (verbose)
    {
        printf("%ld <- Buffer[%ld]\n", msg, buf->out % N);
    }
    buf->out++;

    /* Release the lock, then announce that one more slot is free. */
    uthread_sem_signal(lock);
    uthread_sem_signal(space);
    return msg;
}
/*
 * Producer thread body.
 *
 * buf: the shared Buffer, passed as an untyped thread argument.
 *
 * Sends LIMIT messages, each one a fresh random() value, and then
 * returns 0.
 */
void *producer(void *buf)
{
    Buffer *shared = buf;
    int sent = 0;

    while (sent < LIMIT)
    {
        send(shared, random());
        ++sent;
    }
    return 0;
}
/*
 * Consumer thread body.
 *
 * buf: the shared Buffer, passed as an untyped thread argument.
 *
 * Receives LIMIT messages, discarding each one, and then returns 0.
 */
void *consumer(void *buf)
{
    Buffer *shared = buf;
    volatile Message sink; /* volatile so the store is not optimized out */
    long remaining;

    for (remaining = LIMIT; remaining > 0; remaining--)
    {
        sink = receive(shared);
        (void) sink;
    }
    return 0;
}
/*
* Monitoring thread: keeps track of the number of messages produced
* and consumed and prints them out every second.
*/
void *monitor(void *buf)
{
Buffer *buffer = buf;
while (buffer->in < nthreads * LIMIT)
{
usleep(1000000);
uthread_sem_wait(lock);
fprintf(stderr, "Monitor: buffer->in = %ld, buffer->out = %ld\n”, buffer->in, buffer->out);
uthread_sem_signal(lock);
}
return 0;
}
/*
 * Main function.
 *
 * Command line: any mix of
 *   -v       print every buffer access
 *   -vv      set the vverbose flag
 *   <count>  number of producers and of consumers (each), 1..MAXTHREADS
 *
 * Starts nthreads producers, nthreads consumers and one detached monitor,
 * waits for the producers and consumers, and prints the final counters on
 * stderr. Returns 0 on success and EXIT_FAILURE on a bad thread count or
 * when memory cannot be allocated.
 */
int main(int argc, char **argv)
{
    uthread_t m;
    long i;

    srandom(time(0));
    uthread_init(8);

    /* Check verbose flags and number of threads. */
    for (i = 1; i < argc; ++i)
    {
        if (!strcmp(argv[i], "-v"))
        {
            verbose = 1;
        }
        else if (!strcmp(argv[i], "-vv"))
        {
            vverbose = 1;
        }
        else
        {
            /*
             * Reject anything that is not a whole number in range. An
             * overflowing value comes back as LONG_MIN/LONG_MAX from
             * strtol and so fails the range check too.
             */
            char *end;
            long requested = strtol(argv[i], &end, 0);

            if (end == argv[i] || *end != '\0' || requested < 1 || requested > MAXTHREADS)
            {
                fprintf(stderr, "%s: thread count must be between 1 and %d, got \"%s\"\n",
                        argv[0], MAXTHREADS, argv[i]);
                return EXIT_FAILURE;
            }
            nthreads = requested;
        }
    }

    /*
     * Create and initialize the buffer, plus one handle per producer and
     * per consumer. The handle arrays live on the heap and are sized to
     * the actual thread count instead of MAXTHREADS.
     */
    long count = nthreads;
    Buffer *b = malloc(sizeof *b);
    uthread_t *p = malloc((size_t) count * sizeof *p);
    uthread_t *c = malloc((size_t) count * sizeof *c);

    if (b == NULL || p == NULL || c == NULL)
    {
        fprintf(stderr, "%s: out of memory\n", argv[0]);
        free(c);
        free(p);
        free(b);
        return EXIT_FAILURE;
    }
    b->in = b->out = 0;
    space = uthread_sem_create(N);
    data = uthread_sem_create(0);
    lock = uthread_sem_create(1);

    /* Create the producers, the consumers and the monitor. */
    for (i = 0; i < count; i++)
    {
        p[i] = uthread_create(producer, b);
        c[i] = uthread_create(consumer, b);
    }
    m = uthread_create(monitor, b);
    uthread_detach(m);

    /* Wait for the producers and consumers to terminate. */
    for (i = 0; i < count; i++)
    {
        uthread_join(p[i], NULL);
        uthread_join(c[i], NULL);
    }
    fprintf(stderr, "b->in = %ld, b->out = %ld\n", b->in, b->out);

    /*
     * b is deliberately not freed: the detached monitor may still be
     * running and reading it.
     */
    free(c);
    free(p);
    return 0;
}