-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Auto push full impl #537
Auto push full impl #537
Conversation
17ac82e
to
4af3609
Compare
4af3609
to
70f1842
Compare
8d7e3d9
to
e819388
Compare
436a9be
to
38ff2a7
Compare
defmodule Membrane.Core.Element.DemandCounter.DistributedFlowMode do | ||
@moduledoc false | ||
|
||
alias Membrane.Core.Element.DemandCounter.DistributedAtomic | ||
alias Membrane.Core.Element.EffectiveFlowController | ||
|
||
@type t :: DistributedAtomic.t() | ||
@type flow_mode_value :: | ||
EffectiveFlowController.effective_flow_control() | :to_be_resolved |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defmodule Membrane.Core.Element.DemandCounter.DistributedFlowMode do | |
@moduledoc false | |
alias Membrane.Core.Element.DemandCounter.DistributedAtomic | |
alias Membrane.Core.Element.EffectiveFlowController | |
@type t :: DistributedAtomic.t() | |
@type flow_mode_value :: | |
EffectiveFlowController.effective_flow_control() | :to_be_resolved | |
defmodule Membrane.Core.Element.DemandCounter.DistributedFlowStatus do | |
@moduledoc false | |
alias Membrane.Core.Element.DemandCounter.DistributedAtomic | |
alias Membrane.Core.Element.EffectiveFlowController | |
@type t :: DistributedAtomic.t() | |
@type flow_control_status :: | |
{:resolved, EffectiveFlowController.effective_flow_control()} | :to_be_resolved |
receiver_demand_unit: Membrane.Buffer.Metric.unit() | ||
} | ||
|
||
@type flow_mode :: DistributedFlowMode.flow_mode_value() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@type flow_mode :: DistributedFlowMode.flow_mode_value() |
@@ -91,7 +89,9 @@ defmodule Membrane.Core.Element.DemandController do | |||
@spec decrease_demand_by_outgoing_buffers(Pad.ref(), [Buffer.t()], State.t()) :: State.t() | |||
def decrease_demand_by_outgoing_buffers(pad_ref, buffers, state) do | |||
pad_data = PadModel.get_data!(state, pad_ref) | |||
buffers_size = Buffer.Metric.from_unit(pad_data.other_demand_unit).buffers_size(buffers) | |||
|
|||
demand_unit = pad_data.demand_unit || pad_data.other_demand_unit || :buffers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
demand unit should be resolved when the pad is linked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but this alternative matters when output pad has :push
flow control
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So maybe we should calculate the demand unit no matter the flow control (since the demand counter is always there)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO it does not look good to have non-nil
demand_unit
when flow_control
is :push
. I have changed this line to make it explicit, that we take other_demand_unit
into account only when flow_control
is :push
and pad_data.demand_unit
when flow_control
is :pull
or :auto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO it does not look good to have non-nil demand_unit when flow_control is
:push
.
IMO it looks even worse to have demand_counter and not have demand_unit there 🤔 And I see no point in resolving it each time when we can do it once.
Maybe the field should be buffer_size_unit
, not demand_unit
:D
@@ -241,6 +244,18 @@ defmodule Membrane.Core.Element.PadController do | |||
end | |||
end | |||
|
|||
@spec handle_input_pad_added(Pad.ref(), State.t()) :: State.t() | |||
def handle_input_pad_added(pad_ref, state) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's only called once, in the same module. Remove it and put the contents where it's called
27c0b66
to
1a1957a
Compare
@@ -19,7 +19,7 @@ defmodule Membrane.Core.Element.DemandCounter.DistributedAtomic do | |||
@spec new(integer() | nil) :: t | |||
def new(initial_value \\ nil) do | |||
atomic_ref = :atomics.new(1, []) | |||
{:ok, worker} = Worker.start_link() | |||
{:ok, worker} = Worker.start_link(self()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's start it under the supervisor
%{effective_flow_control: effective_flow_control} = state | ||
), | ||
do: state | ||
|
||
defp set_effective_flow_control(new_effective_flow_control, state) do | ||
defp set_effective_flow_control(new_effective_flow_control, last_changed_pad, state) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defp set_effective_flow_control(new_effective_flow_control, last_changed_pad, state) do | |
defp set_effective_flow_control(new_effective_flow_control, triggerer_pad, state) do |
[pad_data.other_ref, new_effective_flow_control] | ||
) | ||
|
||
state | ||
|
||
{pad_ref, %{direction: :input} = pad_data}, state when last_changed_pad != nil -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{pad_ref, %{direction: :input} = pad_data}, state when last_changed_pad != nil -> | |
{pad_ref, %{direction: :input} = pad_data}, state -> |
lib/membrane/core/element/demand_counter/distributed_flow_status.ex
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
awesome |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is one small comment concerning the docs, but generally speaking:
LGTM 🥇
@@ -1,7 +1,7 @@ | |||
defmodule Membrane.Core.Element.DemandController do | |||
@moduledoc false | |||
|
|||
# Module handling changes in values of output pads demand counters | |||
# Module handling changes in values of output pads demand atomics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Module handling changes in values of output pads demand atomics | |
# Module handling changes in values of output pads atomic demand |
To be fair, I believe it should be written that way, but I don't find it anyhow crucial :D
Related Jira ticket: https://membraneframework.atlassian.net/browse/MC-163