Skip to content

Commit

Permalink
Merge pull request #1327 from facebook/adapt
Browse files Browse the repository at this point in the history
Adaptive compression
  • Loading branch information
Cyan4973 authored Sep 26, 2018
2 parents 3dae90c + 6c51bf4 commit 8883af6
Show file tree
Hide file tree
Showing 12 changed files with 513 additions and 174 deletions.
1 change: 1 addition & 0 deletions doc/zstd_manual.html
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ <h3>Advanced Streaming compression functions</h3><pre></pre><b><pre>size_t ZSTD_
unsigned long long ingested;
unsigned long long consumed;
unsigned long long produced;
unsigned currentJobID;
} ZSTD_frameProgression;
</b></pre><BR>
<h3>Advanced Streaming decompression functions</h3><pre></pre><b><pre>typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e;
Expand Down
19 changes: 19 additions & 0 deletions lib/compress/zstd_compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -906,9 +906,27 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
fp.ingested = cctx->consumedSrcSize + buffered;
fp.consumed = cctx->consumedSrcSize;
fp.produced = cctx->producedCSize;
fp.flushed = cctx->producedCSize; /* simplified; some data might still be left within streaming output buffer */
fp.currentJobID = 0;
fp.nbActiveWorkers = 0;
return fp;
} }

/*! ZSTD_toFlushNow()
* Only useful for multithreading scenarios currently (nbWorkers >= 1).
*/
size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx)
{
#ifdef ZSTD_MULTITHREAD
if (cctx->appliedParams.nbWorkers > 0) {
return ZSTDMT_toFlushNow(cctx->mtctx);
}
#endif
(void)cctx;
return 0; /* over-simplification; could also check if context is currently running in streaming mode, and in which case, report how many bytes are left to be flushed within output buffer */
}



static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1,
ZSTD_compressionParameters cParams2)
Expand Down Expand Up @@ -3733,6 +3751,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
ZSTD_CCtx_reset(cctx);
}
DEBUGLOG(5, "completed ZSTD_compress_generic delegating to ZSTDMT_compressStream_generic");
return flushMin;
} }
#endif
Expand Down
131 changes: 85 additions & 46 deletions lib/compress/zstdmt_compress.c

Large diffs are not rendered by default.

22 changes: 16 additions & 6 deletions lib/compress/zstdmt_compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,31 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
* === Not exposed in libzstd. Never invoke directly ===
* ======================================================== */

/*! ZSTDMT_toFlushNow()
* Tell how many bytes are ready to be flushed immediately.
* Probe the oldest active job (not yet entirely flushed) and check its output buffer.
* If return 0, it means there is no active job,
* or, it means oldest job is still active, but everything produced has been flushed so far,
* therefore flushing is limited by speed of oldest job. */
size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx);

/*! ZSTDMT_CCtxParam_setMTCtxParameter()
* like ZSTDMT_setMTCtxParameter(), but into a ZSTD_CCtx_Params */
size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);

/* ZSTDMT_CCtxParam_setNbWorkers()
* Set nbWorkers, and clamp it.
* Also reset jobSize and overlapLog */
/*! ZSTDMT_CCtxParam_setNbWorkers()
* Set nbWorkers, and clamp it.
* Also reset jobSize and overlapLog */
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);

/*! ZSTDMT_updateCParams_whileCompressing() :
* Updates only a selected set of compression parameters, to remain compatible with current frame.
* New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams);

/* ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads.
/*! ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads.
*/
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx);

Expand Down
37 changes: 28 additions & 9 deletions lib/zstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -746,29 +746,48 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const

/*! ZSTD_resetCStream() :
* start a new compression job, using same parameters from previous job.
* This is typically useful to skip dictionary loading stage, since it will re-use it in-place..
* This is typically useful to skip dictionary loading stage, since it will re-use it in-place.
* Note that zcs must be init at least once before using ZSTD_resetCStream().
* If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN.
* If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end.
* For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs,
* but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead.
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
* @return : 0, or an error code (which can be tested using ZSTD_isError())
*/
ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize);


typedef struct {
unsigned long long ingested;
unsigned long long consumed;
unsigned long long produced;
unsigned long long ingested; /* nb input bytes read and buffered */
unsigned long long consumed; /* nb input bytes actually compressed */
unsigned long long produced; /* nb of compressed bytes generated and buffered */
unsigned long long flushed; /* nb of compressed bytes flushed : not provided; can be tracked from caller side */
unsigned currentJobID; /* MT only : latest started job nb */
unsigned nbActiveWorkers; /* MT only : nb of workers actively compressing at probe time */
} ZSTD_frameProgression;

/* ZSTD_getFrameProgression():
/* ZSTD_getFrameProgression() :
* tells how much data has been ingested (read from input)
* consumed (input actually compressed) and produced (output) for current frame.
* Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed.
* Can report progression inside worker threads (multi-threading and non-blocking mode).
* Note : (ingested - consumed) is amount of input data buffered internally, not yet compressed.
* Aggregates progression inside active worker threads.
*/
ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);

/*! ZSTD_toFlushNow() :
* Tell how many bytes are ready to be flushed immediately.
* Useful for multithreading scenarios (nbWorkers >= 1).
* Probe the oldest active job, defined as oldest job not yet entirely flushed,
* and check its output buffer.
* @return : amount of data stored in oldest job and ready to be flushed immediately.
* if @return == 0, it means either :
* + there is no active job (could be checked with ZSTD_frameProgression()), or
* + oldest job is still actively compressing data,
* but everything it has produced has also been flushed so far,
* therefore flushing speed is currently limited by production speed of oldest job
* irrespective of the speed of concurrent newer jobs.
*/
ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx);



Expand Down
Loading

0 comments on commit 8883af6

Please sign in to comment.