Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto push full impl #537

Merged
merged 71 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
821f8c9
Implement resolving auto pads mode
FelonEkonom Feb 27, 2023
328837b
Refactor effective flow control related code
FelonEkonom Feb 27, 2023
1d5f38b
Rename file auto_demand_controller to effective_flow_control_controller
FelonEkonom Feb 27, 2023
490d651
Add simple test for effective flow control resoultion
FelonEkonom Feb 28, 2023
c291687
Refactor & upgrade toilet
FelonEkonom Feb 28, 2023
ba8bb81
Fix credo issues
FelonEkonom Mar 6, 2023
62315ba
Fill toilet always on sending buffers via push output pad
FelonEkonom Mar 7, 2023
7116b94
Drain toilet only if it exists
FelonEkonom Mar 8, 2023
a156819
Fix credo issue
FelonEkonom Mar 8, 2023
eaab0bc
Rename :undefined to :not_resolved
FelonEkonom Mar 9, 2023
a1dcfef
Implement comments from PR
FelonEkonom Mar 9, 2023
89e85bc
Merge remote-tracking branch 'origin/master' into auto-push
FelonEkonom Mar 9, 2023
012574b
Fix dialyzer issues
FelonEkonom Mar 9, 2023
e5c8305
Refactor Element test
FelonEkonom Mar 9, 2023
70f1842
Implement full version of auto-push
FelonEkonom Mar 15, 2023
e819388
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom Mar 15, 2023
38ff2a7
Implement DemandCounter
FelonEkonom Mar 17, 2023
a95ee9a
wip
FelonEkonom Mar 17, 2023
73820a5
wip
FelonEkonom Mar 17, 2023
6801f3c
wip
FelonEkonom Mar 20, 2023
3fe1aa0
wip
FelonEkonom Mar 23, 2023
bb520e9
wip
FelonEkonom Mar 28, 2023
bdc2c69
wip
FelonEkonom Mar 30, 2023
0b41d56
Fix bugs in integration tests
FelonEkonom Apr 3, 2023
e559e89
Fix all failing tests
FelonEkonom Apr 3, 2023
dc26072
Write unit tests for DemandCounter
FelonEkonom Apr 4, 2023
a1f7380
Write integration tests for toilet forwarding
FelonEkonom Apr 5, 2023
3f1fefc
Fix StreamFormatController and LifecycleController unit tests
FelonEkonom Apr 5, 2023
dd47e4f
Fix DistributedPipelineTest
FelonEkonom Apr 5, 2023
3be90a3
Fix input queue unit tests
FelonEkonom Apr 6, 2023
3e7deef
Remove unnecessary warns and comments
FelonEkonom Apr 6, 2023
aa201a3
Fix credo issues
FelonEkonom Apr 6, 2023
fc3b7d3
Fix dialyzer issues
FelonEkonom Apr 6, 2023
37ca9e4
Refactor DemandController and DemandHander
FelonEkonom Apr 6, 2023
b60a4d0
Remove unnecesary require Membrane.Logger
FelonEkonom Apr 6, 2023
7936d53
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom Apr 6, 2023
2a5bb10
Reduce autodemands size
FelonEkonom Apr 6, 2023
dd90dc8
Implement comments from PR - wip
FelonEkonom Apr 20, 2023
40b1d08
Improve log on stream format patter error
FelonEkonom Apr 20, 2023
2f6d685
Merge branch 'master' into auto-push-full-impl
FelonEkonom Apr 20, 2023
7d5620f
Remove info about initiator from handle_link
FelonEkonom Apr 21, 2023
3476724
Rename PadData.demand to PadData.demand_snapshot
FelonEkonom Apr 21, 2023
5a09bfe
Rename check_demand_counter to snpashot_demand_counter
FelonEkonom Apr 21, 2023
3970934
Refactor DemandController and DemandHandler
FelonEkonom Apr 21, 2023
f18195d
Rename overflow_limit in DemandCounter to toilet_capacity
FelonEkonom Apr 24, 2023
1eef455
Sort aliases
FelonEkonom Apr 25, 2023
ca7fef7
Add PadData fields description
FelonEkonom Apr 25, 2023
9f3e046
Add comments about effective flow control resolution
FelonEkonom Apr 25, 2023
456aafa
Add lacking alias
FelonEkonom Apr 25, 2023
241d807
Implement suggestions from CR wip
FelonEkonom May 12, 2023
dd4be64
Implement suggestions from CR wip
FelonEkonom May 15, 2023
91375b2
Implement suggestions from CR
FelonEkonom May 15, 2023
bdbc08e
Implement suggestion from CR wip
FelonEkonom May 15, 2023
72e8f47
Implement suggestion from CR
FelonEkonom May 16, 2023
0a88e25
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom May 16, 2023
97d0b22
Terminate distributed atomic worker after it's creator death
FelonEkonom May 16, 2023
47964a5
Update algorithm of changing flags in DemandCounter
FelonEkonom May 17, 2023
27bc14c
Implement suggestion from CR wip
FelonEkonom May 17, 2023
e713e52
Refactor getting demand unit in demand controller
FelonEkonom May 18, 2023
1a1957a
Rename lacking_buffers_size to demand
FelonEkonom May 18, 2023
71ca3e7
Implement suggestion from CR
FelonEkonom May 19, 2023
ae869b2
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom May 19, 2023
7959a6a
Fix Error log
FelonEkonom May 19, 2023
812cf9c
Rename DemandCounter to AtomicDemand
FelonEkonom May 19, 2023
0821fb5
Start AtomicDemand Worker under supervisor
FelonEkonom May 19, 2023
765adad
Fix AtomicDemand tests wip
FelonEkonom May 19, 2023
70840cb
Sort aliases
FelonEkonom May 19, 2023
efc9edb
Fix tests failing because of changes in AtomicDemand
FelonEkonom May 22, 2023
f3ef798
Implememnt suggestions from CR
FelonEkonom May 22, 2023
74a94e6
Merge remote-tracking branch 'origin/master' into auto-push-full-impl
FelonEkonom May 22, 2023
3d8f81c
Fix typo in comment
FelonEkonom May 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/membrane/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule Membrane.Bin do
alias Membrane.Core.OptionsSpecs

