Break out watching/printing into thier own structures

Still crude, but a step in the right direction
master
Nick Krichevsky 2022-05-10 22:47:02 -04:00
parent 2da2859e9b
commit 4409abd1fa
8 changed files with 312 additions and 90 deletions

67
Cargo.lock generated
View File

@ -259,7 +259,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"time 0.1.44",
"winapi",
]
@ -554,6 +554,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "itoa"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "js-sys"
version = "0.3.57"
@ -736,6 +742,15 @@ dependencies = [
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.10.0"
@ -1019,6 +1034,17 @@ dependencies = [
"libc",
]
[[package]]
name = "simplelog"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48dfff04aade74dd495b007c831cd6f4e0cee19c344dd9dc0884c0289b70a786"
dependencies = [
"log",
"termcolor",
"time 0.3.9",
]
[[package]]
name = "slab"
version = "0.4.6"
@ -1094,6 +1120,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.15.0"
@ -1136,6 +1171,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa",
"libc",
"num_threads",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -1371,6 +1424,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
@ -1438,9 +1500,12 @@ dependencies = [
"async-std",
"derive-getters",
"futures",
"log",
"mailparse",
"serde",
"serde_yaml",
"simplelog",
"textwrap",
"thiserror",
"tokio",
]

View File

@ -12,6 +12,10 @@ derive-getters = "0.2.0"
async-imap = "0.5.0"
mailparse = "0.13.8"
tokio = { version = "1.18", features = ["full"] }
thiserror = "1.0"
log = "0.4.17"
simplelog = "0.12.0"
# For annoying reasons, we must pin exactly the same versions as async-imap if we want to use
# their types.
# https://github.com/async-email/async-imap/pull/57

View File

@ -1,12 +1,12 @@
use serde::Deserialize;
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct Config {
imap: IMAP,
}
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct IMAP {
domain: String,

View File

@ -1,3 +1,4 @@
use std::error::Error;
use std::{fmt::Debug, time::Duration};
use async_imap::{
@ -7,95 +8,35 @@ use async_imap::{
};
use futures::{AsyncRead, AsyncWrite, StreamExt};
const WAIT_TIMEOUT: Duration = Duration::from_secs(29 * 60);
mod idle {
use super::{IdleResponse, Response};
/// Data is a type-safe wrapper for [`IdleResponse`]. This acts as a wrapper type so
/// we can extract the response data from a response (the data stored in this variant has a private type).
pub struct Data(IdleResponse);
impl Data {
/// `new` constructs a new `IdleData` from an [`IdleResponse`] containing data.
///
/// # Panics
/// Will panic if the response does not have the variant of `IdleResponse::newData`. This is a private module,
/// where we should control the data going in, so we really do consider this unrecoverable.
pub fn new(response: IdleResponse) -> Self {
assert!(matches!(response, IdleResponse::NewData(_)));
Self(response)
}
/// `response` gets the server's response out of our `Data`.
///
/// # Panics
/// This can panic if `Data`'s type storage invariant is violted.
pub fn response(&self) -> &Response {
match &self.0 {
IdleResponse::NewData(data) => data.parsed(),
_ => panic!("not possible by construction"),
}
}
}
}
use crate::inbox::SequenceNumber;
/// `fetch_email` will consume the current session and put it into an `Idle` state, until a new email is received
///
/// # Errors
/// If, for any reason, the email fails to be fetched, one of `async_imap` error's will be returned.
#[allow(clippy::module_name_repetitions)]
pub async fn fetch_email<T>(session: Session<T>) -> async_imap::error::Result<Session<T>>
pub async fn fetch_email<T>(
session: &mut Session<T>,
sequence_number: SequenceNumber,
) -> Result<String, Box<dyn Error>>
where
T: AsyncRead + AsyncWrite + Unpin + Debug + Send,
{
// TODO: check if the server can handle IDLE
let mut idle_handle = session.idle();
idle_handle.init().await?;
let response_data = idle_until_data_received(&mut idle_handle).await?;
let response = response_data.response();
let sequence_number = match response {
Response::MailboxData(MailboxDatum::Exists(seq)) => seq,
_ => panic!("no idea what to do with this {:?}", response),
};
let mut unidled_session = idle_handle.done().await?;
// TODO: This should be done somewhere other than this function, in some kind of concurrent task.
let message = unidled_session
.fetch(format!("{:?}", sequence_number), "RFC822")
let message = session
.fetch(format!("{}", sequence_number.value()), "RFC822")
.await?
// god please don't do this just to get a single message
.next()
.await
.unwrap()?;
let parsed = mailparse::parse_mail(message.body().unwrap()).unwrap();
dbg!(parsed.get_body().unwrap());
let parsed = mailparse::parse_mail(message.body().unwrap())?;
let subparts = parsed.subparts;
// TODO: Don't just concat these parts...
let res = subparts
.into_iter()
.map(|x| x.get_body())
.collect::<Result<String, _>>();
println!("{}", res.unwrap());
.collect::<Result<String, _>>()?;
Ok(unidled_session)
}
async fn idle_until_data_received<T>(
idle_handle: &mut Handle<T>,
) -> async_imap::error::Result<idle::Data>
where
T: AsyncRead + AsyncWrite + Unpin + Debug + Send,
{
loop {
let (idle_response_future, _stop) = idle_handle.wait_with_timeout(WAIT_TIMEOUT);
let idle_response = idle_response_future.await?;
match idle_response {
IdleResponse::ManualInterrupt => panic!("we don't interrupt manually"),
IdleResponse::Timeout => continue,
IdleResponse::NewData(_) => return Ok(idle::Data::new(idle_response)),
}
}
Ok(res)
}

109
src/inbox.rs Normal file
View File

@ -0,0 +1,109 @@
use std::fmt::Debug;
use async_imap::{
error::{Error as IMAPError, Result as IMAPResult},
extensions::idle::Handle,
imap_proto::{MailboxDatum, Response},
Session,
};
use futures::{AsyncRead, AsyncWrite};
use thiserror::Error;
use tokio::sync::broadcast::{self, error::SendError};
mod idle;
#[derive(Clone, Copy, Debug)]
// `SequenceNumber` represents the ID of a single message stored in the inbox.
pub struct SequenceNumber(u32);
/// An error that occured during the watching for an email.
#[derive(Error, Debug)]
pub enum WatchError {
/// `AsyncImapFailure` is a general failure from `async-imap`
#[error("imap failure: {0}")]
AsyncIMAPFailure(IMAPError),
#[error("send failure: {0}")]
SendFailure(SendError<SequenceNumber>),
}
impl From<IMAPError> for WatchError {
fn from(err: IMAPError) -> Self {
Self::AsyncIMAPFailure(err)
}
}
impl From<SendError<SequenceNumber>> for WatchError {
fn from(err: SendError<SequenceNumber>) -> Self {
Self::SendFailure(err)
}
}
/// Monitors an inbox for new messages, and sends them to other listeners.
pub struct Watcher {
sender: broadcast::Sender<SequenceNumber>,
}
impl SequenceNumber {
/// Get the integral value of this sequence number
pub fn value(self) -> u32 {
self.0
}
}
impl Watcher {
/// `new` constructs a new `Watcher` that will send updates to the given Sender
pub fn new(sender: broadcast::Sender<SequenceNumber>) -> Self {
Self { sender }
}
/// Watch for the arrival of a single email. While the `Watcher` is watching, the caller must relinquish ownership
/// of the `Session`, but it will be returned to it upon succesful completion of the
pub async fn watch_for_email<T>(&self, session: Session<T>) -> Result<Session<T>, WatchError>
where
T: AsyncRead + AsyncWrite + Unpin + Debug + Send,
{
let mut idle_handle = session.idle();
let sequence_number = Self::idle_for_email(&mut idle_handle).await?;
self.sender.send(sequence_number)?;
let reclaimed_session = idle_handle.done().await?;
Ok(reclaimed_session)
}
async fn idle_for_email<T>(idle_handle: &mut Handle<T>) -> IMAPResult<SequenceNumber>
where
T: AsyncRead + AsyncWrite + Unpin + Debug + Send,
{
loop {
idle_handle.init().await?;
let idle_res = idle::wait_for_data(idle_handle).await;
match idle_res {
Ok(data) => {
let response = data.response();
let maybe_sequence_number =
Self::get_sequence_number_from_response(response).await;
match maybe_sequence_number {
Some(sequence_number) => return Ok(sequence_number),
None => {
debug!("re-issuing IDLE after getting non-EXISTS response from IDLE command: {:?}", response);
}
}
}
Err(idle::Error::AsyncIMAPError(err)) => return Err(err),
Err(idle::Error::Timeout) => {
debug!("re-issuing IDLE after timeout");
}
}
}
}
async fn get_sequence_number_from_response<'a>(
response: &Response<'a>,
) -> Option<SequenceNumber> {
match response {
Response::MailboxData(MailboxDatum::Exists(seq)) => Some(SequenceNumber(*seq)),
_ => None,
}
}
}

71
src/inbox/idle.rs Normal file
View File

@ -0,0 +1,71 @@
use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use async_imap::{
error::{Error as IMAPError, Result as IMAPResult},
extensions::idle::{Handle, IdleResponse},
imap_proto::Response as IMAPResponse,
};
use futures::{AsyncRead, AsyncWrite};
// 29 minutes, as per RFC2177 which specifies we should re-issue our idle every 29 minutes
const WAIT_TIMEOUT: Duration = Duration::from_secs(29 * 60);
/// Error is a thin wrapper around [`IMAPError`], with the ability to differentiate between different failure cases.
#[derive(Error, Debug)]
pub enum Error {
/// Indicates that an IDLE command timed out, and should be re-issued to continue
#[error("idle timed out")]
Timeout,
#[error("{0}")]
AsyncIMAPError(IMAPError),
}
impl From<IMAPError> for Error {
fn from(err: IMAPError) -> Self {
Self::AsyncIMAPError(err)
}
}
/// Data is a type-safe wrapper for [`IdleResponse`]. This acts as a wrapper type so
/// we can extract the response data from a response (the data stored in this variant has a private type).
pub struct Data(IdleResponse);
impl Data {
/// `new` constructs a new `IdleData` from an [`IdleResponse`] containing data.
///
/// # Panics
/// Will panic if the response does not have the variant of `IdleResponse::newData`. This is a private module,
/// where we should control the data going in, so we really do consider this unrecoverable.
pub fn new(response: IdleResponse) -> Self {
assert!(matches!(response, IdleResponse::NewData(_)));
Self(response)
}
/// `response` gets the server's response out of our `Data`.
///
/// # Panics
/// This can panic if `Data`'s type storage invariant is violated.
pub fn response(&self) -> &IMAPResponse {
match &self.0 {
IdleResponse::NewData(data) => data.parsed(),
_ => panic!("not possible by construction"),
}
}
}
pub async fn wait_for_data<T>(idle_handle: &mut Handle<T>) -> Result<Data, Error>
where
T: AsyncRead + AsyncWrite + Unpin + Debug + Send,
{
println!("starting timeout wait...");
let (idle_response_future, _stop) = idle_handle.wait_with_timeout(WAIT_TIMEOUT);
let idle_response = idle_response_future.await?;
println!("got a response...");
match idle_response {
IdleResponse::ManualInterrupt => panic!("we don't interrupt manually"),
IdleResponse::Timeout => Err(Error::Timeout),
IdleResponse::NewData(_) => Ok(Data::new(idle_response)),
}
}

View File

@ -2,6 +2,9 @@
// TODO: Remove the need for these. I'm experimenting right now.
#![allow(clippy::missing_errors_doc)]
#[macro_use]
extern crate log;
use std::fmt::Debug;
use async_imap::{self, Client, Session};
@ -11,6 +14,12 @@ use futures::{AsyncRead, AsyncWrite};
mod config;
mod fetch;
mod inbox;
// TODO: Remove
pub use fetch::fetch_email;
pub use inbox::{SequenceNumber, Watcher};
pub async fn setup_session(
cfg: &Config,
) -> async_imap::error::Result<Session<impl AsyncRead + AsyncWrite + Unpin + Debug + Send>> {
@ -24,18 +33,6 @@ pub async fn setup_session(
.map_err(|err| err.0)
}
pub async fn fetch_emails<T>(session: Session<T>) -> async_imap::error::Result<()>
where
T: AsyncRead + AsyncWrite + Unpin + Debug + Send,
{
let mut current_session = session;
current_session.examine("INBOX").await?;
loop {
println!("Idling...");
current_session = fetch::fetch_email(current_session).await?;
}
}
async fn build_imap_client(
imap_cfg: &IMAPConfig,
) -> async_imap::error::Result<Client<impl AsyncRead + AsyncWrite + Unpin + Debug>> {

View File

@ -1,6 +1,6 @@
use std::fs::File;
use tokio::runtime::Runtime;
use ynabifier::Config;
use tokio::{runtime::Runtime, sync::broadcast};
use ynabifier::{Config, SequenceNumber, Watcher};
fn main() {
let config_file = File::open("config.yml").expect("failed to open config file");
@ -8,12 +8,47 @@ fn main() {
serde_yaml::from_reader::<_, Config>(config_file).expect("failed to parse config file");
let runtime = Runtime::new().expect("failed to create runtime");
let (tx, mut rx) = broadcast::channel::<SequenceNumber>(16);
let cloned_config = config.clone();
runtime.spawn(async move {
let mut session = ynabifier::setup_session(&cloned_config)
.await
.expect("failed to setup socket");
loop {
let rx_res = rx.recv().await;
if let Err(err) = rx_res {
eprintln!("failed to recv: {}", err);
continue;
}
session.examine("INBOX").await.expect("couldn't examine");
let sequence_number = rx_res.unwrap();
println!("fetching email {}", sequence_number.value());
match ynabifier::fetch_email(&mut session, sequence_number).await {
Ok(email) => println!("{}", email),
Err(err) => eprintln!("failed to fetch email: {}", err),
}
}
});
runtime.block_on(async {
let session = ynabifier::setup_session(&config)
.await
.expect("failed to setup socket");
ynabifier::fetch_emails(session)
let watcher = Watcher::new(tx);
let mut current_session = session;
current_session
.examine("INBOX")
.await
.expect("failed to fetch emails");
.expect("couldn't examine");
loop {
println!("Beginning idle loop");
current_session = watcher.watch_for_email(current_session).await.unwrap();
}
});
}