From 187b205c8151b860c5064278e7b53093097ce7dc Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Thu, 21 Mar 2024 16:04:18 +0100 Subject: [PATCH 01/18] Comment out handling_action? flag --- lib/membrane/core/bin/state.ex | 4 ++-- lib/membrane/core/callback_handler.ex | 12 ++++++------ lib/membrane/core/element/action_handler.ex | 4 ++-- lib/membrane/core/element/state.ex | 4 ++-- lib/membrane/core/pipeline/state.ex | 4 ++-- test/membrane/core/element/action_handler_test.exs | 6 +++--- test/membrane/core/element/event_controller_test.exs | 2 +- .../core/element/lifecycle_controller_test.exs | 2 +- test/membrane/core/element/pad_controller_test.exs | 2 +- .../core/element/stream_format_controller_test.exs | 2 +- 10 files changed, 21 insertions(+), 21 deletions(-) diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index 9ac285763..c917513a5 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,7 +45,7 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - handling_action?: boolean(), + # handling_action?: boolean(), stalker: Membrane.Core.Stalker.t() } @@ -73,7 +73,7 @@ defmodule Membrane.Core.Bin.State do initialized?: false, terminating?: false, setup_incomplete?: false, - handling_action?: false, + # handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil, diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 91729aef0..32a4c686f 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -189,8 +189,8 @@ defmodule Membrane.Core.CallbackHandler do reraise e, __STACKTRACE__ end - was_handling_action? = state.handling_action? - state = %{state | handling_action?: true} + # was_handling_action? = state.handling_action? + # state = %{state | handling_action?: true} # Updating :supplying_demand? flag value here is a temporal fix. # Setting it to `true` while handling actions causes postponing calls @@ -213,10 +213,10 @@ defmodule Membrane.Core.CallbackHandler do end end) - state = - if was_handling_action?, - do: state, - else: %{state | handling_action?: false} + # state = + # if was_handling_action?, + # do: state, + # else: %{state | handling_action?: false} state = if Component.is_element?(state) and not was_supplying_demand?, diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 3650b58ad..7611c668c 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -74,10 +74,10 @@ defmodule Membrane.Core.Element.ActionHandler do end defp maybe_handle_pads_to_snapshot(state) do - with %{handling_action?: false} <- state do + # with %{handling_action?: false} <- state do Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) |> Map.put(:pads_to_snapshot, MapSet.new()) - end + # end end @impl CallbackHandler diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 102b98a40..ab168a794 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -42,7 +42,7 @@ defmodule Membrane.Core.Element.State do terminating?: boolean(), setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), - handling_action?: boolean(), + # handling_action?: boolean(), popping_auto_flow_queue?: boolean(), pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), @@ -75,7 +75,7 @@ defmodule Membrane.Core.Element.State do terminating?: false, setup_incomplete?: false, supplying_demand?: false, - handling_action?: false, + # handling_action?: false, popping_auto_flow_queue?: false, stalker: nil, resource_guard: nil, diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index 6f644947b..a7aed0415 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -32,7 +32,7 @@ defmodule Membrane.Core.Pipeline.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - handling_action?: boolean(), + # handling_action?: boolean(), stalker: Membrane.Core.Stalker.t(), subprocess_supervisor: pid(), awaiting_setup_completition?: boolean() @@ -56,7 +56,7 @@ defmodule Membrane.Core.Pipeline.State do initialized?: false, terminating?: false, setup_incomplete?: false, - handling_action?: false, + # handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil, diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 06b1913ae..ff716e515 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -25,7 +25,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do playback: :stopped, synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), - handling_action?: false, + # handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ input: @@ -110,7 +110,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), playback: :stopped, - handling_action?: false, + # handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ @@ -512,7 +512,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do name: :elem_name, synchronization: %{clock: nil, parent_clock: nil}, type: :source, - handling_action?: false, + # handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 27928d271..a75b2aa41 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -50,7 +50,7 @@ defmodule Membrane.Core.Element.EventControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, - handling_action?: false, + # handling_action?: false, supplying_demand?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 3a9be602b..048c025a2 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -49,7 +49,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, - handling_action?: false, + # handling_action?: false, supplying_demand?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index 82b6a5926..fb8b23a34 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -18,7 +18,7 @@ defmodule Membrane.Core.Element.PadControllerTest do struct!(State, name: name, module: elem_module, - handling_action?: false, + # handling_action?: false, supplying_demand?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index d94e93030..95dae55d1 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -41,7 +41,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do type: :filter, playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, - handling_action?: false, + # handling_action?: false, supplying_demand?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), From 0f7b0ca68b967a16c918c29f08267cc1fd5dc348 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 26 Mar 2024 14:14:24 +0100 Subject: [PATCH 02/18] Add new delayed demands loop tests --- lib/membrane/core/callback_handler.ex | 8 +- lib/membrane/core/element/action_handler.ex | 42 ++++---- .../core/element/demand_controller.ex | 5 +- lib/membrane/core/pipeline/action_handler.ex | 2 +- test/membrane/core/pipeline_test.exs | 2 - .../integration/delayed_demands_loop_test.exs | 96 +++++++++++++++++++ 6 files changed, 127 insertions(+), 28 deletions(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 32a4c686f..f7bc0c6e7 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -31,7 +31,7 @@ defmodule Membrane.Core.CallbackHandler do @callback transform_actions(actions :: list, callback :: atom, handler_params, state) :: {actions :: list, state} - @callback handle_end_of_actions(state) :: state + @callback handle_end_of_actions(callback :: atom, state) :: state defmacro __using__(_args) do quote location: :keep do @@ -44,7 +44,7 @@ defmodule Membrane.Core.CallbackHandler do end @impl unquote(__MODULE__) - def handle_end_of_actions(state) do + def handle_end_of_actions(_callback, state) do state end @@ -133,7 +133,7 @@ defmodule Membrane.Core.CallbackHandler do %{context: context_fun}, %{module: module, internal_state: internal_state} = state ) do - args = args ++ [context_fun.(state), internal_state] + args = args ++ [context_fun.(state) |> Map.put(:s, state), internal_state] callback_result = try do @@ -223,6 +223,6 @@ defmodule Membrane.Core.CallbackHandler do do: %{state | supplying_demand?: false}, else: state - handler_module.handle_end_of_actions(state) + handler_module.handle_end_of_actions(callback, state) end end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 7611c668c..464aacc65 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -46,38 +46,39 @@ defmodule Membrane.Core.Element.ActionHandler do defguardp is_demand_size(size) when is_integer(size) or is_function(size) @impl CallbackHandler - def handle_end_of_actions(state) do + def handle_end_of_actions(callback, state) do # Fixed order of handling demand of manual and auto pads would lead to # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. - manual_demands_first? = Enum.random([1, 2]) == 1 - state = - if manual_demands_first?, - do: maybe_handle_delayed_demands(state), - else: state - - state = maybe_handle_pads_to_snapshot(state) - - state = - if manual_demands_first?, - do: state, - else: maybe_handle_delayed_demands(state) - - state + if Enum.random([1, 2]) == 1 do + snapshot(callback, state) + |> hdd() + else + state + |> hdd() + |> then(&snapshot(callback, &1)) + end end - defp maybe_handle_delayed_demands(state) do + defp hdd(state) do with %{supplying_demand?: false} <- state do DemandHandler.handle_delayed_demands(state) end end - defp maybe_handle_pads_to_snapshot(state) do - # with %{handling_action?: false} <- state do - Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2) + defp snapshot(callback, state) do + # Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might + # be executed in between handling actions returned from other callbacks. + # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. + if callback != :handle_spec_started do + state.pads_to_snapshot + |> Enum.shuffle() + |> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2) |> Map.put(:pads_to_snapshot, MapSet.new()) - # end + else + state + end end @impl CallbackHandler @@ -342,6 +343,7 @@ defmodule Membrane.Core.Element.ActionHandler do stalker_metrics: stalker_metrics } when stream_format != nil <- pad_data do + # todo: move this function to one of the controllers, to avoid redundant PadModet.get_data in the function below state = DemandController.decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) :atomics.add(stalker_metrics.total_buffers, 1, length(buffers)) Message.send(pid, :buffer, buffers, for_pad: other_ref) diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index daa5fb784..5e8525a87 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -50,7 +50,10 @@ defmodule Membrane.Core.Element.DemandController do %{flow_control: :auto} = pad_data, %{effective_flow_control: :pull} = state ) do - if AtomicDemand.get(pad_data.atomic_demand) > 0 do + atomic_value = AtomicDemand.get(pad_data.atomic_demand) + state = PadModel.set_data!(state, pad_data.ref, :demand, atomic_value) + + if atomic_value > 0 do state |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) |> AutoFlowUtils.pop_queues_and_bump_demand() diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index 382a4f5a8..7b4ebe153 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -107,7 +107,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do end @impl CallbackHandler - def handle_end_of_actions(state) do + def handle_end_of_actions(_callback, state) do with %{awaiting_setup_completition?: true} <- state do %{state | awaiting_setup_completition?: false} |> Membrane.Core.LifecycleController.complete_setup() diff --git a/test/membrane/core/pipeline_test.exs b/test/membrane/core/pipeline_test.exs index d59fc2b2c..5ef8f2f7d 100644 --- a/test/membrane/core/pipeline_test.exs +++ b/test/membrane/core/pipeline_test.exs @@ -79,7 +79,6 @@ defmodule Membrane.Core.PipelineTest do [], state ) - |> ActionHandler.handle_end_of_actions() end end @@ -93,7 +92,6 @@ defmodule Membrane.Core.PipelineTest do [], state ) - |> ActionHandler.handle_end_of_actions() end end end diff --git a/test/membrane/integration/delayed_demands_loop_test.exs b/test/membrane/integration/delayed_demands_loop_test.exs index 7688cc744..da614c980 100644 --- a/test/membrane/integration/delayed_demands_loop_test.exs +++ b/test/membrane/integration/delayed_demands_loop_test.exs @@ -80,4 +80,100 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do Testing.Pipeline.terminate(pipeline) end + + defmodule VariousFlowFilter do + use Membrane.Filter + + def_input_pad :manual_input, + accepted_format: _any, + flow_control: :manual, + demand_unit: :buffers + + def_input_pad :auto_input, accepted_format: _any, flow_control: :auto + + def_output_pad :manual_output, accepted_format: _any, flow_control: :manual + def_output_pad :auto_output, accepted_format: _any, flow_control: :auto + + defmodule StreamFormat do + defstruct [] + end + + @impl true + def handle_playing(_ctx, _state) do + actions = + [:manual_output, :auto_output] + |> Enum.map(&{:stream_format, {&1, %StreamFormat{}}}) + + {actions, %{}} + end + + @impl true + def handle_demand(:manual_output, size, :buffers, _ctx, state) do + {[demand: {:manual_input, size}], state} + end + + @impl true + def handle_buffer(_pad, buffer, _ctx, state) do + # Aim of this Process.sleep is to make VariousFlowFilter working slower than Testing.Sinks + Process.sleep(1) + + actions = + [:manual_output, :auto_output] + |> Enum.map(&{:buffer, {&1, buffer}}) + + {actions, state} + end + + @impl true + def handle_end_of_stream(_pad, _ctx, state) do + {[], state} + end + end + + test "manual pad doesn't starve auto pad" do + buffers_per_source = 10_000 + input_demand_size = 100 + + manual_source_buffers = + Stream.repeatedly(fn -> %Buffer{metadata: :manual, payload: <<>>} end) + |> Stream.take(buffers_per_source) + + auto_source_buffers = + Stream.repeatedly(fn -> %Buffer{metadata: :auto, payload: <<>>} end) + |> Stream.take(buffers_per_source) + + pipeline = + Testing.Pipeline.start_link_supervised!( + spec: [ + child(:manual_source, %Testing.Source{output: manual_source_buffers}) + |> via_in(:manual_input, target_queue_size: input_demand_size) + |> child(:filter, VariousFlowFilter) + |> via_out(:manual_output) + |> child(:manual_sink, Testing.Sink), + child(:auto_source, %Testing.Source{output: auto_source_buffers}) + |> via_in(:auto_input, auto_demand_size: input_demand_size) + |> get_child(:filter) + |> via_out(:auto_output) + |> child(:auto_sink, Testing.Sink) + ] + ) + + stats = %{manual: 0, auto: 0} + + Enum.reduce(1..10_000, stats, fn _i, stats -> + assert_sink_buffer(pipeline, :auto_sink, buffer) + stats = Map.update!(stats, buffer.metadata, &(&1 + 1)) + + difference_upperbound = + max(stats.auto, stats.manual) + |> div(2) + |> max(5 * input_demand_size) + + assert abs(stats.auto - stats.manual) <= difference_upperbound + + stats + end) + + Testing.Pipeline.terminate(pipeline) + end end From 27a3b01ce47025781eb9269d001ea53d5202bf98 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 26 Mar 2024 14:45:33 +0100 Subject: [PATCH 03/18] supplying_demand? -> delay_demands? --- lib/membrane/core/callback_handler.ex | 10 +++++----- lib/membrane/core/element/action_handler.ex | 2 +- lib/membrane/core/element/demand_handler.ex | 16 ++++++++-------- lib/membrane/core/element/state.ex | 4 ++-- .../core/element/action_handler_test.exs | 4 ++-- .../core/element/event_controller_test.exs | 2 +- .../core/element/lifecycle_controller_test.exs | 2 +- .../core/element/pad_controller_test.exs | 2 +- .../element/stream_format_controller_test.exs | 2 +- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index f7bc0c6e7..c1a411690 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -192,12 +192,12 @@ defmodule Membrane.Core.CallbackHandler do # was_handling_action? = state.handling_action? # state = %{state | handling_action?: true} - # Updating :supplying_demand? flag value here is a temporal fix. + # Updating :delay_demands? flag value here is a temporal fix. # Setting it to `true` while handling actions causes postponing calls # of handle_redemand/2 and supply_demand/2 until a moment, when all # actions returned from the callback are handled - was_supplying_demand? = Map.get(state, :supplying_demand?, false) - state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state + was_delay_demands? = Map.get(state, :delay_demands?, false) + state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state state = Enum.reduce(actions, state, fn action, state -> @@ -219,8 +219,8 @@ defmodule Membrane.Core.CallbackHandler do # else: %{state | handling_action?: false} state = - if Component.is_element?(state) and not was_supplying_demand?, - do: %{state | supplying_demand?: false}, + if Component.is_element?(state) and not was_delay_demands?, + do: %{state | delay_demands?: false}, else: state handler_module.handle_end_of_actions(callback, state) diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 464aacc65..dc53f1557 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -62,7 +62,7 @@ defmodule Membrane.Core.Element.ActionHandler do end defp hdd(state) do - with %{supplying_demand?: false} <- state do + with %{delay_demands?: false} <- state do DemandHandler.handle_delayed_demands(state) end end diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index d7d9ec8e6..1cf00c53d 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -32,7 +32,7 @@ defmodule Membrane.Core.Element.DemandHandler do output, `handle_demand` is invoked right away, so that the demand can be synchronously supplied. """ @spec handle_redemand(Pad.ref(), State.t()) :: State.t() - def handle_redemand(pad_ref, %State{supplying_demand?: true} = state) do + def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) end @@ -42,9 +42,9 @@ defmodule Membrane.Core.Element.DemandHandler do end defp do_handle_redemand(pad_ref, state) do - state = %{state | supplying_demand?: true} + state = %{state | delay_demands?: true} state = exec_handle_demand(pad_ref, state) - %{state | supplying_demand?: false} + %{state | delay_demands?: false} end @doc """ @@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.DemandHandler do end @spec supply_demand(Pad.ref(), State.t()) :: State.t() - def supply_demand(pad_ref, %State{supplying_demand?: true} = state) do + def supply_demand(pad_ref, %State{delay_demands?: true} = state) do Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) end @@ -85,7 +85,7 @@ defmodule Membrane.Core.Element.DemandHandler do defp do_supply_demand(pad_ref, state) do # marking is state that actual demand supply has been started (note changing back to false when finished) - state = %State{state | supplying_demand?: true} + state = %State{state | delay_demands?: true} pad_data = state |> PadModel.get_data!(pad_ref) @@ -94,7 +94,7 @@ defmodule Membrane.Core.Element.DemandHandler do state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue) state = handle_input_queue_output(pad_ref, popped_data, state) - %State{state | supplying_demand?: false} + %State{state | delay_demands?: false} end defp update_demand(pad_ref, size, state) when is_integer(size) do @@ -127,8 +127,8 @@ defmodule Membrane.Core.Element.DemandHandler do # potentially for a long time. cond do - state.supplying_demand? -> - raise "Cannot handle delayed demands while already supplying demand" + state.delay_demands? -> + raise "Cannot handle delayed demands when delay_demands? flag is set to true" state.handle_demand_loop_counter >= @handle_demand_loop_limit -> state = diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index ab168a794..b46526e65 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -23,7 +23,7 @@ defmodule Membrane.Core.Element.State do pads_info: PadModel.pads_info() | nil, pads_data: PadModel.pads_data() | nil, parent_pid: pid, - supplying_demand?: boolean(), + delay_demands?: boolean(), delayed_demands: MapSet.t({Pad.ref(), :supply | :redemand}), handle_demand_loop_counter: non_neg_integer(), synchronization: %{ @@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.State do initialized?: false, terminating?: false, setup_incomplete?: false, - supplying_demand?: false, + delay_demands?: false, # handling_action?: false, popping_auto_flow_queue?: false, stalker: nil, diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index ff716e515..2c8f71cf9 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -60,7 +60,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do setup :demand_test_filter test "delaying demand", %{state: state} do - state = %{state | playback: :playing, supplying_demand?: true} + state = %{state | playback: :playing, delay_demands?: true} state = @module.handle_action({:demand, {:input, 10}}, :handle_info, %{}, state) assert state.pads_data.input.manual_demand_size == 10 assert MapSet.new([{:input, :supply}]) == state.delayed_demands @@ -489,7 +489,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do test "when pad works in auto or manual flow control mode", %{state: state} do state = - %{state | supplying_demand?: true, playback: :playing} + %{state | delay_demands?: true, playback: :playing} |> PadModel.set_data!(:output, :flow_control, :manual) new_state = diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index a75b2aa41..e9ea8f70a 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -51,7 +51,7 @@ defmodule Membrane.Core.Element.EventControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, # handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), handle_demand_loop_counter: 0, diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 048c025a2..74131182c 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -50,7 +50,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, # handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), pads_data: %{ diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index fb8b23a34..a81b1e42f 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -19,7 +19,7 @@ defmodule Membrane.Core.Element.PadControllerTest do name: name, module: elem_module, # handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), parent_pid: self(), diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 95dae55d1..5b68ad29a 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -42,7 +42,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, # handling_action?: false, - supplying_demand?: false, + delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), handle_demand_loop_counter: 0, From 9c309b0ca9677090c5a1f58d9667affd203d4ac4 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 26 Mar 2024 17:34:21 +0100 Subject: [PATCH 04/18] Delete unncessary flags, rename some modules --- lib/membrane/core/callback_handler.ex | 23 +++-------- lib/membrane/core/element.ex | 3 +- lib/membrane/core/element/action_handler.ex | 40 +++++++++++-------- .../core/element/buffer_controller.ex | 11 +++-- .../core/element/demand_controller.ex | 7 ++-- .../{auto_flow_utils.ex => auto.ex} | 2 +- .../manual.ex} | 28 ++++++++----- .../core/element/effective_flow_controller.ex | 3 +- lib/membrane/core/element/event_controller.ex | 8 ++-- lib/membrane/core/element/pad_controller.ex | 6 +-- .../core/element/stream_format_controller.ex | 13 ++++-- lib/membrane/element/pad_data.ex | 2 +- 12 files changed, 74 insertions(+), 72 deletions(-) rename lib/membrane/core/element/demand_controller/{auto_flow_utils.ex => auto.ex} (99%) rename lib/membrane/core/element/{demand_handler.ex => demand_controller/manual.ex} (93%) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index c1a411690..7bff95beb 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do use Bunch alias Membrane.CallbackError - alias Membrane.Core.Component require Membrane.Logger @@ -189,15 +188,8 @@ defmodule Membrane.Core.CallbackHandler do reraise e, __STACKTRACE__ end - # was_handling_action? = state.handling_action? - # state = %{state | handling_action?: true} - - # Updating :delay_demands? flag value here is a temporal fix. - # Setting it to `true` while handling actions causes postponing calls - # of handle_redemand/2 and supply_demand/2 until a moment, when all - # actions returned from the callback are handled - was_delay_demands? = Map.get(state, :delay_demands?, false) - state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state + # was_delay_demands? = Map.get(state, :delay_demands?, false) + # state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state state = Enum.reduce(actions, state, fn action, state -> @@ -214,14 +206,9 @@ defmodule Membrane.Core.CallbackHandler do end) # state = - # if was_handling_action?, - # do: state, - # else: %{state | handling_action?: false} - - state = - if Component.is_element?(state) and not was_delay_demands?, - do: %{state | delay_demands?: false}, - else: state + # if Component.is_element?(state) and not was_delay_demands?, + # do: %{state | delay_demands?: false}, + # else: state handler_module.handle_end_of_actions(callback, state) end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index d76497770..ead730ecf 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -24,7 +24,6 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, DemandController, - DemandHandler, EffectiveFlowController, EventController, LifecycleController, @@ -211,7 +210,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do - state = DemandHandler.resume_delayed_demands_loop(state) + state = DemandController.Manual.resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index dc53f1557..ac0beb6cc 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -23,12 +23,10 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ DemandController, - DemandHandler, State, StreamFormatController } - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.{Events, TimerController} alias Membrane.Element.Action @@ -51,26 +49,31 @@ defmodule Membrane.Core.Element.ActionHandler do # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. - if Enum.random([1, 2]) == 1 do - snapshot(callback, state) - |> hdd() + # Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might + # be executed in between handling actions returned from other callbacks. + # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. + + if callback != :handle_spec_started do + if Enum.random([1, 2]) == 1 do + snapshot(callback, state) + |> hdd() + else + state + |> hdd() + |> then(&snapshot(callback, &1)) + end else state - |> hdd() - |> then(&snapshot(callback, &1)) end end defp hdd(state) do with %{delay_demands?: false} <- state do - DemandHandler.handle_delayed_demands(state) + DemandController.Manual.handle_delayed_demands(state) end end defp snapshot(callback, state) do - # Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might - # be executed in between handling actions returned from other callbacks. - # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. if callback != :handle_spec_started do state.pads_to_snapshot |> Enum.shuffle() @@ -179,13 +182,13 @@ defmodule Membrane.Core.Element.ActionHandler do @impl CallbackHandler def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.AutoFlowUtils.pause_demands(in_ref, state) + DemandController.Auto.pause_demands(in_ref, state) end @impl CallbackHandler def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.AutoFlowUtils.resume_demands(in_ref, state) + DemandController.Auto.resume_demands(in_ref, state) end @impl CallbackHandler @@ -417,7 +420,9 @@ defmodule Membrane.Core.Element.ActionHandler do defp supply_demand(pad_ref, size, state) do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.supply_demand(pad_ref, size, state) + # todo: get_data! above could be eradicated + state = DemandController.Manual.update_demand(pad_ref, size, state) + DemandController.Manual.delay_demand_supply(pad_ref, state) else %{direction: :output} -> raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref @@ -437,7 +442,8 @@ defmodule Membrane.Core.Element.ActionHandler do when type in [:source, :filter, :endpoint] do with %{direction: :output, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.handle_redemand(pad_ref, state) + # todo: get_data! above could be eradicated + DemandController.Manual.delay_redemand(pad_ref, state) else %{direction: :input} -> raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}" @@ -473,10 +479,10 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> DemandController.Auto.pop_queues_and_bump_demand() else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 986a204cf..a40800cd0 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -12,14 +12,13 @@ defmodule Membrane.Core.Element.BufferController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandHandler, + DemandController, EventController, InputQueue, PlaybackQueue, State } - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Telemetry require Membrane.Core.Child.PadModel @@ -27,7 +26,7 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Handles incoming buffer: either stores it in InputQueue, or executes element's - callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2` + callback. Also calls `Membrane.Core.Element.DemandController.Manual.supply_demand/2` to check if there are any unsupplied demands. """ @spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() @@ -70,10 +69,10 @@ defmodule Membrane.Core.Element.BufferController do :atomics.put(stalker_metrics.demand, 1, demand - buf_size) if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do - AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + DemandController.Auto.store_buffers_in_queue(pad_ref, buffers, state) else state = exec_buffer_callback(pad_ref, buffers, state) - AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + DemandController.Auto.auto_adjust_atomic_demand(pad_ref, state) end end @@ -84,7 +83,7 @@ defmodule Membrane.Core.Element.BufferController do state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) if old_input_queue |> InputQueue.empty?() do - DemandHandler.supply_demand(pad_ref, state) + DemandController.Manual.supply_demand(pad_ref, state) else state end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 5e8525a87..6cb50d739 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -5,13 +5,12 @@ defmodule Membrane.Core.Element.DemandController do use Bunch - alias __MODULE__.AutoFlowUtils + alias __MODULE__.{Auto, Manual} alias Membrane.Buffer alias Membrane.Core.Element.{ AtomicDemand, - DemandHandler, PlaybackQueue, State } @@ -56,7 +55,7 @@ defmodule Membrane.Core.Element.DemandController do if atomic_value > 0 do state |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> Auto.pop_queues_and_bump_demand() else state end @@ -79,7 +78,7 @@ defmodule Membrane.Core.Element.DemandController do } ) - DemandHandler.handle_redemand(pad_data.ref, state) + Manual.handle_redemand(pad_data.ref, state) else _other -> state end diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto.ex similarity index 99% rename from lib/membrane/core/element/demand_controller/auto_flow_utils.ex rename to lib/membrane/core/element/demand_controller/auto.ex index 62d0b2077..a71602c22 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do +defmodule Membrane.Core.Element.DemandController.Auto do @moduledoc false alias Membrane.Buffer diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_controller/manual.ex similarity index 93% rename from lib/membrane/core/element/demand_handler.ex rename to lib/membrane/core/element/demand_controller/manual.ex index 1cf00c53d..2d6d11f21 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_controller/manual.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandHandler do +defmodule Membrane.Core.Element.DemandController.Manual do @moduledoc false # Module handling demands requested on output pads. @@ -24,6 +24,10 @@ defmodule Membrane.Core.Element.DemandHandler do @handle_demand_loop_limit 20 + def delay_redemand(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) + end + @doc """ Called when redemand action was returned. * If element is currently supplying demand, it means that after finishing `supply_demand` it will call @@ -33,7 +37,7 @@ defmodule Membrane.Core.Element.DemandHandler do """ @spec handle_redemand(Pad.ref(), State.t()) :: State.t() def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) + delay_redemand(pad_ref, state) end def handle_redemand(pad_ref, %State{} = state) do @@ -65,17 +69,12 @@ defmodule Membrane.Core.Element.DemandHandler do """ @spec supply_demand( Pad.ref(), - size :: non_neg_integer | (non_neg_integer() -> non_neg_integer()), State.t() ) :: State.t() - def supply_demand(pad_ref, size, state) do - state = update_demand(pad_ref, size, state) - supply_demand(pad_ref, state) - end @spec supply_demand(Pad.ref(), State.t()) :: State.t() def supply_demand(pad_ref, %State{delay_demands?: true} = state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) + delay_demand_supply(pad_ref, state) end def supply_demand(pad_ref, state) do @@ -83,6 +82,10 @@ defmodule Membrane.Core.Element.DemandHandler do |> handle_delayed_demands() end + def delay_demand_supply(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) + end + defp do_supply_demand(pad_ref, state) do # marking is state that actual demand supply has been started (note changing back to false when finished) state = %State{state | delay_demands?: true} @@ -97,11 +100,16 @@ defmodule Membrane.Core.Element.DemandHandler do %State{state | delay_demands?: false} end - defp update_demand(pad_ref, size, state) when is_integer(size) do + @spec update_demand( + Pad.ref(), + non_neg_integer() | (non_neg_integer() -> non_neg_integer()), + State.t() + ) :: State.t() + def update_demand(pad_ref, size, state) when is_integer(size) do PadModel.set_data!(state, pad_ref, :manual_demand_size, size) end - defp update_demand(pad_ref, size_fun, state) when is_function(size_fun) do + def update_demand(pad_ref, size_fun, state) when is_function(size_fun) do manual_demand_size = PadModel.get_data!(state, pad_ref, :manual_demand_size) new_manual_demand_size = size_fun.(manual_demand_size) diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 6d0849439..e07dcedb5 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. alias Membrane.Core.Element.DemandController - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Element.{AtomicDemand, State} require Membrane.Core.Child.PadModel, as: PadModel @@ -141,6 +140,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state end) end - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> DemandController.Auto.pop_queues_and_bump_demand() end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index f6b59a34b..66a3d41e5 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,14 +12,12 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandHandler, + DemandController, InputQueue, PlaybackQueue, State } - alias Membrane.Core.Element.DemandController.AutoFlowUtils - require Membrane.Core.Child.PadModel require Membrane.Core.Message require Membrane.Core.Telemetry @@ -55,7 +53,7 @@ defmodule Membrane.Core.Element.EventController do # event goes to the auto flow control queue not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) -> - AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + DemandController.Auto.store_event_in_queue(pad_ref, event, state) true -> exec_handle_event(pad_ref, event, state) @@ -109,7 +107,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = - DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 61530f83f..dfbb52f8f 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -10,6 +10,7 @@ defmodule Membrane.Core.Element.PadController do ActionHandler, AtomicDemand, CallbackContext, + DemandController, EffectiveFlowController, EventController, InputQueue, @@ -17,7 +18,6 @@ defmodule Membrane.Core.Element.PadController do StreamFormatController } - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Parent.Link.Endpoint alias Membrane.LinkError @@ -241,7 +241,7 @@ defmodule Membrane.Core.Element.PadController do |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> DemandController.Auto.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -335,7 +335,7 @@ defmodule Membrane.Core.Element.PadController do Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref)) %{direction: :input, flow_control: :auto} -> - AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state) + DemandController.Auto.auto_adjust_atomic_demand(endpoint.pad_ref, state) |> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1]) _pad_data -> diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index b22637cc7..f28368a3f 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -8,8 +8,15 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.{Pad, StreamFormat} alias Membrane.Core.{CallbackHandler, Telemetry} alias Membrane.Core.Child.PadModel - alias Membrane.Core.Element.{ActionHandler, CallbackContext, InputQueue, PlaybackQueue, State} - alias Membrane.Core.Element.DemandController.AutoFlowUtils + + alias Membrane.Core.Element.{ + ActionHandler, + CallbackContext, + DemandController, + InputQueue, + PlaybackQueue, + State + } require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry @@ -41,7 +48,7 @@ defmodule Membrane.Core.Element.StreamFormatController do # stream format goes to the auto flow control queue pad_ref in state.awaiting_auto_input_pads -> - AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state) + DemandController.Auto.store_stream_format_in_queue(pad_ref, stream_format, state) true -> exec_handle_stream_format(pad_ref, stream_format, state) diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index 3be09d473..f6f9054b9 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do # with input pad, but hasn't been sent yet by the element with output pad. Detects toilet overflow as well. atomic_demand: private_field, - # Field used in DemandController.AutoFlowUtils and InputQueue, to caluclate, how much AtomicDemand should be increased. + # Field used in DemandController.Auto and InputQueue, to caluclate, how much AtomicDemand should be increased. # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. manual_demand_size: private_field, From 9b05f29c070300e60b7fdc7cc01b3a5f17f4eeaa Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 10:50:44 +0100 Subject: [PATCH 05/18] Introduce ManualFlowController --- lib/membrane/core/element.ex | 3 +- lib/membrane/core/element/action_handler.ex | 13 +++-- .../core/element/buffer_controller.ex | 5 +- .../core/element/demand_controller.ex | 58 ++++++++++++++++++- lib/membrane/core/element/event_controller.ex | 4 +- .../manual.ex => manual_flow_controller.ex} | 58 ++----------------- 6 files changed, 76 insertions(+), 65 deletions(-) rename lib/membrane/core/element/{demand_controller/manual.ex => manual_flow_controller.ex} (81%) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index ead730ecf..806e74aed 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -26,6 +26,7 @@ defmodule Membrane.Core.Element do DemandController, EffectiveFlowController, EventController, + ManualFlowController, LifecycleController, PadController, State, @@ -210,7 +211,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do - state = DemandController.Manual.resume_delayed_demands_loop(state) + state = ManualFlowController.resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index ac0beb6cc..72585b796 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -24,7 +24,8 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ DemandController, State, - StreamFormatController + StreamFormatController, + ManualFlowController } alias Membrane.Core.{Events, TimerController} @@ -69,7 +70,7 @@ defmodule Membrane.Core.Element.ActionHandler do defp hdd(state) do with %{delay_demands?: false} <- state do - DemandController.Manual.handle_delayed_demands(state) + ManualFlowController.handle_delayed_demands(state) end end @@ -421,8 +422,8 @@ defmodule Membrane.Core.Element.ActionHandler do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do # todo: get_data! above could be eradicated - state = DemandController.Manual.update_demand(pad_ref, size, state) - DemandController.Manual.delay_demand_supply(pad_ref, state) + state = ManualFlowController.update_demand(pad_ref, size, state) + ManualFlowController.delay_demand_supply(pad_ref, state) else %{direction: :output} -> raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref @@ -443,7 +444,7 @@ defmodule Membrane.Core.Element.ActionHandler do with %{direction: :output, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do # todo: get_data! above could be eradicated - DemandController.Manual.delay_redemand(pad_ref, state) + ManualFlowController.delay_redemand(pad_ref, state) else %{direction: :input} -> raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}" @@ -479,7 +480,7 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) + ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> DemandController.Auto.pop_queues_and_bump_demand() diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index a40800cd0..162f0d6a7 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -15,6 +15,7 @@ defmodule Membrane.Core.Element.BufferController do DemandController, EventController, InputQueue, + ManualFlowController, PlaybackQueue, State } @@ -26,7 +27,7 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Handles incoming buffer: either stores it in InputQueue, or executes element's - callback. Also calls `Membrane.Core.Element.DemandController.Manual.supply_demand/2` + callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2` to check if there are any unsupplied demands. """ @spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() @@ -83,7 +84,7 @@ defmodule Membrane.Core.Element.BufferController do state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) if old_input_queue |> InputQueue.empty?() do - DemandController.Manual.supply_demand(pad_ref, state) + ManualFlowController.supply_demand(pad_ref, state) else state end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 6cb50d739..84cb2b37a 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -5,12 +5,20 @@ defmodule Membrane.Core.Element.DemandController do use Bunch + alias Membrane.Core.Element.ManualFlowController alias __MODULE__.{Auto, Manual} alias Membrane.Buffer + alias Membrane.Core.CallbackHandler + alias Membrane.Core.Element.CallbackContext + + alias Membrane.Core.Element.{ + ActionHandler, AtomicDemand, + ManualFlowController, + AutoFlowController, PlaybackQueue, State } @@ -78,7 +86,7 @@ defmodule Membrane.Core.Element.DemandController do } ) - Manual.handle_redemand(pad_data.ref, state) + ManualFlowController.handle_redemand(pad_data.ref, state) else _other -> state end @@ -111,4 +119,52 @@ defmodule Membrane.Core.Element.DemandController do atomic_demand: atomic_demand }) end + + + @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() + def exec_handle_demand(pad_ref, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + true <- exec_handle_demand?(pad_data) do + do_exec_handle_demand(pad_data, state) + else + _other -> state + end + end + + @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() + defp do_exec_handle_demand(pad_data, state) do + context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) + + CallbackHandler.exec_and_handle_callback( + :handle_demand, + ActionHandler, + %{ + split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), + context: context + }, + [pad_data.ref, pad_data.demand, pad_data.demand_unit], + state + ) + end + + defp exec_handle_demand?(%{end_of_stream?: true}) do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as :end_of_stream action has already been returned + """) + + false + end + + defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as demand is not greater than 0, + demand: #{inspect(demand)} + """) + + false + end + + defp exec_handle_demand?(_pad_data) do + true + end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 66a3d41e5..e79394213 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -14,6 +14,8 @@ defmodule Membrane.Core.Element.EventController do CallbackContext, DemandController, InputQueue, + ManualFlowController, + AutoFlowController, PlaybackQueue, State } @@ -107,7 +109,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = - DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) + ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) diff --git a/lib/membrane/core/element/demand_controller/manual.ex b/lib/membrane/core/element/manual_flow_controller.ex similarity index 81% rename from lib/membrane/core/element/demand_controller/manual.ex rename to lib/membrane/core/element/manual_flow_controller.ex index 2d6d11f21..04ec59b2e 100644 --- a/lib/membrane/core/element/demand_controller/manual.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandController.Manual do +defmodule Membrane.Core.Element.ManualFlowController do @moduledoc false # Module handling demands requested on output pads. @@ -9,6 +9,7 @@ defmodule Membrane.Core.Element.DemandController.Manual do ActionHandler, BufferController, CallbackContext, + DemandController, EventController, InputQueue, State, @@ -47,7 +48,7 @@ defmodule Membrane.Core.Element.DemandController.Manual do defp do_handle_redemand(pad_ref, state) do state = %{state | delay_demands?: true} - state = exec_handle_demand(pad_ref, state) + state = DemandController.exec_handle_demand(pad_ref, state) %{state | delay_demands?: false} end @@ -100,11 +101,7 @@ defmodule Membrane.Core.Element.DemandController.Manual do %State{state | delay_demands?: false} end - @spec update_demand( - Pad.ref(), - non_neg_integer() | (non_neg_integer() -> non_neg_integer()), - State.t() - ) :: State.t() + @spec update_demand(Pad.ref(), non_neg_integer() | (non_neg_integer() -> non_neg_integer()), State.t()) :: State.t() def update_demand(pad_ref, size, state) when is_integer(size) do PadModel.set_data!(state, pad_ref, :manual_demand_size, size) end @@ -206,51 +203,4 @@ defmodule Membrane.Core.Element.DemandController.Manual do BufferController.exec_buffer_callback(pad_ref, buffers, state) end - - @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() - defp exec_handle_demand(pad_ref, state) do - with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), - true <- exec_handle_demand?(pad_data) do - do_exec_handle_demand(pad_data, state) - else - _other -> state - end - end - - @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() - defp do_exec_handle_demand(pad_data, state) do - context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) - - CallbackHandler.exec_and_handle_callback( - :handle_demand, - ActionHandler, - %{ - split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), - context: context - }, - [pad_data.ref, pad_data.demand, pad_data.demand_unit], - state - ) - end - - defp exec_handle_demand?(%{end_of_stream?: true}) do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as :end_of_stream action has already been returned - """) - - false - end - - defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as demand is not greater than 0, - demand: #{inspect(demand)} - """) - - false - end - - defp exec_handle_demand?(_pad_data) do - true - end end From bf9aee044e9718e17ad3570bdc68482167e586f9 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 10:55:31 +0100 Subject: [PATCH 06/18] Introduce AutoFlowController --- lib/membrane/core/element.ex | 2 +- lib/membrane/core/element/action_handler.ex | 9 +++++---- .../auto.ex => auto_flow_controller.ex} | 2 +- lib/membrane/core/element/buffer_controller.ex | 7 ++++--- lib/membrane/core/element/demand_controller.ex | 5 +---- lib/membrane/core/element/effective_flow_controller.ex | 3 ++- lib/membrane/core/element/event_controller.ex | 4 ++-- lib/membrane/core/element/manual_flow_controller.ex | 2 +- lib/membrane/core/element/pad_controller.ex | 7 ++++--- lib/membrane/core/element/stream_format_controller.ex | 4 ++-- lib/membrane/element/pad_data.ex | 2 +- 11 files changed, 24 insertions(+), 23 deletions(-) rename lib/membrane/core/element/{demand_controller/auto.ex => auto_flow_controller.ex} (99%) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 806e74aed..119e4f3dd 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -23,7 +23,7 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, - DemandController, + DemandController, AutoFlowController, EffectiveFlowController, EventController, ManualFlowController, diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 72585b796..8e7d89ddb 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -22,7 +22,8 @@ defmodule Membrane.Core.Element.ActionHandler do } alias Membrane.Core.Element.{ - DemandController, + AutoFlowController, + DemandController, AutoFlowController, State, StreamFormatController, ManualFlowController @@ -183,13 +184,13 @@ defmodule Membrane.Core.Element.ActionHandler do @impl CallbackHandler def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.Auto.pause_demands(in_ref, state) + AutoFlowController.pause_demands(in_ref, state) end @impl CallbackHandler def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.Auto.resume_demands(in_ref, state) + AutoFlowController.resume_demands(in_ref, state) end @impl CallbackHandler @@ -483,7 +484,7 @@ defmodule Membrane.Core.Element.ActionHandler do ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state) |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) - |> DemandController.Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/demand_controller/auto.ex b/lib/membrane/core/element/auto_flow_controller.ex similarity index 99% rename from lib/membrane/core/element/demand_controller/auto.ex rename to lib/membrane/core/element/auto_flow_controller.ex index a71602c22..36c6fc3e1 100644 --- a/lib/membrane/core/element/demand_controller/auto.ex +++ b/lib/membrane/core/element/auto_flow_controller.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandController.Auto do +defmodule Membrane.Core.Element.AutoFlowController do @moduledoc false alias Membrane.Buffer diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 162f0d6a7..bb0fb3055 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -11,8 +11,9 @@ defmodule Membrane.Core.Element.BufferController do alias Membrane.Core.Element.{ ActionHandler, + AutoFlowController, CallbackContext, - DemandController, + DemandController, AutoFlowController, EventController, InputQueue, ManualFlowController, @@ -70,10 +71,10 @@ defmodule Membrane.Core.Element.BufferController do :atomics.put(stalker_metrics.demand, 1, demand - buf_size) if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do - DemandController.Auto.store_buffers_in_queue(pad_ref, buffers, state) + AutoFlowController.store_buffers_in_queue(pad_ref, buffers, state) else state = exec_buffer_callback(pad_ref, buffers, state) - DemandController.Auto.auto_adjust_atomic_demand(pad_ref, state) + AutoFlowController.auto_adjust_atomic_demand(pad_ref, state) end end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 84cb2b37a..905876ea2 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -5,9 +5,6 @@ defmodule Membrane.Core.Element.DemandController do use Bunch - alias Membrane.Core.Element.ManualFlowController - alias __MODULE__.{Auto, Manual} - alias Membrane.Buffer alias Membrane.Core.CallbackHandler @@ -63,7 +60,7 @@ defmodule Membrane.Core.Element.DemandController do if atomic_value > 0 do state |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) - |> Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else state end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index e07dcedb5..385c2e999 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -19,6 +19,7 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. alias Membrane.Core.Element.DemandController + alias Membrane.Core.Element.AutoFlowController alias Membrane.Core.Element.{AtomicDemand, State} require Membrane.Core.Child.PadModel, as: PadModel @@ -140,6 +141,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state end) end - |> DemandController.Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index e79394213..246eaef16 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,7 +12,7 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandController, + DemandController, AutoFlowController, InputQueue, ManualFlowController, AutoFlowController, @@ -55,7 +55,7 @@ defmodule Membrane.Core.Element.EventController do # event goes to the auto flow control queue not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) -> - DemandController.Auto.store_event_in_queue(pad_ref, event, state) + AutoFlowController.store_event_in_queue(pad_ref, event, state) true -> exec_handle_event(pad_ref, event, state) diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index 04ec59b2e..83793d283 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -9,7 +9,7 @@ defmodule Membrane.Core.Element.ManualFlowController do ActionHandler, BufferController, CallbackContext, - DemandController, + DemandController, AutoFlowController, EventController, InputQueue, State, diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index dfbb52f8f..85479f9a1 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -9,8 +9,9 @@ defmodule Membrane.Core.Element.PadController do alias Membrane.Core.Element.{ ActionHandler, AtomicDemand, + AutoFlowController, CallbackContext, - DemandController, + DemandController, AutoFlowController, EffectiveFlowController, EventController, InputQueue, @@ -241,7 +242,7 @@ defmodule Membrane.Core.Element.PadController do |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) - |> DemandController.Auto.pop_queues_and_bump_demand() + |> AutoFlowController.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -335,7 +336,7 @@ defmodule Membrane.Core.Element.PadController do Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref)) %{direction: :input, flow_control: :auto} -> - DemandController.Auto.auto_adjust_atomic_demand(endpoint.pad_ref, state) + AutoFlowController.auto_adjust_atomic_demand(endpoint.pad_ref, state) |> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1]) _pad_data -> diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index f28368a3f..e70949fa0 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -12,7 +12,7 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandController, + DemandController, AutoFlowController, InputQueue, PlaybackQueue, State @@ -48,7 +48,7 @@ defmodule Membrane.Core.Element.StreamFormatController do # stream format goes to the auto flow control queue pad_ref in state.awaiting_auto_input_pads -> - DemandController.Auto.store_stream_format_in_queue(pad_ref, stream_format, state) + AutoFlowController.store_stream_format_in_queue(pad_ref, stream_format, state) true -> exec_handle_stream_format(pad_ref, stream_format, state) diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index f6f9054b9..98022cef1 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do # with input pad, but hasn't been sent yet by the element with output pad. Detects toilet overflow as well. atomic_demand: private_field, - # Field used in DemandController.Auto and InputQueue, to caluclate, how much AtomicDemand should be increased. + # Field used in AutoFlowController and InputQueue, to caluclate, how much AtomicDemand should be increased. # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. manual_demand_size: private_field, From be0abad732aeac2e99e8d6b907d376075d42a3d0 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 10:56:32 +0100 Subject: [PATCH 07/18] Format --- lib/membrane/core/element.ex | 3 ++- lib/membrane/core/element/action_handler.ex | 2 +- lib/membrane/core/element/buffer_controller.ex | 3 ++- lib/membrane/core/element/demand_controller.ex | 2 -- lib/membrane/core/element/event_controller.ex | 3 ++- lib/membrane/core/element/manual_flow_controller.ex | 9 +++++++-- lib/membrane/core/element/pad_controller.ex | 3 ++- lib/membrane/core/element/stream_format_controller.ex | 3 ++- 8 files changed, 18 insertions(+), 10 deletions(-) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 119e4f3dd..26d482833 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -23,7 +23,8 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, - DemandController, AutoFlowController, + DemandController, + AutoFlowController, EffectiveFlowController, EventController, ManualFlowController, diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 8e7d89ddb..32aae1537 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -23,7 +23,7 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ AutoFlowController, - DemandController, AutoFlowController, + DemandController, State, StreamFormatController, ManualFlowController diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index bb0fb3055..99d7173df 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -13,7 +13,8 @@ defmodule Membrane.Core.Element.BufferController do ActionHandler, AutoFlowController, CallbackContext, - DemandController, AutoFlowController, + DemandController, + AutoFlowController, EventController, InputQueue, ManualFlowController, diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 905876ea2..c85940228 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -10,7 +10,6 @@ defmodule Membrane.Core.Element.DemandController do alias Membrane.Core.CallbackHandler alias Membrane.Core.Element.CallbackContext - alias Membrane.Core.Element.{ ActionHandler, AtomicDemand, @@ -117,7 +116,6 @@ defmodule Membrane.Core.Element.DemandController do }) end - @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() def exec_handle_demand(pad_ref, state) do with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 246eaef16..f86c887ff 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,7 +12,8 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandController, AutoFlowController, + DemandController, + AutoFlowController, InputQueue, ManualFlowController, AutoFlowController, diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index 83793d283..a1440bd52 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -9,7 +9,8 @@ defmodule Membrane.Core.Element.ManualFlowController do ActionHandler, BufferController, CallbackContext, - DemandController, AutoFlowController, + DemandController, + AutoFlowController, EventController, InputQueue, State, @@ -101,7 +102,11 @@ defmodule Membrane.Core.Element.ManualFlowController do %State{state | delay_demands?: false} end - @spec update_demand(Pad.ref(), non_neg_integer() | (non_neg_integer() -> non_neg_integer()), State.t()) :: State.t() + @spec update_demand( + Pad.ref(), + non_neg_integer() | (non_neg_integer() -> non_neg_integer()), + State.t() + ) :: State.t() def update_demand(pad_ref, size, state) when is_integer(size) do PadModel.set_data!(state, pad_ref, :manual_demand_size, size) end diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 85479f9a1..f51a03252 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -11,7 +11,8 @@ defmodule Membrane.Core.Element.PadController do AtomicDemand, AutoFlowController, CallbackContext, - DemandController, AutoFlowController, + DemandController, + AutoFlowController, EffectiveFlowController, EventController, InputQueue, diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index e70949fa0..4a20c6775 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -12,7 +12,8 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandController, AutoFlowController, + DemandController, + AutoFlowController, InputQueue, PlaybackQueue, State From 6c585e4787982e39274a4885cbb8d2fb91855a72 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 16:42:46 +0100 Subject: [PATCH 08/18] InputQueue -> ManualFlowController.InputQueue --- lib/membrane/children_spec.ex | 2 +- lib/membrane/core/child/pad_model.ex | 3 ++- lib/membrane/core/element.ex | 2 +- .../core/element/buffer_controller.ex | 22 ++++++++++--------- .../core/element/demand_controller.ex | 6 +++-- lib/membrane/core/element/event_controller.ex | 3 ++- .../core/element/manual_flow_controller.ex | 3 ++- .../input_queue.ex | 5 +++-- lib/membrane/core/element/pad_controller.ex | 3 ++- .../core/element/stream_format_controller.ex | 3 ++- .../core/element/event_controller_test.exs | 3 ++- .../core/element/input_queue_test.exs | 3 ++- .../element/lifecycle_controller_test.exs | 3 ++- .../element/stream_format_controller_test.exs | 3 ++- 14 files changed, 39 insertions(+), 25 deletions(-) rename lib/membrane/core/element/{ => manual_flow_controller}/input_queue.ex (98%) diff --git a/lib/membrane/children_spec.ex b/lib/membrane/children_spec.ex index cd810096d..7507da384 100644 --- a/lib/membrane/children_spec.ex +++ b/lib/membrane/children_spec.ex @@ -514,7 +514,7 @@ defmodule Membrane.ChildrenSpec do Membrane won't send smaller demand than `minimal demand`, to reduce demands' overhead. However, the user will always receive as many buffers, as demanded, all excess buffers will be queued internally. Used only for pads working in `:manual` flow control mode. See `t:Membrane.Pad.flow_control/0` - for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future). + for more info. Defaults to `#{Membrane.Core.Element.ManualFlowController.InputQueue.default_min_demand_factor()}` (the default may change in the future). - `auto_demand_size` - Size of automatically generated demands. Used only for pads working in `:auto` flow control mode. See `t:Membrane.Pad.flow_control/0` for more info. - `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1, diff --git a/lib/membrane/core/child/pad_model.ex b/lib/membrane/core/child/pad_model.ex index 9fd1c4ace..cfd7a97a9 100644 --- a/lib/membrane/core/child/pad_model.ex +++ b/lib/membrane/core/child/pad_model.ex @@ -7,6 +7,7 @@ defmodule Membrane.Core.Child.PadModel do alias Membrane.Core.Child alias Membrane.Core.Element.EffectiveFlowController + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.{Pad, UnknownPadError} @type bin_pad_data :: %Membrane.Bin.PadData{ @@ -39,7 +40,7 @@ defmodule Membrane.Core.Child.PadModel do pid: pid, other_ref: Pad.ref(), sticky_messages: [Membrane.Event.t()], - input_queue: Membrane.Core.Element.InputQueue.t() | nil, + input_queue: InputQueue.t() | nil, options: %{optional(atom) => any}, auto_demand_size: pos_integer() | nil, sticky_events: [Membrane.Event.t()], diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 26d482833..c4776867f 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -218,7 +218,7 @@ defmodule Membrane.Core.Element do defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do pad_ref = Message.for_pad(msg) - state = BufferController.handle_buffer(pad_ref, buffers, state) + state = BufferController.handle_ingoing_buffers(pad_ref, buffers, state) {:noreply, state} end diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 99d7173df..0398191cf 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -16,12 +16,13 @@ defmodule Membrane.Core.Element.BufferController do DemandController, AutoFlowController, EventController, - InputQueue, ManualFlowController, PlaybackQueue, State } + alias Membrane.Core.Element.ManualFlowController.InputQueue + alias Membrane.Core.Telemetry require Membrane.Core.Child.PadModel @@ -32,8 +33,8 @@ defmodule Membrane.Core.Element.BufferController do callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2` to check if there are any unsupplied demands. """ - @spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() - def handle_buffer(pad_ref, buffers, state) do + @spec handle_ingoing_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() + def handle_ingoing_buffers(pad_ref, buffers, state) do withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), playback: %State{playback: :playing} <- state do %{ @@ -51,20 +52,21 @@ defmodule Membrane.Core.Element.BufferController do EventController.handle_start_of_stream(pad_ref, state) end - do_handle_buffer(pad_ref, data, buffers, state) + do_handle_ingoing_buffers(pad_ref, data, buffers, state) else pad: {:error, :unknown_pad} -> # We've got a buffer from already unlinked pad state playback: _playback -> - PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state) + PlaybackQueue.store(&handle_ingoing_buffers(pad_ref, buffers, &1), state) end end - @spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: + # todo: move it to the flow controllers? + @spec do_handle_ingoing_buffers(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() - defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do + defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) @@ -79,20 +81,20 @@ defmodule Membrane.Core.Element.BufferController do end end - defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do + defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do %{input_queue: old_input_queue} = data input_queue = InputQueue.store(old_input_queue, buffers) state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) - if old_input_queue |> InputQueue.empty?() do + if InputQueue.empty?(old_input_queue) do ManualFlowController.supply_demand(pad_ref, state) else state end end - defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do + defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :push}, buffers, state) do exec_buffer_callback(pad_ref, buffers, state) end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index c85940228..bbfc109c4 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -6,6 +6,7 @@ defmodule Membrane.Core.Element.DemandController do use Bunch alias Membrane.Buffer + alias Membrane.Element.PadData alias Membrane.Core.CallbackHandler alias Membrane.Core.Element.CallbackContext @@ -28,8 +29,9 @@ defmodule Membrane.Core.Element.DemandController do def snapshot_atomic_demand(pad_ref, state) do with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), %State{playback: :playing} <- state do - if pad_data.direction == :input, - do: raise("cannot snapshot atomic counter in input pad") + if pad_data.direction == :input do + raise("cannot snapshot atomic counter in input pad") + end do_snapshot_atomic_demand(pad_data, state) else diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index f86c887ff..771ddae06 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -14,13 +14,14 @@ defmodule Membrane.Core.Element.EventController do CallbackContext, DemandController, AutoFlowController, - InputQueue, ManualFlowController, AutoFlowController, PlaybackQueue, State } + alias Membrane.Core.Element.ManualFlowController.InputQueue + require Membrane.Core.Child.PadModel require Membrane.Core.Message require Membrane.Core.Telemetry diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index a1440bd52..39497332a 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -12,11 +12,12 @@ defmodule Membrane.Core.Element.ManualFlowController do DemandController, AutoFlowController, EventController, - InputQueue, State, StreamFormatController } + alias __MODULE__.InputQueue + alias Membrane.Element.PadData alias Membrane.Pad diff --git a/lib/membrane/core/element/input_queue.ex b/lib/membrane/core/element/manual_flow_controller/input_queue.ex similarity index 98% rename from lib/membrane/core/element/input_queue.ex rename to lib/membrane/core/element/manual_flow_controller/input_queue.ex index a3b3a2e15..dbebea030 100644 --- a/lib/membrane/core/element/input_queue.ex +++ b/lib/membrane/core/element/manual_flow_controller/input_queue.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.InputQueue do +defmodule Membrane.Core.Element.ManualFlowController.InputQueue do @moduledoc false # Queue that is attached to the `:input` pad when working in a `:manual` flow control mode. @@ -105,7 +105,8 @@ defmodule Membrane.Core.Element.InputQueue do |> maybe_increase_atomic_demand() end - @spec store(t(), atom(), queue_item() | [queue_item()]) :: t() + @spec store(t(), :buffer | :buffers | :event | :stream_format, queue_item() | [queue_item()]) :: + t() def store(input_queue, type \\ :buffers, v) def store(input_queue, :buffers, v) when is_list(v) do diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index f51a03252..cbad131a3 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -15,11 +15,12 @@ defmodule Membrane.Core.Element.PadController do AutoFlowController, EffectiveFlowController, EventController, - InputQueue, State, StreamFormatController } + alias Membrane.Core.Element.ManualFlowController.InputQueue + alias Membrane.Core.Parent.Link.Endpoint alias Membrane.LinkError diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index 4a20c6775..a713b0749 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -14,11 +14,12 @@ defmodule Membrane.Core.Element.StreamFormatController do CallbackContext, DemandController, AutoFlowController, - InputQueue, PlaybackQueue, State } + alias Membrane.Core.Element.ManualFlowController.InputQueue + require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index e9ea8f70a..57e2ccfad 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -1,7 +1,8 @@ defmodule Membrane.Core.Element.EventControllerTest do use ExUnit.Case, async: true - alias Membrane.Core.Element.{AtomicDemand, EventController, InputQueue, State} + alias Membrane.Core.Element.{AtomicDemand, EventController, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.Events alias Membrane.Core.SubprocessSupervisor alias Membrane.Event diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index efa95c490..42904d7aa 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -2,7 +2,8 @@ defmodule Membrane.Core.Element.InputQueueTest do use ExUnit.Case, async: true alias Membrane.Buffer - alias Membrane.Core.Element.{AtomicDemand, InputQueue} + alias Membrane.Core.Element.AtomicDemand + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.Message alias Membrane.Core.SubprocessSupervisor alias Membrane.Testing.Event diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 74131182c..de2a33d1a 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -1,7 +1,8 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do use ExUnit.Case, async: true - alias Membrane.Core.Element.{AtomicDemand, InputQueue, LifecycleController, State} + alias Membrane.Core.Element.{AtomicDemand, LifecycleController, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.{ Message, diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 5b68ad29a..914f02c8d 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -3,7 +3,8 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do alias Membrane.Buffer alias Membrane.Core.Message - alias Membrane.Core.Element.{AtomicDemand, InputQueue, State} + alias Membrane.Core.Element.{AtomicDemand, State} + alias Membrane.Core.Element.ManualFlowController.InputQueue alias Membrane.Core.SubprocessSupervisor alias Membrane.StreamFormat.Mock, as: MockStreamFormat alias Membrane.Support.DemandsTest.Filter From 415e7c5670c7d8fd8c3af8b1663ba245427e31b3 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 17:02:24 +0100 Subject: [PATCH 09/18] Bump default input queue size to 100 --- lib/membrane/core/element/action_handler.ex | 12 ++++++------ lib/membrane/core/element/buffer_controller.ex | 7 ++++++- lib/membrane/core/element/manual_flow_controller.ex | 11 ++++++----- .../element/manual_flow_controller/input_queue.ex | 6 +++--- lib/membrane/core/element/pad_controller.ex | 2 +- test/membrane/core/element/event_controller_test.exs | 2 +- test/membrane/core/element/input_queue_test.exs | 8 ++++---- .../core/element/lifecycle_controller_test.exs | 2 +- .../core/element/stream_format_controller_test.exs | 2 +- 9 files changed, 29 insertions(+), 23 deletions(-) diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 32aae1537..df8603e8d 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -245,7 +245,7 @@ defmodule Membrane.Core.Element.ActionHandler do %State{type: type} = state ) when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do - supply_demand(pad_ref, size, state) + delay_supplying_demand(pad_ref, size, state) end @impl CallbackHandler @@ -403,28 +403,28 @@ defmodule Membrane.Core.Element.ActionHandler do end end - @spec supply_demand( + @spec delay_supplying_demand( Pad.ref(), Action.demand_size(), State.t() ) :: State.t() - defp supply_demand(pad_ref, 0, state) do + defp delay_supplying_demand(pad_ref, 0, state) do Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}") state end - defp supply_demand(pad_ref, size, _state) + defp delay_supplying_demand(pad_ref, size, _state) when is_integer(size) and size < 0 do raise ElementError, "Tried to request a negative demand of size #{inspect(size)} on pad #{inspect(pad_ref)}" end - defp supply_demand(pad_ref, size, state) do + defp delay_supplying_demand(pad_ref, size, state) do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do # todo: get_data! above could be eradicated state = ManualFlowController.update_demand(pad_ref, size, state) - ManualFlowController.delay_demand_supply(pad_ref, state) + ManualFlowController.delay_supplying_demand(pad_ref, state) else %{direction: :output} -> raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 0398191cf..397a01be4 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -64,7 +64,12 @@ defmodule Membrane.Core.Element.BufferController do end # todo: move it to the flow controllers? - @spec do_handle_ingoing_buffers(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: + @spec do_handle_ingoing_buffers( + Pad.ref(), + PadModel.pad_data(), + [Buffer.t()] | Buffer.t(), + State.t() + ) :: State.t() defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index 39497332a..f2f2ca215 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -77,7 +77,7 @@ defmodule Membrane.Core.Element.ManualFlowController do @spec supply_demand(Pad.ref(), State.t()) :: State.t() def supply_demand(pad_ref, %State{delay_demands?: true} = state) do - delay_demand_supply(pad_ref, state) + delay_supplying_demand(pad_ref, state) end def supply_demand(pad_ref, state) do @@ -85,10 +85,6 @@ defmodule Membrane.Core.Element.ManualFlowController do |> handle_delayed_demands() end - def delay_demand_supply(pad_ref, state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) - end - defp do_supply_demand(pad_ref, state) do # marking is state that actual demand supply has been started (note changing back to false when finished) state = %State{state | delay_demands?: true} @@ -103,6 +99,11 @@ defmodule Membrane.Core.Element.ManualFlowController do %State{state | delay_demands?: false} end + @spec delay_supplying_demand(Pad.ref(), State.t()) :: State.t() + def delay_supplying_demand(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) + end + @spec update_demand( Pad.ref(), non_neg_integer() | (non_neg_integer() -> non_neg_integer()), diff --git a/lib/membrane/core/element/manual_flow_controller/input_queue.ex b/lib/membrane/core/element/manual_flow_controller/input_queue.ex index dbebea030..8eb1d4ed4 100644 --- a/lib/membrane/core/element/manual_flow_controller/input_queue.ex +++ b/lib/membrane/core/element/manual_flow_controller/input_queue.ex @@ -54,12 +54,12 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do defstruct @enforce_keys ++ [size: 0, demand: 0] - @default_target_size_factor 40 + @default_target_size_factor 100 @spec default_min_demand_factor() :: number() def default_min_demand_factor, do: 0.25 - @spec init(%{ + @spec new(%{ inbound_demand_unit: Buffer.Metric.unit(), outbound_demand_unit: Buffer.Metric.unit(), atomic_demand: AtomicDemand.t(), @@ -67,7 +67,7 @@ defmodule Membrane.Core.Element.ManualFlowController.InputQueue do log_tag: String.t(), target_size: pos_integer() | nil }) :: t() - def init(config) do + def new(config) do %{ inbound_demand_unit: inbound_demand_unit, outbound_demand_unit: outbound_demand_unit, diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index cbad131a3..68863f30a 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -376,7 +376,7 @@ defmodule Membrane.Core.Element.PadController do } = pad_data input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: other_pad_info[:demand_unit] || this_demand_unit, outbound_demand_unit: this_demand_unit, atomic_demand: atomic_demand, diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 57e2ccfad..5b67e4853 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.EventControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, pad_ref: :some_pad, diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index 42904d7aa..0bef78f60 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -25,7 +25,7 @@ defmodule Membrane.Core.Element.InputQueueTest do end test "return InputQueue struct and send demand message", context do - assert InputQueue.init(%{ + assert InputQueue.new(%{ inbound_demand_unit: context.inbound_demand_unit, outbound_demand_unit: context.outbound_demand_unit, pad_ref: context.pad_ref, @@ -187,7 +187,7 @@ defmodule Membrane.Core.Element.InputQueueTest do describe ".take/2 should" do setup do input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, pad_ref: :output_pad_ref, @@ -302,7 +302,7 @@ defmodule Membrane.Core.Element.InputQueueTest do atomic_demand = new_atomic_demand() queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :bytes, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, @@ -339,7 +339,7 @@ defmodule Membrane.Core.Element.InputQueueTest do atomic_demand = new_atomic_demand() queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :bytes, atomic_demand: atomic_demand, diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index de2a33d1a..61b4bece6 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -33,7 +33,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 914f02c8d..1e3a2cb6e 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -26,7 +26,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do }) input_queue = - InputQueue.init(%{ + InputQueue.new(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, atomic_demand: atomic_demand, From 95b045f68c989541269267dac64eb8e93cd6a5f2 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 17:26:11 +0100 Subject: [PATCH 10/18] Fix tests --- test/membrane/integration/auto_demands_test.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index 2a25604a2..44f5c7e93 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -339,7 +339,7 @@ defmodule Membrane.Integration.AutoDemandsTest do end test "when there is no demand on the output pad", %{pipeline: pipeline} do - manual_flow_queue_size = 40 + manual_flow_queue_size = 100 assert_pipeline_notified(pipeline, :filter, :playing) @@ -374,7 +374,7 @@ defmodule Membrane.Integration.AutoDemandsTest do test "when an element returns :pause_auto_demand and :resume_auto_demand action", %{ pipeline: pipeline } do - manual_flow_queue_size = 40 + manual_flow_queue_size = 100 auto_flow_demand_size = 400 assert_pipeline_notified(pipeline, :filter, :playing) From 93de2fcdc24289942ca42ee44f01628cb432dd46 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 17:36:01 +0100 Subject: [PATCH 11/18] Remove unnecessary comments --- lib/membrane/core/bin.ex | 2 -- lib/membrane/core/element.ex | 2 -- lib/membrane/core/element/action_handler.ex | 3 --- lib/membrane/core/element/buffer_controller.ex | 1 - 4 files changed, 8 deletions(-) diff --git a/lib/membrane/core/bin.ex b/lib/membrane/core/bin.ex index 9c3ce1950..37b09b22a 100644 --- a/lib/membrane/core/bin.ex +++ b/lib/membrane/core/bin.ex @@ -70,8 +70,6 @@ defmodule Membrane.Core.Bin do if node do result = :rpc.call(node, GenServer, :start, [__MODULE__, options]) - - # TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid) result else diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index c4776867f..bc1ba9f6a 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -86,8 +86,6 @@ defmodule Membrane.Core.Element do # rpc if necessary if node do result = :rpc.call(node, GenServer, :start, [__MODULE__, options]) - - # TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid) result else diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index df8603e8d..0fc4f068c 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -348,7 +348,6 @@ defmodule Membrane.Core.Element.ActionHandler do stalker_metrics: stalker_metrics } when stream_format != nil <- pad_data do - # todo: move this function to one of the controllers, to avoid redundant PadModet.get_data in the function below state = DemandController.decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) :atomics.add(stalker_metrics.total_buffers, 1, length(buffers)) Message.send(pid, :buffer, buffers, for_pad: other_ref) @@ -422,7 +421,6 @@ defmodule Membrane.Core.Element.ActionHandler do defp delay_supplying_demand(pad_ref, size, state) do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - # todo: get_data! above could be eradicated state = ManualFlowController.update_demand(pad_ref, size, state) ManualFlowController.delay_supplying_demand(pad_ref, state) else @@ -444,7 +442,6 @@ defmodule Membrane.Core.Element.ActionHandler do when type in [:source, :filter, :endpoint] do with %{direction: :output, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - # todo: get_data! above could be eradicated ManualFlowController.delay_redemand(pad_ref, state) else %{direction: :input} -> diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 397a01be4..b8f0feb3d 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -63,7 +63,6 @@ defmodule Membrane.Core.Element.BufferController do end end - # todo: move it to the flow controllers? @spec do_handle_ingoing_buffers( Pad.ref(), PadModel.pad_data(), From bc4ba89b7c867ddbf90cef0b6c3ad0a3b0f81948 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 17:54:44 +0100 Subject: [PATCH 12/18] Remove leftovers --- lib/membrane/core/bin/state.ex | 2 - lib/membrane/core/callback_handler.ex | 10 +---- lib/membrane/core/element/action_handler.ex | 45 +++++++++---------- lib/membrane/core/element/state.ex | 2 - lib/membrane/core/pipeline/state.ex | 2 - .../core/element/action_handler_test.exs | 3 -- .../core/element/event_controller_test.exs | 1 - .../element/lifecycle_controller_test.exs | 1 - .../core/element/pad_controller_test.exs | 1 - .../element/stream_format_controller_test.exs | 1 - 10 files changed, 21 insertions(+), 47 deletions(-) diff --git a/lib/membrane/core/bin/state.ex b/lib/membrane/core/bin/state.ex index c917513a5..e9605a1cf 100644 --- a/lib/membrane/core/bin/state.ex +++ b/lib/membrane/core/bin/state.ex @@ -45,7 +45,6 @@ defmodule Membrane.Core.Bin.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - # handling_action?: boolean(), stalker: Membrane.Core.Stalker.t() } @@ -73,7 +72,6 @@ defmodule Membrane.Core.Bin.State do initialized?: false, terminating?: false, setup_incomplete?: false, - # handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil, diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 7bff95beb..41c870077 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -132,7 +132,7 @@ defmodule Membrane.Core.CallbackHandler do %{context: context_fun}, %{module: module, internal_state: internal_state} = state ) do - args = args ++ [context_fun.(state) |> Map.put(:s, state), internal_state] + args = args ++ [context_fun.(state), internal_state] callback_result = try do @@ -188,9 +188,6 @@ defmodule Membrane.Core.CallbackHandler do reraise e, __STACKTRACE__ end - # was_delay_demands? = Map.get(state, :delay_demands?, false) - # state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state - state = Enum.reduce(actions, state, fn action, state -> try do @@ -205,11 +202,6 @@ defmodule Membrane.Core.CallbackHandler do end end) - # state = - # if Component.is_element?(state) and not was_delay_demands?, - # do: %{state | delay_demands?: false}, - # else: state - handler_module.handle_end_of_actions(callback, state) end end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0fc4f068c..0790b2104 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -45,45 +45,40 @@ defmodule Membrane.Core.Element.ActionHandler do defguardp is_demand_size(size) when is_integer(size) or is_function(size) + # Match in the function below is caused by a fact, that handle_spec_started is the only callback, that + # might be executed in between handling actions returned from other callbacks. + # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. + @impl CallbackHandler - def handle_end_of_actions(callback, state) do + def handle_end_of_actions(:handle_spec_started, state), do: state + + def handle_end_of_actions(_callback, state) do # Fixed order of handling demand of manual and auto pads would lead to # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. - # Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might - # be executed in between handling actions returned from other callbacks. - # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. - - if callback != :handle_spec_started do - if Enum.random([1, 2]) == 1 do - snapshot(callback, state) - |> hdd() - else - state - |> hdd() - |> then(&snapshot(callback, &1)) - end + if Enum.random([true, false]) do + state + |> handle_pads_to_snapshot() + |> maybe_handle_delayed_demands() else state + |> maybe_handle_delayed_demands() + |> handle_pads_to_snapshot() end end - defp hdd(state) do + defp maybe_handle_delayed_demands(state) do with %{delay_demands?: false} <- state do ManualFlowController.handle_delayed_demands(state) end end - defp snapshot(callback, state) do - if callback != :handle_spec_started do - state.pads_to_snapshot - |> Enum.shuffle() - |> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2) - |> Map.put(:pads_to_snapshot, MapSet.new()) - else - state - end + defp handle_pads_to_snapshot(state) do + state.pads_to_snapshot + |> Enum.shuffle() + |> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2) + |> Map.put(:pads_to_snapshot, MapSet.new()) end @impl CallbackHandler @@ -245,7 +240,7 @@ defmodule Membrane.Core.Element.ActionHandler do %State{type: type} = state ) when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do - delay_supplying_demand(pad_ref, size, state) + delay_supplying_demand(pad_ref, size, state) end @impl CallbackHandler diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index b46526e65..d73c29335 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -42,7 +42,6 @@ defmodule Membrane.Core.Element.State do terminating?: boolean(), setup_incomplete?: boolean(), effective_flow_control: EffectiveFlowController.effective_flow_control(), - # handling_action?: boolean(), popping_auto_flow_queue?: boolean(), pads_to_snapshot: MapSet.t(), stalker: Membrane.Core.Stalker.t(), @@ -75,7 +74,6 @@ defmodule Membrane.Core.Element.State do terminating?: false, setup_incomplete?: false, delay_demands?: false, - # handling_action?: false, popping_auto_flow_queue?: false, stalker: nil, resource_guard: nil, diff --git a/lib/membrane/core/pipeline/state.ex b/lib/membrane/core/pipeline/state.ex index a7aed0415..37f24ea2d 100644 --- a/lib/membrane/core/pipeline/state.ex +++ b/lib/membrane/core/pipeline/state.ex @@ -32,7 +32,6 @@ defmodule Membrane.Core.Pipeline.State do terminating?: boolean(), resource_guard: Membrane.ResourceGuard.t(), setup_incomplete?: boolean(), - # handling_action?: boolean(), stalker: Membrane.Core.Stalker.t(), subprocess_supervisor: pid(), awaiting_setup_completition?: boolean() @@ -56,7 +55,6 @@ defmodule Membrane.Core.Pipeline.State do initialized?: false, terminating?: false, setup_incomplete?: false, - # handling_action?: false, stalker: nil, resource_guard: nil, subprocess_supervisor: nil, diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 2c8f71cf9..48456f781 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -25,7 +25,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do playback: :stopped, synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), - # handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ input: @@ -110,7 +109,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do synchronization: %{clock: nil, parent_clock: nil}, delayed_demands: MapSet.new(), playback: :stopped, - # handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ @@ -512,7 +510,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do name: :elem_name, synchronization: %{clock: nil, parent_clock: nil}, type: :source, - # handling_action?: false, pads_to_snapshot: MapSet.new(), pads_data: %{ output: %{ diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index 5b67e4853..ef32c93e7 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -51,7 +51,6 @@ defmodule Membrane.Core.Element.EventControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil}, - # handling_action?: false, delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 61b4bece6..c7397f6fe 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -50,7 +50,6 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do playback: :playing, parent_pid: self(), synchronization: %{clock: nil, parent_clock: nil}, - # handling_action?: false, delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index a81b1e42f..4537a21bf 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -18,7 +18,6 @@ defmodule Membrane.Core.Element.PadControllerTest do struct!(State, name: name, module: elem_module, - # handling_action?: false, delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 1e3a2cb6e..766bb01d2 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -42,7 +42,6 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do type: :filter, playback: :playing, synchronization: %{clock: nil, parent_clock: nil}, - # handling_action?: false, delay_demands?: false, pads_to_snapshot: MapSet.new(), delayed_demands: MapSet.new(), From 214aefdff3bc5cdaaf61e72249dbe51335896f11 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 18:03:48 +0100 Subject: [PATCH 13/18] Clean up aliases --- lib/membrane/core/element.ex | 3 +-- lib/membrane/core/element/action_handler.ex | 4 ++-- lib/membrane/core/element/buffer_controller.ex | 2 -- lib/membrane/core/element/demand_controller.ex | 2 +- lib/membrane/core/element/effective_flow_controller.ex | 5 ++--- lib/membrane/core/element/event_controller.ex | 4 +--- lib/membrane/core/element/manual_flow_controller.ex | 7 +------ lib/membrane/core/element/pad_controller.ex | 2 -- lib/membrane/core/element/stream_format_controller.ex | 3 +-- 9 files changed, 9 insertions(+), 23 deletions(-) diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index bc1ba9f6a..3299c305c 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -24,11 +24,10 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, DemandController, - AutoFlowController, EffectiveFlowController, EventController, - ManualFlowController, LifecycleController, + ManualFlowController, PadController, State, StreamFormatController diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 0790b2104..12914f4a9 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -24,9 +24,9 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ AutoFlowController, DemandController, + ManualFlowController, State, - StreamFormatController, - ManualFlowController + StreamFormatController } alias Membrane.Core.{Events, TimerController} diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index b8f0feb3d..2fcd3d9cb 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -13,8 +13,6 @@ defmodule Membrane.Core.Element.BufferController do ActionHandler, AutoFlowController, CallbackContext, - DemandController, - AutoFlowController, EventController, ManualFlowController, PlaybackQueue, diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index bbfc109c4..1f7d99de4 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -14,8 +14,8 @@ defmodule Membrane.Core.Element.DemandController do alias Membrane.Core.Element.{ ActionHandler, AtomicDemand, - ManualFlowController, AutoFlowController, + ManualFlowController, PlaybackQueue, State } diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 385c2e999..7bdc49abb 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -18,9 +18,8 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. - alias Membrane.Core.Element.DemandController - alias Membrane.Core.Element.AutoFlowController - alias Membrane.Core.Element.{AtomicDemand, State} + alias Membrane.Core.Element.{AtomicDemand, AutoFlowController, DemandController, State} + require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Message, as: Message diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index 771ddae06..f97169308 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -11,11 +11,9 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, - CallbackContext, - DemandController, AutoFlowController, + CallbackContext, ManualFlowController, - AutoFlowController, PlaybackQueue, State } diff --git a/lib/membrane/core/element/manual_flow_controller.ex b/lib/membrane/core/element/manual_flow_controller.ex index f2f2ca215..be20a37d9 100644 --- a/lib/membrane/core/element/manual_flow_controller.ex +++ b/lib/membrane/core/element/manual_flow_controller.ex @@ -3,14 +3,9 @@ defmodule Membrane.Core.Element.ManualFlowController do # Module handling demands requested on output pads. - alias Membrane.Core.CallbackHandler - alias Membrane.Core.Element.{ - ActionHandler, BufferController, - CallbackContext, DemandController, - AutoFlowController, EventController, State, StreamFormatController @@ -18,7 +13,6 @@ defmodule Membrane.Core.Element.ManualFlowController do alias __MODULE__.InputQueue - alias Membrane.Element.PadData alias Membrane.Pad require Membrane.Core.Child.PadModel, as: PadModel @@ -27,6 +21,7 @@ defmodule Membrane.Core.Element.ManualFlowController do @handle_demand_loop_limit 20 + @spec delay_redemand(Pad.ref(), State.t()) :: State.t() def delay_redemand(pad_ref, state) do Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) end diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 68863f30a..633d26d9c 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -11,8 +11,6 @@ defmodule Membrane.Core.Element.PadController do AtomicDemand, AutoFlowController, CallbackContext, - DemandController, - AutoFlowController, EffectiveFlowController, EventController, State, diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index a713b0749..3075e9c15 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -11,9 +11,8 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.Core.Element.{ ActionHandler, - CallbackContext, - DemandController, AutoFlowController, + CallbackContext, PlaybackQueue, State } From c14ca1384da5ab7914492036463ad90b9ce470e2 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 27 Mar 2024 18:04:17 +0100 Subject: [PATCH 14/18] format --- lib/membrane/core/element/effective_flow_controller.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 7bdc49abb..639914e6e 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -20,7 +20,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do alias Membrane.Core.Element.{AtomicDemand, AutoFlowController, DemandController, State} - require Membrane.Core.Child.PadModel, as: PadModel require Membrane.Core.Message, as: Message require Membrane.Logger From e684bc193b0d4ac80cbfadba5dd6f5761a30cb33 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 2 Apr 2024 18:13:18 +0200 Subject: [PATCH 15/18] Implement comments from CR --- lib/membrane/core/callback_handler.ex | 6 +++--- lib/membrane/core/element.ex | 2 +- lib/membrane/core/element/action_handler.ex | 8 +------- lib/membrane/core/element/buffer_controller.ex | 16 ++++++++-------- lib/membrane/core/element/demand_controller.ex | 2 +- lib/membrane/core/pipeline/action_handler.ex | 2 +- 6 files changed, 15 insertions(+), 21 deletions(-) diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index 41c870077..c42a7400c 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -30,7 +30,7 @@ defmodule Membrane.Core.CallbackHandler do @callback transform_actions(actions :: list, callback :: atom, handler_params, state) :: {actions :: list, state} - @callback handle_end_of_actions(callback :: atom, state) :: state + @callback handle_end_of_actions(state) :: state defmacro __using__(_args) do quote location: :keep do @@ -43,7 +43,7 @@ defmodule Membrane.Core.CallbackHandler do end @impl unquote(__MODULE__) - def handle_end_of_actions(_callback, state) do + def handle_end_of_actions(state) do state end @@ -202,6 +202,6 @@ defmodule Membrane.Core.CallbackHandler do end end) - handler_module.handle_end_of_actions(callback, state) + handler_module.handle_end_of_actions(state) end end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index 3299c305c..bf9ab243e 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -215,7 +215,7 @@ defmodule Membrane.Core.Element do defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do pad_ref = Message.for_pad(msg) - state = BufferController.handle_ingoing_buffers(pad_ref, buffers, state) + state = BufferController.handle_incoming_buffers(pad_ref, buffers, state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 12914f4a9..d6027870d 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -45,14 +45,8 @@ defmodule Membrane.Core.Element.ActionHandler do defguardp is_demand_size(size) when is_integer(size) or is_function(size) - # Match in the function below is caused by a fact, that handle_spec_started is the only callback, that - # might be executed in between handling actions returned from other callbacks. - # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. - @impl CallbackHandler - def handle_end_of_actions(:handle_spec_started, state), do: state - - def handle_end_of_actions(_callback, state) do + def handle_end_of_actions(state) do # Fixed order of handling demand of manual and auto pads would lead to # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 2fcd3d9cb..d275502d3 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -31,8 +31,8 @@ defmodule Membrane.Core.Element.BufferController do callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2` to check if there are any unsupplied demands. """ - @spec handle_ingoing_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() - def handle_ingoing_buffers(pad_ref, buffers, state) do + @spec handle_incoming_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() + def handle_incoming_buffers(pad_ref, buffers, state) do withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), playback: %State{playback: :playing} <- state do %{ @@ -50,25 +50,25 @@ defmodule Membrane.Core.Element.BufferController do EventController.handle_start_of_stream(pad_ref, state) end - do_handle_ingoing_buffers(pad_ref, data, buffers, state) + do_handle_incoming_buffers(pad_ref, data, buffers, state) else pad: {:error, :unknown_pad} -> # We've got a buffer from already unlinked pad state playback: _playback -> - PlaybackQueue.store(&handle_ingoing_buffers(pad_ref, buffers, &1), state) + PlaybackQueue.store(&handle_incoming_buffers(pad_ref, buffers, &1), state) end end - @spec do_handle_ingoing_buffers( + @spec do_handle_incoming_buffers( Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t() ) :: State.t() - defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do + defp do_handle_incoming_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) @@ -83,7 +83,7 @@ defmodule Membrane.Core.Element.BufferController do end end - defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do + defp do_handle_incoming_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do %{input_queue: old_input_queue} = data input_queue = InputQueue.store(old_input_queue, buffers) @@ -96,7 +96,7 @@ defmodule Membrane.Core.Element.BufferController do end end - defp do_handle_ingoing_buffers(pad_ref, %{flow_control: :push}, buffers, state) do + defp do_handle_incoming_buffers(pad_ref, %{flow_control: :push}, buffers, state) do exec_buffer_callback(pad_ref, buffers, state) end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 1f7d99de4..f4d6f7c7d 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -30,7 +30,7 @@ defmodule Membrane.Core.Element.DemandController do with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), %State{playback: :playing} <- state do if pad_data.direction == :input do - raise("cannot snapshot atomic counter in input pad") + raise("cannot snapshot atomic counter in input pad #{inspect(pad_ref)}") end do_snapshot_atomic_demand(pad_data, state) diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index 7b4ebe153..382a4f5a8 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -107,7 +107,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do end @impl CallbackHandler - def handle_end_of_actions(_callback, state) do + def handle_end_of_actions(state) do with %{awaiting_setup_completition?: true} <- state do %{state | awaiting_setup_completition?: false} |> Membrane.Core.LifecycleController.complete_setup() From 5357161ac8a8f9cb66276d2462fe2d2f07f9e97e Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Tue, 2 Apr 2024 18:22:49 +0200 Subject: [PATCH 16/18] Fix test performance --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index fedce8b42..aebd69baf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -155,10 +155,10 @@ jobs: - run: cp -r benchmark/ ~/benchmark_backup/ - run: cp mix.exs ~/benchmark_backup/ - run: docker pull membraneframeworklabs/docker_membrane - - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/feature_branch_results + - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/feature_branch_results - run: git checkout -f master - run: cp ~/benchmark_backup/mix.exs ~/app - - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/master_results + - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/master_results - run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix run benchmark/compare.exs /root/results/feature_branch_results /root/results/master_results - run: command: rm ~/results/* From 1430ecc07b3091bf70fa5fb41577876e2334f6de Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 3 Apr 2024 12:04:32 +0200 Subject: [PATCH 17/18] Fix performance test --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index aebd69baf..9b890aa0f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -152,6 +152,7 @@ jobs: steps: - attach_workspace: at: . + - run: cd app; git add -A; git reset --hard; cd .. - run: cp -r benchmark/ ~/benchmark_backup/ - run: cp mix.exs ~/benchmark_backup/ - run: docker pull membraneframeworklabs/docker_membrane From 83269302412f3fd12fe33731964a2db200bbeeb3 Mon Sep 17 00:00:00 2001 From: "feliks.pobiedzinski@swmansion.com" Date: Wed, 3 Apr 2024 14:02:39 +0200 Subject: [PATCH 18/18] Implement suggestion from CR --- lib/membrane/core/element/demand_controller.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index f4d6f7c7d..985e56c85 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -30,7 +30,8 @@ defmodule Membrane.Core.Element.DemandController do with {:ok, pad_data} when not pad_data.end_of_stream? <- PadModel.get_data(state, pad_ref), %State{playback: :playing} <- state do if pad_data.direction == :input do - raise("cannot snapshot atomic counter in input pad #{inspect(pad_ref)}") + raise Membrane.ElementError, + "Cannot snapshot atomic counter in input pad #{inspect(pad_ref)}" end do_snapshot_atomic_demand(pad_data, state)