Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add behaviour and dynamic dispatch for (de)payloaders #147

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions examples/save_to_file/lib/save_to_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ defmodule SaveToFile.PeerHandler do
video_writer: nil,
video_depayloader: nil,
audio_writer: nil,
audio_depayloader: nil,
frames_cnt: 0
}

Expand Down Expand Up @@ -154,7 +155,13 @@ defmodule SaveToFile.PeerHandler do
# by default uses 1 mono channel and 48k clock rate
{:ok, audio_writer} = Ogg.Writer.open(@audio_file)

state = %{state | audio_writer: audio_writer, audio_track_id: id}
state = %{
state
| audio_depayloader: Opus.Depayloader.new(),
audio_writer: audio_writer,
audio_track_id: id
}

{:ok, state}
end

Expand All @@ -166,11 +173,11 @@ defmodule SaveToFile.PeerHandler do

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do
state =
case VP8.Depayloader.write(state.video_depayloader, packet) do
{:ok, video_depayloader} ->
case VP8.Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{:ok, vp8_frame, video_depayloader} ->
{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

Expand All @@ -186,10 +193,10 @@ defmodule SaveToFile.PeerHandler do
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
opus_packet = Opus.Depayloader.depayload(packet)
{opus_packet, depayloader} = Opus.Depayloader.depayload(state.audio_depayloader, packet)
{:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet)

{:ok, %{state | audio_writer: audio_writer}}
{:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}}
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}
Expand Down
5 changes: 4 additions & 1 deletion examples/send_from_file/lib/send_from_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ defmodule SendFromFile.PeerHandler do
video_payloader = VP8.Payloader.new(800)

{:ok, audio_reader} = Ogg.Reader.open(@audio_file)
audio_payloader = Opus.Payloader.new()

