Skip to content

Commit

Permalink
Asynchronous get ice config (#1854)
Browse files Browse the repository at this point in the history
* Asynchronous get ice config

* Fixing mac compile error, addressing comments, correcting spelling error

* clang format

* fixing test util functions to include new APIs

* Unused variables for certain compile time flags breaking mac compile

* fixing more compile errors for mac

* Fix a dead lock, and fix a test with the API changes

* iceAgentRestart does not remove the IceServers since the old design required them to be supplied at object creation. Initialize relay candidates at end of iceAgentRestart.

* Update PIC build in an attempt to fix static build on Mac

* Moving git tag back to develop, since develop has been updated

* remove geticeconfig from standard connect state machine flow

* Async test, and moving geticeserverconfig out of the standard signaling state machine flow

* clang format

* Incorrect state transition

* fixing async test

* Up the sleep time

* change async func for test to handle answer and offer

* change location of creating pointer pointer

* Fixing tests

* Fix gathering to allow reporting relay candidates even after all srflx candidates have been recieved

* unit test, longer sleep on teardown of threadpool

* Fixing tests

* Update samples based off feedback, fix clang compile error in test

* Moved iceUriCount increment, added comments to public API

* fix mac compile error from unused variable for specific ifdef
  • Loading branch information
jdelapla authored and disa6302 committed Dec 15, 2023
1 parent c3e6438 commit 680e76a
Show file tree
Hide file tree
Showing 16 changed files with 648 additions and 212 deletions.
105 changes: 73 additions & 32 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,51 @@ VOID onIceCandidateHandler(UINT64 customData, PCHAR candidateJson)
CHK_LOG_ERR(retStatus);
}

PVOID asyncGetIceConfigInfo(PVOID args)
{
STATUS retStatus = STATUS_SUCCESS;
AsyncGetIceStruct* data = (AsyncGetIceStruct*) args;
PIceConfigInfo pIceConfigInfo = NULL;
UINT32 uriCount = 0;
UINT32 i = 0, maxTurnServer = 1;

if (data != NULL) {
/* signalingClientGetIceConfigInfoCount can return more than one turn server. Use only one to optimize
* candidate gathering latency. But user can also choose to use more than 1 turn server. */
for (uriCount = 0, i = 0; i < maxTurnServer; i++) {
/*
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=udp" then ICE will try TURN over UDP
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
* if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=udp", it's currently ignored because sdk dont do TURN
* over DTLS yet. if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port" then ICE will try both TURN over UDP and TCP/TLS
*
* It's recommended to not pass too many TURN iceServers to configuration because it will slow down ice gathering in non-trickle mode.
*/
CHK_STATUS(signalingClientGetIceConfigInfo(data->signalingClientHandle, i, &pIceConfigInfo));
CHECK(uriCount < MAX_ICE_SERVERS_COUNT);
uriCount += pIceConfigInfo->uriCount;
CHK_STATUS(addConfigToServerList(&(data->pRtcPeerConnection), pIceConfigInfo));
}
}
*(data->pUriCount) += uriCount;

CleanUp:
SAFE_MEMFREE(data);
CHK_LOG_ERR(retStatus);
return NULL;
}

STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcPeerConnection* ppRtcPeerConnection)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
RtcConfiguration configuration;
UINT32 i, j, iceConfigCount, uriCount = 0, maxTurnServer = 1;
#ifndef ENABLE_KVS_THREADPOOL
UINT32 i, j, maxTurnServer = 1;
PIceConfigInfo pIceConfigInfo;
UINT32 uriCount = 0;
#endif
UINT64 data;
PRtcCertificate pRtcCertificate = NULL;

Expand All @@ -385,37 +423,6 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP
SNPRINTF(configuration.iceServers[0].urls, MAX_ICE_CONFIG_URI_LEN, KINESIS_VIDEO_STUN_URL, pSampleConfiguration->channelInfo.pRegion,
pKinesisVideoStunUrlPostFix);

