Skip to content

Commit

Permalink
Use atomic refcounter
Browse files Browse the repository at this point in the history
  • Loading branch information
disa6302 committed Sep 26, 2023
1 parent bf2dd07 commit 099e50b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 51 deletions.
67 changes: 21 additions & 46 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,14 @@ static volatile ATOMIC_BOOL gKvsWebRtcInitialized = (SIZE_T) FALSE;
// Function to get access to the Singleton instance
PWebRtcClientContext getWebRtcClientInstance()
{
static WebRtcClientContext w = {.pStunIpAddrCtx = NULL,
.stunCtxlock = INVALID_MUTEX_VALUE,
.usageSemaphore = INVALID_SEMAPHORE_HANDLE_VALUE,
.isSemAccessInitialized = FALSE,
.isContextInitialized = FALSE};
if (w.isSemAccessInitialized) {
if (STATUS_FAILED(semaphoreAcquire(w.usageSemaphore, INFINITE_TIME_VALUE))) {
DLOGW("Failed to create the semaphore to control access to the client context instance");
} else {
ATOMIC_STORE_BOOL(&w.isSemAccessInitialized, TRUE);
}
} else {
DLOGI("WebRTC Client instance hasnt been initialized for the semaphore");
}
static WebRtcClientContext w = {.pStunIpAddrCtx = NULL, .stunCtxlock = INVALID_MUTEX_VALUE, .contextRefCnt = 0, .isContextInitialized = FALSE};
ATOMIC_INCREMENT(&w.contextRefCnt);
return &w;
}

STATUS releaseHoldOnWebRtcClientInstance(PWebRtcClientContext pWebRtcClientContext)
VOID releaseHoldOnInstance(PWebRtcClientContext pWebRtcClientContext)
{
STATUS retStatus = STATUS_SUCCESS;
if (ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isSemAccessInitialized)) {
CHK_STATUS(semaphoreRelease(pWebRtcClientContext->usageSemaphore));
}

CleanUp:
return retStatus;
ATOMIC_DECREMENT(&pWebRtcClientContext->contextRefCnt);
}

STATUS createWebRtcClientInstance()
Expand All @@ -43,11 +25,9 @@ STATUS createWebRtcClientInstance()

CHK_WARN(!ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), retStatus, "WebRtc client context already initialized, nothing to do");
CHK_ERR(!IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock), retStatus, "Mutex seems to have been created already");
CHK_ERR(!IS_VALID_SEMAPHORE_HANDLE(pWebRtcClientContext->usageSemaphore), retStatus, "Semaphore seems to have been created already");

pWebRtcClientContext->stunCtxlock = MUTEX_CREATE(FALSE);
CHK_STATUS(semaphoreCreate(MAX_ACCESS_THREADS_WEBRTC_CLIENT_CONTEXT, &pWebRtcClientContext->usageSemaphore));
ATOMIC_STORE_BOOL(&pWebRtcClientContext->isSemAccessInitialized, TRUE);
CHK_ERR(IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock), STATUS_NULL_ARG, "Mutex creation failed");
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;
CHK_WARN(pWebRtcClientContext->pStunIpAddrCtx == NULL, STATUS_INVALID_OPERATION, "STUN object already allocated");
Expand All @@ -60,6 +40,7 @@ STATUS createWebRtcClientInstance()
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
releaseHoldOnInstance(pWebRtcClientContext);
return retStatus;
}

Expand Down Expand Up @@ -783,13 +764,13 @@ STATUS onSetStunServerIp(UINT64 customData, PCHAR url, PKvsIpAddress pIpAddr)
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();

CHK_WARN(ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), STATUS_NULL_ARG, "WebRTC Client object Object not initialized");

UINT64 currentTime = GETTIME();

MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;

CHK(STRCMP(url, pWebRtcClientContext->pStunIpAddrCtx->hostname) == 0, STATUS_PEERCONNECTION_UNSUPPORTED_HOSTNAME);

