diff --git a/src/email.rs b/src/email.rs index 7f5aa36..affaae0 100644 --- a/src/email.rs +++ b/src/email.rs @@ -1,13 +1,36 @@ -use futures::Stream; -use std::error::Error; +use futures::{lock::Mutex, Sink, SinkExt, Stream, StreamExt}; +use std::{ + error::Error, + fmt::{Debug, Display, Formatter}, + sync::Arc, +}; + +use crate::task::Spawn; -use self::inbox::SequenceNumber; use async_trait::async_trait; pub mod inbox; pub mod login; pub mod message; +/// The ID of a single message stored in the inbox. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[cfg_attr(test, derive(Hash))] +pub struct SequenceNumber(u32); + +impl SequenceNumber { + /// Get the integral value of this sequence number + pub fn value(self) -> u32 { + self.0 + } +} + +impl Display for SequenceNumber { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value()) + } +} + /// `SequenceNumberStreamer` will stream new emails, and then return their sequence numbers to the stream. #[async_trait] pub trait SequenceNumberStreamer { @@ -29,7 +52,240 @@ pub trait MessageFetcher { async fn fetch_message(&self, sequence_number: SequenceNumber) -> Result; } -pub struct Broker { +pub struct Broker +where + W: SequenceNumberStreamer, + F: MessageFetcher, + S: Spawn, + O: Sink, +{ watcher: W, - fetcher: F, + fetcher: Arc>, + spawner: S, + output_sink: O, +} + +impl Broker +where + W: SequenceNumberStreamer, + W::Stream: Unpin, + F: MessageFetcher + Send + Sync + 'static, + F::Error: Send, + S: Spawn, + O: Sink + Unpin + Clone + Send + Sync + 'static, + O::Error: Debug, +{ + pub fn new(watcher: W, fetcher: F, spawner: S, output_sink: O) -> Self { + Self { + watcher, + fetcher: Arc::new(Mutex::new(fetcher)), + spawner, + output_sink, + } + } + + /// Stream any incoming mesages from the [`SequenceNumberStreamer`] to the output sink. + /// + /// # Errors + /// Returns an error if there was a problem in setting up the stream. Individual message failures + /// will be logged. + pub async fn stream_incoming_messages_to_sink(&mut self) -> Result<(), W::Error> { + let mut stream = self.watcher.watch_for_new_messages().await?; + + while let Some(sequence_number) = stream.next().await { + let fetcher = self.fetcher.clone(); + let output_sink = self.output_sink.clone(); + let spawn_res = self.spawner.spawn(async move { + // Because this is an associated function, we must put this in an async block so that + // we don't have to restrict the lifetimes of our non-static generics. + Self::fetch_and_sink_message(sequence_number, fetcher, output_sink).await; + }); + if let Err(spawn_err) = spawn_res { + error!( + "failed to spawn task to fetch sequence number {sequence_number}: {spawn_err:?}", + ); + } + } + + Ok(()) + } + + // TODO: Should this use a TaskData like we did in MessageFetcher, rather than taking + // onership of clones? + async fn fetch_and_sink_message( + sequence_number: SequenceNumber, + fetcher: Arc>, + mut output_sink: O, + ) { + let fetcher = fetcher.lock().await; + + let fetch_result = fetcher.fetch_message(sequence_number).await; + if let Err(fetch_err) = fetch_result { + error!("failed to fetch message: {:?}", fetch_err); + return; + } + + let msg = fetch_result.unwrap(); + let send_res = output_sink.send(msg).await; + if let Err(send_err) = send_res { + error!("failed to send fetched message to output stream {send_err:?}"); + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + collections::HashMap, + fmt::{Display, Formatter}, + sync::Arc, + }; + + use crate::task::{Cancel, SpawnError}; + + use super::*; + use futures::{ + channel::mpsc::{self, UnboundedReceiver, UnboundedSender}, + lock::Mutex, + Future, FutureExt, SinkExt, + }; + use futures::{select, StreamExt}; + use thiserror::Error; + + #[derive(Error, Debug)] + struct StringError(String); + + impl Display for StringError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } + } + + struct MockSequenceNumberStreamer { + sender: Arc>>, + receiver: Arc>>, + stopped: bool, + } + + impl MockSequenceNumberStreamer { + pub fn new() -> Self { + let (tx, rx) = mpsc::unbounded(); + Self { + sender: Arc::new(Mutex::new(tx)), + receiver: Arc::new(Mutex::new(rx)), + stopped: false, + } + } + } + + #[async_trait] + impl SequenceNumberStreamer for MockSequenceNumberStreamer { + type Stream = UnboundedReceiver; + type Error = StringError; + + async fn watch_for_new_messages(&mut self) -> Result { + let (mut tx, rx) = mpsc::unbounded(); + + // normally I'd use a spawner for this but this is a mock so it's ok... + let receiver_mutex = self.receiver.clone(); + tokio::spawn(async move { + let mut receiver = receiver_mutex.lock().await; + while let Some(msg) = receiver.next().await { + tx.send(msg).await.expect("send failed"); + } + }); + + Ok(rx) + } + + fn stop(&mut self) { + unimplemented!(); + } + } + + #[derive(Default)] + struct MockMessageFetcher { + messages: HashMap, + } + + impl MockMessageFetcher { + pub fn new() -> Self { + Self::default() + } + + pub fn stage_message(&mut self, seq: SequenceNumber, message: &str) { + self.messages.insert(seq, message.to_string()); + } + } + + #[async_trait] + impl MessageFetcher for MockMessageFetcher { + type Error = StringError; + + async fn fetch_message( + &self, + sequence_number: SequenceNumber, + ) -> Result { + self.messages.get(&sequence_number).cloned().ok_or_else(|| { + StringError(format!("sequence number {:?} not found", sequence_number)) + }) + } + } + + struct TokioSpawner; + impl Spawn for TokioSpawner { + type Cancel = CancelFnOnce; + + fn spawn(&self, future: F) -> Result + where + ::Output: Send, + { + let handle = tokio::spawn(future); + let canceler = CancelFnOnce { + cancel_func: Box::new(move || handle.abort()), + }; + + Ok(canceler) + } + } + + struct CancelFnOnce { + cancel_func: Box, + } + + impl Cancel for CancelFnOnce { + fn cancel(self) { + (self.cancel_func)(); + } + } + + #[tokio::test] + async fn test_messages_from_streamer_go_to_sink() { + let mock_watcher = MockSequenceNumberStreamer::new(); + let mut mock_fetcher = MockMessageFetcher::new(); + let (message_tx, mut message_rx) = mpsc::unbounded::(); + + let watcher_sender_mutex = mock_watcher.sender.clone(); + let mut watcher_sender = watcher_sender_mutex.lock().await; + + mock_fetcher.stage_message(SequenceNumber(123), "hello, world!"); + + let mut broker = Broker::new(mock_watcher, mock_fetcher, TokioSpawner, message_tx); + let join_handle = tokio::spawn(async move { + broker + .stream_incoming_messages_to_sink() + .await + .expect("stream failed"); + }); + + watcher_sender + .send(SequenceNumber(123)) + .await + .expect("send failed"); + + select! { + msg = message_rx.next() => assert_eq!("hello, world!", msg.expect("empty channel")), + _ = join_handle.fuse() => panic!("broker returned, but did not receive message"), + }; + } } diff --git a/src/email/inbox.rs b/src/email/inbox.rs index 82d1239..132579e 100644 --- a/src/email/inbox.rs +++ b/src/email/inbox.rs @@ -1,7 +1,7 @@ //! The inbox module holds implementations to allow for the monitoring of a real inbox use self::idle::{SessionCell, SessionState}; -use super::{login::SessionGenerator, SequenceNumberStreamer}; +use super::{login::SessionGenerator, SequenceNumber, SequenceNumberStreamer}; use crate::{ task::{Cancel, Spawn, SpawnError}, IMAPSession, IMAPTransportStream, @@ -30,10 +30,6 @@ mod idle; const CHANNEL_SIZE: usize = 16; -/// The ID of a single message stored in the inbox. -#[derive(Clone, Copy, Debug)] -pub struct SequenceNumber(u32); - /// An error that occurs during the setup process of a [`Watcher`] stream #[derive(Error, Debug)] pub enum WatchError { @@ -57,13 +53,6 @@ pub struct Watcher { cancelers: Vec, } -impl SequenceNumber { - /// Get the integral value of this sequence number - pub fn value(self) -> u32 { - self.0 - } -} - #[async_trait] impl SequenceNumberStreamer for Watcher where diff --git a/src/email/message.rs b/src/email/message.rs index 8534897..7641a5e 100644 --- a/src/email/message.rs +++ b/src/email/message.rs @@ -1,7 +1,7 @@ use crate::{IMAPSession, SessionGenerator}; use std::iter; -use super::{inbox::SequenceNumber, MessageFetcher}; +use super::{MessageFetcher, SequenceNumber}; use async_imap::{error::Error as IMAPError, types::Fetch}; use async_trait::async_trait; use futures::StreamExt;