Skip to content

Commit

Permalink
Fix bug in handling actions returned from handle_tick. Deprecate hand…
Browse files Browse the repository at this point in the history
…le_spec_started. (#708)

* wip

* Write tests wip

* Write tests wip

* Revert "Fix timer running late (#685)"

This reverts commit 168f57e.

* Fix actions handling order bug related to Pipeline.handle_playing

* Add assertion on value passed with :setup action

* WIP Fix bug in executing handle_buffer while handling actions from previous callback

* Fix tests wip

* Fix CI

* Update changelog

* Stopt  calling handle_spec_started in between handling actions

* Make demands test more strict

* Add dots to changelog

* Fix double tick bug

* wip

* Deprecate handle_spec_started/3

* Remove unused aliases

* Remove unnecessary warning

* Bump version to 1.0.1

* Remove leftovers

* Fix docs

* Implement suggestions from CR, bump version to 1.1.0-rc
  • Loading branch information
FelonEkonom authored Feb 26, 2024
1 parent dc5f653 commit e4305da
Show file tree
Hide file tree
Showing 27 changed files with 382 additions and 124 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# Changelog

## 1.1.0-rc0
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Handle buffers from input pads having `flow_control: :auto` only if demand on all output pads having `flow_control: :auto` is positive. [#693](https://github.com/membraneframework/membrane_core/pull/693)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)
* Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681)
Expand Down
9 changes: 4 additions & 5 deletions lib/membrane/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ defmodule Membrane.Bin do
) :: callback_return

@doc """
This callback is deprecated since v1.1.0-rc0
Callback invoked when children of `Membrane.ChildrenSpec` are started.
By default, it does nothing.
It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens.
"""
@callback handle_spec_started(
children :: [Child.name()],
Expand Down Expand Up @@ -309,6 +311,7 @@ defmodule Membrane.Bin do
alias unquote(__MODULE__)
@behaviour unquote(__MODULE__)
@before_compile unquote(__MODULE__)
@after_compile {Membrane.Core.Parent, :check_deprecated_callbacks}

unquote(bring_spec)
unquote(bring_pad)
Expand Down Expand Up @@ -354,9 +357,6 @@ defmodule Membrane.Bin do
{[], state}
end

@impl true
def handle_spec_started(new_children, _ctx, state), do: {[], state}

@impl true
def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state}

Expand All @@ -381,7 +381,6 @@ defmodule Membrane.Bin do
handle_setup: 2,
handle_playing: 2,
handle_info: 3,
handle_spec_started: 3,
handle_element_start_of_stream: 4,
handle_element_end_of_stream: 4,
handle_child_notification: 4,
Expand Down
13 changes: 13 additions & 0 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.Core.CallbackHandler do
use Bunch

alias Membrane.CallbackError
alias Membrane.Core.Component

require Membrane.Logger

Expand Down Expand Up @@ -191,6 +192,13 @@ defmodule Membrane.Core.CallbackHandler do
was_handling_action? = state.handling_action?
state = %{state | handling_action?: true}

# Updating :supplying_demand? flag value here is a temporal fix.
# Setting it to `true` while handling actions causes postponing calls
# of handle_redemand/2 and supply_demand/2 until a moment, when all
# actions returned from the callback are handled
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
try do
Expand All @@ -210,6 +218,11 @@ defmodule Membrane.Core.CallbackHandler do
do: state,
else: %{state | handling_action?: false}

state =
if Component.is_element?(state) and not was_supplying_demand?,
do: %{state | supplying_demand?: false},
else: state

handler_module.handle_end_of_actions(state)
end
end
13 changes: 0 additions & 13 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,7 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(Message.new(:timer_tick, timer_id), state) do
# Guarding the `TimerController.handle_tick/2` invocation is
# required since there might be a case in which `handle_tick`
# callback's implementation returns demand action.
# In this scenario, without this guard, there would a possibility that
# the `handle_buffer` would be called immediately, returning
# some action that would affect the timer and the original state
# of the timer, set with actions returned from `handle_tick`,
# would be overwritten with that action.
#
# For more information see: https://github.com/membraneframework/membrane_core/issues/670
state = %{state | supplying_demand?: true}
state = TimerController.handle_tick(timer_id, state)
state = %{state | supplying_demand?: false}
state = Membrane.Core.Element.DemandHandler.handle_delayed_demands(state)
{:noreply, state}
end

