Skip to content

Commit

Permalink
feat: improve handling connect and disconnects in listener mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Wkkkkk committed Aug 16, 2023
1 parent 9d4e8fe commit ded8808
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
9 changes: 9 additions & 0 deletions src/domain/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ impl SharableAppState {
)))
}

pub async fn reset(&self) -> Result<(), MyError> {
// Try to hold the lock and reset the app state
let mut app_state = self.lock_err().await?;
let connections = &mut app_state.connections;

connections.clear();
Ok(())
}

pub async fn has_connection(&self, id: String) -> Result<bool, MyError> {
// Try to hold the lock and check if the connection exists
let mut app_state = self.lock_err().await?;
Expand Down
14 changes: 11 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@ use tokio::time::{sleep, Duration};

/// Run a pipeline until it encounters EOS or an error. Clean up the pipeline after it finishes.
/// This function can be called multiple times to handle EOS.
async fn run_pipeline(pipeline: &mut SharablePipeline, args: &Args) -> Result<(), Box<dyn Error>> {
async fn run_pipeline(
pipeline: &mut SharablePipeline,
args: &Args,
state: &SharableAppState,
) -> Result<(), Box<dyn Error>> {
pipeline.init(args).await?;

// Block until EOS or error message pops up
pipeline.run().await?;

// Clean up the pipeline when it finishes so it can be rerun
pipeline.clean_up().await?;

// Reset app state
state.reset().await?;
Ok(())
}

Expand All @@ -46,11 +53,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
let should_stop = Arc::new(AtomicBool::new(false));

let should_stop_clone = should_stop.clone();
let app_data_clone = app_data.clone();
let mut pipeline_clone = pipeline_data.clone();
// Run the pipeline in a separate thread
let pipeline_thread = task::spawn(async move {
while !should_stop_clone.load(Ordering::Relaxed) {
if let Err(err) = run_pipeline(&mut pipeline_clone, &args).await {
if let Err(err) = run_pipeline(&mut pipeline_clone, &args, &app_data_clone).await {
tracing::error!("Failed to run pipeline: {}", err);

// break the loop when the pipeline runs into an error
Expand All @@ -59,7 +67,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Reset and rerun the pipeline when it encounters EOS
sleep(Duration::from_secs(1)).await;
tracing::info!("Pipeline reaches EOS. Reset and rerun the pipeline");
tracing::info!("Pipeline reaches EOS. Reset app and rerun the pipeline");
}

// Stop the pipeline when the thread is aborted
Expand Down
4 changes: 3 additions & 1 deletion src/stream/gst_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,9 @@ impl PipelineBase for SharablePipeline {
async fn clean_up(&self) -> Result<(), Error> {
let mut pipeline_state = self.lock_err().await?;
if let Some(pipeline) = pipeline_state.pipeline.as_ref() {
pipeline.set_state(gst::State::Null)?;
pipeline.call_async(move |pipeline| {
let _ = pipeline.set_state(gst::State::Null);
});
pipeline_state.pipeline = None;
}

Expand Down

0 comments on commit ded8808

Please sign in to comment.