#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
/* Number of message slots in the circular buffer. */
#define BUFFER_SIZE 32
/* Number of producer threads started by main(). */
#define NUM_PRODUCERS 2
/*
 * Number of consumer threads started by main().
 * Each producer enqueues exactly one "Bye" and each consumer exits after
 * dequeuing one "Bye", so the two counts are expected to match; with more
 * consumers than producers the surplus consumers never see a "Bye" and
 * block forever.
 */
#define NUM_CONSUMERS 2
/* A single queued message: one heap-allocated string. */
struct message {
/* String owned by the message; set by message_init(), freed by message_destroy(). */
char *content;
};
/*
 * Fixed-capacity ring of messages shared by the producer and consumer
 * threads. Access is coordinated with three semaphores.
 */
struct circular_buffer {
/* Message slots. */
struct message buffer[BUFFER_SIZE];
/* Index of the next slot a consumer reads from. */
int head;
/* Index of the next slot a producer writes to. */
int tail;
/* Counts slots that hold a message; starts at 0. Consumers wait on it. */
sem_t filled_slot;
/* Counts free slots; starts at BUFFER_SIZE. Producers wait on it. */
sem_t empty_slot;
/* Semaphore with initial value 1, held while buffer, head or tail is accessed. */
sem_t mutex;
};
/* Argument handed to each producer/consumer thread through pthread_create(). */
struct thread_args {
/* Buffer shared by all threads. */
struct circular_buffer *cb;
/* Zero-based index of the thread within its group; appears in message text and output. */
int id;
};
/* Function prototypes */
void message_init(struct message *msg, const char *content);
void message_destroy(struct message *msg);
void circular_buffer_init(struct circular_buffer *cb);
void circular_buffer_destroy(struct circular_buffer *cb);
void *producer(void *arg);
void *consumer(void *arg);
/* Main.
 * - Creates the buffer,
 * - Starts the producer and consumer threads, and then,
 * - Waits for them to finish.
 *
 * Returns 0 on success. Exits with EXIT_FAILURE if a thread cannot be
 * created, because joining a thread that was never started is undefined.
 */
