Skip to content

Commit

Permalink
Rename lacking_buffers_size to demand
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed May 18, 2023
1 parent e713e52 commit 1a1957a
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 43 deletions.
5 changes: 2 additions & 3 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ defmodule Membrane.Core.Element.BufferController do
@spec do_handle_buffer(Pad.ref(), PadModel.pad_data(), [Buffer.t()] | Buffer.t(), State.t()) ::
State.t()
defp do_handle_buffer(pad_ref, %{flow_control: :auto} = data, buffers, state) do
%{lacking_buffer_size: lacking_buffer_size, demand_unit: demand_unit} = data
%{demand: demand, demand_unit: demand_unit} = data
buf_size = Buffer.Metric.from_unit(demand_unit).buffers_size(buffers)

state =
PadModel.set_data!(state, pad_ref, :lacking_buffer_size, lacking_buffer_size - buf_size)
state = PadModel.set_data!(state, pad_ref, :demand, demand - buf_size)

state = AutoFlowUtils.auto_adjust_demand_counter(pad_ref, state)
exec_buffer_callback(pad_ref, buffers, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp do_auto_adjust_demand_counter(pad_data, state) when is_input_auto_pad_data(pad_data) do
if increase_demand_counter?(pad_data, state) do
diff = pad_data.auto_demand_size - pad_data.lacking_buffer_size
diff = pad_data.auto_demand_size - pad_data.demand
:ok = DemandCounter.increase(pad_data.demand_counter, diff)

PadModel.set_data!(
state,
pad_data.ref,
:lacking_buffer_size,
:demand,
pad_data.auto_demand_size
)
else
Expand All @@ -46,7 +46,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

defp increase_demand_counter?(pad_data, state) do
state.effective_flow_control == :pull and
pad_data.lacking_buffer_size < pad_data.auto_demand_size / 2 and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &demand_counter_positive?(&1, state))
end

Expand Down
16 changes: 8 additions & 8 deletions lib/membrane/core/element/input_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Membrane.Core.Element.InputQueue do
log_tag: String.t(),
target_size: pos_integer(),
size: non_neg_integer(),
lacking_buffer_size: non_neg_integer(),
demand: non_neg_integer(),
inbound_metric: module(),
outbound_metric: module(),
linked_output_ref: Pad.ref(),
Expand All @@ -49,7 +49,7 @@ defmodule Membrane.Core.Element.InputQueue do
:linked_output_ref
]

defstruct @enforce_keys ++ [size: 0, lacking_buffer_size: 0]
defstruct @enforce_keys ++ [size: 0, demand: 0]

@default_target_size_factor 40

Expand Down Expand Up @@ -130,7 +130,7 @@ defmodule Membrane.Core.Element.InputQueue do
%__MODULE__{
q: q,
size: size,
lacking_buffer_size: lacking_buffer_size,
demand: demand,
inbound_metric: inbound_metric,
outbound_metric: outbound_metric
} = input_queue,
Expand All @@ -147,7 +147,7 @@ defmodule Membrane.Core.Element.InputQueue do
input_queue
| q: q |> @qe.push({:buffers, v, inbound_metric_buffer_size, outbound_metric_buffer_size}),
size: size + inbound_metric_buffer_size,
lacking_buffer_size: lacking_buffer_size - inbound_metric_buffer_size
demand: demand - inbound_metric_buffer_size
}
end

Expand Down Expand Up @@ -279,11 +279,11 @@ defmodule Membrane.Core.Element.InputQueue do
size: size,
target_size: target_size,
demand_counter: demand_counter,
lacking_buffer_size: lacking_buffer_size
demand: demand
} = input_queue
)
when target_size > size + lacking_buffer_size do
diff = max(target_size - size - lacking_buffer_size, div(target_size, 2))
when target_size > size + demand do
diff = max(target_size - size - demand, div(target_size, 2))

