Skip to content

Commit

Permalink
change blocking code to async, fix walkdir.
Browse files Browse the repository at this point in the history
ingest <-> playlist switch still has hiccups
  • Loading branch information
jb-alvarado committed Dec 20, 2024
1 parent 5b6e9d8 commit 2013f21
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 79 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ tokio-stream = "0.1"
toml_edit = {version = "0.22", features = ["serde"]}
ts-rs = { version = "10", features = ["chrono-impl", "no-serde-warnings"] }
uuid = "1.8"
walkdir = "2"
zeromq = { version = "0.4", default-features = false, features = [
"tokio-runtime",
"tcp-transport",
Expand Down
7 changes: 5 additions & 2 deletions engine/src/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ async fn patch_channel(
.lock()
.await
.get(*id)
.await
.ok_or_else(|| format!("Channel {id} not found!"))?;
let mut data = data.into_inner();

Expand Down Expand Up @@ -576,8 +577,9 @@ async fn get_advanced_config(
.lock()
.await
.get(*id)
.await
.ok_or_else(|| ServiceError::BadRequest(format!("Channel ({id}) not exists!")))?;
let config = manager.config.lock().unwrap().advanced.clone();
let config = manager.config.lock().await.advanced.clone();

Ok(web::Json(config))
}
Expand Down Expand Up @@ -636,8 +638,9 @@ async fn get_playout_config(
.lock()
.await
.get(*id)
.await
.ok_or_else(|| ServiceError::BadRequest(format!("Channel ({id}) not exists!")))?;
let config = manager.config.lock().unwrap().clone();
let config = manager.config.lock().await.clone();

Ok(web::Json(config))
}
Expand Down
1 change: 1 addition & 0 deletions engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
io,
process::exit,
sync::{atomic::AtomicBool, Arc, Mutex as Mt},
thread,
};

use actix_web::{middleware::Logger, web, App, HttpServer};
Expand Down
12 changes: 6 additions & 6 deletions engine/src/player/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ impl ChannelManager {
error!("{e}");
};

if !self_clone.channel.lock().await.active {
run_endless = false;
} else {
if self_clone.channel.lock().await.active {
self_clone.run_count.fetch_add(1, Ordering::SeqCst);
self_clone.is_alive.store(true, Ordering::SeqCst);
self_clone.is_terminated.store(false, Ordering::SeqCst);
self_clone.list_init.store(true, Ordering::SeqCst);

tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
} else {
run_endless = false;
}
}

Expand Down Expand Up @@ -310,7 +310,7 @@ impl ChannelController {
}

pub async fn get(&self, id: i32) -> Option<ChannelManager> {
for manager in self.channels.iter() {
for manager in &self.channels {
if manager.channel.lock().await.id == id {
return Some(manager.clone());
}
Expand Down Expand Up @@ -398,7 +398,7 @@ async fn find_m3u8_files(path: &Path) -> io::Result<Vec<String>> {
return Filtering::Continue;
}

Filtering::IgnoreDir
Filtering::Ignore
});

loop {
Expand Down Expand Up @@ -432,7 +432,7 @@ async fn delete_old_segments<P: AsRef<Path> + Clone + std::fmt::Debug>(
return Filtering::Continue;
}

Filtering::IgnoreDir
Filtering::Ignore
});

loop {
Expand Down
9 changes: 4 additions & 5 deletions engine/src/player/input/folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
mpsc::channel,
Arc,
},
thread::sleep,
time::Duration,
};

Expand Down Expand Up @@ -37,10 +36,9 @@ pub async fn watchman(
panic!("Folder path not exists: '{path:?}'");
}

// let (tx, rx) = channel();
let (tx, rx) = channel();

let mut debouncer = new_debouncer(Duration::from_secs(1), None, tx).unwrap();
let mut debouncer = new_debouncer(Duration::from_secs(3), None, tx).unwrap();

debouncer.watch(path, RecursiveMode::Recursive).unwrap();

Expand All @@ -50,6 +48,7 @@ pub async fn watchman(
Ok(events) => {
let sources = Arc::clone(&sources);
let config = config.clone();

tokio::spawn(async move {
let events: Vec<_> = events.to_vec();
for event in events {
Expand Down Expand Up @@ -107,7 +106,7 @@ pub async fn watchman(
}
}
_ => {
debug!(target: Target::file_mail(), channel = id; "Not tracked file event: {event:?}")
trace!("Not tracked file event: {event:?}");
}
}
}
Expand All @@ -119,6 +118,6 @@ pub async fn watchman(
}
}

sleep(Duration::from_secs(3));
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
}
}
6 changes: 3 additions & 3 deletions engine/src/player/input/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ pub async fn ingest_server(
);

