Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement :remove_link action #487

Merged
merged 14 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
* Refine communication between parent and its children [#270](https://github.com/membraneframework/membrane_core/issues/270)
Expand Down
8 changes: 7 additions & 1 deletion lib/membrane/bin/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Bin.Action do
callback unless explicitly stated otherwise.
"""

alias Membrane.{Child, ChildrenSpec}
alias Membrane.{Child, ChildrenSpec, Pad}

@typedoc """
Action that sends a message to a child identified by name.
Expand Down Expand Up @@ -37,6 +37,11 @@ defmodule Membrane.Bin.Action do
Child.name_t()
| [Child.name_t()]}

@typedoc """
Action that removes link, which relates to specified child and pad.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's note that only dynamic pads can be unlinked

"""
@type remove_link_t :: {:remove_link, {Child.name_t(), Pad.ref_t()}}

@typedoc """
Starts a timer that will invoke `c:Membrane.Bin.handle_tick/3` callback
every `interval` according to the given `clock`.
Expand Down Expand Up @@ -110,6 +115,7 @@ defmodule Membrane.Bin.Action do
| notify_parent_t
| spec_t
| remove_child_t
| remove_link_t
| start_timer_t
| timer_interval_t
| stop_timer_t
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/bin/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ defmodule Membrane.Core.Bin.ActionHandler do
Parent.ChildLifeController.handle_remove_children(children, state)
end

@impl CallbackHandler
def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state)
end

@impl CallbackHandler
def handle_action(
{:notify_parent, notification},
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Membrane.Core.Child.PadModel do
ref: Membrane.Pad.ref_t(),
options: Membrane.ChildrenSpec.pad_options_t(),
link_id: Membrane.Core.Parent.Link.id(),
endpoint: Membrane.Core.Parent.Link.Endpoint.t(),
endpoint: Membrane.Core.Parent.Link.Endpoint.t() | nil,
linked?: boolean(),
response_received?: boolean(),
spec_ref: Membrane.Core.Parent.ChildLifeController.spec_ref_t(),
Expand Down
8 changes: 8 additions & 0 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ defmodule Membrane.Core.Parent.ChildLifeController do
StructureParser
}

alias Membrane.Pad

require Membrane.Core.Component
require Membrane.Core.Message, as: Message
require Membrane.Logger
Expand Down Expand Up @@ -427,6 +429,12 @@ defmodule Membrane.Core.Parent.ChildLifeController do
Parent.ChildrenModel.update_children!(state, names, &%{&1 | terminating?: true})
end

@spec handle_remove_link(Membrane.Child.name_t(), Pad.ref_t(), Parent.state_t()) ::
Parent.state_t()
def handle_remove_link(child_name, pad_ref, state) do
LinkUtils.remove_link(child_name, pad_ref, state)
end

@doc """
Handles death of a child:
- removes it from state
Expand Down
22 changes: 22 additions & 0 deletions lib/membrane/core/parent/child_life_controller/link_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ defmodule Membrane.Core.Parent.ChildLifeController.LinkUtils do
end)
end

@spec remove_link(Membrane.Child.name_t(), Pad.ref_t(), Parent.state_t()) :: Parent.state_t()
def remove_link(child_name, pad_ref, state) do
Enum.find(state.links, fn {_id, link} ->
[link.from, link.to]
|> Enum.any?(&(&1.child == child_name and &1.pad_ref == pad_ref))
end)
|> case do
{_id, %Link{} = link} ->
for endpoint <- [link.from, link.to] do
Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref)
end

links = Map.delete(state.links, link.id)
Map.put(state, :links, links)

nil ->
raise LinkError, """
Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but this child does not have this pad
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but this child does not have this pad
Attempted to unlink pad #{inspect(pad_ref)} of child #{inspect(child_name)}, but no such link exists

We should probably first check if such a child exists to give a better error when it doesn't

"""
end
end

@spec unlink_element(Membrane.Child.name_t(), Parent.state_t()) :: Parent.state_t()
def unlink_element(child_name, state) do
Map.update!(
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
Parent.ChildLifeController.handle_remove_children(children, state)
end

@impl CallbackHandler
def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state)
end

