COMS 4995 Advanced Systems Programming

Index of 2024-1/code/08

Parent directory
Makefile
producer-consumer.c
producer-consumer.md

Makefile

# Toolchain settings consumed by make's built-in compile and link rules.
CC = gcc
CFLAGS = -Wall -g
LDLIBS = -lpthread

# Default goal (first target in the file). No recipe is given: the implicit
# rules compile producer-consumer.c and link the object with $(LDLIBS).
producer-consumer: producer-consumer.o

# Neither target creates a file with its own name. Declaring them phony keeps
# a stray file called "clean" or "all" from preventing the recipe from running.
.PHONY: clean all

clean:
	rm -f producer-consumer *.o

# Rebuild from scratch.
all: clean producer-consumer

producer-consumer.c

#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

/*
 * Bookkeeping for the simulation.
 *   sum_initial  - the total taken from argv[1]; never modified after main()
 *                  sets it.
 *   sum_to_send  - the part of the total that producers have not yet turned
 *                  into pieces; starts equal to sum_initial.
 *   sum_received - running total of the pieces consumers have dequeued.
 */
uint32_t sum_initial;
uint32_t sum_to_send;
uint32_t sum_received = 0;

/* Each mutex guards the counter named in its identifier. */
pthread_mutex_t lock_sum_to_send  = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock_sum_received = PTHREAD_MUTEX_INITIALIZER;

/*
 * buffer   - ring buffer of buf_size slots, allocated in main().
 * buf_size - slot count taken from argv[2].
 * fd       - pipe used as a completion signal: the consumer that brings
 *            sum_received up to sum_initial writes one byte to fd[1], and
 *            main() blocks reading fd[0] until that byte arrives.
 */
uint32_t *buffer = NULL;
uint32_t  buf_size;
int       fd[2];

/*
 * Semaphores used by enqueue() and dequeue():
 *   e - initialised to buf_size; waited on before a write and posted after a
 *       read, so it counts the empty slots.
 *   f - initialised to 0; posted after a write and waited on before a read,
 *       so it counts the filled slots.
 *   g - initialised to 1; held around every access to buffer, pi and ci, so
 *       it serves as a mutual-exclusion lock for the ring buffer.
 */
sem_t e;
sem_t f;
sem_t g;

/* Ring-buffer positions: pi is the next slot to write, ci the next to read. */
uint32_t  pi = 0;
uint32_t  ci = 0;

/*
 * enqueue - store one piece in the shared ring buffer.
 *
 * Waits on `e` until a slot is free, then holds `g` while the piece is
 * written at index pi and pi is advanced (wrapping at buf_size). Posting `f`
 * afterwards tells consumers that one more slot is filled.
 *
 * `e` must be taken before `g`: a producer that held `g` while waiting for a
 * free slot would stop every consumer from freeing one.
 */
void enqueue(uint32_t piece)
{
    uint32_t slot;

    sem_wait(&e);   /* reserve a free slot; blocks while the buffer is full */
    sem_wait(&g);   /* enter the critical section */

    slot = pi;
    buffer[slot] = piece;
    pi = (slot + 1) % buf_size;

    sem_post(&g);   /* leave the critical section */
    sem_post(&f);   /* publish the newly filled slot */
}

/*
 * dequeue - remove and return the oldest piece in the shared ring buffer.
 *
 * Waits on `f` until a slot is filled, then holds `g` while the piece at
 * index ci is read and ci is advanced (wrapping at buf_size). Posting `e`
 * afterwards tells producers that one more slot is free.
 */
uint32_t dequeue() 
{
    uint32_t slot;
    uint32_t value;

    sem_wait(&f);   /* claim a filled slot; blocks while the buffer is empty */
    sem_wait(&g);   /* enter the critical section */

    slot = ci;
    value = buffer[slot];
    ci = (slot + 1) % buf_size;

    sem_post(&g);   /* leave the critical section */
    sem_post(&e);   /* hand the slot back to the producers */

    return value;
}

/*
 * producer - thread body that splits sum_to_send into pieces of 1 to 10.
 *
 * i: the producer's number, passed by value inside the pointer; it is only
 *    used to label the log lines.
 *
 * Each iteration picks a random piece, clamps it to what is left of
 * sum_to_send, subtracts it and logs it while holding lock_sum_to_send, and
 * then enqueues the piece. The thread returns NULL once sum_to_send is 0.
 */
void *producer(void *i) 
{
    while (1) {
        uint32_t piece = random() % 10 + 1; // 1 ~ 10

        pthread_mutex_lock(&lock_sum_to_send);

        /* Nothing left to hand out: release the lock and finish. */
        if (sum_to_send == 0) {
            pthread_mutex_unlock(&lock_sum_to_send);
            break;
        }

        /* The last piece may have to be smaller than the random draw. */
        if (piece > sum_to_send) {
            piece = sum_to_send;
        }
        sum_to_send -= piece;
        printf("p%3ld enq %2u, sum_to_send:%u\n", (long)i, piece, sum_to_send);

        pthread_mutex_unlock(&lock_sum_to_send);

        /*
         * Enqueue after dropping the lock so that a producer blocked on a
         * full buffer does not stall the other producers' bookkeeping.
         */
        enqueue(piece);
    }
    return NULL;
}

/*
 * consumer - thread body that dequeues pieces and adds them to sum_received.
 *
 * i: the consumer's number, passed by value inside the pointer; it is only
 *    used to label the log lines.
 *
 * The thread whose piece brings sum_received up to sum_initial writes one
 * byte to fd[1] to wake main() and then returns NULL. Every other consumer
 * keeps waiting in dequeue().
 */
void *consumer(void *i) 
{
    while (1) {
        int done;
        uint32_t piece = dequeue();

        pthread_mutex_lock(&lock_sum_received);

        sum_received += piece;
        printf("\t\t\t\t c%3ld deq %2u, sum_received:%u\n", (long)i, piece, sum_received);

        /* Receiving more than was sent would mean a piece was duplicated. */
        assert(sum_received <= sum_initial);
        done = (sum_received == sum_initial);
        
        pthread_mutex_unlock(&lock_sum_received);

        if (done) {
            /*
             * main() blocks until this byte arrives, so a failed write must
             * not go unnoticed or the program would hang forever.
             */
            if (write(fd[1], "x", 1) != 1) {
                perror("write");
                exit(1);
            }
            /* Every piece has been consumed; another dequeue() could only block. */
            return NULL;
        }
    }
}

/*
 * parse_positive_u32 - convert a decimal string into a strictly positive
 * uint32_t.
 *
 * Returns 0 and stores the value in *out on success. Returns -1 if the string
 * does not start with a digit, has trailing characters, is zero, or does not
 * fit in 32 bits.
 */
static int parse_positive_u32(const char *s, uint32_t *out)
{
    char *end;
    unsigned long v;

    /* strtoul() accepts leading whitespace and a sign; insist on a digit. */
    if (s[0] < '0' || s[0] > '9') {
        return -1;
    }

    errno = 0;
    v = strtoul(s, &end, 10);
    if (errno != 0 || *end != '\0' || v == 0 || v > UINT32_MAX) {
        return -1;
    }

    *out = (uint32_t)v;
    return 0;
}

/*
 * main - set up the shared state, start the threads and wait for completion.
 *
 * Usage: <sum> <buf_size> <num_producers> <num_consumers>
 *
 * All four arguments must be positive integers: with a zero sum, a zero-sized
 * buffer, or no producers or consumers, the completion byte would never be
 * written and the program would wait forever.
 *
 * Exits with status 0 once a consumer reports that sum_received has reached
 * sum_initial, and with status 1 on a usage error or a failed read.
 */
int main(int argc, char **argv)
{
    char x;
    int res;
    ssize_t nread;
    pthread_t tid;
    uint32_t i, num_producers, num_consumers;

    if (argc != 5) {
        fprintf(stderr, "usage: %s <sum> <buf_size> <num_producers> <num_consumers>\n", argv[0]);
        exit(1);
    }

    if (parse_positive_u32(argv[1], &sum_initial) != 0 ||
        parse_positive_u32(argv[2], &buf_size) != 0 ||
        parse_positive_u32(argv[3], &num_producers) != 0 ||
        parse_positive_u32(argv[4], &num_consumers) != 0) {
        fprintf(stderr, "%s: all four arguments must be positive integers\n", argv[0]);
        exit(1);
    }
    sum_to_send = sum_initial;

    srandom((unsigned int)time(NULL));

    /* calloc() checks the count * size multiplication for overflow. */
    buffer = calloc(buf_size, sizeof *buffer);
    assert(buffer);
    res = pipe(fd);
    assert(res == 0);

    res = sem_init(&e, 0, buf_size);  assert(res == 0);  /* empty slots  */
    res = sem_init(&f, 0, 0);         assert(res == 0);  /* filled slots */
    res = sem_init(&g, 0, 1);         assert(res == 0);  /* buffer lock  */

    /* Threads are detached: main() never joins them, it waits on the pipe. */
    for (i = 0; i < num_producers; i++) {
        res = pthread_create(&tid, NULL, producer, (void *)(long)i);
        assert(res == 0);
        res = pthread_detach(tid);
        assert(res == 0);
    }
    for (i = 0; i < num_consumers; i++) {
        res = pthread_create(&tid, NULL, consumer, (void *)(long)i);
        assert(res == 0);
        res = pthread_detach(tid);
        assert(res == 0);
    }

    /*
     * Block until a consumer signals completion. Retry if a signal interrupts
     * the read so that the program does not shut down early.
     */
    do {
        nread = read(fd[0], &x, 1);
    } while (nread == -1 && errno == EINTR);
    if (nread != 1) {
        fprintf(stderr, "%s: failed to wait for the completion signal\n", argv[0]);
        exit(1);
    }

    close(fd[0]);
    close(fd[1]);

    free(buffer);
    exit(0);
}

producer-consumer.md

producer-consumer.c
-------------------

In this simulation, a number of producer threads break up a large integer into a
sum of small integers and put them into a bounded buffer; a number of consumer
threads read the numbers from the bounded buffer and sum them up to the original
large integer. 

Here is an example run where six producer threads break up the number `100` and
put the pieces into a bounded buffer of size `3`. Meanwhile, six consumer
threads read the numbers from the buffer and sum them to `100`.

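The line `p  0 enq  7, sum_to_send:93` shows producer #0 enqueuing the number 7
into the buffer. At that time, the remaining `sum_to_send` is 93. The line
`c  0 deq  7, sum_received:7` shows consumer #0 dequeuing the number 7 from the
buffer; at that time, the `sum_received` so far is 7.

Each line is printed outside the buffer's critical section: a producer prints
just before it calls `enqueue()`, and a consumer prints just after `dequeue()`
returns. The order of the lines therefore only approximates the order of the
buffer operations, which is why more than three `enq` lines can appear before
the first `deq` line even though the buffer holds only three items.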

```
$ ./producer-consumer 100 3 6 6
p  0 enq  7, sum_to_send:93
p  0 enq  6, sum_to_send:87
p  0 enq  3, sum_to_send:84
p  0 enq  5, sum_to_send:79
p  4 enq  7, sum_to_send:72
p  5 enq  8, sum_to_send:64
                                 c  0 deq  7, sum_received:7
p  0 enq  2, sum_to_send:62
                                 c  0 deq  6, sum_received:13
                                 c  0 deq  3, sum_received:16
                                 c  0 deq  5, sum_received:21
                                 c  0 deq  2, sum_received:23
p  0 enq  5, sum_to_send:57
p  0 enq  8, sum_to_send:49
p  0 enq  2, sum_to_send:47
p  0 enq  6, sum_to_send:41
                                 c  0 deq  5, sum_received:28
                                 c  0 deq  8, sum_received:36
                                 c  0 deq  2, sum_received:38
                                 c  1 deq  8, sum_received:46
                                 c  0 deq  6, sum_received:52
p  4 enq  3, sum_to_send:38
p  4 enq  6, sum_to_send:32
p  4 enq 10, sum_to_send:22
p  4 enq  2, sum_to_send:20
                                 c  2 deq  7, sum_received:59
p  4 enq  5, sum_to_send:15
                                 c  2 deq  3, sum_received:62
p  4 enq  6, sum_to_send:9
                                 c  2 deq  6, sum_received:68
p  4 enq  2, sum_to_send:7
                                 c  2 deq 10, sum_received:78
                                 c  2 deq  5, sum_received:83
p  4 enq  5, sum_to_send:2
p  4 enq  2, sum_to_send:0
                                 c  2 deq  5, sum_received:88
                                 c  2 deq  2, sum_received:90
                                 c  1 deq  2, sum_received:92
                                 c  4 deq  6, sum_received:98
                                 c  3 deq  2, sum_received:100
```

Note that all producer and consumer threads are running concurrently, enqueuing
and dequeuing to and from the bounded buffer. Producers block if the buffer is
full, and consumers block if the buffer is empty. The code does not contain any
calls to `sleep()` or `usleep()` to synchronize between the threads. When the
`sum_received` has reached the initial number that was passed in as a
command-line argument (`100` in this example), the program shuts down
gracefully.