"""
Increasing DemandCounter linked to #{inspect(input_queue.linked_output_ref)} by #{inspect(diff)}
Expand All @@ -292,7 +292,7 @@ defmodule Membrane.Core.Element.InputQueue do
|> Membrane.Logger.debug_verbose()

:ok = DemandCounter.increase(demand_counter, diff)
%{input_queue | lacking_buffer_size: lacking_buffer_size + diff}
%{input_queue | demand: demand + diff}
end

defp maybe_increase_demand_counter(%__MODULE__{} = input_queue), do: input_queue
Expand Down
1 change: 0 additions & 1 deletion lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Membrane.Core.Element.PadController do
# Module handling linking and unlinking pads.

use Bunch
alias Membrane.Core.Element.PadController
alias Membrane.{LinkError, Pad}
alias Membrane.Core.{CallbackHandler, Child, Events, Message, Observability}
alias Membrane.Core.Child.PadModel
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/element/pad_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule Membrane.Element.PadData do
# Field used in DemandController.AutoFlowUtils and InputQueue, to caluclate, how much DemandCounter should be increased.
# Contains amount of data (:buffers/:bytes), that has been demanded from the element on the other side of link, but
# hasn't arrived yet. Unused for output pads.
lacking_buffer_size: private_field,
demand: private_field,
manual_demand_size: private_field,
associated_pads: private_field,
sticky_events: private_field,
Expand Down Expand Up @@ -86,7 +86,7 @@ defmodule Membrane.Element.PadData do
auto_demand_size: nil,
sticky_messages: [],
demand_counter: nil,
lacking_buffer_size: 0,
demand: 0,
manual_demand_size: 0,
associated_pads: [],
sticky_events: [],
Expand Down
52 changes: 26 additions & 26 deletions test/membrane/core/element/input_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
outbound_metric: context.expected_metric,
linked_output_ref: context.linked_output_ref,
size: 0,
lacking_buffer_size: context.target_queue_size
demand: context.target_queue_size
}

assert context.target_queue_size == DemandCounter.get(context.demand_counter)
Expand Down Expand Up @@ -87,49 +87,49 @@ defmodule Membrane.Core.Element.InputQueueTest do
setup do
{:ok,
%{
lacking_buffer_size: 30,
demand: 30,
size: 10,
q: Qex.new() |> Qex.push({:buffers, [], 3, 3}),
payload: <<1, 2, 3>>
}}
end

test "updated properly `size` and `lacking_buffer_size` when `:metric` is `Buffer.Metric.Count`",
test "updated properly `size` and `demand` when `:metric` is `Buffer.Metric.Count`",
context do
input_queue =
struct(InputQueue,
size: context.size,
inbound_metric: Buffer.Metric.Count,
outbound_metric: Buffer.Metric.Count,
q: context.q,
lacking_buffer_size: context.lacking_buffer_size
demand: context.demand
)

v = [%Buffer{payload: context.payload}]
updated_input_queue = InputQueue.store(input_queue, :buffers, v)

assert updated_input_queue.size == context.size + 1
assert updated_input_queue.lacking_buffer_size == context.lacking_buffer_size - 1
assert updated_input_queue.demand == context.demand - 1
end

test "updated properly `size` and `lacking_buffer_size` when `:metric` is `Buffer.Metric.ByteSize`",
test "updated properly `size` and `demand` when `:metric` is `Buffer.Metric.ByteSize`",
context do
input_queue =
struct(InputQueue,
size: context.size,
inbound_metric: Buffer.Metric.ByteSize,
outbound_metric: Buffer.Metric.ByteSize,
q: context.q,
lacking_buffer_size: context.lacking_buffer_size
demand: context.demand
)

v = [%Buffer{payload: context.payload}]
updated_input_queue = InputQueue.store(input_queue, :buffers, v)

assert updated_input_queue.size == context.size + byte_size(context.payload)

assert updated_input_queue.lacking_buffer_size ==
context.lacking_buffer_size - byte_size(context.payload)
assert updated_input_queue.demand ==
context.demand - byte_size(context.payload)
end

test "append buffer to the queue", context do
Expand All @@ -139,7 +139,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
inbound_metric: Buffer.Metric.ByteSize,
outbound_metric: Buffer.Metric.ByteSize,
q: context.q,
lacking_buffer_size: context.lacking_buffer_size
demand: context.demand
)

v = [%Buffer{payload: context.payload}]
Expand All @@ -156,7 +156,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
inbound_metric: Buffer.Metric.ByteSize,
outbound_metric: Buffer.Metric.ByteSize,
q: context.q,
lacking_buffer_size: context.lacking_buffer_size
demand: context.demand
)

v = %Event{}
Expand All @@ -173,7 +173,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
inbound_metric: Buffer.Metric.ByteSize,
outbound_metric: Buffer.Metric.ByteSize,
q: context.q,
lacking_buffer_size: context.lacking_buffer_size
demand: context.demand
)

v = %Event{}
Expand Down Expand Up @@ -204,9 +204,9 @@ defmodule Membrane.Core.Element.InputQueueTest do

test "return {:empty, []} when the queue is empty", %{input_queue: input_queue} do
old_counter_value = DemandCounter.get(input_queue.demand_counter)
old_lacking_buffer_size = input_queue.lacking_buffer_size
old_demand = input_queue.demand

assert {{:empty, []}, %InputQueue{size: 0, lacking_buffer_size: ^old_lacking_buffer_size}} =
assert {{:empty, []}, %InputQueue{size: 0, demand: ^old_demand}} =
InputQueue.take(input_queue, 1)

assert old_counter_value == DemandCounter.get(input_queue.demand_counter)
Expand All @@ -219,7 +219,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
|> InputQueue.take(1)

assert new_input_queue.size == 9
assert new_input_queue.lacking_buffer_size >= 31
assert new_input_queue.demand >= 31
assert DemandCounter.get(new_input_queue.demand_counter) >= 41
end
end
Expand All @@ -239,7 +239,7 @@ defmodule Membrane.Core.Element.InputQueueTest do
input_queue =
struct(InputQueue,
size: size,
lacking_buffer_size: 94,
demand: 94,
target_size: 100,
inbound_metric: Buffer.Metric.Count,
outbound_metric: Buffer.Metric.Count,
Expand All @@ -265,12 +265,12 @@ defmodule Membrane.Core.Element.InputQueueTest do

test "increase DemandCounter hen there are not enough buffers", context do
old_counter_value = DemandCounter.get(context.input_queue.demand_counter)
old_lacking_buffer_size = context.input_queue.lacking_buffer_size
old_demand = context.input_queue.demand

{_output, input_queue} = InputQueue.take(context.input_queue, 10)

assert old_counter_value < DemandCounter.get(input_queue.demand_counter)
assert old_lacking_buffer_size < input_queue.lacking_buffer_size
assert old_demand < input_queue.demand
end

test "return `to_take` buffers from the queue when there are enough buffers and buffers don't have to be split",
Expand Down Expand Up @@ -314,27 +314,27 @@ defmodule Membrane.Core.Element.InputQueueTest do
})

assert_receive Message.new(:demand_counter_increased, :output_pad_ref)
assert queue.lacking_buffer_size == 10
assert queue.demand == 10
queue = InputQueue.store(queue, [%Buffer{payload: "1234"}])
assert queue.size == 4
queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}])
queue = InputQueue.store(queue, [%Buffer{payload: "12"}])
queue = InputQueue.store(queue, [%Buffer{payload: "12"}])
queue = Map.update!(queue, :demand_counter, &DemandCounter.decrease(&1, 16))
assert queue.size == 16
assert queue.lacking_buffer_size == -6
assert queue.demand == -6
{out, queue} = InputQueue.take(queue, 2)
assert bufs_size(out, :buffers) == 2
assert queue.size == 4
assert queue.lacking_buffer_size >= 6
assert queue.demand >= 6
assert_receive Message.new(:demand_counter_increased, :output_pad_ref)

queue = InputQueue.store(queue, [%Buffer{payload: "12"}])
queue = InputQueue.store(queue, [%Buffer{payload: "1234"}])
{out, queue} = InputQueue.take(queue, 1)
assert bufs_size(out, :buffers) == 1
assert queue.size == 8
assert queue.lacking_buffer_size >= 2
assert queue.demand >= 2
end

test "if the queue works properly for :buffers input metric and :bytes output metric" do
Expand All @@ -351,24 +351,24 @@ defmodule Membrane.Core.Element.InputQueueTest do
})

assert_receive Message.new(:demand_counter_increased, :output_pad_ref)
assert queue.lacking_buffer_size == 3
assert queue.demand == 3
queue = InputQueue.store(queue, [%Buffer{payload: "1234"}])
assert queue.size == 1
queue = InputQueue.store(queue, [%Buffer{payload: "12345678"}])
queue = InputQueue.store(queue, [%Buffer{payload: "12"}])
queue = InputQueue.store(queue, [%Buffer{payload: "12"}])
queue = Map.update!(queue, :demand_counter, &DemandCounter.decrease(&1, 4))
assert queue.size == 4
assert queue.lacking_buffer_size == -1
assert queue.demand == -1
{out, queue} = InputQueue.take(queue, 2)
assert bufs_size(out, :bytes) == 2
assert queue.size == 4
assert queue.lacking_buffer_size == -1
assert queue.demand == -1
refute_receive Message.new(:demand_counter_increased, :output_pad_ref)
{out, queue} = InputQueue.take(queue, 11)
assert bufs_size(out, :bytes) == 11
assert queue.size == 2
assert queue.lacking_buffer_size == 1
assert queue.demand == 1
assert_receive Message.new(:demand_counter_increased, :output_pad_ref)
end

Expand Down

0 comments on commit 1a1957a

Please sign in to comment.