Add ability to terminate threads with no input immediately

master
Nick Krichevsky 2019-09-25 23:23:50 -04:00
parent 6c33d90aaf
commit 518cfd2992
1 changed files with 51 additions and 6 deletions

57
main.c
View File

@ -211,19 +211,64 @@ static int handle_blocking_input(struct input *inputs, int num_inputs, int num_t
return 0;
}
/**
* Generate a termination input for every thread that does not have an entry in the input list. Entries must be freed
* by the caller when dequeued.
*
* @param inputs An array of all of the inputs to send.
* @param num_inputs The number of inputs provided.
* @param num_threads The number of adder threads.
* @param res_queue an already allocated queue with enough capacity to hold all threads.
*
* @return -1 on error, 0 otherwise.
*/
static int generate_non_inputted_mailbox_terminations(struct input *inputs, int num_inputs, int num_threads, struct queue *res_queue) {
bool have_message[num_threads];
memset(have_message, false, sizeof(bool) * num_threads);
for (int i = 0; i < num_inputs; i++) {
int recipient = inputs[i].recipient;
// Someone can enter a mailbox of [1, n] with n threads, so we must subtract one so it fits.
have_message[recipient - 1] = true;
}
for (int i = 0; i < num_threads; i++) {
if (have_message[i]) {
continue;
}
struct input *termination_input = malloc(sizeof(struct input));
termination_input->recipient = i + 1;
termination_input->value = -1;
int enqueue_result = priority_enqueue(res_queue, termination_input);
if (enqueue_result != 0) {
// There is a potential for a memory leak here, but it can only happen if there's programmer error
// (not enough capacity in the queue), so I'm not worried about it.
return -1;
}
}
return 0;
}
/**
* A implementation of cycling over all of the given messages until they send.
*
* @param inputs An array of lal of the inputs to send.
* @param n The number of inputs provided.
* @param inputs An array of all of the inputs to send.
* @param num_inputs The number of inputs provided.
* @param num_threads The number of adder threads.
*
* @return int 0 on success, or -1 on error.
*/
static int cycle_send_input_messages(struct input *inputs, int n) {
struct queue input_queue = init_queue(n);
for (int i = 0; i < n; i++) {
static int cycle_send_input_messages(struct input *inputs, int num_inputs, int num_threads) {
struct queue input_queue = init_queue(num_inputs + num_threads);
for (int i = 0; i < num_inputs; i++) {
enqueue(&input_queue, &inputs[i]);
}
int termination_enqueue_result = generate_non_inputted_mailbox_terminations(inputs, num_inputs, num_threads, &input_queue);
if (termination_enqueue_result != 0) {
free_queue(&input_queue);
return -1;
}
struct msg message = {0};
void *raw_input;
@ -323,7 +368,7 @@ int main(int argc, char *argv[]) {
int send_result;
if (non_blocking) {
send_result = cycle_send_input_messages(inputs, num_inputs);
send_result = cycle_send_input_messages(inputs, num_inputs, num_adder_threads);
// TODO: Remove this once we fix non-blocking termination
if (send_result != 0) {
printf("Failed to send to mailbox.\n");