Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TWCC #1934

Merged
merged 19 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ jobs:
- name: Install dependencies
shell: powershell
run: |
choco install gstreamer --version=1.16.2
choco install gstreamer-devel --version=1.16.2
choco install gstreamer --version=1.16.3
choco install gstreamer-devel --version=1.16.3
curl.exe -o C:\tools\pthreads-w32-2-9-1-release.zip ftp://sourceware.org/pub/pthreads-win32/pthreads-w32-2-9-1-release.zip
mkdir C:\tools\pthreads-w32-2-9-1-release\
Expand-Archive -Path C:\tools\pthreads-w32-2-9-1-release.zip -DestinationPath C:\tools\pthreads-w32-2-9-1-release
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,22 @@ createLwsIotCredentialProvider(
freeIotCredentialProvider(&pSampleConfiguration->pCredentialProvider);
```

## Use of TWCC
In order to listen in on TWCC reports, the application must set up a callback using the `peerConnectionOnSenderBandwidthEstimation` API. In our samples, it is set up like this:

```c
CHK_STATUS(peerConnectionOnSenderBandwidthEstimation(pSampleStreamingSession->pPeerConnection, (UINT64) pSampleStreamingSession,
sampleSenderBandwidthEstimationHandler));
```

Note that TWCC is disabled by default in the SDK samples. In order to enable it, set the `disableSenderSideBandwidthEstimation` flag to FALSE. For example,

```c
RtcConfiguration configuration;
configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = FALSE;
```


## Use Pre-generated Certificates
The certificate generating function ([createCertificateAndKey](https://awslabs.github.io/amazon-kinesis-video-streams-webrtc-sdk-c/Dtls__openssl_8c.html#a451c48525b0c0a8919a880d6834c1f7f)) in createDtlsSession() can take between 5 - 15 seconds in low performance embedded devices, it is called for every peer connection creation when KVS WebRTC receives an offer. To avoid this extra start-up latency, certificate can be pre-generated and passed in when offer comes.

Expand Down
4 changes: 2 additions & 2 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP
configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL;

configuration.kvsRtcConfiguration.enableIceStats = pSampleConfiguration->enableIceStats;

configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = TRUE;
// Set the STUN server
PCHAR pKinesisVideoStunUrlPostFix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
Expand Down Expand Up @@ -720,7 +720,7 @@ VOID sampleSenderBandwidthEstimationHandler(UINT64 customData, UINT32 txBytes, U
}
// otherwise keep bitrate the same

DLOGS("received sender bitrate estimation: suggested bitrate %u sent: %u bytes %u packets received: %u bytes %u packets in %lu msec, ", bitrate,
DLOGV("received sender bitrate estimation: suggested bitrate %u sent: %u bytes %u packets received: %u bytes %u packets in %lu msec", bitrate,
txBytes, txPacketsCnt, rxBytes, rxPacketsCnt, duration / 10000ULL);
}

Expand Down
117 changes: 85 additions & 32 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection
if (!pConfiguration->kvsRtcConfiguration.disableSenderSideBandwidthEstimation) {
pKvsPeerConnection->twccLock = MUTEX_CREATE(TRUE);
pKvsPeerConnection->pTwccManager = (PTwccManager) MEMCALLOC(1, SIZEOF(TwccManager));
CHK_STATUS(hashTableCreateWithParams(TWCC_HASH_TABLE_BUCKET_COUNT, TWCC_HASH_TABLE_BUCKET_LENGTH,
&pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
}

*ppPeerConnection = (PRtcPeerConnection) pKvsPeerConnection;
Expand Down Expand Up @@ -992,6 +994,8 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
PDoubleListNode pCurNode = NULL;
UINT64 item = 0;
UINT64 startTime;
UINT32 twccHashTableCount = 0;
BOOL twccLocked = FALSE;

CHK(ppPeerConnection != NULL, STATUS_NULL_ARG);

Expand Down Expand Up @@ -1065,12 +1069,20 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
}

if (pKvsPeerConnection->pTwccManager != NULL) {
MUTEX_LOCK(pKvsPeerConnection->twccLock);
disa6302 marked this conversation as resolved.
Show resolved Hide resolved
twccLocked = TRUE;
if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) {
DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount);
}
CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry));
CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
if (twccLocked) {
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
twccLocked = FALSE;
}
MUTEX_FREE(pKvsPeerConnection->twccLock);
}
// twccManager.twccPackets contains sequence numbers of packets (as opposed to pointers to actual packets)
// we should not deallocate items but we do need to clear the queue
CHK_LOG_ERR(stackQueueClear(&pKvsPeerConnection->pTwccManager->twccPackets, FALSE));
SAFE_MEMFREE(pKvsPeerConnection->pTwccManager);
}

Expand Down Expand Up @@ -1770,47 +1782,88 @@ STATUS deinitKvsWebRtc(VOID)
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pc, PRtpPacket pRtpPacket)
// Not thread safe. Ensure this function is invoked in a guarded section
static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 seqNum)
disa6302 marked this conversation as resolved.
Show resolved Hide resolved
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
UINT16 updatedSeqNum = 0;
PTwccRtpPacketInfo tempTwccRtpPktInfo = NULL;
UINT64 ageOfOldest = 0, firstRtpTime = 0;
UINT64 twccPacketValue = 0;
BOOL isCheckComplete = FALSE;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_NULL_ARG);

updatedSeqNum = pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow;
do {
// If the seqNum is not present in the hash table, it is ok. We move on to the next
if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) {
tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue;
}
if (tempTwccRtpPktInfo != NULL) {
disa6302 marked this conversation as resolved.
Show resolved Hide resolved
firstRtpTime = tempTwccRtpPktInfo->localTimeKvs;
// Would be the case if the timestamps are not monotonically increasing.
if (pRtpPacket->sentTime >= firstRtpTime) {
ageOfOldest = pRtpPacket->sentTime - firstRtpTime;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
// If the seqNum is not present in the hash table, move on. However, this case should not happen
// given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists
if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) {
SAFE_MEMFREE(tempTwccRtpPktInfo);
}
updatedSeqNum++;
} else {
isCheckComplete = TRUE;
}
} else {
// Move to the next seqNum to check if we can remove the next one atleast
DLOGV("Detected timestamp not increasing monotonically for RTP packet %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64,
updatedSeqNum, firstRtpTime, pRtpPacket->sentTime);
updatedSeqNum++;
}
} else {
updatedSeqNum++;
}
// reset before next iteration
tempTwccRtpPktInfo = NULL;
} while (!isCheckComplete && updatedSeqNum != (seqNum + 1));

// Update regardless. The loop checks until current RTP packets seq number irrespective of the failure
pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum;
CleanUp:
LEAVES();
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
UINT64 sn = 0;
UINT16 seqNum;
BOOL isEmpty = FALSE;
INT64 firstTimeKvs, lastLocalTimeKvs, ageOfOldest;
CHK(pc != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pc->onSenderBandwidthEstimation != NULL && pc->pTwccManager != NULL, STATUS_SUCCESS);
UINT16 seqNum = 0;
PTwccRtpPacketInfo pTwccRtpPktInfo = NULL;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS);
CHK(TWCC_EXT_PROFILE == pRtpPacket->header.extensionProfile, STATUS_SUCCESS);

MUTEX_LOCK(pc->twccLock);
MUTEX_LOCK(pKvsPeerConnection->twccLock);
locked = TRUE;

CHK((pTwccRtpPktInfo = MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo))) != NULL, STATUS_NOT_ENOUGH_MEMORY);

pTwccRtpPktInfo->packetSize = pRtpPacket->payloadLength;
pTwccRtpPktInfo->localTimeKvs = pRtpPacket->sentTime;
pTwccRtpPktInfo->remoteTimeKvs = TWCC_PACKET_LOST_TIME;
seqNum = TWCC_SEQNUM(pRtpPacket->header.extensionPayload);
CHK_STATUS(stackQueueEnqueue(&pc->pTwccManager->twccPackets, seqNum));
pc->pTwccManager->twccPacketBySeqNum[seqNum].seqNum = seqNum;
pc->pTwccManager->twccPacketBySeqNum[seqNum].packetSize = pRtpPacket->payloadLength;
pc->pTwccManager->twccPacketBySeqNum[seqNum].localTimeKvs = pRtpPacket->sentTime;
pc->pTwccManager->twccPacketBySeqNum[seqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME;
pc->pTwccManager->lastLocalTimeKvs = pRtpPacket->sentTime;

// cleanup queue until it contains up to 2 seconds of sent packets
do {
CHK_STATUS(stackQueuePeek(&pc->pTwccManager->twccPackets, &sn));
firstTimeKvs = pc->pTwccManager->twccPacketBySeqNum[(UINT16) sn].localTimeKvs;
lastLocalTimeKvs = pRtpPacket->sentTime;
ageOfOldest = lastLocalTimeKvs - firstTimeKvs;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
CHK_STATUS(stackQueueDequeue(&pc->pTwccManager->twccPackets, &sn));
CHK_STATUS(stackQueueIsEmpty(&pc->pTwccManager->twccPackets, &isEmpty));
} else {
break;
}
} while (!isEmpty);
CHK_STATUS(hashTableUpsert(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, seqNum, (UINT64) pTwccRtpPktInfo));

// Ensure twccRollingWindowDeletion is run in a guarded section
CHK_STATUS(twccRollingWindowDeletion(pKvsPeerConnection, pRtpPacket, seqNum));
CleanUp:
if (locked) {
MUTEX_UNLOCK(pc->twccLock);
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
}
CHK_LOG_ERR(retStatus);

Expand Down
18 changes: 10 additions & 8 deletions src/source/PeerConnection/PeerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ extern "C" {
#define CODEC_HASH_TABLE_BUCKET_LENGTH 2
#define RTX_HASH_TABLE_BUCKET_COUNT 50
#define RTX_HASH_TABLE_BUCKET_LENGTH 2
#define TWCC_HASH_TABLE_BUCKET_COUNT 100
#define TWCC_HASH_TABLE_BUCKET_LENGTH 2

#define DATA_CHANNEL_HASH_TABLE_BUCKET_COUNT 200
#define DATA_CHANNEL_HASH_TABLE_BUCKET_LENGTH 2
Expand All @@ -46,17 +48,16 @@ typedef enum {
} RTX_CODEC;

typedef struct {
UINT16 seqNum;
UINT16 packetSize;
UINT64 localTimeKvs;
UINT64 remoteTimeKvs;
} TwccPacket, *PTwccPacket;
UINT32 packetSize;
} TwccRtpPacketInfo, *PTwccRtpPacketInfo;

typedef struct {
StackQueue twccPackets;
TwccPacket twccPacketBySeqNum[65536]; // twccPacketBySeqNum takes about 1.2MB of RAM but provides great cache locality
UINT64 lastLocalTimeKvs;
UINT16 lastReportedSeqNum;
PHashTable pTwccRtpPktInfosHashTable; // Hash table of [seqNum, PTwccPacket]
UINT16 firstSeqNumInRollingWindow; // To monitor the last deleted packet in the rolling window
UINT16 lastReportedSeqNum; // To monitor the last packet's seqNum in the TWCC response
UINT16 prevReportedBaseSeqNum; // To monitor the base seqNum in the TWCC response
} TwccManager, *PTwccManager;

typedef struct {
Expand All @@ -69,7 +70,7 @@ typedef struct {

typedef struct {
RtcPeerConnection peerConnection;
// UINT32 padding padding makes transportWideSequenceNumber 64bit aligned
// UINT32 padding makes transportWideSequenceNumber 64bit aligned
// we put atomics at the top of structs because customers application could set the packing to 0
// in which case any atomic operations would result in bus errors if there is a misalignment
// for more see https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/pull/987#discussion_r534432907
Expand Down Expand Up @@ -181,6 +182,7 @@ VOID onSctpSessionDataChannelOpen(UINT64, UINT32, PBYTE, UINT32);
STATUS sendPacketToRtpReceiver(PKvsPeerConnection, PBYTE, UINT32);
STATUS changePeerConnectionState(PKvsPeerConnection, RTC_PEER_CONNECTION_STATE);
STATUS twccManagerOnPacketSent(PKvsPeerConnection, PRtpPacket);
UINT32 parseExtId(PCHAR);

// visible for testing only
VOID onIceConnectionStateChange(UINT64, UINT64);
Expand Down
Loading
Loading