Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement :remove_link action #487

Merged
merged 14 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# Changelog

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.
* Add children groups - a mechanism that allows refering to multiple children with a single identifier.
* Rename `remove_child_t` action into `remove_children_t` and allow for removing a children group with a single action.
* Add an ability to spawn anonymous children.

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
* Refine communication between parent and its children [#270](https://github.com/membraneframework/membrane_core/issues/270)
Expand Down
10 changes: 9 additions & 1 deletion lib/membrane/bin/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Bin.Action do
callback unless explicitly stated otherwise.
"""

alias Membrane.{Child, ChildrenSpec}
alias Membrane.{Child, ChildrenSpec, Pad}

@typedoc """
Action that sends a message to a child identified by name.
Expand Down Expand Up @@ -42,6 +42,13 @@ defmodule Membrane.Bin.Action do
| Membrane.Child.group_t()
| [Membrane.Child.group_t()]}

@typedoc """
Action that removes link, which relates to specified child and pad.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's note that only dynamic pads can be unlinked


Removed link has to have dynamic pads on both ends.
"""
@type remove_link_t :: {:remove_link, {Child.name_t(), Pad.ref_t()}}

@typedoc """
Starts a timer that will invoke `c:Membrane.Bin.handle_tick/3` callback
every `interval` according to the given `clock`.
Expand Down Expand Up @@ -115,6 +122,7 @@ defmodule Membrane.Bin.Action do
| notify_parent_t
| spec_t
| remove_children_t
| remove_link_t
| start_timer_t
| timer_interval_t
| stop_timer_t
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/bin/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ defmodule Membrane.Core.Bin.ActionHandler do
Parent.ChildLifeController.handle_remove_children(children, state)
end

@impl CallbackHandler
def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state)
end

@impl CallbackHandler
def handle_action(
{:notify_parent, notification},
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Membrane.Core.Child.PadModel do
ref: Membrane.Pad.ref_t(),
options: Membrane.ChildrenSpec.pad_options_t(),
link_id: Membrane.Core.Parent.Link.id(),
endpoint: Membrane.Core.Parent.Link.Endpoint.t(),
endpoint: Membrane.Core.Parent.Link.Endpoint.t() | nil,
linked?: boolean(),
response_received?: boolean(),
spec_ref: Membrane.Core.Parent.ChildLifeController.spec_ref_t(),
Expand Down
8 changes: 8 additions & 0 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ defmodule Membrane.Core.Parent.ChildLifeController do
SpecificationParser
}

alias Membrane.Pad

require Membrane.Core.Component
require Membrane.Core.Message, as: Message
require Membrane.Logger
Expand Down Expand Up @@ -490,6 +492,12 @@ defmodule Membrane.Core.Parent.ChildLifeController do
Parent.ChildrenModel.update_children!(state, refs, &%{&1 | terminating?: true})
end

@spec handle_remove_link(Membrane.Child.name_t(), Pad.ref_t(), Parent.state_t()) ::
Parent.state_t()
def handle_remove_link(child_name, pad_ref, state) do
LinkUtils.remove_link(child_name, pad_ref, state)
end

@doc """
Handles death of a child:
- removes it from state
Expand Down
29 changes: 29 additions & 0 deletions lib/membrane/core/parent/child_life_controller/link_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do

use Bunch

alias Membrane.ParentError
alias Membrane.Core.{Bin, Message, Parent, Telemetry}
alias Membrane.Core.Bin.PadController

Expand Down Expand Up @@ -32,6 +33,34 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do
end)
end

@spec remove_link(Membrane.Child.name_t(), Pad.ref_t(), Parent.state_t()) :: Parent.state_t()
def remove_link(child_name, pad_ref, state) do
Enum.find(state.links, fn {_id, link} ->
[link.from, link.to]
|> Enum.any?(&(&1.child == child_name and &1.pad_ref == pad_ref))
end)
|> case do
{_id, %Link{} = link} ->
for endpoint <- [link.from, link.to] do
Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref)
end

links = Map.delete(state.links, link.id)
Map.put(state, :links, links)

nil ->
with %{^child_name => _child_entry} <- state.children do
raise ParentError, """
Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but this child does not have this pad linked
"""
end

raise ParentError, """
Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but such a child does not exist
"""
end
end

@spec unlink_element(Membrane.Child.name_t(), Parent.state_t()) :: Parent.state_t()
def unlink_element(child_name, state) do
Map.update!(
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
Parent.ChildLifeController.handle_remove_children(children, state)
end

@impl CallbackHandler
def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state)
end

