From 57424757a6208d42f5cf67665afeb59df2a690ce Mon Sep 17 00:00:00 2001 From: Yonatan Komornik Date: Thu, 30 Dec 2021 19:54:46 -0800 Subject: [PATCH 1/6] Async IO decompression: - Added --[no-]asyncio flag for CLI decompression. - Replaced dstBuffer in decompression with a pool of write jobs. - Added an ability to execute write jobs in a separate thread. - Added an ability to wait (join) on all jobs in a thread pool (queue and running). --- lib/common/pool.c | 20 +++- lib/common/pool.h | 6 + programs/fileio.c | 290 ++++++++++++++++++++++++++++++++++++--------- programs/fileio.h | 1 + programs/zstdcli.c | 11 +- tests/playTests.sh | 16 +++ 6 files changed, 284 insertions(+), 60 deletions(-) diff --git a/lib/common/pool.c b/lib/common/pool.c index 2e37cdd73c8..dce7d1bf2a9 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -96,9 +96,7 @@ static void* POOL_thread(void* opaque) { /* If the intended queue size was 0, signal after finishing job */ ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; - if (ctx->queueSize == 1) { - ZSTD_pthread_cond_signal(&ctx->queuePushCond); - } + ZSTD_pthread_cond_signal(&ctx->queuePushCond); ZSTD_pthread_mutex_unlock(&ctx->queueMutex); } } /* for (;;) */ @@ -190,6 +188,17 @@ void POOL_free(POOL_ctx *ctx) { ZSTD_customFree(ctx, ctx->customMem); } +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. 
+ */ +void POOL_joinJobs(POOL_ctx* ctx) { + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) { + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + } + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); +} + void ZSTD_freeThreadPool (ZSTD_threadPool* pool) { POOL_free (pool); } @@ -330,6 +339,11 @@ void POOL_free(POOL_ctx* ctx) { (void)ctx; } +void POOL_joinJobs(POOL_ctx* ctx){ + assert(!ctx || ctx == &g_poolCtx); + (void)ctx; +}; + int POOL_resize(POOL_ctx* ctx, size_t numThreads) { (void)ctx; (void)numThreads; return 0; diff --git a/lib/common/pool.h b/lib/common/pool.h index 0ebde1805db..b86a3452e5c 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -38,6 +38,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, */ void POOL_free(POOL_ctx* ctx); + +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. + */ +void POOL_joinJobs(POOL_ctx* ctx); + /*! POOL_resize() : * Expands or shrinks pool's number of threads. 
* This is more efficient than releasing + creating a new context, diff --git a/programs/fileio.c b/programs/fileio.c index 5338fa62955..f6ee7847f16 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -34,6 +34,8 @@ #include /* INT_MAX */ #include #include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */ +#include "../lib/common/pool.h" +#include "../lib/common/threading.h" #if defined (_MSC_VER) # include @@ -325,6 +327,7 @@ struct FIO_prefs_s { /* IO preferences */ U32 removeSrcFile; U32 overwrite; + U32 asyncIO; /* Computation resources preferences */ unsigned memLimit; @@ -395,6 +398,7 @@ FIO_prefs_t* FIO_createPreferences(void) ret->literalCompressionMode = ZSTD_ps_auto; ret->excludeCompressedFiles = 0; ret->allowBlockDevices = 0; + ret->asyncIO = 0; return ret; } @@ -558,6 +562,10 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value) prefs->contentSize = value != 0; } +void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) { + prefs->asyncIO = value; +} + /* FIO_ctx_t functions */ void FIO_setHasStdoutOutput(FIO_ctx_t* const fCtx, int value) { @@ -2000,16 +2008,105 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, /* ************************************************************************** * Decompression ***************************************************************************/ +#define DECOMPRESSION_MAX_WRITE_JOBS (10) + +typedef struct { + POOL_ctx* writerPool; + ZSTD_pthread_mutex_t writeJobsMutex; + void* jobs[DECOMPRESSION_MAX_WRITE_JOBS]; + volatile int availableWriteJobs; + int totalWriteJobs; + FILE* dstFile; + unsigned storedSkips; + FIO_prefs_t* prefs; +} write_pool_ctx_t; + +typedef struct { + write_pool_ctx_t *ctx; + FILE* dstFile; + void *buffer; + size_t bufferSize; + size_t usedBufferSize; +} write_job_t; + typedef struct { void* srcBuffer; size_t srcBufferSize; size_t srcBufferLoaded; - void* dstBuffer; - size_t dstBufferSize; ZSTD_DStream* dctx; - FILE* dstFile; + write_pool_ctx_t *writePoolCtx; } 
dRess_t; +static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) { + void *buffer; + write_job_t *job; + job = (write_job_t*) malloc(sizeof(write_job_t)); + buffer = malloc(ZSTD_DStreamOutSize()); + if(!job || !buffer) + EXM_THROW(101, "Allocation error : not enough memory"); + job->buffer = buffer; + job->bufferSize = ZSTD_DStreamOutSize(); + job->usedBufferSize = 0; + job->dstFile = NULL; + job->ctx = ctx; + return job; +} + +static void FIO_writePoolCreateThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) { + ctx->writerPool = NULL; + if(prefs->asyncIO) { +#ifdef ZSTD_MULTITHREAD + if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL)) + EXM_THROW(102, "Failed creating write jobs mutex"); + // We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to + // decompress into and 1 buffer that's actively written to disk and owned by the writing thread. + assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2); + ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2); + if (!ctx->writerPool) + EXM_THROW(103, "Failed creating writer thread pool"); +#else + DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n"); +#endif + } +} + +static write_pool_ctx_t* FIO_writePoolCreate(FIO_prefs_t* const prefs) { + write_pool_ctx_t *ctx; + int i; + ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t)); + if(!ctx) + EXM_THROW(100, "Allocation error : not enough memory"); + FIO_writePoolCreateThreadPool(ctx, prefs); + ctx->prefs = prefs; + ctx->totalWriteJobs = ctx->writerPool ? 
DECOMPRESSION_MAX_WRITE_JOBS : 1; + ctx->availableWriteJobs = ctx->totalWriteJobs; + for(i=0; i < ctx->availableWriteJobs; i++) { + ctx->jobs[i] = FIO_createWriteJob(ctx); + } + ctx->storedSkips = 0; + ctx->dstFile = NULL; + return ctx; +} + +static void FIO_writePoolFree(write_pool_ctx_t* ctx) { + int i=0; + if(ctx->writerPool) { + // Make sure we finish all tasks and then free the resources + POOL_joinJobs(ctx->writerPool); + // Make sure we are not leaking jobs + assert(ctx->availableWriteJobs==ctx->totalWriteJobs); + POOL_free(ctx->writerPool); + ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex); + } + for(i=0; i<ctx->availableWriteJobs; i++) { + write_job_t* job = (write_job_t*) ctx->jobs[i]; + free(job->buffer); + free(job); + } + free(ctx); +} + + static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName) { dRess_t ress; @@ -2027,9 +2124,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi ress.srcBufferSize = ZSTD_DStreamInSize(); ress.srcBuffer = malloc(ress.srcBufferSize); - ress.dstBufferSize = ZSTD_DStreamOutSize(); - ress.dstBuffer = malloc(ress.dstBufferSize); - if (!ress.srcBuffer || !ress.dstBuffer) + if (!ress.srcBuffer) EXM_THROW(61, "Allocation error : not enough memory"); /* dictionary */ @@ -2039,6 +2134,8 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi free(dictBuffer); } + ress.writePoolCtx = FIO_writePoolCreate(prefs); + return ress; } @@ -2046,7 +2143,7 @@ static void FIO_freeDResources(dRess_t ress) { CHECK( ZSTD_freeDStream(ress.dctx) ); free(ress.srcBuffer); - free(ress.dstBuffer); + FIO_writePoolFree(ress.writePoolCtx); } @@ -2148,6 +2245,82 @@ FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedS } } } +static void FIO_writePoolSetDstFile(write_pool_ctx_t *ctx, FILE* dstFile) { + assert(ctx!=NULL); + // We can change the dst file only if we have finished writing + if(ctx->writerPool) + POOL_joinJobs(ctx->writerPool); + 
assert(ctx->storedSkips == 0); + assert(ctx->availableWriteJobs == ctx->totalWriteJobs); + ctx->dstFile = dstFile; +} + +static int FIO_writePoolCloseDstFile(write_pool_ctx_t *ctx) { + FILE *dstFile = ctx->dstFile; + assert(dstFile!=NULL || ctx->prefs->testMode!=0); + FIO_writePoolSetDstFile(ctx, NULL); + return fclose(dstFile); +} + + +static void FIO_releaseWriteJob(write_job_t *job) { + write_pool_ctx_t *ctx = job->ctx; + if(ctx->writerPool) { + ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex); + assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS); + ctx->jobs[ctx->availableWriteJobs++] = job; + ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex); + } else { + ctx->availableWriteJobs++; + } +} + +static write_job_t* FIO_writePoolGetAvailableWriteJob(write_pool_ctx_t *ctx) { + write_job_t *job; + assert(ctx->dstFile!=NULL || ctx->prefs->testMode); + if(ctx->writerPool) { + ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex); + assert(ctx->availableWriteJobs > 0); + job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs]; + ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex); + } else { + assert(ctx->availableWriteJobs==1); + ctx->availableWriteJobs--; + job = (write_job_t*)ctx->jobs[0]; + } + job->usedBufferSize = 0; + job->dstFile = ctx->dstFile; + return job; +} + +static void FIO_WritePoolWriteJobExecute(void* opaque){ + write_job_t* job = (write_job_t*) opaque; + write_pool_ctx_t* ctx = job->ctx; + ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips); + FIO_releaseWriteJob(job); +} + +static void FIO_writePoolQueueWriteJob(write_job_t *job) { + write_pool_ctx_t* ctx = job->ctx; + if(ctx->writerPool) + POOL_add(ctx->writerPool, FIO_WritePoolWriteJobExecute, job); + else + FIO_WritePoolWriteJobExecute(job); +} + +static void FIO_writePoolQueueWriteJobAndGetNextAvailable(write_job_t **job) { + FIO_writePoolQueueWriteJob(*job); + *job = FIO_writePoolGetAvailableWriteJob((*job)->ctx); +} + +static void 
FIO_writePoolQueueSparseWriteEnd(write_pool_ctx_t* ctx) { + assert(ctx != NULL); + if(ctx->writerPool) + POOL_joinJobs(ctx->writerPool); + assert(ctx->availableWriteJobs == ctx->totalWriteJobs); + FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips); + ctx->storedSkips = 0; +} /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode @return : 0 (no error) */ @@ -2224,7 +2397,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, U64 alreadyDecoded) /* for multi-frames streams */ { U64 frameSize = 0; - U32 storedSkips = 0; + write_job_t *writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); /* display last 20 characters only */ { size_t const srcFileLength = strlen(srcFileName); @@ -2244,7 +2417,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, /* Main decompression Loop */ while (1) { ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 }; - ZSTD_outBuffer outBuff= { ress->dstBuffer, ress->dstBufferSize, 0 }; + ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 }; size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff); const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 
1 : 2; UTIL_HumanReadableSize_t const hrs = UTIL_makeHumanReadableSize(alreadyDecoded+frameSize); @@ -2256,7 +2429,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, } /* Write block */ - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, prefs, storedSkips); + writeJob->usedBufferSize = outBuff.pos; + FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); frameSize += outBuff.pos; if (fCtx->nbFilesTotal > 1) { size_t srcFileNameSize = strlen(srcFileName); @@ -2294,7 +2468,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, ress->srcBufferLoaded += readSize; } } } - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + FIO_releaseWriteJob(writeJob); + FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); return frameSize; } @@ -2302,15 +2477,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, #ifdef ZSTD_GZDECOMPRESS static unsigned long long -FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, - const char* srcFileName) +FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) { unsigned long long outFileSize = 0; z_stream strm; int flush = Z_NO_FLUSH; int decodingError = 0; - unsigned storedSkips = 0; + write_job_t *writeJob = NULL; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; @@ -2321,8 +2494,9 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK) return FIO_ERROR_FRAME_DECODING; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; strm.avail_in = (uInt)ress->srcBufferLoaded; strm.next_in = (z_const unsigned char*)ress->srcBuffer; @@ -2343,12 +2517,13 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, DISPLAYLEVEL(1, 
"zstd: %s: inflate error %d \n", srcFileName, ret); decodingError = 1; break; } - { size_t const decompBytes = ress->dstBufferSize - strm.avail_out; + { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips); + writeJob->usedBufferSize = decompBytes; + FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); outFileSize += decompBytes; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; } } if (ret == Z_STREAM_END) break; @@ -2362,16 +2537,15 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName); decodingError = 1; } - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + FIO_releaseWriteJob(writeJob); + FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); return decodingError ? 
FIO_ERROR_FRAME_DECODING : outFileSize; } #endif - #ifdef ZSTD_LZMADECOMPRESS static unsigned long long FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, const char* srcFileName, int plain_lzma) { unsigned long long outFileSize = 0; @@ -2379,7 +2553,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, lzma_action action = LZMA_RUN; lzma_ret initRet; int decodingError = 0; - unsigned storedSkips = 0; + write_job_t *writeJob = NULL; strm.next_in = 0; strm.avail_in = 0; @@ -2396,8 +2570,9 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, return FIO_ERROR_FRAME_DECODING; } - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; + writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; strm.next_in = (BYTE const*)ress->srcBuffer; strm.avail_in = ress->srcBufferLoaded; @@ -2420,12 +2595,13 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, srcFileName, ret); decodingError = 1; break; } - { size_t const decompBytes = ress->dstBufferSize - strm.avail_out; + { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips); + writeJob->usedBufferSize = decompBytes; + FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); outFileSize += decompBytes; - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = writeJob->bufferSize; } } if (ret == LZMA_STREAM_END) break; } @@ -2434,7 +2610,8 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, memmove(ress->srcBuffer, strm.next_in, strm.avail_in); ress->srcBufferLoaded = strm.avail_in; lzma_end(&strm); - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + FIO_releaseWriteJob(writeJob); + 
FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize; } #endif @@ -2442,7 +2619,6 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, #ifdef ZSTD_LZ4DECOMPRESS static unsigned long long FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, const char* srcFileName) { unsigned long long filesize = 0; @@ -2450,7 +2626,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, LZ4F_decompressionContext_t dCtx; LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION); int decodingError = 0; - unsigned storedSkips = 0; + write_job_t *writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); if (LZ4F_isError(errorCode)) { DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n"); @@ -2461,7 +2637,8 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, { size_t inSize = 4; size_t outSize= 0; MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER); - nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &outSize, ress->srcBuffer, &inSize, NULL); + nextToLoad = LZ4F_decompress(dCtx, NULL, &outSize, ress->srcBuffer, &inSize, NULL); + assert(outSize == 0); // We don't expect to output anything here if (LZ4F_isError(nextToLoad)) { DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n", srcFileName, LZ4F_getErrorName(nextToLoad)); @@ -2473,29 +2650,32 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, for (;nextToLoad;) { size_t readSize; size_t pos = 0; - size_t decodedBytes = ress->dstBufferSize; + size_t decodedBytes = writeJob->bufferSize; + int fullBufferDecoded = 0; /* Read input */ if (nextToLoad > ress->srcBufferSize) nextToLoad = ress->srcBufferSize; readSize = fread(ress->srcBuffer, 1, nextToLoad, srcFile); if (!readSize) break; /* reached end of file or stream */ - while ((pos < readSize) || (decodedBytes == ress->dstBufferSize)) { /* still to read, or still to flush */ + while ((pos < readSize) || fullBufferDecoded) { /* 
still to read, or still to flush */ /* Decode Input (at least partially) */ size_t remaining = readSize - pos; - decodedBytes = ress->dstBufferSize; - nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL); + decodedBytes = writeJob->bufferSize; + nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL); if (LZ4F_isError(nextToLoad)) { DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n", srcFileName, LZ4F_getErrorName(nextToLoad)); decodingError = 1; nextToLoad = 0; break; } pos += remaining; + fullBufferDecoded = decodedBytes == writeJob->bufferSize; /* Write Block */ if (decodedBytes) { UTIL_HumanReadableSize_t hrs; - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decodedBytes, prefs, storedSkips); + writeJob->usedBufferSize = decodedBytes; + FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); filesize += decodedBytes; hrs = UTIL_makeHumanReadableSize(filesize); DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix); @@ -2517,7 +2697,8 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, LZ4F_freeDecompressionContext(dCtx); ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */ - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + FIO_releaseWriteJob(writeJob); + FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); return decodingError ? 
FIO_ERROR_FRAME_DECODING : filesize; } @@ -2566,7 +2747,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, filesize += frameSize; } else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */ #ifdef ZSTD_GZDECOMPRESS - unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, prefs, srcFileName); + unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2576,7 +2757,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, } else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */ || (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */ #ifdef ZSTD_LZMADECOMPRESS - unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, prefs, srcFileName, buf[0] != 0xFD); + unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2585,7 +2766,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, #endif } else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) { #ifdef ZSTD_LZ4DECOMPRESS - unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, prefs, srcFileName); + unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2594,7 +2775,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, #endif } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */ return FIO_passThrough(prefs, - ress.dstFile, srcFile, + ress.writePoolCtx->dstFile, srcFile, ress.srcBuffer, ress.srcBufferSize, ress.srcBufferLoaded); } else { @@ -2632,7 +2813,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, int releaseDstFile = 0; int transferMTime = 0; - if ((ress.dstFile == NULL) && 
(prefs->testMode==0)) { + if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) { + FILE *dstFile; int dstFilePermissions = DEFAULT_FILE_PERMISSIONS; if ( strcmp(srcFileName, stdinmark) /* special case : don't transfer permissions from stdin */ && strcmp(dstFileName, stdoutmark) @@ -2644,8 +2826,9 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, releaseDstFile = 1; - ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); - if (ress.dstFile==NULL) return 1; + dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); + if (dstFile==NULL) return 1; + FIO_writePoolSetDstFile(ress.writePoolCtx, dstFile); /* Must only be added after FIO_openDstFile() succeeds. * Otherwise we may delete the destination file if it already exists, @@ -2657,10 +2840,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName); if (releaseDstFile) { - FILE* const dstFile = ress.dstFile; clearHandler(); - ress.dstFile = NULL; - if (fclose(dstFile)) { + if (FIO_writePoolCloseDstFile(ress.writePoolCtx)) { DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result = 1; } @@ -2874,15 +3055,16 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx, return 1; } if (!prefs->testMode) { - ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); - if (ress.dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); + FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); + if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); + FIO_writePoolSetDstFile(ress.writePoolCtx, dstFile); } for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) { status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if ((!prefs->testMode) && 
(fclose(ress.dstFile))) + if ((!prefs->testMode) && (FIO_writePoolCloseDstFile(ress.writePoolCtx))) EXM_THROW(72, "Write error : %s : cannot properly close output file", strerror(errno)); } else { diff --git a/programs/fileio.h b/programs/fileio.h index 61094db83cb..398937a64e8 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -109,6 +109,7 @@ void FIO_setAllowBlockDevices(FIO_prefs_t* const prefs, int allowBlockDevices); void FIO_setPatchFromMode(FIO_prefs_t* const prefs, int value); void FIO_setContentSize(FIO_prefs_t* const prefs, int value); void FIO_displayCompressionParameters(const FIO_prefs_t* prefs); +void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value); /* FIO_ctx_t functions */ void FIO_setNbFilesTotal(FIO_ctx_t* const fCtx, int value); diff --git a/programs/zstdcli.c b/programs/zstdcli.c index bfe18c0c1ba..fd563e1c24d 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -239,9 +239,12 @@ static void usage_advanced(const char* programName) #ifndef ZSTD_NODECOMPRESS DISPLAYOUT( "\n"); DISPLAYOUT( "Advanced decompression arguments : \n"); - DISPLAYOUT( " -l : print information about zstd compressed files \n"); - DISPLAYOUT( "--test : test compressed file integrity \n"); - DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); + DISPLAYOUT( " -l : print information about zstd compressed files \n"); + DISPLAYOUT( "--test : test compressed file integrity \n"); + DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); +#ifdef ZSTD_MULTITHREAD + DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n"); +#endif # if ZSTD_SPARSE_DEFAULT DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n"); # else @@ -912,6 +915,8 @@ int main(int argCount, const char* argv[]) if (!strcmp(argument, "--sparse")) { FIO_setSparseWrite(prefs, 2); continue; } if (!strcmp(argument, "--no-sparse")) { FIO_setSparseWrite(prefs, 0); continue; } if 
(!strcmp(argument, "--test")) { operation=zom_test; continue; } + if (!strcmp(argument, "--asyncio")) { FIO_setAsyncIOFlag(prefs, 1); continue;} + if (!strcmp(argument, "--no-asyncio")) { FIO_setAsyncIOFlag(prefs, 0); continue;} if (!strcmp(argument, "--train")) { operation=zom_train; if (outFileName==NULL) outFileName=g_defaultDictName; continue; } if (!strcmp(argument, "--no-dictID")) { FIO_setDictIDFlag(prefs, 0); continue; } if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(prefs, 0); continue; } diff --git a/tests/playTests.sh b/tests/playTests.sh index b7a3d88a817..dc29d5bd9d5 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -1575,6 +1575,22 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then exit 1 fi +println "\n===> zstd asyncio decompression tests " +roundTripTest -g8M "3 --asyncio" +roundTripTest -g8M "3 --no-asyncio" +if [ $GZIPMODE -eq 1 ]; then + roundTripTest -g8M "3 --format=gzip --asyncio" + roundTripTest -g8M "3 --format=gzip --no-asyncio" +fi +if [ $LZMAMODE -eq 1 ]; then + roundTripTest -g8M "3 --format=lzma --asyncio" + roundTripTest -g8M "3 --format=lzma --no-asyncio" +fi +if [ $LZ4MODE -eq 1 ]; then + roundTripTest -g8M "3 --format=lz4 --asyncio" + roundTripTest -g8M "3 --format=lz4 --no-asyncio" +fi + if [ "$1" != "--test-large-data" ]; then println "Skipping large data tests" From 86ec45ce80555306dbb7c1327fc32c6d04376d42 Mon Sep 17 00:00:00 2001 From: Yonatan Komornik Date: Wed, 5 Jan 2022 16:53:56 -0800 Subject: [PATCH 2/6] Async IO decompression: CR fixes --- programs/fileio.c | 175 +++++++++++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 66 deletions(-) diff --git a/programs/fileio.c b/programs/fileio.c index f6ee7847f16..97206cb6f8e 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -1806,7 +1806,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, static const char* checked_index(const char* options[], size_t length, size_t index) { assert(index < length); - // Necessary 
to avoid warnings since -O3 will omit the above `assert` + /* Necessary to avoid warnings since -O3 will omit the above `assert` */ (void) length; return options[index]; } @@ -2011,21 +2011,31 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, #define DECOMPRESSION_MAX_WRITE_JOBS (10) typedef struct { + /* These struct fields should be set only on creation and not changed afterwards */ POOL_ctx* writerPool; - ZSTD_pthread_mutex_t writeJobsMutex; - void* jobs[DECOMPRESSION_MAX_WRITE_JOBS]; - volatile int availableWriteJobs; int totalWriteJobs; + FIO_prefs_t* prefs; + + /* Controls the file we currently write to, make changes only by using provided utility functions */ FILE* dstFile; unsigned storedSkips; - FIO_prefs_t* prefs; + + /* The jobs and availableWriteJobs fields are accessed by both the main and writer threads and should + * only be mutated after locking the mutex */ + ZSTD_pthread_mutex_t writeJobsMutex; + void* jobs[DECOMPRESSION_MAX_WRITE_JOBS]; + int availableWriteJobs; } write_pool_ctx_t; typedef struct { + /* These fields are automatically set and shouldn't be changed by non WritePool code. */ write_pool_ctx_t *ctx; FILE* dstFile; void *buffer; size_t bufferSize; + + /* This field should be changed before a job is queued for execution and should contain the number + * of bytes to write from the buffer. */ size_t usedBufferSize; } write_job_t; @@ -2052,14 +2062,17 @@ static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) { return job; } -static void FIO_writePoolCreateThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) { +/* WritePool_createThreadPool: + * Creates a thread pool and a mutex for threaded write pool. + * Displays warning if asyncio is requested but MT isn't available. 
*/ +static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) { ctx->writerPool = NULL; if(prefs->asyncIO) { #ifdef ZSTD_MULTITHREAD if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL)) EXM_THROW(102, "Failed creating write jobs mutex"); - // We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to - // decompress into and 1 buffer that's actively written to disk and owned by the writing thread. + /* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to + * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2); ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2); if (!ctx->writerPool) @@ -2070,13 +2083,15 @@ static void FIO_writePoolCreateThreadPool(write_pool_ctx_t *ctx, const FIO_prefs } } -static write_pool_ctx_t* FIO_writePoolCreate(FIO_prefs_t* const prefs) { +/* WritePool_create: + * Allocates and sets up a new write pool, including its jobs. */ +static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) { write_pool_ctx_t *ctx; int i; ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t)); if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); - FIO_writePoolCreateThreadPool(ctx, prefs); + WritePool_createThreadPool(ctx, prefs); ctx->prefs = prefs; ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1; ctx->availableWriteJobs = ctx->totalWriteJobs; @@ -2088,16 +2103,20 @@ static write_pool_ctx_t* FIO_writePoolCreate(FIO_prefs_t* const prefs) { return ctx; } -static void FIO_writePoolFree(write_pool_ctx_t* ctx) { +/* WritePool_free: + * Release a previously allocated write thread pool. Makes sure all tasks are done and released. 
*/ +static void WritePool_free(write_pool_ctx_t* ctx) { int i=0; if(ctx->writerPool) { - // Make sure we finish all tasks and then free the resources + /* Make sure we finish all tasks and then free the resources */ POOL_joinJobs(ctx->writerPool); - // Make sure we are not leaking jobs + /* Make sure we are not leaking jobs */ assert(ctx->availableWriteJobs==ctx->totalWriteJobs); POOL_free(ctx->writerPool); ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex); } + assert(ctx->dstFile==NULL); + assert(ctx->storedSkips==0); for(i=0; i<ctx->availableWriteJobs; i++) { write_job_t* job = (write_job_t*) ctx->jobs[i]; free(job->buffer); @@ -2134,7 +2153,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi free(dictBuffer); } - ress.writePoolCtx = FIO_writePoolCreate(prefs); + ress.writePoolCtx = WritePool_create(prefs); return ress; } @@ -2143,7 +2162,7 @@ static void FIO_freeDResources(dRess_t ress) { CHECK( ZSTD_freeDStream(ress.dctx) ); free(ress.srcBuffer); - FIO_writePoolFree(ress.writePoolCtx); + WritePool_free(ress.writePoolCtx); } @@ -2245,25 +2264,9 @@ FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedS } } } -static void FIO_writePoolSetDstFile(write_pool_ctx_t *ctx, FILE* dstFile) { - assert(ctx!=NULL); - // We can change the dst file only if we have finished writing - if(ctx->writerPool) - POOL_joinJobs(ctx->writerPool); - assert(ctx->storedSkips == 0); - assert(ctx->availableWriteJobs == ctx->totalWriteJobs); - ctx->dstFile = dstFile; -} - -static int FIO_writePoolCloseDstFile(write_pool_ctx_t *ctx) { - FILE *dstFile = ctx->dstFile; - assert(dstFile!=NULL || ctx->prefs->testMode!=0); - FIO_writePoolSetDstFile(ctx, NULL); - return fclose(dstFile); -} - - -static void FIO_releaseWriteJob(write_job_t *job) { +/* WritePool_releaseWriteJob: + * Releases an acquired job back to the pool. Doesn't execute the job. 
*/ +static void WritePool_releaseWriteJob(write_job_t *job) { write_pool_ctx_t *ctx = job->ctx; if(ctx->writerPool) { ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex); @@ -2275,7 +2278,9 @@ static void FIO_releaseWriteJob(write_job_t *job) { } } -static write_job_t* FIO_writePoolGetAvailableWriteJob(write_pool_ctx_t *ctx) { +/* WritePool_acquireWriteJob: + * Returns an available write job to be used for a future write. */ +static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) { write_job_t *job; assert(ctx->dstFile!=NULL || ctx->prefs->testMode); if(ctx->writerPool) { @@ -2293,35 +2298,73 @@ static write_job_t* FIO_writePoolGetAvailableWriteJob(write_pool_ctx_t *ctx) { return job; } -static void FIO_WritePoolWriteJobExecute(void* opaque){ +/* WritePool_executeWriteJob: + * Executes a write job synchronously. Can be used as a function for a thread pool. */ +static void WritePool_executeWriteJob(void* opaque){ write_job_t* job = (write_job_t*) opaque; write_pool_ctx_t* ctx = job->ctx; ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips); - FIO_releaseWriteJob(job); + WritePool_releaseWriteJob(job); } -static void FIO_writePoolQueueWriteJob(write_job_t *job) { +/* WritePool_queueWriteJob: + * Queues a write job for execution. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. 
*/ +static void WritePool_queueWriteJob(write_job_t *job) { write_pool_ctx_t* ctx = job->ctx; if(ctx->writerPool) - POOL_add(ctx->writerPool, FIO_WritePoolWriteJobExecute, job); + POOL_add(ctx->writerPool, WritePool_executeWriteJob, job); else - FIO_WritePoolWriteJobExecute(job); + WritePool_executeWriteJob(job); } -static void FIO_writePoolQueueWriteJobAndGetNextAvailable(write_job_t **job) { - FIO_writePoolQueueWriteJob(*job); - *job = FIO_writePoolGetAvailableWriteJob((*job)->ctx); +/* WritePool_queueAndReacquireWriteJob: + * Queues a write job for execution and acquires a new one. + * After the call, `*job` points to the newly acquired job. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. */ +static void WritePool_queueAndReacquireWriteJob(write_job_t **job) { + WritePool_queueWriteJob(*job); + *job = WritePool_acquireWriteJob((*job)->ctx); } -static void FIO_writePoolQueueSparseWriteEnd(write_pool_ctx_t* ctx) { +/* WritePool_sparseWriteEnd: + * Ends sparse writes to the current dstFile. + * Blocks on completion of all current write jobs before executing. */ +static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) { assert(ctx != NULL); if(ctx->writerPool) POOL_joinJobs(ctx->writerPool); - assert(ctx->availableWriteJobs == ctx->totalWriteJobs); FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips); ctx->storedSkips = 0; } +/* WritePool_setDstFile: + * Sets the destination file for future writes in the pool. + * Requires completion of all queued write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. 
*/ +static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) { + assert(ctx!=NULL); + /* We can change the dst file only if we have finished writing */ + if(ctx->writerPool) + POOL_joinJobs(ctx->writerPool); + assert(ctx->storedSkips == 0); + assert(ctx->availableWriteJobs == ctx->totalWriteJobs); + ctx->dstFile = dstFile; +} + +/* WritePool_closeDstFile: + * Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL. + * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */ +static int WritePool_closeDstFile(write_pool_ctx_t *ctx) { + FILE *dstFile = ctx->dstFile; + assert(dstFile!=NULL || ctx->prefs->testMode!=0); + WritePool_sparseWriteEnd(ctx); + WritePool_setDstFile(ctx, NULL); + return fclose(dstFile); +} + /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode @return : 0 (no error) */ static int FIO_passThrough(const FIO_prefs_t* const prefs, @@ -2397,7 +2440,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, U64 alreadyDecoded) /* for multi-frames streams */ { U64 frameSize = 0; - write_job_t *writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); + write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); /* display last 20 characters only */ { size_t const srcFileLength = strlen(srcFileName); @@ -2430,7 +2473,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, /* Write block */ writeJob->usedBufferSize = outBuff.pos; - FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); + WritePool_queueAndReacquireWriteJob(&writeJob); frameSize += outBuff.pos; if (fCtx->nbFilesTotal > 1) { size_t srcFileNameSize = strlen(srcFileName); @@ -2468,8 +2511,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, ress->srcBufferLoaded += readSize; } } } - FIO_releaseWriteJob(writeJob); - FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); + 
WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return frameSize; } @@ -2494,7 +2537,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK) return FIO_ERROR_FRAME_DECODING; - writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); + writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; strm.avail_in = (uInt)ress->srcBufferLoaded; @@ -2520,7 +2563,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { writeJob->usedBufferSize = decompBytes; - FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); + WritePool_queueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; @@ -2537,8 +2580,8 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName); decodingError = 1; } - FIO_releaseWriteJob(writeJob); - FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); + WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return decodingError ? 
FIO_ERROR_FRAME_DECODING : outFileSize; } #endif @@ -2570,7 +2613,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, return FIO_ERROR_FRAME_DECODING; } - writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); + writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; strm.next_in = (BYTE const*)ress->srcBuffer; @@ -2598,7 +2641,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { writeJob->usedBufferSize = decompBytes; - FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); + WritePool_queueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = writeJob->bufferSize; @@ -2610,8 +2653,8 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, memmove(ress->srcBuffer, strm.next_in, strm.avail_in); ress->srcBufferLoaded = strm.avail_in; lzma_end(&strm); - FIO_releaseWriteJob(writeJob); - FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); + WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return decodingError ? 
FIO_ERROR_FRAME_DECODING : outFileSize; } #endif @@ -2626,7 +2669,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, LZ4F_decompressionContext_t dCtx; LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION); int decodingError = 0; - write_job_t *writeJob = FIO_writePoolGetAvailableWriteJob(ress->writePoolCtx); + write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); if (LZ4F_isError(errorCode)) { DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n"); @@ -2638,7 +2681,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, size_t outSize= 0; MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER); nextToLoad = LZ4F_decompress(dCtx, NULL, &outSize, ress->srcBuffer, &inSize, NULL); - assert(outSize == 0); // We don't expect to output anything here + assert(outSize == 0); /* We don't expect to output anything here */ if (LZ4F_isError(nextToLoad)) { DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n", srcFileName, LZ4F_getErrorName(nextToLoad)); @@ -2675,7 +2718,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, if (decodedBytes) { UTIL_HumanReadableSize_t hrs; writeJob->usedBufferSize = decodedBytes; - FIO_writePoolQueueWriteJobAndGetNextAvailable(&writeJob); + WritePool_queueAndReacquireWriteJob(&writeJob); filesize += decodedBytes; hrs = UTIL_makeHumanReadableSize(filesize); DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix); @@ -2697,8 +2740,8 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, LZ4F_freeDecompressionContext(dCtx); ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */ - FIO_releaseWriteJob(writeJob); - FIO_writePoolQueueSparseWriteEnd(ress->writePoolCtx); + WritePool_releaseWriteJob(writeJob); + WritePool_sparseWriteEnd(ress->writePoolCtx); return decodingError ? 
FIO_ERROR_FRAME_DECODING : filesize; } @@ -2828,7 +2871,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); if (dstFile==NULL) return 1; - FIO_writePoolSetDstFile(ress.writePoolCtx, dstFile); + WritePool_setDstFile(ress.writePoolCtx, dstFile); /* Must only be added after FIO_openDstFile() succeeds. * Otherwise we may delete the destination file if it already exists, @@ -2841,7 +2884,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, if (releaseDstFile) { clearHandler(); - if (FIO_writePoolCloseDstFile(ress.writePoolCtx)) { + if (WritePool_closeDstFile(ress.writePoolCtx)) { DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result = 1; } @@ -3057,14 +3100,14 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx, if (!prefs->testMode) { FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); - FIO_writePoolSetDstFile(ress.writePoolCtx, dstFile); + WritePool_setDstFile(ress.writePoolCtx, dstFile); } for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) { status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if ((!prefs->testMode) && (FIO_writePoolCloseDstFile(ress.writePoolCtx))) + if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx))) EXM_THROW(72, "Write error : %s : cannot properly close output file", strerror(errno)); } else { From ac90062ef064ccb712c65a5eeb4eaed68645418e Mon Sep 17 00:00:00 2001 From: Yonatan Komornik Date: Thu, 6 Jan 2022 11:50:43 -0800 Subject: [PATCH 3/6] LZ4 decompression: Fixed multiframe decompression --- programs/fileio.c | 61 +++++++++++++++++++---------------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/programs/fileio.c b/programs/fileio.c index 
97206cb6f8e..b85e0806bcf 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -2165,6 +2165,13 @@ static void FIO_freeDResources(dRess_t ress) WritePool_free(ress.writePoolCtx); } +/* FIO_consumeDSrcBuffer: + * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */ +static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) { + assert(ress->srcBufferLoaded >= len); + ress->srcBufferLoaded -= len; + memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded); +} /** FIO_fwriteSparse() : * @return : storedSkips, @@ -2490,10 +2497,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, srcFileName, hrs.precision, hrs.value, hrs.suffix); } - if (inBuff.pos > 0) { - memmove(ress->srcBuffer, (char*)ress->srcBuffer + inBuff.pos, inBuff.size - inBuff.pos); - ress->srcBufferLoaded -= inBuff.pos; - } + FIO_consumeDSrcBuffer(ress, inBuff.pos); if (readSizeHint == 0) break; /* end of frame */ @@ -2572,9 +2576,8 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) if (ret == Z_STREAM_END) break; } - if (strm.avail_in > 0) - memmove(ress->srcBuffer, strm.next_in, strm.avail_in); - ress->srcBufferLoaded = strm.avail_in; + FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in); + if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */ && (decodingError==0) ) { DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName); @@ -2649,9 +2652,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, if (ret == LZMA_STREAM_END) break; } - if (strm.avail_in > 0) - memmove(ress->srcBuffer, strm.next_in, strm.avail_in); - ress->srcBufferLoaded = strm.avail_in; + FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in); lzma_end(&strm); WritePool_releaseWriteJob(writeJob); WritePool_sparseWriteEnd(ress->writePoolCtx); @@ -2665,7 +2666,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, const char* srcFileName) { 
unsigned long long filesize = 0; - LZ4F_errorCode_t nextToLoad; + LZ4F_errorCode_t nextToLoad = 4; LZ4F_decompressionContext_t dCtx; LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION); int decodingError = 0; @@ -2676,19 +2677,6 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, return FIO_ERROR_FRAME_DECODING; } - /* Init feed with magic number (already consumed from FILE* sFile) */ - { size_t inSize = 4; - size_t outSize= 0; - MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER); - nextToLoad = LZ4F_decompress(dCtx, NULL, &outSize, ress->srcBuffer, &inSize, NULL); - assert(outSize == 0); /* We don't expect to output anything here */ - if (LZ4F_isError(nextToLoad)) { - DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n", - srcFileName, LZ4F_getErrorName(nextToLoad)); - LZ4F_freeDecompressionContext(dCtx); - return FIO_ERROR_FRAME_DECODING; - } } - /* Main Loop */ for (;nextToLoad;) { size_t readSize; @@ -2697,13 +2685,19 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, int fullBufferDecoded = 0; /* Read input */ - if (nextToLoad > ress->srcBufferSize) nextToLoad = ress->srcBufferSize; - readSize = fread(ress->srcBuffer, 1, nextToLoad, srcFile); - if (!readSize) break; /* reached end of file or stream */ + nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded); + readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile); + if(!readSize && ferror(srcFile)) { + DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName); + decodingError=1; + break; + } + if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */ + ress->srcBufferLoaded += readSize; - while ((pos < readSize) || fullBufferDecoded) { /* still to read, or still to flush */ + while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */ /* Decode Input (at least partially) */ - size_t remaining = readSize - pos; + size_t remaining = ress->srcBufferLoaded - pos; 
decodedBytes = writeJob->bufferSize; nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL); if (LZ4F_isError(nextToLoad)) { @@ -2712,6 +2706,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, decodingError = 1; nextToLoad = 0; break; } pos += remaining; + assert(pos <= ress->srcBufferLoaded); fullBufferDecoded = decodedBytes == writeJob->bufferSize; /* Write Block */ @@ -2726,20 +2721,14 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, if (!nextToLoad) break; } + FIO_consumeDSrcBuffer(ress, pos); } - /* can be out because readSize == 0, which could be an fread() error */ - if (ferror(srcFile)) { - DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName); - decodingError=1; - } - if (nextToLoad!=0) { DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName); decodingError=1; } LZ4F_freeDecompressionContext(dCtx); - ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */ WritePool_releaseWriteJob(writeJob); WritePool_sparseWriteEnd(ress->writePoolCtx); From 873cb5654da0d6a1f6418e2bc7224250eeedaf94 Mon Sep 17 00:00:00 2001 From: Yonatan Komornik Date: Thu, 6 Jan 2022 11:51:17 -0800 Subject: [PATCH 4/6] Async IO decompression: Added multiframe test for --[no-]asyncio --- tests/playTests.sh | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/tests/playTests.sh b/tests/playTests.sh index dc29d5bd9d5..78d8e742aa3 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -1576,21 +1576,43 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then fi println "\n===> zstd asyncio decompression tests " -roundTripTest -g8M "3 --asyncio" -roundTripTest -g8M "3 --no-asyncio" + +addFrame() { + datagen -g2M -s$2 >> tmp_uncompressed + datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst +} + +addTwoFrames() { + addFrame $1 1 + addFrame $1 2 +} + +testAsyncIO() { + roundTripTest -g2M "3 --asyncio --format=$1" + 
roundTripTest -g2M "3 --no-asyncio --format=$1" +} + +rm -f tmp_compressed tmp_uncompressed +testAsyncIO zstd +addTwoFrames zstd if [ $GZIPMODE -eq 1 ]; then - roundTripTest -g8M "3 --format=gzip --asyncio" - roundTripTest -g8M "3 --format=gzip --no-asyncio" + testAsyncIO gzip + addTwoFrames gzip fi if [ $LZMAMODE -eq 1 ]; then - roundTripTest -g8M "3 --format=lzma --asyncio" - roundTripTest -g8M "3 --format=lzma --no-asyncio" + testAsyncIO lzma + addTwoFrames lzma fi if [ $LZ4MODE -eq 1 ]; then - roundTripTest -g8M "3 --format=lz4 --asyncio" - roundTripTest -g8M "3 --format=lz4 --no-asyncio" + testAsyncIO lz4 + addTwoFrames lz4 fi - +cat tmp_uncompressed | $MD5SUM > tmp2 +zstd -d tmp_compressed.zst --asyncio -c | $MD5SUM > tmp1 +$DIFF -q tmp1 tmp2 +rm tmp1 +zstd -d tmp_compressed.zst --no-asyncio -c | $MD5SUM > tmp1 +$DIFF -q tmp1 tmp2 if [ "$1" != "--test-large-data" ]; then println "Skipping large data tests" From f0e01a582f8eb667e43be130ac5129513b788020 Mon Sep 17 00:00:00 2001 From: Yonatan Komornik Date: Thu, 6 Jan 2022 13:26:48 -0800 Subject: [PATCH 5/6] Async IO decompression: C standard fix --- lib/common/pool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/common/pool.c b/lib/common/pool.c index dce7d1bf2a9..5c1d07d356e 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -342,7 +342,7 @@ void POOL_free(POOL_ctx* ctx) { void POOL_joinJobs(POOL_ctx* ctx){ assert(!ctx || ctx == &g_poolCtx); (void)ctx; -}; +} int POOL_resize(POOL_ctx* ctx, size_t numThreads) { (void)ctx; (void)numThreads; From 30152d48302cb02d6d1b4eeadaeecd0840697617 Mon Sep 17 00:00:00 2001 From: Yonatan Komornik Date: Mon, 10 Jan 2022 18:16:57 -0800 Subject: [PATCH 6/6] Async IO decompression: fix meson compilation --- build/meson/programs/meson.build | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/build/meson/programs/meson.build b/build/meson/programs/meson.build index 4181030c2ee..0ae93fc107c 100644 --- 
a/build/meson/programs/meson.build +++ b/build/meson/programs/meson.build @@ -20,14 +20,24 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/dibio.c'), join_paths(zstd_rootdir, 'programs/zstdcli_trace.c'), # needed due to use of private symbol + -fvisibility=hidden - join_paths(zstd_rootdir, 'lib/common/xxhash.c')] + join_paths(zstd_rootdir, 'lib/common/xxhash.c'), + join_paths(zstd_rootdir, 'lib/common/pool.c'), + join_paths(zstd_rootdir, 'lib/common/zstd_common.c'), + join_paths(zstd_rootdir, 'lib/common/error_private.c')] +zstd_deps = [ libzstd_dep ] zstd_c_args = libzstd_debug_cflags + +zstd_frugal_deps = [ libzstd_dep ] +zstd_frugal_c_args = [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ] + if use_multi_thread + zstd_deps += [ thread_dep ] zstd_c_args += [ '-DZSTD_MULTITHREAD' ] + zstd_frugal_deps += [ thread_dep ] + zstd_frugal_c_args += [ '-DZSTD_MULTITHREAD' ] endif -zstd_deps = [ libzstd_dep ] if use_zlib zstd_deps += [ zlib_dep ] zstd_c_args += [ '-DZSTD_GZCOMPRESS', '-DZSTD_GZDECOMPRESS' ] @@ -69,14 +79,17 @@ zstd = executable('zstd', zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/timefn.c'), join_paths(zstd_rootdir, 'programs/util.c'), - join_paths(zstd_rootdir, 'programs/fileio.c')] + join_paths(zstd_rootdir, 'programs/fileio.c'), + join_paths(zstd_rootdir, 'lib/common/pool.c'), + join_paths(zstd_rootdir, 'lib/common/zstd_common.c'), + join_paths(zstd_rootdir, 'lib/common/error_private.c')] # Minimal target, with only zstd compression and decompression. # No bench. No legacy. executable('zstd-frugal', zstd_frugal_sources, - dependencies: libzstd_dep, - c_args: [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ], + dependencies: zstd_frugal_deps, + c_args: zstd_frugal_c_args, install: true) install_data(join_paths(zstd_rootdir, 'programs/zstdgrep'),