Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn committed Aug 22, 2024
1 parent e71b3e5 commit 8173dc1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 72 deletions.
37 changes: 25 additions & 12 deletions test/ex_webrtc/rtp/jitter_buffer/realtime_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ defmodule ExWebRTC.RTP.JitterBuffer.RealtimeTest do

@impl true
def init(state) do
:ok = JitterBuffer.start_timer(state.buffer)
{:ok, state, {:continue, :after_init}}
end

Expand Down Expand Up @@ -51,8 +50,26 @@ defmodule ExWebRTC.RTP.JitterBuffer.RealtimeTest do

@impl true
def handle_info({:push_packet, n}, %{buffer: buffer} = state) do
:ok = JitterBuffer.place_packet(buffer, PacketFactory.sample_packet(n))
{:noreply, state}
buffer
|> JitterBuffer.place_packet(PacketFactory.sample_packet(n))
|> handle_jitter_buffer_result(state)
end

@impl true
def handle_info(:jitter_buffer_timer, %{buffer: buffer} = state) do
buffer
|> JitterBuffer.handle_timer()
|> handle_jitter_buffer_result(state)
end

defp handle_jitter_buffer_result({buffer, packets, timer}, state) do
for packet <- packets do
send(state.owner, packet)
end

unless is_nil(timer), do: Process.send_after(self(), :jitter_buffer_timer, timer)

{:noreply, %{state | buffer: buffer}}
end
end

Expand All @@ -71,11 +88,10 @@ defmodule ExWebRTC.RTP.JitterBuffer.RealtimeTest do
end

defp test_pipeline(packets, packet_delay_ms, latency_ms) do
{:ok, buffer} = JitterBuffer.start_link(latency: latency_ms)

