From e4305da67493219a224ccb2ccadd75360f81cbee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Feliks=20Pobiedzi=C5=84ski?= <38541925+FelonEkonom@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:42:31 +0100 Subject: [PATCH] Fix bug in handling actions returned from handle_tick. Deprecate handle_spec_started. (#708) * wip * Write tests wip * Write tests wip * Revert "Fix timer running late (#685)" This reverts commit 168f57e12bf3280cda82b1116796151d4b1cb486. * Fix actions handling order bug related to Pipeline.handle_playing * Add assertion on value passed with :setup action * WIP Fix bug in executing handle_buffer while handling actions from previous callback * Fix tests wip * Fix CI * Update changelog * Stopt calling handle_spec_started in between handling actions * Make demands test more strict * Add dots to changelog * Fix double tick bug * wip * Deprecate handle_spec_started/3 * Remove unused aliases * Remove unnecessary warning * Bump version to 1.0.1 * Remove leftovers * Fix docs * Implement suggestions from CR, bump version to 1.1.0-rc --- CHANGELOG.md | 5 +- lib/membrane/bin.ex | 9 +- lib/membrane/core/callback_handler.ex | 13 ++ lib/membrane/core/element.ex | 13 -- lib/membrane/core/element/action_handler.ex | 66 +++--- lib/membrane/core/element/demand_handler.ex | 9 + lib/membrane/core/element/event_controller.ex | 4 +- lib/membrane/core/lifecycle_controller.ex | 18 +- lib/membrane/core/parent.ex | 17 ++ .../child_life_controller/startup_utils.ex | 21 +- lib/membrane/core/pipeline/action_handler.ex | 10 + lib/membrane/core/pipeline/state.ex | 6 +- lib/membrane/core/timer.ex | 21 +- lib/membrane/core/timer_controller.ex | 7 + lib/membrane/pipeline.ex | 9 +- lib/membrane/testing/pipeline.ex | 7 +- mix.exs | 2 +- .../core/element/action_handler_test.exs | 11 - .../core/element/event_controller_test.exs | 3 + .../element/lifecycle_controller_test.exs | 2 + .../core/element/pad_controller_test.exs | 3 + .../element/stream_format_controller_test.exs | 3 + test/membrane/core/pipeline_test.exs | 2 + .../actions_handling_order_test.exs | 191 ++++++++++++++++++ test/membrane/integration/demands_test.exs | 3 +- test/membrane/integration/linking_test.exs | 48 ++--- .../integration/toilet_forwarding_test.exs | 3 +- 27 files changed, 382 insertions(+), 124 deletions(-) create mode 100644 test/membrane/integration/actions_handling_order_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index da6ceab62..283e1bbee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,11 @@ # Changelog +## 1.1.0-rc0 + * Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708) + * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) + ## 1.0.1 * Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614) - * Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693) * Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626) * Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680) * Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681) diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index d13151fd4..b2968073e 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -167,9 +167,11 @@ defmodule Membrane.Bin do ) :: callback_return @doc """ + This callback is deprecated since v1.1.0-rc0 + Callback invoked when children of `Membrane.ChildrenSpec` are started. - By default, it does nothing. + It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens. """ @callback handle_spec_started( children :: [Child.name()], @@ -309,6 +311,7 @@ defmodule Membrane.Bin do alias unquote(__MODULE__) @behaviour unquote(__MODULE__) @before_compile unquote(__MODULE__) + @after_compile {Membrane.Core.Parent, :check_deprecated_callbacks} unquote(bring_spec) unquote(bring_pad) @@ -354,9 +357,6 @@ defmodule Membrane.Bin do {[], state} end - @impl true - def handle_spec_started(new_children, _ctx, state), do: {[], state} - @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -381,7 +381,6 @@ defmodule Membrane.Bin do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 30cb8d3bc..91729aef0 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -8,6 +8,7 @@ defmodule Membrane.Core.CallbackHandler do use Bunch alias Membrane.CallbackError + alias Membrane.Core.Component require Membrane.Logger @@ -191,6 +192,13 @@ defmodule Membrane.Core.CallbackHandler do was_handling_action? = state.handling_action? state = %{state | handling_action?: true} + # Updating :supplying_demand? flag value here is a temporal fix. + # Setting it to `true` while handling actions causes postponing calls + # of handle_redemand/2 and supply_demand/2 until a moment, when all + # actions returned from the callback are handled + was_supplying_demand? = Map.get(state, :supplying_demand?, false) + state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state + state = Enum.reduce(actions, state, fn action, state -> try do @@ -210,6 +218,11 @@ defmodule Membrane.Core.CallbackHandler do do: state, else: %{state | handling_action?: false} + state = + if Component.is_element?(state) and not was_supplying_demand?, + do: %{state | supplying_demand?: false}, + else: state + handler_module.handle_end_of_actions(state) end end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index b6500a593..8f673455f 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -260,20 +260,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:timer_tick, timer_id), state) do - # Guarding the `TimerController.handle_tick/2` invocation is - # required since there might be a case in which `handle_tick` - # callback's implementation returns demand action. - # In this scenario, without this guard, there would a possibility that - # the `handle_buffer` would be called immediately, returning - # some action that would affect the timer and the original state - # of the timer, set with actions returned from `handle_tick`, - # would be overwritten with that action. - # - # For more information see: https://github.com/membraneframework/membrane_core/issues/670 - state = %{state | supplying_demand?: true} state = TimerController.handle_tick(timer_id, state) - state = %{state | supplying_demand?: false} - state = Membrane.Core.Element.DemandHandler.handle_delayed_demands(state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 759237203..3650b58ad 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -38,22 +38,47 @@ defmodule Membrane.Core.Element.ActionHandler do require Membrane.Logger @impl CallbackHandler - def transform_actions(actions, callback, _handler_params, state) do + def transform_actions(actions, _callback, _handler_params, state) do actions = join_buffers(actions) - ensure_nothing_after_redemand(actions, callback, state) {actions, state} end defguardp is_demand_size(size) when is_integer(size) or is_function(size) @impl CallbackHandler - def handle_end_of_actions(state) when not state.handling_action? do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) + def handle_end_of_actions(state) do + # Fixed order of handling demand of manual and auto pads would lead to + # favoring manual pads over auto pads (or vice versa), especially after + # introducting auto flow queues. + manual_demands_first? = Enum.random([1, 2]) == 1 + + state = + if manual_demands_first?, + do: maybe_handle_delayed_demands(state), + else: state + + state = maybe_handle_pads_to_snapshot(state) + + state = + if manual_demands_first?, + do: state, + else: maybe_handle_delayed_demands(state) + + state end - @impl CallbackHandler - def handle_end_of_actions(state), do: state + defp maybe_handle_delayed_demands(state) do + with %{supplying_demand?: false} <- state do + DemandHandler.handle_delayed_demands(state) + end + end + + defp maybe_handle_pads_to_snapshot(state) do + with %{handling_action?: false} <- state do + Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) + end + end @impl CallbackHandler def handle_action({action, _}, :handle_init, _params, _state) @@ -284,30 +309,6 @@ defmodule Membrane.Core.Element.ActionHandler do ) end - defp ensure_nothing_after_redemand(actions, callback, state) do - {redemands, actions_after_redemands} = - actions - |> Enum.drop_while(fn - {:redemand, _args} -> false - _other_action -> true - end) - |> Enum.split_while(fn - {:redemand, _args} -> true - _other_action -> false - end) - - case {redemands, actions_after_redemands} do - {_redemands, []} -> - :ok - - {[redemand | _redemands], _actions_after_redemands} -> - raise ActionError, - reason: :actions_after_redemand, - action: redemand, - callback: {state.module, callback} - end - end - @spec send_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() defp send_buffer(_pad_ref, [], state) do state @@ -470,7 +471,8 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) + DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> AutoFlowUtils.pop_queues_and_bump_demand() else diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 74c6ce05a..1d22d7eb9 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do end end + @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() + def remove_pad_from_delayed_demands(pad_ref, state) do + Map.update!(state, :delayed_demands, fn delayed_demands_set -> + delayed_demands_set + |> MapSet.delete({pad_ref, :supply}) + |> MapSet.delete({pad_ref, :redemand}) + end) + end + @spec handle_input_queue_output( Pad.ref(), [InputQueue.output_value()], diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index e4aff679e..f6b59a34b 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, + DemandHandler, InputQueue, PlaybackQueue, State @@ -108,7 +109,8 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = - PadModel.set_data!(state, pad_ref, :end_of_stream?, true) + DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) diff --git a/lib/membrane/core/lifecycle_controller.ex b/lib/membrane/core/lifecycle_controller.ex index cbfc310cd..5e4e03f72 100644 --- a/lib/membrane/core/lifecycle_controller.ex +++ b/lib/membrane/core/lifecycle_controller.ex @@ -14,12 +14,16 @@ defmodule Membrane.Core.LifecycleController do def handle_setup_operation(operation, state) do :ok = assert_operation_allowed!(operation, state.setup_incomplete?) - case operation do - :incomplete -> + cond do + operation == :incomplete -> Membrane.Logger.debug("Component deferred initialization") %{state | setup_incomplete?: true} - :complete -> + Component.is_pipeline?(state) -> + # complete_setup/1 will be called in Membrane.Core.Pipeline.ActionHandler.handle_end_of_actions/1 + %{state | awaiting_setup_completition?: true} + + Component.is_child?(state) -> complete_setup(state) end end @@ -52,5 +56,13 @@ defmodule Membrane.Core.LifecycleController do """ end + defp assert_operation_allowed!(operation, _status) + when operation not in [:incomplete, :complete] do + raise SetupError, """ + Action {:setup, #{inspect(operation)}} was returned, but second element in the tuple must + be :complete or :incomplete + """ + end + defp assert_operation_allowed!(_operation, _status), do: :ok end diff --git a/lib/membrane/core/parent.ex b/lib/membrane/core/parent.ex index 4f0fb5f8a..f4471b697 100644 --- a/lib/membrane/core/parent.ex +++ b/lib/membrane/core/parent.ex @@ -2,4 +2,21 @@ defmodule Membrane.Core.Parent do @moduledoc false @type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Pipeline.State.t() + + @spec check_deprecated_callbacks(Macro.Env.t(), binary) :: :ok + def check_deprecated_callbacks(env, _bytecode) do + modules_whitelist = [Membrane.Testing.Pipeline] + + if env.module not in modules_whitelist and + Module.defines?(env.module, {:handle_spec_started, 3}, :def) do + warn_message = """ + Callback handle_spec_started/3 has been deprecated since \ + :membrane_core v1.1.0-rc0, but it is implemented in #{inspect(env.module)} + """ + + IO.warn(warn_message, []) + end + + :ok + end end diff --git a/lib/membrane/core/parent/child_life_controller/startup_utils.ex b/lib/membrane/core/parent/child_life_controller/startup_utils.ex index a4d4d8884..70cc96de8 100644 --- a/lib/membrane/core/parent/child_life_controller/startup_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/startup_utils.ex @@ -104,15 +104,20 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do @spec exec_handle_spec_started([Membrane.Child.name()], Parent.state()) :: Parent.state() def exec_handle_spec_started(children_names, state) do - action_handler = Component.action_handler(state) - - CallbackHandler.exec_and_handle_callback( - :handle_spec_started, - action_handler, - %{context: &Component.context_from_state/1}, - [children_names], + # handle_spec_started/3 callback is deprecated, so we don't require its implementation + if function_exported?(state.module, :handle_spec_started, 3) do + action_handler = Component.action_handler(state) + + CallbackHandler.exec_and_handle_callback( + :handle_spec_started, + action_handler, + %{context: &Component.context_from_state/1}, + [children_names], + state + ) + else state - ) + end end @spec check_if_children_names_and_children_groups_ids_are_unique( diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index 92065ed43..382a4f5a8 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -8,6 +8,8 @@ defmodule Membrane.Core.Pipeline.ActionHandler do alias Membrane.Core.Parent.LifecycleController alias Membrane.Core.Pipeline.State + require Membrane.Logger + @impl CallbackHandler def handle_action({:spec, args}, _cb, _params, %State{terminating?: true}) do raise Membrane.ParentError, @@ -103,4 +105,12 @@ defmodule Membrane.Core.Pipeline.ActionHandler do def handle_action(action, _callback, _params, _state) do raise ActionError, action: action, reason: {:unknown_action, Membrane.Pipeline.Action} end + + @impl CallbackHandler + def handle_end_of_actions(state) do + with %{awaiting_setup_completition?: true} <- state do + %{state | awaiting_setup_completition?: false} + |> Membrane.Core.LifecycleController.complete_setup() + end + end end diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 59ecaded7..6f644947b 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -34,7 +34,8 @@ defmodule Membrane.Core.Pipeline.State do setup_incomplete?: boolean(), handling_action?: boolean(), stalker: Membrane.Core.Stalker.t(), - subprocess_supervisor: pid() + subprocess_supervisor: pid(), + awaiting_setup_completition?: boolean() } # READ THIS BEFORE ADDING NEW FIELD!!! @@ -58,5 +59,6 @@ defmodule Membrane.Core.Pipeline.State do handling_action?: false, stalker: nil, resource_guard: nil, - subprocess_supervisor: nil + subprocess_supervisor: nil, + awaiting_setup_completition?: false end diff --git a/lib/membrane/core/timer.ex b/lib/membrane/core/timer.ex index 5a23c957c..bd4eab23f 100644 --- a/lib/membrane/core/timer.ex +++ b/lib/membrane/core/timer.ex @@ -15,11 +15,13 @@ defmodule Membrane.Core.Timer do clock: Clock.t(), next_tick_time: Time.t(), ratio: Clock.ratio(), - timer_ref: reference() | nil + timer_ref: reference() | nil, + awaiting_message?: boolean() } @enforce_keys [:interval, :clock, :init_time, :id] - defstruct @enforce_keys ++ [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil] + defstruct @enforce_keys ++ + [next_tick_time: 0, ratio: Ratio.new(1), timer_ref: nil, awaiting_message?: false] @spec start(id, interval, Clock.t()) :: t def start(id, interval, clock) do @@ -42,8 +44,14 @@ defmodule Membrane.Core.Timer do %__MODULE__{timer | ratio: ratio} end + @spec handle_message_arrived(t) :: t + def handle_message_arrived(%__MODULE__{awaiting_message?: true} = timer) do + %{timer | awaiting_message?: false} + end + @spec tick(t) :: t - def tick(%__MODULE__{interval: :no_interval} = timer) do + def tick(%__MODULE__{} = timer) + when timer.awaiting_message? or timer.interval == :no_interval do timer end @@ -67,7 +75,12 @@ defmodule Membrane.Core.Timer do timer_ref = Process.send_after(self(), Message.new(:timer_tick, id), beam_next_tick_time, abs: true) - %__MODULE__{timer | next_tick_time: next_tick_time |> Ratio.floor(), timer_ref: timer_ref} + %__MODULE__{ + timer + | next_tick_time: next_tick_time |> Ratio.floor(), + timer_ref: timer_ref, + awaiting_message?: true + } end @spec set_interval(t, interval) :: t diff --git a/lib/membrane/core/timer_controller.ex b/lib/membrane/core/timer_controller.ex index 85e154fcb..a5c5faa64 100644 --- a/lib/membrane/core/timer_controller.ex +++ b/lib/membrane/core/timer_controller.ex @@ -58,6 +58,13 @@ defmodule Membrane.Core.TimerController do @spec handle_tick(Timer.id(), Component.state()) :: Component.state() def handle_tick(timer_id, state) when is_timer_present(timer_id, state) do + state = + update_in( + state, + [:synchronization, :timers, timer_id], + &Timer.handle_message_arrived/1 + ) + state = CallbackHandler.exec_and_handle_callback( :handle_tick, diff --git a/lib/membrane/pipeline.ex b/lib/membrane/pipeline.ex index e3da30050..491c76289 100644 --- a/lib/membrane/pipeline.ex +++ b/lib/membrane/pipeline.ex @@ -208,9 +208,11 @@ defmodule Membrane.Pipeline do ) :: {[Action.common_actions()], state()} @doc """ + This callback is deprecated since v1.1.0-rc0. + Callback invoked when children of `Membrane.ChildrenSpec` are started. - By default, it does nothing. + It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens. """ @callback handle_spec_started( children :: [Child.name()], @@ -469,6 +471,7 @@ defmodule Membrane.Pipeline do alias unquote(__MODULE__) require Membrane.Logger @behaviour unquote(__MODULE__) + @after_compile {Membrane.Core.Parent, :check_deprecated_callbacks} unquote(bring_spec) unquote(bring_pad) @@ -512,9 +515,6 @@ defmodule Membrane.Pipeline do {[], state} end - @impl true - def handle_spec_started(new_children, _ctx, state), do: {[], state} - @impl true def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state} @@ -538,7 +538,6 @@ defmodule Membrane.Pipeline do handle_setup: 2, handle_playing: 2, handle_info: 3, - handle_spec_started: 3, handle_element_start_of_stream: 4, handle_element_end_of_stream: 4, handle_child_notification: 4, diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 7d4084ef5..73becaa8c 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -522,7 +522,12 @@ defmodule Membrane.Testing.Pipeline do do: {[], nil} defp eval_injected_module_callback(callback, args, state) do - apply(state.module, callback, args ++ [state.custom_pipeline_state]) + if callback != :handle_spec_started or + function_exported?(state.module, :handle_spec_started, 3) do + apply(state.module, callback, args ++ [state.custom_pipeline_state]) + else + {[], state.custom_pipeline_state} + end end defp notify_test_process(test_process, message) do diff --git a/mix.exs b/mix.exs index ac1df050b..7f7033497 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Membrane.Mixfile do use Mix.Project - @version "1.0.1" + @version "1.1.0-rc0" @source_ref "v#{@version}" def project do diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 5a82d92cd..06b1913ae 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -546,16 +546,5 @@ defmodule Membrane.Core.Element.ActionHandlerTest do end ) end - - test "when :redemand is not the last action", %{state: state} do - assert_raise ActionError, ~r/redemand.*last/i, fn -> - @module.transform_actions( - [redemand: :output, notify_parent: :a, notify_parent: :b], - :handle_other, - %{}, - state - ) - end - end end end diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 356501e6d..27928d271 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -51,7 +51,10 @@ defmodule Membrane.Core.Element.EventControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), + handle_demand_loop_counter: 0, pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 53d06f2cd..3a9be602b 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -50,7 +50,9 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index e52ea842b..82b6a5926 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -18,7 +18,10 @@ defmodule Membrane.Core.Element.PadControllerTest do struct!(State, name: name, module: elem_module, + handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), parent_pid: self(), internal_state: %{}, synchronization: %{clock: nil, parent_clock: nil}, diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 1235e26a3..d94e93030 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -42,7 +42,10 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, handling_action?: false, + supplying_demand?: false, pads_to_snapshot: MapSet.new(), + delayed_demands: MapSet.new(), + handle_demand_loop_counter: 0, pads_data: %{ input: struct(Membrane.Element.PadData, diff --git a/test/membrane/core/pipeline_test.exs b/test/membrane/core/pipeline_test.exs index 5ef8f2f7d..d59fc2b2c 100644 --- a/test/membrane/core/pipeline_test.exs +++ b/test/membrane/core/pipeline_test.exs @@ -79,6 +79,7 @@ defmodule Membrane.Core.PipelineTest do [], state ) + |> ActionHandler.handle_end_of_actions() end end @@ -92,6 +93,7 @@ defmodule Membrane.Core.PipelineTest do [], state ) + |> ActionHandler.handle_end_of_actions() end end end diff --git a/test/membrane/integration/actions_handling_order_test.exs b/test/membrane/integration/actions_handling_order_test.exs new file mode 100644 index 000000000..f54ac0d49 --- /dev/null +++ b/test/membrane/integration/actions_handling_order_test.exs @@ -0,0 +1,191 @@ +defmodule Membrane.Integration.ActionsHandlingOrderTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Testing + + defmodule TickingPipeline do + use Membrane.Pipeline + + @tick_time Membrane.Time.milliseconds(100) + + @impl true + def handle_init(_ctx, test_process: test_process), + do: {[], %{ticked?: false, test_process: test_process}} + + @impl true + def handle_setup(_ctx, state) do + {[setup: :incomplete, start_timer: {:one, @tick_time}], state} + end + + @impl true + def handle_playing(_ctx, state) do + {[timer_interval: {:one, @tick_time}], state} + end + + @impl true + def handle_tick(:one, _ctx, %{ticked?: false} = state) do + {[setup: :complete, timer_interval: {:one, :no_interval}], %{state | ticked?: true}} + end + + @impl true + def handle_tick(:one, _ctx, state) do + send(state.test_process, :ticked_two_times) + {[timer_interval: {:one, :no_interval}], state} + end + end + + defmodule NotifyingPipeline do + use Membrane.Pipeline + + alias Membrane.Integration.ActionsHandlingOrderTest.NotifyingPipelineChild + + @impl true + def handle_init(_ctx, _opts) do + spec = child(:child, NotifyingPipelineChild) + {[spec: spec], %{}} + end + + @impl true + def handle_setup(_ctx, state) do + self() |> send(:time_to_play) + {[setup: :incomplete], state} + end + + @impl true + def handle_playing(_ctx, state) do + {[notify_child: {:child, :second_notification}], state} + end + + @impl true + def handle_info(:time_to_play, _ctx, state) do + {[setup: :complete, notify_child: {:child, :first_notification}], state} + end + + @impl true + def handle_info({:get_notifications, test_process}, _ctx, state) do + actions = [notify_child: {:child, :get_notifications}] + state = Map.put(state, :test_process, test_process) + + {actions, state} + end + + @impl true + def handle_child_notification(notifications, :child, _ctx, state) do + send(state.test_process, {:notifications, notifications}) + {[], state} + end + end + + defmodule NotifyingPipelineChild do + use Membrane.Filter + + @impl true + def handle_init(_ctx, _opts), do: {[], %{}} + + @impl true + def handle_parent_notification(:get_notifications, _ctx, state) do + {[notify_parent: state.notifications], state} + end + + @impl true + def handle_parent_notification(notification, _ctx, state) do + state = Map.update(state, :notifications, [notification], &(&1 ++ [notification])) + {[], state} + end + end + + defmodule TickingSink do + use Membrane.Sink + + @short_tick_time Membrane.Time.milliseconds(100) + @long_tick_time Membrane.Time.seconds(2) + + def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any + + @impl true + def handle_init(_ctx, _opts), do: {[], %{tick_counter: 0}} + + @impl true + def handle_parent_notification(:start_timer, _ctx, state) do + {[start_timer: {:timer, @short_tick_time}], state} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_counter: 0} = state) do + actions = [ + demand: {:input, 1}, + timer_interval: {:timer, :no_interval} + ] + + {actions, %{state | tick_counter: 1}} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_counter: 1} = state) do + actions = [ + notify_parent: :second_tick, + timer_interval: {:timer, @long_tick_time} + ] + + {actions, %{state | tick_counter: 2}} + end + + @impl true + def handle_tick(:timer, _ctx, %{tick_counter: 2} = state) do + {[notify_parent: :third_tick], %{state | tick_counter: 3}} + end + + @impl true + def handle_buffer(:input, _buffer, _ctx, state) do + {[timer_interval: {:timer, @short_tick_time}], state} + end + end + + test "order of handling :tick action" do + {:ok, _supervisor, pipeline} = + Membrane.Pipeline.start_link(TickingPipeline, test_process: self()) + + assert_receive :ticked_two_times + + Membrane.Pipeline.terminate(pipeline) + end + + test "order of handling :notify_child action" do + {:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(NotifyingPipeline) + + # time for pipeline to play + Process.sleep(500) + + send(pipeline, {:get_notifications, self()}) + + assert_receive {:notifications, [:first_notification, :second_notification]} + + Membrane.Pipeline.terminate(pipeline) + end + + test ":demand and :timer_interval actions don't interact with each other" do + spec = + child(:source, %Testing.Source{output: [<<>>]}) + |> child(:sink, TickingSink) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + # time for pipeline to play + Process.sleep(100) + + Testing.Pipeline.message_child(pipeline, :sink, :start_timer) + + assert_pipeline_notified(pipeline, :sink, :second_tick) + + # third tick should arrive after two seconds, not ealier + refute_pipeline_notified(pipeline, :sink, :third_tick, 1_500) + assert_pipeline_notified(pipeline, :sink, :third_tick) + + assert Testing.Pipeline.get_child_pid!(pipeline, :source) |> Process.alive?() + + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index 3f3b30b2c..dacd0d60a 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -15,7 +15,8 @@ defmodule Membrane.Integration.DemandsTest do defp assert_buffers_received(range, pid) do Enum.each(range, fn i -> - assert_sink_buffer(pid, :sink, %Buffer{payload: <<^i::16>> <> <<255>>}) + assert_sink_buffer(pid, :sink, buffer) + assert %Buffer{payload: <<^i::16>> <> <<255>>} = buffer end) end diff --git a/test/membrane/integration/linking_test.exs b/test/membrane/integration/linking_test.exs index b23cde06c..a2a9e5081 100644 --- a/test/membrane/integration/linking_test.exs +++ b/test/membrane/integration/linking_test.exs @@ -110,12 +110,6 @@ defmodule Membrane.Integration.LinkingTest do def handle_info(_msg, _ctx, state) do {[], state} end - - @impl true - def handle_spec_started(_children, _ctx, state) do - send(state.testing_pid, :spec_started) - {[], state} - end end setup do @@ -139,12 +133,13 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"a"}) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"b"}) assert_sink_buffer(pipeline, :sink, %Buffer{payload: ~c"c"}) send(pipeline, {:remove_children, :sink}) assert_pipeline_notified(pipeline, :bin, :handle_pad_removed) + + Membrane.Pipeline.terminate(pipeline) end test "and element crashes, bin forwards the unlink message to child", %{pipeline: pipeline} do @@ -166,10 +161,7 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: bin_spec}}) - assert_receive(:spec_started) - send(pipeline, {:start_spec, %{spec: sink_spec}}) - assert_receive(:spec_started) sink_pid = get_child_pid(:sink, pipeline) bin_pid = get_child_pid(:bin, pipeline) @@ -188,6 +180,8 @@ defmodule Membrane.Integration.LinkingTest do match?(%Membrane.PadError{}, error) assert error.message =~ ~r/static.*pad.*unlink/u + + Membrane.Pipeline.terminate(pipeline) end end @@ -208,13 +202,12 @@ defmodule Membrane.Integration.LinkingTest do ] send(pipeline, {:start_spec, %{spec: spec}}) - send(pipeline, {:kill, [:sink]}) - assert_receive(:spec_started) - assert_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) + + Membrane.Pipeline.terminate(pipeline) end test "element shouldn't crash when its neighbor connected via dynamic pad crashes", %{ @@ -237,12 +230,12 @@ defmodule Membrane.Integration.LinkingTest do spec = [spec_1, spec_2, links_spec] send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) - send(pipeline, {:kill, [:sink]}) refute_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) + + Membrane.Pipeline.terminate(pipeline) end test "element shouldn't crash when its neighbor connected via dynamic pad crashes and the crash groups are set within nested spec", @@ -264,31 +257,12 @@ defmodule Membrane.Integration.LinkingTest do |> get_child(:sink) send(pipeline, {:start_spec, %{spec: [spec, links_spec]}}) - assert_receive(:spec_started) - send(pipeline, {:kill, [:sink]}) refute_pipeline_crash_group_down(pipeline, :group_1) assert_pipeline_crash_group_down(pipeline, :group_2) - end - - test "pipeline playback should change successfully after spec with links has been returned", - %{pipeline: pipeline} do - bin_spec = { - child(:bin, %Bin{child: %Testing.Source{output: [~c"a", ~c"b", ~c"c"]}}), - group: :group_1, crash_group_mode: :temporary - } - - sink_spec = { - child(:sink, Testing.Sink), - group: :group_1, crash_group_mode: :temporary - } - links_spec = get_child(:bin) |> get_child(:sink) - - spec = [bin_spec, sink_spec, links_spec] - send(pipeline, {:start_spec, %{spec: spec}}) - assert_receive(:spec_started) + Membrane.Pipeline.terminate(pipeline) end defmodule SlowSetupSink do @@ -349,6 +323,8 @@ defmodule Membrane.Integration.LinkingTest do {element, pad} end) + + Membrane.Pipeline.terminate(pipeline) end test "Elements and bins can be spawned, linked and removed" do @@ -507,6 +483,8 @@ defmodule Membrane.Integration.LinkingTest do refute_link_removed(pipeline, i) end end + + Membrane.Pipeline.terminate(pipeline) end describe "Spec shouldn't wait on links with" do diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs index 1f240dfff..37097add8 100644 --- a/test/membrane/integration/toilet_forwarding_test.exs +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -234,7 +234,8 @@ defmodule Membrane.Integration.ToiletForwardingTest do ) for i <- 1..3000 do - assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: <>}) + assert_sink_buffer(pipeline, :sink, buffer) + assert %Membrane.Buffer{payload: <>} = buffer assert buff_idx == i end