Skip to content

Commit

Permalink
Move lock out of Stun context
Browse files Browse the repository at this point in the history
  • Loading branch information
disa6302 committed Sep 21, 2023
1 parent a6e0a12 commit 753b5b0
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 66 deletions.
127 changes: 66 additions & 61 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,31 @@ static volatile ATOMIC_BOOL gKvsWebRtcInitialized = (SIZE_T) FALSE;
// Function to get access to the Singleton instance
PWebRtcClientContext getWebRtcClientInstance()
{
static WebRtcClientContext w = {.pStunIpAddrCtx = NULL};
static WebRtcClientContext w = {.pStunIpAddrCtx = NULL, .stunCtxlock = INVALID_MUTEX_VALUE, .isContextInitialized = FALSE};
return &w;
}

PStunIpAddrContext getStunIpContext()
{
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
if (!pWebRtcClientContext->isContextInitialized) {
return NULL;
}
return pWebRtcClientContext->pStunIpAddrCtx;
}

STATUS createWebRtcClientInstance()
{
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
CHK_WARN(!pWebRtcClientContext->isContextInitialized, retStatus, "WebRtc client context already initialized, nothing to do");
CHK_ERR(!IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock), retStatus, "Mutex seems to have been created already");
pWebRtcClientContext->stunCtxlock = MUTEX_CREATE(FALSE);

MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;
CHK_WARN(pWebRtcClientContext->pStunIpAddrCtx == NULL, STATUS_INVALID_OPERATION, "STUN object already allocated");
pWebRtcClientContext->pStunIpAddrCtx = (PStunIpAddrContext) MEMCALLOC(1, SIZEOF(StunIpAddrContext));
CHK_ERR(pWebRtcClientContext->pStunIpAddrCtx != NULL, STATUS_NULL_ARG, "Memory allocation for WebRtc client object failed");
pWebRtcClientContext->pStunIpAddrCtx->lock = MUTEX_CREATE(FALSE);
pWebRtcClientContext->pStunIpAddrCtx->expirationDuration = 2 * HUNDREDS_OF_NANOS_IN_AN_HOUR;

pWebRtcClientContext->isContextInitialized = TRUE;
DLOGI("Initialized WebRTC Client instance");
CleanUp:
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
return retStatus;
}

