Skip to content

Commit

Permalink
feat: Move some processing to the sender pipeline (#2172)
Browse files Browse the repository at this point in the history
Moves the bitrate controller transform (applying source projection
logic) and SSRC rewriting to the sender pipeline. Previously an
RtpReceiver's pipeline was running this code for every target endpoint.

This reduces the packet processing time for large conferences since the
work is split to multiple threads.
  • Loading branch information
bgrozev authored Jun 12, 2024
1 parent 059cd3e commit 93c41cc
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ import java.util.concurrent.ScheduledExecutorService
class RtpReceiverImpl @JvmOverloads constructor(
val id: String,
/**
* A function to be used when these receiver wants to send RTCP packets to the
* A function to be used when the receiver wants to send RTCP packets to the
* participant it's receiving data from (NACK packets, for example)
*/
private val rtcpSender: (RtcpPacket) -> Unit = {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@ abstract class RtpSender :
abstract fun isFeatureEnabled(feature: Features): Boolean
abstract fun tearDown()

/**
* An optional function to be executed for each RTP packet, as the first step of the send pipeline.
*/
var preProcesor: ((PacketInfo) -> PacketInfo?)? = null

abstract val bandwidthEstimator: BandwidthEstimator
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.jitsi.nlj.transform.node.PacketStreamStatsNode
import org.jitsi.nlj.transform.node.SrtcpEncryptNode
import org.jitsi.nlj.transform.node.SrtpEncryptNode
import org.jitsi.nlj.transform.node.ToggleablePcapWriter
import org.jitsi.nlj.transform.node.TransformerNode
import org.jitsi.nlj.transform.node.outgoing.AbsSendTime
import org.jitsi.nlj.transform.node.outgoing.HeaderExtEncoder
import org.jitsi.nlj.transform.node.outgoing.HeaderExtStripper
Expand Down Expand Up @@ -140,6 +141,12 @@ class RtpSenderImpl(
incomingPacketQueue.setErrorHandler(queueErrorCounter)

outgoingRtpRoot = pipeline {
node(object : TransformerNode("Pre-processor") {
override fun transform(packetInfo: PacketInfo): PacketInfo? {
return preProcesor?.invoke(packetInfo) ?: packetInfo
}
override fun trace(f: () -> Unit) {}
})
node(AudioRedHandler(streamInformationStore, logger))
node(HeaderExtStripper(streamInformationStore))
node(outgoingPacketCache)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class Transceiver(
*/
fun isReceivingVideo(): Boolean = rtpReceiver.isReceivingVideo()

private val rtpSender: RtpSender = RtpSenderImpl(
val rtpSender: RtpSender = RtpSenderImpl(
id,
rtcpEventNotifier,
senderExecutor,
Expand Down
71 changes: 37 additions & 34 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,42 @@ class Endpoint @JvmOverloads constructor(
addEndpointConnectionStatsListener(rttListener)
setLocalSsrc(MediaType.AUDIO, conference.localAudioSsrc)
setLocalSsrc(MediaType.VIDEO, conference.localVideoSsrc)
rtpSender.preProcesor = { packetInfo -> preProcess(packetInfo) }
}

/**
* Perform processing of the packet before it goes through the rest of the [transceiver] send pipeline:
* 1. Update the bitrate controller state and apply the source projection logic
* 2. Perform SSRC re-writing if [doSsrcRewriting] is set.
*/
private fun preProcess(packetInfo: PacketInfo): PacketInfo? {
when (val packet = packetInfo.packet) {
is VideoRtpPacket -> {
if (!bitrateController.transformRtp(packetInfo)) {
logger.warn("Dropping a packet which was supposed to be accepted:$packet")
return null
}
// The original packet was transformed in place.
if (doSsrcRewriting) {
val start = packet !is ParsedVideoPacket || (packet.isKeyframe && packet.isStartOfFrame)
if (!videoSsrcs.rewriteRtp(packet, start)) {
return null
}
}
}
is AudioRtpPacket -> if (doSsrcRewriting) audioSsrcs.rewriteRtp(packet)
is RtcpSrPacket -> {
// Allow the BC to update the timestamp (in place).
bitrateController.transformRtcp(packet)
if (doSsrcRewriting) {
// Just check both tables instead of looking up the type first.
if (!videoSsrcs.rewriteRtcp(packet) && !audioSsrcs.rewriteRtcp(packet)) {
return null
}
}
}
}
return packetInfo
}

private val bandwidthProbing = BandwidthProbing(
Expand Down Expand Up @@ -890,40 +926,7 @@ class Endpoint @JvmOverloads constructor(
}
}

override fun send(packetInfo: PacketInfo) {
when (val packet = packetInfo.packet) {
is VideoRtpPacket -> {
if (bitrateController.transformRtp(packetInfo)) {
// The original packet was transformed in place.
if (doSsrcRewriting) {
val start = packet !is ParsedVideoPacket || (packet.isKeyframe && packet.isStartOfFrame)
if (!videoSsrcs.rewriteRtp(packet, start)) {
return
}
}
transceiver.sendPacket(packetInfo)
} else {
logger.warn("Dropping a packet which was supposed to be accepted:$packet")
}
return
}
is AudioRtpPacket -> if (doSsrcRewriting) audioSsrcs.rewriteRtp(packet)
is RtcpSrPacket -> {
// Allow the BC to update the timestamp (in place).
bitrateController.transformRtcp(packet)
if (doSsrcRewriting) {
// Just check both tables instead of looking up the type first.
if (!videoSsrcs.rewriteRtcp(packet) && !audioSsrcs.rewriteRtcp(packet)) {
return
}
}
logger.trace {
"relaying an sr from ssrc=${packet.senderSsrc}, timestamp=${packet.senderInfo.rtpTimestamp}"
}
}
}
transceiver.sendPacket(packetInfo)
}
override fun send(packetInfo: PacketInfo) = transceiver.sendPacket(packetInfo)

/**
* To find out whether the endpoint should be expired, we check the activity timestamps from the transceiver.
Expand Down
3 changes: 0 additions & 3 deletions resources/analyze-timeline2.pl
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
the clone on the sender queue (see Conference#sendOut). This workload scales with the number of local endpoits and relays,
and in a large conference is the most computationally expensive.
Note that currently Endpoint#send(PacketInfo) contains code for SSRC rewriting and a transformation from the BitrateController.
This can be offloaded to the sender queue and we plan to do so soon.
4.Sender Queue: From the time the Receiver Pipeline thread places the packet on the Sender Queue, to the time another CPU thread
removes it from the queue and starts executing the Sender Pipeline.
Expand Down

0 comments on commit 93c41cc

Please sign in to comment.