From 79bdb8cbb6232370336c2da0d1cc7610b67bb35d Mon Sep 17 00:00:00 2001 From: Yonatan Komornik <11005061+yoniko@users.noreply.github.com> Date: Thu, 2 Feb 2023 15:19:22 -0800 Subject: [PATCH] AsyncIO performance regression for small files fix (#3474) - Do not use threaded AsyncIO when handling small files. - Some typo / doc fixes --- programs/fileio.c | 25 ++++++++++ programs/fileio_asyncio.c | 97 ++++++++++++++++++++++++++++----------- programs/fileio_asyncio.h | 22 +++++++++ 3 files changed, 116 insertions(+), 28 deletions(-) diff --git a/programs/fileio.c b/programs/fileio.c index fb71804d6a..9a8300cdd8 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -1758,6 +1758,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, int result; FILE* srcFile; stat_t srcFileStat; + U64 fileSize = UTIL_FILESIZE_UNKNOWN; DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName); if (strcmp(srcFileName, stdinmark)) { @@ -1790,6 +1791,17 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat); if (srcFile == NULL) return 1; /* srcFile could not be opened */ + /* Don't use AsyncIO for small files */ + if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */ + fileSize = UTIL_getFileSizeStat(&srcFileStat); + if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) { + AIO_ReadPool_setAsync(ress.readCtx, 0); + AIO_WritePool_setAsync(ress.writeCtx, 0); + } else { + AIO_ReadPool_setAsync(ress.readCtx, 1); + AIO_WritePool_setAsync(ress.writeCtx, 1); + } + AIO_ReadPool_setFile(ress.readCtx, srcFile); result = FIO_compressFilename_dstFile( fCtx, prefs, ress, @@ -2586,6 +2598,7 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs FILE* srcFile; stat_t srcFileStat; int result; + U64 fileSize = UTIL_FILESIZE_UNKNOWN; if (UTIL_isDirectory(srcFileName)) { DISPLAYLEVEL(1, "zstd: %s is a directory -- ignored \n", srcFileName); @@ -2594,6 +2607,18 @@ static int 
FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat); if (srcFile==NULL) return 1; + + /* Don't use AsyncIO for small files */ + if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */ + fileSize = UTIL_getFileSizeStat(&srcFileStat); + if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) { + AIO_ReadPool_setAsync(ress.readCtx, 0); + AIO_WritePool_setAsync(ress.writeCtx, 0); + } else { + AIO_ReadPool_setAsync(ress.readCtx, 1); + AIO_WritePool_setAsync(ress.writeCtx, 1); + } + AIO_ReadPool_setFile(ress.readCtx, srcFile); result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName, &srcFileStat); diff --git a/programs/fileio_asyncio.c b/programs/fileio_asyncio.c index 8f12fe1f9f..fe9cca95d1 100644 --- a/programs/fileio_asyncio.c +++ b/programs/fileio_asyncio.c @@ -140,7 +140,7 @@ int AIO_supported(void) { } /* *********************************** - * General IoPool implementation + * Generic IoPool implementation *************************************/ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) { @@ -163,20 +163,22 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) { * Displays warning if asyncio is requested but MT isn't available. */ static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) { ctx->threadPool = NULL; + ctx->threadPoolActive = 0; if(prefs->asyncIO) { if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL)) - EXM_THROW(102,"Failed creating write availableJobs mutex"); + EXM_THROW(102,"Failed creating ioJobsMutex mutex"); /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. 
*/ assert(MAX_IO_JOBS >= 2); ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2); + ctx->threadPoolActive = 1; if (!ctx->threadPool) - EXM_THROW(104, "Failed creating writer thread pool"); + EXM_THROW(104, "Failed creating I/O thread pool"); } } /* AIO_IOPool_init: - * Allocates and sets and a new write pool including its included availableJobs. */ + * Allocates and sets up a new I/O thread pool including its included availableJobs. */ static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) { int i; AIO_IOPool_createThreadPool(ctx, prefs); @@ -192,27 +194,59 @@ static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_fun } +/* AIO_IOPool_threadPoolActive: + * Check if current operation uses thread pool. + * Note that in some cases we have a thread pool initialized but choose not to use it. */ +static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) { + return ctx->threadPool && ctx->threadPoolActive; +} + + +/* AIO_IOPool_lockJobsMutex: + * Locks the IO jobs mutex if threading is active */ +static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) { + if(AIO_IOPool_threadPoolActive(ctx)) + ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); +} + +/* AIO_IOPool_unlockJobsMutex: + * Unlocks the IO jobs mutex if threading is active */ +static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) { + if(AIO_IOPool_threadPoolActive(ctx)) + ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); +} + /* AIO_IOPool_releaseIoJob: * Releases an acquired job back to the pool. Doesn't execute the job. 
*/ static void AIO_IOPool_releaseIoJob(IOJob_t* job) { IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx; - if(ctx->threadPool) - ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); + AIO_IOPool_lockJobsMutex(ctx); assert(ctx->availableJobsCount < ctx->totalIoJobs); ctx->availableJobs[ctx->availableJobsCount++] = job; - if(ctx->threadPool) - ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); + AIO_IOPool_unlockJobsMutex(ctx); } /* AIO_IOPool_join: * Waits for all tasks in the pool to finish executing. */ static void AIO_IOPool_join(IOPoolCtx_t* ctx) { - if(ctx->threadPool) + if(AIO_IOPool_threadPoolActive(ctx)) POOL_joinJobs(ctx->threadPool); } +/* AIO_IOPool_setThreaded: + * Allows (de)activating threaded mode, to be used when the expected overhead + * of threading costs more than the expected gains. */ +static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) { + assert(threaded == 0 || threaded == 1); + assert(ctx != NULL); + if(ctx->threadPoolActive != threaded) { + AIO_IOPool_join(ctx); + ctx->threadPoolActive = threaded; + } +} + /* AIO_IOPool_free: - * Release a previously allocated write thread pool. Makes sure all takss are done and released. */ + * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. 
*/ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) { int i; if(ctx->threadPool) { @@ -236,12 +270,10 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) { static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) { IOJob_t *job; assert(ctx->file != NULL || ctx->prefs->testMode); - if(ctx->threadPool) - ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); + AIO_IOPool_lockJobsMutex(ctx); assert(ctx->availableJobsCount > 0); job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount]; - if(ctx->threadPool) - ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); + AIO_IOPool_unlockJobsMutex(ctx); job->usedBufferSize = 0; job->file = ctx->file; job->offset = 0; @@ -251,8 +283,7 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) { /* AIO_IOPool_setFile: * Sets the destination file for future files in the pool. - * Requires completion of all queues write jobs and release of all otherwise acquired jobs. - * Also requires ending of sparse write if a previous file was used in sparse mode. */ + * Requires completion of all queued jobs and release of all otherwise acquired jobs. */ static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) { assert(ctx!=NULL); AIO_IOPool_join(ctx); @@ -269,7 +300,7 @@ static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) { * The queued job shouldn't be used directly after queueing it. */ static void AIO_IOPool_enqueueJob(IOJob_t* job) { IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx; - if(ctx->threadPool) + if(AIO_IOPool_threadPoolActive(ctx)) POOL_add(ctx->threadPool, ctx->poolFunction, job); else ctx->poolFunction(job); @@ -300,8 +331,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) { * Blocks on completion of all current write jobs before executing. 
*/ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) { assert(ctx != NULL); - if(ctx->base.threadPool) - POOL_joinJobs(ctx->base.threadPool); + AIO_IOPool_join(&ctx->base); AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips); ctx->storedSkips = 0; } @@ -368,6 +398,13 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) { free(ctx); } +/* AIO_WritePool_setAsync: + * Allows (de)activating async mode, to be used when the expected overhead + * of asyncio costs more than the expected gains. */ +void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) { + AIO_IOPool_setThreaded(&ctx->base, async); +} + /* *********************************** * ReadPool implementation @@ -383,14 +420,13 @@ static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) { static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) { ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx; - if(ctx->base.threadPool) - ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex); + AIO_IOPool_lockJobsMutex(&ctx->base); assert(ctx->completedJobsCount < MAX_IO_JOBS); ctx->completedJobs[ctx->completedJobsCount++] = job; - if(ctx->base.threadPool) { + if(AIO_IOPool_threadPoolActive(&ctx->base)) { ZSTD_pthread_cond_signal(&ctx->jobCompletedCond); - ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex); } + AIO_IOPool_unlockJobsMutex(&ctx->base); } /* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked: @@ -426,8 +462,7 @@ static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) { * Would block. 
*/ static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) { IOJob_t *job = NULL; - if (ctx->base.threadPool) - ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex); + AIO_IOPool_lockJobsMutex(&ctx->base); job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx); @@ -443,8 +478,7 @@ static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) { ctx->waitingOnOffset += job->usedBufferSize; } - if (ctx->base.threadPool) - ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex); + AIO_IOPool_unlockJobsMutex(&ctx->base); return job; } @@ -524,7 +558,7 @@ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) if(ctx->base.threadPool) if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL)) - EXM_THROW(103,"Failed creating write jobCompletedCond mutex"); + EXM_THROW(103,"Failed creating jobCompletedCond cond"); return ctx; } @@ -620,3 +654,10 @@ int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) { AIO_ReadPool_setFile(ctx, NULL); return fclose(file); } + +/* AIO_ReadPool_setAsync: + * Allows (de)activating async mode, to be used when the expected overhead + * of asyncio costs more than the expected gains. */ +void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) { + AIO_IOPool_setThreaded(&ctx->base, async); +} diff --git a/programs/fileio_asyncio.h b/programs/fileio_asyncio.h index 34dad6f4da..feb25a3f9e 100644 --- a/programs/fileio_asyncio.h +++ b/programs/fileio_asyncio.h @@ -8,6 +8,17 @@ * You may select, at your option, one of the above-listed licenses. */ + /* + * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously. + * Current implementation relies on having one thread that reads and one that + * writes. + * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but + * are performed serially by the appropriate worker thread. + * Most systems expose better primitives to perform asynchronous IO, such as + * io_uring on newer Linux systems. 
The API is built in such a way that in the + * future we could replace the threads with better solutions when available. + */ + #ifndef ZSTD_FILEIO_ASYNCIO_H #define ZSTD_FILEIO_ASYNCIO_H @@ -27,6 +38,7 @@ extern "C" { typedef struct { /* These struct fields should be set only on creation and not changed afterwards */ POOL_ctx* threadPool; + int threadPoolActive; int totalIoJobs; const FIO_prefs_t* prefs; POOL_function poolFunction; @@ -136,6 +148,11 @@ WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize * Frees and releases a writePool and its resources. Closes destination file. */ void AIO_WritePool_free(WritePoolCtx_t* ctx); +/* AIO_WritePool_setAsync: + * Allows (de)activating async mode, to be used when the expected overhead + * of asyncio costs more than the expected gains. */ +void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async); + /* AIO_ReadPool_create: * Allocates and sets and a new readPool including its included jobs. * bufferSize should be set to the maximal buffer we want to read at a time, will also be used @@ -146,6 +163,11 @@ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize); * Frees and releases a readPool and its resources. Closes source file. */ void AIO_ReadPool_free(ReadPoolCtx_t* ctx); +/* AIO_ReadPool_setAsync: + * Allows (de)activating async mode, to be used when the expected overhead + * of asyncio costs more than the expected gains. */ +void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async); + /* AIO_ReadPool_consumeBytes: * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);