Skip to content

Commit

Permalink
Threadpool context for WebRTC (#1810)
Browse files Browse the repository at this point in the history
* Threadpool context for WebRTC

* Disable for windows

* Include directory to shared library for windows

* Remove context object, use envs to allow configuring threadpool

* Include README changes

* Get back threadpool context

This reverts commit 2307056.

* Move thread sleep to unit tests

* Revert sleep from test to main file

* Remove threadpool usage from signaling

* Rebased off develop

* Add locks around threadpool calls
  • Loading branch information
disa6302 authored Sep 21, 2023
1 parent 8ce0c98 commit 3ad6f81
Show file tree
Hide file tree
Showing 16 changed files with 194 additions and 167 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ jobs:
- name: Run tests
shell: powershell
run: |
$env:Path += ';C:\webrtc\open-source\bin;C:\tools\pthreads-w32-2-9-1-release\Pre-built.2\dll\x64'
$env:Path += ';C:\webrtc\open-source\bin;C:\tools\pthreads-w32-2-9-1-release\Pre-built.2\dll\x64;C:\webrtc\build'
& "C:\webrtc\build\tst\webrtc_client_test.exe" --gtest_filter="-DataChannelFunctionalityTest.*:IceApiTest.*:IceFunctionalityTest.*:PeerConnectionFunctionalityTest.*:SignalingApiFunctionalityTest.*:TurnConnectionFunctionalityTest.*:RtpFunctionalityTest.marshallUnmarshallH264Data:RtpFunctionalityTest.packingUnpackingVerifySameH264Frame:RtcpFunctionalityTest.onRtcpPacketCompound:RtcpFunctionalityTest.twcc3"
# windows-msvc-mbedtls:
# runs-on: windows-2022
Expand Down
14 changes: 13 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,21 @@ file(GLOB WEBRTC_SIGNALING_CLIENT_SOURCE_FILES "src/source/Signaling/*.c")
include_directories(${OPEN_SRC_INCLUDE_DIRS})
include_directories(${OPEN_SRC_INSTALL_PREFIX}/include)
include_directories(${KINESIS_VIDEO_WEBRTC_CLIENT_SRC}/src/include)
include_directories(${KINESIS_VIDEO_WEBRTC_CLIENT_SRC}/src/ice)

add_library(kvsWebrtcClient ${LINKAGE} ${WEBRTC_CLIENT_SOURCE_FILES} ${DATACHANNEL_SRC})

if(USE_MBEDTLS)
target_compile_definitions(kvsWebrtcClient PRIVATE LWS_WITH_MBEDTLS)
endif()

# The library **MUST** have a shared linkage. This library is used by both WebRtc client an
#signaling client.
if(ENABLE_KVS_THREADPOOL)
file(GLOB THREADPOOL_SOURCE_FILES "src/source/Threadpool/ThreadPoolContext.c")
add_library(kvsWebRtcThreadpool SHARED ${THREADPOOL_SOURCE_FILES})
target_link_libraries(kvsWebRtcThreadpool PRIVATE kvspicUtils)
endif()

target_link_libraries(
kvsWebrtcClient
PRIVATE kvspicUtils
Expand All @@ -360,6 +367,11 @@ if(USE_MBEDTLS)
target_compile_definitions(kvsWebrtcSignalingClient PRIVATE LWS_WITH_MBEDTLS)
endif()

if(ENABLE_KVS_THREADPOOL)
target_link_libraries(kvsWebrtcClient PRIVATE kvsWebRtcThreadpool)
target_link_libraries(kvsWebrtcSignalingClient PRIVATE kvsWebRtcThreadpool)
endif()

target_link_libraries(
kvsWebrtcSignalingClient
PUBLIC
Expand Down
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,12 @@ When building on MacOS M1, if the build fails while trying to build OpenSSL or W
To build on a 32-bit Raspbian GNU/Linux 11 on 64-bit hardware, the OpenSSL library must be manually configured. This is due to the OpenSSL autoconfiguration script detecting 64-bit hardware and emitting 64-bit ARM assembly instructions which are not allowed in 32-bit executables. A 32-bit ARM version of OpenSSL can be configured by setting 32-bit ARM platform:
`cmake .. -DBUILD_OPENSSL_PLATFORM=linux-armv4`

### Threadpool for Signaling Channel messages
The threadpool is enabled by default, and starts with 3 threads that it can increase up to 5 if all 3 are actively in use. To change these values to better match the resources of your use case
please edit samples/Samples.h defines `KVS_SIGNALING_THREADPOOL_MIN` and `KVS_SIGNALING_THREADPOOL_MAX`. You can also disable the threadpool to instead create and detach each thread to handle signaling messages by disabling the flag `-DENABLE_KVS_THREADPOOL` while building with cmake.
### Threadpool for the SDK
The threadpool is enabled by default, and starts with 5 threads that it can increase up to 10 if all are actively in use. To change these values to better match the resources of your use case you can set the environment variables to do so:
1. `export AWS_KVS_WEBRTC_THREADPOOL_MIN_THREADS=<value>`
2. `export AWS_KVS_WEBRTC_THREADPOOL_MAX_THREADS=<value>`

To disable threadpool, run `cmake .. -DENABLE_KVS_THREADPOOL=OFF`

## Documentation
All Public APIs are documented in our [Include.h](https://github.com/awslabs/amazon-kinesis-video-streams-webrtc-sdk-c/blob/master/src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h), we also generate a [Doxygen](https://awslabs.github.io/amazon-kinesis-video-streams-webrtc-sdk-c/) each commit for easier navigation.
Expand Down
2 changes: 0 additions & 2 deletions samples/Common.c
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,6 @@ STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE
pSampleConfiguration->clientInfo.loggingLevel = logLevel;
pSampleConfiguration->clientInfo.cacheFilePath = NULL; // Use the default path
pSampleConfiguration->clientInfo.signalingClientCreationMaxRetryAttempts = CREATE_SIGNALING_CLIENT_RETRY_ATTEMPTS_SENTINEL_VALUE;
pSampleConfiguration->clientInfo.signalingMessagesMinimumThreads = KVS_SIGNALING_THREADPOOL_MIN;
pSampleConfiguration->clientInfo.signalingMessagesMaximumThreads = KVS_SIGNALING_THREADPOOL_MAX;
pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
pSampleConfiguration->pregenerateCertTimerId = MAX_UINT32;
pSampleConfiguration->signalingClientMetrics.version = SIGNALING_CLIENT_METRICS_CURRENT_VERSION;
Expand Down
4 changes: 0 additions & 4 deletions samples/Samples.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ extern "C" {
#define MASTER_DATA_CHANNEL_MESSAGE "This message is from the KVS Master"
#define VIEWER_DATA_CHANNEL_MESSAGE "This message is from the KVS Viewer"

// Signaling client threadpool for handling messages
#define KVS_SIGNALING_THREADPOOL_MIN 3
#define KVS_SIGNALING_THREADPOOL_MAX 5

/* Uncomment the following line in order to enable IoT credentials checks in the provided samples */
// #define IOT_CORE_ENABLE_CREDENTIALS 1

Expand Down
28 changes: 24 additions & 4 deletions src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,26 @@ extern "C" {
*/
#define SIGNALING_CONNECT_TIMEOUT (5 * HUNDREDS_OF_NANOS_IN_A_SECOND)

/**
* Default minimum number of threads in the threadpool for the SDK
*/
#define THREADPOOL_MIN_THREADS 3

/**
* Default maximum number of threads in the threadpool for the SDK
*/
#define THREADPOOL_MAX_THREADS 10

/**
* Env to set minimum number of threads in the threadpool for the KVS SDK
*/
#define WEBRTC_THREADPOOL_MIN_THREADS_ENV_VAR (PCHAR) "AWS_KVS_WEBRTC_THREADPOOL_MIN_THREADS"

/**
* Env to set maximum number of threads in the threadpool for the SDK
*/
#define WEBRTC_THREADPOOL_MAX_THREADS_ENV_VAR (PCHAR) "AWS_KVS_WEBRTC_THREADPOOL_MAX_THREADS"

#ifdef _WIN32
/**
* Default timeout for sending data
Expand Down Expand Up @@ -1249,10 +1269,10 @@ typedef struct {
//!< being used this value can be NULL or point to an EMPTY_STRING.
KvsRetryStrategyCallbacks signalingRetryStrategyCallbacks; //!< Retry strategy callbacks used while creating signaling client
INT32 signalingClientCreationMaxRetryAttempts; //!< Max attempts to create signaling client before returning error to the caller
UINT32 stateMachineRetryCountReadOnly; //!< Retry count of state machine. Note that this **MUST NOT** be modified by the user. It is a read only
//!< field
UINT32 signalingMessagesMinimumThreads;
UINT32 signalingMessagesMaximumThreads;
UINT32 stateMachineRetryCountReadOnly; //!< Retry count of state machine. Note that this **MUST NOT** be modified by the user. It is a read only
//!< field
UINT32 signalingMessagesMinimumThreads; //!< Unused field post v1.8.1
UINT32 signalingMessagesMaximumThreads; //!< Unused field post v1.8.1
} SignalingClientInfo, *PSignalingClientInfo;

/**
Expand Down
1 change: 1 addition & 0 deletions src/source/Include_i.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ STATUS generateJSONSafeString(PCHAR, UINT32);
////////////////////////////////////////////////////
// Project internal includes
////////////////////////////////////////////////////
#include "Threadpool/ThreadpoolContext.h"
#include "Crypto/IOBuffer.h"
#include "Crypto/Crypto.h"
#include "Crypto/Dtls.h"
Expand Down
9 changes: 8 additions & 1 deletion src/source/PeerConnection/PeerConnection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,10 @@ STATUS initKvsWebRtc(VOID)
#ifdef ENABLE_DATA_CHANNEL
CHK_STATUS(initSctpSession());
#endif

#ifdef ENABLE_KVS_THREADPOOL
DLOGI("KVS WebRtc library using thread pool");
CHK_STATUS(createThreadPoolContext());
#endif
ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, TRUE);

CleanUp:
Expand All @@ -1449,6 +1452,10 @@ STATUS deinitKvsWebRtc(VOID)
srtp_shutdown();

ATOMIC_STORE_BOOL(&gKvsWebRtcInitialized, FALSE);
#ifdef ENABLE_KVS_THREADPOOL
DLOGI("Destroying KVS Webrtc library threadpool");
destroyThreadPoolContext();
#endif

CleanUp:

Expand Down
3 changes: 2 additions & 1 deletion src/source/Signaling/LwsApiCalls.c
Original file line number Diff line number Diff line change
Expand Up @@ -2012,7 +2012,8 @@ STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT
}

#ifdef ENABLE_KVS_THREADPOOL
CHK_STATUS(threadpoolPush(pSignalingClient->pThreadpool, receiveLwsMessageWrapper, (PVOID) pSignalingMessageWrapper));
// This would fail if threadpool was not created
CHK_STATUS(threadpoolContextPush(receiveLwsMessageWrapper, pSignalingMessageWrapper));
#else
// Issue the callback on a separate thread
CHK_STATUS(THREAD_CREATE(&receivedTid, receiveLwsMessageWrapper, (PVOID) pSignalingMessageWrapper));
Expand Down
10 changes: 0 additions & 10 deletions src/source/Signaling/Signaling.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ STATUS createSignalingSync(PSignalingClientInfoInternal pClientInfo, PChannelInf
CHK_STATUS(createValidateChannelInfo(pChannelInfo, &pSignalingClient->pChannelInfo));
CHK_STATUS(validateSignalingCallbacks(pSignalingClient, pCallbacks));
CHK_STATUS(validateSignalingClientInfo(pSignalingClient, pClientInfo));
#ifdef ENABLE_KVS_THREADPOOL
DLOGD("Going to crate the threadpool for signaling");
CHK_STATUS(threadpoolCreate(&pSignalingClient->pThreadpool, pClientInfo->signalingClientInfo.signalingMessagesMinimumThreads,
pClientInfo->signalingClientInfo.signalingMessagesMaximumThreads));
DLOGD("Successfully created the threadpool for signaling");
#endif

pSignalingClient->version = SIGNALING_CLIENT_CURRENT_VERSION;
// Set invalid call times
Expand Down Expand Up @@ -227,10 +221,6 @@ STATUS freeSignaling(PSignalingClient* ppSignalingClient)

hashTableFree(pSignalingClient->diagnostics.pEndpointToClockSkewHashMap);

#ifdef ENABLE_KVS_THREADPOOL
threadpoolFree(pSignalingClient->pThreadpool);
#endif

if (IS_VALID_MUTEX_VALUE(pSignalingClient->connectedLock)) {
MUTEX_FREE(pSignalingClient->connectedLock);
}
Expand Down
4 changes: 0 additions & 4 deletions src/source/Signaling/Signaling.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,6 @@ typedef struct {
UINT64 getIceConfigTime;
UINT64 deleteTime;
UINT64 connectTime;

#ifdef ENABLE_KVS_THREADPOOL
PThreadpool pThreadpool;
#endif
UINT64 offerTime;
} SignalingClient, *PSignalingClient;

Expand Down
94 changes: 94 additions & 0 deletions src/source/Threadpool/ThreadPoolContext.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#define LOG_CLASS "ThreadPoolContext"
#include "../Include_i.h"

// Function to get access to the Singleton instance
PThreadPoolContext getThreadContextInstance()
{
static ThreadPoolContext t = {.pThreadpool = NULL, .isInitialized = FALSE, .threadpoolContextLock = INVALID_MUTEX_VALUE};
return &t;
}

STATUS createThreadPoolContext()
{
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
PCHAR pMinThreads, pMaxThreads;
UINT32 minThreads, maxThreads;
PThreadPoolContext pThreadPoolContext = getThreadContextInstance();

if (!IS_VALID_MUTEX_VALUE(pThreadPoolContext->threadpoolContextLock)) {
pThreadPoolContext->threadpoolContextLock = MUTEX_CREATE(FALSE);
}

if (NULL == (pMinThreads = GETENV(WEBRTC_THREADPOOL_MIN_THREADS_ENV_VAR)) || STATUS_SUCCESS != STRTOUI32(pMinThreads, NULL, 10, &minThreads)) {
minThreads = THREADPOOL_MIN_THREADS;
}
if (NULL == (pMaxThreads = GETENV(WEBRTC_THREADPOOL_MAX_THREADS_ENV_VAR)) || STATUS_SUCCESS != STRTOUI32(pMaxThreads, NULL, 10, &maxThreads)) {
maxThreads = THREADPOOL_MAX_THREADS;
}

// Protecting this section to ensure we are not pushing threads / destroying the pool
// when it is being created.
MUTEX_LOCK(pThreadPoolContext->threadpoolContextLock);
locked = TRUE;
CHK_WARN(!pThreadPoolContext->isInitialized, retStatus, "Threadpool already set up. Nothing to do");
CHK_ERR(pThreadPoolContext->pThreadpool == NULL, STATUS_NULL_ARG, "Threadpool object is to be set up");
CHK_STATUS(threadpoolCreate(&pThreadPoolContext->pThreadpool, minThreads, maxThreads));
pThreadPoolContext->isInitialized = TRUE;
CleanUp:
if (locked) {
MUTEX_UNLOCK(pThreadPoolContext->threadpoolContextLock);
}
return retStatus;
}

STATUS threadpoolContextPush(startRoutine fn, PVOID customData)
{
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
PThreadPoolContext pThreadPoolContext = getThreadContextInstance();

// Protecting this section to ensure we are destroying the pool
// when it is being used.
MUTEX_LOCK(pThreadPoolContext->threadpoolContextLock);
locked = TRUE;
CHK_ERR(pThreadPoolContext->isInitialized, STATUS_INVALID_OPERATION, "Threadpool not initialized yet");
CHK_ERR(pThreadPoolContext->pThreadpool != NULL, STATUS_NULL_ARG, "Threadpool object is NULL");
CHK_STATUS(threadpoolPush(pThreadPoolContext->pThreadpool, fn, customData));
CleanUp:
if (locked) {
MUTEX_UNLOCK(pThreadPoolContext->threadpoolContextLock);
}
return retStatus;
}

STATUS destroyThreadPoolContext()
{
STATUS retStatus = STATUS_SUCCESS;
BOOL locked = FALSE;
PThreadPoolContext pThreadPoolContext = getThreadContextInstance();

// Ensure we do not destroy the pool if threads are still being pushed
MUTEX_LOCK(pThreadPoolContext->threadpoolContextLock);
locked = TRUE;
CHK_WARN(pThreadPoolContext->isInitialized, STATUS_INVALID_OPERATION, "Threadpool not initialized yet, nothing to destroy");
CHK_WARN(pThreadPoolContext->pThreadpool != NULL, STATUS_NULL_ARG, "Destroying threadpool without setting up");
threadpoolFree(pThreadPoolContext->pThreadpool);

// All members of the static instance **MUST** be reset after destruction to allow for
// the static object to be re-created after destruction (more relevant for unit tests)
pThreadPoolContext->pThreadpool = NULL;
pThreadPoolContext->isInitialized = FALSE;
CleanUp:
if (locked) {
MUTEX_UNLOCK(pThreadPoolContext->threadpoolContextLock);
}
if (IS_VALID_MUTEX_VALUE(pThreadPoolContext->threadpoolContextLock)) {
MUTEX_FREE(pThreadPoolContext->threadpoolContextLock);

// Important to reset, specifically in case of unit tests where initKvsWebRtc() and
// deinitKvsWebRtc() is invoked before and after every test suite
pThreadPoolContext->threadpoolContextLock = INVALID_MUTEX_VALUE;
}
return retStatus;
};
31 changes: 31 additions & 0 deletions src/source/Threadpool/ThreadpoolContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*******************************************
Main internal include file
*******************************************/
#ifndef __KINESIS_VIDEO_WEBRTC_CLIENT_THREADPOOLCONTEXT__
#define __KINESIS_VIDEO_WEBRTC_CLIENT_THREADPOOLCONTEXT__

#pragma once

#ifdef __cplusplus
extern "C" {
#endif

////////////////////////////////////////////////////
// Project include files
////////////////////////////////////////////////////

typedef struct {
PThreadpool pThreadpool;
BOOL isInitialized;
MUTEX threadpoolContextLock;
} ThreadPoolContext, *PThreadPoolContext;

PUBLIC_API STATUS createThreadPoolContext();
PUBLIC_API STATUS getThreadPoolContext(PThreadPoolContext);
PUBLIC_API STATUS threadpoolContextPush(startRoutine, PVOID);
PUBLIC_API STATUS destroyThreadPoolContext();

#ifdef __cplusplus
}
#endif
#endif /* __KINESIS_VIDEO_WEBRTC_CLIENT_THREADPOOLCONTEXT__ */
Loading

0 comments on commit 3ad6f81

Please sign in to comment.