Expand Down Expand Up @@ -755,27 +753,33 @@ STATUS onSetStunServerIp(UINT64 customData, PCHAR url, PKvsIpAddress pIpAddr)
{
UNUSED_PARAM(customData);
STATUS retStatus = STATUS_SUCCESS;
PStunIpAddrContext pStunIpAddrCtx = getStunIpContext();
CHK_ERR(pStunIpAddrCtx != NULL, STATUS_NULL_ARG, "WebRTC Client object could not be created");
BOOL locked = FALSE;
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();

CHK_WARN(pWebRtcClientContext->isContextInitialized, STATUS_NULL_ARG, "WebRTC Client object Object not initialized");

UINT64 currentTime = GETTIME();

MUTEX_LOCK(pStunIpAddrCtx->lock);
CHK(STRCMP(url, pStunIpAddrCtx->hostname) == 0, STATUS_PEERCONNECTION_UNSUPPORTED_HOSTNAME);
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;
CHK(STRCMP(url, pWebRtcClientContext->pStunIpAddrCtx->hostname) == 0, STATUS_PEERCONNECTION_UNSUPPORTED_HOSTNAME);

if (pStunIpAddrCtx->isIpInitialized) {
if (pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized) {
DLOGI("Initialized successfully");
if (currentTime > (pStunIpAddrCtx->startTime + pStunIpAddrCtx->expirationDuration)) {
if (currentTime > (pWebRtcClientContext->pStunIpAddrCtx->startTime + pWebRtcClientContext->pStunIpAddrCtx->expirationDuration)) {
DLOGI("Expired...need to refresh STUN address");
// Reset start time
pStunIpAddrCtx->startTime = 0;
CHK_ERR(getAddrAsync(pStunIpAddrCtx) == STATUS_SUCCESS, retStatus, "Failed to resolve after cache expiry");
pWebRtcClientContext->pStunIpAddrCtx->startTime = 0;
CHK_ERR(getAddrAsync(pWebRtcClientContext->pStunIpAddrCtx) == STATUS_SUCCESS, retStatus, "Failed to resolve after cache expiry");
}
MEMCPY(pIpAddr, &pStunIpAddrCtx->kvsIpAddr, SIZEOF(pStunIpAddrCtx->kvsIpAddr));
MEMCPY(pIpAddr, &pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr, SIZEOF(pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr));
} else {
DLOGE("Initialization failed");
}
CleanUp:
MUTEX_UNLOCK(pStunIpAddrCtx->lock);
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
return retStatus;
}

Expand All @@ -784,38 +788,39 @@ PVOID resolveStunIceServerIp(PVOID args)
UNUSED_PARAM(args);
BOOL locked = FALSE;
CHAR addressResolved[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};
PStunIpAddrContext pStunIpAddrCtx = getStunIpContext();

PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
PCHAR pRegion;
PCHAR pHostnamePostfix;
if (pStunIpAddrCtx == NULL) {
DLOGE("Failed to resolve STUN IP address because webrtc client instance was not created");
return NULL;
}

if ((pRegion = GETENV(DEFAULT_REGION_ENV_VAR)) == NULL) {
pRegion = DEFAULT_AWS_REGION;
}

pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
if (STRSTR(pRegion, "cn-")) {
pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX_CN;
}

MUTEX_LOCK(pStunIpAddrCtx->lock);
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;
SNPRINTF(pStunIpAddrCtx->hostname, SIZEOF(pStunIpAddrCtx->hostname), KINESIS_VIDEO_STUN_URL_WITHOUT_PORT, pRegion, pHostnamePostfix);
if (getAddrAsync(pStunIpAddrCtx) == STATUS_SUCCESS) {
getIpAddrStr(&pStunIpAddrCtx->kvsIpAddr, addressResolved, ARRAY_SIZE(addressResolved));
DLOGI("ICE Server address for %s with getaddrinfo: %s", pStunIpAddrCtx->hostname, addressResolved);
pStunIpAddrCtx->isIpInitialized = TRUE;

if (pWebRtcClientContext->pStunIpAddrCtx == NULL) {
DLOGE("Failed to resolve STUN IP address because webrtc client instance was not created");
} else {
DLOGE("Failed to resolve %s", pStunIpAddrCtx->hostname);
if ((pRegion = GETENV(DEFAULT_REGION_ENV_VAR)) == NULL) {
pRegion = DEFAULT_AWS_REGION;
}

pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
if (STRSTR(pRegion, "cn-")) {
pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX_CN;
}

SNPRINTF(pWebRtcClientContext->pStunIpAddrCtx->hostname, SIZEOF(pWebRtcClientContext->pStunIpAddrCtx->hostname),
KINESIS_VIDEO_STUN_URL_WITHOUT_PORT, pRegion, pHostnamePostfix);
if (getAddrAsync(pWebRtcClientContext->pStunIpAddrCtx) == STATUS_SUCCESS) {
getIpAddrStr(&pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr, addressResolved, ARRAY_SIZE(addressResolved));
DLOGI("ICE Server address for %s with getaddrinfo: %s", pWebRtcClientContext->pStunIpAddrCtx->hostname, addressResolved);
pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized = TRUE;
} else {
DLOGE("Failed to resolve %s", pWebRtcClientContext->pStunIpAddrCtx->hostname);
}
pWebRtcClientContext->pStunIpAddrCtx->startTime = GETTIME();
}
pStunIpAddrCtx->startTime = GETTIME();
if (locked) {
MUTEX_UNLOCK(pStunIpAddrCtx->lock);
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
return NULL;
}
Expand Down Expand Up @@ -1548,7 +1553,7 @@ STATUS initKvsWebRtc(VOID)
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
CHK(!ATOMIC_LOAD_BOOL(&gKvsWebRtcInitialized), retStatus);

DLOGI("Initializing WebRTC library...");
SRAND(GETTIME());

CHK(srtp_init() == srtp_err_status_ok, STATUS_SRTP_INIT_FAILED);
Expand All @@ -1564,8 +1569,8 @@ STATUS initKvsWebRtc(VOID)
#endif
#ifdef ENABLE_KVS_THREADPOOL
DLOGI("KVS WebRtc library using thread pool");
CHK_STATUS(createWebRtcClientInstance());
CHK_STATUS(createThreadPoolContext());
createWebRtcClientInstance();
CHK_STATUS(threadpoolContextPush(resolveStunIceServerIp, NULL));
#endif
ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, TRUE);
Expand All @@ -1576,7 +1581,7 @@ STATUS initKvsWebRtc(VOID)
return retStatus;
}

STATUS cleanupWebRtcClientContext()
STATUS cleanupWebRtcClientInstance()
{
STATUS retStatus = STATUS_SUCCESS;
// Stun object cleanup
Expand All @@ -1589,18 +1594,19 @@ STATUS cleanupWebRtcClientContext()
// Need this check to ensure we do not clean up the object in the next
// step while the resolve thread is ongoing

MUTEX_LOCK(pWebRtcClientContext->pStunIpAddrCtx->lock);
pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized = FALSE;
MUTEX_UNLOCK(pWebRtcClientContext->pStunIpAddrCtx->lock);
if (IS_VALID_MUTEX_VALUE(pWebRtcClientContext->pStunIpAddrCtx->lock)) {
MUTEX_FREE(pWebRtcClientContext->pStunIpAddrCtx->lock);
}
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
SAFE_MEMFREE(pWebRtcClientContext->pStunIpAddrCtx);
pWebRtcClientContext->pStunIpAddrCtx = NULL;
DLOGI("Destroyed STUN IP object");
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
/* End of handling STUN object */

DLOGI("Destroyed WebRtc client context");
if (IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock)) {
MUTEX_FREE(pWebRtcClientContext->stunCtxlock);
pWebRtcClientContext->stunCtxlock = INVALID_MUTEX_VALUE;
}
pWebRtcClientContext->isContextInitialized = FALSE;
DLOGI("Destroyed WebRtc client context");
CleanUp:
return retStatus;
}
Expand All @@ -1617,13 +1623,12 @@ STATUS deinitKvsWebRtc(VOID)

