Skip to content

Commit

Permalink
Ice turn sleepless state machine (#1825)
Browse files Browse the repository at this point in the history
* Starting point for turn state machine

* Add timerqueue kick and checks for ice agent state machine

* Completing turn connection state machine

* improved error logging

* Better error logging around ICE_STATE_FAILED

* Adding lock around step state machine to prevent race conditions with
iceAgentStatus

* Only lock the iterator during stepstate

* Add turn state transition after handling stun packets for turn

* add null check to iceagentrestart

* Use producer-c develop

* Fix data race

* Cleanup commented out code

* remove excessive buffer check in dtls_openssl
  • Loading branch information
jdelapla authored Oct 16, 2023
1 parent 3ba7c05 commit a03c26c
Show file tree
Hide file tree
Showing 13 changed files with 1,010 additions and 433 deletions.
2 changes: 1 addition & 1 deletion CMake/Dependencies/libkvsCommonLws-CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ include(ExternalProject)

ExternalProject_Add(libkvsCommonLws-download
GIT_REPOSITORY https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.git
GIT_TAG fd562a16e41b1972df14c230b6be7a7d3f75bc6b
GIT_TAG develop
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/build
CMAKE_ARGS
-DCMAKE_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ extern "C" {
#define STATUS_ICE_SERVER_INDEX_INVALID STATUS_ICE_BASE + 0x00000028
#define STATUS_ICE_CANDIDATE_STRING_MISSING_TYPE STATUS_ICE_BASE + 0x00000029
#define STATUS_TURN_CONNECTION_ALLOCAITON_FAILED STATUS_ICE_BASE + 0x0000002a
#define STATUS_TURN_INVALID_STATE STATUS_ICE_BASE + 0x0000002b
/*!@} */

/////////////////////////////////////////////////////
Expand Down
4 changes: 3 additions & 1 deletion src/source/Crypto/Dtls_openssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,13 @@ STATUS dtlsSessionHandshakeInThread(PDtlsSession pDtlsSession, BOOL isServer)
switch (pDtlsSession->handshakeState) {
case DTLS_STATE_HANDSHAKE_NEW:
if (sslRet <= 0) {
DLOGI("Failed to complete handshake..but let it go on");
sslErr = SSL_get_error(pDtlsSession->pSsl, sslRet);
if (sslErr == SSL_ERROR_WANT_READ || sslErr == SSL_ERROR_WANT_WRITE) {
// If OpenSSL wants to read or write, it's an indication we should check the BIO
DLOGD("Handshake want READ/WRITE");
CHK_STATUS(dtlsCheckOutgoingDataBuffer(pDtlsSession));
} else {
DLOGI("Failed to complete handshake..but let it go on");
// Handle other errors
LOG_OPENSSL_ERROR("SSL_do_handshake");
}
Expand All @@ -482,6 +483,7 @@ STATUS dtlsSessionHandshakeInThread(PDtlsSession pDtlsSession, BOOL isServer)
ATOMIC_STORE_BOOL(&pDtlsSession->sslInitFinished, TRUE);
CHK_STATUS(dtlsSessionChangeState(pDtlsSession, RTC_DTLS_TRANSPORT_STATE_CONNECTED));
}
pDtlsSession->handshakeState = DTLS_STATE_HANDSHAKE_IN_PROGRESS;
break;
case DTLS_STATE_HANDSHAKE_IN_PROGRESS:
if (SSL_is_init_finished(pDtlsSession->pSsl)) {
Expand Down
17 changes: 14 additions & 3 deletions src/source/Ice/IceAgent.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Kinesis Video Producer Callbacks Provider
* Ice Agent APIs
*/
#define LOG_CLASS "IceAgent"
#include "../Include_i.h"
Expand Down Expand Up @@ -61,7 +61,7 @@ STATUS createIceAgent(PCHAR username, PCHAR password, PIceAgentCallbacks pIceAge
pIceAgent->localNetworkInterfaceCount = ARRAY_SIZE(pIceAgent->localNetworkInterfaces);
pIceAgent->candidateGatheringEndTime = INVALID_TIMESTAMP_VALUE;

pIceAgent->lock = MUTEX_CREATE(FALSE);
pIceAgent->lock = MUTEX_CREATE(TRUE);

// Create the state machine
CHK_STATUS(createStateMachine(ICE_AGENT_STATE_MACHINE_STATES, ICE_AGENT_STATE_MACHINE_STATE_COUNT, (UINT64) pIceAgent, iceAgentGetCurrentTime,
Expand Down Expand Up @@ -580,6 +580,8 @@ STATUS iceAgentStartAgent(PIceAgent pIceAgent, PCHAR remoteUsername, PCHAR remot

CleanUp:

CHK_LOG_ERR(retStatus);

if (locked) {
MUTEX_UNLOCK(pIceAgent->lock);
}
Expand Down Expand Up @@ -913,7 +915,7 @@ STATUS iceAgentRestart(PIceAgent pIceAgent, PCHAR localIceUfrag, PCHAR localIceP
* pIceAgent->pDataSendingIceCandidatePair and its ice candidates. Therefore safe to proceed freeing resources */

for (i = 0; i < localCandidateCount; ++i) {
if (localCandidates[i] != pIceAgent->pDataSendingIceCandidatePair->local) {
if (pIceAgent->pDataSendingIceCandidatePair == NULL || localCandidates[i] != pIceAgent->pDataSendingIceCandidatePair->local) {
if (localCandidates[i]->iceCandidateType != ICE_CANDIDATE_TYPE_RELAYED) {
CHK_STATUS(connectionListenerRemoveConnection(pIceAgent->pConnectionListener, localCandidates[i]->pSocketConnection));
CHK_STATUS(freeSocketConnection(&localCandidates[i]->pSocketConnection));
Expand Down Expand Up @@ -1804,12 +1806,14 @@ STATUS turnStateFailedFn(PSocketConnection pSocketConnection, UINT64 data)

PIceCandidate pNewCandidate = (PIceCandidate) data;
CHK(pNewCandidate != NULL, STATUS_NULL_ARG);
MUTEX_LOCK(pNewCandidate->pIceAgent->lock);

if (pNewCandidate->state == ICE_CANDIDATE_STATE_NEW) {
pNewCandidate->state = ICE_CANDIDATE_STATE_INVALID;
}

CleanUp:
MUTEX_UNLOCK(pNewCandidate->pIceAgent->lock);
return retStatus;
}

Expand Down Expand Up @@ -2112,6 +2116,10 @@ STATUS iceAgentNominatingStateSetup(PIceAgent pIceAgent)

pIceAgent->stateEndTime = GETTIME() + pIceAgent->kvsRtcConfiguration.iceCandidateNominationTimeout;

MUTEX_UNLOCK(pIceAgent->lock);
locked = FALSE;
checkIceAgentStateMachine(pIceAgent);

CleanUp:

CHK_LOG_ERR(retStatus);
Expand Down Expand Up @@ -2369,6 +2377,9 @@ STATUS incomingDataHandler(UINT64 customData, PSocketConnection pSocketConnectio
} else {
if (ATOMIC_LOAD_BOOL(&pIceAgent->processStun)) {
CHK_STATUS(handleStunPacket(pIceAgent, pBuffer, bufferLen, pSocketConnection, pSrc, pDest));
MUTEX_UNLOCK(pIceAgent->lock);
locked = FALSE;
checkIceAgentStateMachine(pIceAgent);
}
}

Expand Down
124 changes: 91 additions & 33 deletions src/source/Ice/IceAgentStateMachine.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,73 @@ StateMachineState ICE_AGENT_STATE_MACHINE_STATES[] = {

UINT32 ICE_AGENT_STATE_MACHINE_STATE_COUNT = ARRAY_SIZE(ICE_AGENT_STATE_MACHINE_STATES);

STATUS checkIceAgentStateMachine(PIceAgent pIceAgent)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
BOOL transitionReady = FALSE;

CHK(pIceAgent != NULL && pIceAgent->pStateMachine != NULL, STATUS_NULL_ARG);

// if a state transition is ready, tell the timer to kick the timer
CHK_STATUS(checkForStateTransition(pIceAgent->pStateMachine, &transitionReady));
if (transitionReady) {
// dangerous to have any mutexes locked by timerqueue when entering this function
CHK_STATUS(timerQueueKick(pIceAgent->timerQueueHandle, pIceAgent->iceAgentStateTimerTask));
}

CleanUp:

CHK_LOG_ERR(retStatus);

LEAVES();
return retStatus;
}

STATUS stepIceAgentStateMachine(PIceAgent pIceAgent)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
UINT64 oldState;
BOOL locked = FALSE;

CHK(pIceAgent != NULL, STATUS_NULL_ARG);

oldState = pIceAgent->iceAgentState;

CHK_STATUS(stepStateMachine(pIceAgent->pStateMachine));
do {
oldState = pIceAgent->iceAgentState;
MUTEX_LOCK(pIceAgent->lock);
locked = TRUE;

// if any failure happened and state machine is not in failed state, stepStateMachine again into failed state.
if (pIceAgent->iceAgentState != ICE_AGENT_STATE_FAILED && STATUS_FAILED(pIceAgent->iceAgentStatus)) {
CHK_STATUS(stepStateMachine(pIceAgent->pStateMachine));
}

if (oldState != pIceAgent->iceAgentState) {
if (pIceAgent->iceAgentCallbacks.connectionStateChangedFn != NULL) {
DLOGI("Ice agent state changed from %s to %s.", iceAgentStateToString(oldState), iceAgentStateToString(pIceAgent->iceAgentState));
pIceAgent->iceAgentCallbacks.connectionStateChangedFn(pIceAgent->iceAgentCallbacks.customData, pIceAgent->iceAgentState);
// if any failure happened and state machine is not in failed state, stepStateMachine again into failed state.
if (pIceAgent->iceAgentState != ICE_AGENT_STATE_FAILED && STATUS_FAILED(pIceAgent->iceAgentStatus)) {
DLOGD("Ice agent state %s operation encountered error 0x%08x", iceAgentStateToString(pIceAgent->iceAgentState),
pIceAgent->iceAgentStatus);
CHK_STATUS(stepStateMachine(pIceAgent->pStateMachine));
}
} else {
// state machine retry is not used. resetStateMachineRetryCount just to avoid
// state machine retry grace period overflow warning.
CHK_STATUS(resetStateMachineRetryCount(pIceAgent->pStateMachine));
}

MUTEX_UNLOCK(pIceAgent->lock);
locked = FALSE;

if (oldState != pIceAgent->iceAgentState) {
if (pIceAgent->iceAgentCallbacks.connectionStateChangedFn != NULL) {
DLOGI("Ice agent state changed from %s to %s.", iceAgentStateToString(oldState), iceAgentStateToString(pIceAgent->iceAgentState));
pIceAgent->iceAgentCallbacks.connectionStateChangedFn(pIceAgent->iceAgentCallbacks.customData, pIceAgent->iceAgentState);
}
} else {
// state machine retry is not used. resetStateMachineRetryCount just to avoid
// state machine retry grace period overflow warning.
CHK_STATUS(resetStateMachineRetryCount(pIceAgent->pStateMachine));
}
} while (oldState != pIceAgent->iceAgentState);

CleanUp:

CHK_LOG_ERR(retStatus);
if (locked) {
MUTEX_UNLOCK(pIceAgent->lock);
}

LEAVES();
return retStatus;
Expand All @@ -80,6 +116,7 @@ STATUS acceptIceAgentMachineState(PIceAgent pIceAgent, UINT64 state)
CHK_STATUS(acceptStateMachineState(pIceAgent->pStateMachine, state));

CleanUp:
CHK_LOG_ERR(retStatus);

if (locked) {
MUTEX_UNLOCK(pIceAgent->lock);
Expand Down Expand Up @@ -194,6 +231,7 @@ STATUS executeNewIceAgentState(UINT64 customData, UINT64 time)
pIceAgent->iceAgentState = ICE_AGENT_STATE_NEW;

CleanUp:
CHK_LOG_ERR(retStatus);

LEAVES();
return retStatus;
Expand Down Expand Up @@ -241,6 +279,7 @@ STATUS fromCheckConnectionIceAgentState(UINT64 customData, PUINT64 pState)
}

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
state = ICE_AGENT_STATE_FAILED;
Expand Down Expand Up @@ -278,6 +317,7 @@ STATUS executeCheckConnectionIceAgentState(UINT64 customData, UINT64 time)
CHK_STATUS(iceAgentCheckCandidatePairConnection(pIceAgent));

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
pIceAgent->iceAgentStatus = retStatus;
Expand Down Expand Up @@ -317,6 +357,7 @@ STATUS fromConnectedIceAgentState(UINT64 customData, PUINT64 pState)
state = ICE_AGENT_STATE_NOMINATING;

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
state = ICE_AGENT_STATE_FAILED;
Expand Down Expand Up @@ -351,6 +392,7 @@ STATUS executeConnectedIceAgentState(UINT64 customData, UINT64 time)
pIceAgent->iceAgentState = ICE_AGENT_STATE_CONNECTED;

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
pIceAgent->iceAgentStatus = retStatus;
Expand Down Expand Up @@ -405,6 +447,7 @@ STATUS fromNominatingIceAgentState(UINT64 customData, PUINT64 pState)
}

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
state = ICE_AGENT_STATE_FAILED;
Expand Down Expand Up @@ -450,6 +493,7 @@ STATUS executeNominatingIceAgentState(UINT64 customData, UINT64 time)
}

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
pIceAgent->iceAgentStatus = retStatus;
Expand All @@ -469,8 +513,6 @@ STATUS fromReadyIceAgentState(UINT64 customData, PUINT64 pState)
PIceAgent pIceAgent = (PIceAgent) customData;
UINT64 state = ICE_AGENT_STATE_READY; // original state
BOOL locked = FALSE;
PDoubleListNode pCurNode = NULL, pNodeToDelete = NULL;
PIceCandidate pIceCandidate = NULL;

CHK(pIceAgent != NULL && pState != NULL, STATUS_NULL_ARG);

Expand All @@ -482,26 +524,11 @@ STATUS fromReadyIceAgentState(UINT64 customData, PUINT64 pState)

CHK_STATUS(iceAgentStateMachineCheckDisconnection(pIceAgent, &state));

// Free TurnConnections that are shutdown
CHK_STATUS(doubleListGetHeadNode(pIceAgent->localCandidates, &pCurNode));
while (pCurNode != NULL) {
pIceCandidate = (PIceCandidate) pCurNode->data;
pNodeToDelete = pCurNode;
pCurNode = pCurNode->pNext;

if (pIceCandidate->iceCandidateType == ICE_CANDIDATE_TYPE_RELAYED && turnConnectionIsShutdownComplete(pIceCandidate->pTurnConnection)) {
MUTEX_UNLOCK(pIceAgent->lock);
CHK_LOG_ERR(freeTurnConnection(&pIceCandidate->pTurnConnection));
MUTEX_LOCK(pIceAgent->lock);
MEMFREE(pIceCandidate);
CHK_STATUS(doubleListDeleteNode(pIceAgent->localCandidates, pNodeToDelete));
}
}

// return early if changing to disconnected state
CHK(state != ICE_AGENT_STATE_DISCONNECTED, retStatus);

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
state = ICE_AGENT_STATE_FAILED;
Expand All @@ -527,15 +554,38 @@ STATUS executeReadyIceAgentState(UINT64 customData, UINT64 time)
ENTERS();
UNUSED_PARAM(time);
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
PIceAgent pIceAgent = (PIceAgent) customData;
PDoubleListNode pCurNode = NULL, pNodeToDelete = NULL;
PIceCandidate pIceCandidate = NULL;

CHK(pIceAgent != NULL, STATUS_NULL_ARG);
if (pIceAgent->iceAgentState != ICE_AGENT_STATE_READY) {
CHK_STATUS(iceAgentReadyStateSetup(pIceAgent));
pIceAgent->iceAgentState = ICE_AGENT_STATE_READY;
}

MUTEX_LOCK(pIceAgent->lock);
locked = TRUE;

// Free TurnConnections that are shutdown
CHK_STATUS(doubleListGetHeadNode(pIceAgent->localCandidates, &pCurNode));
while (pCurNode != NULL) {
pIceCandidate = (PIceCandidate) pCurNode->data;
pNodeToDelete = pCurNode;
pCurNode = pCurNode->pNext;

if (pIceCandidate->iceCandidateType == ICE_CANDIDATE_TYPE_RELAYED && turnConnectionIsShutdownComplete(pIceCandidate->pTurnConnection)) {
MUTEX_UNLOCK(pIceAgent->lock);
CHK_LOG_ERR(freeTurnConnection(&pIceCandidate->pTurnConnection));
MUTEX_LOCK(pIceAgent->lock);
MEMFREE(pIceCandidate);
CHK_STATUS(doubleListDeleteNode(pIceAgent->localCandidates, pNodeToDelete));
}
}

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
pIceAgent->iceAgentStatus = retStatus;
Expand All @@ -550,6 +600,10 @@ STATUS executeReadyIceAgentState(UINT64 customData, UINT64 time)
pIceAgent->iceAgentStartTime = 0;
}

if (locked) {
MUTEX_UNLOCK(pIceAgent->lock);
}

LEAVES();
return retStatus;
}
Expand All @@ -571,6 +625,7 @@ STATUS fromDisconnectedIceAgentState(UINT64 customData, PUINT64 pState)
CHK_STATUS(pIceAgent->iceAgentStatus);

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
state = ICE_AGENT_STATE_FAILED;
Expand Down Expand Up @@ -619,6 +674,7 @@ STATUS executeDisconnectedIceAgentState(UINT64 customData, UINT64 time)
CHK_STATUS(stepStateMachine(pIceAgent->pStateMachine));

CleanUp:
CHK_LOG_ERR(retStatus);

if (STATUS_FAILED(retStatus)) {
pIceAgent->iceAgentStatus = retStatus;
Expand All @@ -643,6 +699,7 @@ STATUS fromFailedIceAgentState(UINT64 customData, PUINT64 pState)
*pState = ICE_AGENT_STATE_FAILED;

CleanUp:
CHK_LOG_ERR(retStatus);

LEAVES();
return retStatus;
Expand Down Expand Up @@ -672,6 +729,7 @@ STATUS executeFailedIceAgentState(UINT64 customData, UINT64 time)
}

CleanUp:
CHK_LOG_ERR(retStatus);

LEAVES();
return retStatus;
Expand Down
3 changes: 2 additions & 1 deletion src/source/Ice/IceAgentStateMachine.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ extern "C" {
#define ICE_AGENT_STATE_FAILED_STR (PCHAR) "ICE_AGENT_STATE_FAILED"

// Whether to step the state machine
STATUS checkIceAgentStateMachine(PIceAgent);
STATUS stepIceAgentStateMachine(PIceAgent);
STATUS acceptIceAgentMachineState(PIceAgent, UINT64);
STATUS iceAgentStateMachineCheckDisconnection(PIceAgent, PUINT64);
PCHAR iceAgentStateToString(UINT64);

/**
* Signaling state machine callbacks
* Ice agent state machine callbacks
*/
STATUS fromNewIceAgentState(UINT64, PUINT64);
STATUS executeNewIceAgentState(UINT64, UINT64);
Expand Down
Loading

0 comments on commit a03c26c

Please sign in to comment.