immediately teminate when nb messages finish for mailbox

master
Nick Krichevsky 2019-09-26 00:54:22 -04:00
parent 518cfd2992
commit b9f8289ffd
2 changed files with 54 additions and 16 deletions

65
main.c
View File

@ -212,27 +212,18 @@ static int handle_blocking_input(struct input *inputs, int num_inputs, int num_t
} }
/** /**
* Generate a termination input for every thread that does not have an entry in the input list. Entries must be freed * Generate a termination input for every thread that does not have any inputs. Entries must be freed by the caller
* by the caller when dequeued. * when dequeued.
* *
* @param inputs An array of all of the inputs to send. * @param message_counts A count of the messages for each thread, with thread 1 indexed at position zero.
* @param num_inputs The number of inputs provided.
* @param num_threads The number of adder threads. * @param num_threads The number of adder threads.
* @param res_queue an already allocated queue with enough capacity to hold all threads. * @param res_queue an already allocated queue with enough capacity to hold all threads.
* *
* @return -1 on error, 0 otherwise. * @return -1 on error, 0 otherwise.
*/ */
static int generate_non_inputted_mailbox_terminations(struct input *inputs, int num_inputs, int num_threads, struct queue *res_queue) { static int generate_non_inputted_mailbox_terminations(int *message_counts, int num_threads, struct queue *res_queue) {
bool have_message[num_threads];
memset(have_message, false, sizeof(bool) * num_threads);
for (int i = 0; i < num_inputs; i++) {
int recipient = inputs[i].recipient;
// Someone can enter a mailbox of [1, n] with n threads, so we must subtract one so it fits.
have_message[recipient - 1] = true;
}
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
if (have_message[i]) { if (message_counts[i] > 0) {
continue; continue;
} }
@ -250,6 +241,23 @@ static int generate_non_inputted_mailbox_terminations(struct input *inputs, int
return 0; return 0;
} }
/**
* Get the number of messages for each thread in the given input array.
*
* @param inputs An array of all of the inputs to process.
* @param num_inputs The number of inputs provided.
* @param counts An allocated int array of num_threads length to hold the counts
* @param num_threads The number of adder threads
*/
static void generate_message_counts(struct input *inputs, int num_inputs, int *counts, int num_threads) {
memset(counts, 0, sizeof(int) * num_threads);
for (int i = 0; i < num_inputs; i++) {
struct input current_input = inputs[i];
// Because thread ids go from [1, n], we need to shift them down.
counts[current_input.recipient - 1]++;
}
}
/** /**
* A implementation of cycling over all of the given messages until they send. * A implementation of cycling over all of the given messages until they send.
* *
@ -262,9 +270,18 @@ static int generate_non_inputted_mailbox_terminations(struct input *inputs, int
static int cycle_send_input_messages(struct input *inputs, int num_inputs, int num_threads) { static int cycle_send_input_messages(struct input *inputs, int num_inputs, int num_threads) {
struct queue input_queue = init_queue(num_inputs + num_threads); struct queue input_queue = init_queue(num_inputs + num_threads);
for (int i = 0; i < num_inputs; i++) { for (int i = 0; i < num_inputs; i++) {
enqueue(&input_queue, &inputs[i]); // So we can free the inputs we generate dynamically, we're going to reallocate the inputs for elegance's sake.
// A bit wasteful, but it allows for a cleaner implementation.
struct input *input_to_queue = malloc(sizeof(struct input));
memcpy(input_to_queue, &inputs[i], sizeof(struct input));
enqueue(&input_queue, input_to_queue);
} }
int termination_enqueue_result = generate_non_inputted_mailbox_terminations(inputs, num_inputs, num_threads, &input_queue);
int message_counts[num_threads];
// Get counts for all of the messages and generate termination messages for those that need to terminate
// before we start sending actual inputs.
generate_message_counts(inputs, num_inputs, message_counts, num_threads);
int termination_enqueue_result = generate_non_inputted_mailbox_terminations(message_counts, num_threads, &input_queue);
if (termination_enqueue_result != 0) { if (termination_enqueue_result != 0) {
free_queue(&input_queue); free_queue(&input_queue);
return -1; return -1;
@ -276,11 +293,27 @@ static int cycle_send_input_messages(struct input *inputs, int num_inputs, int n
struct input *input = raw_input; struct input *input = raw_input;
message.value = input->value; message.value = input->value;
int send_result = NBSendMsg(input->recipient, &message); int send_result = NBSendMsg(input->recipient, &message);
int recipient = input->recipient;
if (send_result != 0 && errno != EAGAIN) { if (send_result != 0 && errno != EAGAIN) {
free_queue(&input_queue); free_queue(&input_queue);
return -1; return -1;
} else if (send_result != 0 && errno == EAGAIN) { } else if (send_result != 0 && errno == EAGAIN) {
enqueue(&input_queue, input); enqueue(&input_queue, input);
continue;
} else {
free(input);
message_counts[recipient - 1]--;
}
if (message_counts[recipient - 1] == 0) {
struct input *input_to_queue = malloc(sizeof(struct input));
input_to_queue->recipient = recipient;
input_to_queue->value = -1;
int enqueue_result = priority_enqueue(&input_queue, input_to_queue);
if (enqueue_result != 0) {
free_queue(&input_queue);
return -1;
}
} }
} }
free_queue(&input_queue); free_queue(&input_queue);

View File

@ -21,6 +21,11 @@ struct queue init_queue(int cap) {
* @param q The queue to free. * @param q The queue to free.
*/ */
void free_queue(struct queue *q) { void free_queue(struct queue *q) {
void *item;
while (dequeue(q, &item) != -1) {
free(item);
}
free(q->items); free(q->items);
} }