commit 02adaa573e61d0949514e36ca93dd69fd6b3b71a Author: Nick Krichevsky Date: Mon Jan 2 17:58:45 2023 -0500 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..a33bcd2 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,382 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "either" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" + +[[package]] +name = "getrandom" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "indexmap" +version = "1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" + +[[package]] +name = "libc" +version = "0.2.139" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro2" +version = "1.0.49" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "ryu" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" + +[[package]] +name = "serde" +version = "1.0.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_yaml" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92b5b431e8907b50339b51223b97d102db8d987ced36f6e4d03621db9316c834" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + +[[package]] +name = "simplelog" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dfff04aade74dd495b007c831cd6f4e0cee19c344dd9dc0884c0289b70a786" +dependencies = [ + "log", + "termcolor", + "time", +] + +[[package]] +name = "syn" +version = "1.0.107" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "teevee" +version = "0.1.0" +dependencies = [ + "glob", + "itertools", + "log", + "rand", + "serde", + "serde_yaml", + "simplelog", + "test-case", + "thiserror", +] + +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "test-case" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21d6cf5a7dffb3f9dceec8e6b8ca528d9bd71d36c9f074defb548ce161f598c0" +dependencies = [ + "test-case-macros", +] + +[[package]] +name = "test-case-macros" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e45b7bf6e19353ddd832745c8fcf77a17a93171df7151187f26623f2b75b5b26" +dependencies = [ + "cfg-if", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "time" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" +dependencies = [ + "itoa", + "libc", + "num_threads", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + +[[package]] +name = "time-macros" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" +dependencies = [ + "time-core", +] + +[[package]] +name = "unicode-ident" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" + +[[package]] +name = "unsafe-libyaml" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7ed8ba44ca06be78ea1ad2c3682a43349126c8818054231ee6f4748012aed2" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..7dbf52e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "teevee" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_yaml = "0.9" +glob = "0.3.0" +rand = "0.8.5" +thiserror = "1.0" +log = "0.4.17" +simplelog = "0.12.0" +itertools = "0.10.5" + +[dev-dependencies] +test-case = "2.2" diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..ba201c3 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,93 @@ +use log::LevelFilter; +use serde::Deserialize; + +/// Holds the configuration for `teevee`. +#[derive(Debug, Clone, Deserialize)] +pub struct Config { + #[serde(default = "defaults::ffmpeg_path")] + ffmpeg_path: String, + rtmp_uri: String, + video_globs: Vec, + #[serde(default = "defaults::log_level")] + log_level: LogLevel, +} + +#[derive(Clone, Debug, Deserialize)] +#[cfg_attr(test, derive(PartialEq, Eq))] +enum LogLevel { + #[serde(rename = "debug")] + Debug, + #[serde(rename = "info")] + Info, + #[serde(rename = "warning")] + Warning, + #[serde(rename = "error")] + Error, +} + +impl Config { + /// Get the path to the ffmpeg binary. If not specified, defaults to `/usr/bin/ffmpeg`. + #[must_use] + pub fn ffmpeg_path(&self) -> &str { + &self.ffmpeg_path + } + + /// Get the URI where the video RTMP should be streamed. + #[must_use] + pub fn rtmp_uri(&self) -> &str { + &self.rtmp_uri + } + + /// Get the paths to search for videos to play. These should be in glob form which correspond to files + /// e.g. `/library/tv/Futurama/*.mp4` would be a valid entry for this configuration + #[must_use] + pub fn video_globs(&self) -> &[String] { + &self.video_globs + } + + #[must_use] + pub fn log_level(&self) -> LevelFilter { + match &self.log_level { + LogLevel::Debug => LevelFilter::Debug, + LogLevel::Info => LevelFilter::Info, + LogLevel::Warning => LevelFilter::Warn, + LogLevel::Error => LevelFilter::Error, + } + } +} + +mod defaults { + use super::LogLevel; + + pub(super) fn log_level() -> LogLevel { + LogLevel::Info + } + + pub(super) fn ffmpeg_path() -> String { + "/usr/bin/ffmpeg".to_string() + } +} + +#[cfg(test)] +mod tests { + // Workaround for testcase + #![allow(clippy::needless_pass_by_value)] + + use super::*; + use std::mem; + use test_case::test_case; + + #[test_case("error", LogLevel::Error)] + #[test_case("warning", LogLevel::Warning)] + #[test_case("info", LogLevel::Info)] + #[test_case("debug", LogLevel::Debug)] + fn test_parse_log_level(raw: &str, expected: LogLevel) { + let parsed = serde_yaml::from_str::(&format!(r#""{raw}""#)) + .unwrap_or_else(|err| panic!("failed to parse log level '{raw}': {err}")); + + assert!( + mem::discriminant(&parsed) == mem::discriminant(&expected), + "Expected {expected:?}, got {parsed:?}" + ); + } +} diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs new file mode 100644 index 0000000..1fa0639 --- /dev/null +++ b/src/ffmpeg.rs @@ -0,0 +1,127 @@ +use std::{ + io, + process::{Command, ExitStatus}, +}; + +use itertools::Itertools; + +pub fn stream_files(ffmpeg_path: &str, rtmp_uri: &str, files: &[&str]) -> io::Result { + let mut cmd = Command::new(ffmpeg_path); + let input_args = build_input_file_args(files); + let complex_filter_args = build_complex_filter_args(files); + let output_args = build_output_args(rtmp_uri); + + input_args + .into_iter() + .chain(complex_filter_args.iter().map(String::as_ref)) + .chain(output_args.iter().map(String::as_ref)) + .for_each(|arg| { + cmd.arg(arg); + }); + + // TODO: See if we can get this piped through the logger + cmd.status() +} + +fn build_input_file_args<'a>(files: &[&'a str]) -> Vec<&'a str> { + let i_flag_iter = files.iter().copied().flat_map(|file| ["-i", file]); + + let mut res = vec!["-re"]; + res.extend(i_flag_iter); + + res +} + +fn build_complex_filter_args(files: &[&str]) -> Vec { + vec![ + "-filter_complex".to_string(), + build_complex_filter(files), + "-map".to_string(), + "[v]".to_string(), + "-map".to_string(), + "[a]".to_string(), + ] +} + +fn build_complex_filter(files: &[&str]) -> String { + let scale = build_scale_filter(files); + let concat = build_concat_filter(files); + format!("{scale}; {concat}") +} + +fn build_scale_filter(files: &[&str]) -> String { + files + .iter() + .copied() + .enumerate() + .map(|(idx, _file)| format!("[{idx}:v]scale=1920:1080:force_original_aspect_ratio=decrease,setsar=1:1,pad=1920:1080:-1:-1:color=black[v{idx}]")) + .join("; ") +} + +fn build_concat_filter(files: &[&str]) -> String { + let inputs = files + .iter() + .copied() + .enumerate() + .map(|(idx, _file)| format!("[v{idx}] [{idx}:a]")) + .join(" "); + + let num_files = files.len(); + format!("{inputs} concat=n={num_files}:v=1:a=1 [v] [a]") +} + +fn build_output_args(rtmp_url: &str) -> Vec { + vec![ + "-vcodec".to_string(), + "libx264".to_string(), + "-f".to_string(), + "flv".to_string(), + rtmp_url.to_string(), + ] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_input_file_args() { + let files = ["a.mp4", "b.mkv", "c.flv"]; + let args = build_input_file_args(&files); + assert_eq!(&args, &["-re", "-i", "a.mp4", "-i", "b.mkv", "-i", "c.flv"]); + } + + #[test] + fn test_build_complex_filter_args() { + let files = ["a.mp4", "b.mkv", "c.flv"]; + let args = build_complex_filter_args(&files); + assert_eq!(&args, &[ + "-filter_complex".to_string(), + "[0:v]scale=1920:1080:force_original_aspect_ratio=decrease:flags=lanczos,setsar=1:1,pad=1920:1080:-1:-1:color=black[v0]; \ + [1:v]scale=1920:1080:force_original_aspect_ratio=decrease:flags=lanczos,setsar=1:1,pad=1920:1080:-1:-1:color=black[v1]; \ + [2:v]scale=1920:1080:force_original_aspect_ratio=decrease:flags=lanczos,setsar=1:1,pad=1920:1080:-1:-1:color=black[v2]; \ + [v0] [0:a] [v1] [1:a] [v2] [2:a] concat=n=3:v=1:a=1 [v] [a]".to_string(), + "-map".to_string(), + "[v]".to_string(), + "-map".to_string(), + "[a]".to_string() + ] + ); + } + + #[test] + fn test_build_output_args() { + let rtmp_url = "rtmp://127.0.0.1/live/stream"; + let args = build_output_args(rtmp_url); + assert_eq!( + &args, + &[ + "-vcodec".to_string(), + "libx264".to_string(), + "-f".to_string(), + "flv".to_string(), + "rtmp://127.0.0.1/live/stream".to_string() + ] + ); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..66c0314 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,69 @@ +#![warn(clippy::all, clippy::pedantic)] + +#[macro_use] +extern crate log; + +pub use config::Config; + +use glob::{GlobError, PatternError}; +use itertools::Itertools; +use rand::seq::SliceRandom; +use std::path::PathBuf; +use thiserror::Error; + +mod config; +mod ffmpeg; + +#[derive(Error, Debug)] +pub enum SetupError { + #[error("Failed to parse globs: {0}")] + GlobError(PatternError), + #[error("Failed to load file globs: {0}")] + FileLoadFailed(GlobError), +} + +/// Stream all of the videos in the `Config` to its RTMP path. +/// +/// # Errors +/// Returns `SetupError` if the stream setup fails. After setup, all errors will be logged. +pub fn stream_videos(config: &Config) -> Result<(), SetupError> { + let paths = resolve_files(config.video_globs())?; + let mut path_strs = paths + .iter() + .filter_map(|path| { + let str_path = path.to_str(); + if str_path.is_none() { + warn!("{path:?} is not readable as utf8, skipping"); + } + str_path + }) + .collect::>(); + + let mut rng = rand::thread_rng(); + loop { + path_strs.shuffle(&mut rng); + for path_chunk in &path_strs.iter().copied().chunks(100) { + let path_chunk_copy = path_chunk.collect::>(); + let stream_res = + ffmpeg::stream_files(config.ffmpeg_path(), config.rtmp_uri(), &path_chunk_copy); + if let Err(err) = stream_res { + error!("Failed to stream files, retrying - status code: {err:?}"); + } + } + } +} + +fn resolve_files>(globs: &[S]) -> Result, SetupError> { + let path_iterators = globs + .iter() + .map(S::as_ref) + .map(glob::glob) + .collect::, _>>() + .map_err(SetupError::GlobError)?; + + path_iterators + .into_iter() + .flatten() + .collect::, _>>() + .map_err(SetupError::FileLoadFailed) +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..24f4e3f --- /dev/null +++ b/src/main.rs @@ -0,0 +1,23 @@ +#![warn(clippy::all, clippy::pedantic)] +use log::LevelFilter; +use simplelog::{ColorChoice, Config as SimpleLogConfig, TermLogger, TerminalMode}; +use std::fs::File; +use teevee::{stream_videos, Config}; + +fn main() { + let config_reader = File::open("config.yml").expect("failed to open config file"); + let config = + serde_yaml::from_reader::<_, Config>(config_reader).expect("failed to parse config"); + + setup_logger(config.log_level()); + stream_videos(&config).expect("stream setup failed"); +} + +fn setup_logger(level: LevelFilter) { + TermLogger::new( + level, + SimpleLogConfig::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ); +}