Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prunable Pool Allocator #4517

Merged
merged 9 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/generated/linux/platform_worker.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#define _clog_MACRO_QuicTraceLogInfo 1
#define QuicTraceLogInfo(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
#endif
#ifndef _clog_MACRO_QuicTraceLogVerbose
#define _clog_MACRO_QuicTraceLogVerbose 1
#define QuicTraceLogVerbose(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
#endif
#ifndef _clog_MACRO_QuicTraceEvent
#define _clog_MACRO_QuicTraceEvent 1
#define QuicTraceEvent(a, ...) _clog_CAT(_clog_ARGN_SELECTOR(__VA_ARGS__), _clog_CAT(_,a(#a, __VA_ARGS__)))
Expand Down Expand Up @@ -61,6 +65,24 @@ tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop , arg2);\



/*----------------------------------------------------------
// Decoder Ring for PlatformWorkerProcessPools
// [ lib][%p] Processing pools
// QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);
// arg2 = arg2 = Worker = arg2
----------------------------------------------------------*/
#ifndef _clog_3_ARGS_TRACE_PlatformWorkerProcessPools
#define _clog_3_ARGS_TRACE_PlatformWorkerProcessPools(uniqueId, encoded_arg_string, arg2)\
tracepoint(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools , arg2);\

#endif