int
main(void)
{
    struct circular_buffer cb;
    pthread_t prod_threads[NUM_PRODUCERS];
    pthread_t cons_threads[NUM_CONSUMERS];
    struct thread_args prod_args[NUM_PRODUCERS];
    struct thread_args cons_args[NUM_CONSUMERS];
    int i;
    int rc;

    circular_buffer_init(&cb);

    /*
     * The argument structs live in main's frame; that is safe because
     * main joins every thread before returning.
     */
    for (i = 0; i < NUM_PRODUCERS; i++) {
        prod_args[i].cb = &cb;
        prod_args[i].id = i;
        /* pthread_create reports failure through its return value, not errno. */
        rc = pthread_create(&prod_threads[i], NULL, producer, &prod_args[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (producer %d): %s\n",
                i, strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    for (i = 0; i < NUM_CONSUMERS; i++) {
        cons_args[i].cb = &cb;
        cons_args[i].id = i;
        rc = pthread_create(&cons_threads[i], NULL, consumer, &cons_args[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (consumer %d): %s\n",
                i, strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    for (i = 0; i < NUM_PRODUCERS; i++) {
        pthread_join(prod_threads[i], NULL);
    }
    for (i = 0; i < NUM_CONSUMERS; i++) {
        pthread_join(cons_threads[i], NULL);
    }

    circular_buffer_destroy(&cb);
    return 0;
}
/* The buffer stores `struct message` objects, which are just strings. */
/*
 * Initialize msg with a private, heap-allocated copy of content.
 *
 * msg:     message to fill in; any previous content pointer is overwritten,
 *          not freed.
 * content: NUL-terminated string to copy; must not be NULL.
 *
 * The copy must later be released with message_destroy(). The function
 * cannot report failure through its void return type, so running out of
 * memory is treated as fatal: it prints a diagnostic and aborts instead of
 * leaving a NULL string in the message.
 */
void
message_init(struct message *msg, const char *content)
{
    msg->content = strdup(content);
    if (msg->content == NULL) {
        perror("message_init: strdup");
        abort();
    }
}
/*
 * Release the string held by msg.
 * Afterwards msg->content is NULL, so destroying the same message twice
 * is harmless.
 */
void
message_destroy(struct message *msg)
{
    char *owned = msg->content;

    msg->content = NULL;
    free(owned);
}
/* Initialization and destruction for the circular buffer. */
/*
 * Prepare cb for use: empty ring, all slots free.
 *
 * Semaphore start values:
 *   filled_slot = 0            nothing to consume yet
 *   empty_slot  = BUFFER_SIZE  every slot is free
 *   mutex       = 1            unlocked
 *
 * The function has no way to return an error, so a failing sem_init() is
 * reported with perror() and terminates the process; continuing with an
 * uninitialized semaphore would leave the buffer unsynchronized.
 */
void
circular_buffer_init(struct circular_buffer *cb)
{
    cb->head = 0;
    cb->tail = 0;

    /* No slot owns a string yet. */
    for (int i = 0; i < BUFFER_SIZE; i++) {
        cb->buffer[i].content = NULL;
    }

    /*
     * Usage: sem_init(sem_address, 0, initial_value);
     * The second argument (0) keeps the semaphore private to this process.
     */
    if (sem_init(&cb->filled_slot, 0, 0) != 0 ||
        sem_init(&cb->empty_slot, 0, BUFFER_SIZE) != 0 ||
        sem_init(&cb->mutex, 0, 1) != 0) {
        perror("circular_buffer_init: sem_init");
        exit(EXIT_FAILURE);
    }
}
/*
 * Tear down the semaphores created by circular_buffer_init().
 * Strings still held in buffer slots are not released here.
 */
void
circular_buffer_destroy(struct circular_buffer *cb)
{
    sem_t *const sems[] = { &cb->filled_slot, &cb->empty_slot, &cb->mutex };

    for (size_t k = 0; k < sizeof sems / sizeof sems[0]; k++) {
        sem_destroy(sems[k]);
    }
}
/* Producer and consumer functions. */
/* Number of ordinary messages each producer sends before its "Bye". */
enum { MESSAGES_PER_PRODUCER = 10 };

/*
 * Append a copy of content to the buffer.
 * Blocks while the buffer is full. The copy is made by message_init() and
 * stays in the slot until a consumer takes it.
 */
static void
enqueue_message(struct circular_buffer *cb, const char *content)
{
    sem_wait(&cb->empty_slot);      /* reserve a free slot */
    sem_wait(&cb->mutex);
    message_init(&cb->buffer[cb->tail], content);
    cb->tail = (cb->tail + 1) % BUFFER_SIZE;
    sem_post(&cb->mutex);
    sem_post(&cb->filled_slot);     /* make the message visible to consumers */
}

/*
 * Producer thread entry point.
 *
 * arg: pointer to a struct thread_args naming the shared buffer and this
 *      producer's id.
 *
 * Sends MESSAGES_PER_PRODUCER numbered messages, pausing a random interval
 * after each one, then sends a single "Bye" so that one consumer stops.
 * Always returns NULL.
 */
void *
producer(void *arg)
{
    struct thread_args *args = arg;
    struct circular_buffer *cb = args->cb;
    int id = args->id;
    char message_content[50];

    for (int i = 0; i < MESSAGES_PER_PRODUCER; i++) {
        snprintf(message_content, sizeof(message_content),
            "Message %d from Producer %d", i, id);
        enqueue_message(cb, message_content);
        /*
         * NOTE(review): rand() is called from several threads at once and
         * POSIX does not require it to be thread-safe.
         */
        usleep(rand() % 1000000); /* Sleep up to 1 second */
    }

    /* Send termination message */
    enqueue_message(cb, "Bye");
    return NULL;
}
void *
consumer(void *arg)
{
struct thread_args *args = (struct thread_args *)arg;
struct circular_buffer *cb = args->cb;
int id = args->id;
struct message msg;
while (1) {
sem_wait(&cb->filled_slot);
sem_wait(&cb->mutex);
msg = cb->buffer[cb->head];
cb->head = (cb->head + 1) % BUFFER_SIZE;
sem_post(&cb->mutex);
sem_post(&cb->empty_slot);
printf("Consumer %d: %s\n", id, msg.content);
if (strcmp(msg.content, "Bye") == 0) {
message_destroy(&msg);
break;
}
message_destroy(&msg);
usleep(rand() % 1000000); /* Sleep up to 1/10 second */
}
return NULL;
}