代写 parallel in this assignment you will be writing a multi-threaded producer/consumer engine. your engine will be supplied shared libraries with producer/consumer logic that your engine will run. your engine will be invoked as follows:

in this assignment you will be writing a multi-threaded producer/consumer engine. your engine will be supplied shared libraries with producer/consumer logic that your engine will run. your engine will be invoked as follows:
./prodcon shared_lib consumer_count producer_count optional_args ….
for example, word count is the “hello world” of parallel processing. word count simply counts all the words in a file. it outputs a list of words in the file with the number of times that word appeard. you are provided an example implementation of wordcount.c, which you can compile using:
#include “prodcon.h”
#include
#include
#include
#include
#include
#include /*
* this function will take a file split it into equal sized chunks. each
consumer will process
* their corresponding chunks. to handle word overlap, each consumer
except the first will skip
* the first word of their chunk. quick and dirty… might have bugs…
*/
void run_producer(int num, int producer_count, produce_f produce, int
argc, char **argv)
{
if (argc != 1) {
if (num == 0) {
printf(“wordcount requires exactly one file to be passed on
commandline got %d:\n”,
argc);
for (int i = 0; i < argc; i++) { printf("%d: %s\n", i, argv[i]); } exit(2); } return; } FILE *fh = fopen(argv[0], "r"); if (fh == NULL) { perror(argv[0]); exit(3); } fseek(fh, 0, SEEK_END); long file_size = ftell(fh); long start = file_size * num / producer_count; long end = file_size * (num+1) / producer_count; if (num == producer_count-1) { end = file_size; } fseek(fh, start, SEEK_SET); // peek into the stream to see if we are at a space int start_with_space = isspace(fgetc(fh)); fseek(fh, start, SEEK_SET); if (num != 0 && !start_with_space) { // if we don't start with a space then we skip the first word since the // producer before us will read it. char *ptr; fscanf(fh, "%ms", &ptr); free(ptr); } while (ftell(fh) <= end) { char *s; int rc = fscanf(fh, "%ms", &s); if (rc != 1) { break; } produce(s); free(s); // skip over the whitespace, we have to go back one after we find a non // whitespace character while (isspace(fgetc(fh))); fseek(fh, -1, SEEK_CUR); } } // simple hashmap static const int hash_table_size = 1000; struct hash_entry_s { char *str; int count; // link list for collisions struct hash_entry_s *next; }; int hash_string(int count, const char *str) { int total = 0; // silly hash function. not the best, but easy to write. for (int i = 0; str[i]; i++) { total += total << 5; total ^= str[i]; total ^= total >> 8;
}
return abs(total) % count;
}
struct hash_entry_s *find_entry(struct hash_entry_s **hash_array, int
hash, const char *str)
{
struct hash_entry_s *entry = hash_array[hash];
while (entry) {
if (strcmp(str, entry->str) == 0) {
break;
}
entry = entry->next;
}
return entry;
}
void add_string(struct hash_entry_s **hash_array, int count, const char
*str)
{
int hash = hash_string(count, str);
struct hash_entry_s *entry = find_entry(hash_array, hash, str);
if (entry) {
entry->count++;
} else {
entry = malloc(sizeof(*entry));
entry->str = strdup(str);
entry->count = 1;
entry->next = hash_array[hash];
hash_array[hash] = entry;
}
}
void dump_table(int num, struct hash_entry_s **hash_array, int count) {
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
fflush(stdout);
pthread_mutex_lock(&mutex);
fflush(stdout);
for (int i = 0; i < count; i++) { struct hash_entry_s *entry = hash_array[i]; while (entry) { printf("%s %d\n", entry->str, entry->count);
free(entry->str);
struct hash_entry_s *old = entry;
entry = entry->next;
free(old);
}
}
pthread_mutex_unlock(&mutex);
fflush(stdout);
}
/*
* this consumer will put everything in a hashtable that included a
count. at the end
* it will dump the keys and counts.
*/
void run_consumer(int num, consume_f consume, int argc, char **argv)
{
struct hash_entry_s *hash_array[hash_table_size];
memset(hash_array, 0, sizeof(hash_array));
char *str;
while ((str = consume())) {
add_string(hash_array, hash_table_size, str);
free(str);
}
dump_table(num, hash_array, hash_table_size);
}
int assign_consumer(int count, char *str)
{
return hash_string(count, str);
}

