#include
#include
#include
#include
#include
#include “uthread.h”
#include “uthread_mutex_cond.h”
#define VERBOSE 0
#define VVERBOSE 0
#define LIMIT 100L
#define STALL 5000
#define LITTLESTALL 100
int verbose = VERBOSE;
int vverbose = VVERBOSE;
/*
* The amount of space in the bounded buffer.
*/
#define N 4
/*
* The maximum number of producers and consumers.
*/
#define MAXTHREADS 100000
/*
* The default number of producers and consumers (each).
*/
volatile long nthreads = 99;
/*
* The type of values produced and consumed.
*/
typedef long Message;
/*
* The bounded buffer struct.
*/
typedef struct {
Message message[N];
long in;
long out;
} Buffer;
/*
* The lock used to control access to the buffer.
*/
uthread_mutex_t lock;
/*
* The two condition variables used to wait for space or data.
*/
uthread_cond_t forspace;
uthread_cond_t fordata;
/*
* The function used to store data in the bounded buffer.
*/
void send(volatile Buffer *buf, Message msg)
{
/*
* Wait a random amount of time before producing a value.
*/
long stall = random() % STALL;
usleep(stall);
/*
* Wait until there is space in the buffer.
*/
uthread_mutex_lock(lock);
while (buf->in – buf->out == N)
{
if(vverbose)
{
printf(“Send waits\n”);
}
uthread_cond_wait(forspace);
}
/*
* There is space, so add the value in the buffer.
*/
if (verbose)
{
printf(“Buffer[%ld] = %ld\n”, buf->in % N, msg);
}
buf->message[buf->in % N] = msg;
buf->in++;
uthread_cond_signal(fordata);
uthread_mutex_unlock(lock);
}
/*
* The function used to retrieve data from the bounded buffer.
*/
Message receive(Buffer *buf)
{
/*
* Wait a random amount of time before requesting a value.
*/
long stall = random() % STALL;
usleep(stall);
/*
* Wait until there is some data in the buffer.
*/
uthread_mutex_lock(lock);
while (buf->in == buf->out)
{
if (vverbose)
{
printf(“Receive waits\n”);
}
uthread_cond_wait(fordata);
}
/*
* There is data, so get a value from the buffer.
*/
Message msg = buf->message[buf->out % N];
if (verbose)
{
printf(“%ld <- Buffer[%ld]\n", msg, buf->out % N);
}
buf->out++;
uthread_cond_signal(forspace);
uthread_mutex_unlock(lock);
return msg;
}
/*
* Producer thread: it generates LIMIT messages and then stops.
*/
void *producer(void *buf)
{
int i;
Buffer *buffer = buf;
for (i = 0; i < LIMIT; ++i)
{
send(buffer, random());
}
return 0;
}
/*
* Consumer thread: it reads LIMIT messages and then stops.
*/
void *consumer(void *buf)
{
int i;
Buffer *buffer = buf;
volatile Message msg; /* volatile so it's not optimized out */
for (i = 0; i < LIMIT; ++i)
{
msg = receive(buffer);
(void) msg;
}
return 0;
}
/*
* Monitoring thread: keeps track of the number of messages produced
* and consumed and prints them out every second.
*/
void *monitor(void *buf)
{
Buffer *buffer = buf;
while (buffer->in < nthreads * LIMIT)
{
usleep(1000000);
uthread_mutex_lock(lock);
fprintf(stderr, "Monitor: buffer->in = %ld, buffer->out = %ld\n”, buffer->in, buffer->out);
uthread_mutex_unlock(lock);
}
return 0;
}
/*
* Main function.
*/
int main(int argc, char **argv)
{
uthread_t p[MAXTHREADS], c[MAXTHREADS], m;
srandom(time(0));
int i;
uthread_init(8);
lock = uthread_mutex_create();
fordata = uthread_cond_create(lock);
forspace = uthread_cond_create(lock);
/*
* Check verbose flags and number of threads.
*/
for (i = 1; i < argc; ++i)
{
if (!strcmp(argv[i], "-v"))
{
verbose = 1;
}
else if (!strcmp(argv[i], "-vv"))
{
vverbose = 1;
}
else
{
nthreads = strtol(argv[i], 0, 0);
}
}
/*
* Create and initialize buffer.
*/
Buffer *b = malloc(sizeof *b);
b->in = b->out = 0;
/*
* Create the producers, the consumers and the monitor.
*/
for (i = 0; i < nthreads; i++)
{
p[i] = uthread_create(producer, b);
c[i] = uthread_create(consumer, b);
}
m = uthread_create(monitor, b);
uthread_detach(m);
/*
* Wait for the threads to terminate.
*/
for (i = 0; i < nthreads; i++)
{
uthread_join(p[i], NULL);
uthread_join(c[i], NULL);
}
fprintf(stderr, "b->in = %ld, b->out = %ld\n”, b->in, b->out);
return 0;
}