diff --git a/lib/membrane/bin.ex b/lib/membrane/bin.ex index 3ed2e8286..a6e566161 100644 --- a/lib/membrane/bin.ex +++ b/lib/membrane/bin.ex @@ -18,7 +18,6 @@ defmodule Membrane.Bin do alias Membrane.Core.OptionsSpecs require Membrane.Core.Message - require Membrane.Logger @type state :: any() diff --git a/lib/membrane/children_spec.ex b/lib/membrane/children_spec.ex index 0e1afa6d4..3c1b8c783 100644 --- a/lib/membrane/children_spec.ex +++ b/lib/membrane/children_spec.ex @@ -538,11 +538,11 @@ defmodule Membrane.ChildrenSpec do """ @spec via_in(builder(), Pad.name() | Pad.ref(), options: pad_options(), - toilet_capacity: number | nil, - target_queue_size: number | nil, - min_demand_factor: number | nil, - auto_demand_size: number | nil, - throttling_factor: number | nil + toilet_capacity: non_neg_integer() | nil, + target_queue_size: non_neg_integer() | nil, + min_demand_factor: non_neg_integer() | nil, + auto_demand_size: non_neg_integer() | nil, + throttling_factor: non_neg_integer() | nil ) :: builder() | no_return def via_in(builder, pad, props \\ []) @@ -567,7 +567,7 @@ defmodule Membrane.ChildrenSpec do min_demand_factor: [default: nil], auto_demand_size: [default: nil], toilet_capacity: [default: nil], - throttling_factor: [default: 1] + throttling_factor: [default: nil] ) |> case do {:ok, props} -> diff --git a/lib/membrane/core/bin/pad_controller.ex b/lib/membrane/core/bin/pad_controller.ex index 3cb2f8984..946c127a9 100644 --- a/lib/membrane/core/bin/pad_controller.ex +++ b/lib/membrane/core/bin/pad_controller.ex @@ -200,12 +200,10 @@ defmodule Membrane.Core.Bin.PadController do SpecificationParser.raw_endpoint(), SpecificationParser.raw_endpoint(), %{ - initiator: :parent, stream_format_validation_params: StreamFormatController.stream_format_validation_params() } | %{ - initiator: :sibling, other_info: PadModel.pad_info() | nil, link_metadata: map, stream_format_validation_params: @@ -238,7 +236,7 @@ defmodule Membrane.Core.Bin.PadController do child_endpoint = %{child_endpoint | pad_props: pad_props} - if params.initiator == :sibling do + if direction == :input do :ok = Child.PadController.validate_pad_mode!( {endpoint.pad_ref, pad_data}, @@ -297,11 +295,10 @@ defmodule Membrane.Core.Bin.PadController do def handle_unlink(pad_ref, state) do with {:ok, %{availability: :on_request}} <- PadModel.get_data(state, pad_ref) do state = maybe_handle_pad_removed(pad_ref, state) - endpoint = PadModel.get_data!(state, pad_ref, :endpoint) {pad_data, state} = PadModel.pop_data!(state, pad_ref) - if endpoint do - Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref) + if pad_data.endpoint do + Message.send(pad_data.endpoint.pid, :handle_unlink, pad_data.endpoint.pad_ref) ChildLifeController.proceed_spec_startup(pad_data.spec_ref, state) else Membrane.Logger.debug(""" diff --git a/lib/membrane/core/child/pad_controller.ex b/lib/membrane/core/child/pad_controller.ex index d345ab457..2f9ae8931 100644 --- a/lib/membrane/core/child/pad_controller.ex +++ b/lib/membrane/core/child/pad_controller.ex @@ -5,7 +5,6 @@ defmodule Membrane.Core.Child.PadController do alias Membrane.{LinkError, Pad} require Membrane.Core.Child.PadModel - require Membrane.Logger @type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Element.State.t() diff --git a/lib/membrane/core/child/pad_model.ex b/lib/membrane/core/child/pad_model.ex index cfda7e50c..7c0a4d10f 100644 --- a/lib/membrane/core/child/pad_model.ex +++ b/lib/membrane/core/child/pad_model.ex @@ -6,6 +6,7 @@ defmodule Membrane.Core.Child.PadModel do use Bunch alias Membrane.Core.Child + alias Membrane.Core.Element.EffectiveFlowController alias Membrane.{Pad, UnknownPadError} @type bin_pad_data :: %Membrane.Bin.PadData{ @@ -24,11 +25,13 @@ defmodule Membrane.Core.Child.PadModel do @type element_pad_data :: %Membrane.Element.PadData{ availability: Pad.availability(), stream_format: Membrane.StreamFormat.t() | nil, - demand: integer() | nil, + demand_snapshot: integer() | nil, + manual_demand_size: integer(), start_of_stream?: boolean(), end_of_stream?: boolean(), direction: Pad.direction(), flow_control: Pad.flow_control(), + other_effective_flow_control: EffectiveFlowController.effective_flow_control() | nil, name: Pad.name(), ref: Pad.ref(), demand_unit: Membrane.Buffer.Metric.unit() | nil, @@ -38,7 +41,6 @@ defmodule Membrane.Core.Child.PadModel do sticky_messages: [Membrane.Event.t()], input_queue: Membrane.Core.Element.InputQueue.t() | nil, options: %{optional(atom) => any}, - toilet: Membrane.Core.Element.Toilet.t() | nil, auto_demand_size: pos_integer() | nil, associated_pads: [Pad.ref()] | nil, sticky_events: [Membrane.Event.t()] @@ -52,6 +54,7 @@ defmodule Membrane.Core.Child.PadModel do required(:availability) => Pad.availability(), required(:direction) => Pad.direction(), required(:name) => Pad.name(), + required(:accepted_formats_str) => String.t(), optional(:flow_control) => Pad.flow_control(), optional(:demand_unit) => Membrane.Buffer.Metric.unit(), optional(:other_demand_unit) => Membrane.Buffer.Metric.unit() diff --git a/lib/membrane/core/element.ex b/lib/membrane/core/element.ex index e648041f5..b2c24ca0e 100644 --- a/lib/membrane/core/element.ex +++ b/lib/membrane/core/element.ex @@ -18,6 +18,7 @@ defmodule Membrane.Core.Element do use Bunch use GenServer + alias Membrane.Core.Element.DemandHandler alias Membrane.{Clock, Core, ResourceGuard, Sync} alias Membrane.Core.{SubprocessSupervisor, TimerController} @@ -25,6 +26,7 @@ defmodule Membrane.Core.Element do alias Membrane.Core.Element.{ BufferController, DemandController, + EffectiveFlowController, EventController, LifecycleController, PadController, @@ -171,9 +173,13 @@ defmodule Membrane.Core.Element do @compile {:inline, do_handle_info: 2} - defp do_handle_info(Message.new(:demand, size, _opts) = msg, state) do - pad_ref = Message.for_pad(msg) - state = DemandController.handle_demand(pad_ref, size, state) + defp do_handle_info(Message.new(:atomic_demand_increased, pad_ref), state) do + state = DemandController.snapshot_atomic_demand(pad_ref, state) + {:noreply, state} + end + + defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do + state = DemandHandler.handle_delayed_demands(state) {:noreply, state} end @@ -215,6 +221,23 @@ defmodule Membrane.Core.Element do {:noreply, state} end + defp do_handle_info( + Message.new(:sender_effective_flow_control_resolved, [ + input_pad_ref, + effective_flow_control + ]), + state + ) do + state = + EffectiveFlowController.handle_sender_effective_flow_control( + input_pad_ref, + effective_flow_control, + state + ) + + {:noreply, state} + end + defp do_handle_info(Message.new(:terminate), state) do state = LifecycleController.handle_terminate_request(state) {:noreply, state} diff --git a/lib/membrane/core/element/action_handler.ex b/lib/membrane/core/element/action_handler.ex index 8b532de34..318418196 100644 --- a/lib/membrane/core/element/action_handler.ex +++ b/lib/membrane/core/element/action_handler.ex @@ -20,7 +20,15 @@ defmodule Membrane.Core.Element.ActionHandler do } alias Membrane.Core.Child.PadModel - alias Membrane.Core.Element.{DemandHandler, PadController, State, StreamFormatController} + + alias Membrane.Core.Element.{ + DemandController, + DemandHandler, + PadController, + State, + StreamFormatController + } + alias Membrane.Core.{Events, Message, Telemetry, TimerController} alias Membrane.Element.Action @@ -307,11 +315,11 @@ defmodule Membrane.Core.Element.ActionHandler do } when stream_format != nil <- pad_data do state = - DemandHandler.handle_outgoing_buffers(pad_ref, pad_data, buffers, state) + DemandController.decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) |> PadModel.set_data!(pad_ref, :start_of_stream?, true) Message.send(pid, :buffer, buffers, for_pad: other_ref) - state + DemandController.snapshot_atomic_demand(pad_ref, state) else %{direction: :input} -> raise PadDirectionError, action: :buffer, direction: :input, pad: pad_ref @@ -354,7 +362,8 @@ defmodule Membrane.Core.Element.ActionHandler do StreamFormatController.validate_stream_format!( :output, stream_format_validation_params, - stream_format + stream_format, + state ) state = PadModel.set_data!(state, pad_ref, :stream_format, stream_format) diff --git a/lib/membrane/core/element/atomic_demand.ex b/lib/membrane/core/element/atomic_demand.ex new file mode 100644 index 000000000..1b387254e --- /dev/null +++ b/lib/membrane/core/element/atomic_demand.ex @@ -0,0 +1,225 @@ +defmodule Membrane.Core.Element.AtomicDemand do + @moduledoc false + + alias Membrane.Core.Element.EffectiveFlowController + + alias __MODULE__.{ + DistributedAtomic, + AtomicFlowStatus + } + + require Membrane.Core.Message, as: Message + require Membrane.Logger + require Membrane.Pad, as: Pad + + @default_toilet_capacity_factor 200 + @default_throttling_factor 1 + @distributed_default_throttling_factor 150 + + @opaque t :: %__MODULE__{ + counter: DistributedAtomic.t(), + receiver_status: AtomicFlowStatus.t(), + receiver_process: Process.dest(), + sender_status: AtomicFlowStatus.t(), + sender_process: Process.dest(), + sender_pad_ref: Pad.ref(), + toilet_capacity: neg_integer(), + buffered_decrementation: non_neg_integer(), + throttling_factor: pos_integer(), + toilet_overflowed?: boolean(), + receiver_demand_unit: Membrane.Buffer.Metric.unit() + } + + @type flow_mode :: AtomicFlowStatus.value() + + @enforce_keys [ + :counter, + :receiver_status, + :receiver_process, + :sender_status, + :sender_process, + :sender_pad_ref, + :throttling_factor, + :toilet_capacity, + :receiver_demand_unit + ] + + defstruct @enforce_keys ++ [buffered_decrementation: 0, toilet_overflowed?: false] + + @spec new(%{ + :receiver_effective_flow_control => EffectiveFlowController.effective_flow_control(), + :receiver_process => Process.dest(), + :receiver_demand_unit => Membrane.Buffer.Metric.unit(), + :sender_process => Process.dest(), + :sender_pad_ref => Pad.ref(), + :supervisor => pid(), + optional(:toilet_capacity) => non_neg_integer() | nil, + optional(:throttling_factor) => pos_integer() | nil + }) :: t + def new( + %{ + receiver_effective_flow_control: receiver_effective_flow_control, + receiver_process: receiver_process, + receiver_demand_unit: receiver_demand_unit, + sender_process: sender_process, + sender_pad_ref: sender_pad_ref, + supervisor: supervisor + } = options + ) do + toilet_capacity = options[:toilet_capacity] + throttling_factor = options[:throttling_factor] + + counter = DistributedAtomic.new(supervisor: supervisor) + + throttling_factor = + cond do + throttling_factor != nil -> throttling_factor + node(sender_process) == node(counter.worker) -> @default_throttling_factor + true -> @distributed_default_throttling_factor + end + + receiver_status = + AtomicFlowStatus.new( + {:resolved, receiver_effective_flow_control}, + supervisor: supervisor + ) + + %__MODULE__{ + counter: counter, + receiver_status: receiver_status, + receiver_process: receiver_process, + sender_status: AtomicFlowStatus.new(:to_be_resolved, supervisor: supervisor), + sender_process: sender_process, + sender_pad_ref: sender_pad_ref, + toilet_capacity: toilet_capacity || default_toilet_capacity(receiver_demand_unit), + throttling_factor: throttling_factor, + receiver_demand_unit: receiver_demand_unit + } + end + + @spec set_sender_status(t, AtomicFlowStatus.value()) :: :ok + def set_sender_status(%__MODULE__{} = atomic_demand, mode) do + AtomicFlowStatus.set( + atomic_demand.sender_status, + mode + ) + end + + @spec get_sender_status(t) :: AtomicFlowStatus.value() + def get_sender_status(%__MODULE__{} = atomic_demand) do + AtomicFlowStatus.get(atomic_demand.sender_status) + end + + @spec set_receiver_status(t, AtomicFlowStatus.value()) :: :ok + def set_receiver_status(%__MODULE__{} = atomic_demand, mode) do + AtomicFlowStatus.set( + atomic_demand.receiver_status, + mode + ) + end + + @spec get_receiver_status(t) :: AtomicFlowStatus.value() + def get_receiver_status(%__MODULE__{} = atomic_demand) do + AtomicFlowStatus.get(atomic_demand.receiver_status) + end + + @spec increase(t, non_neg_integer()) :: :ok + def increase(%__MODULE__{} = atomic_demand, value) do + new_atomic_demand_value = DistributedAtomic.add_get(atomic_demand.counter, value) + old_atomic_demand_value = new_atomic_demand_value - value + + if old_atomic_demand_value <= 0 do + Message.send( + atomic_demand.sender_process, + :atomic_demand_increased, + atomic_demand.sender_pad_ref + ) + end + + :ok + end + + @spec decrease(t, non_neg_integer()) :: t + def decrease(%__MODULE__{} = atomic_demand, value) do + atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value)) + + if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do + flush_buffered_decrementation(atomic_demand) + else + atomic_demand + end + end + + @spec get(t) :: integer() + def get(%__MODULE__{} = atomic_demand) do + DistributedAtomic.get(atomic_demand.counter) + end + + defp flush_buffered_decrementation(atomic_demand) do + atomic_demand_value = + DistributedAtomic.sub_get( + atomic_demand.counter, + atomic_demand.buffered_decrementation + ) + + atomic_demand = %{atomic_demand | buffered_decrementation: 0} + + if not atomic_demand.toilet_overflowed? and + get_receiver_status(atomic_demand) == {:resolved, :pull} and + get_sender_status(atomic_demand) == {:resolved, :push} and + -1 * atomic_demand_value > atomic_demand.toilet_capacity do + overflow(atomic_demand, atomic_demand_value) + else + atomic_demand + end + end + + defp overflow(atomic_demand, atomic_demand_value) do + Membrane.Logger.debug_verbose(~S""" + Toilet overflow + + ` ' ` + .'''. ' .'''. + .. ' ' .. + ' '.'.' ' + .'''.'.'''. + ' .''.'.''. ' + ;------ ' ------; + | ~~ .--'--// | + | / ' \ | + | / ' \ | + | | ' | | ,----. + | \ , ' , / | =|____|= + '---,###'###,---' (---( + /## ' ##\ )---) + |##, ' ,##| (---( + \'#####'/ `---` + \`"#"`/ + |`"`| + .-| |-. + / ' ' \ + '---------' + """) + + Membrane.Logger.error(""" + Toilet overflow. + + Atomic demand reached the size of #{inspect(atomic_demand_value)}, which means that there are #{inspect(-1 * atomic_demand_value)} + #{atomic_demand.receiver_demand_unit} sent without demanding it, which is above toilet capacity (#{inspect(atomic_demand.toilet_capacity)}) + when storing data from output working in push mode. It means that some element in the pipeline + processes the stream too slow or doesn't process it at all. + To have control over amount of buffers being produced, consider using output in :auto or :manual + flow control mode. (see `Membrane.Pad.flow_control`). + You can also try changing the `toilet_capacity` in `Membrane.ChildrenSpec.via_in/3`. + """) + + Process.exit(atomic_demand.receiver_process, :kill) + + %{atomic_demand | toilet_overflowed?: true} + end + + defp default_toilet_capacity(demand_unit) do + Membrane.Buffer.Metric.from_unit(demand_unit).buffer_size_approximation() * + @default_toilet_capacity_factor + end +end diff --git a/lib/membrane/core/element/atomic_demand/atomic_flow_status.ex b/lib/membrane/core/element/atomic_demand/atomic_flow_status.ex new file mode 100644 index 000000000..e4605bec2 --- /dev/null +++ b/lib/membrane/core/element/atomic_demand/atomic_flow_status.ex @@ -0,0 +1,37 @@ +defmodule Membrane.Core.Element.AtomicDemand.AtomicFlowStatus do + @moduledoc false + + alias Membrane.Core.Element.AtomicDemand.DistributedAtomic + alias Membrane.Core.Element.EffectiveFlowController + + @type t :: DistributedAtomic.t() + @type value :: {:resolved, EffectiveFlowController.effective_flow_control()} | :to_be_resolved + + @spec new(value, supervisor: pid()) :: t + def new(initial_value, supervisor: supervisor) do + initial_value + |> flow_status_to_int() + |> DistributedAtomic.new(supervisor: supervisor) + end + + @spec get(t) :: value() + def get(distributed_atomic) do + distributed_atomic + |> DistributedAtomic.get() + |> int_to_flow_status() + end + + @spec set(t, value()) :: :ok + def set(distributed_atomic, value) do + value = flow_status_to_int(value) + DistributedAtomic.set(distributed_atomic, value) + end + + defp int_to_flow_status(0), do: :to_be_resolved + defp int_to_flow_status(1), do: {:resolved, :push} + defp int_to_flow_status(2), do: {:resolved, :pull} + + defp flow_status_to_int(:to_be_resolved), do: 0 + defp flow_status_to_int({:resolved, :push}), do: 1 + defp flow_status_to_int({:resolved, :pull}), do: 2 +end diff --git a/lib/membrane/core/element/atomic_demand/distributed_atomic.ex b/lib/membrane/core/element/atomic_demand/distributed_atomic.ex new file mode 100644 index 000000000..e18f7cfbb --- /dev/null +++ b/lib/membrane/core/element/atomic_demand/distributed_atomic.ex @@ -0,0 +1,76 @@ +defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic do + @moduledoc false + + # A module providing a common interface to access and modify a counter used in the AtomicDemand implementation. + # The counter uses :atomics module under the hood. + # The module allows to create and modify the value of a counter in the same manner both when the counter is about to be accessed + # from the same node, and from different nodes. + + alias __MODULE__.Worker + alias Membrane.Core.SubprocessSupervisor + + @enforce_keys [:worker, :atomic_ref] + defstruct @enforce_keys + + @type t :: %__MODULE__{worker: Worker.t(), atomic_ref: :atomics.atomics_ref()} + + defguardp on_the_same_node_as_self(distributed_atomic) + when distributed_atomic.worker |> node() == self() |> node() + + @spec new(integer() | nil, supervisor: pid()) :: t + def new(initial_value \\ nil, supervisor: supervisor) do + atomic_ref = :atomics.new(1, []) + {:ok, worker} = SubprocessSupervisor.start_utility(supervisor, Worker) + + distributed_atomic = %__MODULE__{ + atomic_ref: atomic_ref, + worker: worker + } + + if initial_value != nil do + :ok = set(distributed_atomic, initial_value) + end + + distributed_atomic + end + + @spec add_get(t, integer()) :: integer() + def add_get(%__MODULE__{} = distributed_atomic, value) + when on_the_same_node_as_self(distributed_atomic) do + :atomics.add_get(distributed_atomic.atomic_ref, 1, value) + end + + def add_get(%__MODULE__{} = distributed_atomic, value) do + GenServer.call(distributed_atomic.worker, {:add_get, distributed_atomic.atomic_ref, value}) + end + + @spec sub_get(t, integer()) :: integer() + def sub_get(%__MODULE__{} = distributed_atomic, value) + when on_the_same_node_as_self(distributed_atomic) do + :atomics.sub_get(distributed_atomic.atomic_ref, 1, value) + end + + def sub_get(%__MODULE__{} = distributed_atomic, value) do + GenServer.cast(distributed_atomic.worker, {:sub_get, distributed_atomic.atomic_ref, value}) + end + + @spec set(t, integer()) :: :ok + def set(%__MODULE__{} = distributed_atomic, value) + when on_the_same_node_as_self(distributed_atomic) do + :atomics.put(distributed_atomic.atomic_ref, 1, value) + end + + def set(%__MODULE__{} = distributed_atomic, value) do + GenServer.cast(distributed_atomic.worker, {:put, distributed_atomic.atomic_ref, value}) + end + + @spec get(t) :: integer() + def get(%__MODULE__{} = distributed_atomic) + when on_the_same_node_as_self(distributed_atomic) do + :atomics.get(distributed_atomic.atomic_ref, 1) + end + + def get(%__MODULE__{} = distributed_atomic) do + GenServer.call(distributed_atomic.worker, {:get, distributed_atomic.atomic_ref}) + end +end diff --git a/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex new file mode 100644 index 000000000..5f60450d7 --- /dev/null +++ b/lib/membrane/core/element/atomic_demand/distributed_atomic/worker.ex @@ -0,0 +1,42 @@ +defmodule Membrane.Core.Element.AtomicDemand.DistributedAtomic.Worker do + @moduledoc false + + # This is a GenServer created when the counter is about to be accessed from different nodes - it's running on the same node, + # where the :atomics variable is put, and processes from different nodes can ask it to modify the counter on their behalf. + + use GenServer + + @type t :: pid() + + @spec start_link(any()) :: {:ok, t} + def start_link(opts), do: GenServer.start_link(__MODULE__, opts) + + @impl true + def init(_opts) do + {:ok, nil, :hibernate} + end + + @impl true + def handle_call({:add_get, atomic_ref, value}, _from, _state) do + result = :atomics.add_get(atomic_ref, 1, value) + {:reply, result, nil} + end + + @impl true + def handle_call({:sub_get, atomic_ref, value}, _from, _state) do + result = :atomics.sub_get(atomic_ref, 1, value) + {:reply, result, nil} + end + + @impl true + def handle_call({:get, atomic_ref}, _from, _state) do + result = :atomics.get(atomic_ref, 1) + {:reply, result, nil} + end + + @impl true + def handle_cast({:put, atomic_ref, value}, _state) do + :atomics.put(atomic_ref, 1, value) + {:noreply, nil} + end +end diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index eb9785c8e..ef3ec8410 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -12,7 +12,6 @@ defmodule Membrane.Core.Element.BufferController do alias Membrane.Core.Element.{ ActionHandler, CallbackContext, - DemandController, DemandHandler, EventController, InputQueue, @@ -20,6 +19,7 @@ defmodule Membrane.Core.Element.BufferController do State } + alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Telemetry require Membrane.Core.Child.PadModel @@ -58,15 +58,17 @@ defmodule Membrane.Core.Element.BufferController do State.t() defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do %{demand: demand, demand_unit: demand_unit} = data - buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) + state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) - state = DemandController.send_auto_demand_if_needed(pad_ref, state) + + state = AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) exec_buffer_callback(pad_ref, buffers, state) end defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do %{input_queue: old_input_queue} = data + input_queue = InputQueue.store(old_input_queue, buffers) state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue) @@ -77,7 +79,7 @@ defmodule Membrane.Core.Element.BufferController do end end - defp do_handle_buffer(pad_ref, _data, buffers, state) do + defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do exec_buffer_callback(pad_ref, buffers, state) end @@ -92,13 +94,7 @@ defmodule Membrane.Core.Element.BufferController do def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do Telemetry.report_metric("buffer", 1, inspect(pad_ref)) - CallbackHandler.exec_and_handle_callback( - :handle_buffers_batch, - ActionHandler, - %{context: &CallbackContext.from_state/1}, - [pad_ref, buffers], - state - ) + do_exec_buffer_callback(pad_ref, buffers, state) end def exec_buffer_callback(pad_ref, buffers, %State{type: type} = state) @@ -106,6 +102,10 @@ defmodule Membrane.Core.Element.BufferController do Telemetry.report_metric(:buffer, length(List.wrap(buffers))) Telemetry.report_bitrate(buffers) + do_exec_buffer_callback(pad_ref, buffers, state) + end + + defp do_exec_buffer_callback(pad_ref, buffers, state) do CallbackHandler.exec_and_handle_callback( :handle_buffers_batch, ActionHandler, diff --git a/lib/membrane/core/element/demand_controller.ex b/lib/membrane/core/element/demand_controller.ex index 7adb36c04..cb5d99618 100644 --- a/lib/membrane/core/element/demand_controller.ex +++ b/lib/membrane/core/element/demand_controller.ex @@ -1,146 +1,103 @@ defmodule Membrane.Core.Element.DemandController do @moduledoc false - # Module handling demands incoming through output pads. + # Module handling changes in values of output pads atomic demand use Bunch - alias Membrane.Core.{CallbackHandler, Message} + alias __MODULE__.AutoFlowUtils + + alias Membrane.Buffer alias Membrane.Core.Child.PadModel - alias Membrane.Core.Element.{ActionHandler, CallbackContext, PlaybackQueue, State, Toilet} + + alias Membrane.Core.Element.{ + AtomicDemand, + DemandHandler, + PlaybackQueue, + State + } + alias Membrane.Pad require Membrane.Core.Child.PadModel require Membrane.Logger - @doc """ - Handles demand coming on an output pad. Updates demand value and executes `handle_demand` callback. - """ - @spec handle_demand(Pad.ref(), non_neg_integer, State.t()) :: State.t() - def handle_demand(pad_ref, size, state) do - withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref), - playback: %State{playback: :playing} <- state do - %{direction: :output, flow_control: flow_control} = data - - if flow_control == :push, - do: raise("Pad with :push control mode cannot handle demand.") + @spec snapshot_atomic_demand(Pad.ref(), State.t()) :: State.t() + def snapshot_atomic_demand(pad_ref, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + %State{playback: :playing} <- state do + if pad_data.direction == :input, + do: raise("cannot snapshot atomic counter in input pad") - do_handle_demand(pad_ref, size, data, state) + do_snapshot_atomic_demand(pad_data, state) else - pad: {:error, :unknown_pad} -> - # We've got a demand from already unlinked pad + {:error, :unknown_pad} -> + # We've got a :atomic_demand_increased message on already unlinked pad state - playback: _playback -> - PlaybackQueue.store(&handle_demand(pad_ref, size, &1), state) + %State{playback: :stopped} -> + PlaybackQueue.store(&snapshot_atomic_demand(pad_ref, &1), state) end end - defp do_handle_demand(pad_ref, size, %{flow_control: :auto} = data, state) do - %{demand: old_demand, associated_pads: associated_pads} = data - - state = PadModel.set_data!(state, pad_ref, :demand, old_demand + size) - - if old_demand <= 0 do - Enum.reduce(associated_pads, state, &send_auto_demand_if_needed/2) - else - state - end - end + defp do_snapshot_atomic_demand( + %{flow_control: :auto} = pad_data, + %{effective_flow_control: :pull} = state + ) do + %{ + atomic_demand: atomic_demand, + associated_pads: associated_pads + } = pad_data - defp do_handle_demand(pad_ref, size, %{flow_control: :manual} = data, state) do - demand = data.demand + size - data = %{data | demand: demand} - state = PadModel.set_data!(state, pad_ref, data) - - if exec_handle_demand?(data) do - context = &CallbackContext.from_state(&1, incoming_demand: size) - - CallbackHandler.exec_and_handle_callback( - :handle_demand, - ActionHandler, - %{ - split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_ref)), - context: context - }, - [pad_ref, demand, data[:demand_unit]], - state - ) + if AtomicDemand.get(atomic_demand) > 0 do + AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state) else state end end - @doc """ - Sends auto demand to an input pad if it should be sent. - - The demand should be sent when the current demand on the input pad is at most - half of the demand request size and if there's positive demand on each of - associated output pads. - - Also, the `demand_decrease` argument can be passed, decreasing the size of the - demand on the input pad before proceeding to the rest of the function logic. - """ - @spec send_auto_demand_if_needed(Pad.ref(), integer, State.t()) :: State.t() - def send_auto_demand_if_needed(pad_ref, demand_decrease \\ 0, state) do - data = PadModel.get_data!(state, pad_ref) - - %{ - demand: demand, - toilet: toilet, - associated_pads: associated_pads, - auto_demand_size: demand_request_size - } = data - - demand = demand - demand_decrease - - demand = - if demand <= div(demand_request_size, 2) and auto_demands_positive?(associated_pads, state) do - if toilet do - Toilet.drain(toilet, demand_request_size - demand) - else - Membrane.Logger.debug_verbose( - "Sending auto demand of size #{demand_request_size - demand} on pad #{inspect(pad_ref)}" - ) - - %{pid: pid, other_ref: other_ref} = data - Message.send(pid, :demand, demand_request_size - demand, for_pad: other_ref) - end - - demand_request_size - else - Membrane.Logger.debug_verbose( - "Not sending auto demand on pad #{inspect(pad_ref)}, pads data: #{inspect(state.pads_data)}" + defp do_snapshot_atomic_demand(%{flow_control: :manual} = pad_data, state) do + with %{demand_snapshot: demand_snapshot, atomic_demand: atomic_demand} + when demand_snapshot <= 0 <- pad_data, + atomic_demand_value + when atomic_demand_value > 0 and atomic_demand_value > demand_snapshot <- + AtomicDemand.get(atomic_demand) do + state = + PadModel.update_data!( + state, + pad_data.ref, + &%{ + &1 + | demand_snapshot: atomic_demand_value, + incoming_demand: atomic_demand_value - &1.demand_snapshot + } ) - demand - end - - PadModel.set_data!(state, pad_ref, :demand, demand) - end - - defp auto_demands_positive?(associated_pads, state) do - Enum.all?(associated_pads, &(PadModel.get_data!(state, &1, :demand) > 0)) - end - - defp exec_handle_demand?(%{end_of_stream?: true}) do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as :end_of_stream action has already been returned - """) - - false + DemandHandler.handle_redemand(pad_data.ref, state) + else + _other -> state + end end - defp exec_handle_demand?(%{demand: demand}) when demand <= 0 do - Membrane.Logger.debug_verbose(""" - Demand controller: not executing handle_demand as demand is not greater than 0, - demand: #{inspect(demand)} - """) - - false + defp do_snapshot_atomic_demand(_pad_data, state) do + state end - defp exec_handle_demand?(_pad_data) do - true + @doc """ + Decreases demand snapshot and atomic demand on the output by the size of outgoing buffers. + """ + @spec decrease_demand_by_outgoing_buffers(Pad.ref(), [Buffer.t()], State.t()) :: State.t() + def decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) do + pad_data = PadModel.get_data!(state, pad_ref) + buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers) + + demand_snapshot = pad_data.demand_snapshot - buffers_size + atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size) + + PadModel.set_data!(state, pad_ref, %{ + pad_data + | demand_snapshot: demand_snapshot, + atomic_demand: atomic_demand + }) end end diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex new file mode 100644 index 000000000..52e89aebd --- /dev/null +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -0,0 +1,60 @@ +defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do + @moduledoc false + + alias Membrane.Core.Element.{ + AtomicDemand, + State + } + + require Membrane.Core.Child.PadModel, as: PadModel + require Membrane.Pad, as: Pad + + defguardp is_input_auto_pad_data(pad_data) + when is_map(pad_data) and is_map_key(pad_data, :flow_control) and + pad_data.flow_control == :auto and is_map_key(pad_data, :direction) and + pad_data.direction == :input + + @spec auto_adjust_atomic_demand(Pad.ref() | [Pad.ref()], State.t()) :: State.t() + def auto_adjust_atomic_demand(pad_ref_list, state) when is_list(pad_ref_list) do + Enum.reduce(pad_ref_list, state, &auto_adjust_atomic_demand/2) + end + + def auto_adjust_atomic_demand(pad_ref, state) when Pad.is_pad_ref(pad_ref) do + PadModel.get_data!(state, pad_ref) + |> do_auto_adjust_atomic_demand(state) + end + + defp do_auto_adjust_atomic_demand(pad_data, state) when is_input_auto_pad_data(pad_data) do + if increase_atomic_demand?(pad_data, state) do + diff = pad_data.auto_demand_size - pad_data.demand + :ok = AtomicDemand.increase(pad_data.atomic_demand, diff) + + PadModel.set_data!( + state, + pad_data.ref, + :demand, + pad_data.auto_demand_size + ) + else + state + end + end + + defp do_auto_adjust_atomic_demand(%{ref: ref}, _state) do + raise "#{__MODULE__}.auto_adjust_atomic_demand/2 can be called only for auto input pads, while #{inspect(ref)} is not such a pad." + end + + defp increase_atomic_demand?(pad_data, state) do + state.effective_flow_control == :pull and + pad_data.demand < pad_data.auto_demand_size / 2 and + Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state)) + end + + defp atomic_demand_positive?(pad_ref, state) do + atomic_demand_value = + PadModel.get_data!(state, pad_ref, :atomic_demand) + |> AtomicDemand.get() + + atomic_demand_value > 0 + end +end diff --git a/lib/membrane/core/element/demand_handler.ex b/lib/membrane/core/element/demand_handler.ex index a0600f71e..6f8e85b48 100644 --- a/lib/membrane/core/element/demand_handler.ex +++ b/lib/membrane/core/element/demand_handler.ex @@ -3,25 +3,27 @@ defmodule Membrane.Core.Element.DemandHandler do # Module handling demands requested on output pads. - alias Membrane.Buffer - alias Membrane.Core.Child.PadModel + alias Membrane.Core.CallbackHandler alias Membrane.Core.Element.{ + ActionHandler, BufferController, - DemandController, + CallbackContext, EventController, InputQueue, State, - StreamFormatController, - Toilet + StreamFormatController } + alias Membrane.Element.PadData alias Membrane.Pad - require Membrane.Core.Child.PadModel - require Membrane.Core.Message + require Membrane.Core.Child.PadModel, as: PadModel + require Membrane.Core.Message, as: Message require Membrane.Logger + @handle_demand_loop_limit 20 + @doc """ Called when redemand action was returned. * If element is currently supplying demand, it means that after finishing `supply_demand` it will call @@ -34,8 +36,15 @@ defmodule Membrane.Core.Element.DemandHandler do Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand})) end - def handle_redemand(pad_ref, state) do - DemandController.handle_demand(pad_ref, 0, state) + def handle_redemand(pad_ref, %State{} = state) do + do_handle_redemand(pad_ref, state) + |> handle_delayed_demands() + end + + defp do_handle_redemand(pad_ref, state) do + state = %{state | supplying_demand?: true} + state = exec_handle_demand(pad_ref, state) + %{state | supplying_demand?: false} end @doc """ @@ -70,8 +79,8 @@ defmodule Membrane.Core.Element.DemandHandler do end def supply_demand(pad_ref, state) do - state = do_supply_demand(pad_ref, state) - handle_delayed_demands(state) + do_supply_demand(pad_ref, state) + |> handle_delayed_demands() end defp do_supply_demand(pad_ref, state) do @@ -80,100 +89,60 @@ defmodule Membrane.Core.Element.DemandHandler do pad_data = state |> PadModel.get_data!(pad_ref) - {{_queue_status, data}, new_input_queue} = - InputQueue.take_and_demand( - pad_data.input_queue, - pad_data.demand, - pad_data.pid, - pad_data.other_ref - ) + {{_queue_status, popped_data}, new_input_queue} = + InputQueue.take(pad_data.input_queue, pad_data.manual_demand_size) state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue) - state = handle_input_queue_output(pad_ref, data, state) + state = handle_input_queue_output(pad_ref, popped_data, state) %State{state | supplying_demand?: false} end - @doc """ - Decreases the demand on the output by the size of outgoing buffers. Checks for the toilet - overflow if the toilet is enabled. - """ - @spec handle_outgoing_buffers( - Pad.ref(), - PadModel.pad_data(), - [Buffer.t()], - State.t() - ) :: State.t() - def handle_outgoing_buffers(pad_ref, %{flow_control: flow_control} = data, buffers, state) - when flow_control in [:auto, :manual] do - %{other_demand_unit: other_demand_unit, demand: demand} = data - buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers) - PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) - end - - def handle_outgoing_buffers( - pad_ref, - %{ - flow_control: :push, - toilet: toilet - } = data, - buffers, - state - ) - when toilet != nil do - %{other_demand_unit: other_demand_unit} = data - buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers) - - case Toilet.fill(toilet, buf_size) do - {:ok, toilet} -> - PadModel.set_data!(state, pad_ref, :toilet, toilet) - - {:overflow, _toilet} -> - # if the toilet has overflowed, we remove it so it didn't overflow again - # and let the parent handle that situation by unlinking this output pad or crashing - PadModel.set_data!(state, pad_ref, :toilet, nil) - end - end - - def handle_outgoing_buffers(_pad_ref, _pad_data, _buffers, state) do - state - end - defp update_demand(pad_ref, size, state) when is_integer(size) do - PadModel.set_data!(state, pad_ref, :demand, size) + PadModel.set_data!(state, pad_ref, :manual_demand_size, size) end defp update_demand(pad_ref, size_fun, state) when is_function(size_fun) do - demand = PadModel.get_data!(state, pad_ref, :demand) - new_demand = size_fun.(demand) + manual_demand_size = PadModel.get_data!(state, pad_ref, :manual_demand_size) + new_manual_demand_size = size_fun.(manual_demand_size) - if new_demand < 0 do + if new_manual_demand_size < 0 do raise Membrane.ElementError, - "Demand altering function requested negative demand on pad #{inspect(pad_ref)} in #{state.module}" + "Demand altering function requested negative demand on pad #{inspect(pad_ref)} in #{inspect(state.module)}" end - PadModel.set_data!(state, pad_ref, :demand, new_demand) + PadModel.set_data!(state, pad_ref, :manual_demand_size, new_manual_demand_size) end @spec handle_delayed_demands(State.t()) :: State.t() - defp handle_delayed_demands(%State{delayed_demands: delayed_demands} = state) do + def handle_delayed_demands(%State{} = state) do # Taking random element of `:delayed_demands` is done to keep data flow # balanced among pads, i.e. to prevent situation where demands requested by # one pad are supplied right away while another one is waiting for buffers # potentially for a long time. - case Enum.take_random(state.delayed_demands, 1) do - [] -> - state - [{pad_ref, action} = entry] -> - state = %State{state | delayed_demands: MapSet.delete(delayed_demands, entry)} + cond do + state.supplying_demand? -> + raise "Cannot handle delayed demands while already supplying demand" - state = - case action do - :supply -> do_supply_demand(pad_ref, state) - :redemand -> handle_redemand(pad_ref, state) - end + state.handle_demand_loop_counter >= @handle_demand_loop_limit -> + Message.self(:resume_handle_demand_loop) + %{state | handle_demand_loop_counter: 0} + + Enum.empty?(state.delayed_demands) -> + %{state | handle_demand_loop_counter: 0} - handle_delayed_demands(state) + true -> + [{pad_ref, action} = entry] = Enum.take_random(state.delayed_demands, 1) + + state = + state + |> Map.update!(:delayed_demands, &MapSet.delete(&1, entry)) + |> Map.update!(:handle_demand_loop_counter, &(&1 + 1)) + + case action do + :supply -> supply_demand(pad_ref, state) + :redemand -> handle_redemand(pad_ref, state) + end end end @@ -182,9 +151,9 @@ defmodule Membrane.Core.Element.DemandHandler do [InputQueue.output_value()], State.t() ) :: State.t() - defp handle_input_queue_output(pad_ref, data, state) do - Enum.reduce(data, state, fn v, state -> - do_handle_input_queue_output(pad_ref, v, state) + defp handle_input_queue_output(pad_ref, queue_output, state) do + Enum.reduce(queue_output, state, fn item, state -> + do_handle_input_queue_output(pad_ref, item, state) end) end @@ -204,12 +173,56 @@ defmodule Membrane.Core.Element.DemandHandler do {:buffers, buffers, _inbound_metric_buf_size, outbound_metric_buf_size}, state ) do - state = PadModel.update_data!(state, pad_ref, :demand, &(&1 - outbound_metric_buf_size)) + state = + PadModel.update_data!(state, pad_ref, :manual_demand_size, &(&1 - outbound_metric_buf_size)) + + BufferController.exec_buffer_callback(pad_ref, buffers, state) + end - if toilet = PadModel.get_data!(state, pad_ref, :toilet) do - Toilet.drain(toilet, outbound_metric_buf_size) + @spec exec_handle_demand(Pad.ref(), State.t()) :: State.t() + defp exec_handle_demand(pad_ref, state) do + with {:ok, pad_data} <- PadModel.get_data(state, pad_ref), + true <- exec_handle_demand?(pad_data) do + do_exec_handle_demand(pad_data, state) + else + _other -> state end + end - BufferController.exec_buffer_callback(pad_ref, buffers, state) + @spec do_exec_handle_demand(PadData.t(), State.t()) :: State.t() + defp do_exec_handle_demand(pad_data, state) do + context = &CallbackContext.from_state(&1, incoming_demand: pad_data.incoming_demand) + + CallbackHandler.exec_and_handle_callback( + :handle_demand, + ActionHandler, + %{ + split_continuation_arbiter: &exec_handle_demand?(PadModel.get_data!(&1, pad_data.ref)), + context: context + }, + [pad_data.ref, pad_data.demand_snapshot, pad_data.demand_unit], + state + ) + end + + defp exec_handle_demand?(%{end_of_stream?: true}) do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as :end_of_stream action has already been returned + """) + + false + end + + defp exec_handle_demand?(%{demand_snapshot: demand_snapshot}) when demand_snapshot <= 0 do + Membrane.Logger.debug_verbose(""" + Demand controller: not executing handle_demand as demand_snapshot is not greater than 0, + demand_snapshot: #{inspect(demand_snapshot)} + """) + + false + end + + defp exec_handle_demand?(_pad_data) do + true end end diff --git a/lib/membrane/core/element/effective_flow_controller.ex b/lib/membrane/core/element/effective_flow_controller.ex new file mode 100644 index 000000000..8db26fb79 --- /dev/null +++ b/lib/membrane/core/element/effective_flow_controller.ex @@ -0,0 +1,138 @@ +defmodule Membrane.Core.Element.EffectiveFlowController do + @moduledoc false + + # Module responsible for the mechanism of resolving effective flow control in elements with pads with auto flow control. + # Effective flow control of the element determines if the element's pads with auto flow control work in :push or in + # :pull mode. If the element's effective flow control is set to :push, then all of its auto pads work in :push. Analogically, + # if the element effective flow control is set to :pull, auto pads also work in :pull. + + # If element A is linked via its input auto pads only to the :push output pads of other elements, then effective flow + # control of element A will be set to :push. Otherwise, if element A is linked via its input auto pads to at least one + # :pull output pad, element A will set its effective flow control to :pull and will forward this information + # via its output auto pads. + + # Resolving effective flow control is performed on + # - entering playing playback + # - adding and removing pad + # - receiving information, that neighbour element effective flow control has changed + + # Effective flow control of a single element can switch between :push and :pull many times during the element's lifetime. + + alias Membrane.Core.Element.DemandController.AutoFlowUtils + alias Membrane.Core.Element.{AtomicDemand, State} + + require Membrane.Core.Child.PadModel, as: PadModel + require Membrane.Core.Message, as: Message + require Membrane.Logger + require Membrane.Pad, as: Pad + + @type effective_flow_control :: :push | :pull + + @spec get_pad_effective_flow_control(Pad.ref(), State.t()) :: effective_flow_control() + def get_pad_effective_flow_control(pad_ref, state) do + pad_name = Pad.name_by_ref(pad_ref) + + state.pads_info + |> get_in([pad_name, :flow_control]) + |> case do + :manual -> :pull + :push -> :push + :auto -> state.effective_flow_control + end + end + + @spec handle_sender_effective_flow_control( + Pad.ref(), + effective_flow_control(), + State.t() + ) :: + State.t() + def handle_sender_effective_flow_control(input_pad_ref, other_effective_flow_control, state) do + pad_data = PadModel.get_data!(state, input_pad_ref) + pad_data = %{pad_data | other_effective_flow_control: other_effective_flow_control} + state = PadModel.set_data!(state, input_pad_ref, pad_data) + + cond do + state.playback != :playing or pad_data.direction != :input or pad_data.flow_control != :auto -> + state + + other_effective_flow_control == state.effective_flow_control -> + :ok = + PadModel.get_data!(state, input_pad_ref, :atomic_demand) + |> AtomicDemand.set_receiver_status({:resolved, state.effective_flow_control}) + + state + + other_effective_flow_control == :pull -> + set_effective_flow_control(:pull, input_pad_ref, state) + + other_effective_flow_control == :push -> + resolve_effective_flow_control(input_pad_ref, state) + end + end + + @spec resolve_effective_flow_control(Pad.ref(), State.t()) :: State.t() + def resolve_effective_flow_control(triggering_pad \\ nil, state) do + senders_flow_modes = + Map.values(state.pads_data) + |> Enum.filter(&(&1.direction == :input && &1.flow_control == :auto)) + |> Enum.map(& &1.other_effective_flow_control) + + new_effective_flow_control = + cond do + Enum.member?(senders_flow_modes, :pull) -> :pull + Enum.member?(senders_flow_modes, :push) -> :push + true -> state.effective_flow_control + end + + set_effective_flow_control(new_effective_flow_control, triggering_pad, state) + end + + defp set_effective_flow_control( + effective_flow_control, + _triggering_pad, + %{effective_flow_control: effective_flow_control} = state + ), + do: state + + defp set_effective_flow_control(new_effective_flow_control, triggering_pad, state) do + Membrane.Logger.debug( + "Transiting `flow_control: :auto` pads to #{inspect(new_effective_flow_control)} effective flow control" + ) + + state = %{state | effective_flow_control: new_effective_flow_control} + + state.pads_data + |> Enum.filter(fn {_ref, %{flow_control: flow_control}} -> flow_control == :auto end) + |> Enum.reduce(state, fn + {_ref, %{direction: :output} = pad_data}, state -> + :ok = + AtomicDemand.set_sender_status( + pad_data.atomic_demand, + {:resolved, new_effective_flow_control} + ) + + :ok = AtomicDemand.set_receiver_status(pad_data.atomic_demand, :to_be_resolved) + + Message.send( + pad_data.pid, + :sender_effective_flow_control_resolved, + [pad_data.other_ref, new_effective_flow_control] + ) + + state + + {pad_ref, %{direction: :input} = pad_data}, state -> + if triggering_pad in [pad_ref, nil] or + AtomicDemand.get_receiver_status(pad_data.atomic_demand) != :to_be_resolved do + :ok = + AtomicDemand.set_receiver_status( + pad_data.atomic_demand, + {:resolved, new_effective_flow_control} + ) + end + + AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state) + end) + end +end diff --git a/lib/membrane/core/element/input_queue.ex b/lib/membrane/core/element/input_queue.ex index fe95e5e57..41e5926fc 100644 --- a/lib/membrane/core/element/input_queue.ex +++ b/lib/membrane/core/element/input_queue.ex @@ -9,11 +9,12 @@ defmodule Membrane.Core.Element.InputQueue do use Bunch alias Membrane.Buffer - alias Membrane.Core.{Message, Telemetry} + alias Membrane.Core.Element.AtomicDemand + alias Membrane.Event alias Membrane.Pad + alias Membrane.StreamFormat - require Membrane.Core.Message - require Membrane.Core.Telemetry + require Membrane.Core.Telemetry, as: Telemetry require Membrane.Logger @qe Qex @@ -24,31 +25,31 @@ defmodule Membrane.Core.Element.InputQueue do {:event | :stream_format, any} | {:buffers, list, pos_integer, pos_integer} @type output :: {:empty | :value, [output_value]} + @type queue_item() :: Buffer.t() | Event.t() | StreamFormat.t() | atom() + @type t :: %__MODULE__{ q: @qe.t(), log_tag: String.t(), target_size: pos_integer(), size: non_neg_integer(), - demand: integer(), - min_demand: pos_integer(), + demand: non_neg_integer(), inbound_metric: module(), outbound_metric: module(), - toilet?: boolean() + linked_output_ref: Pad.ref(), + atomic_demand: AtomicDemand.t() } @enforce_keys [ :q, :log_tag, :target_size, - :size, - :demand, - :min_demand, + :atomic_demand, :inbound_metric, :outbound_metric, - :toilet? + :linked_output_ref ] - defstruct @enforce_keys + defstruct @enforce_keys ++ [size: 0, demand: 0] @default_target_size_factor 40 @@ -58,23 +59,19 @@ defmodule Membrane.Core.Element.InputQueue do @spec init(%{ inbound_demand_unit: Buffer.Metric.unit(), outbound_demand_unit: Buffer.Metric.unit(), - demand_pid: pid(), - demand_pad: Pad.ref(), + atomic_demand: AtomicDemand.t(), + linked_output_ref: Pad.ref(), log_tag: String.t(), - toilet?: boolean(), - target_size: pos_integer() | nil, - min_demand_factor: pos_integer() | nil + target_size: pos_integer() | nil }) :: t() def init(config) do %{ inbound_demand_unit: inbound_demand_unit, outbound_demand_unit: outbound_demand_unit, - demand_pid: demand_pid, - demand_pad: demand_pad, + atomic_demand: atomic_demand, + linked_output_ref: linked_output_ref, log_tag: log_tag, - toilet?: toilet?, - target_size: target_size, - min_demand_factor: min_demand_factor + target_size: target_size } = config inbound_metric = Buffer.Metric.from_unit(inbound_demand_unit) @@ -84,24 +81,19 @@ defmodule Membrane.Core.Element.InputQueue do target_size = target_size || default_target_size - min_demand = - (target_size * (min_demand_factor || default_min_demand_factor())) |> ceil() |> max(1) - %__MODULE__{ q: @qe.new(), log_tag: log_tag, target_size: target_size, - size: 0, - demand: target_size, - min_demand: min_demand, inbound_metric: inbound_metric, outbound_metric: outbound_metric, - toilet?: toilet? + atomic_demand: atomic_demand, + linked_output_ref: linked_output_ref } - |> send_demands(demand_pid, demand_pad) + |> maybe_increase_atomic_demand() end - @spec store(t(), atom(), any()) :: t() + @spec store(t(), atom(), queue_item() | [queue_item()]) :: t() def store(input_queue, type \\ :buffers, v) def store(input_queue, :buffers, v) when is_list(v) do @@ -138,6 +130,7 @@ defmodule Membrane.Core.Element.InputQueue do %__MODULE__{ q: q, size: size, + demand: demand, inbound_metric: inbound_metric, outbound_metric: outbound_metric } = input_queue, @@ -153,25 +146,22 @@ defmodule Membrane.Core.Element.InputQueue do %__MODULE__{ input_queue | q: q |> @qe.push({:buffers, v, inbound_metric_buffer_size, outbound_metric_buffer_size}), - size: size + inbound_metric_buffer_size + size: size + inbound_metric_buffer_size, + demand: demand - inbound_metric_buffer_size } end - @spec take_and_demand(t(), non_neg_integer(), pid(), Pad.ref()) :: {output(), t()} - def take_and_demand( - %__MODULE__{} = input_queue, - count, - demand_pid, - demand_pad - ) - when count >= 0 do + @spec take(t, non_neg_integer()) :: {output(), t} + def take(%__MODULE__{} = input_queue, count) when count >= 0 do "Taking #{inspect(count)} #{inspect(input_queue.outbound_metric)}" |> mk_log(input_queue) |> Membrane.Logger.debug_verbose() - {out, %__MODULE__{size: new_size} = input_queue} = do_take(input_queue, count) - input_queue = send_demands(input_queue, demand_pid, demand_pad) - Telemetry.report_metric(:take_and_demand, new_size, input_queue.log_tag) + {out, input_queue} = do_take(input_queue, count) + input_queue = maybe_increase_atomic_demand(input_queue) + + Telemetry.report_metric(:take, input_queue.size, input_queue.log_tag) + {out, input_queue} end @@ -180,21 +170,13 @@ defmodule Membrane.Core.Element.InputQueue do q: q, size: size, inbound_metric: inbound_metric, - outbound_metric: outbound_metric, - demand: demand + outbound_metric: outbound_metric } = input_queue, count ) do - {out, nq, new_queue_size} = q |> q_pop(count, inbound_metric, outbound_metric, size) - new_demand_size = demand + (size - new_queue_size) - - {out, - %__MODULE__{ - input_queue - | q: nq, - size: new_queue_size, - demand: new_demand_size - }} + {out, nq, new_size} = q |> q_pop(count, inbound_metric, outbound_metric, size) + input_queue = %{input_queue | q: nq, size: new_size} + {out, input_queue} end defp q_pop( @@ -291,34 +273,29 @@ defmodule Membrane.Core.Element.InputQueue do end end - @spec send_demands(t(), pid(), Pad.ref()) :: t() - defp send_demands( + @spec maybe_increase_atomic_demand(t()) :: t() + defp maybe_increase_atomic_demand( %__MODULE__{ - toilet?: false, size: size, target_size: target_size, - demand: demand, - min_demand: min_demand - } = input_queue, - demand_pid, - linked_output_ref + atomic_demand: atomic_demand, + demand: demand + } = input_queue ) - when size < target_size and demand > 0 do - to_demand = max(demand, min_demand) + when target_size > size + demand do + diff = max(target_size - size - demand, div(target_size, 2)) """ - Sending demand of size #{inspect(to_demand)} to output #{inspect(linked_output_ref)} + Increasing AtomicDemand linked to #{inspect(input_queue.linked_output_ref)} by #{inspect(diff)} """ |> mk_log(input_queue) |> Membrane.Logger.debug_verbose() - Message.send(demand_pid, :demand, to_demand, for_pad: linked_output_ref) - %__MODULE__{input_queue | demand: demand - to_demand} + :ok = AtomicDemand.increase(atomic_demand, diff) + %{input_queue | demand: demand + diff} end - defp send_demands(input_queue, _demand_pid, _linked_output_ref) do - input_queue - end + defp maybe_increase_atomic_demand(%__MODULE__{} = input_queue), do: input_queue # This function may be unused if particular logs are pruned @dialyzer {:no_unused, mk_log: 2} @@ -326,12 +303,11 @@ defmodule Membrane.Core.Element.InputQueue do %__MODULE__{ log_tag: log_tag, size: size, - target_size: target_size, - toilet?: toilet + target_size: target_size } = input_queue [ - "InputQueue #{log_tag}#{if toilet, do: " (toilet)", else: ""}: ", + "InputQueue #{log_tag}: ", message, "\n", "InputQueue size: #{inspect(size)}, target size: #{inspect(target_size)}" diff --git a/lib/membrane/core/element/lifecycle_controller.ex b/lib/membrane/core/element/lifecycle_controller.ex index 4f7c66ba9..c54fe6a1b 100644 --- a/lib/membrane/core/element/lifecycle_controller.ex +++ b/lib/membrane/core/element/lifecycle_controller.ex @@ -8,9 +8,15 @@ defmodule Membrane.Core.Element.LifecycleController do alias Membrane.{Clock, Element, Sync} alias Membrane.Core.{CallbackHandler, Element, Message} - alias Membrane.Core.Element.{ActionHandler, CallbackContext, PlaybackQueue, State} - require Membrane.Core.Child.PadModel + alias Membrane.Core.Element.{ + ActionHandler, + CallbackContext, + EffectiveFlowController, + PlaybackQueue, + State + } + require Membrane.Core.Message require Membrane.Logger @@ -67,7 +73,10 @@ defmodule Membrane.Core.Element.LifecycleController do @spec handle_playing(State.t()) :: State.t() def handle_playing(state) do Membrane.Logger.debug("Got play request") - state = %State{state | playback: :playing} + + state = + %State{state | playback: :playing} + |> EffectiveFlowController.resolve_effective_flow_control() state = CallbackHandler.exec_and_handle_callback( diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 27a5729f7..173c33c38 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -10,15 +10,16 @@ defmodule Membrane.Core.Element.PadController do alias Membrane.Core.Element.{ ActionHandler, + AtomicDemand, CallbackContext, - DemandController, + EffectiveFlowController, EventController, InputQueue, State, - StreamFormatController, - Toilet + StreamFormatController } + alias Membrane.Core.Element.DemandController.AutoFlowUtils alias Membrane.Core.Parent.Link.Endpoint require Membrane.Core.Child.PadModel @@ -28,20 +29,19 @@ defmodule Membrane.Core.Element.PadController do @type link_call_props :: %{ - initiator: :parent, stream_format_validation_params: StreamFormatController.stream_format_validation_params() } | %{ - initiator: :sibling, other_info: PadModel.pad_info() | nil, - link_metadata: %{toilet: Toilet.t() | nil}, + link_metadata: %{}, stream_format_validation_params: - StreamFormatController.stream_format_validation_params() + StreamFormatController.stream_format_validation_params(), + other_effective_flow_control: EffectiveFlowController.effective_flow_control() } @type link_call_reply_props :: - {Endpoint.t(), PadModel.pad_info(), %{toilet: Toilet.t() | nil}} + {Endpoint.t(), PadModel.pad_info(), %{atomic_demand: AtomicDemand.t()}} @type link_call_reply :: :ok @@ -50,7 +50,7 @@ defmodule Membrane.Core.Element.PadController do | {:error, {:neighbor_child_dead, reason :: any()}} | {:error, {:unknown_pad, name :: Membrane.Child.name(), pad_ref :: Pad.ref()}} - @default_auto_demand_size_factor 4000 + @default_auto_demand_size_factor 400 @doc """ Verifies linked pad, initializes it's data. @@ -76,28 +76,25 @@ defmodule Membrane.Core.Element.PadController do :ok = Child.PadController.validate_pad_being_linked!(direction, info) - do_handle_link(endpoint, other_endpoint, info, link_props, state) + do_handle_link(direction, endpoint, other_endpoint, info, link_props, state) end - defp do_handle_link( - endpoint, - other_endpoint, - info, - %{initiator: :parent} = props, - state - ) do + defp do_handle_link(:output, endpoint, other_endpoint, info, props, state) do + effective_flow_control = + EffectiveFlowController.get_pad_effective_flow_control(endpoint.pad_ref, state) + handle_link_response = Message.call(other_endpoint.pid, :handle_link, [ Pad.opposite_direction(info.direction), other_endpoint, endpoint, %{ - initiator: :sibling, other_info: info, link_metadata: %{ observability_metadata: Observability.setup_link(endpoint.pad_ref) }, - stream_format_validation_params: [] + stream_format_validation_params: [], + other_effective_flow_control: effective_flow_control } ]) @@ -115,6 +112,7 @@ defmodule Membrane.Core.Element.PadController do other_endpoint, info, props.stream_format_validation_params, + :push, other_info, link_metadata, state @@ -139,44 +137,43 @@ defmodule Membrane.Core.Element.PadController do end end - defp do_handle_link( - endpoint, - other_endpoint, - info, - %{initiator: :sibling} = link_props, - state - ) do + defp do_handle_link(:input, endpoint, other_endpoint, info, link_props, state) do %{ other_info: other_info, link_metadata: link_metadata, - stream_format_validation_params: stream_format_validation_params + stream_format_validation_params: stream_format_validation_params, + other_effective_flow_control: other_effective_flow_control } = link_props - {output_info, input_info, input_endpoint} = - if info.direction == :output, - do: {info, other_info, other_endpoint}, - else: {other_info, info, endpoint} + if info.direction != :input, do: raise("pad direction #{inspect(info.direction)} is wrong") - {output_demand_unit, input_demand_unit} = resolve_demand_units(output_info, input_info) + {output_demand_unit, input_demand_unit} = resolve_demand_units(other_info, info) link_metadata = - Map.put(link_metadata, :input_demand_unit, input_demand_unit) - |> Map.put(:output_demand_unit, output_demand_unit) - - toilet = - if input_demand_unit != nil, - do: - Toilet.new( - input_endpoint.pad_props.toilet_capacity, - input_demand_unit, - self(), - input_endpoint.pad_props.throttling_factor - ) + Map.merge(link_metadata, %{ + input_demand_unit: input_demand_unit, + output_demand_unit: output_demand_unit + }) + + pad_effective_flow_control = + EffectiveFlowController.get_pad_effective_flow_control(endpoint.pad_ref, state) + + atomic_demand = + AtomicDemand.new(%{ + receiver_effective_flow_control: pad_effective_flow_control, + receiver_process: self(), + receiver_demand_unit: input_demand_unit || :buffers, + sender_process: other_endpoint.pid, + sender_pad_ref: other_endpoint.pad_ref, + supervisor: state.subprocess_supervisor, + toilet_capacity: endpoint.pad_props[:toilet_capacity], + throttling_factor: endpoint.pad_props[:throttling_factor] + }) # The sibiling was an initiator, we don't need to use the pid of a task spawned for observability _metadata = Observability.setup_link(endpoint.pad_ref, link_metadata.observability_metadata) - link_metadata = Map.put(link_metadata, :toilet, toilet) + link_metadata = Map.put(link_metadata, :atomic_demand, atomic_demand) :ok = Child.PadController.validate_pad_mode!( @@ -190,11 +187,25 @@ defmodule Membrane.Core.Element.PadController do other_endpoint, info, stream_format_validation_params, + other_effective_flow_control, other_info, link_metadata, state ) + state = + case PadModel.get_data!(state, endpoint.pad_ref) do + %{flow_control: :auto, direction: :input} = pad_data -> + EffectiveFlowController.handle_sender_effective_flow_control( + pad_data.ref, + pad_data.other_effective_flow_control, + state + ) + + _pad_data -> + state + end + state = maybe_handle_pad_added(endpoint.pad_ref, state) {{:ok, {endpoint, info, link_metadata}}, state} end @@ -213,7 +224,14 @@ defmodule Membrane.Core.Element.PadController do state = generate_eos_if_needed(pad_ref, state) state = maybe_handle_pad_removed(pad_ref, state) state = remove_pad_associations(pad_ref, state) - PadModel.delete_data!(state, pad_ref) + {pad_data, state} = PadModel.pop_data!(state, pad_ref) + + with %{direction: :input, flow_control: :auto, other_effective_flow_control: :pull} <- + pad_data do + EffectiveFlowController.resolve_effective_flow_control(state) + else + _pad_data -> state + end else {:ok, %{availability: :always}} when state.terminating? -> state @@ -238,15 +256,8 @@ defmodule Membrane.Core.Element.PadController do end defp resolve_demand_units(output_info, input_info) do - output_demand_unit = - if output_info[:flow_control] == :push, - do: nil, - else: output_info[:demand_unit] || input_info[:demand_unit] || :buffers - - input_demand_unit = - if input_info[:flow_control] == :push, - do: nil, - else: input_info[:demand_unit] || output_info[:demand_unit] || :buffers + output_demand_unit = output_info[:demand_unit] || input_info[:demand_unit] || :buffers + input_demand_unit = input_info[:demand_unit] || output_info[:demand_unit] || :buffers {output_demand_unit, input_demand_unit} end @@ -256,6 +267,7 @@ defmodule Membrane.Core.Element.PadController do other_endpoint, info, stream_format_validation_params, + other_effective_flow_control, other_info, metadata, state @@ -270,12 +282,20 @@ defmodule Membrane.Core.Element.PadController do Child.PadController.parse_pad_options!(info.name, endpoint.pad_props.options, state), ref: endpoint.pad_ref, stream_format_validation_params: stream_format_validation_params, + other_effective_flow_control: other_effective_flow_control, stream_format: nil, start_of_stream?: false, end_of_stream?: false, - associated_pads: [] + associated_pads: [], + atomic_demand: metadata.atomic_demand }) + :ok = + AtomicDemand.set_sender_status( + data.atomic_demand, + {:resolved, EffectiveFlowController.get_pad_effective_flow_control(data.ref, state)} + ) + data = data |> Map.merge(init_pad_direction_data(data, endpoint.pad_props, metadata, state)) data = @@ -293,10 +313,9 @@ defmodule Membrane.Core.Element.PadController do PadModel.update_data!(state, other_data.ref, :associated_pads, &[data.ref | &1]) end) - case data.direction do - :input -> DemandController.send_auto_demand_if_needed(endpoint.pad_ref, state) - :output -> state - end + if data.direction == :input, + do: AutoFlowUtils.auto_adjust_atomic_demand(endpoint.pad_ref, state), + else: state else state end @@ -316,26 +335,27 @@ defmodule Membrane.Core.Element.PadController do %{direction: :input, flow_control: :manual} = data, props, other_info, - metadata, + _metadata, %State{} ) do - %{ref: ref, pid: pid, other_ref: other_ref, demand_unit: this_demand_unit} = data - - enable_toilet? = other_info.flow_control == :push + %{ + ref: ref, + other_ref: other_ref, + demand_unit: this_demand_unit, + atomic_demand: atomic_demand + } = data input_queue = InputQueue.init(%{ inbound_demand_unit: other_info[:demand_unit] || this_demand_unit, outbound_demand_unit: this_demand_unit, - demand_pid: pid, - demand_pad: other_ref, + atomic_demand: atomic_demand, + linked_output_ref: other_ref, log_tag: inspect(ref), - toilet?: enable_toilet?, - target_size: props.target_queue_size, - min_demand_factor: props.min_demand_factor + target_size: props.target_queue_size }) - %{input_queue: input_queue, demand: 0, toilet: if(enable_toilet?, do: metadata.toilet)} + %{input_queue: input_queue, demand_snapshot: 0} end defp init_pad_mode_data( @@ -345,14 +365,14 @@ defmodule Membrane.Core.Element.PadController do _metadata, _state ) do - %{demand: 0} + %{demand_snapshot: 0} end defp init_pad_mode_data( - %{flow_control: :auto, direction: direction}, + %{flow_control: :auto, direction: direction} = data, props, - other_info, - metadata, + _other_info, + _metadata, %State{} = state ) do associated_pads = @@ -361,41 +381,27 @@ defmodule Membrane.Core.Element.PadController do |> Enum.filter(&(&1.direction != direction and &1.flow_control == :auto)) |> Enum.map(& &1.ref) - toilet = - if direction == :input and other_info.flow_control == :push do - metadata.toilet - else - nil - end - auto_demand_size = - if direction == :input do - props.auto_demand_size || - Membrane.Buffer.Metric.Count.buffer_size_approximation() * - @default_auto_demand_size_factor - else - nil + cond do + direction == :output -> + nil + + props.auto_demand_size != nil -> + props.auto_demand_size + + true -> + demand_unit = data.other_demand_unit || data.demand_unit || :buffers + metric = Membrane.Buffer.Metric.from_unit(demand_unit) + metric.buffer_size_approximation() * @default_auto_demand_size_factor end %{ - demand: 0, + demand_snapshot: 0, associated_pads: associated_pads, - auto_demand_size: auto_demand_size, - toilet: toilet + auto_demand_size: auto_demand_size } end - defp init_pad_mode_data( - %{flow_control: :push, direction: :output}, - _props, - %{flow_control: other_flow_control}, - metadata, - _state - ) - when other_flow_control in [:auto, :manual] do - %{toilet: metadata.toilet} - end - defp init_pad_mode_data(_data, _props, _other_info, _metadata, _state), do: %{} @doc """ @@ -426,15 +432,9 @@ defmodule Membrane.Core.Element.PadController do end) |> PadModel.set_data!(pad_ref, :associated_pads, []) - if pad_data.direction == :output do - Enum.reduce( - pad_data.associated_pads, - state, - &DemandController.send_auto_demand_if_needed/2 - ) - else - state - end + if pad_data.direction == :output, + do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state), + else: state _pad_data -> state diff --git a/lib/membrane/core/element/state.ex b/lib/membrane/core/element/state.ex index 351efc193..f004c2833 100644 --- a/lib/membrane/core/element/state.ex +++ b/lib/membrane/core/element/state.ex @@ -10,6 +10,7 @@ defmodule Membrane.Core.Element.State do alias Membrane.{Clock, Element, Pad, Sync} alias Membrane.Core.Timer alias Membrane.Core.Child.{PadModel, PadSpecHandler} + alias Membrane.Core.Element.EffectiveFlowController require Membrane.Pad @@ -23,6 +24,7 @@ defmodule Membrane.Core.Element.State do parent_pid: pid, supplying_demand?: boolean(), delayed_demands: MapSet.t({Pad.ref(), :supply | :redemand}), + handle_demand_loop_counter: non_neg_integer(), synchronization: %{ timers: %{Timer.id() => Timer.t()}, parent_clock: Clock.t(), @@ -36,7 +38,8 @@ defmodule Membrane.Core.Element.State do resource_guard: Membrane.ResourceGuard.t(), subprocess_supervisor: pid, terminating?: boolean(), - setup_incomplete?: boolean() + setup_incomplete?: boolean(), + effective_flow_control: EffectiveFlowController.effective_flow_control() } defstruct [ @@ -49,6 +52,7 @@ defmodule Membrane.Core.Element.State do :parent_pid, :supplying_demand?, :delayed_demands, + :handle_demand_loop_counter, :synchronization, :demand_size, :initialized?, @@ -57,7 +61,8 @@ defmodule Membrane.Core.Element.State do :resource_guard, :subprocess_supervisor, :terminating?, - :setup_incomplete? + :setup_incomplete?, + :effective_flow_control ] @doc """ @@ -81,6 +86,7 @@ defmodule Membrane.Core.Element.State do parent_pid: options.parent, supplying_demand?: false, delayed_demands: MapSet.new(), + handle_demand_loop_counter: 0, synchronization: %{ parent_clock: options.parent_clock, timers: %{}, @@ -94,7 +100,8 @@ defmodule Membrane.Core.Element.State do resource_guard: options.resource_guard, subprocess_supervisor: options.subprocess_supervisor, terminating?: false, - setup_incomplete?: false + setup_incomplete?: false, + effective_flow_control: :push } |> PadSpecHandler.init_pads() end diff --git a/lib/membrane/core/element/stream_format_controller.ex b/lib/membrane/core/element/stream_format_controller.ex index cebcfd946..e96b2f1a5 100644 --- a/lib/membrane/core/element/stream_format_controller.ex +++ b/lib/membrane/core/element/stream_format_controller.ex @@ -12,7 +12,6 @@ defmodule Membrane.Core.Element.StreamFormatController do require Membrane.Core.Child.PadModel require Membrane.Core.Telemetry - require Membrane.Logger @type stream_format_validation_param() :: {module(), Pad.name()} @type stream_format_validation_params() :: [stream_format_validation_param()] @@ -64,7 +63,8 @@ defmodule Membrane.Core.Element.StreamFormatController do validate_stream_format!( :input, [{state.module, pad_name} | stream_format_validation_params], - stream_format + stream_format, + state ) state = @@ -82,9 +82,10 @@ defmodule Membrane.Core.Element.StreamFormatController do @spec validate_stream_format!( Pad.direction(), stream_format_validation_params(), - StreamFormat.t() + StreamFormat.t(), + State.t() ) :: :ok - def validate_stream_format!(direction, params, stream_format) do + def validate_stream_format!(direction, params, stream_format, state) do unless is_struct(stream_format) do raise Membrane.StreamFormatError, """ Stream format must be defined as a struct, therefore it cannot be: #{inspect(stream_format)} @@ -93,8 +94,10 @@ defmodule Membrane.Core.Element.StreamFormatController do for {module, pad_name} <- params do unless module.membrane_stream_format_match?(pad_name, stream_format) do + pattern_string = get_in(state, [:pads_info, pad_name, :accepted_formats_str]) + raise Membrane.StreamFormatError, """ - Stream format: #{inspect(stream_format)} is not matching accepted format pattern in def_#{direction}_pad + Stream format: #{inspect(stream_format)} is not matching accepted format pattern "#{pattern_string}" in def_#{direction}_pad for pad #{inspect(pad_name)} in #{inspect(module)} """ end diff --git a/lib/membrane/core/element/toilet.ex b/lib/membrane/core/element/toilet.ex deleted file mode 100644 index 29e1863bd..000000000 --- a/lib/membrane/core/element/toilet.ex +++ /dev/null @@ -1,171 +0,0 @@ -defmodule Membrane.Core.Element.Toilet do - @moduledoc false - - # Toilet is an entity that can be filled and drained. If it's not drained on - # time and exceeds its capacity, it overflows by logging an error and killing - # the responsible process (passed on the toilet creation). - - require Membrane.Logger - - defmodule DistributedCounter do - @moduledoc false - - # A module providing a common interface to access and modify a counter used in the toilet implementation. - # The counter uses :atomics module under the hood. - # The module allows to create and modify the value of a counter in the same manner both when the counter is about to be accessed - # from the same node, and from different nodes. - - defmodule Worker do - @moduledoc false - - # This is a GenServer created when the counter is about to be accessed from different nodes - it's running on the same node, - # where the :atomics variable is put, and processes from different nodes can ask it to modify the counter on their behalf. - - use GenServer - - @impl true - def init(parent_pid) do - Process.monitor(parent_pid) - {:ok, nil, :hibernate} - end - - @impl true - def handle_call({:add_get, atomic_ref, value}, _from, _state) do - result = :atomics.add_get(atomic_ref, 1, value) - {:reply, result, nil} - end - - @impl true - def handle_cast({:sub, atomic_ref, value}, _state) do - :atomics.sub(atomic_ref, 1, value) - {:noreply, nil} - end - - @impl true - def handle_info({:DOWN, _ref, :process, _object, _reason}, state) do - {:stop, :normal, state} - end - end - - @type t :: {pid(), :atomics.atomics_ref()} - - @spec new() :: t - def new() do - atomic_ref = :atomics.new(1, []) - {:ok, pid} = GenServer.start(Worker, self()) - {pid, atomic_ref} - end - - @spec add_get(t, integer()) :: integer() - def add_get({pid, atomic_ref}, value) when node(pid) == node(self()) do - :atomics.add_get(atomic_ref, 1, value) - end - - def add_get({pid, atomic_ref}, value) do - GenServer.call(pid, {:add_get, atomic_ref, value}) - end - - @spec sub(t, integer()) :: :ok - def sub({pid, atomic_ref}, value) when node(pid) == node(self()) do - :atomics.sub(atomic_ref, 1, value) - end - - def sub({pid, atomic_ref}, value) do - GenServer.cast(pid, {:sub, atomic_ref, value}) - end - end - - @opaque t :: - {__MODULE__, DistributedCounter.t(), pos_integer, Process.dest(), pos_integer(), - non_neg_integer()} - - @default_capacity_factor 200 - - @spec new( - pos_integer() | nil, - Membrane.Buffer.Metric.unit(), - Process.dest(), - pos_integer() - ) :: t - def new(capacity, demand_unit, responsible_process, throttling_factor) do - default_capacity = - Membrane.Buffer.Metric.from_unit(demand_unit).buffer_size_approximation() * - @default_capacity_factor - - toilet_ref = DistributedCounter.new() - capacity = capacity || default_capacity - {__MODULE__, toilet_ref, capacity, responsible_process, throttling_factor, 0} - end - - @spec fill(t, non_neg_integer) :: {:ok | :overflow, t} - def fill( - {__MODULE__, counter, capacity, responsible_process, throttling_factor, - unrinsed_buffers_size}, - amount - ) do - if unrinsed_buffers_size + amount < throttling_factor do - {:ok, - {__MODULE__, counter, capacity, responsible_process, throttling_factor, - amount + unrinsed_buffers_size}} - else - size = DistributedCounter.add_get(counter, amount + unrinsed_buffers_size) - - if size > capacity do - overflow(size, capacity, responsible_process) - {:overflow, {__MODULE__, counter, capacity, responsible_process, throttling_factor, 0}} - else - {:ok, {__MODULE__, counter, capacity, responsible_process, throttling_factor, 0}} - end - end - end - - @spec drain(t, non_neg_integer) :: :ok - def drain( - {__MODULE__, counter, _capacity, _responsible_process, _throttling_factor, - _unrinsed_buff_size}, - amount - ) do - DistributedCounter.sub(counter, amount) - end - - defp overflow(size, capacity, responsible_process) do - Membrane.Logger.debug_verbose(~S""" - Toilet overflow - - ` ' ` - .'''. ' .'''. - .. ' ' .. - ' '.'.' ' - .'''.'.'''. - ' .''.'.''. ' - ;------ ' ------; - | ~~ .--'--// | - | / ' \ | - | / ' \ | - | | ' | | ,----. - | \ , ' , / | =|____|= - '---,###'###,---' (---( - /## ' ##\ )---) - |##, ' ,##| (---( - \'#####'/ `---` - \`"#"`/ - |`"`| - .-| |-. - jgs / ' ' \ - '---------' - """) - - Membrane.Logger.error(""" - Toilet overflow. - - Reached the size of #{inspect(size)}, which is above toilet capacity (#{inspect(capacity)}) - when storing data from output working in push mode. It means that some element in the pipeline - processes the stream too slow or doesn't process it at all. - To have control over amount of buffers being produced, consider using output in :auto or :manual - flow control mode. (see `Membrane.Pad.flow_control`). - You can also try changing the `toilet_capacity` in `Membrane.ChildrenSpec.via_in/3`. - """) - - Process.exit(responsible_process, :kill) - end -end diff --git a/lib/membrane/core/filter_aggregator/context.ex b/lib/membrane/core/filter_aggregator/context.ex index e1e42463f..21ce6be3c 100644 --- a/lib/membrane/core/filter_aggregator/context.ex +++ b/lib/membrane/core/filter_aggregator/context.ex @@ -64,7 +64,7 @@ defmodule Membrane.Core.FilterAggregator.Context do |> Map.delete(:accepted_formats_str) |> Map.merge(%{ stream_format: nil, - demand: nil, + demand_snapshot: nil, start_of_stream?: false, end_of_stream?: false, ref: pad_description.name, diff --git a/lib/membrane/core/parent/child_life_controller/link_utils.ex b/lib/membrane/core/parent/child_life_controller/link_utils.ex index ebd0b90a5..95d76dbb7 100644 --- a/lib/membrane/core/parent/child_life_controller/link_utils.ex +++ b/lib/membrane/core/parent/child_life_controller/link_utils.ex @@ -277,7 +277,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do if {Membrane.Bin, :itself} in [from.child, to.child] do state else - params = %{initiator: :parent, stream_format_validation_params: []} + params = %{stream_format_validation_params: []} case Message.call(from.pid, :handle_link, [:output, from, to, params]) do :ok -> diff --git a/lib/membrane/core/pipeline.ex b/lib/membrane/core/pipeline.ex index 094f36932..a472cc2ce 100644 --- a/lib/membrane/core/pipeline.ex +++ b/lib/membrane/core/pipeline.ex @@ -11,7 +11,6 @@ defmodule Membrane.Core.Pipeline do require Membrane.Core.Message, as: Message require Membrane.Core.Telemetry, as: Telemetry - require Membrane.Logger require Membrane.Core.Component @impl GenServer diff --git a/lib/membrane/core/pipeline/action_handler.ex b/lib/membrane/core/pipeline/action_handler.ex index b078dc021..ea1196486 100644 --- a/lib/membrane/core/pipeline/action_handler.ex +++ b/lib/membrane/core/pipeline/action_handler.ex @@ -8,8 +8,6 @@ defmodule Membrane.Core.Pipeline.ActionHandler do alias Membrane.Core.Parent.LifecycleController alias Membrane.Core.Pipeline.State - require Membrane.Logger - @impl CallbackHandler def handle_action({action, _args}, :handle_init, _params, _state) when action != :spec do raise ActionError, action: action, reason: {:invalid_callback, :handle_init} diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index 2186313d3..e8c02c6c1 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -6,7 +6,6 @@ defmodule Membrane.Element.PadData do - `:availability` - see `t:Membrane.Pad.availability/0` - `:stream_format` - the most recent `t:Membrane.StreamFormat.t/0` that have been sent (output) or received (input) on the pad. May be `nil` if not yet set. - - `:demand` - current demand requested on the pad working in `:auto` or `:manual` flow control mode. - `:direction` - see `t:Membrane.Pad.direction/0` - `:end_of_stream?` - flag determining whether the stream processing via the pad has been finished - `:flow_control` - see `t:Membrane.Pad.flow_control/0`. @@ -39,14 +38,29 @@ defmodule Membrane.Element.PadData do pid: private_field, other_ref: private_field, input_queue: private_field, - demand: integer() | nil, + incoming_demand: integer() | nil, demand_unit: private_field, other_demand_unit: private_field, auto_demand_size: private_field, sticky_messages: private_field, - toilet: private_field, + + # Used only for output pads with :pull or :auto flow control. Holds the last captured value of AtomicDemand, + # decreased by the size of buffers sent via specific pad since the last capture, expressed in the appropriate metric. + # Moment, when demand_snapshot value drops to 0 or less, triggers another capture of AtomicDemand value. + demand_snapshot: integer() | nil, + + # Instance of AtomicDemand shared by both sides of link. Holds amount of data, that has been demanded by the element + # with input pad, but hasn't been sent yet by the element with output pad. Detects toilet overflow as well. + atomic_demand: private_field, + + # Field used in DemandController.AutoFlowUtils and InputQueue, to caluclate, how much AtomicDemand should be increased. + # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but + # hasn't arrived yet. Unused for output pads. + demand: private_field, + manual_demand_size: private_field, associated_pads: private_field, - sticky_events: private_field + sticky_events: private_field, + other_effective_flow_control: private_field } @enforce_keys [ @@ -64,16 +78,20 @@ defmodule Membrane.Element.PadData do defstruct @enforce_keys ++ [ input_queue: nil, - demand: nil, + demand_snapshot: 0, + incoming_demand: nil, demand_unit: nil, start_of_stream?: false, end_of_stream?: false, auto_demand_size: nil, sticky_messages: [], - toilet: nil, + atomic_demand: nil, + demand: 0, + manual_demand_size: 0, associated_pads: [], sticky_events: [], stream_format_validation_params: [], - other_demand_unit: nil + other_demand_unit: nil, + other_effective_flow_control: :push ] end diff --git a/lib/membrane/testing/pipeline.ex b/lib/membrane/testing/pipeline.ex index 7d79114e3..3589d98ad 100644 --- a/lib/membrane/testing/pipeline.ex +++ b/lib/membrane/testing/pipeline.ex @@ -85,7 +85,6 @@ defmodule Membrane.Testing.Pipeline do alias Membrane.{Element, Pipeline} alias Membrane.Testing.Notification - require Membrane.Logger require Membrane.Core.Message defmodule State do diff --git a/lib/membrane/testing/sink.ex b/lib/membrane/testing/sink.ex index e9bf55d82..9b5923b24 100644 --- a/lib/membrane/testing/sink.ex +++ b/lib/membrane/testing/sink.ex @@ -71,7 +71,7 @@ defmodule Membrane.Testing.Sink do do: {notify({:end_of_stream, pad}), state} @impl true - def handle_stream_format(pad, stream_format, _context, state), + def handle_stream_format(pad, stream_format, _ctx, state), do: {notify({:stream_format, pad, stream_format}), state} @impl true diff --git a/lib/membrane/testing/source.ex b/lib/membrane/testing/source.ex index 7a6e180f7..448ccda6d 100644 --- a/lib/membrane/testing/source.ex +++ b/lib/membrane/testing/source.ex @@ -95,9 +95,7 @@ defmodule Membrane.Testing.Source do def default_buf_gen(generator_state, size) do buffers = generator_state..(size + generator_state - 1) - |> Enum.map(fn generator_state -> - %Buffer{payload: <>} - end) + |> Enum.map(&%Buffer{payload: <<&1::16>>}) action = [buffer: {:output, buffers}] {action, generator_state + size} diff --git a/test/membrane/core/element/action_handler_test.exs b/test/membrane/core/element/action_handler_test.exs index 89ba9af2b..584bda1b3 100644 --- a/test/membrane/core/element/action_handler_test.exs +++ b/test/membrane/core/element/action_handler_test.exs @@ -2,7 +2,8 @@ defmodule Membrane.Core.Element.ActionHandlerTest do use ExUnit.Case, async: true alias Membrane.{ActionError, Buffer, ElementError, PadDirectionError} - alias Membrane.Core.Element.State + alias Membrane.Core.Element.{AtomicDemand, State} + alias Membrane.Core.SubprocessSupervisor alias Membrane.Support.DemandsTest.Filter alias Membrane.Support.Element.{TrivialFilter, TrivialSource} @@ -30,7 +31,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do direction: :input, pid: self(), flow_control: :manual, - demand: 0 + demand_snapshot: 0 ), input_push: struct(Membrane.Element.PadData, @@ -38,6 +39,10 @@ defmodule Membrane.Core.Element.ActionHandlerTest do pid: self(), flow_control: :push ) + }, + pads_info: %{ + input: %{flow_control: :manual}, + input_push: %{flow_control: :push} } ) @@ -50,7 +55,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do test "delaying demand", %{state: state} do state = %{state | playback: :playing, supplying_demand?: true} state = @module.handle_action({:demand, {:input, 10}}, :handle_info, %{}, state) - assert state.pads_data.input.demand == 10 + assert state.pads_data.input.manual_demand_size == 10 assert MapSet.new([{:input, :supply}]) == state.delayed_demands end @@ -68,6 +73,28 @@ defmodule Membrane.Core.Element.ActionHandlerTest do end defp trivial_filter_state(_context) do + supervisor = SubprocessSupervisor.start_link!() + + input_atomic_demand = + AtomicDemand.new(%{ + receiver_effective_flow_control: :push, + receiver_process: self(), + receiver_demand_unit: :buffers, + sender_process: spawn(fn -> :ok end), + sender_pad_ref: :output, + supervisor: supervisor + }) + + output_atomic_demand = + AtomicDemand.new(%{ + receiver_effective_flow_control: :push, + receiver_process: spawn(fn -> :ok end), + receiver_demand_unit: :bytes, + sender_process: self(), + sender_pad_ref: :output, + supervisor: supervisor + }) + state = struct(State, module: TrivialFilter, @@ -82,10 +109,13 @@ defmodule Membrane.Core.Element.ActionHandlerTest do pid: self(), other_ref: :other_ref, stream_format: nil, + demand_unit: :bytes, other_demand_unit: :bytes, start_of_stream?: true, end_of_stream?: false, - flow_control: :push + flow_control: :push, + atomic_demand: output_atomic_demand, + demand_snapshot: 0 }, input: %{ direction: :input, @@ -94,8 +124,14 @@ defmodule Membrane.Core.Element.ActionHandlerTest do stream_format: nil, start_of_stream?: true, end_of_stream?: false, - flow_control: :push + flow_control: :push, + atomic_demand: input_atomic_demand, + demand_snapshot: 0 } + }, + pads_info: %{ + output: %{flow_control: :push}, + input: %{flow_control: :push} } ) @@ -130,23 +166,6 @@ defmodule Membrane.Core.Element.ActionHandlerTest do end end - test "when element is moving to playing", %{state: state} do - state = - %{state | playback: :playing} - |> PadModel.set_data!(:output, :stream_format, @mock_stream_format) - - result = - @module.handle_action( - buffer_action(:output), - :handle_playing, - %{}, - state - ) - - assert result == state - assert_received Message.new(:buffer, [@mock_buffer], for_pad: :other_ref) - end - test "when element is playing", %{state: state} do state = %{state | playback: :playing} @@ -160,7 +179,9 @@ defmodule Membrane.Core.Element.ActionHandlerTest do state ) - assert result == state + assert result.pads_data.output.demand_snapshot < 0 + assert AtomicDemand.get(result.pads_data.output.atomic_demand) < 0 + assert put_in(result, [:pads_data, :output, :demand_snapshot], 0) == state assert_received Message.new(:buffer, [@mock_buffer], for_pad: :other_ref) end @@ -478,9 +499,12 @@ defmodule Membrane.Core.Element.ActionHandlerTest do direction: :output, pid: self(), flow_control: :manual, - demand: 0 + demand_snapshot: 0 } }, + pads_info: %{ + output: %{flow_control: :manual} + }, playback: :playing ) diff --git a/test/membrane/core/element/atomic_demand_test.exs b/test/membrane/core/element/atomic_demand_test.exs new file mode 100644 index 000000000..b6490b219 --- /dev/null +++ b/test/membrane/core/element/atomic_demand_test.exs @@ -0,0 +1,154 @@ +defmodule Membrane.Core.Element.AtomicDemandTest do + use ExUnit.Case + + alias Membrane.Core.Element.AtomicDemand + alias Membrane.Core.SubprocessSupervisor + + test "if AtomicDemand is implemented as :atomics for elements put on the same node" do + atomic_demand = new_atomic_demand(:pull, self(), self()) + :ok = AtomicDemand.increase(atomic_demand, 10) + + assert get_atomic_value(atomic_demand) == 10 + + atomic_demand = AtomicDemand.decrease(atomic_demand, 15) + + assert atomic_demand.buffered_decrementation == 0 + assert get_atomic_value(atomic_demand) == -5 + assert AtomicDemand.get(atomic_demand) == -5 + end + + test "if AtomicDemand.DistributedAtomic.Worker works properly " do + atomic_demand = new_atomic_demand(:pull, self(), self()) + :ok = AtomicDemand.increase(atomic_demand, 10) + + assert GenServer.call( + atomic_demand.counter.worker, + {:get, atomic_demand.counter.atomic_ref} + ) == 10 + + assert GenServer.call( + atomic_demand.counter.worker, + {:sub_get, atomic_demand.counter.atomic_ref, 15} + ) == -5 + + assert get_atomic_value(atomic_demand) == -5 + + assert GenServer.call( + atomic_demand.counter.worker, + {:add_get, atomic_demand.counter.atomic_ref, 55} + ) == 50 + + assert get_atomic_value(atomic_demand) == 50 + assert AtomicDemand.get(atomic_demand) == 50 + end + + test "if setting receiver and sender modes works properly" do + atomic_demand = new_atomic_demand(:pull, self(), self()) + + :ok = AtomicDemand.set_receiver_status(atomic_demand, {:resolved, :push}) + + assert AtomicDemand.AtomicFlowStatus.get(atomic_demand.receiver_status) == + {:resolved, :push} + + :ok = AtomicDemand.set_receiver_status(atomic_demand, {:resolved, :pull}) + + assert AtomicDemand.AtomicFlowStatus.get(atomic_demand.receiver_status) == + {:resolved, :pull} + + :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) + + assert AtomicDemand.AtomicFlowStatus.get(atomic_demand.sender_status) == + {:resolved, :push} + + :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :pull}) + + assert AtomicDemand.AtomicFlowStatus.get(atomic_demand.sender_status) == + {:resolved, :pull} + end + + test "if toilet overflows, only and only when it should" do + hour_in_millis = 60 * 60 * 1000 + sleeping_process = spawn(fn -> Process.sleep(hour_in_millis) end) + monitor_ref = Process.monitor(sleeping_process) + + atomic_demand = new_atomic_demand(:pull, sleeping_process, self()) + + :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) + atomic_demand = AtomicDemand.decrease(atomic_demand, 100) + + refute_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} + + possible_statuses = [{:resolved, :push}, {:resolved, :pull}, :to_be_resolved] + + atomic_demand = + for status_1 <- possible_statuses, status_2 <- possible_statuses do + {status_1, status_2} + end + |> List.delete({{:resolved, :push}, {:resolved, :pull}}) + |> Enum.reduce(atomic_demand, fn {sender_status, receiver_status}, atomic_demand -> + :ok = AtomicDemand.set_sender_status(atomic_demand, sender_status) + :ok = AtomicDemand.set_receiver_status(atomic_demand, receiver_status) + atomic_demand = AtomicDemand.decrease(atomic_demand, 1000) + + refute_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} + + atomic_demand + end) + + :ok = AtomicDemand.set_sender_status(atomic_demand, {:resolved, :push}) + :ok = AtomicDemand.set_receiver_status(atomic_demand, {:resolved, :pull}) + _atomic_demand = AtomicDemand.decrease(atomic_demand, 1000) + + assert_receive {:DOWN, ^monitor_ref, :process, _pid, _reason} + end + + test "if buffering decrementation works properly with distribution" do + another_node = setup_another_node() + pid_on_another_node = Node.spawn(another_node, fn -> :ok end) + atomic_demand = new_atomic_demand(:push, self(), pid_on_another_node) + + assert %AtomicDemand{throttling_factor: 150} = atomic_demand + + atomic_demand = AtomicDemand.decrease(atomic_demand, 100) + + assert %AtomicDemand{buffered_decrementation: 100} = atomic_demand + assert get_atomic_value(atomic_demand) == 0 + + atomic_demand = AtomicDemand.decrease(atomic_demand, 49) + + assert %AtomicDemand{buffered_decrementation: 149} = atomic_demand + assert get_atomic_value(atomic_demand) == 0 + + atomic_demand = AtomicDemand.decrease(atomic_demand, 51) + + assert %AtomicDemand{buffered_decrementation: 0} = atomic_demand + assert get_atomic_value(atomic_demand) == -200 + end + + defp setup_another_node() do + _cmd_result = System.cmd("epmd", ["-daemon"]) + _start_result = Node.start(:"my_node@127.0.0.1", :longnames) + {:ok, _pid, another_node} = :peer.start(%{host: ~c"127.0.0.1", name: :another_node}) + :rpc.block_call(another_node, :code, :add_paths, [:code.get_path()]) + + on_exit(fn -> :rpc.call(another_node, :init, :stop, []) end) + + another_node + end + + defp get_atomic_value(atomic_demand) do + atomic_demand.counter.atomic_ref + |> :atomics.get(1) + end + + defp new_atomic_demand(receiver_effective_flow_control, receiver_pid, sender_pid) do + AtomicDemand.new(%{ + receiver_effective_flow_control: receiver_effective_flow_control, + receiver_process: receiver_pid, + receiver_demand_unit: :buffers, + sender_process: sender_pid, + sender_pad_ref: :output, + supervisor: SubprocessSupervisor.start_link!() + }) + end +end diff --git a/test/membrane/core/element/event_controller_test.exs b/test/membrane/core/element/event_controller_test.exs index d88396352..55f6f777b 100644 --- a/test/membrane/core/element/event_controller_test.exs +++ b/test/membrane/core/element/event_controller_test.exs @@ -1,8 +1,9 @@ defmodule Membrane.Core.Element.EventControllerTest do use ExUnit.Case - alias Membrane.Core.Element.{EventController, InputQueue, State} - alias Membrane.Core.{Events, Message} + alias Membrane.Core.Element.{AtomicDemand, EventController, InputQueue, State} + alias Membrane.Core.Events + alias Membrane.Core.SubprocessSupervisor alias Membrane.Event require Membrane.Core.Message @@ -19,14 +20,24 @@ defmodule Membrane.Core.Element.EventControllerTest do end setup do + atomic_demand = + AtomicDemand.new(%{ + receiver_effective_flow_control: :pull, + receiver_process: spawn(fn -> :ok end), + receiver_demand_unit: :buffers, + sender_process: spawn(fn -> :ok end), + sender_pad_ref: :output, + supervisor: SubprocessSupervisor.start_link!(), + toilet_capacity: 300 + }) + input_queue = InputQueue.init(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, - demand_pid: self(), - demand_pad: :some_pad, + linked_output_ref: :some_pad, log_tag: "test", - toilet?: false, + atomic_demand: atomic_demand, target_size: nil, min_demand_factor: nil }) @@ -54,7 +65,8 @@ defmodule Membrane.Core.Element.EventControllerTest do } ) - assert_received Message.new(:demand, _size, for_pad: :some_pad) + assert AtomicDemand.get(atomic_demand) > 0 + [state: state] end diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index e4c2f7b14..269917fab 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -2,8 +2,9 @@ defmodule Membrane.Core.Element.InputQueueTest do use ExUnit.Case, async: true alias Membrane.Buffer - alias Membrane.Core.Element.InputQueue + alias Membrane.Core.Element.{AtomicDemand, InputQueue} alias Membrane.Core.Message + alias Membrane.Core.SubprocessSupervisor alias Membrane.Testing.Event require Message @@ -14,13 +15,11 @@ defmodule Membrane.Core.Element.InputQueueTest do %{ log_tag: "test", target_queue_size: 100, - min_demand_factor: 0.1, inbound_demand_unit: :bytes, outbound_demand_unit: :bytes, - demand_pid: self(), linked_output_ref: :output_pad_ref, - expected_metric: Buffer.Metric.from_unit(:bytes), - expected_min_demand: 10 + atomic_demand: new_atomic_demand(), + expected_metric: Buffer.Metric.from_unit(:bytes) }} end @@ -28,53 +27,26 @@ defmodule Membrane.Core.Element.InputQueueTest do assert InputQueue.init(%{ inbound_demand_unit: context.inbound_demand_unit, outbound_demand_unit: context.outbound_demand_unit, - demand_pid: context.demand_pid, - demand_pad: context.linked_output_ref, + linked_output_ref: context.linked_output_ref, log_tag: context.log_tag, - toilet?: false, - target_size: context.target_queue_size, - min_demand_factor: context.min_demand_factor + atomic_demand: context.atomic_demand, + target_size: context.target_queue_size }) == %InputQueue{ q: Qex.new(), log_tag: context.log_tag, target_size: context.target_queue_size, - size: 0, - demand: 0, - min_demand: context.expected_min_demand, + atomic_demand: context.atomic_demand, inbound_metric: context.expected_metric, outbound_metric: context.expected_metric, - toilet?: false - } - - message = - Message.new(:demand, context.target_queue_size, for_pad: context.linked_output_ref) - - assert_received ^message - end - - test "not send the demand if toilet is enabled", context do - assert InputQueue.init(%{ - inbound_demand_unit: context.inbound_demand_unit, - outbound_demand_unit: context.outbound_demand_unit, - demand_pid: context.demand_pid, - demand_pad: context.linked_output_ref, - log_tag: context.log_tag, - toilet?: true, - target_size: context.target_queue_size, - min_demand_factor: context.min_demand_factor - }) == %InputQueue{ - q: Qex.new(), - log_tag: context.log_tag, - target_size: context.target_queue_size, + linked_output_ref: context.linked_output_ref, size: 0, - demand: context.target_queue_size, - min_demand: context.expected_min_demand, - inbound_metric: context.expected_metric, - outbound_metric: context.expected_metric, - toilet?: true + demand: context.target_queue_size } - refute_received Message.new(:demand, _) + assert context.target_queue_size == AtomicDemand.get(context.atomic_demand) + + expected_message = Message.new(:atomic_demand_increased, context.linked_output_ref) + assert_received ^expected_message end end @@ -112,35 +84,51 @@ defmodule Membrane.Core.Element.InputQueueTest do describe ".store/3 should" do setup do - {:ok, %{size: 10, q: Qex.new() |> Qex.push({:buffers, [], 3, 3}), payload: <<1, 2, 3>>}} + {:ok, + %{ + demand: 30, + size: 10, + q: Qex.new() |> Qex.push({:buffers, [], 3, 3}), + payload: <<1, 2, 3>> + }} end - test "increment `size` when `:metric` is `Count`", context do + test "updated properly `size` and `demand` when `:metric` is `Buffer.Metric.Count`", + context do input_queue = struct(InputQueue, size: context.size, inbound_metric: Buffer.Metric.Count, outbound_metric: Buffer.Metric.Count, - q: context.q + q: context.q, + demand: context.demand ) v = [%Buffer{payload: context.payload}] - %{size: new_size} = InputQueue.store(input_queue, :buffers, v) - assert new_size == context.size + 1 + updated_input_queue = InputQueue.store(input_queue, :buffers, v) + + assert updated_input_queue.size == context.size + 1 + assert updated_input_queue.demand == context.demand - 1 end - test "add payload size to `size` when `:metric` is `ByteSize`", context do + test "updated properly `size` and `demand` when `:metric` is `Buffer.Metric.ByteSize`", + context do input_queue = struct(InputQueue, size: context.size, inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, - q: context.q + q: context.q, + demand: context.demand ) v = [%Buffer{payload: context.payload}] - %{size: new_size} = InputQueue.store(input_queue, :buffers, v) - assert new_size == context.size + byte_size(context.payload) + updated_input_queue = InputQueue.store(input_queue, :buffers, v) + + assert updated_input_queue.size == context.size + byte_size(context.payload) + + assert updated_input_queue.demand == + context.demand - byte_size(context.payload) end test "append buffer to the queue", context do @@ -149,7 +137,8 @@ defmodule Membrane.Core.Element.InputQueueTest do size: context.size, inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, - q: context.q + q: context.q, + demand: context.demand ) v = [%Buffer{payload: context.payload}] @@ -165,7 +154,8 @@ defmodule Membrane.Core.Element.InputQueueTest do size: context.size, inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, - q: context.q + q: context.q, + demand: context.demand ) v = %Event{} @@ -181,7 +171,8 @@ defmodule Membrane.Core.Element.InputQueueTest do size: context.size, inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, - q: context.q + q: context.q, + demand: context.demand ) v = %Event{} @@ -190,62 +181,66 @@ defmodule Membrane.Core.Element.InputQueueTest do end end - describe ".take_and_demand/4 should" do + describe ".take/2 should" do setup do input_queue = InputQueue.init(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, - demand_pid: self(), - demand_pad: :pad, + linked_output_ref: :output_pad_ref, log_tag: "test", - toilet?: false, - target_size: nil, - min_demand_factor: nil + atomic_demand: new_atomic_demand(), + target_size: 40 }) - assert_receive {Membrane.Core.Message, :demand, 40, [for_pad: :pad]} + assert_received Message.new(:atomic_demand_increased, :output_pad_ref) [input_queue: input_queue] end test "return {:empty, []} when the queue is empty", %{input_queue: input_queue} do - assert {{:empty, []}, %InputQueue{size: 0, demand: 0}} = - InputQueue.take_and_demand(input_queue, 1, self(), :input) + old_atomic_demand_value = AtomicDemand.get(input_queue.atomic_demand) + old_demand = input_queue.demand + + assert {{:empty, []}, %InputQueue{size: 0, demand: ^old_demand}} = + InputQueue.take(input_queue, 1) - refute_receive {Membrane.Core.Message, :demand, 10, [for_pad: :pad]} + assert old_atomic_demand_value == AtomicDemand.get(input_queue.atomic_demand) end test "send demands to the pid and updates demand", %{input_queue: input_queue} do assert {{:value, [{:buffers, [1], 1, 1}]}, new_input_queue} = input_queue |> InputQueue.store(bufs(10)) - |> InputQueue.take_and_demand(1, self(), :pad) - - assert_receive {Membrane.Core.Message, :demand, 10, [for_pad: :pad]} + |> InputQueue.take(1) assert new_input_queue.size == 9 - assert new_input_queue.demand == -9 + assert new_input_queue.demand >= 31 + assert AtomicDemand.get(new_input_queue.atomic_demand) >= 41 end end - describe ".take_and_demand/4 should also" do + describe ".take/2 should also" do setup do size = 6 buffers1 = {:buffers, [:b1, :b2, :b3], 3, 3} buffers2 = {:buffers, [:b4, :b5, :b6], 3, 3} q = Qex.new() |> Qex.push(buffers1) |> Qex.push(buffers2) + atomic_demand = new_atomic_demand() + + :ok = AtomicDemand.increase(atomic_demand, 94) + assert_received Message.new(:atomic_demand_increased, :output_pad_ref) input_queue = struct(InputQueue, size: size, - demand: 0, - min_demand: 0, - target_queue_size: 100, - toilet?: false, + demand: 94, + target_size: 100, inbound_metric: Buffer.Metric.Count, outbound_metric: Buffer.Metric.Count, - q: q + q: q, + linked_output_ref: :output_pad_ref, + atomic_demand: atomic_demand ) {:ok, %{input_queue: input_queue, q: q, size: size, buffers1: buffers1, buffers2: buffers2}} @@ -253,71 +248,41 @@ defmodule Membrane.Core.Element.InputQueueTest do test "return tuple {:ok, {:empty, buffers}} when there are not enough buffers", context do - {result, _new_input_queue} = - InputQueue.take_and_demand( - context.input_queue, - 10, - self(), - :linked_output_ref - ) - + {result, _new_input_queue} = InputQueue.take(context.input_queue, 10) assert result == {:empty, [context.buffers1, context.buffers2]} end test "set `size` to 0 when there are not enough buffers", context do - {_, %{size: new_size}} = - InputQueue.take_and_demand( - context.input_queue, - 10, - self(), - :linked_output_ref - ) + {_, %{size: new_size}} = InputQueue.take(context.input_queue, 10) assert new_size == 0 end - test "generate demand hen there are not enough buffers", context do - InputQueue.take_and_demand( - context.input_queue, - 10, - self(), - :linked_output_ref - ) - - expected_size = context.size - pad_ref = :linked_output_ref - message = Message.new(:demand, expected_size, for_pad: pad_ref) - assert_received ^message + test "increase AtomicDemand hen there are not enough buffers", context do + old_atomic_demand_value = AtomicDemand.get(context.input_queue.atomic_demand) + old_demand = context.input_queue.demand + + {_output, input_queue} = InputQueue.take(context.input_queue, 10) + + assert old_atomic_demand_value < AtomicDemand.get(input_queue.atomic_demand) + assert old_demand < input_queue.demand end - test "return `to_take` buffers from the queue when there are enough buffers and buffers dont have to be split", + test "return `to_take` buffers from the queue when there are enough buffers and buffers don't have to be split", context do - {result, %{q: new_q}} = - InputQueue.take_and_demand( - context.input_queue(), - 3, - self(), - :linked_output_ref - ) + {result, %{q: new_q}} = InputQueue.take(context.input_queue, 3) - assert result == {:value, [context.buffers1()]} + assert result == {:value, [context.buffers1]} list = new_q |> Enum.into([]) - exp_list = Qex.new() |> Qex.push(context.buffers2()) |> Enum.into([]) + exp_list = Qex.new() |> Qex.push(context.buffers2) |> Enum.into([]) assert list == exp_list - assert_received Message.new(:demand, _, _) end test "return `to_take` buffers from the queue when there are enough buffers and buffers have to be split", context do - {result, %{q: new_q}} = - InputQueue.take_and_demand( - context.input_queue, - 4, - self(), - :linked_output_ref - ) + {result, %{q: new_q}} = InputQueue.take(context.input_queue, 4) exp_buf2 = {:buffers, [:b4], 1, 1} exp_rest = {:buffers, [:b5, :b6], 2, 2} @@ -327,79 +292,92 @@ defmodule Membrane.Core.Element.InputQueueTest do exp_list = Qex.new() |> Qex.push(exp_rest) |> Enum.into([]) assert list == exp_list - assert_received Message.new(:demand, _, _) end end test "if the queue works properly for :bytes input metric and :buffers output metric" do + atomic_demand = new_atomic_demand() + queue = InputQueue.init(%{ inbound_demand_unit: :bytes, outbound_demand_unit: :buffers, - demand_pid: self(), - demand_pad: :input, + atomic_demand: atomic_demand, + linked_output_ref: :output_pad_ref, log_tag: nil, - toilet?: false, - target_size: 10, - min_demand_factor: 1 + target_size: 10 }) - assert_receive {Membrane.Core.Message, :demand, 10, [for_pad: :input]} - assert queue.demand == 0 + assert_receive Message.new(:atomic_demand_increased, :output_pad_ref) + assert queue.demand == 10 queue = InputQueue.store(queue, [%Buffer{payload: "1234"}]) assert queue.size == 4 queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) + queue = Map.update!(queue, :atomic_demand, &AtomicDemand.decrease(&1, 16)) assert queue.size == 16 - assert queue.demand == 0 - {out, queue} = InputQueue.take_and_demand(queue, 2, self(), :input) + assert queue.demand == -6 + {out, queue} = InputQueue.take(queue, 2) assert bufs_size(out, :buffers) == 2 assert queue.size == 4 - assert queue.demand == 0 - assert_receive {Membrane.Core.Message, :demand, 12, [for_pad: :input]} + assert queue.demand >= 6 + assert_receive Message.new(:atomic_demand_increased, :output_pad_ref) + queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = InputQueue.store(queue, [%Buffer{payload: "1234"}]) - {out, queue} = InputQueue.take_and_demand(queue, 1, self(), :input) + {out, queue} = InputQueue.take(queue, 1) assert bufs_size(out, :buffers) == 1 assert queue.size == 8 - assert queue.demand == -8 + assert queue.demand >= 2 end test "if the queue works properly for :buffers input metric and :bytes output metric" do + atomic_demand = new_atomic_demand() + queue = InputQueue.init(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :bytes, - demand_pid: self(), - demand_pad: :input, + atomic_demand: atomic_demand, + linked_output_ref: :output_pad_ref, log_tag: nil, - toilet?: false, - target_size: 3, - min_demand_factor: 1 + target_size: 3 }) - assert_receive {Membrane.Core.Message, :demand, 3, [for_pad: :input]} - assert queue.demand == 0 + assert_receive Message.new(:atomic_demand_increased, :output_pad_ref) + assert queue.demand == 3 queue = InputQueue.store(queue, [%Buffer{payload: "1234"}]) assert queue.size == 1 queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) + queue = Map.update!(queue, :atomic_demand, &AtomicDemand.decrease(&1, 4)) assert queue.size == 4 - assert queue.demand == 0 - {out, queue} = InputQueue.take_and_demand(queue, 2, self(), :input) + assert queue.demand == -1 + {out, queue} = InputQueue.take(queue, 2) assert bufs_size(out, :bytes) == 2 assert queue.size == 4 - assert queue.demand == 0 - refute_receive {Membrane.Core.Message, :demand, _size, [for_pad: :input]} - {out, queue} = InputQueue.take_and_demand(queue, 11, self(), :input) + assert queue.demand == -1 + refute_receive Message.new(:atomic_demand_increased, :output_pad_ref) + {out, queue} = InputQueue.take(queue, 11) assert bufs_size(out, :bytes) == 11 assert queue.size == 2 - assert queue.demand == -1 - assert_receive {Membrane.Core.Message, :demand, 3, [for_pad: :input]} + assert queue.demand == 1 + assert_receive Message.new(:atomic_demand_increased, :output_pad_ref) end + defp new_atomic_demand(), + do: + AtomicDemand.new(%{ + receiver_effective_flow_control: :pull, + receiver_process: self(), + receiver_demand_unit: :bytes, + sender_process: self(), + sender_pad_ref: :output_pad_ref, + supervisor: SubprocessSupervisor.start_link!() + }) + defp bufs_size(output, unit) do {_state, bufs} = output diff --git a/test/membrane/core/element/lifecycle_controller_test.exs b/test/membrane/core/element/lifecycle_controller_test.exs index 3a92d29c0..6b5688d65 100644 --- a/test/membrane/core/element/lifecycle_controller_test.exs +++ b/test/membrane/core/element/lifecycle_controller_test.exs @@ -1,8 +1,12 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do use ExUnit.Case - alias Membrane.Core.Element.{InputQueue, LifecycleController, State} - alias Membrane.Core.Message + alias Membrane.Core.Element.{AtomicDemand, InputQueue, LifecycleController, State} + + alias Membrane.Core.{ + Message, + SubprocessSupervisor + } require Membrane.Core.Message @@ -17,16 +21,24 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do end setup do + atomic_demand = + AtomicDemand.new(%{ + receiver_effective_flow_control: :pull, + receiver_process: self(), + receiver_demand_unit: :buffers, + sender_process: self(), + sender_pad_ref: :some_pad, + supervisor: SubprocessSupervisor.start_link!() + }) + input_queue = InputQueue.init(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, - demand_pid: self(), - demand_pad: :some_pad, + atomic_demand: atomic_demand, + linked_output_ref: :some_pad, log_tag: "test", - toilet?: false, - target_size: nil, - min_demand_factor: nil + target_size: nil }) state = @@ -52,7 +64,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do } ) - assert_received Message.new(:demand, _size, for_pad: :some_pad) + assert_received Message.new(:atomic_demand_increased, :some_pad) [state: state] end diff --git a/test/membrane/core/element/pad_controller_test.exs b/test/membrane/core/element/pad_controller_test.exs index 4df843206..4a5b32c2a 100644 --- a/test/membrane/core/element/pad_controller_test.exs +++ b/test/membrane/core/element/pad_controller_test.exs @@ -4,6 +4,7 @@ defmodule Membrane.Core.Element.PadControllerTest do alias Membrane.Core.Child.{PadModel, PadSpecHandler} alias Membrane.Core.Element.State alias Membrane.Core.Message + alias Membrane.Core.SubprocessSupervisor alias Membrane.LinkError alias Membrane.Pad alias Membrane.Support.Element.{DynamicFilter, TrivialFilter} @@ -19,7 +20,8 @@ defmodule Membrane.Core.Element.PadControllerTest do module: elem_module, parent_pid: self(), internal_state: %{}, - synchronization: %{clock: nil, parent_clock: nil} + synchronization: %{clock: nil, parent_clock: nil}, + subprocess_supervisor: SubprocessSupervisor.start_link!() ) |> PadSpecHandler.init_pads() end @@ -30,25 +32,30 @@ defmodule Membrane.Core.Element.PadControllerTest do assert {{:ok, _pad_info}, new_state} = @module.handle_link( - :output, - %{pad_ref: :output, pid: self(), pad_props: %{options: []}, child: :a}, + :input, %{ - pad_ref: :other_input, - pid: nil, + pad_ref: :input, + pid: self(), + pad_props: %{min_demand_factor: 0.25, target_queue_size: 40, options: []}, + child: :a + }, + %{ + pad_ref: :other_output, + pid: spawn(fn -> :ok end), child: :b, pad_props: %{options: [], toilet_capacity: nil, throttling_factor: nil} }, %{ - initiator: :sibling, - other_info: %{direction: :input, flow_control: :manual, demand_unit: :buffers}, + other_info: %{direction: :output, flow_control: :manual, demand_unit: :buffers}, link_metadata: %{toilet: make_ref(), observability_metadata: %{}}, - stream_format_validation_params: [] + stream_format_validation_params: [], + other_effective_flow_control: :pull }, state ) assert %{new_state | pads_data: nil} == %{state | pads_data: nil} - assert PadModel.assert_instance(new_state, :output) == :ok + assert PadModel.assert_instance(new_state, :input) == :ok end test "when pad is does not exist in the element" do @@ -59,7 +66,7 @@ defmodule Membrane.Core.Element.PadControllerTest do :output, %{pad_ref: :invalid_pad_ref, child: :a}, %{pad_ref: :x, child: :b}, - %{link_initiator: :parent, stream_format_validation_params: []}, + %{stream_format_validation_params: []}, state ) end diff --git a/test/membrane/core/element/pad_model_test.exs b/test/membrane/core/element/pad_model_test.exs index 6c63df615..cf747500f 100644 --- a/test/membrane/core/element/pad_model_test.exs +++ b/test/membrane/core/element/pad_model_test.exs @@ -9,7 +9,7 @@ defmodule Membrane.Core.Child.PadModelTest do defp setup_element_state(_ctx) do state = %Membrane.Core.Element.State{ - pads_data: %{:input => struct(Membrane.Element.PadData, demand: 1)}, + pads_data: %{:input => struct(Membrane.Element.PadData, demand_snapshot: 1)}, pads_info: %{} } @@ -32,11 +32,11 @@ defmodule Membrane.Core.Child.PadModelTest do setup :setup_element_state test "is {:ok, value} when the pad is present", ctx do - assert {:ok, 1} = PadModel.get_data(ctx.state, :input, :demand) + assert {:ok, 1} = PadModel.get_data(ctx.state, :input, :demand_snapshot) end test "is :unknown_pad when the pad is not present", ctx do - assert PadModel.get_data(ctx.state, :output, :demand) == + assert PadModel.get_data(ctx.state, :output, :demand_snapshot) == {:error, :unknown_pad} end end @@ -45,12 +45,12 @@ defmodule Membrane.Core.Child.PadModelTest do setup :setup_element_state test "is value when the pad is present", ctx do - assert 1 = PadModel.get_data!(ctx.state, :input, :demand) + assert 1 = PadModel.get_data!(ctx.state, :input, :demand_snapshot) end test "is :unknown_pad when the pad is not present", ctx do assert_raise @unknown_pad_error_module, fn -> - PadModel.get_data!(ctx.state, :output, :demand) + PadModel.get_data!(ctx.state, :output, :demand_snapshot) end end end @@ -90,12 +90,12 @@ defmodule Membrane.Core.Child.PadModelTest do setup :setup_element_state test "updates the pad data with the given function when present", ctx do - assert PadModel.update_data(ctx.state, :input, :demand, &{:ok, &1 + 5}) == - {:ok, put_in(ctx.state, [:pads_data, :input, :demand], 6)} + assert PadModel.update_data(ctx.state, :input, :demand_snapshot, &{:ok, &1 + 5}) == + {:ok, put_in(ctx.state, [:pads_data, :input, :demand_snapshot], 6)} end test "is :unknown_pad and original state when the pad is not present", ctx do - assert PadModel.update_data(ctx.state, :output, :demand, &{:ok, &1 + 1}) == + assert PadModel.update_data(ctx.state, :output, :demand_snapshot, &{:ok, &1 + 1}) == {{:error, :unknown_pad}, ctx.state} end end @@ -104,13 +104,13 @@ defmodule Membrane.Core.Child.PadModelTest do setup :setup_element_state test "updates the pad data with the given function when present", ctx do - assert PadModel.update_data!(ctx.state, :input, :demand, &(&1 + 5)) == - put_in(ctx.state, [:pads_data, :input, :demand], 6) + assert PadModel.update_data!(ctx.state, :input, :demand_snapshot, &(&1 + 5)) == + put_in(ctx.state, [:pads_data, :input, :demand_snapshot], 6) end test "raises when the pad is not present", ctx do assert_raise @unknown_pad_error_module, fn -> - PadModel.update_data!(ctx.state, :other_input, :demand, &(&1 + 5)) + PadModel.update_data!(ctx.state, :other_input, :demand_snapshot, &(&1 + 5)) end end end diff --git a/test/membrane/core/element/stream_format_controller_test.exs b/test/membrane/core/element/stream_format_controller_test.exs index 45e605e64..3b159e4ae 100644 --- a/test/membrane/core/element/stream_format_controller_test.exs +++ b/test/membrane/core/element/stream_format_controller_test.exs @@ -3,7 +3,8 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do alias Membrane.Buffer alias Membrane.Core.Message - alias Membrane.Core.Element.{InputQueue, State} + alias Membrane.Core.Element.{AtomicDemand, InputQueue, State} + alias Membrane.Core.SubprocessSupervisor alias Membrane.StreamFormat.Mock, as: MockStreamFormat alias Membrane.Support.DemandsTest.Filter @@ -13,16 +14,24 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do @module Membrane.Core.Element.StreamFormatController setup do + atomic_demand = + AtomicDemand.new(%{ + receiver_effective_flow_control: :pull, + receiver_process: self(), + receiver_demand_unit: :buffers, + sender_process: self(), + sender_pad_ref: :some_pad, + supervisor: SubprocessSupervisor.start_link!() + }) + input_queue = InputQueue.init(%{ inbound_demand_unit: :buffers, outbound_demand_unit: :buffers, - demand_pid: self(), - demand_pad: :some_pad, + atomic_demand: atomic_demand, + linked_output_ref: :some_pad, log_tag: "test", - toilet?: false, - target_size: nil, - min_demand_factor: nil + target_size: nil }) state = @@ -46,7 +55,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do } ) - assert_received Message.new(:demand, _size, for_pad: :some_pad) + assert_received Message.new(:atomic_demand_increased, :some_pad) [state: state] end diff --git a/test/membrane/core/element/toilet_test.exs b/test/membrane/core/element/toilet_test.exs deleted file mode 100644 index 6aa557525..000000000 --- a/test/membrane/core/element/toilet_test.exs +++ /dev/null @@ -1,49 +0,0 @@ -defmodule Membrane.Core.Element.ToiletTest do - use ExUnit.Case - alias Membrane.Core.Element.Toilet - - setup do - [responsible_process: spawn(fn -> nil end)] - end - - test "if toilet is implemented as :atomics for elements put on the same node", context do - toilet = Toilet.new(100, :buffers, context.responsible_process, 1) - - {_module, {_pid, atomic_ref}, _capacity, _responsible_process_pid, _throttling_factor, - _unrinsed_buffers} = toilet - - Toilet.fill(toilet, 10) - assert :atomics.get(atomic_ref, 1) == 10 - Toilet.drain(toilet, 10) - assert :atomics.get(atomic_ref, 1) == 0 - end - - test "if the receiving element uses toilet with :atomics and the sending element with a interprocess message, when the toilet is distributed", - context do - toilet = Toilet.new(100, :buffers, context.responsible_process, 1) - - {_module, {counter_pid, atomic_ref}, _capacity, _responsible_process_pid, _throttling_factor, - _unrinsed_buffers} = toilet - - Toilet.fill(toilet, 10) - assert GenServer.call(counter_pid, {:add_get, atomic_ref, 0}) == 10 - assert :atomics.get(atomic_ref, 1) == 10 - Toilet.drain(toilet, 10) - assert GenServer.call(counter_pid, {:add_get, atomic_ref, 0}) == 0 - assert :atomics.get(atomic_ref, 1) == 0 - end - - test "if throttling mechanism works properly", context do - toilet = Toilet.new(100, :buffers, context.responsible_process, 10) - {:ok, toilet} = Toilet.fill(toilet, 10) - assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet - {:ok, toilet} = Toilet.fill(toilet, 5) - assert {_module, _counter, _capacity, _pid, _throttling_factor, 5} = toilet - {:ok, toilet} = Toilet.fill(toilet, 80) - assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet - {:ok, toilet} = Toilet.fill(toilet, 9) - assert {_module, _counter, _capacity, _pid, _throttling_factor, 9} = toilet - {:overflow, toilet} = Toilet.fill(toilet, 11) - assert {_module, _counter, _capacity, _pid, _throttling_factor, 0} = toilet - end -end diff --git a/test/membrane/core/element_test.exs b/test/membrane/core/element_test.exs index 2296afba8..bc9acd169 100644 --- a/test/membrane/core/element_test.exs +++ b/test/membrane/core/element_test.exs @@ -2,8 +2,13 @@ defmodule Membrane.Core.ElementTest do use ExUnit.Case, async: true alias __MODULE__.SomeElement - alias Membrane.Core.Element - alias Membrane.Core.Message + + alias Membrane.Core.{ + Element, + Message, + SubprocessSupervisor + } + alias Membrane.Core.Parent.Link.Endpoint require Membrane.Core.Message @@ -65,24 +70,84 @@ defmodule Membrane.Core.ElementTest do state end + defmodule HelperServer do + use GenServer + + @spec start!() :: pid() + def start!() do + {:ok, pid} = GenServer.start(__MODULE__, self()) + pid + end + + @impl true + def init(owner) do + ref = Process.monitor(owner) + {:ok, %{monitor_ref: ref, owner: owner}} + end + + @impl true + def handle_cast({:set_reply, reply}, state), do: {:noreply, Map.put(state, :reply, reply)} + + @impl true + def handle_call(_msg, _from, state), do: {:reply, state.reply, state} + + @impl true + def handle_info({:DOWN, ref, _process, _pid, _reason}, %{monitor_ref: ref} = state) do + {:stop, :normal, state} + end + + @impl true + def handle_info(msg, %{owner: owner} = state) do + send(owner, msg) + {:noreply, state} + end + end + defp linked_state do - {:reply, {:ok, _reply}, state} = + helper_server = HelperServer.start!() + + output_other_endpoint = %Endpoint{ + pad_spec: :dynamic_input, + pad_ref: :dynamic_input, + pid: helper_server, + child: :other, + pad_props: %{options: [], toilet_capacity: nil, throttling_factor: nil} + } + + other_info = %{direction: :input, flow_control: :manual, demand_unit: :buffers} + + output_atomic_demand = + Element.AtomicDemand.new(%{ + receiver_effective_flow_control: :pull, + receiver_process: helper_server, + receiver_demand_unit: :buffers, + sender_process: self(), + sender_pad_ref: :output, + supervisor: SubprocessSupervisor.start_link!() + }) + + reply_link_metadata = %{ + atomic_demand: output_atomic_demand, + observability_metadata: %{}, + input_demand_unit: :buffers, + output_demand_unit: :buffers + } + + handle_link_reply = {:ok, {output_other_endpoint, other_info, reply_link_metadata}} + + GenServer.cast(helper_server, {:set_reply, handle_link_reply}) + + {:reply, :ok, state} = Element.handle_call( Message.new(:handle_link, [ :output, %Endpoint{pad_spec: :output, pad_ref: :output, pad_props: %{options: []}, child: :this}, - %Endpoint{ - pad_spec: :dynamic_input, - pad_ref: :dynamic_input, - pid: self(), - child: :other, - pad_props: %{options: [], toilet_capacity: nil, throttling_factor: nil} - }, + output_other_endpoint, %{ - initiator: :sibling, - other_info: %{direction: :input, flow_control: :manual, demand_unit: :buffers}, - link_metadata: %{toilet: nil, observability_metadata: %{}}, - stream_format_validation_params: [] + other_info: other_info, + link_metadata: %{atomic_demand: output_atomic_demand, observability_metadata: %{}}, + stream_format_validation_params: [], + other_effective_flow_control: :pull } ]), nil, @@ -109,10 +174,10 @@ defmodule Membrane.Core.ElementTest do }, %Endpoint{pad_spec: :output, pad_ref: :output, pid: self(), child: :other}, %{ - initiator: :sibling, other_info: %{direction: :output, flow_control: :manual}, link_metadata: %{toilet: nil, observability_metadata: %{}}, - stream_format_validation_params: [] + stream_format_validation_params: [], + other_effective_flow_control: :pull } ]), nil, @@ -143,9 +208,10 @@ defmodule Membrane.Core.ElementTest do test "should store demand/buffer/event/stream format when not playing" do initial_state = linked_state() + :ok = increase_output_atomic_demand(initial_state, 10) [ - Message.new(:demand, 10, for_pad: :output), + Message.new(:atomic_demand_increased, :output), Message.new(:buffer, %Membrane.Buffer{payload: <<>>}, for_pad: :dynamic_input), Message.new(:stream_format, %StreamFormat{}, for_pad: :dynamic_input), Message.new(:event, %Membrane.Testing.Event{}, for_pad: :dynamic_input), @@ -160,9 +226,13 @@ defmodule Membrane.Core.ElementTest do end test "should update demand" do - msg = Message.new(:demand, 10, for_pad: :output) - assert {:noreply, state} = Element.handle_info(msg, playing_state()) - assert state.pads_data.output.demand == 10 + state = playing_state() + :ok = increase_output_atomic_demand(state, 10) + + msg = Message.new(:atomic_demand_increased, :output) + assert {:noreply, state} = Element.handle_info(msg, state) + + assert state.pads_data.output.demand_snapshot == 10 end test "should store incoming buffers in dynamic_input buffer" do @@ -201,50 +271,52 @@ defmodule Membrane.Core.ElementTest do assert {:reply, {:ok, reply}, state} = Element.handle_call( Message.new(:handle_link, [ - :output, + :input, %{ - pad_ref: :output, - pad_props: %{options: [], toilet_capacity: nil}, + pad_ref: :dynamic_input, + pad_props: %{options: [], toilet_capacity: nil, target_queue_size: 40}, child: :this }, %{ - pad_ref: :dynamic_input, + pad_ref: :output, pid: pid, child: :other, pad_props: %{options: [], toilet_capacity: nil, throttling_factor: nil} }, %{ - initiator: :sibling, other_info: %{ direction: :input, demand_unit: :buffers, flow_control: :manual }, link_metadata: %{observability_metadata: %{}}, - stream_format_validation_params: [] + stream_format_validation_params: [], + other_effective_flow_control: :pull } ]), nil, get_state() ) - assert {%{child: :this, pad_props: %{options: []}, pad_ref: :output}, + assert {%{child: :this, pad_props: %{options: []}, pad_ref: :dynamic_input}, %{ - availability: :always, + availability: :on_request, flow_control: :manual, - direction: :output, - name: :output, + direction: :input, + name: :dynamic_input, options: nil }, - %{toilet: toilet, output_demand_unit: :buffers, input_demand_unit: :buffers}} = reply - - assert toilet != nil + %{ + atomic_demand: %Element.AtomicDemand{}, + output_demand_unit: :buffers, + input_demand_unit: :buffers + }} = reply assert %Membrane.Element.PadData{ pid: ^pid, - other_ref: :dynamic_input, + other_ref: :output, other_demand_unit: :buffers - } = state.pads_data.output + } = state.pads_data.dynamic_input end test "should handle unlinking pads" do @@ -347,4 +419,10 @@ defmodule Membrane.Core.ElementTest do group: nil } end + + defp increase_output_atomic_demand(state, value) do + :ok = + state.pads_data.output.atomic_demand + |> Element.AtomicDemand.increase(value) + end end diff --git a/test/membrane/element_test.exs b/test/membrane/element_test.exs index c2e5a40c0..abd755816 100644 --- a/test/membrane/element_test.exs +++ b/test/membrane/element_test.exs @@ -75,7 +75,7 @@ defmodule Membrane.ElementTest do [pipeline: pipeline] end - test "play", %{} do + test "play" do TestFilter.assert_callback_called(:handle_playing) end @@ -84,7 +84,7 @@ defmodule Membrane.ElementTest do TestFilter.assert_callback_called(:handle_start_of_stream) end - test "does not trigger calling callback handle_event/3", %{} do + test "does not trigger calling callback handle_event/3" do TestFilter.refute_callback_called(:handle_event) end @@ -94,11 +94,11 @@ defmodule Membrane.ElementTest do end describe "End of stream" do - test "causes handle_end_of_stream/3 to be called", %{} do + test "causes handle_end_of_stream/3 to be called" do TestFilter.assert_callback_called(:handle_end_of_stream) end - test "does not trigger calling callback handle_event/3", %{} do + test "does not trigger calling callback handle_event/3" do TestFilter.refute_callback_called(:handle_event) end diff --git a/test/membrane/filter_aggregator/unit_test.exs b/test/membrane/filter_aggregator/unit_test.exs index 32a39488f..e5b42d48b 100644 --- a/test/membrane/filter_aggregator/unit_test.exs +++ b/test/membrane/filter_aggregator/unit_test.exs @@ -45,7 +45,7 @@ defmodule Membrane.FilterAggregator.UnitTest do pad_description_template |> Map.merge(%{ stream_format: nil, - demand: 0, + demand_snapshot: 0, ref: nil, other_ref: nil, other_demand_unit: :buffers, @@ -160,7 +160,7 @@ defmodule Membrane.FilterAggregator.UnitTest do |> Enum.flat_map(& &1.pads) |> Enum.each(fn {pad, pad_data} -> assert pad_data.availability == :always - assert pad_data.demand == nil + assert pad_data.demand_snapshot == nil assert pad_data.direction == pad assert pad_data.start_of_stream? == false assert pad_data.end_of_stream? == false diff --git a/test/membrane/integration/auto_demands_test.exs b/test/membrane/integration/auto_demands_test.exs index fd6914239..a46bbf6f9 100644 --- a/test/membrane/integration/auto_demands_test.exs +++ b/test/membrane/integration/auto_demands_test.exs @@ -1,6 +1,7 @@ defmodule Membrane.Integration.AutoDemandsTest do use ExUnit.Case, async: true + import Membrane.ChildrenSpec import Membrane.Testing.Assertions alias Membrane.Testing.{Pipeline, Sink, Source} @@ -51,8 +52,6 @@ defmodule Membrane.Integration.AutoDemandsTest do ] |> Enum.map(fn opts -> test "buffers pass through auto-demand filters; setup: #{inspect(opts)}" do - import Membrane.ChildrenSpec - %{payloads: payloads, factor: factor, direction: direction, filters: filters} = unquote(Macro.escape(opts)) @@ -110,8 +109,6 @@ defmodule Membrane.Integration.AutoDemandsTest do end test "handle removed branch" do - import Membrane.ChildrenSpec - pipeline = Pipeline.start_link_supervised!( spec: [ @@ -158,8 +155,6 @@ defmodule Membrane.Integration.AutoDemandsTest do ] |> Enum.map(fn opts -> test "buffers pass to auto-demand #{opts.name}" do - import Membrane.ChildrenSpec - %{name: name, module: module} = unquote(Macro.escape(opts)) payloads = Enum.map(1..1000, &inspect/1) @@ -201,8 +196,6 @@ defmodule Membrane.Integration.AutoDemandsTest do end test "toilet" do - import Membrane.ChildrenSpec - pipeline = Pipeline.start_link_supervised!( spec: @@ -229,8 +222,6 @@ defmodule Membrane.Integration.AutoDemandsTest do end test "toilet overflow" do - import Membrane.ChildrenSpec - pipeline = Pipeline.start_supervised!( spec: diff --git a/test/membrane/integration/child_pad_removed_test.exs b/test/membrane/integration/child_pad_removed_test.exs index 5eb2b629f..5b29d833b 100644 --- a/test/membrane/integration/child_pad_removed_test.exs +++ b/test/membrane/integration/child_pad_removed_test.exs @@ -181,7 +181,6 @@ defmodule Membrane.Integration.ChildPadRemovedTest do end end - @tag :target test "and sibling linked via static pad is removed, pipeline is not raising" do for bin_actions <- [ [remove_children: :source], diff --git a/test/membrane/integration/demands_test.exs b/test/membrane/integration/demands_test.exs index 43fb0eca4..d2c3a0a27 100644 --- a/test/membrane/integration/demands_test.exs +++ b/test/membrane/integration/demands_test.exs @@ -17,15 +17,13 @@ defmodule Membrane.Integration.DemandsTest do end defp test_pipeline(pid) do - pattern_gen = fn i -> %Buffer{payload: <> <> <<255>>} end - demand = 500 Pipeline.message_child(pid, :sink, {:make_demand, demand}) 0..(demand - 1) |> assert_buffers_received(pid) - pattern = pattern_gen.(demand) + pattern = %Buffer{payload: <> <> <<255>>} refute_sink_buffer(pid, :sink, ^pattern, 0) Pipeline.message_child(pid, :sink, {:make_demand, demand}) diff --git a/test/membrane/integration/effective_flow_control_resolution_test.exs b/test/membrane/integration/effective_flow_control_resolution_test.exs new file mode 100644 index 000000000..6052515f5 --- /dev/null +++ b/test/membrane/integration/effective_flow_control_resolution_test.exs @@ -0,0 +1,232 @@ +defmodule Membrane.Integration.EffectiveFlowControlResolutionTest do + use ExUnit.Case + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Testing + + require Membrane.Child, as: Child + + defmodule AutoFilter do + use Membrane.Filter + + def_input_pad :input, availability: :on_request, accepted_format: _any + def_output_pad :output, availability: :on_request, accepted_format: _any + + def_options lazy?: [spec: boolean(), default: false] + + @impl true + def handle_playing(_ctx, state) do + {[notify_parent: :playing], state} + end + + @impl true + def handle_buffer(_pad, buffer, _ctx, state) do + if state.lazy?, do: Process.sleep(100) + {[forward: buffer], state} + end + end + + defmodule DoubleFlowControlSource do + use Membrane.Source + + def_output_pad :push_output, accepted_format: _any, flow_control: :push + def_output_pad :pull_output, accepted_format: _any, flow_control: :manual + + @impl true + def handle_demand(_pad, _size, _unit, _ctx, state), do: {[], state} + end + + defmodule PushSource do + use Membrane.Source + + def_output_pad :output, accepted_format: _any, flow_control: :push + end + + defmodule PullSource do + use Membrane.Source + + def_output_pad :output, accepted_format: _any, flow_control: :manual + + @impl true + def handle_demand(_pad, _size, _unit, _ctx, state), do: {[], state} + end + + defmodule BatchingSource do + use Membrane.Source + + def_output_pad :output, accepted_format: _any, flow_control: :push + + @impl true + def handle_playing(_ctx, state) do + # buffers_barch is bigger than sum of default toilet capacity and default initial auto demand + buffers_batch = Enum.map(1..5_000, &%Membrane.Buffer{payload: inspect(&1)}) + + actions = [ + stream_format: {:output, %Membrane.StreamFormat.Mock{}}, + buffer: {:output, buffers_batch} + ] + + {actions, state} + end + end + + test "effective_flow_control is properly resolved in simple scenario" do + spec_beggining = [ + child({:filter_a, 0}, AutoFilter), + child({:filter_b, 0}, AutoFilter) + ] + + spec = + Enum.reduce(1..10, spec_beggining, fn idx, [predecessor_a, predecessor_b] -> + [ + predecessor_a + |> child({:filter_a, idx}, AutoFilter), + predecessor_b + |> child({:filter_b, idx}, AutoFilter) + ] + end) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + children_names = + for filter_type <- [:filter_a, :filter_b], idx <- 0..10 do + {filter_type, idx} + end + + children_names + |> Enum.map(fn child -> + assert_pipeline_notified(pipeline, child, :playing) + child + end) + |> Enum.each(fn child -> + assert_child_effective_flow_control(pipeline, child, :push) + end) + + Testing.Pipeline.execute_actions(pipeline, + spec: [ + child(:source, DoubleFlowControlSource) + |> via_out(:push_output) + |> get_child({:filter_a, 0}), + get_child(:source) |> via_out(:pull_output) |> get_child({:filter_b, 0}) + ] + ) + + Process.sleep(1000) + + for child <- children_names do + expected = + case child do + {:filter_a, _idx} -> :push + {:filter_b, _idx} -> :pull + end + + assert_child_effective_flow_control(pipeline, child, expected) + end + end + + test "effective_flow_control switches between :push and :pull" do + spec = + Enum.reduce( + 1..10, + child(:push_source, PushSource), + fn idx, last_child -> last_child |> child({:filter, idx}, AutoFilter) end + ) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(500) + + for _i <- 1..5 do + Testing.Pipeline.execute_actions(pipeline, + spec: child(:pull_source, PullSource) |> get_child({:filter, 1}) + ) + + Process.sleep(500) + + for idx <- 1..10 do + assert_child_effective_flow_control(pipeline, {:filter, idx}, :pull) + end + + Testing.Pipeline.execute_actions(pipeline, remove_children: :pull_source) + Process.sleep(500) + + for idx <- 1..10 do + assert_child_effective_flow_control(pipeline, {:filter, idx}, :push) + end + end + + Testing.Pipeline.terminate(pipeline) + end + + test "effective_flow_control is :pull, only when it should be" do + spec = [ + child(:push_source, PushSource) + |> child(:filter_1, AutoFilter) + |> child(:filter_2, AutoFilter), + child(:pull_source, PullSource) + |> get_child(:filter_2) + ] + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(500) + + assert_child_effective_flow_control(pipeline, :filter_1, :push) + assert_child_effective_flow_control(pipeline, :filter_2, :pull) + end + + test "Toilet does not overflow, when input pad effective flow control is :push" do + spec = + child(:source, BatchingSource) + |> child(:filter, AutoFilter) + + pipeline = Testing.Pipeline.start_supervised!(spec: spec) + Process.sleep(500) + + child_pid = Testing.Pipeline.get_child_pid!(pipeline, :filter) + assert Process.alive?(child_pid) + + Testing.Pipeline.terminate(pipeline) + end + + test "Toilet overflows, when it should" do + spec = { + child(:pull_source, PullSource) + |> child(:filter, %AutoFilter{lazy?: true}), + group: :group, crash_group_mode: :temporary + } + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + Process.sleep(500) + + filter_ref = Child.ref(:filter, group: :group) + assert_child_effective_flow_control(pipeline, filter_ref, :pull) + + monitor_ref = + Testing.Pipeline.get_child_pid!(pipeline, filter_ref) + |> Process.monitor() + + Testing.Pipeline.execute_actions(pipeline, + spec: { + child(:batching_source, BatchingSource) + |> get_child(filter_ref), + group: :another_group, crash_group_mode: :temporary + } + ) + + # batch of buffers sent by BatchingSource should cause toilet overflow + assert_receive {:DOWN, ^monitor_ref, _process, _pid, :killed} + assert_pipeline_crash_group_down(pipeline, :group) + Testing.Pipeline.terminate(pipeline) + end + + defp assert_child_effective_flow_control(pipeline, child_name, expected) do + child_state = + Testing.Pipeline.get_child_pid!(pipeline, child_name) + |> :sys.get_state() + + assert child_state.effective_flow_control == expected + end +end diff --git a/test/membrane/integration/toilet_forwarding_test.exs b/test/membrane/integration/toilet_forwarding_test.exs new file mode 100644 index 000000000..302b197e8 --- /dev/null +++ b/test/membrane/integration/toilet_forwarding_test.exs @@ -0,0 +1,245 @@ +defmodule Membrane.Integration.ToiletForwardingTest do + use ExUnit.Case, async: true + + import Membrane.ChildrenSpec + import Membrane.Testing.Assertions + + alias Membrane.Testing + + require Membrane.Child, as: Child + require Membrane.Pad, as: Pad + + defmodule StreamFormat do + defstruct [] + end + + defmodule AutoFilter do + use Membrane.Filter + + def_input_pad :input, availability: :on_request, accepted_format: _any, flow_control: :auto + def_output_pad :output, availability: :on_request, accepted_format: _any, flow_control: :auto + + @impl true + def handle_buffer(_pad, buffer, _ctx, state) do + {[forward: buffer], state} + end + + @impl true + def handle_parent_notification({:execute_actions, actions}, _ctx, state) do + {actions, state} + end + end + + defmodule PushSink do + use Membrane.Sink + def_input_pad :input, accepted_format: _any, flow_control: :push + end + + defmodule AutoSink do + use Membrane.Sink + def_input_pad :input, accepted_format: _any, flow_control: :auto + + @impl true + def handle_parent_notification({:sleep, timeout}, _cts, state) do + Process.sleep(timeout) + {[], state} + end + + @impl true + def handle_buffer(_pad, buffer, _ctx, state) do + {[notify_parent: {:buffer, buffer}], state} + end + end + + defmodule PushSource do + use Membrane.Source + + def_output_pad :output, accepted_format: _any, flow_control: :push + + @impl true + def handle_playing(_ctx, state) do + {[stream_format: {:output, %StreamFormat{}}], state} + end + + @impl true + def handle_parent_notification({:forward_buffers, buffers}, _ctx, state) do + {[buffer: {:output, buffers}], state} + end + end + + defmodule PullSource do + use Membrane.Source + + defmodule StreamFormat do + defstruct [] + end + + def_output_pad :output, accepted_format: _any, flow_control: :manual + + @impl true + def handle_playing(_ctx, state) do + {[stream_format: {:output, %StreamFormat{}}], state} + end + + @impl true + def handle_parent_notification({:forward_buffers, buffers}, _ctx, state) do + {[buffer: {:output, buffers}], state} + end + + @impl true + def handle_demand(_pad, _demand, _unit, _ctx, state), do: {[], state} + end + + defmodule RedemandingFilter do + use Membrane.Filter + + def_input_pad :input, accepted_format: _any, flow_control: :manual, demand_unit: :buffers + def_output_pad :output, accepted_format: _any, flow_control: :manual, demand_unit: :buffers + + @impl true + def handle_demand(:output, _size, :buffers, _ctx, state) do + {[demand: :input], state} + end + + @impl true + def handle_buffer(:input, buffer, _ctx, state) do + {[buffer: {:output, buffer}, redemand: :output], state} + end + end + + test "toilet overflows only where it should" do + # because Membrane.Testing.Source output pad has :manual flow control, :filter will work in auto pull + + filter_ref = Child.ref(:filter, group: :filter_group) + sink_ref = Child.ref(:sink, group: :sink_group) + + spec = [ + {child(:filter, AutoFilter), group: :filter_group, crash_group_mode: :temporary}, + {child(:sink, %Testing.Sink{autodemand: false}), + group: :sink_group, crash_group_mode: :temporary}, + child(Testing.Source) + |> child(AutoFilter) + |> get_child(filter_ref) + |> get_child(sink_ref), + child(:push_source, PushSource) + |> child(AutoFilter) + |> get_child(filter_ref) + ] + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Process.sleep(500) + + buffers = + 1..10_000 + |> Enum.map(fn i -> %Membrane.Buffer{payload: <>} end) + + Testing.Pipeline.execute_actions(pipeline, + notify_child: {:push_source, {:forward_buffers, buffers}} + ) + + assert_pipeline_crash_group_down(pipeline, :filter_group) + + Testing.Pipeline.terminate(pipeline) + end + + test "toilet does not overflow on link between 2 pads in auto push" do + spec = + child(:source, PushSource) + |> child(AutoFilter) + |> child(:sink, AutoSink) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + Testing.Pipeline.execute_actions(pipeline, notify_child: {:sink, {:sleep, 1000}}) + Process.sleep(100) + + buffers = + 1..10_000 + |> Enum.map(fn i -> %Membrane.Buffer{payload: <>} end) + + Testing.Pipeline.execute_actions(pipeline, + notify_child: {:source, {:forward_buffers, buffers}} + ) + + refute_pipeline_notified(pipeline, :sink, {:buffer, _buffer}, 500) + + for i <- 1..10_000 do + assert_pipeline_notified( + pipeline, + :sink, + {:buffer, %Membrane.Buffer{payload: <>}} + ) + + assert buff_idx == i + end + + Testing.Pipeline.terminate(pipeline) + end + + test "toilet does not overflow on link between 2 pads in auto pull" do + spec = + child(PullSource) + |> child(:filter, AutoFilter) + |> via_out(Pad.ref(:output, 1)) + |> child(:sink, AutoSink) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + # time to propagate :pull effective flow control + Process.sleep(100) + + buffers = + 1..10_000 + |> Enum.map(fn i -> %Membrane.Buffer{payload: <>} end) + + filter_actions = [ + stream_format: {Pad.ref(:output, 1), %StreamFormat{}}, + buffer: {Pad.ref(:output, 1), buffers} + ] + + Testing.Pipeline.execute_actions(pipeline, + notify_child: {:filter, {:execute_actions, filter_actions}} + ) + + for i <- 1..10_000 do + assert_pipeline_notified( + pipeline, + :sink, + {:buffer, %Membrane.Buffer{payload: <>}} + ) + + assert buff_idx == i + end + + Testing.Pipeline.terminate(pipeline) + end + + test "elements in auto pull work properly with elements in manual pull" do + spec = + Enum.reduce(1..20, child(:source, PullSource), fn _i, spec -> + spec + |> child(AutoFilter) + |> child(RedemandingFilter) + end) + |> child(:sink, %Testing.Sink{autodemand: false}) + + pipeline = Testing.Pipeline.start_link_supervised!(spec: spec) + + buffers = + 1..4000 + |> Enum.map(fn i -> %Membrane.Buffer{payload: <>} end) + + Testing.Pipeline.execute_actions(pipeline, + notify_child: {:source, {:forward_buffers, buffers}}, + notify_child: {:sink, {:make_demand, 3000}} + ) + + for i <- 1..3000 do + assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: <>}) + assert buff_idx == i + end + + Testing.Pipeline.terminate(pipeline) + end +end diff --git a/test/membrane/testing/dynamic_source_test.exs b/test/membrane/testing/dynamic_source_test.exs index a95a87eab..1abdc69c0 100644 --- a/test/membrane/testing/dynamic_source_test.exs +++ b/test/membrane/testing/dynamic_source_test.exs @@ -50,16 +50,12 @@ defmodule Membrane.Testing.DynamicSourceTest do test "Source works properly when using generator function" do pipeline = Testing.Pipeline.start_link_supervised!( - spec: - [ - child(:source, Testing.DynamicSource), - child(:sink_1, Testing.Sink), - child(:sink_2, Testing.Sink) - ] ++ - [ - get_child(:source) |> get_child(:sink_1), - get_child(:source) |> get_child(:sink_2) - ] + spec: [ + child(:source, Testing.DynamicSource) + |> child(:sink_1, Testing.Sink), + get_child(:source) + |> child(:sink_2, Testing.Sink) + ] ) assert_sink_buffer(pipeline, :sink_1, %Buffer{payload: <<0::16>>}) diff --git a/test/support/bin/test_bins.ex b/test/support/bin/test_bins.ex index aa2dbd9f4..0603ff173 100644 --- a/test/support/bin/test_bins.ex +++ b/test/support/bin/test_bins.ex @@ -46,7 +46,7 @@ defmodule Membrane.Support.Bin.TestBins do ctx.pads |> Map.values() |> Enum.filter(&(&1.direction == :output)) - |> Enum.map(& &1.demand) + |> Enum.map(& &1.demand_snapshot) |> Enum.min() demands = diff --git a/test/support/log_metadata_test/pipeline.ex b/test/support/log_metadata_test/pipeline.ex index fb7c0a581..0a2e3fa06 100644 --- a/test/support/log_metadata_test/pipeline.ex +++ b/test/support/log_metadata_test/pipeline.ex @@ -14,8 +14,6 @@ defmodule Membrane.Support.LogMetadataTest.Pipeline do import Membrane.ChildrenSpec - require Membrane.Logger - def_output_pad :output, flow_control: :manual, accepted_format: _any,