#include #include #include #include #include #include #include #include "adder.h" #include "mailbox.h" #include "main.h" #include "queue.h" /** * Parse a line of input into an input struct. * * @param input_line A line of input. * @param res An already allocated input struct to store the result in. * * @return int -1 if the input is malformed, and 0 otherwise. */ static int parse_input_line(char *input_line, struct input *res) { // if tmp has anything in it, then we know that extra values have been entered. char tmp[2]; int num_matched = sscanf(input_line, " %d %d %1s", &(res->value), &(res->recipient), tmp); return num_matched == 2 ? 0 : -1; } /** * Convert the given input to an array of input structs. * * @param num_threads The number of threads that the program has been spawned with. * @param res A pointer that will be allocated as an array of inputs. Must be freed by the caller. * * @return int the number of input lines that were read. */ static int get_input(int num_threads, struct input **res) { int input_index = 0; int current_buffer_size = BUFFER_SIZE; char *input_buffer = malloc(current_buffer_size); // Keep reaidng until we hit an EOF while (true) { char *read_result; while((read_result = fgets(input_buffer, current_buffer_size, stdin))) { int read_size = strlen(input_buffer); if (input_buffer[read_size - 1] == '\n') { break; } // Grow our buffer if we nede to read more input. current_buffer_size *= 2; input_buffer = realloc(input_buffer, current_buffer_size); } if (read_result == NULL) { break; } struct input current_input; int parse_result = parse_input_line(input_buffer, ¤t_input); if (parse_result == -1) { // On bad input, we should assume an EOF and stop. break; } else if (current_input.recipient > num_threads || current_input.recipient <= 0) { printf("Invalid recipient id.\n"); continue; } else if (current_input.value < 0) { printf("Invalid value.\n"); continue; } *res = realloc(*res, sizeof(struct input) * (input_index + 1)); memcpy(*res + input_index, ¤t_input, sizeof(struct input)); input_index++; } free(input_buffer); return input_index; } /** * Kills the given thread. Should only be used in exceptional circumstances. * * @param thread The thread to kill. * @param free_ret If true, the memory associated with the return value will be freed. */ static void kill_thread(pthread_t thread, bool free_ret) { pthread_cancel(thread); void *ret; pthread_join(thread, free_ret ? &ret : NULL); if (free_ret && ret != PTHREAD_CANCELED) { free(ret); } } /** * Kill all threads. Should only be used in exceptional circumstances. * * @param threads An array of threads to kill. * @param num_threads The number of threads to kill. * @param free_ret If true, the memory associated with the return value will be freed. */ static void kill_threads(pthread_t *threads, int num_threads, bool free_ret) { for (int i = 0; i < num_threads; i++) { kill_thread(threads[i], free_ret); } } /** * Join all of the given adder threads and output when they fail. * * @param threads The adder threads to join. * @param num_threads The number of threads to join. * * @return int -1 if one of the threads failed or failed to join, 0 otherwise. */ static int join_adder_threads(pthread_t *threads, int num_threads) { bool failed = false; for (int i = 0; i < num_threads; i++) { void *raw_retval; int join_result = pthread_join(threads[i], &raw_retval); if (join_result != 0) { printf("Failed to join thread #%d. (%s)\n", i + 1, strerror(join_result)); failed = true; continue; } int retval = *(int*)(raw_retval); if (retval != 0) { printf("Thread #%d failed.\n", i + 1); failed = true; } // Adder threads require that their retval be freed. free(raw_retval); } return failed ? -1 : 0; } /** * Start the adder threads * * @param thread_ids An array of all of the thread ids to spawn, in order. * @param num_threads The number of threads to spawn * @param threads A pre-allocated array to hold the spawned threads. * * @return int -1 on error, 0 otherwise. */ static int start_adder_threads(int *thread_ids, int num_threads, pthread_t *threads) { for (int i = 0; i < num_threads; i++) { int create_result = pthread_create(&threads[i], NULL, adder, &thread_ids[i]); if (create_result != 0) { // Kill the i threads that we have spawned. kill_threads(threads, i, true); return -1; } } return 0; } /** * Send all of the inputs as messages to the designated mailboxes. * * @param inputs An array of all of the inputs to send. * @param n The number of inputs provided. * * @return int 0 on success, or the index of the input that failed to send otherwise. */ static int send_input_messages(struct input *inputs, int n) { struct msg message = {0}; for (int i = 0; i < n; i++) { message.value = inputs[i].value; int send_result = SendMsg(inputs[i].recipient, &message); if (send_result != 0) { return i; } } return 0; } /** * Send termination messages to all of the threads from 1 through num_threads * * @param inputs An array of lal of the inputs to send. * @param n The number of inputs provided. * * @return int 0 on success, or the index of the mailbox that failed to send otherwise. */ static int send_termination_messages(int num_threads) { struct msg message = {0}; // We don't need to send one to mailbox zero. We only need to get send them for all other mailboxes. for (int i = 1; i <= num_threads; i++) { message.value = -1; int send_result = SendMsg(i, &message); if (send_result != 0) { return i; } } return 0; } /** * Send all of the inputs and the termination messages to adder threads * * @param inputs An array of all of the inputs to send. * @param num_inputs The number of items in the inputs array. * @param num_threads The number of threads that are running. Termination signals will be sent to all threads from * 1 through num_threads. * * @return int -1 on error, 0 otherwise. */ static int handle_blocking_input(struct input *inputs, int num_inputs, int num_threads) { int send_result = send_input_messages(inputs, num_inputs); if (send_result != 0) { printf("Failed to send input to mailbox #%d.\n", send_result); return -1; } int termination_result = send_termination_messages(num_threads); if (termination_result != 0) { printf("Failed to send termination messsages to mailbox #%d.\n", termination_result); return -1; } return 0; } /** * Generate a termination input for every thread that does not have any inputs. Entries must be freed by the caller * when dequeued. * * @param message_counts A count of the messages for each thread, with thread 1 indexed at position zero. * @param num_threads The number of adder threads. * @param res_queue an already allocated queue with enough capacity to hold all threads. * * @return -1 on error, 0 otherwise. */ static int generate_non_inputted_mailbox_terminations(int *message_counts, int num_threads, struct queue *res_queue) { for (int i = 0; i < num_threads; i++) { if (message_counts[i] > 0) { continue; } struct input *termination_input = malloc(sizeof(struct input)); termination_input->recipient = i + 1; termination_input->value = -1; int enqueue_result = priority_enqueue(res_queue, termination_input); if (enqueue_result != 0) { // There is a potential for a memory leak here, but it can only happen if there's programmer error // (not enough capacity in the queue), so I'm not worried about it. return -1; } } return 0; } /** * Get the number of messages for each thread in the given input array. * * @param inputs An array of all of the inputs to process. * @param num_inputs The number of inputs provided. * @param counts An allocated int array of num_threads length to hold the counts * @param num_threads The number of adder threads */ static void generate_message_counts(struct input *inputs, int num_inputs, int *counts, int num_threads) { memset(counts, 0, sizeof(int) * num_threads); for (int i = 0; i < num_inputs; i++) { struct input current_input = inputs[i]; // Because thread ids go from [1, n], we need to shift them down. counts[current_input.recipient - 1]++; } } /** * A implementation of cycling over all of the given messages until they send. * * @param inputs An array of all of the inputs to send. * @param num_inputs The number of inputs provided. * @param num_threads The number of adder threads. * * @return int 0 on success, or -1 on error. */ static int cycle_send_input_messages(struct input *inputs, int num_inputs, int num_threads) { struct queue input_queue = init_queue(num_inputs + num_threads); for (int i = 0; i < num_inputs; i++) { // So we can free the inputs we generate dynamically, we're going to reallocate the inputs for elegance's sake. // A bit wasteful, but it allows for a cleaner implementation. struct input *input_to_queue = malloc(sizeof(struct input)); memcpy(input_to_queue, &inputs[i], sizeof(struct input)); enqueue(&input_queue, input_to_queue); } int message_counts[num_threads]; // Get counts for all of the messages and generate termination messages for those that need to terminate // before we start sending actual inputs. generate_message_counts(inputs, num_inputs, message_counts, num_threads); int termination_enqueue_result = generate_non_inputted_mailbox_terminations(message_counts, num_threads, &input_queue); if (termination_enqueue_result != 0) { free_queue(&input_queue); return -1; } struct msg message = {0}; void *raw_input; while (dequeue(&input_queue, &raw_input) != -1) { struct input *input = raw_input; message.value = input->value; int send_result = NBSendMsg(input->recipient, &message); int recipient = input->recipient; if (send_result != 0 && errno != EAGAIN) { free_queue(&input_queue); return -1; } else if (send_result != 0 && errno == EAGAIN) { enqueue(&input_queue, input); continue; } else { free(input); message_counts[recipient - 1]--; } if (message_counts[recipient - 1] == 0) { struct input *input_to_queue = malloc(sizeof(struct input)); input_to_queue->recipient = recipient; input_to_queue->value = -1; int enqueue_result = priority_enqueue(&input_queue, input_to_queue); if (enqueue_result != 0) { free_queue(&input_queue); return -1; } } } free_queue(&input_queue); return 0; } /** * Send all of the inputs and the termination messages to adder threads without blocking on sends. * * @param inputs An array of all of the inputs to send. * @param num_inputs The number of items in the inputs array. * @param num_threads The number of threads that are running. Termination signals will be sent to all threads from * 1 through num_threads. * * @return int -1 on error, 0 otherwise. */ static int handle_nonblocking_input(struct input *inputs, int num_inputs, int num_threads) { int send_result = cycle_send_input_messages(inputs, num_inputs, num_threads); if (send_result != 0) { printf("Failed to send input to a mailbox.\n"); return -1; } return 0; } /** * Collect n messages from mailbox 0 * * @param raw_num_threads A pointer to an integer that contains the number of threads to recieve messages from.:w * * @return void* Not used. */ static void *collect_results(void *raw_n) { int n = *(int*)(raw_n); struct msg message; for (int i = 0; i < n; i++) { int recv_result = RecvMsg(0, &message); if (recv_result == -1) { printf("Failed to receive message from adder thread.\n"); continue; } printf(OUTPUT_FORMAT, message.iFrom, message.value, message.cnt, message.tot); } return NULL; } int main(int argc, char *argv[]) { struct input *inputs = NULL; bool non_blocking = false; if (argc < 2 || argc > 3) { printf("%s\n", USAGE_STRING); return -1; } else if (argc == 3) { non_blocking = true; } int num_adder_threads = strtol(argv[1], NULL, 10); if (errno == ERANGE || (num_adder_threads == 0 && errno == EINVAL)) { printf("Invalid number of threads.\n"); } num_adder_threads = num_adder_threads > MAXTHREAD ? MAXTHREAD : num_adder_threads; int num_inputs = get_input(num_adder_threads, &inputs); // Init one mailbox for each thread, and an additional one for the main thread. int init_result = init_mailboxes(num_adder_threads + 1); if (init_result != 0) { printf("Failed to setup mailboxes.\n"); free(inputs); return 1; } // Must allocate an array of the ids to prevent a race condition when adder starts // We don't want the thread id to be deallocated. // We don't want to allocate an id for thread id 0, so we store all of the others. int mailbox_ids[num_adder_threads]; for (int i = 0; i < num_adder_threads; i++) { mailbox_ids[i] = i + 1; } pthread_t adder_threads[num_adder_threads]; int start_result = start_adder_threads(mailbox_ids, num_adder_threads, adder_threads); if (start_result != 0) { printf("Failed to start adder threads. (%s)\n", strerror(errno)); free(inputs); free_mailboxes(); return 1; } pthread_t collect_thread; int spawn_result = pthread_create(&collect_thread, NULL, collect_results, &num_adder_threads); if (spawn_result != 0) { printf("Failed to spawn collection thread.\n"); free(inputs); kill_threads(adder_threads, num_adder_threads, true); free_mailboxes(); return 1; } int send_result; if (non_blocking) { send_result = handle_nonblocking_input(inputs, num_inputs, num_adder_threads); } else { send_result = handle_blocking_input(inputs, num_inputs, num_adder_threads); } free(inputs); if (send_result != 0) { kill_threads(adder_threads, num_adder_threads, true); kill_thread(collect_thread, false); free_mailboxes(); return 1; } int join_result = join_adder_threads(adder_threads, num_adder_threads); if (join_result != 0) { // If one of the joins failed, we have to tell the collection thread we're done. pthread_cancel(collect_thread); } pthread_join(collect_thread, NULL); free_mailboxes(); }