Expand Down
66 changes: 34 additions & 32 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,47 @@ defmodule Membrane.Core.Element.ActionHandler do
require Membrane.Logger

@impl CallbackHandler
def transform_actions(actions, callback, _handler_params, state) do
def transform_actions(actions, _callback, _handler_params, state) do
actions = join_buffers(actions)
ensure_nothing_after_redemand(actions, callback, state)
{actions, state}
end

defguardp is_demand_size(size) when is_integer(size) or is_function(size)

@impl CallbackHandler
def handle_end_of_actions(state) when not state.handling_action? do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
def handle_end_of_actions(state) do
# Fixed order of handling demand of manual and auto pads would lead to
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.
manual_demands_first? = Enum.random([1, 2]) == 1

state =
if manual_demands_first?,
do: maybe_handle_delayed_demands(state),
else: state

state = maybe_handle_pads_to_snapshot(state)

state =
if manual_demands_first?,
do: state,
else: maybe_handle_delayed_demands(state)

state
end

@impl CallbackHandler
def handle_end_of_actions(state), do: state
defp maybe_handle_delayed_demands(state) do
with %{supplying_demand?: false} <- state do
DemandHandler.handle_delayed_demands(state)
end
end

defp maybe_handle_pads_to_snapshot(state) do
with %{handling_action?: false} <- state do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end
end

@impl CallbackHandler
def handle_action({action, _}, :handle_init, _params, _state)
Expand Down Expand Up @@ -284,30 +309,6 @@ defmodule Membrane.Core.Element.ActionHandler do
)
end

defp ensure_nothing_after_redemand(actions, callback, state) do
{redemands, actions_after_redemands} =
actions
|> Enum.drop_while(fn
{:redemand, _args} -> false
_other_action -> true
end)
|> Enum.split_while(fn
{:redemand, _args} -> true
_other_action -> false
end)

case {redemands, actions_after_redemands} do
{_redemands, []} ->
:ok

{[redemand | _redemands], _actions_after_redemands} ->
raise ActionError,
reason: :actions_after_redemand,
action: redemand,
callback: {state.module, callback}
end
end

@spec send_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
defp send_buffer(_pad_ref, [], state) do
state
Expand Down Expand Up @@ -470,7 +471,8 @@ defmodule Membrane.Core.Element.ActionHandler do
@spec handle_outgoing_event(Pad.ref(), Event.t(), State.t()) :: State.t()
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> AutoFlowUtils.pop_queues_and_bump_demand()
else
Expand Down
9 changes: 9 additions & 0 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do
end
end

@spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t()
def remove_pad_from_delayed_demands(pad_ref, state) do
Map.update!(state, :delayed_demands, fn delayed_demands_set ->
delayed_demands_set
|> MapSet.delete({pad_ref, :supply})
|> MapSet.delete({pad_ref, :redemand})
end)
end

@spec handle_input_queue_output(
Pad.ref(),
[InputQueue.output_value()],
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandHandler,
InputQueue,
PlaybackQueue,
State
Expand Down Expand Up @@ -108,7 +109,8 @@ defmodule Membrane.Core.Element.EventController do
Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}")

state =
PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
|> Map.update!(:awaiting_auto_input_pads, &MapSet.delete(&1, pad_ref))
|> Map.update!(:auto_input_pads, &List.delete(&1, pad_ref))

Expand Down
18 changes: 15 additions & 3 deletions lib/membrane/core/lifecycle_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ defmodule Membrane.Core.LifecycleController do
def handle_setup_operation(operation, state) do
:ok = assert_operation_allowed!(operation, state.setup_incomplete?)

