Skip to content

Commit

Permalink
Delete unncessary flags, rename some modules
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 26, 2024
1 parent 27a3b01 commit 93bdfb4
Show file tree
Hide file tree
Showing 13 changed files with 564 additions and 310 deletions.
23 changes: 5 additions & 18 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ defmodule Membrane.Core.CallbackHandler do
use Bunch

alias Membrane.CallbackError
alias Membrane.Core.Component

require Membrane.Logger

Expand Down Expand Up @@ -189,15 +188,8 @@ defmodule Membrane.Core.CallbackHandler do
reraise e, __STACKTRACE__
end

# was_handling_action? = state.handling_action?
# state = %{state | handling_action?: true}

# Updating :delay_demands? flag value here is a temporal fix.
# Setting it to `true` while handling actions causes postponing calls
# of handle_redemand/2 and supply_demand/2 until a moment, when all
# actions returned from the callback are handled
was_delay_demands? = Map.get(state, :delay_demands?, false)
state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state
# was_delay_demands? = Map.get(state, :delay_demands?, false)
# state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
Expand All @@ -214,14 +206,9 @@ defmodule Membrane.Core.CallbackHandler do
end)

# state =
# if was_handling_action?,
# do: state,
# else: %{state | handling_action?: false}

state =
if Component.is_element?(state) and not was_delay_demands?,
do: %{state | delay_demands?: false},
else: state
# if Component.is_element?(state) and not was_delay_demands?,
# do: %{state | delay_demands?: false},
# else: state

