From b9f8289ffd59fa0fa653529b812c8680807c9c71 Mon Sep 17 00:00:00 2001 From: Nick Krichevsky Date: Thu, 26 Sep 2019 00:54:22 -0400 Subject: [PATCH] immediately teminate when nb messages finish for mailbox --- main.c | 65 +++++++++++++++++++++++++++++++++++++++++++-------------- queue.c | 5 +++++ 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/main.c b/main.c index cc5a0d9..d5cc09e 100644 --- a/main.c +++ b/main.c @@ -212,27 +212,18 @@ static int handle_blocking_input(struct input *inputs, int num_inputs, int num_t } /** - * Generate a termination input for every thread that does not have an entry in the input list. Entries must be freed - * by the caller when dequeued. + * Generate a termination input for every thread that does not have any inputs. Entries must be freed by the caller + * when dequeued. * - * @param inputs An array of all of the inputs to send. - * @param num_inputs The number of inputs provided. + * @param message_counts A count of the messages for each thread, with thread 1 indexed at position zero. * @param num_threads The number of adder threads. * @param res_queue an already allocated queue with enough capacity to hold all threads. * * @return -1 on error, 0 otherwise. */ -static int generate_non_inputted_mailbox_terminations(struct input *inputs, int num_inputs, int num_threads, struct queue *res_queue) { - bool have_message[num_threads]; - memset(have_message, false, sizeof(bool) * num_threads); - for (int i = 0; i < num_inputs; i++) { - int recipient = inputs[i].recipient; - // Someone can enter a mailbox of [1, n] with n threads, so we must subtract one so it fits. - have_message[recipient - 1] = true; - } - +static int generate_non_inputted_mailbox_terminations(int *message_counts, int num_threads, struct queue *res_queue) { for (int i = 0; i < num_threads; i++) { - if (have_message[i]) { + if (message_counts[i] > 0) { continue; } @@ -250,6 +241,23 @@ static int generate_non_inputted_mailbox_terminations(struct input *inputs, int return 0; } +/** + * Get the number of messages for each thread in the given input array. + * + * @param inputs An array of all of the inputs to process. + * @param num_inputs The number of inputs provided. + * @param counts An allocated int array of num_threads length to hold the counts + * @param num_threads The number of adder threads + */ +static void generate_message_counts(struct input *inputs, int num_inputs, int *counts, int num_threads) { + memset(counts, 0, sizeof(int) * num_threads); + for (int i = 0; i < num_inputs; i++) { + struct input current_input = inputs[i]; + // Because thread ids go from [1, n], we need to shift them down. + counts[current_input.recipient - 1]++; + } +} + /** * A implementation of cycling over all of the given messages until they send. * @@ -262,9 +270,18 @@ static int generate_non_inputted_mailbox_terminations(struct input *inputs, int static int cycle_send_input_messages(struct input *inputs, int num_inputs, int num_threads) { struct queue input_queue = init_queue(num_inputs + num_threads); for (int i = 0; i < num_inputs; i++) { - enqueue(&input_queue, &inputs[i]); + // So we can free the inputs we generate dynamically, we're going to reallocate the inputs for elegance's sake. + // A bit wasteful, but it allows for a cleaner implementation. + struct input *input_to_queue = malloc(sizeof(struct input)); + memcpy(input_to_queue, &inputs[i], sizeof(struct input)); + enqueue(&input_queue, input_to_queue); } - int termination_enqueue_result = generate_non_inputted_mailbox_terminations(inputs, num_inputs, num_threads, &input_queue); + + int message_counts[num_threads]; + // Get counts for all of the messages and generate termination messages for those that need to terminate + // before we start sending actual inputs. + generate_message_counts(inputs, num_inputs, message_counts, num_threads); + int termination_enqueue_result = generate_non_inputted_mailbox_terminations(message_counts, num_threads, &input_queue); if (termination_enqueue_result != 0) { free_queue(&input_queue); return -1; @@ -276,11 +293,27 @@ static int cycle_send_input_messages(struct input *inputs, int num_inputs, int n struct input *input = raw_input; message.value = input->value; int send_result = NBSendMsg(input->recipient, &message); + int recipient = input->recipient; if (send_result != 0 && errno != EAGAIN) { free_queue(&input_queue); return -1; } else if (send_result != 0 && errno == EAGAIN) { enqueue(&input_queue, input); + continue; + } else { + free(input); + message_counts[recipient - 1]--; + } + + if (message_counts[recipient - 1] == 0) { + struct input *input_to_queue = malloc(sizeof(struct input)); + input_to_queue->recipient = recipient; + input_to_queue->value = -1; + int enqueue_result = priority_enqueue(&input_queue, input_to_queue); + if (enqueue_result != 0) { + free_queue(&input_queue); + return -1; + } } } free_queue(&input_queue); diff --git a/queue.c b/queue.c index 6f07adc..ff3c94d 100644 --- a/queue.c +++ b/queue.c @@ -21,6 +21,11 @@ struct queue init_queue(int cap) { * @param q The queue to free. */ void free_queue(struct queue *q) { + void *item; + while (dequeue(q, &item) != -1) { + free(item); + } + free(q->items); }