Skip to content

Commit

Permalink
supplying_demand? -> delay_demands?
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Mar 26, 2024
1 parent 0f7b0ca commit 27a3b01
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 22 deletions.
10 changes: 5 additions & 5 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ defmodule Membrane.Core.CallbackHandler do
# was_handling_action? = state.handling_action?
# state = %{state | handling_action?: true}

# Updating :supplying_demand? flag value here is a temporal fix.
# Updating :delay_demands? flag value here is a temporal fix.
# Setting it to `true` while handling actions causes postponing calls
# of handle_redemand/2 and supply_demand/2 until a moment, when all
# actions returned from the callback are handled
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state
was_delay_demands? = Map.get(state, :delay_demands?, false)
state = if Component.is_element?(state), do: %{state | delay_demands?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
Expand All @@ -219,8 +219,8 @@ defmodule Membrane.Core.CallbackHandler do
# else: %{state | handling_action?: false}

state =
if Component.is_element?(state) and not was_supplying_demand?,
do: %{state | supplying_demand?: false},
if Component.is_element?(state) and not was_delay_demands?,
do: %{state | delay_demands?: false},
else: state

handler_module.handle_end_of_actions(callback, state)
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ defmodule Membrane.Core.Element.ActionHandler do
end

defp hdd(state) do
with %{supplying_demand?: false} <- state do
with %{delay_demands?: false} <- state do
DemandHandler.handle_delayed_demands(state)
end
end
Expand Down
16 changes: 8 additions & 8 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Membrane.Core.Element.DemandHandler do
output, `handle_demand` is invoked right away, so that the demand can be synchronously supplied.
"""
@spec handle_redemand(Pad.ref(), State.t()) :: State.t()
def handle_redemand(pad_ref, %State{supplying_demand?: true} = state) do
def handle_redemand(pad_ref, %State{delay_demands?: true} = state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :redemand}))
end

Expand All @@ -42,9 +42,9 @@ defmodule Membrane.Core.Element.DemandHandler do
end

defp do_handle_redemand(pad_ref, state) do
state = %{state | supplying_demand?: true}
state = %{state | delay_demands?: true}
state = exec_handle_demand(pad_ref, state)
%{state | supplying_demand?: false}
%{state | delay_demands?: false}
end

@doc """
Expand Down Expand Up @@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.DemandHandler do
end

@spec supply_demand(Pad.ref(), State.t()) :: State.t()
def supply_demand(pad_ref, %State{supplying_demand?: true} = state) do
def supply_demand(pad_ref, %State{delay_demands?: true} = state) do
Map.update!(state, :delayed_demands, &MapSet.put(&1, {pad_ref, :supply}))
end

Expand All @@ -85,7 +85,7 @@ defmodule Membrane.Core.Element.DemandHandler do

defp do_supply_demand(pad_ref, state) do
# marking is state that actual demand supply has been started (note changing back to false when finished)
state = %State{state | supplying_demand?: true}
state = %State{state | delay_demands?: true}

pad_data = state |> PadModel.get_data!(pad_ref)

Expand All @@ -94,7 +94,7 @@ defmodule Membrane.Core.Element.DemandHandler do

state = PadModel.set_data!(state, pad_ref, :input_queue, new_input_queue)
state = handle_input_queue_output(pad_ref, popped_data, state)
%State{state | supplying_demand?: false}
%State{state | delay_demands?: false}
end

defp update_demand(pad_ref, size, state) when is_integer(size) do
Expand Down Expand Up @@ -127,8 +127,8 @@ defmodule Membrane.Core.Element.DemandHandler do
# potentially for a long time.

cond do
state.supplying_demand? ->
raise "Cannot handle delayed demands while already supplying demand"
state.delay_demands? ->
raise "Cannot handle delayed demands when delay_demands? flag is set to true"

state.handle_demand_loop_counter >= @handle_demand_loop_limit ->
state =
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Membrane.Core.Element.State do
pads_info: PadModel.pads_info() | nil,
pads_data: PadModel.pads_data() | nil,
parent_pid: pid,
supplying_demand?: boolean(),
delay_demands?: boolean(),
delayed_demands: MapSet.t({Pad.ref(), :supply | :redemand}),
handle_demand_loop_counter: non_neg_integer(),
synchronization: %{
Expand Down Expand Up @@ -74,7 +74,7 @@ defmodule Membrane.Core.Element.State do
initialized?: false,
terminating?: false,
setup_incomplete?: false,
supplying_demand?: false,
delay_demands?: false,
# handling_action?: false,
popping_auto_flow_queue?: false,
stalker: nil,
Expand Down
4 changes: 2 additions & 2 deletions test/membrane/core/element/action_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do
setup :demand_test_filter

test "delaying demand", %{state: state} do
state = %{state | playback: :playing, supplying_demand?: true}
state = %{state | playback: :playing, delay_demands?: true}
state = @module.handle_action({:demand, {:input, 10}}, :handle_info, %{}, state)
assert state.pads_data.input.manual_demand_size == 10
assert MapSet.new([{:input, :supply}]) == state.delayed_demands
Expand Down Expand Up @@ -489,7 +489,7 @@ defmodule Membrane.Core.Element.ActionHandlerTest do

test "when pad works in auto or manual flow control mode", %{state: state} do
state =
%{state | supplying_demand?: true, playback: :playing}
%{state | delay_demands?: true, playback: :playing}
|> PadModel.set_data!(:output, :flow_control, :manual)

new_state =
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/core/element/event_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ defmodule Membrane.Core.Element.EventControllerTest do
parent_pid: self(),
synchronization: %{clock: nil, parent_clock: nil, stream_sync: nil},
# handling_action?: false,
supplying_demand?: false,
delay_demands?: false,
pads_to_snapshot: MapSet.new(),
delayed_demands: MapSet.new(),
handle_demand_loop_counter: 0,
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/core/element/lifecycle_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule Membrane.Core.Element.LifecycleControllerTest do
parent_pid: self(),
synchronization: %{clock: nil, parent_clock: nil},
# handling_action?: false,
supplying_demand?: false,
delay_demands?: false,
pads_to_snapshot: MapSet.new(),
delayed_demands: MapSet.new(),
pads_data: %{
Expand Down
2 changes: 1 addition & 1 deletion test/membrane/core/element/pad_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Membrane.Core.Element.PadControllerTest do
name: name,
module: elem_module,
# handling_action?: false,
supplying_demand?: false,
delay_demands?: false,
pads_to_snapshot: MapSet.new(),
delayed_demands: MapSet.new(),
parent_pid: self(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule Membrane.Core.Element.StreamFormatControllerTest do
playback: :playing,
synchronization: %{clock: nil, parent_clock: nil},
# handling_action?: false,
supplying_demand?: false,
delay_demands?: false,
pads_to_snapshot: MapSet.new(),
delayed_demands: MapSet.new(),
handle_demand_loop_counter: 0,
Expand Down

0 comments on commit 27a3b01

Please sign in to comment.