diff --git a/CHANGELOG.md b/CHANGELOG.md index 559a56bfb..0ba74ed2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,12 +2,15 @@ ## 0.10.0 * Remove all deprecated stuff [#399](https://github.com/membraneframework/membrane_core/pull/399) + * Add `Membrane.RemoteControlled.Pipeline` - a basic implementation of a `Membrane.Pipeline` that
+ can be spawned and controlled by an external process [#366](https://github.com/membraneframework/membrane_core/pull/366) * Disallow sending buffers without sending caps first [#341](https://github.com/membraneframework/membrane_core/issues/341) - ## 0.9.0 * Automatic demands [#313](https://github.com/membraneframework/membrane_core/pull/313) * Stop forwarding notifications by default in bins [#358](https://github.com/membraneframework/membrane_core/pull/358) * More fine-grained control over emitted metrics [#365](https://github.com/membraneframework/membrane_core/pull/365) + + ### PRs not influencing public API: * Added log metadata when reporting init in telemetry [#376](https://github.com/membraneframework/membrane_core/pull/376) * Fix generation of pad documentation inside an element [#377](https://github.com/membraneframework/membrane_core/pull/377) * Leaving static pads unlinked and transiting to a playback state other than `:stopped` will result diff --git a/lib/membrane/remote_controlled/message.ex b/lib/membrane/remote_controlled/message.ex new file mode 100644 index 000000000..61c0ee4b2 --- /dev/null +++ b/lib/membrane/remote_controlled/message.ex @@ -0,0 +1,80 @@ +defmodule Membrane.RemoteControlled.Message do + @moduledoc """ + An abstract module aggregating all the messages that can be sent by the `RemoteControlled.Pipeline`. + The available messages are: + * `Membrane.RemoteControlled.Message.PlaybackState.t()` + * `Membrane.RemoteControlled.Message.StartOfStream.t()` + * `Membrane.RemoteControlled.Message.EndOfStream.t()` + * `Membrane.RemoteControlled.Message.Notification.t()` + * `Membrane.RemoteControlled.Message.Terminated.t()` + """ + + @type t :: + __MODULE__.PlaybackState.t() + | __MODULE__.StartOfStream.t() + | __MODULE__.EndOfStream.t() + | __MODULE__.Notification.t() + | __MODULE__.Terminated.t() + + defmodule PlaybackState do + @moduledoc """ + Message sent when the pipeline changes its playback state + """ + @type t :: %__MODULE__{from: pid(), state: Membrane.PlaybackState.t()} + + @enforce_keys [:from, :state] + defstruct @enforce_keys + end + + defmodule StartOfStream do + @moduledoc """ + Message sent when some element of the pipeline receives the start of stream event on some pad. + """ + @type t :: %__MODULE__{ + from: pid(), + element: Membrane.Element.name_t(), + pad: Membrane.Pad.name_t() + } + + @enforce_keys [:from, :element, :pad] + defstruct @enforce_keys + end + + defmodule EndOfStream do + @moduledoc """ + Message sent when some element of the pipeline receives the start of stream event on some pad. + """ + @type t :: %__MODULE__{ + from: pid(), + element: Membrane.Element.name_t(), + pad: Membrane.Pad.name_t() + } + + @enforce_keys [:from, :element, :pad] + defstruct @enforce_keys + end + + defmodule Notification do + @moduledoc """ + Message sent when the some element of the pipeline receives a notification. + """ + @type t :: %__MODULE__{ + from: pid(), + element: Membrane.Element.name_t(), + data: Membrane.Notification.t() + } + + @enforce_keys [:from, :element, :data] + defstruct @enforce_keys + end + + defmodule Terminated do + @moduledoc """ + Message sent when the pipeline gracefully terminates. + """ + @type t :: %__MODULE__{from: pid(), reason: :normal | :shutdown | {:shutdown, any()} | term()} + + @enforce_keys [:from, :reason] + defstruct @enforce_keys + end +end diff --git a/lib/membrane/remote_controlled/pipeline.ex b/lib/membrane/remote_controlled/pipeline.ex new file mode 100644 index 000000000..1f6aaa5d4 --- /dev/null +++ b/lib/membrane/remote_controlled/pipeline.ex @@ -0,0 +1,436 @@ +defmodule Membrane.RemoteControlled.Pipeline do + @moduledoc """ + `Membrane.RemoteControlled.Pipeline` is a basic `Membrane.Pipeline` implementation that can be + controlled by a controlling process. + + The controlling process can request the execution of arbitrary + valid `Membrane.Pipeline.Action`: + ``` + children = ... + links = ... + actions = [{:spec, %ParentSpec{children: children, links: links}}] + Pipeline.exec_actions(pipeline, actions) + ``` + + The controlling process can also subscribe to the messages + sent by the pipeline and later on synchroniously await for these messages: + ``` + # subscribes to message which is sent when the pipeline enters any playback state + Pipeline.subscribe(pipeline, %Message.PlaybackState{state: _}) + ... + # awaits for the message sent when the pipeline enters :playing playback state + Pipeline.await_playback_state(pipeline, :playing) + ... + # awaits for the message sent when the pipeline enters :stopped playback state + Pipeline.await_playback_state(pipeline, :stopped) + ``` + + `Membrane.RemoteControlled.Pipeline` can be used when there is no need for introducing a custom + logic in the `Membrane.Pipeline` callbacks implementation. An example of usage could be running a + pipeline from the elixir script. `Membrane.RemoteControlled.Pipeline` sends the following messages: + * `Membrane.RemoteControlled.Message.PlaybackState.t()` sent when pipeline enters a given playback state, + * `Membrane.RemoteControlled.Message.StartOfStream.t()` sent + when one of direct pipeline children informs the pipeline about start of a stream, + * `Membrane.RemoteControlled.Message.EndOfStream.t()` sent + when one of direct pipeline children informs the pipeline about end of a stream, + * `Membrane.RemoteControlled.Message.Notification.t()` sent when pipeline + receives notification from one of its children, + * `Membrane.RemoteControlled.Message.Terminated.t()` sent when the pipeline gracefully terminates. + """ + + use Membrane.Pipeline + + alias Membrane.Pipeline + alias Membrane.RemoteControlled.Message + + alias Membrane.RemoteControlled.Message.{ + PlaybackState, + StartOfStream, + EndOfStream, + Notification, + Terminated + } + + defmodule State do + @moduledoc false + + @enforce_keys [:controller_pid] + defstruct @enforce_keys ++ [matching_functions: []] + end + + @doc """ + Starts the `Membrane.RemoteControlled.Pipeline` and links it to the current process. The process + that makes the call to the `start_link/1` automatically become the controller process. + """ + @spec start_link(GenServer.options()) :: GenServer.on_start() + def start_link(process_options \\ []) do + Pipeline.start_link(__MODULE__, %{controller_pid: self()}, process_options) + end + + @doc """ + Does the same as the `start_link/1` but starts the process outside of the supervision tree. + """ + @spec start(GenServer.options()) :: GenServer.on_start() + def start(process_options \\ []) do + Pipeline.start(__MODULE__, %{controller_pid: self()}, process_options) + end + + defmacrop pin_leaf_nodes(ast) do + quote do + Macro.postwalk(unquote(ast), fn node -> + if not Macro.quoted_literal?(node) and match?({_name, _ctx, _args}, node) do + {_name, ctx, args} = node + + case args do + nil -> {:^, ctx, [node]} + _not_nil -> node + end + else + node + end + end) + end + end + + defmacrop do_await(pipeline, message_type, keywords \\ []) do + keywords = pin_leaf_nodes(keywords) + + quote do + receive do + %unquote(message_type){ + unquote_splicing(Macro.expand(keywords, __ENV__)), + from: ^unquote(pipeline) + } = msg -> + msg + end + end + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.PlaybackState()` + message with no further constraints, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for any playback state change occuring in the pipeline: + ``` + Pipeline.await_playback_state(pipeline) + ``` + """ + @spec await_playback_state(pid()) :: Membrane.RemoteControlled.Message.PlaybackState.t() + def await_playback_state(pipeline) do + do_await(pipeline, PlaybackState) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.PlaybackState()` + message with the given `state`, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the pipeline's playback state to change into `:playing`: + ``` + Pipeline.await_playback_state(pipeline, :playing) + ``` + """ + @spec await_playback_state(pid, Membrane.PlaybackState.t()) :: + Membrane.RemoteControlled.Message.PlaybackState.t() + def await_playback_state(pipeline, playback_state) do + do_await(pipeline, PlaybackState, state: playback_state) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.StartOfStream()` message + with no further constraints, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first `start_of_stream` occuring on any pad of any element in the pipeline: + ``` + Pipeline.await_start_of_stream(pipeline) + ``` + """ + @spec await_start_of_stream(pid) :: Membrane.RemoteControlled.Message.StartOfStream.t() + def await_start_of_stream(pipeline) do + do_await(pipeline, StartOfStream) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.StartOfStream()` message + concerning the given `element`, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first `start_of_stream` occuring on any pad of the `:element_id` element in the pipeline: + ``` + Pipeline.await_start_of_stream(pipeline, :element_id) + ``` + """ + @spec await_start_of_stream(pid(), Membrane.Element.name_t()) :: + Membrane.RemoteControlled.Message.StartOfStream.t() + def await_start_of_stream(pipeline, element) do + do_await(pipeline, StartOfStream, element: element) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.StartOfStream()` message + concerning the given `element` and the `pad`, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first `start_of_stream` occuring on the `:pad_id` pad of the `:element_id` element in the pipeline: + ``` + Pipeline.await_start_of_stream(pipeline, :element_id, :pad_id) + ``` + """ + @spec await_start_of_stream(pid(), Membrane.Element.name_t(), Membrane.Pad.name_t()) :: + Membrane.RemoteControlled.Message.StartOfStream.t() + def await_start_of_stream(pipeline, element, pad) do + do_await(pipeline, StartOfStream, element: element, pad: pad) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.EndOfStream()` message + with no further constraints, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first `end_of_stream` occuring on any pad of any element in the pipeline: + ``` + Pipeline.await_end_of_stream(pipeline) + ``` + """ + @spec await_end_of_stream(pid()) :: Membrane.RemoteControlled.Message.EndOfStream.t() + def await_end_of_stream(pipeline) do + do_await(pipeline, EndOfStream) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.EndOfStream()` message + concerning the given `element`, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first `end_of_stream` occuring on any pad of the `:element_id` element in the pipeline: + ``` + Pipeline.await_end_of_stream(pipeline, :element_id) + ``` + """ + @spec await_end_of_stream(pid(), Membrane.Element.name_t()) :: + Membrane.RemoteControlled.Message.EndOfStream.t() + def await_end_of_stream(pipeline, element) do + do_await(pipeline, EndOfStream, element: element) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.EndOfStream()` message + concerning the given `element` and the `pad`, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first `end_of_stream` occuring on the `:pad_id` of the `:element_id` element in the pipeline: + ``` + Pipeline.await_end_of_stream(pipeline, :element_id, :pad_id) + ``` + """ + @spec await_end_of_stream(pid(), Membrane.Element.name_t(), Membrane.Pad.name_t()) :: + Membrane.RemoteControlled.Message.EndOfStream.t() + def await_end_of_stream(pipeline, element, pad) do + do_await(pipeline, EndOfStream, element: element, pad: pad) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.Notification()` + message with no further constraints, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first notification send to any element in the pipeline: + ``` + Pipeline.await_notification(pipeline) + ``` + """ + @spec await_notification(pid()) :: Membrane.RemoteControlled.Message.Notification.t() + def await_notification(pipeline) do + do_await(pipeline, Notification) + end + + @doc """ + Awaits for the first `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.Notification()` message + concerning the given `element`, sent by the process with `pipeline` pid. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the first notification send to the `:element_id` element in the pipeline: + ``` + Pipeline.await_notification(pipeline, :element_id) + ``` + """ + @spec await_notification(pid(), Membrane.Notification.t()) :: + Membrane.RemoteControlled.Message.Notification.t() + def await_notification(pipeline, element) do + do_await(pipeline, Notification, element: element) + end + + @doc """ + Awaits for the `Membrane.RemoteControlled.Message()` wrapping the `Membrane.RemoteControlled.Message.Terminated` message, + which is send when the pipeline gracefully terminates. + It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting + for that message. + + Usage example: + 1) awaiting for the pipeline termination: + ``` + Pipeline.await_termination(pipeline) + ``` + """ + @spec await_termination(pid()) :: Membrane.RemoteControlled.Message.Terminated.t() + def await_termination(pipeline) do + do_await(pipeline, Terminated) + end + + @doc """ + Subscribes to a given `subscription_pattern`. The `subscription_pattern` should describe some subset + of elements of `Membrane.RemoteControlled.Pipeline.message_t()` type. The `subscription_pattern` + must be a match pattern. + + + Usage examples: + 1) making the `Membrane.RemoteControlled.Pipeline` send to the controlling process `Message.StartOfStream` message + when any pad of the `:element_id` receives `:start_of_stream` event. + + ``` + subscribe(pipeline, %Message.StartOfStream{element: :element_id, pad: _}) + ``` + + 2) making the `Membrane.RemoteControlled.Pipeline` send to the controlling process `Message.PlaybackState` message when the pipeline playback state changes to any state + (that is - for all the :stopped, :prepared and :playing playback states). + + ``` + subscribe(pipeline, %Message.PlaybackState{state: _}) + ``` + """ + defmacro subscribe(pipeline, subscription_pattern) do + quote do + send( + unquote(pipeline), + {:subscription, fn message -> match?(unquote(subscription_pattern), message) end} + ) + end + end + + @doc """ + Sends a list of `Pipeline.Action.t()` to the given `Membrane.RemoteControlled.Pipeline` for execution. + + Usage example: + 1) making the `Membrane.RemoteControlled.Pipeline` start the `Membrane.ParentSpec` + specified in the action. + ``` + children = ... + links = ... + actions = [{:spec, %ParentSpec{children: children, links: links}}] + Pipeline.exec_actions(pipeline, actions) + ``` + """ + @spec exec_actions(pid(), [Pipeline.Action.t()]) :: :ok + def exec_actions(pipeline, actions) do + send(pipeline, {:exec_actions, actions}) + :ok + end + + @impl true + def handle_init(opts) do + %{controller_pid: controller_pid} = opts + state = %State{controller_pid: controller_pid} + {:ok, state} + end + + @impl true + def handle_playing_to_prepared(_ctx, state) do + pipeline_event = %Message.PlaybackState{from: self(), state: :prepared} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_prepared_to_playing(_ctx, state) do + pipeline_event = %Message.PlaybackState{from: self(), state: :playing} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_prepared_to_stopped(_ctx, state) do + pipeline_event = %Message.PlaybackState{from: self(), state: :stopped} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_stopped_to_prepared(_ctx, state) do + pipeline_event = %Message.PlaybackState{from: self(), state: :prepared} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_stopped_to_terminating(_ctx, state) do + pipeline_event = %Message.PlaybackState{from: self(), state: :terminating} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_element_end_of_stream({element_name, pad_ref}, _ctx, state) do + pipeline_event = %Message.EndOfStream{from: self(), element: element_name, pad: pad_ref} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_element_start_of_stream({element_name, pad_ref}, _ctx, state) do + pipeline_event = %Message.StartOfStream{from: self(), element: element_name, pad: pad_ref} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_notification(notification, element, _ctx, state) do + pipeline_event = %Message.Notification{from: self(), data: notification, element: element} + send_event_to_controller_if_subscribed(pipeline_event, state) + {:ok, state} + end + + @impl true + def handle_other({:exec_actions, actions}, _ctx, state) do + {{:ok, actions}, state} + end + + @impl true + def handle_other({:subscription, pattern}, _ctx, state) do + {:ok, %{state | matching_functions: [pattern | state.matching_functions]}} + end + + @impl true + def handle_shutdown(reason, state) do + pipeline_event = %Message.Terminated{from: self(), reason: reason} + send_event_to_controller_if_subscribed(pipeline_event, state) + :ok + end + + defp send_event_to_controller_if_subscribed(message, state) do + if Enum.any?(state.matching_functions, & &1.(message)) do + send(state.controller_pid, message) + end + end +end diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index df873ae54..90396fac3 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -55,8 +55,8 @@ defmodule Membrane.Testing.Pipeline do ``` options = %Membrane.Testing.Pipeline.Options { module: Your.Module - } - ``` + } + ``` See `Membrane.Testing.Pipeline.Options` for available options. @@ -166,7 +166,7 @@ defmodule Membrane.Testing.Pipeline do raise """ You provided no information about pipeline contents. Please provide either: - - list of elemenst via `elements` field of Options struct with optional links between + - list of elements via `elements` field of Options struct with optional links between them via `links` field of `Options` struct - module that implements `Membrane.Pipeline` callbacks via `module` field of `Options` struct @@ -359,6 +359,11 @@ defmodule Membrane.Testing.Pipeline do {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)} end + @impl true + def handle_other({:exec_actions, actions}, _ctx, %State{} = state) do + {{:ok, actions}, state} + end + @impl true def handle_other({:for_element, element, message}, ctx, %State{} = state) do injected_module_result = diff --git a/test/membrane/remote_controlled/pipeline_test.exs b/test/membrane/remote_controlled/pipeline_test.exs new file mode 100644 index 000000000..c2df666d2 --- /dev/null +++ b/test/membrane/remote_controlled/pipeline_test.exs @@ -0,0 +1,184 @@ +defmodule Membrane.RemoteControlled.PipelineTest do + use ExUnit.Case + alias Membrane.RemoteControlled.Pipeline + alias Membrane.RemoteControlled.Message + alias Membrane.ParentSpec + require Membrane.RemoteControlled.Pipeline + + defmodule Filter do + use Membrane.Filter + alias Membrane.Buffer + + def_output_pad :output, caps: :any, availability: :always + def_input_pad :input, demand_unit: :buffers, caps: :any, availability: :always + + @impl true + def handle_init(_opts) do + {:ok, %{buffer_count: 0}} + end + + @impl true + def handle_process(_input, buf, _ctx, state) do + state = %{state | buffer_count: state.buffer_count + 1} + + notification_actions = + if rem(state.buffer_count, 3) == 0 do + [{:notify, %Buffer{payload: "test"}}] + else + [] + end + + {{:ok, [{:buffer, {:output, buf}}] ++ notification_actions}, state} + end + + @impl true + def handle_demand(:output, size, _unit, _ctx, state) do + {{:ok, demand: {:input, size}}, state} + end + end + + defp setup_pipeline(_context) do + {:ok, pipeline} = Pipeline.start_link() + + children = [ + a: %Membrane.Testing.Source{output: [0xA1, 0xB2, 0xC3, 0xD4]}, + b: Filter, + c: Membrane.Testing.Sink + ] + + links = [ParentSpec.link(:a) |> ParentSpec.to(:b) |> ParentSpec.to(:c)] + actions = [{:spec, %ParentSpec{children: children, links: links}}] + + Pipeline.exec_actions(pipeline, actions) + {:ok, pipeline: pipeline} + end + + describe "Membrane.RemoteControlled.Pipeline.subscribe/2" do + setup :setup_pipeline + + test "testing process should receive all subscribed events", %{pipeline: pipeline} do + # SETUP + Pipeline.subscribe(pipeline, %Message.PlaybackState{state: :prepared}) + Pipeline.subscribe(pipeline, %Message.PlaybackState{state: :playing}) + Pipeline.subscribe(pipeline, %Message.Notification{element: :b, data: %Membrane.Buffer{}}) + Pipeline.subscribe(pipeline, %Message.StartOfStream{element: :b, pad: :input}) + + # RUN + Pipeline.play(pipeline) + + # TEST + assert_receive %Message.PlaybackState{from: ^pipeline, state: :prepared} + assert_receive %Message.PlaybackState{from: ^pipeline, state: :playing} + + assert_receive %Message.Notification{ + from: ^pipeline, + element: :b, + data: %Membrane.Buffer{payload: "test"} + } + + assert_receive %Message.StartOfStream{from: ^pipeline, element: :b, pad: :input} + + refute_receive %Message.Terminated{from: ^pipeline} + refute_receive %Message.PlaybackState{from: ^pipeline, state: :stopped} + + # STOP + Pipeline.stop_and_terminate(pipeline, blocking?: true) + end + + test "should allow to use wildcards in subscription pattern", %{pipeline: pipeline} do + # SETUP + Pipeline.subscribe(pipeline, %Message.PlaybackState{state: _}) + Pipeline.subscribe(pipeline, %Message.EndOfStream{}) + + # RUN + Pipeline.play(pipeline) + + # TEST + assert_receive %Message.PlaybackState{from: ^pipeline, state: :prepared} + assert_receive %Message.PlaybackState{from: ^pipeline, state: :playing} + + assert_receive %Message.EndOfStream{from: ^pipeline, element: :b, pad: :input} + + assert_receive %Message.EndOfStream{from: ^pipeline, element: :c, pad: :input} + + # STOP + Pipeline.stop_and_terminate(pipeline, blocking?: true) + + # TEST + assert_receive %Message.PlaybackState{from: ^pipeline, state: :stopped} + refute_receive %Message.Terminated{from: ^pipeline} + refute_receive %Message.Notification{from: ^pipeline} + refute_receive %Message.StartOfStream{from: ^pipeline, element: _, pad: _} + end + end + + describe "Membrane.RemoteControlled.Pipeline await_* functions" do + setup :setup_pipeline + + test "should await for requested messages", %{pipeline: pipeline} do + # SETUP + Pipeline.subscribe(pipeline, %Message.PlaybackState{state: _}) + Pipeline.subscribe(pipeline, %Message.StartOfStream{element: _, pad: _}) + Pipeline.subscribe(pipeline, %Message.Notification{element: _, data: _}) + Pipeline.subscribe(pipeline, %Message.Terminated{}) + + # RUN + Pipeline.play(pipeline) + + # TEST + Pipeline.await_playback_state(pipeline, :playing) + + Pipeline.await_start_of_stream(pipeline, :c, :input) + Pipeline.await_notification(pipeline, :b) + + # STOP + Pipeline.stop_and_terminate(pipeline, blocking?: true) + end + + test "should await for requested messages with parts of message body not being specified", %{ + pipeline: pipeline + } do + # SETUP + Pipeline.subscribe(pipeline, %Message.PlaybackState{state: _}) + Pipeline.subscribe(pipeline, %Message.StartOfStream{element: _, pad: _}) + Pipeline.subscribe(pipeline, %Message.Notification{element: _, data: _}) + + # RUN + Pipeline.play(pipeline) + + # TEST + Pipeline.await_start_of_stream(pipeline, :c) + msg = Pipeline.await_notification(pipeline, :b) + + assert msg == %Message.Notification{ + from: pipeline, + element: :b, + data: %Membrane.Buffer{payload: "test"} + } + + # STOP + Pipeline.stop_and_terminate(pipeline, blocking?: true) + end + + test "should await for requested messages with pinned variables as message body parts", %{ + pipeline: pipeline + } do + # SETUP + Pipeline.subscribe(pipeline, %Message.PlaybackState{state: _}) + Pipeline.subscribe(pipeline, %Message.StartOfStream{element: _, pad: _}) + Pipeline.subscribe(pipeline, %Message.Notification{element: _, data: _}) + state = :playing + element = :c + + # START + Pipeline.play(pipeline) + + # TEST + Pipeline.await_playback_state(pipeline, state) + Pipeline.await_start_of_stream(pipeline, element, :input) + + # STOP + Pipeline.stop_and_terminate(pipeline, blocking?: true) + end + end +end