Skip to content

Commit

Permalink
Implement full version of auto-push
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 15, 2023
1 parent e5c8305 commit 4af3609
Show file tree
Hide file tree
Showing 17 changed files with 418 additions and 241 deletions.
15 changes: 2 additions & 13 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ defmodule Membrane.Core.Element.BufferController do
CallbackContext,
DemandController,
DemandHandler,
EffectiveFlowController,
EventController,
InputQueue,
PlaybackQueue,
Expand Down Expand Up @@ -62,18 +61,8 @@ defmodule Membrane.Core.Element.BufferController do
%{demand: demand, demand_unit: demand_unit} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

state =
EffectiveFlowController.pad_effective_flow_control(pad_ref, state)
|> case do
:push ->
if data.toilet, do: Toilet.drain(data.toilet, buf_size)
state

:pull ->
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
DemandController.send_auto_demand_if_needed(pad_ref, state)
end

state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
state = DemandController.send_auto_demand_if_needed(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
end

Expand Down
37 changes: 17 additions & 20 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ defmodule Membrane.Core.Element.DemandController do
def handle_demand(pad_ref, size, state) do
withl pad: {:ok, data} <- PadModel.get_data(state, pad_ref),
playback: %State{playback: :playing} <- state do
%{direction: :output, flow_control: flow_control} = data

if flow_control == :push,
do: raise("Pad with :push control mode cannot handle demand.")
if data.direction == :input,
do: raise("Input pad cannot handle demand.")

do_handle_demand(pad_ref, size, data, state)
else
Expand Down Expand Up @@ -71,6 +69,10 @@ defmodule Membrane.Core.Element.DemandController do
end
end

defp do_handle_demand(_pad_ref, _size, %{flow_control: :push} = _data, state) do
state
end

@doc """
Sends auto demand to an input pad if it should be sent.
Expand All @@ -79,30 +81,29 @@ defmodule Membrane.Core.Element.DemandController do
associated output pads.
"""
@spec send_auto_demand_if_needed(Pad.ref(), State.t()) :: State.t()
def send_auto_demand_if_needed(pad_ref, %{effective_flow_control: :pull} = state) do
def send_auto_demand_if_needed(pad_ref, state) do
data = PadModel.get_data!(state, pad_ref)

%{
flow_control: :auto,
demand: demand,
toilet: toilet,
associated_pads: associated_pads,
auto_demand_size: demand_request_size
} = data

demand =
if demand <= div(demand_request_size, 2) and auto_demands_positive?(associated_pads, state) do
if toilet != nil and data.other_effective_flow_control in [:push, :not_resolved] do
Toilet.drain(toilet, demand_request_size - demand)
end
if demand <= div(demand_request_size, 2) and
(state.effective_flow_control == :push or
auto_demands_positive?(associated_pads, state)) do
Membrane.Logger.debug_verbose(
"Sending auto demand of size #{demand_request_size - demand} on pad #{inspect(pad_ref)}"
)

if data.other_effective_flow_control in [:pull, :not_resolved] do
Membrane.Logger.debug_verbose(
"Sending auto demand of size #{demand_request_size - demand} on pad #{inspect(pad_ref)}"
)
%{pid: pid, other_ref: other_ref} = data
Message.send(pid, :demand, demand_request_size - demand, for_pad: other_ref)

%{pid: pid, other_ref: other_ref} = data
Message.send(pid, :demand, demand_request_size - demand, for_pad: other_ref)
end
if toilet, do: Toilet.drain(toilet, demand_request_size - demand)

demand_request_size
else
Expand All @@ -116,10 +117,6 @@ defmodule Membrane.Core.Element.DemandController do
PadModel.set_data!(state, pad_ref, :demand, demand)
end

def send_auto_demand_if_needed(_pad_ref, state) do
state
end

defp auto_demands_positive?(associated_pads, state) do
Enum.all?(associated_pads, &(PadModel.get_data!(state, &1, :demand) > 0))
end
Expand Down
27 changes: 10 additions & 17 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ defmodule Membrane.Core.Element.DemandHandler do
alias Membrane.Core.Element.{
BufferController,
DemandController,
EffectiveFlowController,
EventController,
InputQueue,
State,
Expand Down Expand Up @@ -104,22 +103,22 @@ defmodule Membrane.Core.Element.DemandHandler do
[Buffer.t()],
State.t()
) :: State.t()
def handle_outgoing_buffers(pad_ref, data, buffers, state) do
EffectiveFlowController.pad_effective_flow_control(pad_ref, state)
|> do_handle_outgoing_buffers(pad_ref, data, buffers, state)
end

defp do_handle_outgoing_buffers(:pull, pad_ref, data, buffers, state) do
def handle_outgoing_buffers(pad_ref, %{flow_control: flow_control} = data, buffers, state)
when flow_control in [:auto, :manual] do
%{other_demand_unit: other_demand_unit, demand: demand} = data
buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers)
PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
fill_toilet_if_exists(pad_ref, data.toilet, buf_size, state)
end

defp do_handle_outgoing_buffers(:push, pad_ref, data, buffers, state) when data.toilet != nil do
def handle_outgoing_buffers(pad_ref, %{flow_control: :push} = data, buffers, state) do
%{other_demand_unit: other_demand_unit} = data
buf_size = Buffer.Metric.from_unit(other_demand_unit).buffers_size(buffers)
fill_toilet_if_exists(pad_ref, data.toilet, buf_size, state)
end

case Toilet.fill(data.toilet, buf_size) do
defp fill_toilet_if_exists(pad_ref, toilet, buf_size, state) when toilet != nil do
case Toilet.fill(toilet, buf_size) do
{:ok, toilet} ->
PadModel.set_data!(state, pad_ref, :toilet, toilet)

