diff --git a/lib/membrane/core/callback_handler.ex b/lib/membrane/core/callback_handler.ex index c1a411690..7bff95beb 100644 --- a/lib/membrane/core/callback_handler.ex +++ b/lib/membrane/core/callback_handler.ex @@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do use Bunch alias Membrane.CallbackError - alias Membrane.Core.Component require Membrane.Logger @@ -189,15 +188,8 @@ defmodule Membrane.Core.CallbackHandler do reraise e, __STACKTRACE__ end - # was_handling_action? = state.handling_action? - # state = %{state | handling_action?: true} - - # Updating :delay_demands? flag value here is a temporal fix. - # Setting it to `true` while handling actions causes postponing calls - # of handle_redemand/2 and supply_demand/2 until a moment, when all - # actions returned from the callback are handled - was_delay_demands? = Map.get(state, :delay_demands?, false) - state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state + # was_delay_demands? = Map.get(state, :delay_demands?, false) + # state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state state = Enum.reduce(actions, state, fn action, state -> @@ -214,14 +206,9 @@ defmodule Membrane.Core.CallbackHandler do end) # state = - # if was_handling_action?, - # do: state, - # else: %{state | handling_action?: false} - - state = - if Component.is_element?(state) and not was_delay_demands?, - do: %{state | delay_demands?: false}, - else: state + # if Component.is_element?(state) and not was_delay_demands?, + # do: %{state | delay_demands?: false}, + # else: state handler_module.handle_end_of_actions(callback, state) end diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index d76497770..ead730ecf 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -24,7 +24,6 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, DemandController, - DemandHandler, EffectiveFlowController, EventController, LifecycleController, @@ -211,7 +210,7 @@ defmodule Membrane.Core.Element do end defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do - state = DemandHandler.resume_delayed_demands_loop(state) + state = DemandController.Manual.resume_delayed_demands_loop(state) {:noreply, state} end diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index dc53f1557..ac0beb6cc 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -23,12 +23,10 @@ defmodule Membrane.Core.Element.ActionHandler do alias Membrane.Core.Element.{ DemandController, - DemandHandler, State, StreamFormatController } - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.{Events, TimerController} alias Membrane.Element.Action @@ -51,26 +49,31 @@ defmodule Membrane.Core.Element.ActionHandler do # favoring manual pads over auto pads (or vice versa), especially after # introducting auto flow queues. - if Enum.random([1, 2]) == 1 do - snapshot(callback, state) - |> hdd() + # Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might + # be executed in between handling actions returned from other callbacks. + # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. + + if callback != :handle_spec_started do + if Enum.random([1, 2]) == 1 do + snapshot(callback, state) + |> hdd() + else + state + |> hdd() + |> then(&snapshot(callback, &1)) + end else state - |> hdd() - |> then(&snapshot(callback, &1)) end end defp hdd(state) do with %{delay_demands?: false} <- state do - DemandHandler.handle_delayed_demands(state) + DemandController.Manual.handle_delayed_demands(state) end end defp snapshot(callback, state) do - # Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might - # be executed in between handling actions returned from other callbacks. - # This callback has been deprecated and should be removed in v2.0.0, along with the if statement below. if callback != :handle_spec_started do state.pads_to_snapshot |> Enum.shuffle() @@ -179,13 +182,13 @@ defmodule Membrane.Core.Element.ActionHandler do @impl CallbackHandler def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.AutoFlowUtils.pause_demands(in_ref, state) + DemandController.Auto.pause_demands(in_ref, state) end @impl CallbackHandler def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state) when type in [:sink, :filter, :endpoint] do - DemandController.AutoFlowUtils.resume_demands(in_ref, state) + DemandController.Auto.resume_demands(in_ref, state) end @impl CallbackHandler @@ -417,7 +420,9 @@ defmodule Membrane.Core.Element.ActionHandler do defp supply_demand(pad_ref, size, state) do with %{direction: :input, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.supply_demand(pad_ref, size, state) + # todo: get_data! above could be eradicated + state = DemandController.Manual.update_demand(pad_ref, size, state) + DemandController.Manual.delay_demand_supply(pad_ref, state) else %{direction: :output} -> raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref @@ -437,7 +442,8 @@ defmodule Membrane.Core.Element.ActionHandler do when type in [:source, :filter, :endpoint] do with %{direction: :output, flow_control: :manual} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.handle_redemand(pad_ref, state) + # todo: get_data! above could be eradicated + DemandController.Manual.delay_redemand(pad_ref, state) else %{direction: :input} -> raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}" @@ -473,10 +479,10 @@ defmodule Membrane.Core.Element.ActionHandler do @spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t() defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do - DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> DemandController.Auto.pop_queues_and_bump_demand() else %{direction: :input} -> raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 986a204cf..a40800cd0 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -12,14 +12,13 @@ defmodule Membrane.Core.Element.BufferController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandHandler, + DemandController, EventController, InputQueue, PlaybackQueue, State } - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Telemetry require Membrane.Core.Child.PadModel @@ -27,7 +26,7 @@ defmodule Membrane.Core.Element.BufferController do @doc """ Handles incoming buffer: either stores it in InputQueue, or executes element's - callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2` + callback. Also calls `Membrane.Core.Element.DemandController.Manual.supply_demand/2` to check if there are any unsupplied demands. """ @spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() @@ -70,10 +69,10 @@ defmodule Membrane.Core.Element.BufferController do :atomics.put(stalker_metrics.demand, 1, demand - buf_size) if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do - AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state) + DemandController.Auto.store_buffers_in_queue(pad_ref, buffers, state) else state = exec_buffer_callback(pad_ref, buffers, state) - AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + DemandController.Auto.auto_adjust_atomic_demand(pad_ref, state) end end @@ -84,7 +83,7 @@ defmodule Membrane.Core.Element.BufferController do state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) if old_input_queue |> InputQueue.empty?() do - DemandHandler.supply_demand(pad_ref, state) + DemandController.Manual.supply_demand(pad_ref, state) else state end diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 5e8525a87..6cb50d739 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -5,13 +5,12 @@ defmodule Membrane.Core.Element.DemandController do use Bunch - alias __MODULE__.AutoFlowUtils + alias __MODULE__.{Auto, Manual} alias Membrane.Buffer alias Membrane.Core.Element.{ AtomicDemand, - DemandHandler, PlaybackQueue, State } @@ -56,7 +55,7 @@ defmodule Membrane.Core.Element.DemandController do if atomic_value > 0 do state |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref)) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> Auto.pop_queues_and_bump_demand() else state end @@ -79,7 +78,7 @@ defmodule Membrane.Core.Element.DemandController do } ) - DemandHandler.handle_redemand(pad_data.ref, state) + Manual.handle_redemand(pad_data.ref, state) else _other -> state end diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto.ex similarity index 99% rename from lib/membrane/core/element/demand_controller/auto_flow_utils.ex rename to lib/membrane/core/element/demand_controller/auto.ex index 62d0b2077..a71602c22 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto.ex @@ -1,4 +1,4 @@ -defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do +defmodule Membrane.Core.Element.DemandController.Auto do @moduledoc false alias Membrane.Buffer diff --git a/lib/membrane/core/element/demand_controller/manual.ex b/lib/membrane/core/element/demand_controller/manual.ex new file mode 100644 index 000000000..2d6d11f21 --- /dev/null +++ b/lib/membrane/core/element/demand_controller/manual.ex @@ -0,0 +1,256 @@ +defmodule Membrane.Core.Element.DemandController.Manual do + @moduledoc false + + # Module handling demands requested on output pads. + + alias Membrane.Core.CallbackHandler + + alias Membrane.Core.Element.{ + ActionHandler, + BufferController, + CallbackContext, + EventController, + InputQueue, + State, + StreamFormatController + } + + alias Membrane.Element.PadData + alias Membrane.Pad + + require Membrane.Core.Child.PadModel, as: PadModel + require Membrane.Core.Message, as: Message + require Membrane.Logger + + @handle_demand_loop_limit 20 + + def delay_redemand(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) + end + + @doc """ + Called when redemand action was returned. + * If element is currently supplying demand, it means that after finishing `supply_demand` it will call + `handle_delayed_demands`. + * If element isn't supplying demand at the moment and there's some unsupplied demand on the given + output, `handle_demand` is invoked right away, so that the demand can be synchronously supplied. + """ + @spec handle_redemand(Pad.ref(), State.t()) :: State.t() + def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do + delay_redemand(pad_ref, state) + end + + def handle_redemand(pad_ref, %State{} = state) do + do_handle_redemand(pad_ref, state) + |> handle_delayed_demands() + end + + defp do_handle_redemand(pad_ref, state) do + state = %{state | delay_demands?: true} + state = exec_handle_demand(pad_ref, state) + %{state | delay_demands?: false} + end + + @doc """ + If element is not supplying demand currently, this function supplies + demand right away by taking buffers from the InputQueue of the given input pad + and passing it to proper controllers. + + If element is currently supplying demand it delays supplying demand until all + current processing is finished. + + This is necessary due to the case when one requests a demand action while previous + demand is being supplied. This could lead to a situation where buffers are taken + from InputQueue and passed to callbacks, while buffers being currently supplied + have not been processed yet, and therefore to changing order of buffers. + + The `size` argument can be passed optionally to update the demand on the pad + before proceeding to supplying it. + """ + @spec supply_demand( + Pad.ref(), + State.t() + ) :: State.t() + + @spec supply_demand(Pad.ref(), State.t()) :: State.t() + def supply_demand(pad_ref, %State{delay_demands?: true} = state) do + delay_demand_supply(pad_ref, state) + end + + def supply_demand(pad_ref, state) do + do_supply_demand(pad_ref, state) + |> handle_delayed_demands() + end + + def delay_demand_supply(pad_ref, state) do + Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) + end + + defp do_supply_demand(pad_ref, state) do + # marking is state that actual demand supply has been started (note changing back to false when finished) + state = %State{state | delay_demands?: true} + + pad_data = state |> PadModel.get_data!(pad_ref) + + {{_queue_status, popped_data}, new_input_queue} = + InputQueue.take(pad_data.input_queue, pad_data.manual_demand_size) + + state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue) + state = handle_input_queue_output(pad_ref, popped_data, state) + %State{state | delay_demands?: false} + end + + @spec update_demand( + Pad.ref(), + non_neg_integer() | (non_neg_integer() -> non_neg_integer()), + State.t() + ) :: State.t() + def update_demand(pad_ref, size, state) when is_integer(size) do + PadModel.set_data!(state, pad_ref, :manual_demand_size, size) + end + + def update_demand(pad_ref, size_fun, state) when is_function(size_fun) do + manual_demand_size = PadModel.get_data!(state, pad_ref, :manual_demand_size) + new_manual_demand_size = size_fun.(manual_demand_size) + + if new_manual_demand_size < 0 do + raise Membrane.ElementError, + "Demand altering function requested negative demand on pad #{inspect(pad_ref)} in #{inspect(state.module)}" + end + + PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) + end + + @spec resume_delayed_demands_loop(State.t()) :: State.t() + def resume_delayed_demands_loop(%State{} = state) do + %{state | resume_delayed_demands_loop_in_mailbox?: false} + |> handle_delayed_demands() + end + + @spec handle_delayed_demands(State.t()) :: State.t() + def handle_delayed_demands(%State{} = state) do + # Taking random element of `:delayed_demands` is done to keep data flow + # balanced among pads, i.e. to prevent situation where demands requested by + # one pad are supplied right away while another one is waiting for buffers + # potentially for a long time. + + cond do + state.delay_demands? -> + raise "Cannot handle delayed demands when delay_demands? flag is set to true" + + state.handle_demand_loop_counter >= @handle_demand_loop_limit -> + state = + with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do + Message.self(:resume_delayed_demands_loop) + %{state | resume_delayed_demands_loop_in_mailbox?: true} + end + + %{state | handle_demand_loop_counter: 0} + + MapSet.size(state.delayed_demands) == 0 -> + %{state | handle_demand_loop_counter: 0} + + true -> + [{pad_ref, action} = entry] = Enum.take_random(state.delayed_demands, 1) + + state = + state + |> Map.update!(:delayed_demands, &MapSet.delete(&1, entry)) + |> Map.update!(:handle_demand_loop_counter, &(&1 + 1)) + + case action do + :supply -> supply_demand(pad_ref, state) + :redemand -> handle_redemand(pad_ref, state) + end + end + end + + @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() + def remove_pad_from_delayed_demands(pad_ref, state) do + Map.update!(state, :delayed_demands, fn delayed_demands_set -> + delayed_demands_set + |> MapSet.delete({pad_ref, :supply}) + |> MapSet.delete({pad_ref, :redemand}) + end) + end + + @spec handle_input_queue_output( + Pad.ref(), + [InputQueue.output_value()], + State.t() + ) :: State.t() + defp handle_input_queue_output(pad_ref, queue_output, state) do + Enum.reduce(queue_output, state, fn item, state -> + do_handle_input_queue_output(pad_ref, item, state) + end) + end + + @spec do_handle_input_queue_output( + Pad.ref(), + InputQueue.output_value(), + State.t() + ) :: State.t() + defp do_handle_input_queue_output(pad_ref, {:event, e}, state), + do: EventController.exec_handle_event(pad_ref, e, state) + + defp do_handle_input_queue_output(pad_ref, {:stream_format, stream_format}, state), + do: StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) + + defp do_handle_input_queue_output( + pad_ref, + {:buffers, buffers, _inbound_metric_buf_size, outbound_metric_buf_size}, + state + ) do + state = + PadModel.update_data!(state, pad_ref, :manual_demand_size, &(&1 - outbound_metric_buf_size)) + + BufferController.exec_buffer_callback(pad_ref, buffers, state) + end + + @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() + defp exec_handle_demand(pad_ref, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + true <- exec_handle_demand?(pad_data) do + do_exec_handle_demand(pad_data, state) + else + _other -> state + end + end + + @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() + defp do_exec_handle_demand(pad_data, state) do + context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) + + CallbackHandler.exec_and_handle_callback( + :handle_demand, + ActionHandler, + %{ + split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), + context: context + }, + [pad_data.ref, pad_data.demand, pad_data.demand_unit], + state + ) + end + + defp exec_handle_demand?(%{end_of_stream?: true}) do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as :end_of_stream action has already been returned + """) + + false + end + + defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as demand is not greater than 0, + demand: #{inspect(demand)} + """) + + false + end + + defp exec_handle_demand?(_pad_data) do + true + end +end diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index 1cf00c53d..c7df4af42 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -1,248 +1,252 @@ -defmodule Membrane.Core.Element.DemandHandler do - @moduledoc false - - # Module handling demands requested on output pads. - - alias Membrane.Core.CallbackHandler - - alias Membrane.Core.Element.{ - ActionHandler, - BufferController, - CallbackContext, - EventController, - InputQueue, - State, - StreamFormatController - } - - alias Membrane.Element.PadData - alias Membrane.Pad - - require Membrane.Core.Child.PadModel, as: PadModel - require Membrane.Core.Message, as: Message - require Membrane.Logger - - @handle_demand_loop_limit 20 - - @doc """ - Called when redemand action was returned. - * If element is currently supplying demand, it means that after finishing `supply_demand` it will call - `handle_delayed_demands`. - * If element isn't supplying demand at the moment and there's some unsupplied demand on the given - output, `handle_demand` is invoked right away, so that the demand can be synchronously supplied. - """ - @spec handle_redemand(Pad.ref(), State.t()) :: State.t() - def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) - end - - def handle_redemand(pad_ref, %State{} = state) do - do_handle_redemand(pad_ref, state) - |> handle_delayed_demands() - end - - defp do_handle_redemand(pad_ref, state) do - state = %{state | delay_demands?: true} - state = exec_handle_demand(pad_ref, state) - %{state | delay_demands?: false} - end - - @doc """ - If element is not supplying demand currently, this function supplies - demand right away by taking buffers from the InputQueue of the given input pad - and passing it to proper controllers. - - If element is currently supplying demand it delays supplying demand until all - current processing is finished. - - This is necessary due to the case when one requests a demand action while previous - demand is being supplied. This could lead to a situation where buffers are taken - from InputQueue and passed to callbacks, while buffers being currently supplied - have not been processed yet, and therefore to changing order of buffers. - - The `size` argument can be passed optionally to update the demand on the pad - before proceeding to supplying it. - """ - @spec supply_demand( - Pad.ref(), - size :: non_neg_integer | (non_neg_integer() -> non_neg_integer()), - State.t() - ) :: State.t() - def supply_demand(pad_ref, size, state) do - state = update_demand(pad_ref, size, state) - supply_demand(pad_ref, state) - end - - @spec supply_demand(Pad.ref(), State.t()) :: State.t() - def supply_demand(pad_ref, %State{delay_demands?: true} = state) do - Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) - end - - def supply_demand(pad_ref, state) do - do_supply_demand(pad_ref, state) - |> handle_delayed_demands() - end - - defp do_supply_demand(pad_ref, state) do - # marking is state that actual demand supply has been started (note changing back to false when finished) - state = %State{state | delay_demands?: true} - - pad_data = state |> PadModel.get_data!(pad_ref) - - {{_queue_status, popped_data}, new_input_queue} = - InputQueue.take(pad_data.input_queue, pad_data.manual_demand_size) - - state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue) - state = handle_input_queue_output(pad_ref, popped_data, state) - %State{state | delay_demands?: false} - end - - defp update_demand(pad_ref, size, state) when is_integer(size) do - PadModel.set_data!(state, pad_ref, :manual_demand_size, size) - end - - defp update_demand(pad_ref, size_fun, state) when is_function(size_fun) do - manual_demand_size = PadModel.get_data!(state, pad_ref, :manual_demand_size) - new_manual_demand_size = size_fun.(manual_demand_size) - - if new_manual_demand_size < 0 do - raise Membrane.ElementError, - "Demand altering function requested negative demand on pad #{inspect(pad_ref)} in #{inspect(state.module)}" - end - - PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) - end - - @spec resume_delayed_demands_loop(State.t()) :: State.t() - def resume_delayed_demands_loop(%State{} = state) do - %{state | resume_delayed_demands_loop_in_mailbox?: false} - |> handle_delayed_demands() - end - - @spec handle_delayed_demands(State.t()) :: State.t() - def handle_delayed_demands(%State{} = state) do - # Taking random element of `:delayed_demands` is done to keep data flow - # balanced among pads, i.e. to prevent situation where demands requested by - # one pad are supplied right away while another one is waiting for buffers - # potentially for a long time. - - cond do - state.delay_demands? -> - raise "Cannot handle delayed demands when delay_demands? flag is set to true" - - state.handle_demand_loop_counter >= @handle_demand_loop_limit -> - state = - with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do - Message.self(:resume_delayed_demands_loop) - %{state | resume_delayed_demands_loop_in_mailbox?: true} - end - - %{state | handle_demand_loop_counter: 0} - - MapSet.size(state.delayed_demands) == 0 -> - %{state | handle_demand_loop_counter: 0} - - true -> - [{pad_ref, action} = entry] = Enum.take_random(state.delayed_demands, 1) - - state = - state - |> Map.update!(:delayed_demands, &MapSet.delete(&1, entry)) - |> Map.update!(:handle_demand_loop_counter, &(&1 + 1)) - - case action do - :supply -> supply_demand(pad_ref, state) - :redemand -> handle_redemand(pad_ref, state) - end - end - end - - @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() - def remove_pad_from_delayed_demands(pad_ref, state) do - Map.update!(state, :delayed_demands, fn delayed_demands_set -> - delayed_demands_set - |> MapSet.delete({pad_ref, :supply}) - |> MapSet.delete({pad_ref, :redemand}) - end) - end - - @spec handle_input_queue_output( - Pad.ref(), - [InputQueue.output_value()], - State.t() - ) :: State.t() - defp handle_input_queue_output(pad_ref, queue_output, state) do - Enum.reduce(queue_output, state, fn item, state -> - do_handle_input_queue_output(pad_ref, item, state) - end) - end - - @spec do_handle_input_queue_output( - Pad.ref(), - InputQueue.output_value(), - State.t() - ) :: State.t() - defp do_handle_input_queue_output(pad_ref, {:event, e}, state), - do: EventController.exec_handle_event(pad_ref, e, state) - - defp do_handle_input_queue_output(pad_ref, {:stream_format, stream_format}, state), - do: StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) - - defp do_handle_input_queue_output( - pad_ref, - {:buffers, buffers, _inbound_metric_buf_size, outbound_metric_buf_size}, - state - ) do - state = - PadModel.update_data!(state, pad_ref, :manual_demand_size, &(&1 - outbound_metric_buf_size)) - - BufferController.exec_buffer_callback(pad_ref, buffers, state) - end - - @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() - defp exec_handle_demand(pad_ref, state) do - with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), - true <- exec_handle_demand?(pad_data) do - do_exec_handle_demand(pad_data, state) - else - _other -> state - end - end - - @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() - defp do_exec_handle_demand(pad_data, state) do - context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) - - CallbackHandler.exec_and_handle_callback( - :handle_demand, - ActionHandler, - %{ - split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), - context: context - }, - [pad_data.ref, pad_data.demand, pad_data.demand_unit], - state - ) - end - - defp exec_handle_demand?(%{end_of_stream?: true}) do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as :end_of_stream action has already been returned - """) - - false - end - - defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as demand is not greater than 0, - demand: #{inspect(demand)} - """) - - false - end - - defp exec_handle_demand?(_pad_data) do - true - end -end +# defmodule Membrane.Core.Element.DemandHandler do +# @moduledoc false + +# # Module handling demands requested on output pads. + +# alias Membrane.Core.CallbackHandler + +# alias Membrane.Core.Element.{ +# ActionHandler, +# BufferController, +# CallbackContext, +# EventController, +# InputQueue, +# State, +# StreamFormatController +# } + +# alias Membrane.Element.PadData +# alias Membrane.Pad + +# require Membrane.Core.Child.PadModel, as: PadModel +# require Membrane.Core.Message, as: Message +# require Membrane.Logger + +# @handle_demand_loop_limit 20 + +# def delay_redemand(pad_ref, state) do +# Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) +# end + +# @doc """ +# Called when redemand action was returned. +# * If element is currently supplying demand, it means that after finishing `supply_demand` it will call +# `handle_delayed_demands`. +# * If element isn't supplying demand at the moment and there's some unsupplied demand on the given +# output, `handle_demand` is invoked right away, so that the demand can be synchronously supplied. +# """ +# @spec handle_redemand(Pad.ref(), State.t()) :: State.t() +# def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do +# delay_redemand(pad_ref, state) +# end + +# def handle_redemand(pad_ref, %State{} = state) do +# do_handle_redemand(pad_ref, state) +# |> handle_delayed_demands() +# end + +# defp do_handle_redemand(pad_ref, state) do +# state = %{state | delay_demands?: true} +# state = exec_handle_demand(pad_ref, state) +# %{state | delay_demands?: false} +# end + +# @doc """ +# If element is not supplying demand currently, this function supplies +# demand right away by taking buffers from the InputQueue of the given input pad +# and passing it to proper controllers. + +# If element is currently supplying demand it delays supplying demand until all +# current processing is finished. + +# This is necessary due to the case when one requests a demand action while previous +# demand is being supplied. This could lead to a situation where buffers are taken +# from InputQueue and passed to callbacks, while buffers being currently supplied +# have not been processed yet, and therefore to changing order of buffers. + +# The `size` argument can be passed optionally to update the demand on the pad +# before proceeding to supplying it. +# """ +# @spec supply_demand( +# Pad.ref(), +# State.t() +# ) :: State.t() + +# @spec supply_demand(Pad.ref(), State.t()) :: State.t() +# def supply_demand(pad_ref, %State{delay_demands?: true} = state) do +# delay_demand_supply(pad_ref, state) +# end + +# def supply_demand(pad_ref, state) do +# do_supply_demand(pad_ref, state) +# |> handle_delayed_demands() +# end + +# def delay_demand_supply(pad_ref, state) do +# Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply})) +# end + +# defp do_supply_demand(pad_ref, state) do +# # marking is state that actual demand supply has been started (note changing back to false when finished) +# state = %State{state | delay_demands?: true} + +# pad_data = state |> PadModel.get_data!(pad_ref) + +# {{_queue_status, popped_data}, new_input_queue} = +# InputQueue.take(pad_data.input_queue, pad_data.manual_demand_size) + +# state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue) +# state = handle_input_queue_output(pad_ref, popped_data, state) +# %State{state | delay_demands?: false} +# end + +# @spec update_demand(Pad.ref(), non_neg_integer() | (non_neg_integer() -> non_neg_integer()), State.t()) :: State.t() +# def update_demand(pad_ref, size, state) when is_integer(size) do +# PadModel.set_data!(state, pad_ref, :manual_demand_size, size) +# end + +# def update_demand(pad_ref, size_fun, state) when is_function(size_fun) do +# manual_demand_size = PadModel.get_data!(state, pad_ref, :manual_demand_size) +# new_manual_demand_size = size_fun.(manual_demand_size) + +# if new_manual_demand_size < 0 do +# raise Membrane.ElementError, +# "Demand altering function requested negative demand on pad #{inspect(pad_ref)} in #{inspect(state.module)}" +# end + +# PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) +# end + +# @spec resume_delayed_demands_loop(State.t()) :: State.t() +# def resume_delayed_demands_loop(%State{} = state) do +# %{state | resume_delayed_demands_loop_in_mailbox?: false} +# |> handle_delayed_demands() +# end + +# @spec handle_delayed_demands(State.t()) :: State.t() +# def handle_delayed_demands(%State{} = state) do +# # Taking random element of `:delayed_demands` is done to keep data flow +# # balanced among pads, i.e. to prevent situation where demands requested by +# # one pad are supplied right away while another one is waiting for buffers +# # potentially for a long time. + +# cond do +# state.delay_demands? -> +# raise "Cannot handle delayed demands when delay_demands? flag is set to true" + +# state.handle_demand_loop_counter >= @handle_demand_loop_limit -> +# state = +# with %{resume_delayed_demands_loop_in_mailbox?: false} <- state do +# Message.self(:resume_delayed_demands_loop) +# %{state | resume_delayed_demands_loop_in_mailbox?: true} +# end + +# %{state | handle_demand_loop_counter: 0} + +# MapSet.size(state.delayed_demands) == 0 -> +# %{state | handle_demand_loop_counter: 0} + +# true -> +# [{pad_ref, action} = entry] = Enum.take_random(state.delayed_demands, 1) + +# state = +# state +# |> Map.update!(:delayed_demands, &MapSet.delete(&1, entry)) +# |> Map.update!(:handle_demand_loop_counter, &(&1 + 1)) + +# case action do +# :supply -> supply_demand(pad_ref, state) +# :redemand -> handle_redemand(pad_ref, state) +# end +# end +# end + +# @spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t() +# def remove_pad_from_delayed_demands(pad_ref, state) do +# Map.update!(state, :delayed_demands, fn delayed_demands_set -> +# delayed_demands_set +# |> MapSet.delete({pad_ref, :supply}) +# |> MapSet.delete({pad_ref, :redemand}) +# end) +# end + +# @spec handle_input_queue_output( +# Pad.ref(), +# [InputQueue.output_value()], +# State.t() +# ) :: State.t() +# defp handle_input_queue_output(pad_ref, queue_output, state) do +# Enum.reduce(queue_output, state, fn item, state -> +# do_handle_input_queue_output(pad_ref, item, state) +# end) +# end + +# @spec do_handle_input_queue_output( +# Pad.ref(), +# InputQueue.output_value(), +# State.t() +# ) :: State.t() +# defp do_handle_input_queue_output(pad_ref, {:event, e}, state), +# do: EventController.exec_handle_event(pad_ref, e, state) + +# defp do_handle_input_queue_output(pad_ref, {:stream_format, stream_format}, state), +# do: StreamFormatController.exec_handle_stream_format(pad_ref, stream_format, state) + +# defp do_handle_input_queue_output( +# pad_ref, +# {:buffers, buffers, _inbound_metric_buf_size, outbound_metric_buf_size}, +# state +# ) do +# state = +# PadModel.update_data!(state, pad_ref, :manual_demand_size, &(&1 - outbound_metric_buf_size)) + +# BufferController.exec_buffer_callback(pad_ref, buffers, state) +# end + +# @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() +# defp exec_handle_demand(pad_ref, state) do +# with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), +# true <- exec_handle_demand?(pad_data) do +# do_exec_handle_demand(pad_data, state) +# else +# _other -> state +# end +# end + +# @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() +# defp do_exec_handle_demand(pad_data, state) do +# context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) + +# CallbackHandler.exec_and_handle_callback( +# :handle_demand, +# ActionHandler, +# %{ +# split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), +# context: context +# }, +# [pad_data.ref, pad_data.demand, pad_data.demand_unit], +# state +# ) +# end + +# defp exec_handle_demand?(%{end_of_stream?: true}) do +# Membrane.Logger.debug_verbose(""" +# Demand controller: not executing handle_demand as :end_of_stream action has already been returned +# """) + +# false +# end + +# defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do +# Membrane.Logger.debug_verbose(""" +# Demand controller: not executing handle_demand as demand is not greater than 0, +# demand: #{inspect(demand)} +# """) + +# false +# end + +# defp exec_handle_demand?(_pad_data) do +# true +# end +# end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex index 6d0849439..e07dcedb5 100644 --- a/lib/membrane/core/element/effective_flow_controller.ex +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -19,7 +19,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. alias Membrane.Core.Element.DemandController - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Element.{AtomicDemand, State} require Membrane.Core.Child.PadModel, as: PadModel @@ -141,6 +140,6 @@ defmodule Membrane.Core.Element.EffectiveFlowController do state end) end - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> DemandController.Auto.pop_queues_and_bump_demand() end end diff --git a/lib/membrane/core/element/event_controller.ex b/lib/membrane/core/element/event_controller.ex index f6b59a34b..66a3d41e5 100644 --- a/lib/membrane/core/element/event_controller.ex +++ b/lib/membrane/core/element/event_controller.ex @@ -12,14 +12,12 @@ defmodule Membrane.Core.Element.EventController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandHandler, + DemandController, InputQueue, PlaybackQueue, State } - alias Membrane.Core.Element.DemandController.AutoFlowUtils - require Membrane.Core.Child.PadModel require Membrane.Core.Message require Membrane.Core.Telemetry @@ -55,7 +53,7 @@ defmodule Membrane.Core.Element.EventController do # event goes to the auto flow control queue not async? and MapSet.member?(state.awaiting_auto_input_pads, pad_ref) -> - AutoFlowUtils.store_event_in_queue(pad_ref, event, state) + DemandController.Auto.store_event_in_queue(pad_ref, event, state) true -> exec_handle_event(pad_ref, event, state) @@ -109,7 +107,7 @@ defmodule Membrane.Core.Element.EventController do Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}") state = - DemandHandler.remove_pad_from_delayed_demands(pad_ref, state) + DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state) |> PadModel.set_data!(pad_ref, :end_of_stream?, true) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 61530f83f..dfbb52f8f 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -10,6 +10,7 @@ defmodule Membrane.Core.Element.PadController do ActionHandler, AtomicDemand, CallbackContext, + DemandController, EffectiveFlowController, EventController, InputQueue, @@ -17,7 +18,6 @@ defmodule Membrane.Core.Element.PadController do StreamFormatController } - alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Parent.Link.Endpoint alias Membrane.LinkError @@ -241,7 +241,7 @@ defmodule Membrane.Core.Element.PadController do |> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref)) |> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref)) - |> AutoFlowUtils.pop_queues_and_bump_demand() + |> DemandController.Auto.pop_queues_and_bump_demand() else {:ok, %{availability: :always}} when state.terminating? -> state @@ -335,7 +335,7 @@ defmodule Membrane.Core.Element.PadController do Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_data.ref)) %{direction: :input, flow_control: :auto} -> - AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state) + DemandController.Auto.auto_adjust_atomic_demand(endpoint.pad_ref, state) |> Map.update!(:auto_input_pads, &[endpoint.pad_ref | &1]) _pad_data -> diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index b22637cc7..f28368a3f 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -8,8 +8,15 @@ defmodule Membrane.Core.Element.StreamFormatController do alias Membrane.{Pad, StreamFormat} alias Membrane.Core.{CallbackHandler, Telemetry} alias Membrane.Core.Child.PadModel - alias Membrane.Core.Element.{ActionHandler, CallbackContext, InputQueue, PlaybackQueue, State} - alias Membrane.Core.Element.DemandController.AutoFlowUtils + + alias Membrane.Core.Element.{ + ActionHandler, + CallbackContext, + DemandController, + InputQueue, + PlaybackQueue, + State + } require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry @@ -41,7 +48,7 @@ defmodule Membrane.Core.Element.StreamFormatController do # stream format goes to the auto flow control queue pad_ref in state.awaiting_auto_input_pads -> - AutoFlowUtils.store_stream_format_in_queue(pad_ref, stream_format, state) + DemandController.Auto.store_stream_format_in_queue(pad_ref, stream_format, state) true -> exec_handle_stream_format(pad_ref, stream_format, state) diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index 3be09d473..f6f9054b9 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do # with input pad, but hasn't been sent yet by the element with output pad. Detects toilet overflow as well. atomic_demand: private_field, - # Field used in DemandController.AutoFlowUtils and InputQueue, to caluclate, how much AtomicDemand should be increased. + # Field used in DemandController.Auto and InputQueue, to caluclate, how much AtomicDemand should be increased. # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. manual_demand_size: private_field,