Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max_instances option for dynamic pads #876

Merged
merged 7 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Changelog

## 1.2.0
* Add `:max_cardinality` option for dynamic pads. [#876](https://github.com/membraneframework/membrane_core/pull/876)

## 1.1.2
* Remove 'failed to insert a metric' stalker warning [#849](https://github.com/membraneframework/membrane_core/pull/849)
* Remove 'failed to insert a metric' stalker warning. [#849](https://github.com/membraneframework/membrane_core/pull/849)

## 1.1.1
* Fix 'table identifier does not refer to an existing ETS table' error when inserting metrics into the observability ETS. [#835](https://github.com/membraneframework/membrane_core/pull/835)
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/bin/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.Bin.PadData do
- `:name` - see `t:Membrane.Pad.name/0`. Do not mistake with `:ref`
- `:options` - options passed in `Membrane.ChildrenSpec` when linking pad
- `:ref` - see `t:Membrane.Pad.ref/0`
- `max_cardinality` - specyfies maximal possible number of instances of a dynamic pads that can occur within single element. `nil` for pads with `availability: :always`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `max_cardinality` - specyfies maximal possible number of instances of a dynamic pads that can occur within single element. `nil` for pads with `availability: :always`.
- `max_cardinality` - specifies maximal possible number of instances of a dynamic pad that can exist within single element. Equals `nil` for pads with `availability: :always`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would apply similar changes for the option in bin and in the typedoc in Pad module


Other fields in the struct ARE NOT PART OF THE PUBLIC API and should not be
accessed or relied on.
Expand All @@ -23,6 +24,7 @@ defmodule Membrane.Bin.PadData do
availability: Membrane.Pad.availability(),
direction: Membrane.Pad.direction(),
name: Membrane.Pad.name(),
max_cardinality: Membrane.Pad.max_cardinality() | nil,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about max_instances? :P

spec_ref: private_field,
link_id: private_field,
endpoint: private_field,
Expand All @@ -47,5 +49,5 @@ defmodule Membrane.Bin.PadData do
:linked_in_spec?
]

defstruct @enforce_keys
defstruct @enforce_keys ++ [:max_cardinality]
end
2 changes: 2 additions & 0 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ defmodule Membrane.Core.Bin.PadController do
end

:ok = Child.PadController.validate_pad_direction!(direction, pad_info)
:ok = Child.PadController.validate_pad_cardinality!(pad_name, state)

pad_options = Child.PadController.parse_pad_options!(pad_name, pad_options, state)

state =
Expand Down
22 changes: 21 additions & 1 deletion lib/membrane/core/child/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Membrane.Core.Child.PadController do
@moduledoc false

alias Membrane.Core.Child.{PadModel, PadSpecHandler}
alias Membrane.{LinkError, Pad}
alias Membrane.{LinkError, Pad, PadError}

require Membrane.Core.Child.PadModel

Expand Down Expand Up @@ -37,6 +37,26 @@ defmodule Membrane.Core.Child.PadController do
:ok
end

@spec validate_pad_cardinality!(Pad.name(), state()) :: :ok
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@spec validate_pad_cardinality!(Pad.name(), state()) :: :ok
@spec validate_pad_cardinality!(Pad.name(), state()) :: :ok | no_return

def validate_pad_cardinality!(pad_name, state) do
with %{max_cardinality: max_cardinality} when is_integer(max_cardinality) <-
state.pads_info[pad_name] do
current_number =
state.pads_data
|> Enum.count(fn {_ref, data} -> data.name == pad_name end)

if max_cardinality <= current_number do
raise PadError, """
Pad #{inspect(pad_name)} can occur only #{max_cardinality} times within a single component, while it \
attempted to occur #{current_number + 1} times. Set `:max_cardinality` option to a different value,
to change this boundary.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something like this:

Suggested change
Pad #{inspect(pad_name)} can occur only #{max_cardinality} times within a single component, while it \
attempted to occur #{current_number + 1} times. Set `:max_cardinality` option to a different value,
to change this boundary.
Only #{max_cardinality} instances of pad #{inspect(pad_name)} can exist within this component, while an attempt to create #{current_number + 1} instances was made. Set `:max_cardinality` option to a different value,
to change this boundary.

?

"""
end
end

:ok
end

@spec parse_pad_options!(Pad.name(), Membrane.ChildrenSpec.pad_options(), state()) ::
map | no_return
def parse_pad_options!(pad_name, options, state) do
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/child/pad_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ defmodule Membrane.Core.Child.PadModel do
required(:accepted_formats_str) => String.t(),
optional(:flow_control) => Pad.flow_control(),
optional(:demand_unit) => Membrane.Buffer.Metric.unit(),
optional(:other_demand_unit) => Membrane.Buffer.Metric.unit()
optional(:other_demand_unit) => Membrane.Buffer.Metric.unit(),
optional(:max_cardinality) => Pad.max_cardinality()
}

@type pads_info :: %{Pad.name() => pad_info}
Expand Down
10 changes: 10 additions & 0 deletions lib/membrane/core/child/pads_specs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ defmodule Membrane.Core.Child.PadsSpecs do
true ->
nil
end,
max_cardinality: fn config ->
if config[:availability] == :on_request do
[
default: :infinity,
validate: &(&1 == :infinity or (is_integer(&1) and &1 >= 0))
]
else
nil
end
end,
options: [default: nil]
) do
config
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ defmodule Membrane.Core.Element.PadController do
end

:ok = Child.PadController.validate_pad_direction!(direction, pad_info)
:ok = Child.PadController.validate_pad_cardinality!(pad_name, state)

do_handle_link(direction, endpoint, other_endpoint, pad_info, link_props, state)
end

Expand Down
3 changes: 3 additions & 0 deletions lib/membrane/element/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Membrane.Element.PadData do
- `:ref` - see `t:Membrane.Pad.ref/0`
- `:start_of_stream?` - flag determining whether the stream processing via the pad has been started
- `auto_demand_paused?` - flag determining if auto-demanding on the pad is paused or no
- `max_cardinality` - specyfies maximal possible number of instances of a dynamic pads that can occur within single element. `nil` for pads with `availability: :always`.

Other fields in the struct ARE NOT PART OF THE PUBLIC API and should not be
accessed or relied on.
Expand All @@ -33,6 +34,7 @@ defmodule Membrane.Element.PadData do
direction: Pad.direction(),
flow_control: Pad.flow_control(),
name: Pad.name(),
max_cardinality: Pad.max_cardinality() | nil,
ref: Pad.ref(),
options: %{optional(atom) => any},
auto_demand_paused?: boolean(),
Expand Down Expand Up @@ -79,6 +81,7 @@ defmodule Membrane.Element.PadData do

defstruct @enforce_keys ++
[
max_cardinality: nil,
input_queue: nil,
auto_flow_queue: Qex.new(),
demand: 0,
Expand Down
17 changes: 14 additions & 3 deletions lib/membrane/pad.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ defmodule Membrane.Pad do
"""
@type accepted_format :: module() | (pattern :: term())

@typedoc """
Describes maximal number of instances of dynamic pad (`availability: :on_request`) that
can occur within single component.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
can occur within single component.
can occur simultaneously within a single component.

"""
@type max_cardinality :: non_neg_integer() | :infinity

@typedoc """
Describes how a pad should be declared in element or bin.
"""
Expand All @@ -115,7 +121,10 @@ defmodule Membrane.Pad do
"""
@type bin_spec ::
{name(),
availability: availability(), accepted_format: accepted_format(), options: Keyword.t()}
availability: availability(),
accepted_format: accepted_format(),
options: Keyword.t(),
max_cardinality: max_cardinality()}

@typedoc """
Describes how a pad should be declared inside an element.
Expand All @@ -126,7 +135,8 @@ defmodule Membrane.Pad do
accepted_format: accepted_format(),
flow_control: flow_control(),
options: Keyword.t(),
demand_unit: Buffer.Metric.unit()}
demand_unit: Buffer.Metric.unit(),
max_cardinality: max_cardinality()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably not the scope of this PR, but is this element_spec a proper typespec? I think that it doesn't highlight the fact that some of these fields are mandatory, while the others are optional

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW. - Perhaps it would be good to write somewhere that max_cardinality defaults to :infinity for dynamic pads? I don't think there is a way to tell that without checking the code.


@typedoc """
Type describing a pad. Contains data parsed from `t:spec/0`
Expand All @@ -138,7 +148,8 @@ defmodule Membrane.Pad do
:accepted_formats_str => [String.t()],
optional(:demand_unit) => Buffer.Metric.unit() | nil,
:direction => direction(),
:options => nil | Keyword.t()
:options => nil | Keyword.t(),
optional(:max_cardinality) => max_cardinality()
}

