Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebRTC Client singleton and early STUN DNS resolution #1812

Merged
merged 10 commits into from
Sep 26, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
- master
jobs:
clang-format-check:
runs-on: macos-11
runs-on: macos-latest
steps:
- name: Clone repository
uses: actions/checkout@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ extern "C" {
#define STATUS_PEERCONNECTION_CREATE_ANSWER_WITHOUT_REMOTE_DESCRIPTION STATUS_PEERCONNECTION_BASE + 0x00000001
#define STATUS_PEERCONNECTION_CODEC_INVALID STATUS_PEERCONNECTION_BASE + 0x00000002
#define STATUS_PEERCONNECTION_CODEC_MAX_EXCEEDED STATUS_PEERCONNECTION_BASE + 0x00000003
#define STATUS_PEERCONNECTION_UNSUPPORTED_HOSTNAME STATUS_PEERCONNECTION_BASE + 0x00000004
/*!@} */

/////////////////////////////////////////////////////
Expand Down Expand Up @@ -696,9 +697,10 @@ extern "C" {
/**
* Parameterized string for KVS STUN Server
*/
#define KINESIS_VIDEO_STUN_URL_POSTFIX "amazonaws.com"
#define KINESIS_VIDEO_STUN_URL_POSTFIX_CN "amazonaws.com.cn"
#define KINESIS_VIDEO_STUN_URL "stun:stun.kinesisvideo.%s.%s:443"
#define KINESIS_VIDEO_STUN_URL_POSTFIX "amazonaws.com"
#define KINESIS_VIDEO_STUN_URL_POSTFIX_CN "amazonaws.com.cn"
#define KINESIS_VIDEO_STUN_URL "stun:stun.kinesisvideo.%s.%s:443"
#define KINESIS_VIDEO_STUN_URL_WITHOUT_PORT "stun.kinesisvideo.%s.%s"

/**
* Default signaling SSL port
Expand Down
5 changes: 5 additions & 0 deletions src/source/Ice/IceAgent.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ STATUS createIceAgent(PCHAR username, PCHAR password, PIceAgentCallbacks pIceAge
pIceAgent->iceServersCount = 0;
for (i = 0; i < MAX_ICE_SERVERS_COUNT; i++) {
if (pRtcConfiguration->iceServers[i].urls[0] != '\0') {
if (STRSTR(pRtcConfiguration->iceServers[i].urls, "stun")) {
pIceAgent->iceServers[pIceAgent->iceServersCount].setIpFn = pIceAgent->iceAgentCallbacks.setStunServerIpFn;
} else {
pIceAgent->iceServers[pIceAgent->iceServersCount].setIpFn = NULL;
}
PROFILE_CALL_WITH_T_OBJ(
retStatus = parseIceServer(&pIceAgent->iceServers[pIceAgent->iceServersCount], (PCHAR) pRtcConfiguration->iceServers[i].urls,
(PCHAR) pRtcConfiguration->iceServers[i].username, (PCHAR) pRtcConfiguration->iceServers[i].credential),
Expand Down
1 change: 1 addition & 0 deletions src/source/Ice/IceAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ typedef struct {
IceInboundPacketFunc inboundPacketFn;
IceConnectionStateChangedFunc connectionStateChangedFn;
IceNewLocalCandidateFunc newLocalCandidateFn;
IceServerSetIpFunc setStunServerIpFn;
} IceAgentCallbacks, *PIceAgentCallbacks;

typedef struct {
Expand Down
17 changes: 16 additions & 1 deletion src/source/Ice/IceUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ STATUS parseIceServer(PIceServer pIceServer, PCHAR url, PCHAR username, PCHAR cr
STATUS retStatus = STATUS_SUCCESS;
PCHAR separator = NULL, urlNoPrefix = NULL, paramStart = NULL;
UINT32 port = ICE_STUN_DEFAULT_PORT;
CHAR addressResolved[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};

// username and credential is only mandatory for turn server
CHK(url != NULL && pIceServer != NULL, STATUS_NULL_ARG);
Expand Down Expand Up @@ -249,8 +250,22 @@ STATUS parseIceServer(PIceServer pIceServer, PCHAR url, PCHAR username, PCHAR cr
STRNCPY(pIceServer->url, urlNoPrefix, MAX_ICE_CONFIG_URI_LEN);
}

CHK_STATUS(getIpWithHostName(pIceServer->url, &pIceServer->ipAddress));
if (pIceServer->setIpFn != NULL) {
retStatus = pIceServer->setIpFn(0, pIceServer->url, &pIceServer->ipAddress);
}

// Adding a NULL_ARG check specifically to cover for the case where early STUN
// resolution might not be enabled
if (retStatus == STATUS_NULL_ARG || pIceServer->setIpFn == NULL) {
disa6302 marked this conversation as resolved.
Show resolved Hide resolved
// Reset the retStatus to ensure the appropriate status code is returned from
// getIpWithHostName
retStatus = STATUS_SUCCESS;
CHK_STATUS(getIpWithHostName(pIceServer->url, &pIceServer->ipAddress));
}

pIceServer->ipAddress.port = (UINT16) getInt16((INT16) port);
getIpAddrStr(&pIceServer->ipAddress, addressResolved, ARRAY_SIZE(addressResolved));
DLOGP("ICE Server address for %s: %s", pIceServer->url, addressResolved);

CleanUp:

Expand Down
1 change: 1 addition & 0 deletions src/source/Ice/IceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ typedef struct {
CHAR username[MAX_ICE_CONFIG_USER_NAME_LEN + 1];
CHAR credential[MAX_ICE_CONFIG_CREDENTIAL_LEN + 1];
KVS_SOCKET_PROTOCOL transport;
IceServerSetIpFunc setIpFn;
} IceServer, *PIceServer;

STATUS parseIceServer(PIceServer, PCHAR, PCHAR, PCHAR);
Expand Down
4 changes: 0 additions & 4 deletions src/source/Ice/Network.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ STATUS getIpWithHostName(PCHAR hostname, PKvsIpAddress destIp)
struct in_addr inaddr;

CHAR addr[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};
CHAR addressResolved[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};

CHK(hostname != NULL, STATUS_NULL_ARG);
DLOGI("ICE SERVER Hostname received: %s", hostname);
Expand Down Expand Up @@ -442,12 +441,9 @@ STATUS getIpWithHostName(PCHAR hostname, PKvsIpAddress destIp)
}
freeaddrinfo(res);
CHK_ERR(resolved, STATUS_HOSTNAME_NOT_FOUND, "Could not find network address of %s", hostname);
getIpAddrStr(destIp, addressResolved, ARRAY_SIZE(addressResolved));
DLOGP("ICE Server address for %s with getaddrinfo: %s", hostname, addressResolved);
}

else {
DLOGP("ICE Server address for %s: %s", hostname, addr);
inet_pton(AF_INET, addr, &inaddr);
destIp->family = KVS_IP_FAMILY_TYPE_IPV4;
MEMCPY(destIp->address, &inaddr, IPV4_ADDRESS_LENGTH);
Expand Down
3 changes: 3 additions & 0 deletions src/source/Include_i.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ typedef struct {
// Used for ensuring alignment
#define ALIGN_UP_TO_MACHINE_WORD(x) ROUND_UP((x), SIZEOF(SIZE_T))

typedef STATUS (*IceServerSetIpFunc)(UINT64, PCHAR, PKvsIpAddress);
STATUS getIpAddrStr(PKvsIpAddress pKvsIpAddress, PCHAR pBuffer, UINT32 bufferLen);

////////////////////////////////////////////////////
// Project forward declarations
////////////////////////////////////////////////////
Expand Down
211 changes: 207 additions & 4 deletions src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,46 @@

static volatile ATOMIC_BOOL gKvsWebRtcInitialized = (SIZE_T) FALSE;

// Function to get access to the Singleton instance
PWebRtcClientContext getWebRtcClientInstance()
{
static WebRtcClientContext w = {.pStunIpAddrCtx = NULL, .stunCtxlock = INVALID_MUTEX_VALUE, .contextRefCnt = 0, .isContextInitialized = FALSE};
ATOMIC_INCREMENT(&w.contextRefCnt);
return &w;
}

VOID releaseHoldOnInstance(PWebRtcClientContext pWebRtcClientContext)
{
ATOMIC_DECREMENT(&pWebRtcClientContext->contextRefCnt);
}

STATUS createWebRtcClientInstance()
{
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;

CHK_WARN(!ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), retStatus, "WebRtc client context already initialized, nothing to do");
CHK_ERR(!IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock), retStatus, "Mutex seems to have been created already");

pWebRtcClientContext->stunCtxlock = MUTEX_CREATE(FALSE);
CHK_ERR(IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock), STATUS_NULL_ARG, "Mutex creation failed");
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;
CHK_WARN(pWebRtcClientContext->pStunIpAddrCtx == NULL, STATUS_INVALID_OPERATION, "STUN object already allocated");
pWebRtcClientContext->pStunIpAddrCtx = (PStunIpAddrContext) MEMCALLOC(1, SIZEOF(StunIpAddrContext));
CHK_ERR(pWebRtcClientContext->pStunIpAddrCtx != NULL, STATUS_NULL_ARG, "Memory allocation for WebRtc client object failed");
pWebRtcClientContext->pStunIpAddrCtx->expirationDuration = 2 * HUNDREDS_OF_NANOS_IN_AN_HOUR;
ATOMIC_STORE_BOOL(&pWebRtcClientContext->isContextInitialized, TRUE);
DLOGI("Initialized WebRTC Client instance");
CleanUp:
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
releaseHoldOnInstance(pWebRtcClientContext);
return retStatus;
}

STATUS allocateSrtp(PKvsPeerConnection pKvsPeerConnection)
{
DtlsKeyingMaterial dtlsKeyingMaterial;
Expand Down Expand Up @@ -687,6 +727,121 @@ STATUS rtcpReportsCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData
return retStatus;
}

// Not thread safe
STATUS getStunAddr(PStunIpAddrContext pStunIpAddrCtx)
{
INT32 errCode;
STATUS retStatus = STATUS_SUCCESS;
struct addrinfo *rp, *res;
struct sockaddr_in* ipv4Addr;
BOOL resolved = FALSE;

errCode = getaddrinfo(pStunIpAddrCtx->hostname, NULL, NULL, &res);
if (errCode != 0) {
DLOGI("Failed to resolve hostname with errcode: %d", errCode);
retStatus = STATUS_RESOLVE_HOSTNAME_FAILED;
} else {
for (rp = res; rp != NULL && !resolved; rp = rp->ai_next) {
if (rp->ai_family == AF_INET) {
ipv4Addr = (struct sockaddr_in*) rp->ai_addr;
pStunIpAddrCtx->kvsIpAddr.family = KVS_IP_FAMILY_TYPE_IPV4;
pStunIpAddrCtx->kvsIpAddr.port = 0;
MEMCPY(pStunIpAddrCtx->kvsIpAddr.address, &ipv4Addr->sin_addr, IPV4_ADDRESS_LENGTH);
resolved = TRUE;
}
}
freeaddrinfo(res);
}
if (!resolved) {
retStatus = STATUS_RESOLVE_HOSTNAME_FAILED;
}
return retStatus;
}

STATUS onSetStunServerIp(UINT64 customData, PCHAR url, PKvsIpAddress pIpAddr)
{
UNUSED_PARAM(customData);
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
CHK_WARN(ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), STATUS_NULL_ARG, "WebRTC Client object Object not initialized");

UINT64 currentTime = GETTIME();

MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;

CHK(STRCMP(url, pWebRtcClientContext->pStunIpAddrCtx->hostname) == 0, STATUS_PEERCONNECTION_UNSUPPORTED_HOSTNAME);

if (pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized) {
DLOGI("Initialized successfully");
if (currentTime > (pWebRtcClientContext->pStunIpAddrCtx->startTime + pWebRtcClientContext->pStunIpAddrCtx->expirationDuration)) {
DLOGI("Expired...need to refresh STUN address");
// Reset start time
pWebRtcClientContext->pStunIpAddrCtx->startTime = 0;
CHK_ERR(getStunAddr(pWebRtcClientContext->pStunIpAddrCtx) == STATUS_SUCCESS, retStatus, "Failed to resolve after cache expiry");
}
MEMCPY(pIpAddr, &pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr, SIZEOF(pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr));
} else {
DLOGE("Initialization failed");
}
CleanUp:
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
DLOGD("Exiting from stun server IP callback");
releaseHoldOnInstance(pWebRtcClientContext);
return retStatus;
}

PVOID resolveStunIceServerIp(PVOID args)
{
UNUSED_PARAM(args);
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();
BOOL locked = FALSE;
CHAR addressResolved[KVS_IP_ADDRESS_STRING_BUFFER_LEN + 1] = {'\0'};
PCHAR pRegion;
PCHAR pHostnamePostfix;

if (ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized)) {
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
locked = TRUE;

if (pWebRtcClientContext->pStunIpAddrCtx == NULL) {
DLOGE("Failed to resolve STUN IP address because webrtc client instance was not created");
} else {
if ((pRegion = GETENV(DEFAULT_REGION_ENV_VAR)) == NULL) {
pRegion = DEFAULT_AWS_REGION;
}

pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX;
// If region is in CN, add CN region uri postfix
if (STRSTR(pRegion, "cn-")) {
pHostnamePostfix = KINESIS_VIDEO_STUN_URL_POSTFIX_CN;
}

SNPRINTF(pWebRtcClientContext->pStunIpAddrCtx->hostname, SIZEOF(pWebRtcClientContext->pStunIpAddrCtx->hostname),
KINESIS_VIDEO_STUN_URL_WITHOUT_PORT, pRegion, pHostnamePostfix);
if (getStunAddr(pWebRtcClientContext->pStunIpAddrCtx) == STATUS_SUCCESS) {
getIpAddrStr(&pWebRtcClientContext->pStunIpAddrCtx->kvsIpAddr, addressResolved, ARRAY_SIZE(addressResolved));
DLOGI("ICE Server address for %s with getaddrinfo: %s", pWebRtcClientContext->pStunIpAddrCtx->hostname, addressResolved);
pWebRtcClientContext->pStunIpAddrCtx->isIpInitialized = TRUE;
} else {
DLOGE("Failed to resolve %s", pWebRtcClientContext->pStunIpAddrCtx->hostname);
}
pWebRtcClientContext->pStunIpAddrCtx->startTime = GETTIME();
}
if (locked) {
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
}
} else {
DLOGW("STUN DNS thread invoked without context being initialized");
}
releaseHoldOnInstance(pWebRtcClientContext);
DLOGD("Exiting from stun server IP resolution thread");
return NULL;
}

STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection* ppPeerConnection)
{
ENTERS();
Expand Down Expand Up @@ -738,6 +893,8 @@ STATUS createPeerConnection(PRtcConfiguration pConfiguration, PRtcPeerConnection
iceAgentCallbacks.inboundPacketFn = onInboundPacket;
iceAgentCallbacks.connectionStateChangedFn = onIceConnectionStateChange;
iceAgentCallbacks.newLocalCandidateFn = onNewIceLocalCandidate;
iceAgentCallbacks.setStunServerIpFn = onSetStunServerIp;

CHK_STATUS(createConnectionListener(&pConnectionListener));
// IceAgent will own the lifecycle of pConnectionListener;
CHK_STATUS(createIceAgent(pKvsPeerConnection->localIceUfrag, pKvsPeerConnection->localIcePwd, &iceAgentCallbacks, pConfiguration,
Expand Down Expand Up @@ -1416,7 +1573,7 @@ STATUS initKvsWebRtc(VOID)
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
CHK(!ATOMIC_LOAD_BOOL(&gKvsWebRtcInitialized), retStatus);

DLOGI("Initializing WebRTC library...");
SRAND(GETTIME());

CHK(srtp_init() == srtp_err_status_ok, STATUS_SRTP_INIT_FAILED);
Expand All @@ -1427,12 +1584,15 @@ STATUS initKvsWebRtc(VOID)
KVS_CRYPTO_INIT();
LOG_GIT_HASH();

SET_INSTRUMENTED_ALLOCATORS();
#ifdef ENABLE_DATA_CHANNEL
CHK_STATUS(initSctpSession());
#endif
#ifdef ENABLE_KVS_THREADPOOL
DLOGI("KVS WebRtc library using thread pool");
CHK_STATUS(createWebRtcClientInstance());
CHK_STATUS(createThreadPoolContext());
CHK_STATUS(threadpoolContextPush(resolveStunIceServerIp, NULL));
#endif
ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, TRUE);

Expand All @@ -1442,6 +1602,48 @@ STATUS initKvsWebRtc(VOID)
return retStatus;
}

STATUS cleanupWebRtcClientInstance()
{
STATUS retStatus = STATUS_SUCCESS;

// Stun object cleanup
PWebRtcClientContext pWebRtcClientContext = getWebRtcClientInstance();

DLOGD("Releasing webrtc client context instance from cleanupWebRtcClientInstance");
releaseHoldOnInstance(pWebRtcClientContext);

CHK_WARN(ATOMIC_LOAD_BOOL(&pWebRtcClientContext->isContextInitialized), STATUS_INVALID_OPERATION,
"WebRtc context not initialized, nothing to clean up");

ATOMIC_STORE_BOOL(&pWebRtcClientContext->isContextInitialized, FALSE);

while (ATOMIC_LOAD(&pWebRtcClientContext->contextRefCnt) > 0) {
DLOGV("Waiting on all references to be returned...%d", pWebRtcClientContext->contextRefCnt);
THREAD_SLEEP(100 * HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
}

/* Start of handling STUN object */
// Need this check to ensure we do not clean up the object in the next
// step while the resolve thread is ongoing
CHK_WARN(pWebRtcClientContext->pStunIpAddrCtx != NULL, STATUS_NULL_ARG, "Destroying STUN object without setting up");
MUTEX_LOCK(pWebRtcClientContext->stunCtxlock);
SAFE_MEMFREE(pWebRtcClientContext->pStunIpAddrCtx);
pWebRtcClientContext->pStunIpAddrCtx = NULL;
DLOGI("Destroyed STUN IP object");
MUTEX_UNLOCK(pWebRtcClientContext->stunCtxlock);
/* End of handling STUN object */

if (IS_VALID_MUTEX_VALUE(pWebRtcClientContext->stunCtxlock)) {
MUTEX_FREE(pWebRtcClientContext->stunCtxlock);
pWebRtcClientContext->stunCtxlock = INVALID_MUTEX_VALUE;
}

DLOGI("Destroyed WebRtc client context");

CleanUp:
return retStatus;
}

STATUS deinitKvsWebRtc(VOID)
{
ENTERS();
Expand All @@ -1454,12 +1656,13 @@ STATUS deinitKvsWebRtc(VOID)

srtp_shutdown();

ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
#ifdef ENABLE_KVS_THREADPOOL
DLOGI("Destroying KVS Webrtc library threadpool");
cleanupWebRtcClientInstance();
destroyThreadPoolContext();
DLOGI("Destroyed threadpool");
RESET_INSTRUMENTED_ALLOCATORS();
#endif

ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
CleanUp:

LEAVES();
Expand Down
Loading