{:ok, _pid} =
GenServer.start_link(PacketSource, %{
buffer: buffer,
owner: self(),
buffer: JitterBuffer.new(latency: latency_ms),
packet_num: packets,
packet_delay_ms: packet_delay_ms,
max_latency: latency_ms
Expand All @@ -88,16 +104,13 @@ defmodule ExWebRTC.RTP.JitterBuffer.RealtimeTest do

cond do
rem(n, 50) >= 30 and rem(n, 50) <= 32 ->
refute_receive {:jitter_buffer, _pid, {:packet, %Packet{sequence_number: ^seq_num}}},
timeout
refute_receive %Packet{sequence_number: ^seq_num}, timeout

rem(n, 19) == 0 and rem(n, 15) != 0 ->
refute_receive {:jitter_buffer, _pid, {:packet, %Packet{sequence_number: ^seq_num}}},
timeout
refute_receive %Packet{sequence_number: ^seq_num}, timeout

true ->
assert_receive {:jitter_buffer, _pid, {:packet, %Packet{sequence_number: ^seq_num}}},
timeout
assert_receive %Packet{sequence_number: ^seq_num}, timeout
end
end)
end
Expand Down
104 changes: 44 additions & 60 deletions test/ex_webrtc/rtp/jitter_buffer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,138 +6,122 @@ defmodule ExWebRTC.RTP.JitterBufferTest do
alias ExWebRTC.RTP.{JitterBuffer, PacketFactory}

@base_seq_number PacketFactory.base_seq_number()
@buffer_latency_ms 10

setup do
packet = PacketFactory.sample_packet(@base_seq_number)

{:ok, state} = JitterBuffer.init(controlling_process: self(), latency: 10)
state = %{state | waiting?: false}
buffer = JitterBuffer.new(latency: @buffer_latency_ms)
buffer = %{buffer | state: :timer_not_set}

[state: state, packet: packet]
[buffer: buffer, packet: packet]
end

describe "When JitterBuffer is in waiting state" do
setup %{state: state} do
[state: %{state | waiting?: true}]
describe "When JitterBuffer is in initial_wait state" do
setup do
[buffer: JitterBuffer.new(latency: @buffer_latency_ms)]
end

test "start of stream starts timer that changes state", %{state: state} do
{:reply, :ok, state} = JitterBuffer.handle_call(:start_timer, nil, state)
assert_receive message, state.latency + 5
{:noreply, final_state} = JitterBuffer.handle_info(message, state)
assert final_state.waiting? == false
test "first packet starts timer that changes state", %{buffer: buffer, packet: packet} do
assert buffer.state == :initial_wait
{buffer, [], timer} = JitterBuffer.place_packet(buffer, packet)
assert timer == buffer.latency
{buffer, _packets, _timer} = JitterBuffer.handle_timer(buffer)
assert buffer.state != :initial_wait
end

test "any new packet is kept", %{state: state, packet: packet} do
assert PacketStore.dump(state.store) == []
{:noreply, state} = JitterBuffer.handle_cast({:packet, packet}, state)
test "any new packet is kept", %{buffer: buffer, packet: packet} do
assert PacketStore.dump(buffer.store) == []
{buffer, [], _timer} = JitterBuffer.place_packet(buffer, packet)

%{store: store} = state
%{store: store} = buffer
{%Record{packet: ^packet}, new_store} = PacketStore.flush_one(store)
assert PacketStore.dump(new_store) == []

refute_receive {:jitter_buffer, _pid, {:packet, ^packet}}
end
end

describe "When new packet arrives when not waiting and already pushed some packet" do
setup %{state: state} do
setup %{buffer: buffer} do
flush_index = @base_seq_number - 1
store = %{state.store | flush_index: flush_index, highest_incoming_index: flush_index}
[state: %{state | waiting?: false, store: store}]
store = %{buffer.store | flush_index: flush_index, highest_incoming_index: flush_index}
[buffer: %{buffer | state: :timer_not_set, store: store}]
end

test "outputs it immediately if it is in order", %{state: state, packet: packet} do
{:noreply, state} = JitterBuffer.handle_cast({:packet, packet}, state)

assert_receive {:jitter_buffer, _pid, {:packet, ^packet}}
test "outputs it immediately if it is in order", %{buffer: buffer, packet: packet} do
{buffer, [^packet], _timer} = JitterBuffer.place_packet(buffer, packet)

%{store: store} = state
%{store: store} = buffer
assert PacketStore.dump(store) == []
end

test "refuses to add that packet when it comes too late", %{state: state} do
test "refuses to add that packet when it comes too late", %{buffer: buffer} do
late_packet = PacketFactory.sample_packet(@base_seq_number - 2)
{:noreply, new_state} = JitterBuffer.handle_cast({:packet, late_packet}, state)
assert new_state == state
refute_receive {:jitter_buffer, _pid, {:packet, ^late_packet}}
{new_buffer, [], nil} = JitterBuffer.place_packet(buffer, late_packet)
assert new_buffer == buffer
end

test "adds it and when it fills the gap, returns all packets in order", %{state: state} do
test "adds it and when it fills the gap, returns all packets in order", %{buffer: buffer} do
first_packet = PacketFactory.sample_packet(@base_seq_number)
second_packet = PacketFactory.sample_packet(@base_seq_number + 1)
third_packet = PacketFactory.sample_packet(@base_seq_number + 2)

flush_index = @base_seq_number - 1

store = %PacketStore{
state.store
buffer.store
| flush_index: flush_index,
highest_incoming_index: flush_index
}

{:ok, store} = PacketStore.insert_packet(store, second_packet)
{:ok, store} = PacketStore.insert_packet(store, third_packet)

state = %{state | store: store}
buffer = %{buffer | store: store}

{:noreply, %{store: result_store}} =
JitterBuffer.handle_cast({:packet, first_packet}, state)
{%{store: result_store}, packets, _timer} = JitterBuffer.place_packet(buffer, first_packet)

for packet <- [first_packet, second_packet, third_packet] do
receive do
msg ->
assert {:jitter_buffer, _pid, {:packet, ^packet}} = msg
end
end
assert packets == [first_packet, second_packet, third_packet]

assert PacketStore.dump(result_store) == []
refute_receive {:jitter_buffer, _pid, {:packet, _packet}}
end
end

describe "When latency passes without filling the gap, JitterBuffer" do
test "outputs discontinuity and late packet", %{state: state, packet: packet} do
test "outputs the late packet", %{buffer: buffer, packet: packet} do
flush_index = @base_seq_number - 2

store = %PacketStore{
state.store
buffer.store
| flush_index: flush_index,
highest_incoming_index: flush_index
}

state = %{state | store: store, waiting?: false}

{:noreply, state} = JitterBuffer.handle_cast({:packet, packet}, state)
refute_received {:jitter_buffer, _pid, {:packet, ^packet}}

assert is_reference(state.max_latency_timer)
buffer = %{buffer | store: store, state: :timer_not_set}

receive do
msg ->
{:noreply, _state} = JitterBuffer.handle_info(msg, state)
end
{buffer, [], timer} = JitterBuffer.place_packet(buffer, packet)
assert timer != nil
assert buffer.state == :timer_set

assert_receive {:jitter_buffer, _pid, {:packet, ^packet}}
Process.sleep(buffer.latency + 5)
{_buffer, [^packet], _timer} = JitterBuffer.handle_timer(buffer)
end
end

describe "When asked to flush, JitterBuffer" do
test "dumps store and resets itself", %{state: state, packet: packet} do
test "dumps store and resets itself", %{buffer: buffer, packet: packet} do
flush_index = @base_seq_number - 2

store = %PacketStore{
state.store
buffer.store
| flush_index: flush_index,
highest_incoming_index: flush_index
}

{:ok, store} = PacketStore.insert_packet(store, packet)
state = %{state | store: store}
{:reply, :ok, state} = JitterBuffer.handle_call(:flush, nil, state)
buffer = %{buffer | store: store}
{buffer, [^packet], nil} = JitterBuffer.flush(buffer)

assert_receive {:jitter_buffer, _pid, {:packet, ^packet}}
assert state.store == %PacketStore{}
assert buffer.store == %PacketStore{}
end
end
end

0 comments on commit 8173dc1

Please sign in to comment.