@impl CallbackHandler
def handle_action({:start_timer, {id, interval, clock}}, _cb, _params, state) do
TimerController.start_timer(id, interval, clock, state)
Expand Down
8 changes: 7 additions & 1 deletion lib/membrane/pipeline/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Membrane.Pipeline.Action do
callback unless explicitly stated otherwise.
"""

alias Membrane.{Child, ChildrenSpec}
alias Membrane.{Child, ChildrenSpec, Pad}

@typedoc """
Action that sends a message to a child identified by name.
Expand All @@ -29,6 +29,11 @@ defmodule Membrane.Pipeline.Action do
@type remove_child_t ::
{:remove_child, Child.name_t() | [Child.name_t()]}

@typedoc """
Action that removes link, which relates to specified child and pad.
"""
@type remove_link_t :: {:remove_link, {Child.name_t(), Pad.ref_t()}}

@typedoc """
Starts a timer that will invoke `c:Membrane.Pipeline.handle_tick/3` callback
every `interval` according to the given `clock`.
Expand Down Expand Up @@ -121,6 +126,7 @@ defmodule Membrane.Pipeline.Action do
notify_child_t
| spec_t
| remove_child_t
| remove_link_t
| start_timer_t
| timer_interval_t
| stop_timer_t
Expand Down
65 changes: 65 additions & 0 deletions test/membrane/integration/linking_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,28 @@ defmodule Membrane.Integration.LinkingTest do

require Membrane.Pad, as: Pad

defmodule Element do
use Membrane.Endpoint

def_input_pad :input,
availability: :on_request,
accepted_format: _any

def_output_pad :output,
availability: :on_request,
accepted_format: _any

@impl true
def handle_demand(_pad, _size, _unit, _ctx, state) do
{[], state}
end

@impl true
def handle_pad_removed(pad_ref, _ctx, state) do
{[notify_parent: {:handle_pad_removed, pad_ref}], state}
end
end

defmodule Bin do
use Membrane.Bin

Expand Down Expand Up @@ -413,6 +435,49 @@ defmodule Membrane.Integration.LinkingTest do
assert_start_of_stream(pipeline, :sink)
end

test "Parent successfully unlinks children with dynamic pads using :remove_link action" do
structure =
[
child(:source, __MODULE__.Element),
child(:sink, __MODULE__.Element)
] ++
Enum.map(1..10, fn i ->
get_child(:source)
|> via_out(Pad.ref(:output, i))
|> via_in(Pad.ref(:input, i))
|> get_child(:sink)
end)

pipeline = Testing.Pipeline.start_link_supervised!(structure: structure)

for pad_id <- 1..10 do
actions =
if rem(pad_id, 2) == 0,
do: [remove_link: {:source, Pad.ref(:output, pad_id)}],
else: [remove_link: {:sink, Pad.ref(:input, pad_id)}]

Testing.Pipeline.execute_actions(pipeline, actions)

assert_link_removed(pipeline, pad_id)

if pad_id < 10 do
for i <- (pad_id + 1)..10 do
refute_link_removed(pipeline, i)
end
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if pad_id < 10 do
for i <- (pad_id + 1)..10 do
refute_link_removed(pipeline, i)
end
end
for i <- (pad_id + 1)..10//1 do
refute_link_removed(pipeline, i)
end

end
end

defp assert_link_removed(pipeline, id) do
assert_pipeline_notified(pipeline, :source, {:handle_pad_removed, Pad.ref(:output, ^id)})
assert_pipeline_notified(pipeline, :sink, {:handle_pad_removed, Pad.ref(:input, ^id)})
end

defp refute_link_removed(pipeline, id) do
refute_pipeline_notified(pipeline, :source, {:handle_pad_removed, Pad.ref(:output, ^id)}, 10)
varsill marked this conversation as resolved.
Show resolved Hide resolved
refute_pipeline_notified(pipeline, :sink, {:handle_pad_removed, Pad.ref(:input, ^id)}, 10)
end

defp get_child_pid(ref, parent_pid) do
state = :sys.get_state(parent_pid)
state.children[ref].pid
Expand Down