@doc """
Expand Down
44 changes: 44 additions & 0 deletions test/membrane/integration/linking_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -608,4 +608,48 @@ defmodule Membrane.Integration.LinkingTest do
state = :sys.get_state(parent_pid)
state.children[ref].pid
end

describe "link fails when max_cardinality is exceeded in" do
defmodule MaxCardinalityElement do
use Membrane.Sink
def_input_pad :input, accepted_format: _any, availability: :on_request, max_cardinality: 1

@impl true
def handle_buffer(_pad, _buffer, _ctx, state), do: {[], state}
end

defmodule MaxCardinalityBin do
use Membrane.Bin
def_input_pad :input, accepted_format: _any, availability: :on_request, max_cardinality: 1

@impl true
def handle_pad_added(pad_ref, _ctx, state) do
spec = bin_input(pad_ref) |> child({:element, pad_ref}, Membrane.Testing.Sink)
{[spec: spec], state}
end
end

[{MaxCardinalityElement, "element"}, {MaxCardinalityBin, "bin"}]
|> Enum.map(fn {module, component_type} ->
test component_type do
pipeline =
Testing.Pipeline.start_supervised!(
spec: child(:source, Testing.Source) |> child(:sink, unquote(module))
)

ref = Process.monitor(pipeline)
refute_receive {:DOWN, ^ref, _process, _pid, _reason}, 500

Testing.Pipeline.execute_actions(pipeline,
spec: child(:second_source, Testing.Source) |> get_child(:sink)
)

assert_receive {:DOWN, ^ref, _process, _pid,
{:membrane_child_crash, :sink,
{%Membrane.PadError{message: message}, _stacktrace}}}

assert message =~ ~r/max_cardinality/
end
end)
end
end
Loading