handler_module.handle_end_of_actions(callback, state)
end
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ defmodule Membrane.Core.Element do
alias Membrane.Core.Element.{
BufferController,
DemandController,
DemandHandler,
EffectiveFlowController,
EventController,
LifecycleController,
Expand Down Expand Up @@ -211,7 +210,7 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(Message.new(:resume_delayed_demands_loop), state) do
state = DemandHandler.resume_delayed_demands_loop(state)
state = DemandController.Manual.resume_delayed_demands_loop(state)
{:noreply, state}
end

Expand Down
40 changes: 23 additions & 17 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ defmodule Membrane.Core.Element.ActionHandler do

alias Membrane.Core.Element.{
DemandController,
DemandHandler,
State,
StreamFormatController
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.{Events, TimerController}
alias Membrane.Element.Action

Expand All @@ -51,26 +49,31 @@ defmodule Membrane.Core.Element.ActionHandler do
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.

if Enum.random([1, 2]) == 1 do
snapshot(callback, state)
|> hdd()
# Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might
# be executed in between handling actions returned from other callbacks.
# This callback has been deprecated and should be removed in v2.0.0, along with the if statement below.

if callback != :handle_spec_started do
if Enum.random([1, 2]) == 1 do
snapshot(callback, state)
|> hdd()
else
state
|> hdd()
|> then(&snapshot(callback, &1))
end
else
state
|> hdd()
|> then(&snapshot(callback, &1))
end
end

defp hdd(state) do
with %{delay_demands?: false} <- state do
DemandHandler.handle_delayed_demands(state)
DemandController.Manual.handle_delayed_demands(state)
end
end

defp snapshot(callback, state) do
# Condition in if below is caused by a fact, that handle_spec_started is the only callback, that might
# be executed in between handling actions returned from other callbacks.
# This callback has been deprecated and should be removed in v2.0.0, along with the if statement below.
if callback != :handle_spec_started do
state.pads_to_snapshot
|> Enum.shuffle()
Expand Down Expand Up @@ -179,13 +182,13 @@ defmodule Membrane.Core.Element.ActionHandler do
@impl CallbackHandler
def handle_action({:pause_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.pause_demands(in_ref, state)
DemandController.Auto.pause_demands(in_ref, state)
end

@impl CallbackHandler
def handle_action({:resume_auto_demand, in_ref}, _cb, _params, %State{type: type} = state)
when type in [:sink, :filter, :endpoint] do
DemandController.AutoFlowUtils.resume_demands(in_ref, state)
DemandController.Auto.resume_demands(in_ref, state)
end

@impl CallbackHandler
Expand Down Expand Up @@ -417,7 +420,9 @@ defmodule Membrane.Core.Element.ActionHandler do
defp supply_demand(pad_ref, size, state) do
with %{direction: :input, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.supply_demand(pad_ref, size, state)
# todo: get_data! above could be eradicated
state = DemandController.Manual.update_demand(pad_ref, size, state)
DemandController.Manual.delay_demand_supply(pad_ref, state)
else
%{direction: :output} ->
raise PadDirectionError, action: :demand, direction: :output, pad: pad_ref
Expand All @@ -437,7 +442,8 @@ defmodule Membrane.Core.Element.ActionHandler do
when type in [:source, :filter, :endpoint] do
with %{direction: :output, flow_control: :manual} <-
PadModel.get_data!(state, pad_ref) do
DemandHandler.handle_redemand(pad_ref, state)
# todo: get_data! above could be eradicated
DemandController.Manual.delay_redemand(pad_ref, state)
else
%{direction: :input} ->
raise ElementError, "Tried to make a redemand on input pad #{inspect(pad_ref)}"
Expand Down Expand Up @@ -473,10 +479,10 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
DemandController.Manual.remove_pad_from_delayed_demands(pad_ref, state)
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
|> DemandController.Auto.pop_queues_and_bump_demand()
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
11 changes: 5 additions & 6 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,21 @@ defmodule Membrane.Core.Element.BufferController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandHandler,
DemandController,
EventController,
InputQueue,
PlaybackQueue,
State
}

alias Membrane.Core.Element.DemandController.AutoFlowUtils
alias Membrane.Core.Telemetry

require Membrane.Core.Child.PadModel
require Membrane.Core.Telemetry

@doc """
Handles incoming buffer: either stores it in InputQueue, or executes element's
callback. Also calls `Membrane.Core.Element.DemandHandler.supply_demand/2`
callback. Also calls `Membrane.Core.Element.DemandController.Manual.supply_demand/2`
to check if there are any unsupplied demands.
"""
@spec handle_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
Expand Down Expand Up @@ -70,10 +69,10 @@ defmodule Membrane.Core.Element.BufferController do
:atomics.put(stalker_metrics.demand, 1, demand - buf_size)

if state.effective_flow_control == :pull and MapSet.size(state.satisfied_auto_output_pads) > 0 do
AutoFlowUtils.store_buffers_in_queue(pad_ref, buffers, state)
DemandController.Auto.store_buffers_in_queue(pad_ref, buffers, state)
else
state = exec_buffer_callback(pad_ref, buffers, state)
AutoFlowUtils.auto_adjust_atomic_demand(pad_ref, state)
DemandController.Auto.auto_adjust_atomic_demand(pad_ref, state)
end
end

Expand All @@ -84,7 +83,7 @@ defmodule Membrane.Core.Element.BufferController do
state = PadModel.set_data!(state, pad_ref, :input_queue, input_queue)

if old_input_queue |> InputQueue.empty?() do
DemandHandler.supply_demand(pad_ref, state)
DemandController.Manual.supply_demand(pad_ref, state)
else
state
end
Expand Down
7 changes: 3 additions & 4 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ defmodule Membrane.Core.Element.DemandController do

use Bunch

alias __MODULE__.AutoFlowUtils
alias __MODULE__.{Auto, Manual}

alias Membrane.Buffer

alias Membrane.Core.Element.{
AtomicDemand,
DemandHandler,
PlaybackQueue,
State
}
Expand Down Expand Up @@ -56,7 +55,7 @@ defmodule Membrane.Core.Element.DemandController do
if atomic_value > 0 do
state
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
|> AutoFlowUtils.pop_queues_and_bump_demand()
|> Auto.pop_queues_and_bump_demand()
else
state
end
Expand All @@ -79,7 +78,7 @@ defmodule Membrane.Core.Element.DemandController do
}
)

DemandHandler.handle_redemand(pad_data.ref, state)
Manual.handle_redemand(pad_data.ref, state)
else
_other -> state
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
defmodule Membrane.Core.Element.DemandController.Auto do
@moduledoc false

alias Membrane.Buffer
Expand Down
Loading

0 comments on commit 93bdfb4

Please sign in to comment.