if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !is_free_tcp_port(id, url) {
if is_free_tcp_port(id, url) {
info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: <b><magenta>{url}</></b>");
} else {
channel_mgr.channel.lock().await.active = false;
channel_mgr.stop_all(false).await?;
} else {
info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: <b><magenta>{url}</></b>");
}
};

Expand Down
30 changes: 14 additions & 16 deletions engine/src/player/output/hls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ out:
*/

use std::{
process::Stdio,
sync::atomic::Ordering,
thread::{self, sleep},
time::{Duration, SystemTime},
};
use std::{process::Stdio, sync::atomic::Ordering, time::SystemTime};

use async_iterator::Iterator;
use log::*;
Expand All @@ -42,7 +37,10 @@ use crate::{
valid_stream, Media,
},
},
utils::{errors::ServiceError, logging::Target},
utils::{
errors::ServiceError,
logging::{fmt_cmd, Target},
},
};

/// Ingest Server for HLS
Expand Down Expand Up @@ -85,7 +83,7 @@ async fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ServiceErro
info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: <b><magenta>{url}</></b>");
} else {
manager.channel.lock().await.active = false;
manager.stop_all(false).await;
manager.stop_all(false).await?;
}
};

Expand All @@ -98,8 +96,8 @@ async fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ServiceErro
let timer = SystemTime::now();

debug!(target: Target::file_mail(), channel = id;
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
server_cmd.join(" ")
"Server CMD: <bright-blue>ffmpeg {}</>",
fmt_cmd(&server_cmd)
);

let proc_ctl = manager.clone();
Expand Down Expand Up @@ -199,7 +197,7 @@ pub async fn write_hls(manager: ChannelManager) -> Result<(), ServiceError> {

// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
thread::spawn(move || ingest_to_hls_server(channel_mgr_2));
tokio::spawn(ingest_to_hls_server(channel_mgr_2));
}

let mut error_count = 0;
Expand Down Expand Up @@ -233,7 +231,7 @@ pub async fn write_hls(manager: ChannelManager) -> Result<(), ServiceError> {
if config.task.path.is_file() {
let channel_mgr_3 = manager.clone();

thread::spawn(move || task_runner::run(channel_mgr_3));
tokio::spawn(task_runner::run(channel_mgr_3));
} else {
error!(target: Target::file_mail(), channel = id;
"<bright-blue>{:?}</> executable not exists!",
Expand Down Expand Up @@ -270,8 +268,8 @@ pub async fn write_hls(manager: ChannelManager) -> Result<(), ServiceError> {
let dec_cmd = prepare_output_cmd(&config, dec_prefix, &node.filter);

debug!(target: Target::file_mail(), channel = id;
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
dec_cmd.join(" ")
"HLS writer CMD: <bright-blue>ffmpeg {}</>",
fmt_cmd(&dec_cmd)
);

let mut dec_proc = match Command::new("ffmpeg")
Expand All @@ -298,7 +296,7 @@ pub async fn write_hls(manager: ChannelManager) -> Result<(), ServiceError> {
}

while ingest_is_running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}

if let Ok(elapsed) = timer.elapsed() {
Expand All @@ -315,7 +313,7 @@ pub async fn write_hls(manager: ChannelManager) -> Result<(), ServiceError> {
}
}

sleep(Duration::from_secs(1));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

manager.stop_all(false).await?;

Expand Down
2 changes: 2 additions & 0 deletions engine/src/player/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ pub async fn player(manager: ChannelManager) -> Result<(), ServiceError> {
}
}

drop(decoder_stdout);

if let Err(e) = manager.wait(Decoder).await {
error!(target: Target::file_mail(), channel = id; "{e}");
}
Expand Down
21 changes: 10 additions & 11 deletions engine/src/player/utils/folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,18 @@ impl FolderSource {
let mut media_list = vec![];
let mut index: usize = 0;

debug!(target: Target::file_mail(), channel = id;
"generate: {:?}, paths: {:?}",
config.general.generate, config.storage.paths
);

if config.general.generate.is_some() && !config.storage.paths.is_empty() {
for path in &config.storage.paths {
path_list.push(path);
}
if !config.storage.paths.is_empty() && config.general.generate.is_some() {
path_list.extend(&config.storage.paths);
} else {
path_list.push(&config.channel.storage);
}

if let Some(dates) = &config.general.generate {
debug!(target: Target::file_mail(), channel = id;
"generate: {dates:?}, paths: {path_list:?}"
);
}

for path in &path_list {
if !path.is_dir() {
error!(target: Target::file_mail(), channel = id; "Path not exists: <b><magenta>{path:?}</></b>");
Expand All @@ -55,7 +54,7 @@ impl FolderSource {
return Filtering::Continue;
}

Filtering::IgnoreDir
Filtering::Ignore
}
});

Expand Down Expand Up @@ -203,7 +202,7 @@ pub async fn fill_filler_list(
return Filtering::Continue;
}

Filtering::IgnoreDir
Filtering::Ignore
}
});

