From 3417870edbd47eb5d4a543f8e0b18d56caaca62b Mon Sep 17 00:00:00 2001 From: Nick Krichevsky Date: Sat, 21 May 2022 10:20:05 -0400 Subject: [PATCH] Refactor broker to separate task data into its own structure --- src/email.rs | 58 +++++++++++++++++++++++++++++++--------------- src/email/inbox.rs | 2 +- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/src/email.rs b/src/email.rs index affaae0..1d28d90 100644 --- a/src/email.rs +++ b/src/email.rs @@ -52,17 +52,21 @@ pub trait MessageFetcher { async fn fetch_message(&self, sequence_number: SequenceNumber) -> Result; } +struct FetchTask + Clone> { + fetcher: Arc>, + output_sink: O, +} + pub struct Broker where W: SequenceNumberStreamer, F: MessageFetcher, S: Spawn, - O: Sink, + O: Sink + Clone, { watcher: W, - fetcher: Arc>, + task_data: FetchTask, spawner: S, - output_sink: O, } impl Broker @@ -78,9 +82,8 @@ where pub fn new(watcher: W, fetcher: F, spawner: S, output_sink: O) -> Self { Self { watcher, - fetcher: Arc::new(Mutex::new(fetcher)), + task_data: FetchTask::new(fetcher, output_sink), spawner, - output_sink, } } @@ -93,12 +96,9 @@ where let mut stream = self.watcher.watch_for_new_messages().await?; while let Some(sequence_number) = stream.next().await { - let fetcher = self.fetcher.clone(); - let output_sink = self.output_sink.clone(); + let mut task_data = self.task_data.clone(); let spawn_res = self.spawner.spawn(async move { - // Because this is an associated function, we must put this in an async block so that - // we don't have to restrict the lifetimes of our non-static generics. - Self::fetch_and_sink_message(sequence_number, fetcher, output_sink).await; + task_data.fetch_and_sink_message(sequence_number).await; }); if let Err(spawn_err) = spawn_res { error!( @@ -109,15 +109,25 @@ where Ok(()) } +} - // TODO: Should this use a TaskData like we did in MessageFetcher, rather than taking - // onership of clones? - async fn fetch_and_sink_message( - sequence_number: SequenceNumber, - fetcher: Arc>, - mut output_sink: O, - ) { - let fetcher = fetcher.lock().await; +impl FetchTask +where + F: MessageFetcher, + O: Sink + Clone + Send + Sync + Unpin, + O::Error: Debug, +{ + pub fn new(fetcher: F, output_sink: O) -> Self { + Self { + fetcher: Arc::new(Mutex::new(fetcher)), + output_sink, + } + } + + /// Fetch a message with the given sequence number, and send its output to this Task's + /// output sink. Errors for this are logged, as this is intended to be run in a forked off task. + pub async fn fetch_and_sink_message(&mut self, sequence_number: SequenceNumber) { + let fetcher = self.fetcher.lock().await; let fetch_result = fetcher.fetch_message(sequence_number).await; if let Err(fetch_err) = fetch_result { @@ -126,13 +136,23 @@ where } let msg = fetch_result.unwrap(); - let send_res = output_sink.send(msg).await; + let send_res = self.output_sink.send(msg).await; if let Err(send_err) = send_res { error!("failed to send fetched message to output stream {send_err:?}"); } } } +impl + Clone> Clone for FetchTask { + fn clone(&self) -> Self { + Self { + // because `fetcher` is wrapped in an Arc, we must derive Clone manually. + fetcher: self.fetcher.clone(), + output_sink: self.output_sink.clone(), + } + } +} + #[cfg(test)] mod tests { use std::{ diff --git a/src/email/inbox.rs b/src/email/inbox.rs index 132579e..91da70d 100644 --- a/src/email/inbox.rs +++ b/src/email/inbox.rs @@ -39,7 +39,7 @@ pub enum WatchError { SpawnError(SpawnError), } -/// Holds any data that watch tasks my access during a `Watcher`'s stream. +/// Holds any data that watch tasks may be accessed during a `Watcher`'s stream. struct WatchTaskData { session_generator: G, // atomics are used here so that I don't have to fiddle with a lock around the entire task