290 lines
8.1 KiB
C
290 lines
8.1 KiB
C
#include <errno.h>
|
|
#include <pthread.h>
|
|
#include <signal.h>
|
|
#include <stdbool.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include "adder.h"
|
|
#include "mailbox.h"
|
|
#include "main.h"
|
|
|
|
/**
|
|
* Parse a line of input into an input struct.
|
|
*
|
|
* @param input_line A line of input.
|
|
* @param res An already allocated input struct to store the result in.
|
|
*
|
|
* @return int -1 if the input is malformed, and 0 otherwise.
|
|
*/
|
|
static int parse_input_line(char *input_line, struct input *res) {
|
|
// if tmp has anything in it, then we know that extra values have been entered.
|
|
char tmp[2];
|
|
int num_matched = sscanf(input_line, " %d %d %1s", &(res->value), &(res->recipient), tmp);
|
|
|
|
return num_matched == 2 ? 0 : -1;
|
|
}
|
|
|
|
/**
|
|
* Convert the given input to an array of input structs.
|
|
*
|
|
* @param num_threads The number of threads that the program has been spawned with.
|
|
* @param res A pointer that will be allocated as an array of inputs. Must be freed by the caller.
|
|
*
|
|
* @return int the number of input lines that were read.
|
|
*/
|
|
static int get_input(int num_threads, struct input **res) {
|
|
int input_index = 0;
|
|
int current_buffer_size = BUFFER_SIZE;
|
|
char *input_buffer = malloc(current_buffer_size);
|
|
// Keep reaidng until we hit an EOF
|
|
while (true) {
|
|
char *read_result;
|
|
while((read_result = fgets(input_buffer, current_buffer_size, stdin))) {
|
|
int read_size = strlen(input_buffer);
|
|
if (input_buffer[read_size - 1] == '\n') {
|
|
break;
|
|
}
|
|
|
|
// Grow our buffer if we nede to read more input.
|
|
current_buffer_size *= 2;
|
|
input_buffer = realloc(input_buffer, current_buffer_size);
|
|
}
|
|
if (read_result == NULL) {
|
|
break;
|
|
}
|
|
|
|
struct input current_input;
|
|
int parse_result = parse_input_line(input_buffer, ¤t_input);
|
|
if (parse_result == -1) {
|
|
// On bad input, we should assume an EOF and stop.
|
|
break;
|
|
} else if (current_input.recipient > num_threads || current_input.recipient < 0) {
|
|
printf("Invalid recipient id.\n");
|
|
continue;
|
|
}
|
|
|
|
*res = realloc(*res, sizeof(struct input) * (input_index + 1));
|
|
memcpy(*res + input_index, ¤t_input, sizeof(struct input));
|
|
input_index++;
|
|
}
|
|
|
|
free(input_buffer);
|
|
|
|
return input_index;
|
|
}
|
|
|
|
/**
 * Forcefully stop a set of threads. Meant for exceptional circumstances only.
 *
 * Every thread is sent a cancellation request and is then joined. The results
 * of both calls are ignored.
 *
 * @param threads An array of threads to kill.
 * @param num_threads The number of threads to kill.
 */
static void kill_threads(pthread_t *threads, int num_threads) {
    pthread_t *current = threads;
    for (int remaining = num_threads; remaining > 0; remaining--) {
        pthread_cancel(*current);
        pthread_join(*current, NULL);
        current++;
    }
}
|
|
|
|
/**
 * Join all of the given adder threads and report the ones that failed.
 *
 * Each adder thread is expected to return a heap-allocated int status, which
 * is freed here. A thread counts as failed when it cannot be joined, when it
 * produced no status (NULL or PTHREAD_CANCELED), or when its status is non-zero.
 *
 * @param threads The adder threads to join.
 * @param num_threads The number of threads to join.
 *
 * @return int -1 if one of the threads failed or failed to join, 0 otherwise.
 */
