Skip to content

Commit

Permalink
Implement :remove_link action (#487)
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom authored Dec 15, 2022
1 parent 3216972 commit 7aebd1d
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# Changelog

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.
* Add children groups - a mechanism that allows refering to multiple children with a single identifier.
* Rename `remove_child_t` action into `remove_children_t` and allow for removing a children group with a single action.
* Add an ability to spawn anonymous children.

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
* Refine communication between parent and its children [#270](https://github.com/membraneframework/membrane_core/issues/270)
Expand Down
10 changes: 9 additions & 1 deletion lib/membrane/bin/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Bin.Action do
callback unless explicitly stated otherwise.
"""

alias Membrane.{Child, ChildrenSpec}
alias Membrane.{Child, ChildrenSpec, Pad}

@typedoc """
Action that sends a message to a child identified by name.
Expand Down Expand Up @@ -42,6 +42,13 @@ defmodule Membrane.Bin.Action do
| Membrane.Child.group_t()
| [Membrane.Child.group_t()]}

@typedoc """
Action that removes link, which relates to specified child and pad.
Removed link has to have dynamic pads on both ends.
"""
@type remove_link_t :: {:remove_link, {Child.name_t(), Pad.ref_t()}}

@typedoc """
Starts a timer that will invoke `c:Membrane.Bin.handle_tick/3` callback
every `interval` according to the given `clock`.
Expand Down Expand Up @@ -115,6 +122,7 @@ defmodule Membrane.Bin.Action do
| notify_parent_t
| spec_t
| remove_children_t
| remove_link_t
| start_timer_t
| timer_interval_t
| stop_timer_t
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/bin/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ defmodule Membrane.Core.Bin.ActionHandler do
Parent.ChildLifeController.handle_remove_children(children, state)
end

@impl CallbackHandler
def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state)
end

@impl CallbackHandler
def handle_action(
{:notify_parent, notification},
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Membrane.Core.Child.PadModel do
ref: Membrane.Pad.ref_t(),
options: Membrane.ChildrenSpec.pad_options_t(),
link_id: Membrane.Core.Parent.Link.id(),
endpoint: Membrane.Core.Parent.Link.Endpoint.t(),
endpoint: Membrane.Core.Parent.Link.Endpoint.t() | nil,
linked?: boolean(),
response_received?: boolean(),
spec_ref: Membrane.Core.Parent.ChildLifeController.spec_ref_t(),
Expand Down
8 changes: 8 additions & 0 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ defmodule Membrane.Core.Parent.ChildLifeController do
SpecificationParser
}

alias Membrane.Pad

require Membrane.Core.Component
require Membrane.Core.Message, as: Message
require Membrane.Logger
Expand Down Expand Up @@ -490,6 +492,12 @@ defmodule Membrane.Core.Parent.ChildLifeController do
Parent.ChildrenModel.update_children!(state, refs, &%{&1 | terminating?: true})
end

@spec handle_remove_link(Membrane.Child.name_t(), Pad.ref_t(), Parent.state_t()) ::
Parent.state_t()
def handle_remove_link(child_name, pad_ref, state) do
LinkUtils.remove_link(child_name, pad_ref, state)
end

@doc """
Handles death of a child:
- removes it from state
Expand Down
29 changes: 29 additions & 0 deletions lib/membrane/core/parent/child_life_controller/link_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do

use Bunch

alias Membrane.ParentError
alias Membrane.Core.{Bin, Message, Parent, Telemetry}
alias Membrane.Core.Bin.PadController

Expand Down Expand Up @@ -32,6 +33,34 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do
end)
end

@spec remove_link(Membrane.Child.name_t(), Pad.ref_t(), Parent.state_t()) :: Parent.state_t()
def remove_link(child_name, pad_ref, state) do
Enum.find(state.links, fn {_id, link} ->
[link.from, link.to]
|> Enum.any?(&(&1.child == child_name and &1.pad_ref == pad_ref))
end)
|> case do
{_id, %Link{} = link} ->
for endpoint <- [link.from, link.to] do
Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref)
end

links = Map.delete(state.links, link.id)
Map.put(state, :links, links)

nil ->
with %{^child_name => _child_entry} <- state.children do
raise ParentError, """
Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but this child does not have this pad linked
"""
end

raise ParentError, """
Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but such a child does not exist
"""
end
end

@spec unlink_element(Membrane.Child.name_t(), Parent.state_t()) :: Parent.state_t()
def unlink_element(child_name, state) do
Map.update!(
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
Parent.ChildLifeController.handle_remove_children(children, state)
end

@impl CallbackHandler
def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state)
end

