From e85ceb6369d9ad0d653b984192c0e2453a75aa2f Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 2 Feb 2023 17:29:21 +0100 Subject: [PATCH] Add asynchronous? option to Testing.Pipeline.terminate/2 --- lib/membrane/pipeline.ex | 25 +++++++++++++++++-- .../integration/child_removal_test.exs | 6 ++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index 791793869..dda26822f 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -297,7 +297,11 @@ defmodule Membrane.Pipeline do @doc """ Terminates the pipeline. - Accepts two options: + Accepts three options: + * `asynchronous?` - if set to `true`, pipline termination won't be blocking and + will be executed in the process, which pid is returned as function result. If + set to `false`, pipeline termination will be blocking and will be executed in + the process, that called this function. * `timeout` - tells how much time (ms) to wait for pipeline to get gracefully terminated. Defaults to 5000. * `force?` - if set to `true` and pipeline is still alive after `timeout`, @@ -305,10 +309,27 @@ defmodule Membrane.Pipeline do `:kill`, and function will return `{:error, :timeout}`. If set to `false` and pipeline is still alive after `timeout`, function will raise an error. Defaults to `false`. + + Returns: + * `{:ok, pid}` - if option `asynchronous?: true` was passed + * :ok - if pipeline was gracefully terminated within `timeout` + * {:error, :timeout} - if pipeline was killed after a `timeout` """ @spec terminate(pipeline :: pid, timeout: non_neg_integer() | :infinity, force?: boolean()) :: - :ok | {:error, :timeout} + :ok | {:ok, pid()} | {:error, :timeout} def terminate(pipeline, opts \\ []) do + {asynchronous?, opts} = Keyword.pop(opts, :asynchronous?, false) + + if asynchronous? do + Task.start(__MODULE__, :run_terminate, [pipeline, opts]) + else + run_terminate(pipeline, opts) + end + end + + @spec run_terminate(pipeline :: pid, timeout: non_neg_integer() | :infinity, force?: boolean()) :: + :ok | {:error, :timeout} + def run_terminate(pipeline, opts) do timeout = Keyword.get(opts, :timeout, 5000) force? = Keyword.get(opts, :force?, false) diff --git a/test/membrane/integration/child_removal_test.exs b/test/membrane/integration/child_removal_test.exs index a92a08083..f0d91dab9 100644 --- a/test/membrane/integration/child_removal_test.exs +++ b/test/membrane/integration/child_removal_test.exs @@ -193,7 +193,7 @@ defmodule Membrane.Integration.ChildRemovalTest do ) monitor = Process.monitor(pipeline) - spawn(fn -> Testing.Pipeline.terminate(pipeline, timeout: :infinity) end) + Testing.Pipeline.terminate(pipeline, asynchronous?: true) Process.sleep(100) assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline) @@ -211,7 +211,7 @@ defmodule Membrane.Integration.ChildRemovalTest do ) monitor = Process.monitor(pipeline) - spawn(fn -> Testing.Pipeline.terminate(pipeline, timeout: :infinity) end) + Testing.Pipeline.terminate(pipeline, asynchronous?: true) Process.sleep(100) assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline) @@ -231,7 +231,7 @@ defmodule Membrane.Integration.ChildRemovalTest do ) pipeline_monitor = Process.monitor(pipeline) - spawn(fn -> Testing.Pipeline.terminate(pipeline, timeout: :infinity) end) + Testing.Pipeline.terminate(pipeline, asynchronous?: true) Process.sleep(100) assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline)