diff --git a/lib/membrane/core/element/buffer_controller.ex b/lib/membrane/core/element/buffer_controller.ex index 15f32561e..6945c2d16 100644 --- a/lib/membrane/core/element/buffer_controller.ex +++ b/lib/membrane/core/element/buffer_controller.ex @@ -57,11 +57,11 @@ defmodule Membrane.Core.Element.BufferController do @spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t() defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do - %{lacking_buffer_size: lacking_buffer_size, demand_unit: demand_unit} = data + %{demand: demand, demand_unit: demand_unit} = data buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers) state = - PadModel.set_data!(state, pad_ref, :lacking_buffer_size, lacking_buffer_size - buf_size) + PadModel.set_data!(state, pad_ref, :demand, demand - buf_size) state = AutoFlowUtils.auto_adjust_demand_counter(pad_ref, state) exec_buffer_callback(pad_ref, buffers, state) diff --git a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex index c41183b7c..21d4ce90b 100644 --- a/lib/membrane/core/element/demand_controller/auto_flow_utils.ex +++ b/lib/membrane/core/element/demand_controller/auto_flow_utils.ex @@ -26,13 +26,13 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do defp do_auto_adjust_demand_counter(pad_data, state) when is_input_auto_pad_data(pad_data) do if increase_demand_counter?(pad_data, state) do - diff = pad_data.auto_demand_size - pad_data.lacking_buffer_size + diff = pad_data.auto_demand_size - pad_data.demand :ok = DemandCounter.increase(pad_data.demand_counter, diff) PadModel.set_data!( state, pad_data.ref, - :lacking_buffer_size, + :demand, pad_data.auto_demand_size ) else @@ -46,7 +46,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do defp increase_demand_counter?(pad_data, state) do state.effective_flow_control == :pull and - pad_data.lacking_buffer_size < pad_data.auto_demand_size / 2 and + pad_data.demand < pad_data.auto_demand_size / 2 and Enum.all?(pad_data.associated_pads, &demand_counter_positive?(&1, state)) end diff --git a/lib/membrane/core/element/input_queue.ex b/lib/membrane/core/element/input_queue.ex index 334bbcae1..5d3e8e37e 100644 --- a/lib/membrane/core/element/input_queue.ex +++ b/lib/membrane/core/element/input_queue.ex @@ -32,7 +32,7 @@ defmodule Membrane.Core.Element.InputQueue do log_tag: String.t(), target_size: pos_integer(), size: non_neg_integer(), - lacking_buffer_size: non_neg_integer(), + demand: non_neg_integer(), inbound_metric: module(), outbound_metric: module(), linked_output_ref: Pad.ref(), @@ -49,7 +49,7 @@ defmodule Membrane.Core.Element.InputQueue do :linked_output_ref ] - defstruct @enforce_keys ++ [size: 0, lacking_buffer_size: 0] + defstruct @enforce_keys ++ [size: 0, demand: 0] @default_target_size_factor 40 @@ -130,7 +130,7 @@ defmodule Membrane.Core.Element.InputQueue do %__MODULE__{ q: q, size: size, - lacking_buffer_size: lacking_buffer_size, + demand: demand, inbound_metric: inbound_metric, outbound_metric: outbound_metric } = input_queue, @@ -147,7 +147,7 @@ defmodule Membrane.Core.Element.InputQueue do input_queue | q: q |> @qe.push({:buffers, v, inbound_metric_buffer_size, outbound_metric_buffer_size}), size: size + inbound_metric_buffer_size, - lacking_buffer_size: lacking_buffer_size - inbound_metric_buffer_size + demand: demand - inbound_metric_buffer_size } end @@ -279,11 +279,11 @@ defmodule Membrane.Core.Element.InputQueue do size: size, target_size: target_size, demand_counter: demand_counter, - lacking_buffer_size: lacking_buffer_size + demand: demand } = input_queue ) - when target_size > size + lacking_buffer_size do - diff = max(target_size - size - lacking_buffer_size, div(target_size, 2)) + when target_size > size + demand do + diff = max(target_size - size - demand, div(target_size, 2)) """ Increasing DemandCounter linked to #{inspect(input_queue.linked_output_ref)} by #{inspect(diff)} @@ -292,7 +292,7 @@ defmodule Membrane.Core.Element.InputQueue do |> Membrane.Logger.debug_verbose() :ok = DemandCounter.increase(demand_counter, diff) - %{input_queue | lacking_buffer_size: lacking_buffer_size + diff} + %{input_queue | demand: demand + diff} end defp maybe_increase_demand_counter(%__MODULE__{} = input_queue), do: input_queue diff --git a/lib/membrane/core/element/pad_controller.ex b/lib/membrane/core/element/pad_controller.ex index 3ec94a7b2..f9c702b66 100644 --- a/lib/membrane/core/element/pad_controller.ex +++ b/lib/membrane/core/element/pad_controller.ex @@ -4,7 +4,6 @@ defmodule Membrane.Core.Element.PadController do # Module handling linking and unlinking pads. use Bunch - alias Membrane.Core.Element.PadController alias Membrane.{LinkError, Pad} alias Membrane.Core.{CallbackHandler, Child, Events, Message, Observability} alias Membrane.Core.Child.PadModel diff --git a/lib/membrane/element/pad_data.ex b/lib/membrane/element/pad_data.ex index 4f82e80c3..19452e449 100644 --- a/lib/membrane/element/pad_data.ex +++ b/lib/membrane/element/pad_data.ex @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do # Field used in DemandController.AutoFlowUtils and InputQueue, to caluclate, how much DemandCounter should be increased. # Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but # hasn't arrived yet. Unused for output pads. - lacking_buffer_size: private_field, + demand: private_field, manual_demand_size: private_field, associated_pads: private_field, sticky_events: private_field, @@ -86,7 +86,7 @@ defmodule Membrane.Element.PadData do auto_demand_size: nil, sticky_messages: [], demand_counter: nil, - lacking_buffer_size: 0, + demand: 0, manual_demand_size: 0, associated_pads: [], sticky_events: [], diff --git a/test/membrane/core/element/input_queue_test.exs b/test/membrane/core/element/input_queue_test.exs index e250775da..e0480860d 100644 --- a/test/membrane/core/element/input_queue_test.exs +++ b/test/membrane/core/element/input_queue_test.exs @@ -41,7 +41,7 @@ defmodule Membrane.Core.Element.InputQueueTest do outbound_metric: context.expected_metric, linked_output_ref: context.linked_output_ref, size: 0, - lacking_buffer_size: context.target_queue_size + demand: context.target_queue_size } assert context.target_queue_size == DemandCounter.get(context.demand_counter) @@ -87,14 +87,14 @@ defmodule Membrane.Core.Element.InputQueueTest do setup do {:ok, %{ - lacking_buffer_size: 30, + demand: 30, size: 10, q: Qex.new() |> Qex.push({:buffers, [], 3, 3}), payload: <<1, 2, 3>> }} end - test "updated properly `size` and `lacking_buffer_size` when `:metric` is `Buffer.Metric.Count`", + test "updated properly `size` and `demand` when `:metric` is `Buffer.Metric.Count`", context do input_queue = struct(InputQueue, @@ -102,17 +102,17 @@ defmodule Membrane.Core.Element.InputQueueTest do inbound_metric: Buffer.Metric.Count, outbound_metric: Buffer.Metric.Count, q: context.q, - lacking_buffer_size: context.lacking_buffer_size + demand: context.demand ) v = [%Buffer{payload: context.payload}] updated_input_queue = InputQueue.store(input_queue, :buffers, v) assert updated_input_queue.size == context.size + 1 - assert updated_input_queue.lacking_buffer_size == context.lacking_buffer_size - 1 + assert updated_input_queue.demand == context.demand - 1 end - test "updated properly `size` and `lacking_buffer_size` when `:metric` is `Buffer.Metric.ByteSize`", + test "updated properly `size` and `demand` when `:metric` is `Buffer.Metric.ByteSize`", context do input_queue = struct(InputQueue, @@ -120,7 +120,7 @@ defmodule Membrane.Core.Element.InputQueueTest do inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, q: context.q, - lacking_buffer_size: context.lacking_buffer_size + demand: context.demand ) v = [%Buffer{payload: context.payload}] @@ -128,8 +128,8 @@ defmodule Membrane.Core.Element.InputQueueTest do assert updated_input_queue.size == context.size + byte_size(context.payload) - assert updated_input_queue.lacking_buffer_size == - context.lacking_buffer_size - byte_size(context.payload) + assert updated_input_queue.demand == + context.demand - byte_size(context.payload) end test "append buffer to the queue", context do @@ -139,7 +139,7 @@ defmodule Membrane.Core.Element.InputQueueTest do inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, q: context.q, - lacking_buffer_size: context.lacking_buffer_size + demand: context.demand ) v = [%Buffer{payload: context.payload}] @@ -156,7 +156,7 @@ defmodule Membrane.Core.Element.InputQueueTest do inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, q: context.q, - lacking_buffer_size: context.lacking_buffer_size + demand: context.demand ) v = %Event{} @@ -173,7 +173,7 @@ defmodule Membrane.Core.Element.InputQueueTest do inbound_metric: Buffer.Metric.ByteSize, outbound_metric: Buffer.Metric.ByteSize, q: context.q, - lacking_buffer_size: context.lacking_buffer_size + demand: context.demand ) v = %Event{} @@ -204,9 +204,9 @@ defmodule Membrane.Core.Element.InputQueueTest do test "return {:empty, []} when the queue is empty", %{input_queue: input_queue} do old_counter_value = DemandCounter.get(input_queue.demand_counter) - old_lacking_buffer_size = input_queue.lacking_buffer_size + old_demand = input_queue.demand - assert {{:empty, []}, %InputQueue{size: 0, lacking_buffer_size: ^old_lacking_buffer_size}} = + assert {{:empty, []}, %InputQueue{size: 0, demand: ^old_demand}} = InputQueue.take(input_queue, 1) assert old_counter_value == DemandCounter.get(input_queue.demand_counter) @@ -219,7 +219,7 @@ defmodule Membrane.Core.Element.InputQueueTest do |> InputQueue.take(1) assert new_input_queue.size == 9 - assert new_input_queue.lacking_buffer_size >= 31 + assert new_input_queue.demand >= 31 assert DemandCounter.get(new_input_queue.demand_counter) >= 41 end end @@ -239,7 +239,7 @@ defmodule Membrane.Core.Element.InputQueueTest do input_queue = struct(InputQueue, size: size, - lacking_buffer_size: 94, + demand: 94, target_size: 100, inbound_metric: Buffer.Metric.Count, outbound_metric: Buffer.Metric.Count, @@ -265,12 +265,12 @@ defmodule Membrane.Core.Element.InputQueueTest do test "increase DemandCounter hen there are not enough buffers", context do old_counter_value = DemandCounter.get(context.input_queue.demand_counter) - old_lacking_buffer_size = context.input_queue.lacking_buffer_size + old_demand = context.input_queue.demand {_output, input_queue} = InputQueue.take(context.input_queue, 10) assert old_counter_value < DemandCounter.get(input_queue.demand_counter) - assert old_lacking_buffer_size < input_queue.lacking_buffer_size + assert old_demand < input_queue.demand end test "return `to_take` buffers from the queue when there are enough buffers and buffers don't have to be split", @@ -314,7 +314,7 @@ defmodule Membrane.Core.Element.InputQueueTest do }) assert_receive Message.new(:demand_counter_increased, :output_pad_ref) - assert queue.lacking_buffer_size == 10 + assert queue.demand == 10 queue = InputQueue.store(queue, [%Buffer{payload: "1234"}]) assert queue.size == 4 queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) @@ -322,11 +322,11 @@ defmodule Membrane.Core.Element.InputQueueTest do queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = Map.update!(queue, :demand_counter, &DemandCounter.decrease(&1, 16)) assert queue.size == 16 - assert queue.lacking_buffer_size == -6 + assert queue.demand == -6 {out, queue} = InputQueue.take(queue, 2) assert bufs_size(out, :buffers) == 2 assert queue.size == 4 - assert queue.lacking_buffer_size >= 6 + assert queue.demand >= 6 assert_receive Message.new(:demand_counter_increased, :output_pad_ref) queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) @@ -334,7 +334,7 @@ defmodule Membrane.Core.Element.InputQueueTest do {out, queue} = InputQueue.take(queue, 1) assert bufs_size(out, :buffers) == 1 assert queue.size == 8 - assert queue.lacking_buffer_size >= 2 + assert queue.demand >= 2 end test "if the queue works properly for :buffers input metric and :bytes output metric" do @@ -351,7 +351,7 @@ defmodule Membrane.Core.Element.InputQueueTest do }) assert_receive Message.new(:demand_counter_increased, :output_pad_ref) - assert queue.lacking_buffer_size == 3 + assert queue.demand == 3 queue = InputQueue.store(queue, [%Buffer{payload: "1234"}]) assert queue.size == 1 queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}]) @@ -359,16 +359,16 @@ defmodule Membrane.Core.Element.InputQueueTest do queue = InputQueue.store(queue, [%Buffer{payload: "12"}]) queue = Map.update!(queue, :demand_counter, &DemandCounter.decrease(&1, 4)) assert queue.size == 4 - assert queue.lacking_buffer_size == -1 + assert queue.demand == -1 {out, queue} = InputQueue.take(queue, 2) assert bufs_size(out, :bytes) == 2 assert queue.size == 4 - assert queue.lacking_buffer_size == -1 + assert queue.demand == -1 refute_receive Message.new(:demand_counter_increased, :output_pad_ref) {out, queue} = InputQueue.take(queue, 11) assert bufs_size(out, :bytes) == 11 assert queue.size == 2 - assert queue.lacking_buffer_size == 1 + assert queue.demand == 1 assert_receive Message.new(:demand_counter_increased, :output_pad_ref) end