Parent directory Makefile producer-consumer.c producer-consumer.md
Download
# Build rules for the producer-consumer simulation.
CC = gcc
CFLAGS = -Wall -g
LDLIBS = -lpthread

# First (default) target: linked from the object file by make's implicit rules.
producer-consumer: producer-consumer.o

# Remove the executable and all object files.
clean:
	rm -f producer-consumer *.o

# Rebuild from scratch.
all: clean producer-consumer

# clean and all name actions, not files; without this declaration a file
# called "clean" or "all" in the directory would stop the rule from running.
.PHONY: all clean
/*
 * producer-consumer.c
 *
 * Producer threads split a command-line integer into pieces of 1..10 and push
 * them into a bounded ring buffer; consumer threads pop the pieces and add
 * them up again.  When the received sum equals the initial value, one consumer
 * writes a byte to a pipe that main() is blocked on, and the process exits.
 */
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

uint32_t sum_initial;       /* value of <sum> from the command line */
uint32_t sum_to_send;       /* part of the sum not yet claimed by a producer */
uint32_t sum_received = 0;  /* running total accumulated by the consumers */

pthread_mutex_t lock_sum_to_send = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock_sum_received = PTHREAD_MUTEX_INITIALIZER;

uint32_t *buffer = NULL;    /* ring buffer with buf_size slots */
uint32_t buf_size;

int fd[2];                  /* pipe used by a consumer to wake main() */

sem_t e;                    /* initialised to buf_size: counts free slots */
sem_t f;                    /* initialised to 0: counts filled slots */
sem_t g;                    /* initialised to 1: serialises access to buffer, pi, ci */

uint32_t pi = 0;            /* next slot to write */
uint32_t ci = 0;            /* next slot to read */

/* Print "<what>: <error text>" for error number err and terminate. */
static void die(const char *what, int err)
{
    fprintf(stderr, "%s: %s\n", what, strerror(err));
    exit(1);
}

/*
 * Parse s as a non-negative decimal integer that fits in uint32_t.
 * Returns 0 and stores the value in *out on success, -1 on any malformed,
 * negative or out-of-range input.
 */
static int parse_u32(const char *s, uint32_t *out)
{
    char *end;
    unsigned long v;

    /* strtoul() accepts a leading '-' and negates the result; refuse it. */
    if (strchr(s, '-') != NULL) {
        return -1;
    }

    errno = 0;
    v = strtoul(s, &end, 10);
    if (end == s || *end != '\0' || errno == ERANGE || v > UINT32_MAX) {
        return -1;
    }

    *out = (uint32_t)v;
    return 0;
}

/* Append piece to the ring buffer, blocking while no slot is free. */
void enqueue(uint32_t piece)
{
    sem_wait(&e);
    sem_wait(&g);
    buffer[pi] = piece;
    pi = (pi + 1) % buf_size;
    sem_post(&g);
    sem_post(&f);
}

/* Remove and return the oldest piece, blocking while the buffer is empty. */
uint32_t dequeue(void)
{
    uint32_t piece;

    sem_wait(&f);
    sem_wait(&g);
    piece = buffer[ci];
    ci = (ci + 1) % buf_size;
    sem_post(&g);
    sem_post(&e);
    return piece;
}

/*
 * Producer thread body.  i carries the thread's index (used only for output).
 * Repeatedly claims a random piece of 1..10 (capped at what is left) from
 * sum_to_send and enqueues it; returns NULL once sum_to_send reaches 0.
 */
void *producer(void *i)
{
    uint32_t piece;

    for (;;) {
        piece = (uint32_t)(random() % 10 + 1); // 1 ~ 10

        pthread_mutex_lock(&lock_sum_to_send);
        if (sum_to_send == 0) {
            pthread_mutex_unlock(&lock_sum_to_send);
            break;
        }
        if (piece > sum_to_send) {
            piece = sum_to_send;
        }
        sum_to_send -= piece;
        printf("p%3ld enq %2u, sum_to_send:%u\n",
               (long)(intptr_t)i, piece, sum_to_send);
        pthread_mutex_unlock(&lock_sum_to_send);

        /* Enqueue outside the lock so a full buffer does not stall the
         * other producers' bookkeeping. */
        enqueue(piece);
    }
    return NULL;
}

/*
 * Consumer thread body.  i carries the thread's index (used only for output).
 * Dequeues pieces and adds them to sum_received.  The consumer whose addition
 * brings the total to sum_initial writes one byte to the pipe to wake main()
 * and then returns; other consumers stay blocked in dequeue() until the
 * process exits.
 */
void *consumer(void *i)
{
    uint32_t piece;
    int done = 0;

    for (;;) {
        piece = dequeue();

        pthread_mutex_lock(&lock_sum_received);
        sum_received += piece;
        printf("\t\t\t\t c%3ld deq %2u, sum_received:%u\n",
               (long)(intptr_t)i, piece, sum_received);
        assert(sum_received <= sum_initial);
        if (sum_received == sum_initial) {
            done = 1;
        }
        pthread_mutex_unlock(&lock_sum_received);

        if (done) {
            ssize_t n;

            do {
                n = write(fd[1], "x", 1);
            } while (n == -1 && errno == EINTR);
            if (n != 1) {
                die("write", errno);
            }
            /* Everything has been received; nothing is left to dequeue. */
            return NULL;
        }
    }
}

/*
 * usage: producer-consumer <sum> <buf_size> <num_producers> <num_consumers>
 *
 * Starts the detached producer and consumer threads, then blocks on the pipe
 * until a consumer reports that the whole sum has been received.
 */