cc -fPIC -shared wordcount.c -o libwordcount.so -lpthread
this will compile and link a shared library called libwordcount.so. we can then run using
./prodcon ./libwordcount.so 2 2 words
make sure you put the path to libwordcount.so. you must have a /. ./prodcon will load the libwordcount.so library and use the run_producer function provided in that library to generate a list of words to pass to producers. the assign_consumer function in libwordcount.so will indicate which consumer a given word should be routed to. the run_consumer function will consume the words and accumulate counts which it will print once all the words have been consumed. for example, if we use words to run the above command, we get:
list 1
five 5
this 1
one 1
four 1
two 2
words 1
a 1
three 1
is 1
big 1
of 1
you will need to startup a thread for each producer and consumer. they will all run concurrently until all the producers have stopped producing input and all the consumers have finished consuming everything.
to get you going, i’m providing you will an example prodcon.c implementation.
#include
#include
#include #include
#include “prodcon.h”
struct llist_node {
struct llist_node *next;
char *str;
};
/**
* pop a node off the start of the list.
*
* @param phead the head of the list. this will be modified by the call
unless the list is empty
* (*phead == NULL).
* @return NULL if list is empty or a pointer to the string at the top of
the list. the caller is
* incharge of calling free() on the pointer when finished with the
string.
*/
char *pop(struct llist_node **phead)
{
if (*phead == NULL) {
return NULL;
}
char *s = (*phead)->str;
struct llist_node *next = (*phead)->next;
free(*phead);
* phead = next;
return s;
}
/**
* push a node onto the start of the list. a copy of the string will be
made.
* @param phead the head of the list. this will be modified by this call
to point to the new node
* being added for the string.
* @param s the string to add. a copy of the string will be made and
placed at the beginning of
* the list.
*/
void push(struct llist_node **phead, const char *s)
{
struct llist_node *new_node = malloc(sizeof(*new_node));
new_node->next = *phead;
new_node->str = strdup(s);
* phead = new_node;
}
// the array of list heads. the size should be equal to the number of
consumers
static struct llist_node **heads;
static assign_consumer_f assign_consumer;
static int producer_count;
static int consumer_count;
static int my_consumer_number;
void queue(int consumer, const char *str)
{
push(&heads[consumer], str);
}
void produce(const char *buffer)
{
int hash = assign_consumer(consumer_count, buffer);
queue(hash, buffer);
}
char *consume() {
char *str = pop(&heads[my_consumer_number]);
return str;
}
void do_usage(char *prog)
{
printf(“USAGE: %s shared_lib consumer_count producer_count ….\n”,
prog);
exit(1);
}
int main(int argc, char **argv)
{
if (argc < 4) { do_usage(argv[0]); } char *shared_lib = argv[1]; producer_count = strtol(argv[2], NULL, 10); consumer_count = strtol(argv[3], NULL, 10); char **new_argv = &argv[4]; int new_argc = argc - 4; setlinebuf(stdout); if (consumer_count <= 0 || producer_count <= 0) { do_usage(argv[0]); } void *dh = dlopen(shared_lib, RTLD_LAZY); // the array of list heads. the size should be equal to the number of consumers static struct llist_node **heads; static assign_consumer_f assign_consumer; static int producer_count; static int consumer_count; static int my_consumer_number; void queue(int consumer, const char *str) { push(&heads[consumer], str); } void produce(const char *buffer) { int hash = assign_consumer(consumer_count, buffer); queue(hash, buffer); } char *consume() { char *str = pop(&heads[my_consumer_number]); return str; } void do_usage(char *prog) { printf("USAGE: %s shared_lib consumer_count producer_count ....\n", prog); exit(1); } int main(int argc, char **argv) { if (argc < 4) { do_usage(argv[0]); } char *shared_lib = argv[1]; producer_count = strtol(argv[2], NULL, 10); consumer_count = strtol(argv[3], NULL, 10); char **new_argv = &argv[4]; int new_argc = argc - 4; setlinebuf(stdout); if (consumer_count <= 0 || producer_count <= 0) { do_usage(argv[0]); } void *dh = dlopen(shared_lib, RTLD_LAZY); // load the producer, consumer, and assignment functions from the library run_producer_f run_producer = dlsym(dh, "run_producer"); run_consumer_f run_consumer = dlsym(dh, "run_consumer"); assign_consumer = dlsym(dh, "assign_consumer"); if (run_producer == NULL || run_consumer == NULL || assign_consumer == NULL) { printf("Error loading functions: prod %p cons %p assign %p\n", run_producer, run_consumer, assign_consumer); exit(2); } heads = calloc(consumer_count, sizeof(*heads)); for (int i = 0; i < producer_count; i++) { run_producer(i, producer_count, produce, new_argc, new_argv); } for (int i = 0; i < consumer_count; i++) { my_consumer_number = i; run_consumer(i, consume, new_argc, new_argv); } return 0; } IT IS NOT CORRECT. IT IS SINGLE THREADED. it does show you how to load libraries and get function pointers from them. feel free to use the code in your implementation. i'm also providing you another shared library for you to test with: randnums.c. it generates random numbers and sums them up. #include "prodcon.h" #include
#include
/*
* this is a simple producer consumer shared library that will generate
random
* numbers in the producer printing the sum of the numbers they each have
* generated at the end. the consumers will sum the numbers they consume,
printing
* the sum at the end.
*/
void run_producer(int num, int producer_count, produce_f produce, int
argc, char
**argv)
{
unsigned int seed = num;
long long total = 0;
for (
int i = 0; i < 10000; i++) { int r = rand_r(&seed) % 1000; char buffer[21]; sprintf(buffer, "%d", r); produce(buffer); total += r; } printf("producer %d produced: %lld\n", num, total); } void run_consumer(int num, consume_f consume, int argc, char **argv) { long long total = 0; char *str; while ((str = consume()) != NULL) { total += atol(str); free(str); } printf("consumer %d consumed: %lld\n", num, total); } int assign_consumer(int consumer_count, const char *buffer) { return abs(atol(buffer)) % consumer_count; } you will submit a zip file with exactly one file in it: prodcon.c. use the prodcon.h file i've given you to get the correct function prototypes for the plugin interface. #ifndef PRODUCER_CONSUMER_PRODCON_H #define PRODUCER_CONSUMER_PRODCON_H // these next two functions pointers are defined to allow the producer consumer engine to pass // functions to collect strings produced by a plugin and to provide strings to be consumed by a // consumer /** * will be called by the producer to hand off a string to the producer consumer engine. */ typedef void (*produce_f)(const char *buffer); /** * will be called by a consumer to get the next string to be consumed. this call may block if * producers are still running. a NULL will be returned once all producers have finished and * there are no more strings for the consumer. */ typedef char *(*consume_f)(void); // the following functions will be provided by plugin libraries: a library must export a function // named "run_producer", "run_consumer", and "assign_consumer". /** * this function will be called to start a producer running. once the producer has nothing more * to produce, it should return from the function. * @param num the producer number starting at 0. * @param producer_count the number of producers that will be started. * @param produce the function to call when a string has been produced. * @param argc the command line argument count passed to the plugin * @param argv the command line arguments passed to the plugin * */ typedef void (*run_producer_f)(int num, int producer_count, produce_f produce, int argc, char **argv); /** * this function will be called to start a consumer running. once the consumer has nothing more * to consume, it should return from the function. * @param num the consumer number starting at 0. * @param consume the function to call to get the next string to be consumed. * @param argc the command line argument count passed to the plugin * @param argv the command line arguments passed to the plugin * */ typedef void (*run_consumer_f)(int num, consume_f consume, int argc, char **argv); /** * this function assigns a string to a consumer. function must return an integer from zero up to * but not including consumer_count. the returned integer indicates the number the string is * assigned to. */ typedef int (*assign_consumer_f)(int consumer_count, const char *buffer); #endif //PRODUCER_CONSUMER_PRODCON_H /** * this function assigns a string to a consumer. function must return an integer from zero up to * but not including consumer_count. the returned integer indicates the number the string is * assigned to. */ typedef int (*assign_consumer_f)(int consumer_count, const char *buffer); #endif //PRODUCER_CONSUMER_PRODCON_H rubric one file named prodcon.c in zipfile 5 one thread is started for each producer and consumer 10 consumers do not finish until all output from producers have been consumed 10 the assign_consumer function is used correctly to assign output 10 producers and consumers all run concurrently 10 logic is correct and readable 20 optional command line arguments are passed correctly to functions 5 program builds cleanly without errors or warnings 10 all memory is freed up (no memory leaks) 10 no limits on the number of producers or consumers 10