@impl CallbackHandler
def handle_action({:start_timer, {id, interval, clock}}, _cb, _params, state) do
TimerController.start_timer(id, interval, clock, state)
Expand Down
10 changes: 9 additions & 1 deletion lib/membrane/pipeline/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Pipeline.Action do
callback unless explicitly stated otherwise.
"""

alias Membrane.{Child, ChildrenSpec}
alias Membrane.{Child, ChildrenSpec, Pad}

@typedoc """
Action that sends a message to a child identified by name.
Expand All @@ -32,6 +32,13 @@ defmodule Membrane.Pipeline.Action do
@type remove_children_t ::
{:remove_children, Child.name_t() | [Child.name_t()]}

@typedoc """
Action that removes link, which relates to specified child and pad.
Removed link has to have dynamic pads on both ends.
"""
@type remove_link_t :: {:remove_link, {Child.name_t(), Pad.ref_t()}}

@typedoc """
Starts a timer that will invoke `c:Membrane.Pipeline.handle_tick/3` callback
every `interval` according to the given `clock`.
Expand Down Expand Up @@ -124,6 +131,7 @@ defmodule Membrane.Pipeline.Action do
notify_child_t
| spec_t
| remove_children_t
| remove_link_t
| start_timer_t
| timer_interval_t
| stop_timer_t
Expand Down
1 change: 0 additions & 1 deletion test/membrane/core/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Membrane.Core.PipelineTest do
import Membrane.Testing.Assertions
import Membrane.ChildrenSpec

alias Membrane.ChildrenSpec
alias Membrane.Core.Message
alias Membrane.Core.Pipeline.{ActionHandler, State}
alias Membrane.Testing
Expand Down
1 change: 0 additions & 1 deletion test/membrane/integration/child_spawn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ defmodule Membrane.Integration.ChildSpawnTest do
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.ChildrenSpec
alias Membrane.Core.Message
alias Membrane.Testing

Expand Down
65 changes: 64 additions & 1 deletion test/membrane/integration/linking_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,33 @@ defmodule Membrane.Integration.LinkingTest do
import Membrane.Testing.Assertions
import Membrane.ChildrenSpec

alias Membrane.{Buffer, Child, ChildrenSpec, Testing}
alias Membrane.{Buffer, Child, Testing}
alias Membrane.Support.Element.DynamicFilter

require Membrane.Pad, as: Pad

defmodule Element do
use Membrane.Endpoint

def_input_pad :input,
availability: :on_request,
accepted_format: _any

def_output_pad :output,
availability: :on_request,
accepted_format: _any

@impl true
def handle_demand(_pad, _size, _unit, _ctx, state) do
{[], state}
end

@impl true
def handle_pad_removed(pad_ref, _ctx, state) do
{[notify_parent: {:handle_pad_removed, pad_ref}], state}
end
end

defmodule Bin do
use Membrane.Bin

Expand Down Expand Up @@ -449,6 +471,47 @@ defmodule Membrane.Integration.LinkingTest do
assert_start_of_stream(pipeline, :sink)
end

test "Parent successfully unlinks children with dynamic pads using :remove_link action" do
spec =
[
child(:source, __MODULE__.Element),
child(:sink, __MODULE__.Element)
] ++
Enum.map(1..10, fn i ->
get_child(:source)
|> via_out(Pad.ref(:output, i))
|> via_in(Pad.ref(:input, i))
|> get_child(:sink)
end)

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

for pad_id <- 1..10 do
actions =
if rem(pad_id, 2) == 0,
do: [remove_link: {:source, Pad.ref(:output, pad_id)}],
else: [remove_link: {:sink, Pad.ref(:input, pad_id)}]

Testing.Pipeline.execute_actions(pipeline, actions)

assert_link_removed(pipeline, pad_id)

for i <- (pad_id + 1)..10//1 do
refute_link_removed(pipeline, i)
end
end
end

defp assert_link_removed(pipeline, id) do
assert_pipeline_notified(pipeline, :source, {:handle_pad_removed, Pad.ref(:output, ^id)})
assert_pipeline_notified(pipeline, :sink, {:handle_pad_removed, Pad.ref(:input, ^id)})
end

defp refute_link_removed(pipeline, id) do
refute_pipeline_notified(pipeline, :source, {:handle_pad_removed, Pad.ref(:output, ^id)}, 10)
refute_pipeline_notified(pipeline, :sink, {:handle_pad_removed, Pad.ref(:input, ^id)}, 10)
end

defp get_child_pid(ref, parent_pid) do
state = :sys.get_state(parent_pid)
state.children[ref].pid
Expand Down

0 comments on commit 7aebd1d

Please sign in to comment.