if (pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized) {
Expand All @@ -808,7 +789,8 @@ STATUS onSetStunServerIp(UINT64 customData, PCHAR url, PKvsIpAddress pIpAddr)
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
retStatus = releaseHoldOnWebRtcClientInstance(pWebRtcClientContext);
DLOGD("Exiting from stun server IP callback");
releaseHoldOnInstance(pWebRtcClientContext);
return retStatus;
}

Expand Down Expand Up @@ -851,7 +833,8 @@ PVOID resolveStunIceServerIp(PVOID args)
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
releaseHoldOnWebRtcClientInstance(pWebRtcClientContext);
DLOGD("Exiting from stun server IP resolution thread");
releaseHoldOnInstance(pWebRtcClientContext);
return NULL;
}

Expand Down Expand Up @@ -1594,6 +1577,7 @@ STATUS initKvsWebRtc(VOID)
KVS_CRYPTO_INIT();
LOG_GIT_HASH();

SET_INSTRUMENTED_ALLOCATORS();
#ifdef ENABLE_DATA_CHANNEL
CHK_STATUS(initSctpSession());
#endif
Expand All @@ -1614,28 +1598,21 @@ STATUS initKvsWebRtc(VOID)
STATUS cleanupWebRtcClientInstance()
{
STATUS retStatus = STATUS_SUCCESS;
INT32 count = 0;
// Stun object cleanup
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();

CHK_WARN(ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), STATUS_INVALID_OPERATION,
"WebRtc context not initialized, nothing to clean up");
DLOGD("Releasing webrtc client context instance from cleanupWebRtcClientInstance");
releaseHoldOnInstance(pWebRtcClientContext);

while (ATOMIC_LOAD(&pWebRtcClientContext->contextRefCnt) > 0) {
DLOGV("Waiting on all references to be returned...%d", pWebRtcClientContext->contextRefCnt);
THREAD_SLEEP(100 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
}

/* Start of handling STUN object */
// Need this check to ensure we do not clean up the object in the next
// step while the resolve thread is ongoing

if (IS_VALID_SEMAPHORE_HANDLE(pWebRtcClientContext->usageSemaphore)) {
releaseHoldOnWebRtcClientInstance(pWebRtcClientContext);
ATOMIC_STORE_BOOL(&pWebRtcClientContext->isSemAccessInitialized, FALSE);
semaphoreLock(pWebRtcClientContext->usageSemaphore);
semaphoreWaitUntilClear(pWebRtcClientContext->usageSemaphore, CLIENT_SHUTDOWN_SEMAPHORE_TIMEOUT);
semaphoreGetCount(pWebRtcClientContext->usageSemaphore, &count);
DLOGI("Semaphore pending count: %d", count);
semaphoreFree(&pWebRtcClientContext->usageSemaphore);
pWebRtcClientContext->usageSemaphore = INVALID_SEMAPHORE_HANDLE_VALUE;
}

CHK_WARN(pWebRtcClientContext->pStunIpAddrCtx != NULL, STATUS_NULL_ARG, "Destroying STUN object without setting up");
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
SAFE_MEMFREE(pWebRtcClientContext->pStunIpAddrCtx);
Expand All @@ -1649,13 +1626,10 @@ STATUS cleanupWebRtcClientInstance()
pWebRtcClientContext->stunCtxlock = INVALID_MUTEX_VALUE;
}

// Ensure this happens after release because we check for init before releasing.
// It is still ok to reset these to post releasing the semaphore because there is no
// resource destruction for these bools and when we hit this path, we have already finished
// using the relevant objects and it is safe to clean
ATOMIC_STORE_BOOL(&pWebRtcClientContext->isContextInitialized, FALSE);

DLOGI("Destroyed WebRtc client context");