static int join_adder_threads(pthread_t *threads, int num_threads) {
    bool failed = false;
    for (int i = 0; i < num_threads; i++) {
        void *raw_retval = NULL;
        int join_result = pthread_join(threads[i], &raw_retval);
        if (join_result != 0) {
            printf("Failed to join thread #%d. (%s)\n", i + 1, strerror(join_result));
            failed = true;
            continue;
        }

        // A cancelled thread yields PTHREAD_CANCELED and a thread that returned
        // nothing yields NULL; neither may be dereferenced or freed.
        if (raw_retval == NULL || raw_retval == PTHREAD_CANCELED) {
            printf("Thread #%d failed.\n", i + 1);
            failed = true;
            continue;
        }

        int retval = *(int *)raw_retval;
        if (retval != 0) {
            printf("Thread #%d failed.\n", i + 1);
            failed = true;
        }
        // Adder threads require that their retval be freed.
        free(raw_retval);
    }

    return failed ? -1 : 0;
}
|
|
|
|
/**
|
|
* Start the adder threads
|
|
*
|
|
* @param thread_ids An array of all of the thread ids to spawn, in order.
|
|
* @param num_threads The number of threads to spawn
|
|
* @param threads A pre-allocated array to hold the spawned threads.
|
|
*
|
|
* @return int -1 on error, 0 otherwise.
|
|
*/
|
|
static int start_adder_threads(int *thread_ids, int num_threads, pthread_t *threads) {
|
|
for (int i = 0; i < num_threads; i++) {
|
|
int create_result = pthread_create(&threads[i], NULL, adder, &thread_ids[i]);
|
|
if (create_result != 0) {
|
|
// Kill the i threads that we have spawned.
|
|
kill_threads(threads, i);
|
|
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Send all of the inputs as messages to the designated mailboxes.
|
|
*
|
|
* @param inputs An array of lal of the inputs to send.
|
|
* @param n The number of inputs provided.
|
|
*
|
|
* @return int 0 on success, or the index of the input that failed to send otherwise.
|
|
*/
|
|
static int send_input_messages(struct input *inputs, int n) {
|
|
struct msg message = {0};
|
|
for (int i = 0; i < n; i++) {
|
|
message.value = inputs[i].value;
|
|
int send_result = SendMsg(inputs[i].recipient, &message);
|
|
if (send_result != 0) {
|
|
return i;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Send termination messages to all of the threads from 1 through num_threads
|
|
*
|
|
* @param inputs An array of lal of the inputs to send.
|
|
* @param n The number of inputs provided.
|
|
*
|
|
* @return int 0 on success, or the index of the mailbox that failed to send otherwise.
|
|
*/
|
|
static int send_termination_messages(int num_threads) {
|
|
struct msg message = {0};
|
|
// We don't need to send one to mailbox zero. We only need to get send them for all other mailboxes.
|
|
for (int i = 1; i <= num_threads; i++) {
|
|
message.value = -1;
|
|
int send_result = SendMsg(i, &message);
|
|
if (send_result != 0) {
|
|
return i;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Collect n messages from mailbox 0
|
|
*
|
|
* @param raw_num_threads A pointer to an integer that contains the number of threads to recieve messages from.:w
|
|
*
|
|
* @return void* Not used.
|
|
*/
|
|
static void *collect_results(void *raw_n) {
|
|
int n = *(int*)(raw_n);
|
|
struct msg message;
|
|
for (int i = 0; i < n; i++) {
|
|
int recv_result = RecvMsg(0, &message);
|
|
if (recv_result == -1) {
|
|
printf("Failed to receive message from adder thread.\n");
|
|
continue;
|
|
}
|
|
|
|
printf(OUTPUT_FORMAT, message.iFrom, message.value, message.cnt, message.tot);
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
int main(int argc, char *argv[]) {
|
|
struct input *inputs = NULL;
|
|
if (argc < 2 || argc > 3) {
|
|
printf("%s\n", USAGE_STRING);
|
|
return -1;
|
|
} else if (argc == 3) {
|
|
printf("Not yet implemented.");
|
|
return -1;
|
|
}
|
|
|
|
int num_adder_threads = strtol(argv[1], NULL, 10);
|
|
if (errno == ERANGE || (num_adder_threads == 0 && errno == EINVAL)) {
|
|
printf("Invalid number of threads.\n");
|
|
}
|
|
num_adder_threads = num_adder_threads > MAXTHREAD ? MAXTHREAD : num_adder_threads;
|
|
|
|
// TODO: Remove this. It's for debugging.
|
|
int num_inputs = get_input(num_adder_threads, &inputs);
|
|
for (int i = 0; i < num_inputs; i++) {
|
|
printf("#%d recipient=%d value=%d\n", i, inputs[i].recipient, inputs[i].value);
|
|
}
|
|
|
|
// Init one mailbox for each thread, and an additional one for the main thread.
|
|
int init_result = init_mailboxes(num_adder_threads + 1);
|
|
if (init_result != 0) {
|
|
printf("Failed to setup mailboxes.\n");
|
|
free(inputs);
|
|
return 1;
|
|
}
|
|
|
|
// Must allocate an array of the ids to prevent a race condition when adder starts
|
|
// We don't want the thread id to be deallocated.
|
|
// We don't want to allocate an id for thread id 0, so we store all of the others.
|
|
int mailbox_ids[num_adder_threads];
|
|
for (int i = 0; i < num_adder_threads; i++) {
|
|
mailbox_ids[i] = i + 1;
|
|
}
|
|
pthread_t adder_threads[num_adder_threads];
|
|
int start_result = start_adder_threads(mailbox_ids, num_adder_threads, adder_threads);
|
|
if (start_result != 0) {
|
|
printf("Failed to start adder threads. (%s)\n", strerror(errno));
|
|
free(inputs);
|
|
free_mailboxes();
|
|
return 1;
|
|
}
|
|
|
|
pthread_t collect_thread;
|
|
int spawn_result = pthread_create(&collect_thread, NULL, collect_results, &num_adder_threads);
|
|
if (spawn_result != 0) {
|
|
printf("Failed to spawn collection thread.\n");
|
|
free(inputs);
|
|
kill_threads(adder_threads, num_adder_threads);
|
|
return 1;
|
|
}
|
|
|
|
int send_result = send_input_messages(inputs, num_inputs);
|
|
free(inputs);
|
|
if (send_result != 0) {
|
|
printf("Failed to send input to mailbox %d\n", send_result);
|
|
kill_threads(adder_threads, num_adder_threads);
|
|
free_mailboxes();
|
|
return 1;
|
|
}
|
|
|
|
send_result = send_termination_messages(num_adder_threads);
|
|
if (send_result != 0) {
|
|
printf("Failed to send termination message to mailbox %d\n", send_result);
|
|
kill_threads(adder_threads, num_adder_threads);
|
|
free_mailboxes();
|
|
return 1;
|
|
}
|
|
|
|
int join_result = join_adder_threads(adder_threads, num_adder_threads);
|
|
if (join_result != 0) {
|
|
// If one of the joins failed, we have to tell the collection thread we're done.
|
|
pthread_cancel(collect_thread);
|
|
}
|
|
pthread_join(collect_thread, NULL);
|
|
|
|
free_mailboxes();
|
|
} |