@impl CallbackHandler
def handle_action({:start_timer, {id, interval, clock}}, _cb, _params, state) do
TimerController.start_timer(id, interval, clock, state)
Expand Down
10 changes: 9 additions & 1 deletion lib/membrane/pipeline/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Pipeline.Action do
callback unless explicitly stated otherwise.
"""

alias Membrane.{Child, ChildrenSpec}
alias Membrane.{Child, ChildrenSpec, Pad}

@typedoc """
Action that sends a message to a child identified by name.
Expand All @@ -32,6 +32,13 @@ defmodule Membrane.Pipeline.Action do
@type remove_children_t ::
{:remove_children, Child.name_t() | [Child.name_t()]}

@typedoc """
Action that removes link, which relates to specified child and pad.

Removed link has to have dynamic pads on both ends.
"""
@type remove_link_t :: {:remove_link, {Child.name_t(), Pad.ref_t()}}

@typedoc """
Starts a timer that will invoke `c:Membrane.Pipeline.handle_tick/3` callback
every `interval` according to the given `clock`.
Expand Down Expand Up @@ -124,6 +131,7 @@ defmodule Membrane.Pipeline.Action do
notify_child_t
| spec_t
| remove_children_t
| remove_link_t
| start_timer_t
| timer_interval_t
| stop_timer_t
Expand Down
1 change: 0 additions & 1 deletion test/membrane/core/pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Membrane.Core.PipelineTest do
import Membrane.Testing.Assertions
import Membrane.ChildrenSpec

alias Membrane.ChildrenSpec
alias Membrane.Core.Message
alias Membrane.Core.Pipeline.{ActionHandler, State}
alias Membrane.Testing
Expand Down
1 change: 0 additions & 1 deletion test/membrane/integration/child_spawn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ defmodule Membrane.Integration.ChildSpawnTest do
import Membrane.Testing.Assertions

alias Membrane.Buffer
alias Membrane.ChildrenSpec
alias Membrane.Core.Message
alias Membrane.Testing

Expand Down
65 changes: 64 additions & 1 deletion test/membrane/integration/linking_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,33 @@ defmodule Membrane.Integration.LinkingTest do
import Membrane.Testing.Assertions
import Membrane.ChildrenSpec

alias Membrane.{Buffer, Child, ChildrenSpec, Testing}
alias Membrane.{Buffer, Child, Testing}
alias Membrane.Support.Element.DynamicFilter

require Membrane.Pad, as: Pad

defmodule Element do
use Membrane.Endpoint

def_input_pad :input,
availability: :on_request,
accepted_format: _any

def_output_pad :output,
availability: :on_request,
accepted_format: _any

@impl true
def handle_demand(_pad, _size, _unit, _ctx, state) do
{[], state}
end

@impl true
def handle_pad_removed(pad_ref, _ctx, state) do
{[notify_parent: {:handle_pad_removed, pad_ref}], state}
end
end

defmodule Bin do
use Membrane.Bin

Expand Down Expand Up @@ -449,6 +471,47 @@ defmodule Membrane.Integration.LinkingTest do
assert_start_of_stream(pipeline, :sink)
end

test "Parent successfully unlinks children with dynamic pads using :remove_link action" do
spec =
[
child(:source, __MODULE__.Element),
child(:sink, __MODULE__.Element)
] ++
Enum.map(1..10, fn i ->
get_child(:source)
|> via_out(Pad.ref(:output, i))
|> via_in(Pad.ref(:input, i))
|> get_child(:sink)
end)

pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

for pad_id <- 1..10 do
actions =
if rem(pad_id, 2) == 0,
do: [remove_link: {:source, Pad.ref(:output, pad_id)}],
else: [remove_link: {:sink, Pad.ref(:input, pad_id)}]

Testing.Pipeline.execute_actions(pipeline, actions)

assert_link_removed(pipeline, pad_id)

for i <- (pad_id + 1)..10//1 do
refute_link_removed(pipeline, i)
end
end
end

defp assert_link_removed(pipeline, id) do
assert_pipeline_notified(pipeline, :source, {:handle_pad_removed, Pad.ref(:output, ^id)})
assert_pipeline_notified(pipeline, :sink, {:handle_pad_removed, Pad.ref(:input, ^id)})
end

defp refute_link_removed(pipeline, id) do
refute_pipeline_notified(pipeline, :source, {:handle_pad_removed, Pad.ref(:output, ^id)}, 10)
varsill marked this conversation as resolved.
Show resolved Hide resolved
refute_pipeline_notified(pipeline, :sink, {:handle_pad_removed, Pad.ref(:input, ^id)}, 10)
end

defp get_child_pid(ref, parent_pid) do
state = :sys.get_state(parent_pid)
state.children[ref].pid
Expand Down