case operation do
:incomplete ->
cond do
operation == :incomplete ->
Membrane.Logger.debug("Component deferred initialization")
%{state | setup_incomplete?: true}

:complete ->
Component.is_pipeline?(state) ->
# complete_setup/1 will be called in Membrane.Core.Pipeline.ActionHandler.handle_end_of_actions/1
%{state | awaiting_setup_completition?: true}

Component.is_child?(state) ->
complete_setup(state)
end
end
Expand Down Expand Up @@ -52,5 +56,13 @@ defmodule Membrane.Core.LifecycleController do
"""
end

defp assert_operation_allowed!(operation, _status)
when operation not in [:incomplete, :complete] do
raise SetupError, """
Action {:setup, #{inspect(operation)}} was returned, but second element in the tuple must
be :complete or :incomplete
"""
end

defp assert_operation_allowed!(_operation, _status), do: :ok
end
17 changes: 17 additions & 0 deletions lib/membrane/core/parent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,21 @@ defmodule Membrane.Core.Parent do
@moduledoc false

@type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Pipeline.State.t()

@spec check_deprecated_callbacks(Macro.Env.t(), binary) :: :ok
def check_deprecated_callbacks(env, _bytecode) do
modules_whitelist = [Membrane.Testing.Pipeline]

if env.module not in modules_whitelist and
Module.defines?(env.module, {:handle_spec_started, 3}, :def) do
warn_message = """
Callback handle_spec_started/3 has been deprecated since \
:membrane_core v1.1.0-rc0, but it is implemented in #{inspect(env.module)}
"""

IO.warn(warn_message, [])
end

:ok
end
end
21 changes: 13 additions & 8 deletions lib/membrane/core/parent/child_life_controller/startup_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,20 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do

@spec exec_handle_spec_started([Membrane.Child.name()], Parent.state()) :: Parent.state()
def exec_handle_spec_started(children_names, state) do
action_handler = Component.action_handler(state)

CallbackHandler.exec_and_handle_callback(
:handle_spec_started,
action_handler,
%{context: &Component.context_from_state/1},
[children_names],
# handle_spec_started/3 callback is deprecated, so we don't require its implementation
if function_exported?(state.module, :handle_spec_started, 3) do
action_handler = Component.action_handler(state)

CallbackHandler.exec_and_handle_callback(
:handle_spec_started,
action_handler,
%{context: &Component.context_from_state/1},
[children_names],
state
)
else
state
)
end
end

@spec check_if_children_names_and_children_groups_ids_are_unique(
Expand Down
10 changes: 10 additions & 0 deletions lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
alias Membrane.Core.Parent.LifecycleController
alias Membrane.Core.Pipeline.State

require Membrane.Logger

@impl CallbackHandler
def handle_action({:spec, args}, _cb, _params, %State{terminating?: true}) do
raise Membrane.ParentError,
Expand Down Expand Up @@ -103,4 +105,12 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
def handle_action(action, _callback, _params, _state) do
raise ActionError, action: action, reason: {:unknown_action, Membrane.Pipeline.Action}
end

@impl CallbackHandler
def handle_end_of_actions(state) do
with %{awaiting_setup_completition?: true} <- state do
%{state | awaiting_setup_completition?: false}
|> Membrane.Core.LifecycleController.complete_setup()
end
end
end
6 changes: 4 additions & 2 deletions lib/membrane/core/pipeline/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ defmodule Membrane.Core.Pipeline.State do
setup_incomplete?: boolean(),
handling_action?: boolean(),
stalker: Membrane.Core.Stalker.t(),
subprocess_supervisor: pid()
subprocess_supervisor: pid(),
awaiting_setup_completition?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand All @@ -58,5 +59,6 @@ defmodule Membrane.Core.Pipeline.State do
handling_action?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil
subprocess_supervisor: nil,
awaiting_setup_completition?: false
end
Loading

0 comments on commit e4305da

Please sign in to comment.