Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto push full impl #537

Merged
merged 71 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
821f8c9
Implement resolving auto pads mode
FelonEkonom Feb 27, 2023
328837b
Refactor effective flow control related code
FelonEkonom Feb 27, 2023
1d5f38b
Rename file auto_demand_controller to effective_flow_control_controller
FelonEkonom Feb 27, 2023
490d651
Add simple test for effective flow control resoultion
FelonEkonom Feb 28, 2023
c291687
Refactor & upgrade toilet
FelonEkonom Feb 28, 2023
ba8bb81
Fix credo issues
FelonEkonom Mar 6, 2023
62315ba
Fill toilet always on sending buffers via push output pad
FelonEkonom Mar 7, 2023
7116b94
Drain toilet only if it exists
FelonEkonom Mar 8, 2023
a156819
Fix credo issue
FelonEkonom Mar 8, 2023
eaab0bc
Rename :undefined to :not_resolved
FelonEkonom Mar 9, 2023
a1dcfef
Implement comments from PR
FelonEkonom Mar 9, 2023
89e85bc
Merge remote-tracking branch 'origin/master' into auto-push
FelonEkonom Mar 9, 2023
012574b
Fix dialyzer issues
FelonEkonom Mar 9, 2023
e5c8305
Refactor Element test
FelonEkonom Mar 9, 2023
70f1842
Implement full version of auto-push
FelonEkonom Mar 15, 2023
e819388
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom Mar 15, 2023
38ff2a7
Implement DemandCounter
FelonEkonom Mar 17, 2023
a95ee9a
wip
FelonEkonom Mar 17, 2023
73820a5
wip
FelonEkonom Mar 17, 2023
6801f3c
wip
FelonEkonom Mar 20, 2023
3fe1aa0
wip
FelonEkonom Mar 23, 2023
bb520e9
wip
FelonEkonom Mar 28, 2023
bdc2c69
wip
FelonEkonom Mar 30, 2023
0b41d56
Fix bugs in integration tests
FelonEkonom Apr 3, 2023
e559e89
Fix all failing tests
FelonEkonom Apr 3, 2023
dc26072
Write unit tests for DemandCounter
FelonEkonom Apr 4, 2023
a1f7380
Write integration tests for toilet forwarding
FelonEkonom Apr 5, 2023
3f1fefc
Fix StreamFormatController and LifecycleController unit tests
FelonEkonom Apr 5, 2023
dd47e4f
Fix DistributedPipelineTest
FelonEkonom Apr 5, 2023
3be90a3
Fix input queue unit tests
FelonEkonom Apr 6, 2023
3e7deef
Remove unnecessary warns and comments
FelonEkonom Apr 6, 2023
aa201a3
Fix credo issues
FelonEkonom Apr 6, 2023
fc3b7d3
Fix dialyzer issues
FelonEkonom Apr 6, 2023
37ca9e4
Refactor DemandController and DemandHander
FelonEkonom Apr 6, 2023
b60a4d0
Remove unnecesary require Membrane.Logger
FelonEkonom Apr 6, 2023
7936d53
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom Apr 6, 2023
2a5bb10
Reduce autodemands size
FelonEkonom Apr 6, 2023
dd90dc8
Implement comments from PR - wip
FelonEkonom Apr 20, 2023
40b1d08
Improve log on stream format patter error
FelonEkonom Apr 20, 2023
2f6d685
Merge branch 'master' into auto-push-full-impl
FelonEkonom Apr 20, 2023
7d5620f
Remove info about initiator from handle_link
FelonEkonom Apr 21, 2023
3476724
Rename PadData.demand to PadData.demand_snapshot
FelonEkonom Apr 21, 2023
5a09bfe
Rename check_demand_counter to snpashot_demand_counter
FelonEkonom Apr 21, 2023
3970934
Refactor DemandController and DemandHandler
FelonEkonom Apr 21, 2023
f18195d
Rename overflow_limit in DemandCounter to toilet_capacity
FelonEkonom Apr 24, 2023
1eef455
Sort aliases
FelonEkonom Apr 25, 2023
ca7fef7
Add PadData fields description
FelonEkonom Apr 25, 2023
9f3e046
Add comments about effective flow control resolution
FelonEkonom Apr 25, 2023
456aafa
Add lacking alias
FelonEkonom Apr 25, 2023
241d807
Implement suggestions from CR wip
FelonEkonom May 12, 2023
dd4be64
Implement suggestions from CR wip
FelonEkonom May 15, 2023
91375b2
Implement suggestions from CR
FelonEkonom May 15, 2023
bdbc08e
Implement suggestion from CR wip
FelonEkonom May 15, 2023
72e8f47
Implement suggestion from CR
FelonEkonom May 16, 2023
0a88e25
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom May 16, 2023
97d0b22
Terminate distributed atomic worker after it's creator death
FelonEkonom May 16, 2023
47964a5
Update algorithm of changing flags in DemandCounter
FelonEkonom May 17, 2023
27bc14c
Implement suggestion from CR wip
FelonEkonom May 17, 2023
e713e52
Refactor getting demand unit in demand controller
FelonEkonom May 18, 2023
1a1957a
Rename lacking_buffers_size to demand
FelonEkonom May 18, 2023
71ca3e7
Implement suggestion from CR
FelonEkonom May 19, 2023
ae869b2
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom May 19, 2023
7959a6a
Fix Error log
FelonEkonom May 19, 2023
812cf9c
Rename DemandCounter to AtomicDemand
FelonEkonom May 19, 2023
0821fb5
Start AtomicDemand Worker under supervisor
FelonEkonom May 19, 2023
765adad
Fix AtomicDemand tests wip
FelonEkonom May 19, 2023
70840cb
Sort aliases
FelonEkonom May 19, 2023
efc9edb
Fix tests failing because of changes in AtomicDemand
FelonEkonom May 22, 2023
f3ef798
Implememnt suggestions from CR
FelonEkonom May 22, 2023
74a94e6
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom May 22, 2023
3d8f81c
Fix typo in comment
FelonEkonom May 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,14 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(
Message.new(:other_effective_flow_control_resolved, [
Message.new(:sender_effective_flow_control_resolved, [
input_pad_ref,
effective_flow_control
]),
state
) do
state =
EffectiveFlowController.handle_other_effective_flow_control(
EffectiveFlowController.handle_sender_effective_flow_control(
input_pad_ref,
effective_flow_control,
state
Expand Down
5 changes: 2 additions & 3 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ defmodule Membrane.Core.Element.BufferController do
@spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
State.t()
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{lacking_buffer_size: lacking_buffer_size, demand_unit: demand_unit} = data
%{demand: demand, demand_unit: demand_unit} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

state =
PadModel.set_data!(state, pad_ref, :lacking_buffer_size, lacking_buffer_size - buf_size)
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)

state = AutoFlowUtils.auto_adjust_demand_counter(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
Expand Down
7 changes: 6 additions & 1 deletion lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ defmodule Membrane.Core.Element.DemandController do
def decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) do
pad_data = PadModel.get_data!(state, pad_ref)

demand_unit = pad_data.demand_unit || pad_data.other_demand_unit || :buffers
demand_unit =
case pad_data.flow_control do
:push -> pad_data.other_demand_unit || :buffers
_pull_or_auto -> pad_data.demand_unit
end

buffers_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

demand_snapshot = pad_data.demand_snapshot - buffers_size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp do_auto_adjust_demand_counter(pad_data, state) when is_input_auto_pad_data(pad_data) do
if increase_demand_counter?(pad_data, state) do
diff = pad_data.auto_demand_size - pad_data.lacking_buffer_size
diff = pad_data.auto_demand_size - pad_data.demand
:ok = DemandCounter.increase(pad_data.demand_counter, diff)

PadModel.set_data!(
state,
pad_data.ref,
:lacking_buffer_size,
:demand,
pad_data.auto_demand_size
)
else
Expand All @@ -46,7 +46,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp increase_demand_counter?(pad_data, state) do
state.effective_flow_control == :pull and
pad_data.lacking_buffer_size < pad_data.auto_demand_size / 2 and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &demand_counter_positive?(&1, state))
end

Expand Down
55 changes: 29 additions & 26 deletions lib/membrane/core/element/demand_counter.ex
varsill marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
defmodule Membrane.Core.Element.DemandCounter do
@moduledoc false
alias Membrane.Core.Element.DemandCounter.DistributedFlowStatus
alias Membrane.Core.Element.EffectiveFlowController

alias __MODULE__.{
DistributedAtomic,
DistributedFlowMode
DistributedFlowStatus
varsill marked this conversation as resolved.
Show resolved Hide resolved
}

require Membrane.Core.Message, as: Message
Expand All @@ -17,9 +18,9 @@ defmodule Membrane.Core.Element.DemandCounter do

@opaque t :: %__MODULE__{
counter: DistributedAtomic.t(),
receiver_mode: DistributedFlowMode.t(),
receiver_status: DistributedFlowStatus.t(),
receiver_process: Process.dest(),
sender_mode: DistributedFlowMode.t(),
sender_status: DistributedFlowStatus.t(),
sender_process: Process.dest(),
sender_pad_ref: Pad.ref(),
toilet_capacity: neg_integer(),
Expand All @@ -29,13 +30,13 @@ defmodule Membrane.Core.Element.DemandCounter do
receiver_demand_unit: Membrane.Buffer.Metric.unit()
}

@type flow_mode :: DistributedFlowMode.flow_mode_value()
@type flow_mode :: DistributedFlowStatus.value()

@enforce_keys [
:counter,
:receiver_mode,
:receiver_status,
:receiver_process,
:sender_mode,
:sender_status,
:sender_process,
:sender_pad_ref,
:throttling_factor,
Expand All @@ -46,7 +47,7 @@ defmodule Membrane.Core.Element.DemandCounter do
defstruct @enforce_keys ++ [buffered_decrementation: 0, toilet_overflowed?: false]

@spec new(
receiver_mode :: EffectiveFlowController.effective_flow_control(),
receiver_effective_flow_control :: EffectiveFlowController.effective_flow_control(),
receiver_process :: Process.dest(),
receiver_demand_unit :: Membrane.Buffer.Metric.unit(),
sender_process :: Process.dest(),
Expand All @@ -55,7 +56,7 @@ defmodule Membrane.Core.Element.DemandCounter do
throttling_factor :: pos_integer() | nil
) :: t
def new(
receiver_mode,
receiver_effective_flow_control,
receiver_process,
receiver_demand_unit,
sender_process,
Expand All @@ -72,11 +73,13 @@ defmodule Membrane.Core.Element.DemandCounter do
true -> @distributed_default_throttling_factor
end

receiver_status = DistributedFlowStatus.new({:resolved, receiver_effective_flow_control})

%__MODULE__{
counter: counter,
receiver_mode: DistributedFlowMode.new(receiver_mode),
receiver_status: receiver_status,
receiver_process: receiver_process,
sender_mode: DistributedFlowMode.new(:to_be_resolved),
sender_status: DistributedFlowStatus.new(:to_be_resolved),
sender_process: sender_process,
sender_pad_ref: sender_pad_ref,
toilet_capacity: toilet_capacity || default_toilet_capacity(receiver_demand_unit),
Expand All @@ -85,30 +88,30 @@ defmodule Membrane.Core.Element.DemandCounter do
}
end

@spec set_sender_mode(t, EffectiveFlowController.effective_flow_control()) :: :ok
def set_sender_mode(%__MODULE__{} = demand_counter, mode) do
DistributedFlowMode.put(
demand_counter.sender_mode,
@spec set_sender_status(t, DistributedFlowStatus.value()) :: :ok
def set_sender_status(%__MODULE__{} = demand_counter, mode) do
DistributedFlowStatus.put(
varsill marked this conversation as resolved.
Show resolved Hide resolved
demand_counter.sender_status,
mode
)
end

@spec get_sender_mode(t) :: flow_mode()
def get_sender_mode(%__MODULE__{} = demand_counter) do
DistributedFlowMode.get(demand_counter.sender_mode)
@spec get_sender_status(t) :: DistributedFlowStatus.value()
def get_sender_status(%__MODULE__{} = demand_counter) do
DistributedFlowStatus.get(demand_counter.sender_status)
end

@spec set_receiver_mode(t, flow_mode()) :: :ok
def set_receiver_mode(%__MODULE__{} = demand_counter, mode) do
DistributedFlowMode.put(
demand_counter.receiver_mode,
@spec set_receiver_status(t, DistributedFlowStatus.value()) :: :ok
def set_receiver_status(%__MODULE__{} = demand_counter, mode) do
DistributedFlowStatus.put(
demand_counter.receiver_status,
mode
)
end

@spec get_receiver_mode(t) :: flow_mode()
def get_receiver_mode(%__MODULE__{} = demand_counter) do
DistributedFlowMode.get(demand_counter.receiver_mode)
@spec get_receiver_status(t) :: DistributedFlowStatus.value()
def get_receiver_status(%__MODULE__{} = demand_counter) do
DistributedFlowStatus.get(demand_counter.receiver_status)
end

@spec increase(t, non_neg_integer()) :: :ok
Expand Down Expand Up @@ -153,8 +156,8 @@ defmodule Membrane.Core.Element.DemandCounter do
demand_counter = %{demand_counter | buffered_decrementation: 0}

if not demand_counter.toilet_overflowed? and
get_receiver_mode(demand_counter) == :pull and
get_sender_mode(demand_counter) == :push and
get_receiver_status(demand_counter) == {:resolved, :pull} and
get_sender_status(demand_counter) == {:resolved, :push} and
-1 * counter_value > demand_counter.toilet_capacity do
overflow(demand_counter, counter_value)
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Membrane.Core.Element.DemandCounter.DistributedAtomic do
@spec new(integer() | nil) :: t
def new(initial_value \\ nil) do
atomic_ref = :atomics.new(1, [])
{:ok, worker} = Worker.start_link()
{:ok, worker} = Worker.start_link(self())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's start it under the supervisor


distributed_atomic = %__MODULE__{
atomic_ref: atomic_ref,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ defmodule Membrane.Core.Element.DemandCounter.DistributedAtomic.Worker do

@type t :: pid()

@spec start_link() :: {:ok, t}
def start_link(), do: GenServer.start_link(__MODULE__, nil)
@spec start_link(pid()) :: {:ok, t}
def start_link(owner_pid), do: GenServer.start_link(__MODULE__, owner_pid)

@impl true
def init(_arg) do
{:ok, nil, :hibernate}
def init(owner_pid) do
ref = Process.monitor(owner_pid)
{:ok, %{ref: ref}, :hibernate}
end

@impl true
Expand All @@ -39,4 +40,9 @@ defmodule Membrane.Core.Element.DemandCounter.DistributedAtomic.Worker do
:atomics.put(atomic_ref, 1, value)
{:noreply, nil}
end

@impl true
def handle_info({:DOWN, ref, _process, _pid, _reason}, %{ref: ref} = state) do
{:stop, :normal, state}
end
end
38 changes: 0 additions & 38 deletions lib/membrane/core/element/demand_counter/distributed_flow_mode.ex

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Membrane.Core.Element.DemandCounter.DistributedFlowStatus do
@moduledoc false

alias Membrane.Core.Element.DemandCounter.DistributedAtomic
alias Membrane.Core.Element.EffectiveFlowController

@type t :: DistributedAtomic.t()
@type value :: {:resolved, EffectiveFlowController.effective_flow_control()} | :to_be_resolved

@spec new(value) :: t
def new(initial_value) do
initial_value
|> flow_status_to_int()
|> DistributedAtomic.new()
end

@spec get(t) :: value()
def get(distributed_atomic) do
distributed_atomic
|> DistributedAtomic.get()
|> int_to_flow_status()
end

@spec put(t, value()) :: :ok
def put(distributed_atomic, value) do
varsill marked this conversation as resolved.
Show resolved Hide resolved
value = flow_status_to_int(value)
DistributedAtomic.put(distributed_atomic, value)
end

defp int_to_flow_status(0), do: :to_be_resolved
defp int_to_flow_status(1), do: {:resolved, :push}
defp int_to_flow_status(2), do: {:resolved, :pull}

defp flow_status_to_int(:to_be_resolved), do: 0
defp flow_status_to_int({:resolved, :push}), do: 1
defp flow_status_to_int({:resolved, :pull}), do: 2
end
Loading