srtp_shutdown();

ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
#ifdef ENABLE_KVS_THREADPOOL
cleanupWebRtcClientContext();
DLOGI("Destroyed threadpool");
cleanupWebRtcClientInstance();
destroyThreadPoolContext();
DLOGI("Destroyed threadpool");
#endif

ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
CleanUp:

LEAVES();
Expand Down
2 changes: 1 addition & 1 deletion src/source/PeerConnection/PeerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ typedef struct {
UINT64 startTime;
UINT64 expirationDuration;
STATUS status;
MUTEX lock;
} StunIpAddrContext, *PStunIpAddrContext;

// Declare the structure of the Singleton
// Members of the singleton are responsible for their own sync mechanisms.
typedef struct {
PStunIpAddrContext pStunIpAddrCtx;
BOOL isContextInitialized;
MUTEX stunCtxlock;
} WebRtcClientContext, *PWebRtcClientContext;

STATUS onFrameReadyFunc(UINT64, UINT16, UINT16, UINT32);
Expand Down
7 changes: 3 additions & 4 deletions src/source/Threadpool/ThreadPoolContext.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ STATUS createThreadPoolContext()
PCHAR pMinThreads, pMaxThreads;
UINT32 minThreads, maxThreads;
PThreadPoolContext pThreadPoolContext = getThreadContextInstance();

if (NULL == (pMinThreads = GETENV(WEBRTC_THREADPOOL_MIN_THREADS_ENV_VAR)) || STATUS_SUCCESS != STRTOUI32(pMinThreads, NULL, 10, &minThreads)) {
minThreads = THREADPOOL_MIN_THREADS;
}
if (NULL == (pMaxThreads = GETENV(WEBRTC_THREADPOOL_MAX_THREADS_ENV_VAR)) || STATUS_SUCCESS != STRTOUI32(pMaxThreads, NULL, 10, &maxThreads)) {
maxThreads = THREADPOOL_MAX_THREADS;
}

CHK_ERR(!IS_VALID_MUTEX_VALUE(pThreadPoolContext->threadpoolContextLock), STATUS_INVALID_OPERATION, "Mutex seems to have been created already");

pThreadPoolContext->threadpoolContextLock = MUTEX_CREATE(FALSE);
// Protecting this section to ensure we are not pushing threads / destroying the pool
// when it is being created.
if (!IS_VALID_MUTEX_VALUE(pThreadPoolContext->threadpoolContextLock)) {
pThreadPoolContext->threadpoolContextLock = MUTEX_CREATE(FALSE);
}
MUTEX_LOCK(pThreadPoolContext->threadpoolContextLock);
locked = TRUE;
CHK_WARN(!pThreadPoolContext->isInitialized, retStatus, "Threadpool already set up. Nothing to do");
Expand Down

0 comments on commit 753b5b0

Please sign in to comment.