state = %{
peer_connection: pc,
Expand All @@ -71,6 +72,7 @@ defmodule SendFromFile.PeerHandler do
video_reader: video_reader,
video_payloader: video_payloader,
audio_reader: audio_reader,
audio_payloader: audio_payloader,
next_video_timestamp: Enum.random(0..@max_rtp_timestamp),
next_audio_timestamp: Enum.random(0..@max_rtp_timestamp),
next_video_sequence_number: Enum.random(0..@max_rtp_seq_no),
Expand Down Expand Up @@ -158,7 +160,7 @@ defmodule SendFromFile.PeerHandler do
# and time spent on reading and parsing the file
Process.send_after(self(), :send_audio, duration)

rtp_packet = Opus.Payloader.payload(packet)
{[rtp_packet], payloader} = Opus.Payloader.payload(state.audio_payloader, packet)

rtp_packet = %{
rtp_packet
Expand All @@ -177,6 +179,7 @@ defmodule SendFromFile.PeerHandler do
state = %{
state
| audio_reader: reader,
audio_payloader: payloader,
next_audio_timestamp: next_timestamp,
next_audio_sequence_number: next_sequence_number
}
Expand Down
23 changes: 23 additions & 0 deletions lib/ex_webrtc/rtp/depayloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule ExWebRTC.RTP.Depayloader do
@moduledoc """
Behaviour for ExWebRTC Depayloaders.
"""

@type depayloader :: struct()

@doc """
Creates a new depayloader struct.

Refer to the modules implementing the behaviour for available options.
"""
@callback new(options :: any()) :: depayloader()

@doc """
Processes binary data from a single RTP packet, and outputs a frame if assembled.

Returns the frame (or `nil` if a frame could not be depayloaded yet)
together with the updated depayloader struct.
"""
@callback depayload(depayloader(), packet :: ExRTP.Packet.t()) ::
{binary() | nil, depayloader()}
end
25 changes: 23 additions & 2 deletions lib/ex_webrtc/rtp/opus/depayloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,30 @@ defmodule ExWebRTC.RTP.Opus.Depayloader do

alias ExRTP.Packet

@behaviour ExWebRTC.RTP.Depayloader

@opaque t :: %__MODULE__{}

defstruct []

@doc """
Creates a new Opus depayloader struct.

Does not take any options/parameters.
"""
@impl true
@spec new(any()) :: t()
def new(_unused \\ nil) do
%__MODULE__{}
end

@doc """
Takes Opus packet out of an RTP packet.

Always returns a binary as the first element.
"""
@spec depayload(Packet.t()) :: binary()
def depayload(%Packet{payload: payload}), do: payload
@impl true
@spec depayload(t(), Packet.t()) :: {binary(), t()}
def depayload(%__MODULE__{} = depayloader, %Packet{payload: payload}),
do: {payload, depayloader}
end
25 changes: 22 additions & 3 deletions lib/ex_webrtc/rtp/opus/payloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,32 @@ defmodule ExWebRTC.RTP.Opus.Payloader do
Based on [RFC 7587: RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587).
"""

@behaviour ExWebRTC.RTP.Payloader

@opaque t :: %__MODULE__{}

defstruct []

@doc """
Creates a new Opus payloader struct.

Does not take any options/parameters.
"""
@impl true
@spec new(any()) :: t()
def new(_unused \\ nil) do
%__MODULE__{}
end

@doc """
Packs Opus packet into an RTP packet.

Fields from RTP header like ssrc, timestamp etc. are set to 0.
Always returns a single RTP packet.
"""
@spec payload(binary()) :: ExRTP.Packet.t()
def payload(packet) when packet != <<>> do
ExRTP.Packet.new(packet)
@impl true
@spec payload(t(), binary()) :: {[ExRTP.Packet.t()], t()}
def payload(%__MODULE__{} = payloader, packet) when packet != <<>> do
{[ExRTP.Packet.new(packet)], payloader}
end
end
21 changes: 21 additions & 0 deletions lib/ex_webrtc/rtp/payloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule ExWebRTC.RTP.Payloader do
@moduledoc """
Behaviour for ExWebRTC Payloaders.
"""

@type payloader :: struct()

@doc """
Creates a new payloader struct.

Refer to the modules implementing the behaviour for available options.
"""
@callback new(options :: any()) :: payloader()

@doc """
Packs a frame into one or more RTP packets.

Returns the packets together with the updated payloader struct.
"""
@callback payload(payloader(), frame :: binary()) :: {[ExRTP.Packet.t()], payloader()}
end
32 changes: 24 additions & 8 deletions lib/ex_webrtc/rtp/vp8/depayloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do

Based on [RFC 7741: RTP Payload Format for VP8 Video](https://datatracker.ietf.org/doc/html/rfc7741).
"""

@behaviour ExWebRTC.RTP.Depayloader

require Logger

alias ExWebRTC.RTP.VP8.Payload
Expand All @@ -15,17 +18,30 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do

defstruct [:current_frame, :current_timestamp]

@spec new() :: t()
def new() do
@doc """
Creates a new VP8 depayloader struct.

Does not take any options/parameters.
"""
@impl true
@spec new(any()) :: t()
def new(_unused \\ nil) do
%__MODULE__{}
end

@spec write(t(), ExRTP.Packet.t()) :: {:ok, t()} | {:ok, binary(), t()}
def write(depayloader, packet)
@doc """
Reassembles VP8 frames from subsequent RTP packets.

Returns the frame (or `nil` if a frame could not be depayloaded yet)
together with the updated depayloader struct.
"""
@impl true
@spec depayload(t(), ExRTP.Packet.t()) :: {binary() | nil, t()}
def depayload(depayloader, packet)

def write(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {:ok, depayloader}
def depayload(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {nil, depayloader}

def write(depayloader, packet) do
def depayload(depayloader, packet) do
case Payload.parse(packet.payload) do
{:ok, vp8_payload} ->
do_write(depayloader, packet, vp8_payload)
Expand Down Expand Up @@ -80,10 +96,10 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do

case {depayloader.current_frame, packet.marker} do
{current_frame, true} when current_frame != nil ->
{:ok, current_frame, %{depayloader | current_frame: nil, current_timestamp: nil}}
{current_frame, %{depayloader | current_frame: nil, current_timestamp: nil}}

_ ->
{:ok, depayloader}
{nil, depayloader}
end
end
end
15 changes: 13 additions & 2 deletions lib/ex_webrtc/rtp/vp8/payloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule ExWebRTC.RTP.VP8.Payloader do
does not pay attention to VP8 partition boundaries (see RFC 7741 sec. 4.4).
"""

@behaviour ExWebRTC.RTP.Payloader

@first_chunk_descriptor <<0::1, 0::1, 0::1, 1::1, 0::1, 0::3>>

@next_chunk_descriptor <<0::1, 0::1, 0::1, 0::1, 0::1, 0::3>>
Expand All @@ -18,8 +20,16 @@ defmodule ExWebRTC.RTP.VP8.Payloader do
max_payload_size: non_neg_integer()
}

defstruct [:max_payload_size]
@enforce_keys [:max_payload_size]
defstruct @enforce_keys

@doc """
Creates a new VP8 payloader struct.

The parameter `max_payload_size` determines the maximum size of a single RTP packet
outputted by the payloader. It must be greater than `100`, and is set to `1000` by default.
"""
@impl true
@spec new(non_neg_integer()) :: t()
def new(max_payload_size \\ 1000) when max_payload_size > 100 do
%__MODULE__{max_payload_size: max_payload_size}
Expand All @@ -30,8 +40,9 @@ defmodule ExWebRTC.RTP.VP8.Payloader do

Fields from RTP header like ssrc, timestamp etc. are set to 0.
"""
@impl true
@spec payload(t(), frame :: binary()) :: {[ExRTP.Packet.t()], t()}
def payload(payloader, frame) when frame != <<>> do
def payload(%__MODULE__{} = payloader, frame) when frame != <<>> do
rtp_payloads = chunk(frame, payloader.max_payload_size - @desc_size_bytes)

[first_rtp_payload | next_rtp_payloads] = rtp_payloads
Expand Down
20 changes: 10 additions & 10 deletions test/ex_webrtc/rtp/vp8/depayloader_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,35 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do

packet = ExRTP.Packet.new(vp8_payload, marker: true)

assert {:ok, ^data, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.write(depayloader, packet)
assert {^data, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.depayload(depayloader, packet)

# packet that doesn't start a new frame
vp8_payload = %Payload{n: 0, s: 0, pid: 0, payload: data}
vp8_payload = Payload.serialize(vp8_payload)

packet = ExRTP.Packet.new(vp8_payload)

assert {:ok, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.depayload(depayloader, packet)

# packet that starts a new frame without finishing the previous one
vp8_payload = %Payload{n: 0, s: 1, pid: 0, payload: data}
vp8_payload = Payload.serialize(vp8_payload)

packet = ExRTP.Packet.new(vp8_payload)

assert {:ok, %{current_frame: ^data, current_timestamp: 0} = depayloader} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: ^data, current_timestamp: 0} = depayloader} =
Depayloader.depayload(depayloader, packet)

data2 = data <> <<0>>
vp8_payload = %Payload{n: 0, s: 1, pid: 0, payload: data2}
vp8_payload = Payload.serialize(vp8_payload)

packet = ExRTP.Packet.new(vp8_payload, timestamp: 3000)

assert {:ok, %{current_frame: ^data2, current_timestamp: 3000} = depayloader} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: ^data2, current_timestamp: 3000} = depayloader} =
Depayloader.depayload(depayloader, packet)

# packet with timestamp from a new frame that is not a beginning of this frame
data2 = data
Expand All @@ -51,7 +51,7 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do

packet = ExRTP.Packet.new(vp8_payload, timestamp: 6000)

assert {:ok, %{current_frame: nil, current_timestamp: nil}} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: nil, current_timestamp: nil}} =
Depayloader.depayload(depayloader, packet)
end
end