Refactor broker to separate task data into its own structure

This commit is contained in:
Nick Krichevsky 2022-05-21 10:20:05 -04:00
parent ca69471067
commit 3417870edb
2 changed files with 40 additions and 20 deletions

View file

@ -52,17 +52,21 @@ pub trait MessageFetcher {
async fn fetch_message(&self, sequence_number: SequenceNumber) -> Result<String, Self::Error>; async fn fetch_message(&self, sequence_number: SequenceNumber) -> Result<String, Self::Error>;
} }
struct FetchTask<F: MessageFetcher, O: Sink<String> + Clone> {
fetcher: Arc<Mutex<F>>,
output_sink: O,
}
pub struct Broker<W, F, S, O> pub struct Broker<W, F, S, O>
where where
W: SequenceNumberStreamer, W: SequenceNumberStreamer,
F: MessageFetcher, F: MessageFetcher,
S: Spawn, S: Spawn,
O: Sink<String>, O: Sink<String> + Clone,
{ {
watcher: W, watcher: W,
fetcher: Arc<Mutex<F>>, task_data: FetchTask<F, O>,
spawner: S, spawner: S,
output_sink: O,
} }
impl<W, F, S, O> Broker<W, F, S, O> impl<W, F, S, O> Broker<W, F, S, O>
@ -78,9 +82,8 @@ where
pub fn new(watcher: W, fetcher: F, spawner: S, output_sink: O) -> Self { pub fn new(watcher: W, fetcher: F, spawner: S, output_sink: O) -> Self {
Self { Self {
watcher, watcher,
fetcher: Arc::new(Mutex::new(fetcher)), task_data: FetchTask::new(fetcher, output_sink),
spawner, spawner,
output_sink,
} }
} }
@ -93,12 +96,9 @@ where
let mut stream = self.watcher.watch_for_new_messages().await?; let mut stream = self.watcher.watch_for_new_messages().await?;
while let Some(sequence_number) = stream.next().await { while let Some(sequence_number) = stream.next().await {
let fetcher = self.fetcher.clone(); let mut task_data = self.task_data.clone();
let output_sink = self.output_sink.clone();
let spawn_res = self.spawner.spawn(async move { let spawn_res = self.spawner.spawn(async move {
// Because this is an associated function, we must put this in an async block so that task_data.fetch_and_sink_message(sequence_number).await;
// we don't have to restrict the lifetimes of our non-static generics.
Self::fetch_and_sink_message(sequence_number, fetcher, output_sink).await;
}); });
if let Err(spawn_err) = spawn_res { if let Err(spawn_err) = spawn_res {
error!( error!(
@ -109,15 +109,25 @@ where
Ok(()) Ok(())
} }
}
// TODO: Should this use a TaskData like we did in MessageFetcher, rather than taking impl<F, O> FetchTask<F, O>
// onership of clones? where
async fn fetch_and_sink_message( F: MessageFetcher,
sequence_number: SequenceNumber, O: Sink<String> + Clone + Send + Sync + Unpin,
fetcher: Arc<Mutex<F>>, O::Error: Debug,
mut output_sink: O, {
) { pub fn new(fetcher: F, output_sink: O) -> Self {
let fetcher = fetcher.lock().await; Self {
fetcher: Arc::new(Mutex::new(fetcher)),
output_sink,
}
}
/// Fetch a message with the given sequence number, and send its output to this Task's
/// output sink. Errors for this are logged, as this is intended to be run in a forked off task.
pub async fn fetch_and_sink_message(&mut self, sequence_number: SequenceNumber) {
let fetcher = self.fetcher.lock().await;
let fetch_result = fetcher.fetch_message(sequence_number).await; let fetch_result = fetcher.fetch_message(sequence_number).await;
if let Err(fetch_err) = fetch_result { if let Err(fetch_err) = fetch_result {
@ -126,13 +136,23 @@ where
} }
let msg = fetch_result.unwrap(); let msg = fetch_result.unwrap();
let send_res = output_sink.send(msg).await; let send_res = self.output_sink.send(msg).await;
if let Err(send_err) = send_res { if let Err(send_err) = send_res {
error!("failed to send fetched message to output stream {send_err:?}"); error!("failed to send fetched message to output stream {send_err:?}");
} }
} }
} }
impl<F: MessageFetcher, O: Sink<String> + Clone> Clone for FetchTask<F, O> {
fn clone(&self) -> Self {
Self {
// because `fetcher` is wrapped in an Arc, we must derive Clone manually.
fetcher: self.fetcher.clone(),
output_sink: self.output_sink.clone(),
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{ use std::{

View file

@ -39,7 +39,7 @@ pub enum WatchError {
SpawnError(SpawnError), SpawnError(SpawnError),
} }
/// Holds any data that watch tasks my access during a `Watcher`'s stream. /// Holds any data that watch tasks may be accessed during a `Watcher`'s stream.
struct WatchTaskData<G: SessionGenerator> { struct WatchTaskData<G: SessionGenerator> {
session_generator: G, session_generator: G,
// atomics are used here so that I don't have to fiddle with a lock around the entire task // atomics are used here so that I don't have to fiddle with a lock around the entire task