CleanUp:
return retStatus;
}
Expand All @@ -1676,6 +1650,7 @@ STATUS deinitKvsWebRtc(VOID)
cleanupWebRtcClientInstance();
destroyThreadPoolContext();
DLOGI("Destroyed threadpool");
RESET_INSTRUMENTED_ALLOCATORS();
#endif
ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
CleanUp:
Expand Down
3 changes: 1 addition & 2 deletions src/source/PeerConnection/PeerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ typedef struct {
typedef struct {
PStunIpAddrContext pStunIpAddrCtx;
volatile ATOMIC_BOOL isContextInitialized;
volatile SIZE_T contextRefCnt;
MUTEX stunCtxlock;
SEMAPHORE_HANDLE usageSemaphore;
volatile ATOMIC_BOOL isSemAccessInitialized;
} WebRtcClientContext, *PWebRtcClientContext;

STATUS onFrameReadyFunc(UINT64, UINT16, UINT16, UINT32);
Expand Down
4 changes: 2 additions & 2 deletions src/source/PeerConnection/Rtcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ typedef enum {
#define TWCC_FB_PACKETCHUNK_SIZE 2
#define IS_TWCC_RUNLEN(packetChunk) ((((packetChunk) >> 15u) & 1u) == 0)
#define TWCC_RUNLEN_STATUS_SYMBOL(packetChunk) (((packetChunk) >> 13u) & 3u)
#define TWCC_RUNLEN_GET(packetChunk) ((packetChunk) &0x1fffu)
#define TWCC_RUNLEN_GET(packetChunk) ((packetChunk) & 0x1fffu)
#define TWCC_IS_NOTRECEIVED(statusSymbol) ((statusSymbol) == TWCC_STATUS_SYMBOL_NOTRECEIVED)
#define TWCC_ISRECEIVED(statusSymbol) ((statusSymbol) == TWCC_STATUS_SYMBOL_SMALLDELTA || (statusSymbol) == TWCC_STATUS_SYMBOL_LARGEDELTA)
#define TWCC_RUNLEN_ISRECEIVED(packetChunk) TWCC_ISRECEIVED(TWCC_RUNLEN_STATUS_SYMBOL(packetChunk))
#define TWCC_STATUSVECTOR_IS_2BIT(packetChunk) (((packetChunk) >> 14u) & 1u)
#define TWCC_STATUSVECTOR_SSIZE(packetChunk) (TWCC_STATUSVECTOR_IS_2BIT(packetChunk) ? 2u : 1u)
#define TWCC_STATUSVECTOR_SMASK(packetChunk) (TWCC_STATUSVECTOR_IS_2BIT(packetChunk) ? 2u : 1u)
#define TWCC_STATUSVECTOR_STATUS(packetChunk, i) \
(((packetChunk) >> (14u - (i) *TWCC_STATUSVECTOR_SSIZE(packetChunk))) & TWCC_STATUSVECTOR_SMASK(packetChunk))
(((packetChunk) >> (14u - (i) * TWCC_STATUSVECTOR_SSIZE(packetChunk))) & TWCC_STATUSVECTOR_SMASK(packetChunk))
#define TWCC_STATUSVECTOR_COUNT(packetChunk) (TWCC_STATUSVECTOR_IS_2BIT(packetChunk) ? 7 : 14)
#define TWCC_PACKET_STATUS_COUNT(payload) (getUnalignedInt16BigEndian((payload) + 10))

Expand Down
2 changes: 1 addition & 1 deletion src/source/Rtp/RtpPacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ extern "C" {
*/
// https://tools.ietf.org/html/draft-holmer-rmcat-transport-wide-cc-extensions-01
#define TWCC_EXT_PROFILE 0xBEDE
#define TWCC_PAYLOAD(extId, sequenceNum) htonl((((extId) &0xfu) << 28u) | (1u << 24u) | ((UINT32) (sequenceNum) << 8u))
#define TWCC_PAYLOAD(extId, sequenceNum) htonl((((extId) & 0xfu) << 28u) | (1u << 24u) | ((UINT32) (sequenceNum) << 8u))
#define TWCC_SEQNUM(extPayload) ((UINT16) getUnalignedInt16BigEndian(extPayload + 1))

typedef STATUS (*DepayRtpPayloadFunc)(PBYTE, UINT32, PBYTE, PUINT32, PBOOL);
Expand Down

0 comments on commit 099e50b

Please sign in to comment.