From 0f7b0ca68b967a16c918c29f08267cc1fd5dc348 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 26 Mar 2024 14:14:24 +0100 Subject: [PATCH] Add new delayed demands loop tests --- lib/membrane/core/callback_handler.ex | 8 +- lib/membrane/core/element/action_handler.ex | 42 ++++---- .../core/element/demand_controller.ex | 5 +- lib/membrane/core/pipeline/action_handler.ex | 2 +- test/membrane/core/pipeline_test.exs | 2 - .../integration/delayed_demands_loop_test.exs | 96 +++++++++++++++++++ 6 files changed, 127 insertions(+), 28 deletions(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 32a4c686f..f7bc0c6e7 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -31,7 +31,7 @@ defmodule Membrane.Core.CallbackHandler do @callback transform_actions(actions :: list, callback :: atom, handler_params, state) :: {actions :: list, state} - @callback handle_end_of_actions(state) :: state + @callback handle_end_of_actions(callback :: atom, state) :: state defmacro __using__(_args) do quote location: :keep do @@ -44,7 +44,7 @@ defmodule Membrane.Core.CallbackHandler do end @impl unquote(__MODULE__) - def handle_end_of_actions(state) do + def handle_end_of_actions(_callback, state) do state end @@ -133,7 +133,7 @@ defmodule Membrane.Core.CallbackHandler do %{context: context_fun}, %{module: module, internal_state: internal_state} = state ) do - args = args ++ [context_fun.(state), internal_state] + args = args ++ [context_fun.(state) |> Map.put(:s, state), internal_state] callback_result = try do @@ -223,6 +223,6 @@ defmodule Membrane.Core.CallbackHandler do do: %{state | supplying_demand?: false}, else: state - handler_module.handle_end_of_actions(state) + handler_module.handle_end_of_actions(callback, state) end end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 7611c668c..464aacc65 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -46,38 +46,39 @@ defmodule Membrane.Core.Element.ActionHandler do defguardp is_demand_size(size) when is_integer(size) or is_function(size) @impl CallbackHandler - def handle_end_of_actions(state) do + def handle_end_of_actions(callback, state) do # Fixed order of handling demand of manual and auto pads would lead to # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. - manual_demands_first? = Enum.random([1, 2]) == 1 - state = - if manual_demands_first?, - do: maybe_handle_delayed_demands(state), - else: state - - state = maybe_handle_pads_to_snapshot(state) - - state = - if manual_demands_first?, - do: state, - else: maybe_handle_delayed_demands(state) - - state + if Enum.random([1, 2]) == 1 do + snapshot(callback, state) + |> hdd() + else + state + |> hdd() + |> then(&snapshot(callback, &1)) + end end - defp maybe_handle_delayed_demands(state) do + defp hdd(state) do with %{supplying_demand?: false} <- state do DemandHandler.handle_delayed_demands(state) end end - defp maybe_handle_pads_to_snapshot(state) do - # with %{handling_action?: false} <- state do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + defp snapshot(callback, state) do + # Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might + # be executed in between handling actions returned from other callbacks. + # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. + if callback != :handle_spec_started do + state.pads_to_snapshot + |> Enum.shuffle() + |> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2) |> Map.put(:pads_to_snapshot, MapSet.new()) - # end + else + state + end end @impl CallbackHandler @@ -342,6 +343,7 @@ defmodule Membrane.Core.Element.ActionHandler do stalker_metrics: stalker_metrics } when stream_format != nil <- pad_data do + # todo: move this function to one of the controllers, to avoid redundant PadModet.get_data in the function below state = DemandController.decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) :atomics.add(stalker_metrics.total_buffers, 1, length(buffers)) Message.send(pid, :buffer, buffers, for_pad: other_ref) diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index daa5fb784..5e8525a87 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -50,7 +50,10 @@ defmodule Membrane.Core.Element.DemandController do %{flow_control: :auto} = pad_data, %{effective_flow_control: :pull} = state ) do - if AtomicDemand.get(pad_data.atomic_demand) > 0 do + atomic_value = AtomicDemand.get(pad_data.atomic_demand) + state = PadModel.set_data!(state, pad_data.ref, :demand, atomic_value) + + if atomic_value > 0 do state |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) |> AutoFlowUtils.pop_queues_and_bump_demand() diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index 382a4f5a8..7b4ebe153 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -107,7 +107,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do end @impl CallbackHandler - def handle_end_of_actions(state) do + def handle_end_of_actions(_callback, state) do with %{awaiting_setup_completition?: true} <- state do %{state | awaiting_setup_completition?: false} |> Membrane.Core.LifecycleController.complete_setup() diff --git a/test/membrane/core/pipeline_test.exs b/test/membrane/core/pipeline_test.exs index d59fc2b2c..5ef8f2f7d 100644 --- a/test/membrane/core/pipeline_test.exs +++ b/test/membrane/core/pipeline_test.exs @@ -79,7 +79,6 @@ defmodule Membrane.Core.PipelineTest do [], state ) - |> ActionHandler.handle_end_of_actions() end end @@ -93,7 +92,6 @@ defmodule Membrane.Core.PipelineTest do [], state ) - |> ActionHandler.handle_end_of_actions() end end end diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs index 7688cc744..da614c980 100644 --- a/test/membrane/integration/delayed_demands_loop_test.exs +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -80,4 +80,100 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do Testing.Pipeline.terminate(pipeline) end + + defmodule VariousFlowFilter do + use Membrane.Filter + + def_input_pad :manual_input, + accepted_format: _any, + flow_control: :manual, + demand_unit: :buffers + + def_input_pad :auto_input, accepted_format: _any, flow_control: :auto + + def_output_pad :manual_output, accepted_format: _any, flow_control: :manual + def_output_pad :auto_output, accepted_format: _any, flow_control: :auto + + defmodule StreamFormat do + defstruct [] + end + + @impl true + def handle_playing(_ctx, _state) do + actions = + [:manual_output, :auto_output] + |> Enum.map(&{:stream_format, {&1, %StreamFormat{}}}) + + {actions, %{}} + end + + @impl true + def handle_demand(:manual_output, size, :buffers, _ctx, state) do + {[demand: {:manual_input, size}], state} + end + + @impl true + def handle_buffer(_pad, buffer, _ctx, state) do + # Aim of this Process.sleep is to make VariousFlowFilter working slower than Testing.Sinks + Process.sleep(1) + + actions = + [:manual_output, :auto_output] + |> Enum.map(&{:buffer, {&1, buffer}}) + + {actions, state} + end + + @impl true + def handle_end_of_stream(_pad, _ctx, state) do + {[], state} + end + end + + test "manual pad doesn't starve auto pad" do + buffers_per_source = 10_000 + input_demand_size = 100 + + manual_source_buffers = + Stream.repeatedly(fn -> %Buffer{metadata: :manual, payload: <<>>} end) + |> Stream.take(buffers_per_source) + + auto_source_buffers = + Stream.repeatedly(fn -> %Buffer{metadata: :auto, payload: <<>>} end) + |> Stream.take(buffers_per_source) + + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: [ + child(:manual_source, %Testing.Source{output: manual_source_buffers}) + |> via_in(:manual_input, target_queue_size: input_demand_size) + |> child(:filter, VariousFlowFilter) + |> via_out(:manual_output) + |> child(:manual_sink, Testing.Sink), + child(:auto_source, %Testing.Source{output: auto_source_buffers}) + |> via_in(:auto_input, auto_demand_size: input_demand_size) + |> get_child(:filter) + |> via_out(:auto_output) + |> child(:auto_sink, Testing.Sink) + ] + ) + + stats = %{manual: 0, auto: 0} + + Enum.reduce(1..10_000, stats, fn _i, stats -> + assert_sink_buffer(pipeline, :auto_sink, buffer) + stats = Map.update!(stats, buffer.metadata, &(&1 + 1)) + + difference_upperbound = + max(stats.auto, stats.manual) + |> div(2) + |> max(5 * input_demand_size) + + assert abs(stats.auto - stats.manual) <= difference_upperbound + + stats + end) + + Testing.Pipeline.terminate(pipeline) + end end