if (pSampleConfiguration->useTurn) {
// Set the URIs from the configuration
CHK_STATUS(signalingClientGetIceConfigInfoCount(pSampleConfiguration->signalingClientHandle, &iceConfigCount));

/* signalingClientGetIceConfigInfoCount can return more than one turn server. Use only one to optimize
* candidate gathering latency. But user can also choose to use more than 1 turn server. */
for (uriCount = 0, i = 0; i < maxTurnServer; i++) {
CHK_STATUS(signalingClientGetIceConfigInfo(pSampleConfiguration->signalingClientHandle, i, &pIceConfigInfo));
for (j = 0; j < pIceConfigInfo->uriCount; j++) {
CHECK(uriCount < MAX_ICE_SERVERS_COUNT);
/*
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=udp" then ICE will try TURN over UDP
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
* if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=udp", it's currently ignored because sdk dont do TURN
* over DTLS yet. if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port" then ICE will try both TURN over UDP and TCP/TLS
*
* It's recommended to not pass too many TURN iceServers to configuration because it will slow down ice gathering in non-trickle mode.
*/

STRNCPY(configuration.iceServers[uriCount + 1].urls, pIceConfigInfo->uris[j], MAX_ICE_CONFIG_URI_LEN);
STRNCPY(configuration.iceServers[uriCount + 1].credential, pIceConfigInfo->password, MAX_ICE_CONFIG_CREDENTIAL_LEN);
STRNCPY(configuration.iceServers[uriCount + 1].username, pIceConfigInfo->userName, MAX_ICE_CONFIG_USER_NAME_LEN);

uriCount++;
}
}
}

pSampleConfiguration->iceUriCount = uriCount + 1;

// Check if we have any pregenerated certs and use them
// NOTE: We are running under the config lock
retStatus = stackQueueDequeue(pSampleConfiguration->pregeneratedCertificates, &data);
Expand All @@ -430,6 +437,40 @@ STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcP
}

CHK_STATUS(createPeerConnection(&configuration, ppRtcPeerConnection));

if (pSampleConfiguration->useTurn) {
#ifdef ENABLE_KVS_THREADPOOL
pSampleConfiguration->iceUriCount = 1;
AsyncGetIceStruct* pAsyncData = NULL;

pAsyncData = (AsyncGetIceStruct*) MEMCALLOC(1, SIZEOF(AsyncGetIceStruct));
pAsyncData->signalingClientHandle = pSampleConfiguration->signalingClientHandle;
pAsyncData->pRtcPeerConnection = *ppRtcPeerConnection;
pAsyncData->pUriCount = &(pSampleConfiguration->iceUriCount);
CHK_STATUS(peerConnectionAsync(asyncGetIceConfigInfo, (PVOID) pAsyncData));
#else

/* signalingClientGetIceConfigInfoCount can return more than one turn server. Use only one to optimize
* candidate gathering latency. But user can also choose to use more than 1 turn server. */
for (uriCount = 0, i = 0; i < maxTurnServer; i++) {
/*
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=udp" then ICE will try TURN over UDP
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
* if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=udp", it's currently ignored because sdk dont do TURN
* over DTLS yet. if configuration.iceServers[uriCount + 1].urls is "turns:ip:port?transport=tcp" then ICE will try TURN over TCP/TLS
* if configuration.iceServers[uriCount + 1].urls is "turn:ip:port" then ICE will try both TURN over UDP and TCP/TLS
*
* It's recommended to not pass too many TURN iceServers to configuration because it will slow down ice gathering in non-trickle mode.
*/
CHK_STATUS(signalingClientGetIceConfigInfo(pSampleConfiguration->signalingClientHandle, i, &pIceConfigInfo));
CHECK(uriCount < MAX_ICE_SERVERS_COUNT);
uriCount += pIceConfigInfo->uriCount;
CHK_STATUS(addConfigToServerList(ppRtcPeerConnection, pIceConfigInfo));
}
pSampleConfiguration->iceUriCount = uriCount + 1;
#endif
}

CleanUp:

CHK_LOG_ERR(retStatus);
Expand Down
9 changes: 9 additions & 0 deletions samples/Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ struct __SampleStreamingSession {
KvsIceAgentMetrics iceMetrics;
};

// TODO this should all be in a higher webrtccontext layer above PeerConnection
// Placing it here now since this is where all the current webrtccontext functions are placed
typedef struct {
SIGNALING_CLIENT_HANDLE signalingClientHandle;
PRtcPeerConnection pRtcPeerConnection;
PUINT32 pUriCount;

} AsyncGetIceStruct;

