Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor demand mechanism #783

Merged
merged 19 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,14 @@ jobs:
steps:
- attach_workspace:
at: .
- run: cd app; git add -A; git reset --hard; cd ..
- run: cp -r benchmark/ ~/benchmark_backup/
- run: cp mix.exs ~/benchmark_backup/
- run: docker pull membraneframeworklabs/docker_membrane
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/feature_branch_results
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/feature_branch_results
- run: git checkout -f master
- run: cp ~/benchmark_backup/mix.exs ~/app
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, run benchmark/run.exs /root/results/master_results
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix do deps.get, deps.compile --force --all, compile --force, run benchmark/run.exs /root/results/master_results
- run: docker run -e MIX_ENV=benchmark -v ./:/root/app -v ~/results:/root/results -v ~/benchmark_backup/benchmark:/root/app/benchmark -w /root/app membraneframeworklabs/docker_membrane mix run benchmark/compare.exs /root/results/feature_branch_results /root/results/master_results
- run:
command: rm ~/results/*
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/children_spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ defmodule Membrane.ChildrenSpec do
Membrane won't send smaller demand than `minimal demand`, to reduce demands' overhead. However, the user will always receive
as many buffers, as demanded, all excess buffers will be queued internally.
Used only for pads working in `:manual` flow control mode. See `t:Membrane.Pad.flow_control/0`
for more info. Defaults to `#{Membrane.Core.Element.InputQueue.default_min_demand_factor()}` (the default may change in the future).
for more info. Defaults to `#{Membrane.Core.Element.ManualFlowController.InputQueue.default_min_demand_factor()}` (the default may change in the future).
- `auto_demand_size` - Size of automatically generated demands. Used only for pads working in `:auto` flow control mode.
See `t:Membrane.Pad.flow_control/0` for more info.
- `throttling_factor` - an integer specifying how frequently should a sender update the number of buffers in the `Toilet`. Defaults to 1,
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ defmodule Membrane.Core.Bin do

if node do
result = :rpc.call(node, GenServer, :start, [__MODULE__, options])

# TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved
with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid)
result
else
Expand Down
2 changes: 0 additions & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ defmodule Membrane.Core.Bin.State do
terminating?: boolean(),
resource_guard: Membrane.ResourceGuard.t(),
setup_incomplete?: boolean(),
handling_action?: boolean(),
stalker: Membrane.Core.Stalker.t()
}

Expand Down Expand Up @@ -73,7 +72,6 @@ defmodule Membrane.Core.Bin.State do
initialized?: false,
terminating?: false,
setup_incomplete?: false,
handling_action?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil,
Expand Down
21 changes: 0 additions & 21 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do
use Bunch

alias Membrane.CallbackError
alias Membrane.Core.Component

require Membrane.Logger

Expand Down Expand Up @@ -189,16 +188,6 @@ defmodule Membrane.Core.CallbackHandler do
reraise e, __STACKTRACE__
end

was_handling_action? = state.handling_action?
state = %{state | handling_action?: true}

# Updating :supplying_demand? flag value here is a temporal fix.
# Setting it to `true` while handling actions causes postponing calls
# of handle_redemand/2 and supply_demand/2 until a moment, when all
# actions returned from the callback are handled
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
try do
Expand All @@ -213,16 +202,6 @@ defmodule Membrane.Core.CallbackHandler do
end
end)

state =
if was_handling_action?,
do: state,
else: %{state | handling_action?: false}

state =
if Component.is_element?(state) and not was_supplying_demand?,
do: %{state | supplying_demand?: false},
else: state

handler_module.handle_end_of_actions(state)
end
end
3 changes: 2 additions & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Membrane.Core.Child.PadModel do

alias Membrane.Core.Child
alias Membrane.Core.Element.EffectiveFlowController
alias Membrane.Core.Element.ManualFlowController.InputQueue
alias Membrane.{Pad, UnknownPadError}

@type bin_pad_data :: %Membrane.Bin.PadData{
Expand Down Expand Up @@ -39,7 +40,7 @@ defmodule Membrane.Core.Child.PadModel do
pid: pid,
other_ref: Pad.ref(),
sticky_messages: [Membrane.Event.t()],
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
input_queue: InputQueue.t() | nil,
options: %{optional(atom) => any},
auto_demand_size: pos_integer() | nil,
sticky_events: [Membrane.Event.t()],
Expand Down
8 changes: 3 additions & 5 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ defmodule Membrane.Core.Element do
alias Membrane.Core.Element.{
BufferController,
DemandController,
DemandHandler,
EffectiveFlowController,
EventController,
LifecycleController,
ManualFlowController,
PadController,
State,
StreamFormatController
Expand Down Expand Up @@ -85,8 +85,6 @@ defmodule Membrane.Core.Element do
# rpc if necessary
if node do
result = :rpc.call(node, GenServer, :start, [__MODULE__, options])

# TODO: use an atomic way of linking once https://github.com/erlang/otp/issues/6375 is solved
with {:start_link, {:ok, pid}} <- {method, result}, do: Process.link(pid)
result
else
Expand Down Expand Up @@ -211,13 +209,13 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.resume_delayed_demands_loop(state)
state = ManualFlowController.resume_delayed_demands_loop(state)
{:noreply, state}
end

defp do_handle_info(Message.new(:buffer, buffers, _opts) = msg, state) do
pad_ref = Message.for_pad(msg)
state = BufferController.handle_buffer(pad_ref, buffers, state)
state = BufferController.handle_incoming_buffers(pad_ref, buffers, state)
{:noreply, state}
end

Expand Down
64 changes: 30 additions & 34 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ defmodule Membrane.Core.Element.ActionHandler do
}

alias Membrane.Core.Element.{
AutoFlowController,
DemandController,
DemandHandler,
ManualFlowController,
State,
StreamFormatController
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.{Events, TimerController}
alias Membrane.Element.Action

Expand All @@ -50,34 +50,29 @@ defmodule Membrane.Core.Element.ActionHandler do
# Fixed order of handling demand of manual and auto pads would lead to
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.
manual_demands_first? = Enum.random([1, 2]) == 1

state =
if manual_demands_first?,
do: maybe_handle_delayed_demands(state),
else: state

state = maybe_handle_pads_to_snapshot(state)

state =
if manual_demands_first?,
do: state,
else: maybe_handle_delayed_demands(state)

state
if Enum.random([true, false]) do
state
|> handle_pads_to_snapshot()
|> maybe_handle_delayed_demands()
else
state
|> maybe_handle_delayed_demands()
|> handle_pads_to_snapshot()
end
end

defp maybe_handle_delayed_demands(state) do
with %{supplying_demand?: false} <- state do
DemandHandler.handle_delayed_demands(state)
with %{delay_demands?: false} <- state do
ManualFlowController.handle_delayed_demands(state)
end
end

defp maybe_handle_pads_to_snapshot(state) do
with %{handling_action?: false} <- state do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end
defp handle_pads_to_snapshot(state) do
state.pads_to_snapshot
|> Enum.shuffle()
|> Enum.reduce(state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end

@impl CallbackHandler
Expand Down Expand Up @@ -178,13 +173,13 @@ defmodule Membrane.Core.Element.ActionHandler do
@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
AutoFlowController.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
AutoFlowController.resume_demands(in_ref, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -239,7 +234,7 @@ defmodule Membrane.Core.Element.ActionHandler do
%State{type: type} = state
)
when is_pad_ref(pad_ref) and is_demand_size(size) and type in [:sink, :filter, :endpoint] do
supply_demand(pad_ref, size, state)
delay_supplying_demand(pad_ref, size, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -396,26 +391,27 @@ defmodule Membrane.Core.Element.ActionHandler do
end
end

@spec supply_demand(
@spec delay_supplying_demand(
Pad.ref(),
Action.demand_size(),
State.t()
) :: State.t()
defp supply_demand(pad_ref, 0, state) do
defp delay_supplying_demand(pad_ref, 0, state) do
Membrane.Logger.debug_verbose("Ignoring demand of size of 0 on pad #{inspect(pad_ref)}")
state
end

defp supply_demand(pad_ref, size, _state)
defp delay_supplying_demand(pad_ref, size, _state)
when is_integer(size) and size < 0 do
raise ElementError,
"Tried to request a negative demand of size #{inspect(size)} on pad #{inspect(pad_ref)}"
end

defp supply_demand(pad_ref, size, state) do
defp delay_supplying_demand(pad_ref, size, state) do
with %{direction: :input, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.supply_demand(pad_ref, size, state)
state = ManualFlowController.update_demand(pad_ref, size, state)
ManualFlowController.delay_supplying_demand(pad_ref, state)
else
%{direction: :output} ->
raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref
Expand All @@ -435,7 +431,7 @@ defmodule Membrane.Core.Element.ActionHandler do
when type in [:source, :filter, :endpoint] do
with %{direction: :output, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.handle_redemand(pad_ref, state)
ManualFlowController.delay_redemand(pad_ref, state)
else
%{direction: :input} ->
raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}"
Expand Down Expand Up @@ -471,10 +467,10 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
ManualFlowController.remove_pad_from_delayed_demands(pad_ref, state)
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
|> AutoFlowController.pop_queues_and_bump_demand()
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
defmodule Membrane.Core.Element.AutoFlowController do
@moduledoc false

alias Membrane.Buffer
Expand Down
38 changes: 22 additions & 16 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,28 @@ defmodule Membrane.Core.Element.BufferController do

alias Membrane.Core.Element.{
ActionHandler,
AutoFlowController,
CallbackContext,
DemandHandler,
EventController,
InputQueue,
ManualFlowController,
PlaybackQueue,
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.Element.ManualFlowController.InputQueue

alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry

@doc """
Handles incoming buffer: either stores it in InputQueue, or executes element's
callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2`
callback. Also calls `Membrane.Core.Element.ManualFlowController.supply_demand/2`
to check if there are any unsupplied demands.
"""
@spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_buffer(pad_ref, buffers, state) do
@spec handle_incoming_buffers(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
def handle_incoming_buffers(pad_ref, buffers, state) do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
%{
Expand All @@ -49,48 +50,53 @@ defmodule Membrane.Core.Element.BufferController do
EventController.handle_start_of_stream(pad_ref, state)
end

do_handle_buffer(pad_ref, data, buffers, state)
do_handle_incoming_buffers(pad_ref, data, buffers, state)
else
pad: {:error, :unknown_pad} ->
# We've got a buffer from already unlinked pad
state

playback: _playback ->
PlaybackQueue.store(&handle_buffer(pad_ref, buffers, &1), state)
PlaybackQueue.store(&handle_incoming_buffers(pad_ref, buffers, &1), state)
end
end

@spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
@spec do_handle_incoming_buffers(
Pad.ref(),
PadModel.pad_data(),
[Buffer.t()] | Buffer.t(),
State.t()
) ::
State.t()
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{demand: demand, demand_unit: demand_unit, stalker_metrics: stalker_metrics} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
AutoFlowController.store_buffers_in_queue(pad_ref, buffers, state)
else
state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
AutoFlowController.auto_adjust_atomic_demand(pad_ref, state)
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :manual} = data, buffers, state) do
%{input_queue: old_input_queue} = data

input_queue = InputQueue.store(old_input_queue, buffers)
state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue)

if old_input_queue |> InputQueue.empty?() do
DemandHandler.supply_demand(pad_ref, state)
if InputQueue.empty?(old_input_queue) do
ManualFlowController.supply_demand(pad_ref, state)
else
state
end
end

defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do
defp do_handle_incoming_buffers(pad_ref, %{flow_control: :push}, buffers, state) do
exec_buffer_callback(pad_ref, buffers, state)
end

Expand Down
Loading