Skip to content

Commit

Permalink
Fix bug in Supervisors when pipeline is distributed (#556)
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom authored May 18, 2023
1 parent 2456cd1 commit ec655c0
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 46 deletions.
13 changes: 2 additions & 11 deletions lib/membrane/core/parent/child_life_controller/startup_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do
log_metadata: log_metadata
}

server_module =
component_module =
case child.component_type do
:element ->
Core.Element
Expand All @@ -220,17 +220,8 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do
Core.Bin
end

start_fun = fn supervisor, parent_supervisor ->
server_module.start(
Map.merge(params, %{
subprocess_supervisor: supervisor,
parent_supervisor: parent_supervisor
})
)
end

with {:ok, child_pid} <-
SubprocessSupervisor.start_component(supervisor, name, start_fun),
SubprocessSupervisor.start_component(supervisor, name, component_module, params),
{:ok, clock} <- receive_clock(name) do
%ChildEntry{
child
Expand Down
37 changes: 28 additions & 9 deletions lib/membrane/core/subprocess_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.Core.SubprocessSupervisor do
use Bunch
use GenServer

alias Membrane.Core
alias Membrane.Core.Observability

require Membrane.Core.Message, as: Message
Expand All @@ -28,14 +29,14 @@ defmodule Membrane.Core.SubprocessSupervisor do
Starts a Membrane component under the supervisor
"""
@spec start_component(
supervisor_pid,
supervisor_pid :: pid(),
name :: Membrane.Child.name(),
(supervisor_pid, parent_supervisor_pid -> {:ok, child_pid} | {:error, reason :: any()})
component_module :: Core.Bin | Core.Element,
options :: map()
) ::
{:ok, child_pid} | {:error, reason :: any()}
when child_pid: pid(), supervisor_pid: pid(), parent_supervisor_pid: pid
def start_component(supervisor, name, start_fun) do
Message.call!(supervisor, :start_component, [name, start_fun])
{:ok, child_pid :: pid()} | {:error, reason :: any()}
def start_component(supervisor, name, component_module, options) do
Message.call!(supervisor, :start_component, [name, component_module, options])
end

@doc """
Expand Down Expand Up @@ -103,10 +104,16 @@ defmodule Membrane.Core.SubprocessSupervisor do
end

@impl true
def handle_call(Message.new(:start_component, [name, start_fun]), _from, state) do
subprocess_supervisor = start_link!()
def handle_call(Message.new(:start_component, [name, component_module, options]), _from, state) do
subprocess_supervisor = start_link_subprocess_supervisor!(options)

with {:ok, child_pid} <- start_fun.(subprocess_supervisor, self()) do
options =
Map.merge(options, %{
subprocess_supervisor: subprocess_supervisor,
parent_supervisor: self()
})

with {:ok, child_pid} <- component_module.start(options) do
state =
state
|> put_in([:children, child_pid], %{
Expand Down Expand Up @@ -211,6 +218,18 @@ defmodule Membrane.Core.SubprocessSupervisor do
end
end

defp start_link_subprocess_supervisor!(component_options) do
case component_options[:node] do
nil ->
start_link!()

node ->
{:ok, pid} = :rpc.call(node, GenServer, :start, [__MODULE__, self()])
Process.link(pid)
pid
end
end

defp handle_exit(%{role: :subprocess_supervisor} = data, reason, state) do
case Map.fetch(state.children, data.child_pid) do
{:ok, child_data} ->
Expand Down
57 changes: 32 additions & 25 deletions test/membrane/integration/distributed_pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,44 @@ defmodule Membrane.Integration.DistributedPipelineTest do

import Membrane.Testing.Assertions

alias Membrane.Support.Distributed
alias Membrane.Testing

setup do
hostname = start_nodes()
on_exit(fn -> kill_node(hostname) end)
another_node = start_another_node()
on_exit(fn -> kill_node(another_node) end)
[first_node: node(self()), second_node: another_node]
end

test "if distributed pipeline works properly" do
defmodule Pipeline do
use Membrane.Pipeline
alias Membrane.Support.Distributed.{Sink, Source}

@impl true
def handle_init(_ctx, _opts) do
{[
spec: [
{child(:source, %Source{output: [1, 2, 3, 4, 5]}), node: :"first@127.0.0.1"},
{get_child(:source)
|> via_in(:input, toilet_capacity: 100, throttling_factor: 50)
|> child(:sink, Sink), node: :"second@127.0.0.1"}
]
], %{}}
end
end

pipeline = Membrane.Testing.Pipeline.start_link_supervised!(module: Pipeline)

assert_end_of_stream(pipeline, :sink)
test "if distributed pipeline works properly", context do
pipeline =
Testing.Pipeline.start_link_supervised!(
module: Distributed.Pipeline,
custom_args: context
)

assert_pipeline_notified(pipeline, :sink_bin, :end_of_stream)

assert context.first_node == node(pipeline)

assert context.first_node ==
Testing.Pipeline.get_child_pid!(pipeline, :source)
|> node()

assert context.second_node ==
Testing.Pipeline.get_child_pid!(pipeline, :sink_bin)
|> node()

assert context.second_node ==
Testing.Pipeline.get_child_pid!(pipeline, [:sink_bin, :sink])
|> node()

Testing.Pipeline.terminate(pipeline)
end

defp start_nodes() do
defp start_another_node() do
System.cmd("epmd", ["-daemon"])
{:ok, _pid} = Node.start(:"first@127.0.0.1", :longnames)
_start_result = Node.start(:"first@127.0.0.1", :longnames)
{:ok, _pid, hostname} = :peer.start(%{host: ~c"127.0.0.1", name: :second})
:rpc.block_call(hostname, :code, :add_paths, [:code.get_path()])
hostname
Expand Down
40 changes: 39 additions & 1 deletion test/support/distributed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ defmodule Membrane.Support.Distributed do

defmodule Sink do
@moduledoc false

use Membrane.Sink

def_input_pad :input, flow_control: :manual, accepted_format: _any, demand_unit: :buffers
Expand All @@ -56,4 +55,43 @@ defmodule Membrane.Support.Distributed do
{[demand: {:input, 1}], state}
end
end

defmodule SinkBin do
@moduledoc false
use Membrane.Bin

def_input_pad :input, accepted_format: _any

@impl true
def handle_init(_ctx, _opts) do
spec = bin_input() |> child(:sink, Sink)
{[spec: spec], %{}}
end

@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
{[notify_parent: :end_of_stream], state}
end
end

defmodule Pipeline do
@moduledoc false
use Membrane.Pipeline

@impl true
def handle_init(_ctx, opts) do
first_node = opts.first_node
second_node = opts.second_node

{[
spec: [
{child(:source, %Source{output: [1, 2, 3, 4, 5]}), node: first_node},
{
get_child(:source) |> child(:sink_bin, SinkBin),
node: second_node
}
]
], %{}}
end
end
end

0 comments on commit ec655c0

Please sign in to comment.