Skip to content

Commit

Permalink
Merge e0e5170 into 7be31c1
Browse files Browse the repository at this point in the history
  • Loading branch information
disa6302 authored Mar 21, 2024
2 parents 7be31c1 + e0e5170 commit 8a5f4c1
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 94 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ jobs:
- name: Install dependencies
shell: powershell
run: |
choco install gstreamer --version=1.16.2
choco install gstreamer-devel --version=1.16.2
choco install gstreamer --version=1.16.3
choco install gstreamer-devel --version=1.16.3
curl.exe -o C:\tools\pthreads-w32-2-9-1-release.zip ftp://sourceware.org/pub/pthreads-win32/pthreads-w32-2-9-1-release.zip
mkdir C:\tools\pthreads-w32-2-9-1-release\
Expand-Archive -Path C:\tools\pthreads-w32-2-9-1-release.zip -DestinationPath C:\tools\pthreads-w32-2-9-1-release
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,22 @@ createLwsIotCredentialProvider(
freeIotCredentialProvider(&pSampleConfiguration->pCredentialProvider);
```
## Use of TWCC
In order to listen in on TWCC reports, the application must set up a callback using the `peerConnectionOnSenderBandwidthEstimation` API. In our samples, it is set up like this:
```c
CHK_STATUS(peerConnectionOnSenderBandwidthEstimation(pSampleStreamingSession->pPeerConnection, (UINT64) pSampleStreamingSession,
sampleSenderBandwidthEstimationHandler));
```

Note that TWCC is disabled by default in the SDK samples. In order to enable it, set the `disableSenderSideBandwidthEstimation` flag to FALSE. For example,

```c
RtcConfiguration configuration;
configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = FALSE;
```


## Use Pre-generated Certificates
The certificate generating function ([createCertificateAndKey](https://awslabs.github.io/amazon-kinesis-video-streams-webrtc-sdk-c/Dtls__openssl_8c.html#a451c48525b0c0a8919a880d6834c1f7f)) in createDtlsSession() can take between 5 - 15 seconds in low performance embedded devices, it is called for every peer connection creation when KVS WebRTC receives an offer. To avoid this extra start-up latency, certificate can be pre-generated and passed in when offer comes.

Expand Down
4 changes: 2 additions & 2 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP
configuration.iceTransportPolicy = ICE_TRANSPORT_POLICY_ALL;

configuration.kvsRtcConfiguration.enableIceStats = pSampleConfiguration->enableIceStats;

configuration.kvsRtcConfiguration.disableSenderSideBandwidthEstimation = TRUE;
// Set the STUN server
PCHAR pKinesisVideoStunUrlPostFix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
Expand Down Expand Up @@ -720,7 +720,7 @@ VOID sampleSenderBandwidthEstimationHandler(UINT64 customData, UINT32 txBytes, U
}
// otherwise keep bitrate the same

DLOGS("received sender bitrate estimation: suggested bitrate %u sent: %u bytes %u packets received: %u bytes %u packets in %lu msec, ", bitrate,
DLOGV("received sender bitrate estimation: suggested bitrate %u sent: %u bytes %u packets received: %u bytes %u packets in %lu msec", bitrate,
txBytes, txPacketsCnt, rxBytes, rxPacketsCnt, duration / 10000ULL);
}

Expand Down
131 changes: 97 additions & 34 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection
if (!pConfiguration->kvsRtcConfiguration.disableSenderSideBandwidthEstimation) {
pKvsPeerConnection->twccLock = MUTEX_CREATE(TRUE);
pKvsPeerConnection->pTwccManager = (PTwccManager) MEMCALLOC(1, SIZEOF(TwccManager));
CHK_STATUS(hashTableCreateWithParams(TWCC_HASH_TABLE_BUCKET_COUNT, TWCC_HASH_TABLE_BUCKET_LENGTH,
&pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
}

*ppPeerConnection = (PRtcPeerConnection) pKvsPeerConnection;
Expand Down Expand Up @@ -988,10 +990,12 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKvsPeerConnection pKvsPeerConnection;
PKvsPeerConnection pKvsPeerConnection = NULL;
PDoubleListNode pCurNode = NULL;
UINT64 item = 0;
UINT64 startTime;
UINT32 twccHashTableCount = 0;
BOOL twccLocked = FALSE;

CHK(ppPeerConnection != NULL, STATUS_NULL_ARG);

Expand Down Expand Up @@ -1065,19 +1069,37 @@ STATUS freePeerConnection(PRtcPeerConnection* ppPeerConnection)
}

if (pKvsPeerConnection->pTwccManager != NULL) {
MUTEX_LOCK(pKvsPeerConnection->twccLock);
twccLocked = TRUE;
if (STATUS_SUCCEEDED(hashTableGetCount(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, &twccHashTableCount))) {
DLOGI("Number of TWCC info packets in memory: %d", twccHashTableCount);
}
CHK_LOG_ERR(hashTableIterateEntries(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, 0, freeHashEntry));
CHK_LOG_ERR(hashTableFree(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable));
if (IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
if (twccLocked) {
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
twccLocked = FALSE;
}
MUTEX_FREE(pKvsPeerConnection->twccLock);
pKvsPeerConnection->twccLock = INVALID_MUTEX_VALUE;
}
// twccManager.twccPackets contains sequence numbers of packets (as opposed to pointers to actual packets)
// we should not deallocate items but we do need to clear the queue
CHK_LOG_ERR(stackQueueClear(&pKvsPeerConnection->pTwccManager->twccPackets, FALSE));
SAFE_MEMFREE(pKvsPeerConnection->pTwccManager);
}

PROFILE_WITH_START_TIME_OBJ(startTime, pKvsPeerConnection->peerConnectionDiagnostics.freePeerConnectionTime, "Free peer connection");
SAFE_MEMFREE(*ppPeerConnection);
ppPeerConnection = NULL;
CleanUp:

if (ppPeerConnection != NULL) {
if(IS_VALID_MUTEX_VALUE(pKvsPeerConnection->twccLock)) {
if (twccLocked) {
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
twccLocked = FALSE;
}
MUTEX_FREE(pKvsPeerConnection->twccLock);
}
}
LEAVES();
return retStatus;
}
Expand Down Expand Up @@ -1770,47 +1792,88 @@ STATUS deinitKvsWebRtc(VOID)
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pc, PRtpPacket pRtpPacket)
// Not thread safe. Ensure this function is invoked in a guarded section
static STATUS twccRollingWindowDeletion(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket, UINT16 seqNum)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
UINT16 updatedSeqNum = 0;
PTwccRtpPacketInfo tempTwccRtpPktInfo = NULL;
UINT64 ageOfOldest = 0, firstRtpTime = 0;
UINT64 twccPacketValue = 0;
BOOL isCheckComplete = FALSE;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_NULL_ARG);

updatedSeqNum = pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow;
do {
// If the seqNum is not present in the hash table, it is ok. We move on to the next
if (STATUS_SUCCEEDED(hashTableGet(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum, &twccPacketValue))) {
tempTwccRtpPktInfo = (PTwccRtpPacketInfo) twccPacketValue;
}
if (tempTwccRtpPktInfo != NULL) {
firstRtpTime = tempTwccRtpPktInfo->localTimeKvs;
// Would be the case if the timestamps are not monotonically increasing.
if (pRtpPacket->sentTime >= firstRtpTime) {
ageOfOldest = pRtpPacket->sentTime - firstRtpTime;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
// If the seqNum is not present in the hash table, move on. However, this case should not happen
// given this function is holding the lock and tempTwccRtpPktInfo is populated because it exists
if (STATUS_SUCCEEDED(hashTableRemove(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, updatedSeqNum))) {
SAFE_MEMFREE(tempTwccRtpPktInfo);
}
updatedSeqNum++;
} else {
isCheckComplete = TRUE;
}
} else {
// Move to the next seqNum to check if we can remove the next one atleast
DLOGV("Detected timestamp not increasing monotonically for RTP packet %d [ts: %" PRIu64 ". Current RTP packets' ts: %" PRIu64,
updatedSeqNum, firstRtpTime, pRtpPacket->sentTime);
updatedSeqNum++;
}
} else {
updatedSeqNum++;
}
// reset before next iteration
tempTwccRtpPktInfo = NULL;
} while (!isCheckComplete && updatedSeqNum != (seqNum + 1));

// Update regardless. The loop checks until current RTP packets seq number irrespective of the failure
pKvsPeerConnection->pTwccManager->firstSeqNumInRollingWindow = updatedSeqNum;
CleanUp:
LEAVES();
return retStatus;
}

STATUS twccManagerOnPacketSent(PKvsPeerConnection pKvsPeerConnection, PRtpPacket pRtpPacket)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
UINT64 sn = 0;
UINT16 seqNum;
BOOL isEmpty = FALSE;
INT64 firstTimeKvs, lastLocalTimeKvs, ageOfOldest;
CHK(pc != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pc->onSenderBandwidthEstimation != NULL && pc->pTwccManager != NULL, STATUS_SUCCESS);
UINT16 seqNum = 0;
PTwccRtpPacketInfo pTwccRtpPktInfo = NULL;

CHK(pKvsPeerConnection != NULL && pRtpPacket != NULL, STATUS_NULL_ARG);
CHK(pKvsPeerConnection->onSenderBandwidthEstimation != NULL && pKvsPeerConnection->pTwccManager != NULL, STATUS_SUCCESS);
CHK(TWCC_EXT_PROFILE == pRtpPacket->header.extensionProfile, STATUS_SUCCESS);

MUTEX_LOCK(pc->twccLock);
MUTEX_LOCK(pKvsPeerConnection->twccLock);
locked = TRUE;

CHK((pTwccRtpPktInfo = MEMCALLOC(1, SIZEOF(TwccRtpPacketInfo))) != NULL, STATUS_NOT_ENOUGH_MEMORY);

pTwccRtpPktInfo->packetSize = pRtpPacket->payloadLength;
pTwccRtpPktInfo->localTimeKvs = pRtpPacket->sentTime;
pTwccRtpPktInfo->remoteTimeKvs = TWCC_PACKET_LOST_TIME;
seqNum = TWCC_SEQNUM(pRtpPacket->header.extensionPayload);
CHK_STATUS(stackQueueEnqueue(&pc->pTwccManager->twccPackets, seqNum));
pc->pTwccManager->twccPacketBySeqNum[seqNum].seqNum = seqNum;
pc->pTwccManager->twccPacketBySeqNum[seqNum].packetSize = pRtpPacket->payloadLength;
pc->pTwccManager->twccPacketBySeqNum[seqNum].localTimeKvs = pRtpPacket->sentTime;
pc->pTwccManager->twccPacketBySeqNum[seqNum].remoteTimeKvs = TWCC_PACKET_LOST_TIME;
pc->pTwccManager->lastLocalTimeKvs = pRtpPacket->sentTime;

// cleanup queue until it contains up to 2 seconds of sent packets
do {
CHK_STATUS(stackQueuePeek(&pc->pTwccManager->twccPackets, &sn));
firstTimeKvs = pc->pTwccManager->twccPacketBySeqNum[(UINT16) sn].localTimeKvs;
lastLocalTimeKvs = pRtpPacket->sentTime;
ageOfOldest = lastLocalTimeKvs - firstTimeKvs;
if (ageOfOldest > TWCC_ESTIMATOR_TIME_WINDOW) {
CHK_STATUS(stackQueueDequeue(&pc->pTwccManager->twccPackets, &sn));
CHK_STATUS(stackQueueIsEmpty(&pc->pTwccManager->twccPackets, &isEmpty));
} else {
break;
}
} while (!isEmpty);
CHK_STATUS(hashTableUpsert(pKvsPeerConnection->pTwccManager->pTwccRtpPktInfosHashTable, seqNum, (UINT64) pTwccRtpPktInfo));

// Ensure twccRollingWindowDeletion is run in a guarded section
CHK_STATUS(twccRollingWindowDeletion(pKvsPeerConnection, pRtpPacket, seqNum));
CleanUp:
if (locked) {
MUTEX_UNLOCK(pc->twccLock);
MUTEX_UNLOCK(pKvsPeerConnection->twccLock);
}
CHK_LOG_ERR(retStatus);

Expand Down
18 changes: 10 additions & 8 deletions src/source/PeerConnection/PeerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ extern "C" {
#define CODEC_HASH_TABLE_BUCKET_LENGTH 2
#define RTX_HASH_TABLE_BUCKET_COUNT 50
#define RTX_HASH_TABLE_BUCKET_LENGTH 2
#define TWCC_HASH_TABLE_BUCKET_COUNT 100
#define TWCC_HASH_TABLE_BUCKET_LENGTH 2

#define DATA_CHANNEL_HASH_TABLE_BUCKET_COUNT 200
#define DATA_CHANNEL_HASH_TABLE_BUCKET_LENGTH 2
Expand All @@ -46,17 +48,16 @@ typedef enum {
} RTX_CODEC;

typedef struct {
UINT16 seqNum;
UINT16 packetSize;
UINT64 localTimeKvs;
UINT64 remoteTimeKvs;
} TwccPacket, *PTwccPacket;
UINT32 packetSize;
} TwccRtpPacketInfo, *PTwccRtpPacketInfo;

typedef struct {
StackQueue twccPackets;
TwccPacket twccPacketBySeqNum[65536]; // twccPacketBySeqNum takes about 1.2MB of RAM but provides great cache locality
UINT64 lastLocalTimeKvs;
UINT16 lastReportedSeqNum;
PHashTable pTwccRtpPktInfosHashTable; // Hash table of [seqNum, PTwccPacket]
UINT16 firstSeqNumInRollingWindow; // To monitor the last deleted packet in the rolling window
UINT16 lastReportedSeqNum; // To monitor the last packet's seqNum in the TWCC response
UINT16 prevReportedBaseSeqNum; // To monitor the base seqNum in the TWCC response
} TwccManager, *PTwccManager;

typedef struct {
Expand All @@ -69,7 +70,7 @@ typedef struct {

typedef struct {
RtcPeerConnection peerConnection;
// UINT32 padding padding makes transportWideSequenceNumber 64bit aligned
// UINT32 padding makes transportWideSequenceNumber 64bit aligned
// we put atomics at the top of structs because customers application could set the packing to 0
// in which case any atomic operations would result in bus errors if there is a misalignment
// for more see https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/pull/987#discussion_r534432907
Expand Down Expand Up @@ -181,6 +182,7 @@ VOID onSctpSessionDataChannelOpen(UINT64, UINT32, PBYTE, UINT32);
STATUS sendPacketToRtpReceiver(PKvsPeerConnection, PBYTE, UINT32);
STATUS changePeerConnectionState(PKvsPeerConnection, RTC_PEER_CONNECTION_STATE);
STATUS twccManagerOnPacketSent(PKvsPeerConnection, PRtpPacket);
UINT32 parseExtId(PCHAR);

// visible for testing only
VOID onIceConnectionStateChange(UINT64, UINT64);
Expand Down
Loading

0 comments on commit 8a5f4c1

Please sign in to comment.