Skip to content

Commit

Permalink
Add asynchronous? option to Testing.Pipeline.terminate/2
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Feb 2, 2023
1 parent ad225d4 commit e85ceb6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
25 changes: 23 additions & 2 deletions lib/membrane/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,39 @@ defmodule Membrane.Pipeline do
@doc """
Terminates the pipeline.
Accepts two options:
Accepts three options:
* `asynchronous?` - if set to `true`, pipline termination won't be blocking and
will be executed in the process, which pid is returned as function result. If
set to `false`, pipeline termination will be blocking and will be executed in
the process, that called this function.
* `timeout` - tells how much time (ms) to wait for pipeline to get gracefully
terminated. Defaults to 5000.
* `force?` - if set to `true` and pipeline is still alive after `timeout`,
pipeline will be killed using `Process.exit/2` with second argument set to
`:kill`, and function will return `{:error, :timeout}`. If set to `false` and
pipeline is still alive after `timeout`, function will raise an error.
Defaults to `false`.
Returns:
* `{:ok, pid}` - if option `asynchronous?: true` was passed
* :ok - if pipeline was gracefully terminated within `timeout`
* {:error, :timeout} - if pipeline was killed after a `timeout`
"""
@spec terminate(pipeline :: pid, timeout: non_neg_integer() | :infinity, force?: boolean()) ::
:ok | {:error, :timeout}
:ok | {:ok, pid()} | {:error, :timeout}
def terminate(pipeline, opts \\ []) do
{asynchronous?, opts} = Keyword.pop(opts, :asynchronous?, false)

if asynchronous? do
Task.start(__MODULE__, :run_terminate, [pipeline, opts])
else
run_terminate(pipeline, opts)
end
end

@spec run_terminate(pipeline :: pid, timeout: non_neg_integer() | :infinity, force?: boolean()) ::
:ok | {:error, :timeout}
def run_terminate(pipeline, opts) do
timeout = Keyword.get(opts, :timeout, 5000)
force? = Keyword.get(opts, :force?, false)

Expand Down
6 changes: 3 additions & 3 deletions test/membrane/integration/child_removal_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ defmodule Membrane.Integration.ChildRemovalTest do
)

monitor = Process.monitor(pipeline)
spawn(fn -> Testing.Pipeline.terminate(pipeline, timeout: :infinity) end)
Testing.Pipeline.terminate(pipeline, asynchronous?: true)
Process.sleep(100)

assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline)
Expand All @@ -211,7 +211,7 @@ defmodule Membrane.Integration.ChildRemovalTest do
)

monitor = Process.monitor(pipeline)
spawn(fn -> Testing.Pipeline.terminate(pipeline, timeout: :infinity) end)
Testing.Pipeline.terminate(pipeline, asynchronous?: true)
Process.sleep(100)

assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline)
Expand All @@ -231,7 +231,7 @@ defmodule Membrane.Integration.ChildRemovalTest do
)

pipeline_monitor = Process.monitor(pipeline)
spawn(fn -> Testing.Pipeline.terminate(pipeline, timeout: :infinity) end)
Testing.Pipeline.terminate(pipeline, asynchronous?: true)
Process.sleep(100)

assert %{module: Membrane.Core.Pipeline.Zombie} = :sys.get_state(pipeline)
Expand Down

0 comments on commit e85ceb6

Please sign in to comment.