VOID sigintHandler(INT32);
STATUS readFrameFromDisk(PBYTE, PUINT32, PCHAR);
PVOID sendVideoPackets(PVOID);
Expand Down
22 changes: 21 additions & 1 deletion src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ extern "C" {
#define STATUS_TURN_CONNECTION_PEER_NOT_USABLE STATUS_ICE_BASE + 0x00000027
#define STATUS_ICE_SERVER_INDEX_INVALID STATUS_ICE_BASE + 0x00000028
#define STATUS_ICE_CANDIDATE_STRING_MISSING_TYPE STATUS_ICE_BASE + 0x00000029
#define STATUS_TURN_CONNECTION_ALLOCAITON_FAILED STATUS_ICE_BASE + 0x0000002a
#define STATUS_TURN_CONNECTION_ALLOCATION_FAILED STATUS_ICE_BASE + 0x0000002a
#define STATUS_TURN_INVALID_STATE STATUS_ICE_BASE + 0x0000002b
/*!@} */

Expand Down Expand Up @@ -1573,6 +1573,16 @@ typedef struct {
*/
PUBLIC_API STATUS createPeerConnection(PRtcConfiguration, PRtcPeerConnection*);

/**
* @brief Give peer connection an ice config to add to its server list
*
* @param[in] PRtcPeerConnection* initialized RtcPeerConnection
* @param[in] PIceConfigInfo Ice config info to add to this peer connection
*
* @return STATUS code of the execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS addConfigToServerList(PRtcPeerConnection*, PIceConfigInfo);

/**
* @brief Free a RtcPeerConnection
*
Expand Down Expand Up @@ -1649,6 +1659,16 @@ PUBLIC_API STATUS peerConnectionGetLocalDescription(PRtcPeerConnection, PRtcSess
*/
PUBLIC_API STATUS peerConnectionGetCurrentLocalDescription(PRtcPeerConnection, PRtcSessionDescriptionInit);

/**
* Allows use of internal threadpool
*
* @param[in] startRoutine function pointer to execute in threadpool
* @param[in] PVOID void pointer to pass to function pointer
*
* @return STATUS code of the execution. STATUS_SUCCESS on success
*/
PUBLIC_API STATUS peerConnectionAsync(startRoutine fn, PVOID data);

/**
* @brief Populate the provided answer that contains an RFC 3264 offer
* with the supported configurations for the session.
Expand Down
90 changes: 82 additions & 8 deletions src/source/Ice/IceAgent.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,82 @@ STATUS freeIceAgent(PIceAgent* ppIceAgent)
return retStatus;
}

STATUS iceAgentAddConfig(PIceAgent pIceAgent, PIceConfigInfo pIceConfigInfo)
{
STATUS retStatus = STATUS_SUCCESS;
UINT32 i = 0;
// used in PROFILE macro
UINT64 startTimeInMacro = 0;
BOOL locked = FALSE;

CHK(pIceAgent != NULL && pIceConfigInfo != NULL, STATUS_NULL_ARG);

for (i = 0; i < pIceConfigInfo->uriCount; i++) {
MUTEX_LOCK(pIceAgent->lock);
locked = TRUE;
PROFILE_CALL_WITH_T_OBJ(retStatus = parseIceServer(&pIceAgent->iceServers[pIceAgent->iceServersCount], (PCHAR) pIceConfigInfo->uris[i],
(PCHAR) pIceConfigInfo->userName, (PCHAR) pIceConfigInfo->password),
pIceAgent->iceAgentProfileDiagnostics.iceServerParsingTime[i], "ICE server parsing");
MUTEX_UNLOCK(pIceAgent->lock);
locked = FALSE;

if (STATUS_SUCCEEDED(retStatus)) {
MUTEX_LOCK(pIceAgent->lock);
locked = TRUE;
pIceAgent->rtcIceServerDiagnostics[i].port = (INT32) getInt16(pIceAgent->iceServers[i].ipAddress.port);
switch (pIceAgent->iceServers[pIceAgent->iceServersCount].transport) {
case KVS_SOCKET_PROTOCOL_UDP:
STRCPY(pIceAgent->rtcIceServerDiagnostics[i].protocol, ICE_TRANSPORT_TYPE_UDP);
break;
case KVS_SOCKET_PROTOCOL_TCP:
STRCPY(pIceAgent->rtcIceServerDiagnostics[i].protocol, ICE_TRANSPORT_TYPE_TCP);
break;
default:
MEMSET(pIceAgent->rtcIceServerDiagnostics[i].protocol, 0, SIZEOF(pIceAgent->rtcIceServerDiagnostics[i].protocol));
}
STRCPY(pIceAgent->rtcIceServerDiagnostics[i].url, pIceConfigInfo->uris[i]);

MUTEX_UNLOCK(pIceAgent->lock);
locked = FALSE;

// important to unlock iceAgent lock before calling init relay candidate, since iceAgent APIs are thread safe
// if you don't unlock this can lead to a deadlock with the timerqueue.
// init candidate && pairs
if (pIceAgent->iceServers[pIceAgent->iceServersCount].isTurn) {
if (pIceAgent->iceServers[pIceAgent->iceServersCount].transport == KVS_SOCKET_PROTOCOL_UDP ||
pIceAgent->iceServers[pIceAgent->iceServersCount].transport == KVS_SOCKET_PROTOCOL_NONE) {
CHK_STATUS(iceAgentInitRelayCandidate(pIceAgent, pIceAgent->iceServersCount, KVS_SOCKET_PROTOCOL_UDP));
}

if (pIceAgent->iceServers[pIceAgent->iceServersCount].transport == KVS_SOCKET_PROTOCOL_TCP ||
pIceAgent->iceServers[pIceAgent->iceServersCount].transport == KVS_SOCKET_PROTOCOL_NONE) {
CHK_STATUS(iceAgentInitRelayCandidate(pIceAgent, pIceAgent->iceServersCount, KVS_SOCKET_PROTOCOL_TCP));
}
}

MUTEX_LOCK(pIceAgent->lock);
locked = TRUE;

pIceAgent->iceServersCount++;

MUTEX_UNLOCK(pIceAgent->lock);
locked = FALSE;

} else {
DLOGE("Failed to parse ICE servers");
}
}

CleanUp:
CHK_LOG_ERR(retStatus);

if (locked) {
MUTEX_UNLOCK(pIceAgent->lock);
}

return retStatus;
}

STATUS iceAgentValidateKvsRtcConfig(PKvsRtcConfiguration pKvsRtcConfiguration)
{
STATUS retStatus = STATUS_SUCCESS;
Expand Down Expand Up @@ -310,8 +386,6 @@ STATUS iceAgentReportNewLocalCandidate(PIceAgent pIceAgent, PIceCandidate pIceCa
iceAgentLogNewCandidate(pIceCandidate);

CHK_WARN(pIceAgent->iceAgentCallbacks.newLocalCandidateFn != NULL, retStatus, "newLocalCandidateFn callback not implemented");
CHK_WARN(!ATOMIC_LOAD_BOOL(&pIceAgent->candidateGatheringFinished), retStatus,
"Cannot report new ice candidate because candidate gathering is already finished");
CHK_STATUS(iceCandidateSerialize(pIceCandidate, serializedIceCandidateBuf, &serializedIceCandidateBufLen));
pIceAgent->iceAgentCallbacks.newLocalCandidateFn(pIceAgent->iceAgentCallbacks.customData, serializedIceCandidateBuf);

Expand Down Expand Up @@ -614,9 +688,6 @@ STATUS iceAgentStartGathering(PIceAgent pIceAgent)
"Srflx candidates setup time");
}

PROFILE_CALL_WITH_T_OBJ(CHK_STATUS(iceAgentInitRelayCandidates(pIceAgent)), pIceAgent->iceAgentProfileDiagnostics.relayCandidateSetUpTime,
"Relay candidates setup time");

// start listening for incoming data
CHK_STATUS(connectionListenerStart(pIceAgent->pConnectionListener));

Expand Down Expand Up @@ -956,6 +1027,8 @@ STATUS iceAgentRestart(PIceAgent pIceAgent, PCHAR localIceUfrag, PCHAR localIceP
CHK_STATUS(setStateMachineCurrentState(pIceAgent->pStateMachine, ICE_AGENT_STATE_NEW));

ATOMIC_STORE_BOOL(&pIceAgent->processStun, TRUE);
// this API does not reset servers, so re-initialize relay candidates now.
CHK_STATUS(iceAgentInitRelayCandidates(pIceAgent));

CleanUp:

Expand Down Expand Up @@ -1857,16 +1930,17 @@ STATUS iceAgentInitRelayCandidate(PIceAgent pIceAgent, UINT32 iceServerIndex, KV
callback.relayAddressAvailableFn = NULL;
callback.turnStateFailedFn = turnStateFailedFn;

MUTEX_LOCK(pIceAgent->lock);
locked = TRUE;

CHK_STATUS(createTurnConnection(&pIceAgent->iceServers[iceServerIndex], pIceAgent->timerQueueHandle,
TURN_CONNECTION_DATA_TRANSFER_MODE_SEND_INDIDATION, protocol, &callback, pNewCandidate->pSocketConnection,
pIceAgent->pConnectionListener, &pTurnConnection));
pNewCandidate->pIceAgent = pIceAgent;
pNewCandidate->pTurnConnection = pTurnConnection;

MUTEX_LOCK(pIceAgent->lock);
locked = TRUE;

CHK_STATUS(doubleListInsertItemHead(pIceAgent->localCandidates, (UINT64) pNewCandidate));
CHK_STATUS(iceAgentReportNewLocalCandidate(pIceAgent, pNewCandidate));
pNewCandidate = NULL;

/* add existing remote candidates to turn. Need to acquire lock because remoteCandidates can be mutated by
Expand Down
2 changes: 2 additions & 0 deletions src/source/Ice/IceAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ STATUS updateSelectedLocalRemoteCandidateStats(PIceAgent);

STATUS getIceAgentStats(PIceAgent, PKvsIceAgentMetrics);

STATUS iceAgentAddConfig(PIceAgent, PIceConfigInfo);

#ifdef __cplusplus
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/source/Ice/TurnConnectionStateMachine.c
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ STATUS executeAllocationTurnState(UINT64 customData, UINT64 time)
pTurnConnection->state = TURN_STATE_ALLOCATION;
} else {
pTurnConnection->stateTryCount++;
CHK(pTurnConnection->stateTryCount < pTurnConnection->stateTryCountMax, STATUS_TURN_CONNECTION_ALLOCAITON_FAILED);
CHK(pTurnConnection->stateTryCount < pTurnConnection->stateTryCountMax, STATUS_TURN_CONNECTION_ALLOCATION_FAILED);
}
CHK_STATUS(iceUtilsSendStunPacket(pTurnConnection->pTurnPacket, pTurnConnection->longTermKey, ARRAY_SIZE(pTurnConnection->longTermKey),
&pTurnConnection->turnServer.ipAddress, pTurnConnection->pControlChannel, NULL, FALSE));
Expand Down
10 changes: 5 additions & 5 deletions src/source/Include_i.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ STATUS generateJSONSafeString(PCHAR, UINT32);
#include "Ice/NatBehaviorDiscovery.h"
#include "Srtp/SrtpSession.h"
#include "Sctp/Sctp.h"
#include "Signaling/FileCache.h"
#include "Signaling/Signaling.h"
#include "Signaling/ChannelInfo.h"
#include "Signaling/StateMachine.h"
#include "Signaling/LwsApiCalls.h"
#include "Rtp/RtpPacket.h"
#include "Rtcp/RtcpPacket.h"
#include "Rtcp/RollingBuffer.h"
Expand All @@ -154,11 +159,6 @@ STATUS generateJSONSafeString(PCHAR, UINT32);
#include "Rtp/Codecs/RtpH264Payloader.h"
#include "Rtp/Codecs/RtpOpusPayloader.h"
#include "Rtp/Codecs/RtpG711Payloader.h"
#include "Signaling/FileCache.h"
#include "Signaling/Signaling.h"
#include "Signaling/ChannelInfo.h"
#include "Signaling/StateMachine.h"
#include "Signaling/LwsApiCalls.h"
#include "Metrics/Metrics.h"

////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 680e76a

Please sign in to comment.