Skip to content

Commit

Permalink
Add new delayed demands loop tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 26, 2024
1 parent ff51e43 commit 0f7b0ca
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 28 deletions.
8 changes: 4 additions & 4 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Membrane.Core.CallbackHandler do
@callback transform_actions(actions :: list, callback :: atom, handler_params, state) ::
{actions :: list, state}

@callback handle_end_of_actions(state) :: state
@callback handle_end_of_actions(callback :: atom, state) :: state

defmacro __using__(_args) do
quote location: :keep do
Expand All @@ -44,7 +44,7 @@ defmodule Membrane.Core.CallbackHandler do
end

@impl unquote(__MODULE__)
def handle_end_of_actions(state) do
def handle_end_of_actions(_callback, state) do
state
end

Expand Down Expand Up @@ -133,7 +133,7 @@ defmodule Membrane.Core.CallbackHandler do
%{context: context_fun},
%{module: module, internal_state: internal_state} = state
) do
args = args ++ [context_fun.(state), internal_state]
args = args ++ [context_fun.(state) |> Map.put(:s, state), internal_state]

callback_result =
try do
Expand Down Expand Up @@ -223,6 +223,6 @@ defmodule Membrane.Core.CallbackHandler do
do: %{state | supplying_demand?: false},
else: state

handler_module.handle_end_of_actions(state)
handler_module.handle_end_of_actions(callback, state)
end
end
42 changes: 22 additions & 20 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,39 @@ defmodule Membrane.Core.Element.ActionHandler do
defguardp is_demand_size(size) when is_integer(size) or is_function(size)

@impl CallbackHandler
def handle_end_of_actions(state) do
def handle_end_of_actions(callback, state) do
# Fixed order of handling demand of manual and auto pads would lead to
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.
manual_demands_first? = Enum.random([1, 2]) == 1

state =
if manual_demands_first?,
do: maybe_handle_delayed_demands(state),
else: state

state = maybe_handle_pads_to_snapshot(state)

state =
if manual_demands_first?,
do: state,
else: maybe_handle_delayed_demands(state)

state
if Enum.random([1, 2]) == 1 do
snapshot(callback, state)
|> hdd()
else
state
|> hdd()
|> then(&snapshot(callback, &1))
end
end

defp maybe_handle_delayed_demands(state) do
defp hdd(state) do
with %{supplying_demand?: false} <- state do
DemandHandler.handle_delayed_demands(state)
end
end

defp maybe_handle_pads_to_snapshot(state) do
# with %{handling_action?: false} <- state do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
defp snapshot(callback, state) do
# Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might
# be executed in between handling actions returned from other callbacks.
# This callback has been deprecated and should be removed in v2.0.0, along with the if statement below.
if callback != :handle_spec_started do
state.pads_to_snapshot
|> Enum.shuffle()
|> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
# end
else
state
end
end

@impl CallbackHandler
Expand Down Expand Up @@ -342,6 +343,7 @@ defmodule Membrane.Core.Element.ActionHandler do
stalker_metrics: stalker_metrics
}
when stream_format != nil <- pad_data do
# todo: move this function to one of the controllers, to avoid redundant PadModet.get_data in the function below
state = DemandController.decrease_demand_by_outgoing_buffers(pad_ref, buffers, state)
:atomics.add(stalker_metrics.total_buffers, 1, length(buffers))
Message.send(pid, :buffer, buffers, for_pad: other_ref)
Expand Down
5 changes: 4 additions & 1 deletion lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ defmodule Membrane.Core.Element.DemandController do
%{flow_control: :auto} = pad_data,
%{effective_flow_control: :pull} = state
) do
if AtomicDemand.get(pad_data.atomic_demand) > 0 do
atomic_value = AtomicDemand.get(pad_data.atomic_demand)
state = PadModel.set_data!(state, pad_data.ref, :demand, atomic_value)

if atomic_value > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
end

@impl CallbackHandler
def handle_end_of_actions(state) do
def handle_end_of_actions(_callback, state) do
with %{awaiting_setup_completition?: true} <- state do
%{state | awaiting_setup_completition?: false}
|> Membrane.Core.LifecycleController.complete_setup()
Expand Down
2 changes: 0 additions & 2 deletions test/membrane/core/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ defmodule Membrane.Core.PipelineTest do
[],
state
)
|> ActionHandler.handle_end_of_actions()
end
end

Expand All @@ -93,7 +92,6 @@ defmodule Membrane.Core.PipelineTest do
[],
state
)
|> ActionHandler.handle_end_of_actions()
end
end
end
Expand Down
96 changes: 96 additions & 0 deletions test/membrane/integration/delayed_demands_loop_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,100 @@ defmodule Membrane.Test.DelayedDemandsLoopTest do

Testing.Pipeline.terminate(pipeline)
end

defmodule VariousFlowFilter do
use Membrane.Filter

def_input_pad :manual_input,
accepted_format: _any,
flow_control: :manual,
demand_unit: :buffers

def_input_pad :auto_input, accepted_format: _any, flow_control: :auto

def_output_pad :manual_output, accepted_format: _any, flow_control: :manual
def_output_pad :auto_output, accepted_format: _any, flow_control: :auto

defmodule StreamFormat do
defstruct []
end

@impl true
def handle_playing(_ctx, _state) do
actions =
[:manual_output, :auto_output]
|> Enum.map(&{:stream_format, {&1, %StreamFormat{}}})

{actions, %{}}
end

@impl true
def handle_demand(:manual_output, size, :buffers, _ctx, state) do
{[demand: {:manual_input, size}], state}
end

@impl true
def handle_buffer(_pad, buffer, _ctx, state) do
# Aim of this Process.sleep is to make VariousFlowFilter working slower than Testing.Sinks
Process.sleep(1)

actions =
[:manual_output, :auto_output]
|> Enum.map(&{:buffer, {&1, buffer}})

{actions, state}
end

@impl true
def handle_end_of_stream(_pad, _ctx, state) do
{[], state}
end
end

test "manual pad doesn't starve auto pad" do
buffers_per_source = 10_000
input_demand_size = 100

manual_source_buffers =
Stream.repeatedly(fn -> %Buffer{metadata: :manual, payload: <<>>} end)
|> Stream.take(buffers_per_source)

auto_source_buffers =
Stream.repeatedly(fn -> %Buffer{metadata: :auto, payload: <<>>} end)
|> Stream.take(buffers_per_source)

pipeline =
Testing.Pipeline.start_link_supervised!(
spec: [
child(:manual_source, %Testing.Source{output: manual_source_buffers})
|> via_in(:manual_input, target_queue_size: input_demand_size)
|> child(:filter, VariousFlowFilter)
|> via_out(:manual_output)
|> child(:manual_sink, Testing.Sink),
child(:auto_source, %Testing.Source{output: auto_source_buffers})
|> via_in(:auto_input, auto_demand_size: input_demand_size)
|> get_child(:filter)
|> via_out(:auto_output)
|> child(:auto_sink, Testing.Sink)
]
)

stats = %{manual: 0, auto: 0}

Enum.reduce(1..10_000, stats, fn _i, stats ->
assert_sink_buffer(pipeline, :auto_sink, buffer)
stats = Map.update!(stats, buffer.metadata, &(&1 + 1))

difference_upperbound =
max(stats.auto, stats.manual)
|> div(2)
|> max(5 * input_demand_size)

assert abs(stats.auto - stats.manual) <= difference_upperbound

stats
end)

Testing.Pipeline.terminate(pipeline)
end
end

0 comments on commit 0f7b0ca

Please sign in to comment.