Expand Down
24 changes: 12 additions & 12 deletions engine/src/player/utils/json_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
fs::File,
path::Path,
sync::{atomic::AtomicBool, Arc},
thread,
};

use log::*;
Expand Down Expand Up @@ -147,14 +146,12 @@ pub async fn read_json(
let list_clone = playlist.clone();

if !config.general.skip_validation {
thread::spawn(move || {
validate_playlist(
config_clone,
current_list,
list_clone,
is_terminated,
);
});
tokio::spawn(validate_playlist(
config_clone,
current_list,
list_clone,
is_terminated,
));
}

set_defaults(&mut playlist);
Expand Down Expand Up @@ -191,9 +188,12 @@ pub async fn read_json(
let list_clone = playlist.clone();

if !config.general.skip_validation {
thread::spawn(move || {
validate_playlist(config_clone, current_list, list_clone, is_terminated);
});
tokio::spawn(validate_playlist(
config_clone,
current_list,
list_clone,
is_terminated,
));
}

set_defaults(&mut playlist);
Expand Down
16 changes: 9 additions & 7 deletions engine/src/player/utils/json_validate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
io::{BufRead, BufReader},
process::{Command, Stdio},
process::Stdio,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand All @@ -10,7 +9,11 @@ use std::{

use log::*;
use regex::Regex;
use tokio::sync::Mutex;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
sync::Mutex,
};

use crate::player::filter::FilterType::Audio;
use crate::player::utils::{
Expand Down Expand Up @@ -96,14 +99,13 @@ async fn check_media(
.spawn()?;

let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
let mut lines = enc_err.lines();
let mut silence_start = 0.0;
let mut silence_end = 0.0;
let re_start = Regex::new(r"silence_start: ([0-9]+:)?([0-9.]+)")?;
let re_end = Regex::new(r"silence_end: ([0-9]+:)?([0-9.]+)")?;

for line in enc_err.lines() {
let line = line?;

while let Some(line) = lines.next_line().await? {
if !FFMPEG_IGNORE_ERRORS.iter().any(|i| line.contains(*i))
&& !config.logging.ignore_lines.iter().any(|i| line.contains(i))
&& (line.contains("[error]") || line.contains("[fatal]"))
Expand Down Expand Up @@ -141,7 +143,7 @@ async fn check_media(

error_list.clear();

if let Err(e) = enc_proc.wait() {
if let Err(e) = enc_proc.wait().await {
error!(target: Target::file_mail(), channel = id; "Validation process: {e:?}");
}

Expand Down
Loading

0 comments on commit 2013f21

Please sign in to comment.