require Membrane.Core.Message
require Membrane.Logger

@type state :: any()

Expand Down
12 changes: 6 additions & 6 deletions lib/membrane/children_spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -538,11 +538,11 @@ defmodule Membrane.ChildrenSpec do
"""
@spec via_in(builder(), Pad.name() | Pad.ref(),
options: pad_options(),
toilet_capacity: number | nil,
target_queue_size: number | nil,
min_demand_factor: number | nil,
auto_demand_size: number | nil,
throttling_factor: number | nil
toilet_capacity: non_neg_integer() | nil,
target_queue_size: non_neg_integer() | nil,
min_demand_factor: non_neg_integer() | nil,
auto_demand_size: non_neg_integer() | nil,
throttling_factor: non_neg_integer() | nil
) :: builder() | no_return
def via_in(builder, pad, props \\ [])

Expand All @@ -567,7 +567,7 @@ defmodule Membrane.ChildrenSpec do
min_demand_factor: [default: nil],
auto_demand_size: [default: nil],
toilet_capacity: [default: nil],
throttling_factor: [default: 1]
throttling_factor: [default: nil]
)
|> case do
{:ok, props} ->
Expand Down
9 changes: 3 additions & 6 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,10 @@ defmodule Membrane.Core.Bin.PadController do
SpecificationParser.raw_endpoint(),
SpecificationParser.raw_endpoint(),
%{
initiator: :parent,
stream_format_validation_params:
StreamFormatController.stream_format_validation_params()
}
| %{
initiator: :sibling,
other_info: PadModel.pad_info() | nil,
link_metadata: map,
stream_format_validation_params:
Expand Down Expand Up @@ -238,7 +236,7 @@ defmodule Membrane.Core.Bin.PadController do

child_endpoint = %{child_endpoint | pad_props: pad_props}

if params.initiator == :sibling do
if direction == :input do
mat-hek marked this conversation as resolved.
Show resolved Hide resolved
:ok =
Child.PadController.validate_pad_mode!(
{endpoint.pad_ref, pad_data},
Expand Down Expand Up @@ -297,11 +295,10 @@ defmodule Membrane.Core.Bin.PadController do
def handle_unlink(pad_ref, state) do
with {:ok, %{availability: :on_request}} <- PadModel.get_data(state, pad_ref) do
state = maybe_handle_pad_removed(pad_ref, state)
endpoint = PadModel.get_data!(state, pad_ref, :endpoint)
{pad_data, state} = PadModel.pop_data!(state, pad_ref)

if endpoint do
Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref)
if pad_data.endpoint do
Message.send(pad_data.endpoint.pid, :handle_unlink, pad_data.endpoint.pad_ref)
ChildLifeController.proceed_spec_startup(pad_data.spec_ref, state)
else
Membrane.Logger.debug("""
Expand Down
1 change: 0 additions & 1 deletion lib/membrane/core/child/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule Membrane.Core.Child.PadController do
alias Membrane.{LinkError, Pad}