int main(int argc, char **argv)
{
    char x;
    ssize_t n;
    int res;
    pthread_t tid;
    uint32_t i, num_producers, num_consumers;

    if (argc != 5) {
        fprintf(stderr, "usage: %s <sum> <buf_size> <num_producers> <num_consumers>\n", argv[0]);
        exit(1);
    }

    if (parse_u32(argv[1], &sum_initial) != 0 ||
        parse_u32(argv[2], &buf_size) != 0 ||
        parse_u32(argv[3], &num_producers) != 0 ||
        parse_u32(argv[4], &num_consumers) != 0) {
        fprintf(stderr, "%s: arguments must be non-negative integers\n", argv[0]);
        exit(1);
    }
    /* A zero-slot buffer would divide by zero in enqueue()/dequeue(); with no
     * producers or no consumers the completion byte would never be written. */
    if (buf_size == 0 || num_producers == 0 || num_consumers == 0) {
        fprintf(stderr, "%s: <buf_size>, <num_producers> and <num_consumers> must be at least 1\n", argv[0]);
        exit(1);
    }
    sum_to_send = sum_initial;

    /* Nothing to transfer: no piece would ever be dequeued, so no consumer
     * would signal completion and the read() below would block forever. */
    if (sum_initial == 0) {
        exit(0);
    }

    srandom((unsigned int)time(NULL));

    buffer = calloc(buf_size, sizeof *buffer);
    if (buffer == NULL) {
        die("calloc", errno);
    }

    if (pipe(fd) != 0) {
        die("pipe", errno);
    }

    if (sem_init(&e, 0, buf_size) != 0) {
        die("sem_init(e)", errno);
    }
    if (sem_init(&f, 0, 0) != 0) {
        die("sem_init(f)", errno);
    }
    if (sem_init(&g, 0, 1) != 0) {
        die("sem_init(g)", errno);
    }

    for (i = 0; i < num_producers; i++) {
        res = pthread_create(&tid, NULL, producer, (void *)(uintptr_t)i);
        if (res != 0) {
            die("pthread_create", res);
        }
        res = pthread_detach(tid);
        if (res != 0) {
            die("pthread_detach", res);
        }
    }

    for (i = 0; i < num_consumers; i++) {
        res = pthread_create(&tid, NULL, consumer, (void *)(uintptr_t)i);
        if (res != 0) {
            die("pthread_create", res);
        }
        res = pthread_detach(tid);
        if (res != 0) {
            die("pthread_detach", res);
        }
    }

    /* Block until a consumer reports that the full sum has arrived. */
    do {
        n = read(fd[0], &x, 1);
    } while (n == -1 && errno == EINTR);
    if (n != 1) {
        die("read", errno);
    }

    close(fd[0]);
    close(fd[1]);
    free(buffer);
    exit(0);
}
producer-consumer.c
-------------------

In this simulation, a number of producer threads break up a large integer into a sum of small integers and put them into a bounded buffer; a number of consumer threads read the numbers from the bounded buffer and sum them up to the original large integer.

Here is an example run where six producer threads break up the number `100` and put the pieces into a bounded buffer of size `3`. Then, six consumer threads read the numbers from the buffer and sum them to `100`.

The line `p 0 enq 7, sum_to_send:93` shows producer #0 enqueuing the number 7 into the buffer. At that time, the remaining `sum_to_send` is 93.

The line `c 0 deq 7, sum_received:7` shows consumer #0 dequeuing the number 7 from the buffer. At that time, the `sum_received` so far is 7.

```
$ ./producer-consumer 100 3 6 6
p 0 enq 7, sum_to_send:93
p 0 enq 6, sum_to_send:87
p 0 enq 3, sum_to_send:84
p 0 enq 5, sum_to_send:79
p 4 enq 7, sum_to_send:72
p 5 enq 8, sum_to_send:64
                                 c 0 deq 7, sum_received:7
p 0 enq 2, sum_to_send:62
                                 c 0 deq 6, sum_received:13
                                 c 0 deq 3, sum_received:16
                                 c 0 deq 5, sum_received:21
                                 c 0 deq 2, sum_received:23
p 0 enq 5, sum_to_send:57
p 0 enq 8, sum_to_send:49
p 0 enq 2, sum_to_send:47
p 0 enq 6, sum_to_send:41
                                 c 0 deq 5, sum_received:28
                                 c 0 deq 8, sum_received:36
                                 c 0 deq 2, sum_received:38
                                 c 1 deq 8, sum_received:46
                                 c 0 deq 6, sum_received:52
p 4 enq 3, sum_to_send:38
p 4 enq 6, sum_to_send:32
p 4 enq 10, sum_to_send:22
p 4 enq 2, sum_to_send:20
                                 c 2 deq 7, sum_received:59
p 4 enq 5, sum_to_send:15
                                 c 2 deq 3, sum_received:62
p 4 enq 6, sum_to_send:9
                                 c 2 deq 6, sum_received:68
p 4 enq 2, sum_to_send:7
                                 c 2 deq 10, sum_received:78
                                 c 2 deq 5, sum_received:83
p 4 enq 5, sum_to_send:2
p 4 enq 2, sum_to_send:0
                                 c 2 deq 5, sum_received:88
                                 c 2 deq 2, sum_received:90
                                 c 1 deq 2, sum_received:92
                                 c 4 deq 6, sum_received:98
                                 c 3 deq 2, sum_received:100
```

Note that all producer and consumer threads are running concurrently, enqueuing and dequeuing to and from the bounded buffer. Producers block if the buffer is full, and consumers block if the buffer is empty. The code does not contain any calls to `sleep()` or `usleep()` to synchronize between the threads.

When `sum_received` reaches the initial number that was passed in as a command-line argument (`100` in this example), the program shuts down gracefully.