/*----------------------------------------------------------
// Decoder Ring for AllocFailure
// Allocation of '%s' failed. (%llu bytes)
Expand Down
19 changes: 19 additions & 0 deletions src/generated/linux/platform_worker.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerThreadStop,



/*----------------------------------------------------------
// Decoder Ring for PlatformWorkerProcessPools
// [ lib][%p] Processing pools
// QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);
// arg2 = arg2 = Worker = arg2
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_PLATFORM_WORKER_C, PlatformWorkerProcessPools,
TP_ARGS(
const void *, arg2),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg2, (uint64_t)arg2)
)
)



/*----------------------------------------------------------
// Decoder Ring for AllocFailure
// Allocation of '%s' failed. (%llu bytes)
Expand Down
34 changes: 32 additions & 2 deletions src/inc/quic_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,43 @@ typedef struct QUIC_EXECUTION_CONFIG QUIC_EXECUTION_CONFIG;
typedef struct CXPLAT_EXECUTION_CONTEXT CXPLAT_EXECUTION_CONTEXT;

typedef struct CXPLAT_EXECUTION_STATE {
uint64_t TimeNow; // in microseconds
uint64_t LastWorkTime; // in microseconds
uint64_t TimeNow; // in microseconds
uint64_t LastWorkTime; // in microseconds
uint64_t LastPoolProcessTime; // in microseconds
uint32_t WaitTime;
uint32_t NoWorkCount;
CXPLAT_THREAD_ID ThreadID;
} CXPLAT_EXECUTION_STATE;

//
// Supports more dynamic operations, but must be submitted to the platform worker
// to manage.
//
typedef struct CXPLAT_POOL_EX {
#ifdef __cplusplus
struct CXPLAT_POOL _;
#else
struct CXPLAT_POOL;
#endif
CXPLAT_LIST_ENTRY Link;
void* Owner;
} CXPLAT_POOL_EX;

#ifndef _KERNEL_MODE // Not supported on kernel mode

void
CxPlatAddDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool,
_In_ uint16_t Index // Into the execution config processor array
);

void
CxPlatRemoveDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
);

#endif

//
// Returns FALSE when it's time to cleanup.
//
Expand Down
17 changes: 17 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -8745,6 +8745,18 @@
"splitArgs": [],
"macroName": "QuicTraceLogWarning"
},
"PlatformWorkerProcessPools": {
"ModuleProperites": {},
"TraceString": "[ lib][%p] Processing pools",
"UniqueId": "PlatformWorkerProcessPools",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg2"
}
],
"macroName": "QuicTraceLogVerbose"
},
"PlatformWorkerThreadStart": {
"ModuleProperites": {},
"TraceString": "[ lib][%p] Worker start",
Expand Down Expand Up @@ -15936,6 +15948,11 @@
"TraceID": "PlatformThreadCreateFailed",
"EncodingString": "[ lib] pthread_create failed, retrying without affinitization"
},
{
"UniquenessHash": "3f46cbc3-c609-dab4-07c9-fbe956e68f5c",
"TraceID": "PlatformWorkerProcessPools",
"EncodingString": "[ lib][%p] Processing pools"
},
{
"UniquenessHash": "5c3773e2-ef60-26b9-4b3c-d433ca2656df",
"TraceID": "PlatformWorkerThreadStart",
Expand Down
10 changes: 7 additions & 3 deletions src/platform/datapath_winuser.c
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,10 @@ DataPathInitialize(
FALSE,
RecvDatagramLength,
QUIC_POOL_DATA,
&Datapath->Partitions[i].RecvDatagramPool);
(CXPLAT_POOL*)&Datapath->Partitions[i].RecvDatagramPool);
CxPlatAddDynamicPoolAllocator(
&Datapath->Partitions[i].RecvDatagramPool,
i);

CxPlatPoolInitializeEx(
FALSE,
Expand Down Expand Up @@ -1020,7 +1023,8 @@ CxPlatProcessorContextRelease(
CxPlatPoolUninitialize(&DatapathProc->LargeSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RioSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RioLargeSendBufferPool);
CxPlatPoolUninitialize(&DatapathProc->RecvDatagramPool);
CxPlatRemoveDynamicPoolAllocator(&DatapathProc->RecvDatagramPool);
CxPlatPoolUninitialize((CXPLAT_POOL*)&DatapathProc->RecvDatagramPool);
CxPlatPoolUninitialize(&DatapathProc->RioRecvPool);
CxPlatDataPathRelease(DatapathProc->Datapath);
}
Expand Down Expand Up @@ -2465,7 +2469,7 @@ CxPlatSocketAllocRxIoBlock(
if (SocketProc->Parent->UseRio) {
OwningPool = &DatapathProc->RioRecvPool;
} else {
OwningPool = &DatapathProc->RecvDatagramPool;
OwningPool = (CXPLAT_POOL*)&DatapathProc->RecvDatagramPool;
nibanks marked this conversation as resolved.
Show resolved Hide resolved
}

IoBlock = CxPlatPoolAlloc(OwningPool);
Expand Down
2 changes: 1 addition & 1 deletion src/platform/platform_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_DATAPATH_PROC {
// Pool of receive datagram contexts and buffers to be shared by all sockets
// on this core.
//
CXPLAT_POOL RecvDatagramPool;
CXPLAT_POOL_EX RecvDatagramPool;

//
// Pool of RIO receive datagram contexts and buffers to be shared by all
Expand Down
71 changes: 71 additions & 0 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
//
CXPLAT_LOCK ECLock;

//
// List of dynamic pools to manage.
//
CXPLAT_LIST_ENTRY DynamicPoolList;

//
// Execution contexts that are waiting to be added to CXPLAT_WORKER::ExecutionContexts.
//
Expand Down Expand Up @@ -160,6 +165,7 @@ CxPlatWorkersLazyStart(
CxPlatZeroMemory(CxPlatWorkers, WorkersSize);
for (uint32_t i = 0; i < CxPlatWorkerCount; ++i) {
CxPlatLockInitialize(&CxPlatWorkers[i].ECLock);
CxPlatListInitializeHead(&CxPlatWorkers[i].DynamicPoolList);
CxPlatWorkers[i].InitializedECLock = TRUE;
CxPlatWorkers[i].IdealProcessor = ProcessorList ? ProcessorList[i] : (uint16_t)i;
CXPLAT_DBG_ASSERT(CxPlatWorkers[i].IdealProcessor < CxPlatProcCount());
Expand Down Expand Up @@ -288,6 +294,7 @@ CxPlatWorkersUninit(
CxPlatSqeCleanup(&CxPlatWorkers[i].EventQ, &CxPlatWorkers[i].ShutdownSqe);
#endif // CXPLAT_SQE_INIT
CxPlatEventQCleanup(&CxPlatWorkers[i].EventQ);
CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&CxPlatWorkers[i].DynamicPoolList));
CxPlatLockUninitialize(&CxPlatWorkers[i].ECLock);
}

Expand All @@ -300,6 +307,65 @@ CxPlatWorkersUninit(
CxPlatLockUninitialize(&CxPlatWorkerLock);
}

#define DYNAMIC_POOL_PROCESSING_TIME 1000000 // 1 second
nibanks marked this conversation as resolved.
Show resolved Hide resolved
#define DYNAMIC_POOL_PRUNE_COUNT 8

void
CxPlatAddDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool,
_In_ uint16_t Index // Into the execution config processor array
)
{
CXPLAT_WORKER* Worker = &CxPlatWorkers[Index];
Pool->Owner = Worker;
CxPlatLockAcquire(&Worker->ECLock);
CxPlatListInsertTail(&CxPlatWorkers[Index].DynamicPoolList, &Pool->Link);
CxPlatLockRelease(&Worker->ECLock);
}

void
CxPlatRemoveDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
)
{
CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Pool->Owner;
CxPlatLockAcquire(&Worker->ECLock);
CxPlatListEntryRemove(&Pool->Link);
CxPlatLockRelease(&Worker->ECLock);
}

void
CxPlatProcessDynamicPoolAllocator(
_Inout_ CXPLAT_POOL_EX* Pool
)
{
for (uint32_t i = 0; i < DYNAMIC_POOL_PRUNE_COUNT; ++i) {
void* Entry = InterlockedPopEntrySList(&Pool->ListHead);
if (!Entry) break;
Pool->Free(Entry, Pool->Tag, (CXPLAT_POOL*)Pool);
}
}

void
CxPlatProcessDynamicPoolAllocators(
_In_ CXPLAT_WORKER* Worker
)
{
QuicTraceLogVerbose(
PlatformWorkerProcessPools,
"[ lib][%p] Processing pools",
Worker);

CxPlatLockAcquire(&Worker->ECLock);
mtfriesen marked this conversation as resolved.
Show resolved Hide resolved
CXPLAT_LIST_ENTRY* Entry = Worker->DynamicPoolList.Flink;
while (Entry != &Worker->DynamicPoolList) {
CXPLAT_POOL_EX* Pool = CXPLAT_CONTAINING_RECORD(Entry, CXPLAT_POOL_EX, Link);
Entry = Entry->Flink;
CxPlatProcessDynamicPoolAllocator(Pool);
}
CxPlatLockRelease(&Worker->ECLock);
}

CXPLAT_EVENTQ*
CxPlatWorkerGetEventQ(
_In_ uint16_t Index
Expand Down Expand Up @@ -504,6 +570,11 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
CxPlatSchedulerYield();
State.NoWorkCount = 0;
}

if (State.TimeNow - State.LastPoolProcessTime > DYNAMIC_POOL_PROCESSING_TIME) {
nibanks marked this conversation as resolved.
Show resolved Hide resolved
CxPlatProcessDynamicPoolAllocators(Worker);
State.LastPoolProcessTime = State.TimeNow;
}
}

Shutdown:
Expand Down
Loading