require Membrane.Core.Child.PadModel
require Membrane.Logger

@type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Element.State.t()

Expand Down
7 changes: 5 additions & 2 deletions lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Membrane.Core.Child.PadModel do
use Bunch

alias Membrane.Core.Child
alias Membrane.Core.Element.EffectiveFlowController
alias Membrane.{Pad, UnknownPadError}

@type bin_pad_data :: %Membrane.Bin.PadData{
Expand All @@ -24,11 +25,13 @@ defmodule Membrane.Core.Child.PadModel do
@type element_pad_data :: %Membrane.Element.PadData{
availability: Pad.availability(),
stream_format: Membrane.StreamFormat.t() | nil,
demand: integer() | nil,
demand_snapshot: integer() | nil,
manual_demand_size: integer(),
start_of_stream?: boolean(),
end_of_stream?: boolean(),
direction: Pad.direction(),
flow_control: Pad.flow_control(),
other_effective_flow_control: EffectiveFlowController.effective_flow_control() | nil,
name: Pad.name(),
ref: Pad.ref(),
demand_unit: Membrane.Buffer.Metric.unit() | nil,
Expand All @@ -38,7 +41,6 @@ defmodule Membrane.Core.Child.PadModel do
sticky_messages: [Membrane.Event.t()],
input_queue: Membrane.Core.Element.InputQueue.t() | nil,
options: %{optional(atom) => any},
toilet: Membrane.Core.Element.Toilet.t() | nil,
auto_demand_size: pos_integer() | nil,
associated_pads: [Pad.ref()] | nil,
sticky_events: [Membrane.Event.t()]
Expand All @@ -52,6 +54,7 @@ defmodule Membrane.Core.Child.PadModel do
required(:availability) => Pad.availability(),
required(:direction) => Pad.direction(),
required(:name) => Pad.name(),
required(:accepted_formats_str) => String.t(),
optional(:flow_control) => Pad.flow_control(),
optional(:demand_unit) => Membrane.Buffer.Metric.unit(),
optional(:other_demand_unit) => Membrane.Buffer.Metric.unit()
Expand Down
29 changes: 26 additions & 3 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ defmodule Membrane.Core.Element do
use Bunch
use GenServer

alias Membrane.Core.Element.DemandHandler
alias Membrane.{Clock, Core, ResourceGuard, Sync}

alias Membrane.Core.{SubprocessSupervisor, TimerController}

alias Membrane.Core.Element.{
BufferController,
DemandController,
EffectiveFlowController,
EventController,
LifecycleController,
PadController,
Expand Down Expand Up @@ -171,9 +173,13 @@ defmodule Membrane.Core.Element do

@compile {:inline, do_handle_info: 2}

defp do_handle_info(Message.new(:demand, size, _opts) = msg, state) do
pad_ref = Message.for_pad(msg)
state = DemandController.handle_demand(pad_ref, size, state)
defp do_handle_info(Message.new(:demand_counter_increased, pad_ref), state) do
state = DemandController.snapshot_demand_counter(pad_ref, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:resume_handle_demand_loop), state) do
state = DemandHandler.handle_delayed_demands(state)
{:noreply, state}
end

Expand Down Expand Up @@ -215,6 +221,23 @@ defmodule Membrane.Core.Element do
{:noreply, state}
end

defp do_handle_info(
Message.new(:sender_effective_flow_control_resolved, [
input_pad_ref,
effective_flow_control
]),
state
) do
state =
EffectiveFlowController.handle_sender_effective_flow_control(
input_pad_ref,
effective_flow_control,
state
)

{:noreply, state}
end

defp do_handle_info(Message.new(:terminate), state) do
state = LifecycleController.handle_terminate_request(state)
{:noreply, state}
Expand Down
17 changes: 13 additions & 4 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@ defmodule Membrane.Core.Element.ActionHandler do
}

alias Membrane.Core.Child.PadModel
alias Membrane.Core.Element.{DemandHandler, PadController, State, StreamFormatController}

alias Membrane.Core.Element.{
DemandController,
DemandHandler,
PadController,
State,
StreamFormatController
}

alias Membrane.Core.{Events, Message, Telemetry, TimerController}
alias Membrane.Element.Action

Expand Down Expand Up @@ -307,11 +315,11 @@ defmodule Membrane.Core.Element.ActionHandler do
}
when stream_format != nil <- pad_data do
state =
DemandHandler.handle_outgoing_buffers(pad_ref, pad_data, buffers, state)
DemandController.decrease_demand_by_outgoing_buffers(pad_ref, buffers, state)
|> PadModel.set_data!(pad_ref, :start_of_stream?, true)

