cs-3013-assignment-3/main.c

461 lines
14 KiB
C
Raw Permalink Normal View History

2019-09-22 01:30:07 +00:00
#include <errno.h>
2019-09-23 17:14:00 +00:00
#include <pthread.h>
#include <signal.h>
2019-09-22 01:30:07 +00:00
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
2019-09-23 17:14:00 +00:00
#include "adder.h"
2019-09-22 01:30:07 +00:00
#include "mailbox.h"
#include "main.h"
2019-09-24 08:09:35 +00:00
#include "queue.h"
2019-09-22 01:30:07 +00:00
/**
* Parse a line of input into an input struct.
*
* @param input_line A line of input.
* @param res An already allocated input struct to store the result in.
*
* @return int -1 if the input is malformed, and 0 otherwise.
*/
static int parse_input_line(char *input_line, struct input *res) {
// if tmp has anything in it, then we know that extra values have been entered.
char tmp[2];
int num_matched = sscanf(input_line, " %d %d %1s", &(res->value), &(res->recipient), tmp);
return num_matched == 2 ? 0 : -1;
}
/**
* Convert the given input to an array of input structs.
*
* @param num_threads The number of threads that the program has been spawned with.
* @param res A pointer that will be allocated as an array of inputs. Must be freed by the caller.
*
* @return int the number of input lines that were read.
*/
static int get_input(int num_threads, struct input **res) {
int input_index = 0;
int current_buffer_size = BUFFER_SIZE;
char *input_buffer = malloc(current_buffer_size);
// Keep reaidng until we hit an EOF
while (true) {
char *read_result;
while((read_result = fgets(input_buffer, current_buffer_size, stdin))) {
int read_size = strlen(input_buffer);
if (input_buffer[read_size - 1] == '\n') {
break;
}
// Grow our buffer if we nede to read more input.
current_buffer_size *= 2;
input_buffer = realloc(input_buffer, current_buffer_size);
}
if (read_result == NULL) {
break;
}
struct input current_input;
int parse_result = parse_input_line(input_buffer, &current_input);
if (parse_result == -1) {
// On bad input, we should assume an EOF and stop.
break;
2019-09-27 00:26:32 +00:00
} else if (current_input.recipient > num_threads || current_input.recipient <= 0) {
2019-09-22 01:30:07 +00:00
printf("Invalid recipient id.\n");
continue;
2019-09-27 00:26:32 +00:00
} else if (current_input.value < 0) {
printf("Invalid value.\n");
continue;
2019-09-22 01:30:07 +00:00
}
2019-09-23 20:37:04 +00:00
*res = realloc(*res, sizeof(struct input) * (input_index + 1));
2019-09-22 01:30:07 +00:00
memcpy(*res + input_index, &current_input, sizeof(struct input));
input_index++;
}
free(input_buffer);
return input_index;
}
2019-09-26 05:19:16 +00:00
/**
* Kills the given thread. Should only be used in exceptional circumstances.
*
* @param thread The thread to kill.
2019-09-26 16:23:24 +00:00
* @param free_ret If true, the memory associated with the return value will be freed.
2019-09-26 05:19:16 +00:00
*/
2019-09-26 16:23:24 +00:00
static void kill_thread(pthread_t thread, bool free_ret) {
2019-09-26 05:19:16 +00:00
pthread_cancel(thread);
2019-09-26 16:23:24 +00:00
void *ret;
pthread_join(thread, free_ret ? &ret : NULL);
if (free_ret && ret != PTHREAD_CANCELED) {
free(ret);
}
2019-09-26 05:19:16 +00:00
}
2019-09-23 19:36:27 +00:00
/**
* Kill all threads. Should only be used in exceptional circumstances.
*
* @param threads An array of threads to kill.
* @param num_threads The number of threads to kill.
2019-09-26 16:23:24 +00:00
* @param free_ret If true, the memory associated with the return value will be freed.
2019-09-23 19:36:27 +00:00
*/
2019-09-26 16:23:24 +00:00
static void kill_threads(pthread_t *threads, int num_threads, bool free_ret) {
2019-09-23 19:36:27 +00:00
for (int i = 0; i < num_threads; i++) {
2019-09-26 16:23:24 +00:00
kill_thread(threads[i], free_ret);
2019-09-23 19:36:27 +00:00
}
}
2019-09-23 20:37:04 +00:00
/**
* Join all of the given adder threads and output when they fail.
*
* @param threads The adder threads to join.
* @param num_threads The number of threads to join.
*
* @return int -1 if one of the threads failed or failed to join, 0 otherwise.
*/
static int join_adder_threads(pthread_t *threads, int num_threads) {
bool failed = false;
for (int i = 0; i < num_threads; i++) {
void *raw_retval;
int join_result = pthread_join(threads[i], &raw_retval);
if (join_result != 0) {
printf("Failed to join thread #%d. (%s)\n", i + 1, strerror(join_result));
failed = true;
continue;
}
int retval = *(int*)(raw_retval);
if (retval != 0) {
printf("Thread #%d failed.\n", i + 1);
failed = true;
}
// Adder threads require that their retval be freed.
free(raw_retval);
}
return failed ? -1 : 0;
}
2019-09-23 17:14:00 +00:00
/**
* Start the adder threads
*
* @param thread_ids An array of all of the thread ids to spawn, in order.
* @param num_threads The number of threads to spawn
* @param threads A pre-allocated array to hold the spawned threads.
*
* @return int -1 on error, 0 otherwise.
*/
static int start_adder_threads(int *thread_ids, int num_threads, pthread_t *threads) {
for (int i = 0; i < num_threads; i++) {
int create_result = pthread_create(&threads[i], NULL, adder, &thread_ids[i]);
if (create_result != 0) {
2019-09-23 19:36:27 +00:00
// Kill the i threads that we have spawned.
2019-09-26 16:23:24 +00:00
kill_threads(threads, i, true);
2019-09-23 17:14:00 +00:00
return -1;
}
}
return 0;
}
2019-09-23 19:36:27 +00:00
/**
* Send all of the inputs as messages to the designated mailboxes.
*
* @param inputs An array of all of the inputs to send.
2019-09-23 19:36:27 +00:00
* @param n The number of inputs provided.
*
* @return int 0 on success, or the index of the input that failed to send otherwise.
*/
static int send_input_messages(struct input *inputs, int n) {
struct msg message = {0};
for (int i = 0; i < n; i++) {
message.value = inputs[i].value;
int send_result = SendMsg(inputs[i].recipient, &message);
if (send_result != 0) {
return i;
}
}
return 0;
}
/**
* Send termination messages to all of the threads from 1 through num_threads
*
* @param inputs An array of lal of the inputs to send.
* @param n The number of inputs provided.
*
* @return int 0 on success, or the index of the mailbox that failed to send otherwise.
*/
static int send_termination_messages(int num_threads) {
struct msg message = {0};
// We don't need to send one to mailbox zero. We only need to get send them for all other mailboxes.
for (int i = 1; i <= num_threads; i++) {
message.value = -1;
int send_result = SendMsg(i, &message);
if (send_result != 0) {
return i;
}
}
return 0;
}
/**
* Send all of the inputs and the termination messages to adder threads
*
* @param inputs An array of all of the inputs to send.
* @param num_inputs The number of items in the inputs array.
* @param num_threads The number of threads that are running. Termination signals will be sent to all threads from
* 1 through num_threads.
*
2019-09-26 05:06:04 +00:00
* @return int -1 on error, 0 otherwise.
*/
static int handle_blocking_input(struct input *inputs, int num_inputs, int num_threads) {
int send_result = send_input_messages(inputs, num_inputs);
if (send_result != 0) {
printf("Failed to send input to mailbox #%d.\n", send_result);
return -1;
}
int termination_result = send_termination_messages(num_threads);
if (termination_result != 0) {
printf("Failed to send termination messsages to mailbox #%d.\n", termination_result);
return -1;
}
return 0;
}
/**
* Generate a termination input for every thread that does not have any inputs. Entries must be freed by the caller
* when dequeued.
*
* @param message_counts A count of the messages for each thread, with thread 1 indexed at position zero.
* @param num_threads The number of adder threads.
* @param res_queue an already allocated queue with enough capacity to hold all threads.
*
* @return -1 on error, 0 otherwise.
*/
static int generate_non_inputted_mailbox_terminations(int *message_counts, int num_threads, struct queue *res_queue) {
for (int i = 0; i < num_threads; i++) {
if (message_counts[i] > 0) {
continue;
}
struct input *termination_input = malloc(sizeof(struct input));
termination_input->recipient = i + 1;
termination_input->value = -1;
int enqueue_result = priority_enqueue(res_queue, termination_input);
if (enqueue_result != 0) {
// There is a potential for a memory leak here, but it can only happen if there's programmer error
// (not enough capacity in the queue), so I'm not worried about it.
return -1;
}
}
return 0;
}
/**
* Get the number of messages for each thread in the given input array.
*
* @param inputs An array of all of the inputs to process.
* @param num_inputs The number of inputs provided.
* @param counts An allocated int array of num_threads length to hold the counts
* @param num_threads The number of adder threads
*/
static void generate_message_counts(struct input *inputs, int num_inputs, int *counts, int num_threads) {
memset(counts, 0, sizeof(int) * num_threads);
for (int i = 0; i < num_inputs; i++) {
struct input current_input = inputs[i];
// Because thread ids go from [1, n], we need to shift them down.
counts[current_input.recipient - 1]++;
}
}
2019-09-24 07:07:21 +00:00
/**
2019-09-24 08:09:35 +00:00
* A implementation of cycling over all of the given messages until they send.
2019-09-24 07:07:21 +00:00
*
* @param inputs An array of all of the inputs to send.
* @param num_inputs The number of inputs provided.
* @param num_threads The number of adder threads.
2019-09-24 07:07:21 +00:00
*
2019-09-24 08:09:35 +00:00
* @return int 0 on success, or -1 on error.
2019-09-24 07:07:21 +00:00
*/
static int cycle_send_input_messages(struct input *inputs, int num_inputs, int num_threads) {
struct queue input_queue = init_queue(num_inputs + num_threads);
for (int i = 0; i < num_inputs; i++) {
// So we can free the inputs we generate dynamically, we're going to reallocate the inputs for elegance's sake.
// A bit wasteful, but it allows for a cleaner implementation.
struct input *input_to_queue = malloc(sizeof(struct input));
memcpy(input_to_queue, &inputs[i], sizeof(struct input));
enqueue(&input_queue, input_to_queue);
2019-09-24 07:07:21 +00:00
}
int message_counts[num_threads];
// Get counts for all of the messages and generate termination messages for those that need to terminate
// before we start sending actual inputs.
generate_message_counts(inputs, num_inputs, message_counts, num_threads);
int termination_enqueue_result = generate_non_inputted_mailbox_terminations(message_counts, num_threads, &input_queue);
if (termination_enqueue_result != 0) {
free_queue(&input_queue);
return -1;
}
2019-09-24 07:07:21 +00:00
struct msg message = {0};
2019-09-24 08:09:35 +00:00
void *raw_input;
while (dequeue(&input_queue, &raw_input) != -1) {
struct input *input = raw_input;
message.value = input->value;
int send_result = NBSendMsg(input->recipient, &message);
int recipient = input->recipient;
2019-09-24 08:09:35 +00:00
if (send_result != 0 && errno != EAGAIN) {
free_queue(&input_queue);
return -1;
} else if (send_result != 0 && errno == EAGAIN) {
enqueue(&input_queue, input);
continue;
} else {
free(input);
message_counts[recipient - 1]--;
}
if (message_counts[recipient - 1] == 0) {
struct input *input_to_queue = malloc(sizeof(struct input));
input_to_queue->recipient = recipient;
input_to_queue->value = -1;
int enqueue_result = priority_enqueue(&input_queue, input_to_queue);
if (enqueue_result != 0) {
free_queue(&input_queue);
return -1;
}
2019-09-24 07:07:21 +00:00
}
}
2019-09-24 08:09:35 +00:00
free_queue(&input_queue);
2019-09-24 07:07:21 +00:00
return 0;
}
2019-09-26 05:06:04 +00:00
/**
* Send all of the inputs and the termination messages to adder threads without blocking on sends.
*
* @param inputs An array of all of the inputs to send.
* @param num_inputs The number of items in the inputs array.
* @param num_threads The number of threads that are running. Termination signals will be sent to all threads from
* 1 through num_threads.
*
* @return int -1 on error, 0 otherwise.
*/
static int handle_nonblocking_input(struct input *inputs, int num_inputs, int num_threads) {
int send_result = cycle_send_input_messages(inputs, num_inputs, num_threads);
if (send_result != 0) {
printf("Failed to send input to a mailbox.\n");
return -1;
}
return 0;
}
2019-09-23 20:37:04 +00:00
/**
* Collect n messages from mailbox 0
*
* @param raw_num_threads A pointer to an integer that contains the number of threads to recieve messages from.:w
*
* @return void* Not used.
*/
static void *collect_results(void *raw_n) {
int n = *(int*)(raw_n);
struct msg message;
for (int i = 0; i < n; i++) {
int recv_result = RecvMsg(0, &message);
if (recv_result == -1) {
printf("Failed to receive message from adder thread.\n");
continue;
}
printf(OUTPUT_FORMAT, message.iFrom, message.value, message.cnt, message.tot);
}
return NULL;
}
2019-09-22 01:30:07 +00:00
int main(int argc, char *argv[]) {
struct input *inputs = NULL;
2019-09-24 07:07:21 +00:00
bool non_blocking = false;
2019-09-22 01:36:38 +00:00
if (argc < 2 || argc > 3) {
printf("%s\n", USAGE_STRING);
return -1;
} else if (argc == 3) {
2019-09-24 07:07:21 +00:00
non_blocking = true;
2019-09-22 01:36:38 +00:00
}
2019-09-23 20:37:04 +00:00
int num_adder_threads = strtol(argv[1], NULL, 10);
if (errno == ERANGE || (num_adder_threads == 0 && errno == EINVAL)) {
2019-09-22 01:36:38 +00:00
printf("Invalid number of threads.\n");
}
2019-09-23 20:37:04 +00:00
num_adder_threads = num_adder_threads > MAXTHREAD ? MAXTHREAD : num_adder_threads;
2019-09-22 01:36:38 +00:00
2019-09-23 20:37:04 +00:00
int num_inputs = get_input(num_adder_threads, &inputs);
// Init one mailbox for each thread, and an additional one for the main thread.
2019-09-23 20:37:04 +00:00
int init_result = init_mailboxes(num_adder_threads + 1);
2019-09-22 18:48:05 +00:00
if (init_result != 0) {
printf("Failed to setup mailboxes.\n");
2019-09-23 20:39:22 +00:00
free(inputs);
2019-09-22 18:48:05 +00:00
return 1;
}
2019-09-23 17:14:00 +00:00
// Must allocate an array of the ids to prevent a race condition when adder starts
// We don't want the thread id to be deallocated.
// We don't want to allocate an id for thread id 0, so we store all of the others.
2019-09-23 20:37:04 +00:00
int mailbox_ids[num_adder_threads];
for (int i = 0; i < num_adder_threads; i++) {
2019-09-23 17:14:00 +00:00
mailbox_ids[i] = i + 1;
}
2019-09-23 20:37:04 +00:00
pthread_t adder_threads[num_adder_threads];
int start_result = start_adder_threads(mailbox_ids, num_adder_threads, adder_threads);
2019-09-23 17:14:00 +00:00
if (start_result != 0) {
printf("Failed to start adder threads. (%s)\n", strerror(errno));
2019-09-23 20:39:22 +00:00
free(inputs);
2019-09-23 17:14:00 +00:00
free_mailboxes();
return 1;
}
2019-09-23 20:37:04 +00:00
pthread_t collect_thread;
int spawn_result = pthread_create(&collect_thread, NULL, collect_results, &num_adder_threads);
if (spawn_result != 0) {
printf("Failed to spawn collection thread.\n");
2019-09-23 20:39:22 +00:00
free(inputs);
2019-09-26 16:23:24 +00:00
kill_threads(adder_threads, num_adder_threads, true);
2019-09-26 05:19:16 +00:00
free_mailboxes();
2019-09-23 20:37:04 +00:00
return 1;
}
2019-09-24 07:07:21 +00:00
int send_result;
if (non_blocking) {
2019-09-26 05:06:04 +00:00
send_result = handle_nonblocking_input(inputs, num_inputs, num_adder_threads);
2019-09-24 07:07:21 +00:00
} else {
send_result = handle_blocking_input(inputs, num_inputs, num_adder_threads);
2019-09-24 07:07:21 +00:00
}
2019-09-23 20:39:22 +00:00
free(inputs);
2019-09-23 19:36:27 +00:00
if (send_result != 0) {
2019-09-26 16:23:24 +00:00
kill_threads(adder_threads, num_adder_threads, true);
kill_thread(collect_thread, false);
2019-09-23 19:36:27 +00:00
free_mailboxes();
return 1;
}
2019-09-23 20:37:04 +00:00
int join_result = join_adder_threads(adder_threads, num_adder_threads);
if (join_result != 0) {
2019-09-23 20:37:04 +00:00
// If one of the joins failed, we have to tell the collection thread we're done.
pthread_cancel(collect_thread);
}
pthread_join(collect_thread, NULL);
2019-09-23 19:36:27 +00:00
2019-09-22 18:48:05 +00:00
free_mailboxes();
2019-09-22 01:30:07 +00:00
}