Refactor blocking input sends in preperation for fixing nonblocking inputs

master
Nick Krichevsky 2019-09-25 17:22:04 -04:00
parent 98b0dfe85e
commit cedaf7305f
1 changed files with 63 additions and 31 deletions

94
main.c
View File

@ -145,7 +145,7 @@ static int start_adder_threads(int *thread_ids, int num_threads, pthread_t *thre
/**
* Send all of the inputs as messages to the designated mailboxes.
*
* @param inputs An array of lal of the inputs to send.
* @param inputs An array of all of the inputs to send.
* @param n The number of inputs provided.
*
* @return int 0 on success, or the index of the input that failed to send otherwise.
@ -163,6 +163,54 @@ static int send_input_messages(struct input *inputs, int n) {
return 0;
}
/**
* Send termination messages to all of the threads from 1 through num_threads
*
* @param inputs An array of lal of the inputs to send.
* @param n The number of inputs provided.
*
* @return int 0 on success, or the index of the mailbox that failed to send otherwise.
*/
static int send_termination_messages(int num_threads) {
struct msg message = {0};
// We don't need to send one to mailbox zero. We only need to get send them for all other mailboxes.
for (int i = 1; i <= num_threads; i++) {
message.value = -1;
int send_result = SendMsg(i, &message);
if (send_result != 0) {
return i;
}
}
return 0;
}
/**
* Send all of the inputs and the termination messages to adder threads
*
* @param inputs An array of all of the inputs to send.
* @param num_inputs The number of items in the inputs array.
* @param num_threads The number of threads that are running. Termination signals will be sent to all threads from
* 1 through num_threads.
*
* @return int -1
*/
static int handle_blocking_input(struct input *inputs, int num_inputs, int num_threads) {
int send_result = send_input_messages(inputs, num_inputs);
if (send_result != 0) {
printf("Failed to send input to mailbox #%d.\n", send_result);
return -1;
}
int termination_result = send_termination_messages(num_threads);
if (termination_result != 0) {
printf("Failed to send termination messsages to mailbox #%d.\n", termination_result);
return -1;
}
return 0;
}
/**
* A implementation of cycling over all of the given messages until they send.
*
@ -195,28 +243,6 @@ static int cycle_send_input_messages(struct input *inputs, int n) {
return 0;
}
/**
* Send termination messages to all of the threads from 1 through num_threads
*
* @param inputs An array of lal of the inputs to send.
* @param n The number of inputs provided.
*
* @return int 0 on success, or the index of the mailbox that failed to send otherwise.
*/
static int send_termination_messages(int num_threads) {
struct msg message = {0};
// We don't need to send one to mailbox zero. We only need to get send them for all other mailboxes.
for (int i = 1; i <= num_threads; i++) {
message.value = -1;
int send_result = SendMsg(i, &message);
if (send_result != 0) {
return i;
}
}
return 0;
}
/**
* Collect n messages from mailbox 0
*
@ -298,24 +324,30 @@ int main(int argc, char *argv[]) {
int send_result;
if (non_blocking) {
send_result = cycle_send_input_messages(inputs, num_inputs);
// TODO: Remove this once we fix non-blocking termination
if (send_result != 0) {
printf("Failed to send to mailbox.\n");
}
} else {
send_result = send_input_messages(inputs, num_inputs);
send_result = handle_blocking_input(inputs, num_inputs, num_adder_threads);
}
free(inputs);
if (send_result != 0) {
printf("Failed to send input to mailbox %d\n", send_result);
kill_threads(adder_threads, num_adder_threads);
free_mailboxes();
return 1;
}
send_result = send_termination_messages(num_adder_threads);
if (send_result != 0) {
printf("Failed to send termination message to mailbox.\n");
kill_threads(adder_threads, num_adder_threads);
free_mailboxes();
return 1;
// TODO: Remove this when we fix non-blocking termination
if (non_blocking) {
send_result = send_termination_messages(num_adder_threads);
if (send_result != 0) {
printf("Failed to send termination message to mailbox.\n");
kill_threads(adder_threads, num_adder_threads);
free_mailboxes();
return 1;
}
}
int join_result = join_adder_threads(adder_threads, num_adder_threads);