Expand All @@ -130,13 +129,7 @@ defmodule Membrane.Core.Element.DemandHandler do
end
end

defp do_handle_outgoing_buffers(:push, _ref, _data, _buffers, state) do
state
end

defp do_handle_outgoing_buffers(:not_resolved, _ref, _data, _buffers, _state) do
raise "not implemented yet"
end
defp fill_toilet_if_exists(_pad_ref, nil, _buf_size, state), do: state

defp update_demand(pad_ref, size, state) when is_integer(size) do
PadModel.set_data!(state, pad_ref, :demand, size)
Expand Down
92 changes: 53 additions & 39 deletions lib/membrane/core/element/effective_flow_controller.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
defmodule Membrane.Core.Element.EffectiveFlowController do
@moduledoc false

alias Membrane.Core.Element.Toilet

alias Membrane.Core.Element.{
DemandController,
State
}

require Membrane.Core.Child.PadModel, as: PadModel
require Membrane.Core.Message, as: Message
require Membrane.Logger
require Membrane.Pad, as: Pad

@type effective_flow_control :: :push | :pull | :not_resolved
@type effective_flow_control :: :push | :pull

@spec pad_effective_flow_control(Pad.ref(), State.t()) :: effective_flow_control()
def pad_effective_flow_control(pad_ref, state) do
Expand Down Expand Up @@ -44,65 +47,76 @@ defmodule Membrane.Core.Element.EffectiveFlowController do
State.t()
def handle_other_effective_flow_control(my_pad_ref, other_effective_flow_control, state) do
pad_data = PadModel.get_data!(state, my_pad_ref)

pad_data = %{pad_data | other_effective_flow_control: other_effective_flow_control}
state = PadModel.set_data!(state, my_pad_ref, pad_data)

if state.effective_flow_control == :push and pad_data.direction == :input and
other_effective_flow_control == :pull do
# TODO: implement this
raise "not implemented yet"
end

with :playing <- state.playback,
%{direction: :input, flow_control: :auto} <- pad_data,
mode when mode in [:push, :pull] <- other_effective_flow_control do
if state.playback == :playing and pad_data.direction == :input and
pad_data.flow_control == :auto and
other_effective_flow_control != state.effective_flow_control do
resolve_effective_flow_control(state)
else
_other -> state
state
end
end

@spec resolve_effective_flow_control(State.t()) :: State.t()
def resolve_effective_flow_control(%State{effective_flow_control: :not_resolved} = state) do
input_auto_pads =
def resolve_effective_flow_control(state) do
senders_flow_modes =
Map.values(state.pads_data)
|> Enum.filter(&(&1.direction == :input && &1.flow_control == :auto))
|> Enum.map(& &1.other_effective_flow_control)

effective_flow_control =
new_effective_flow_control =
cond do
Enum.any?(input_auto_pads, &(&1.other_effective_flow_control == :pull)) ->
:pull
Enum.member?(senders_flow_modes, :pull) -> :pull
Enum.member?(senders_flow_modes, :push) -> :push
true -> state.effective_flow_control
end

Enum.any?(input_auto_pads, &(&1.other_effective_flow_control == :push)) ->
:push
set_effective_flow_control(new_effective_flow_control, state)
end

true ->
:not_resolved
end
defp set_effective_flow_control(
effective_flow_control,
%{effective_flow_control: effective_flow_control} = state
),
do: state

defp set_effective_flow_control(new_effective_flow_control, state) do
Membrane.Logger.debug(
"Transiting `flow_control: :auto` pads to #{inspect(new_effective_flow_control)} effective flow control"
)

state = %{state | effective_flow_control: effective_flow_control}
state = %{state | effective_flow_control: new_effective_flow_control}

_ignored =
if effective_flow_control != :not_resolved do
for {_ref, %{flow_control: :auto} = pad_data} <- state.pads_data do
Message.send(pad_data.pid, :other_effective_flow_control_resolved, [
pad_data.other_ref,
effective_flow_control
])
for {_ref, %{flow_control: :auto} = pad_data} <- state.pads_data do
Message.send(pad_data.pid, :other_effective_flow_control_resolved, [
pad_data.other_ref,
new_effective_flow_control
])

case pad_data.direction do
:input ->
Toilet.set_receiver_effective_flow_control(
pad_data.toilet,
new_effective_flow_control
)

:output ->
Toilet.set_sender_effective_flow_control(
pad_data.toilet,
new_effective_flow_control
)
end
end

with %{effective_flow_control: :pull} <- state do
Enum.reduce(state.pads_data, state, fn
{pad_ref, %{flow_control: :auto, direction: :input}}, state ->
DemandController.send_auto_demand_if_needed(pad_ref, state)
Enum.reduce(state.pads_data, state, fn
{pad_ref, %{flow_control: :auto, direction: :input}}, state ->
DemandController.send_auto_demand_if_needed(pad_ref, state)

_pad_entry, state ->
state
end)
end
_pad_entry, state ->
state
end)
end

def resolve_effective_flow_control(state), do: state
end
2 changes: 1 addition & 1 deletion lib/membrane/core/element/input_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ defmodule Membrane.Core.Element.InputQueue do
Sending demand of size #{inspect(to_demand)} to output #{inspect(linked_output_ref)}
"""
|> mk_log(input_queue)
|> Membrane.Logger.debug_verbose()
|> Membrane.Logger.debug()

Message.send(demand_pid, :demand, to_demand, for_pad: linked_output_ref)
%__MODULE__{input_queue | demand: demand - to_demand}
Expand Down
Loading

0 comments on commit 4af3609

Please sign in to comment.