Message.send(pid, :buffer, buffers, for_pad: other_ref)
state
DemandController.snapshot_demand_counter(pad_ref, state)
else
%{direction: :input} ->
raise PadDirectionError, action: :buffer, direction: :input, pad: pad_ref
Expand Down Expand Up @@ -354,7 +362,8 @@ defmodule Membrane.Core.Element.ActionHandler do
StreamFormatController.validate_stream_format!(
:output,
stream_format_validation_params,
stream_format
stream_format,
state
)

state = PadModel.set_data!(state, pad_ref, :stream_format, stream_format)
Expand Down
22 changes: 11 additions & 11 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ defmodule Membrane.Core.Element.BufferController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandController,
DemandHandler,
EventController,
InputQueue,
PlaybackQueue,
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
Expand Down Expand Up @@ -58,15 +58,17 @@ defmodule Membrane.Core.Element.BufferController do
State.t()
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{demand: demand, demand_unit: demand_unit} = data

buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)
state = DemandController.send_auto_demand_if_needed(pad_ref, state)

state = AutoFlowUtils.auto_adjust_demand_counter(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
end

defp do_handle_buffer(pad_ref, %{flow_control: :manual} = data, buffers, state) do
%{input_queue: old_input_queue} = data

input_queue = InputQueue.store(old_input_queue, buffers)
state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue)

Expand All @@ -77,7 +79,7 @@ defmodule Membrane.Core.Element.BufferController do
end
end

defp do_handle_buffer(pad_ref, _data, buffers, state) do
defp do_handle_buffer(pad_ref, %{flow_control: :push}, buffers, state) do
exec_buffer_callback(pad_ref, buffers, state)
end

Expand All @@ -92,20 +94,18 @@ defmodule Membrane.Core.Element.BufferController do
def exec_buffer_callback(pad_ref, buffers, %State{type: :filter} = state) do
Telemetry.report_metric("buffer", 1, inspect(pad_ref))

CallbackHandler.exec_and_handle_callback(
:handle_buffers_batch,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[pad_ref, buffers],
state
)
do_exec_buffer_callback(pad_ref, buffers, state)
end

def exec_buffer_callback(pad_ref, buffers, %State{type: type} = state)
when type in [:sink, :endpoint] do
Telemetry.report_metric(:buffer, length(List.wrap(buffers)))
Telemetry.report_bitrate(buffers)

do_exec_buffer_callback(pad_ref, buffers, state)
end

defp do_exec_buffer_callback(pad_ref, buffers, state) do
CallbackHandler.exec_and_handle_callback(
:handle_buffers_batch,
ActionHandler,
Expand Down
Loading