Skip to content

Commit

Permalink
[fileio] Separate parameter adaption from display update rate
Browse files Browse the repository at this point in the history
Split the logic for parameter adaption from the logic to update the display rate.
This decouples the two updates, so changes to display updates don't affect
parameter adaption.

Also add a test case that checks that parameter adaption actually happens.

This fixes Issue facebook#3353, where --adapt is broken when --no-progress is passed.
  • Loading branch information
terrelln committed Dec 14, 2022
1 parent 26c3e42 commit dd38093
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 97 deletions.
196 changes: 101 additions & 95 deletions programs/fileio.c
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,9 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
unsigned inputPresented = 0;
unsigned inputBlocked = 0;
unsigned lastJobID = 0;
UTIL_time_t lastAdaptTime = UTIL_getTime();
U64 const adaptEveryMicro = REFRESH_RATE;

UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);

DISPLAYLEVEL(6, "compression using zstd format \n");
Expand Down Expand Up @@ -1369,13 +1372,104 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
compressedfilesize += outBuff.pos;
}

/* display notification; and adapt compression level */
if (READY_FOR_UPDATE()) {
/* adaptive mode : statistics measurement and speed correction */
if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
UTIL_HumanReadableSize_t const produced_hrs = UTIL_makeHumanReadableSize(zfp.produced);

lastAdaptTime = UTIL_getTime();
/* check output speed */
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */

unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
assert(zfp.produced >= previous_zfp_update.produced);
assert(prefs->nbWorkers >= 1);

/* test if compression is blocked
* either because output is slow and all buffers are full
* or because input is slow and no job can start while waiting for at least one buffer to be filled.
* note : exclude starting part, since currentJobID > 1 */
if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */
) {
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
speedChange = slower;
}

previous_zfp_update = zfp;

if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
) {
DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
speedChange = slower;
}
flushWaiting = 0;
}

/* course correct only if there is at least one new job completed */
if (zfp.currentJobID > lastJobID) {
DISPLAYLEVEL(6, "compression level adaptation check \n")

/* check input speed */
if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */
if (inputBlocked <= 0) {
DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
speedChange = slower;
} else if (speedChange == noChange) {
unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
previous_zfp_correction = zfp;
assert(inputPresented > 0);
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
(unsigned)newlyIngested, (unsigned)newlyConsumed,
(unsigned)newlyFlushed, (unsigned)newlyProduced);
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
&& (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
&& (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
) {
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
speedChange = faster;
}
}
inputBlocked = 0;
inputPresented = 0;
}

if (speedChange == slower) {
DISPLAYLEVEL(6, "slower speed , higher compression \n")
compressionLevel ++;
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
compressionLevel += (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
}
if (speedChange == faster) {
DISPLAYLEVEL(6, "faster speed , lighter compression \n")
compressionLevel --;
if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
compressionLevel -= (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
}
speedChange = noChange;

lastJobID = zfp.currentJobID;
} /* if (zfp.currentJobID > lastJobID) */
} /* if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) */

/* display notification */
if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
UTIL_HumanReadableSize_t const produced_hrs = UTIL_makeHumanReadableSize(zfp.produced);

DELAY_NEXT_UPDATE();

/* display progress notifications */
DISPLAY_PROGRESS("\r%79s\r", ""); /* Clear out the current displayed line */
Expand Down Expand Up @@ -1406,96 +1500,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
if (fileSize != UTIL_FILESIZE_UNKNOWN)
DISPLAY_PROGRESS("/%6.*f%4s", file_hrs.precision, file_hrs.value, file_hrs.suffix);
DISPLAY_PROGRESS(" ==> %2.f%%", cShare);
DELAY_NEXT_UPDATE();
}

/* adaptive mode : statistics measurement and speed correction */
if (prefs->adaptiveMode) {

/* check output speed */
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */

unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
assert(zfp.produced >= previous_zfp_update.produced);
assert(prefs->nbWorkers >= 1);

/* test if compression is blocked
* either because output is slow and all buffers are full
* or because input is slow and no job can start while waiting for at least one buffer to be filled.
* note : exclude starting part, since currentJobID > 1 */
if ( (zfp.consumed == previous_zfp_update.consumed) /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression ongoing */
) {
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
speedChange = slower;
}

previous_zfp_update = zfp;

if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
) {
DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
speedChange = slower;
}
flushWaiting = 0;
}

/* course correct only if there is at least one new job completed */
if (zfp.currentJobID > lastJobID) {
DISPLAYLEVEL(6, "compression level adaptation check \n")

/* check input speed */
if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) { /* warm up period, to fill all workers */
if (inputBlocked <= 0) {
DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
speedChange = slower;
} else if (speedChange == noChange) {
unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
unsigned long long newlyFlushed = zfp.flushed - previous_zfp_correction.flushed;
previous_zfp_correction = zfp;
assert(inputPresented > 0);
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
(unsigned)newlyIngested, (unsigned)newlyConsumed,
(unsigned)newlyFlushed, (unsigned)newlyProduced);
if ( (inputBlocked > inputPresented / 8) /* input is waiting often, because input buffers is full : compression or output too slow */
&& (newlyFlushed * 33 / 32 > newlyProduced) /* flush everything that is produced */
&& (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
) {
DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
speedChange = faster;
}
}
inputBlocked = 0;
inputPresented = 0;
}

if (speedChange == slower) {
DISPLAYLEVEL(6, "slower speed , higher compression \n")
compressionLevel ++;
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
compressionLevel += (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
}
if (speedChange == faster) {
DISPLAYLEVEL(6, "faster speed , lighter compression \n")
compressionLevel --;
if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
compressionLevel -= (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
}
speedChange = noChange;

lastJobID = zfp.currentJobID;
} /* if (zfp.currentJobID > lastJobID) */
} /* if (g_adaptiveMode) */
} /* if (READY_FOR_UPDATE()) */
} /* if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) */
} /* while ((inBuff.pos != inBuff.size) */
} while (directive != ZSTD_e_end);

Expand Down
4 changes: 2 additions & 2 deletions programs/fileio_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ extern FIO_display_prefs_t g_display_prefs;
extern UTIL_time_t g_displayClock;

#define REFRESH_RATE ((U64)(SEC_TO_MICRO / 6))
#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE)
#define READY_FOR_UPDATE() (UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE || g_display_prefs.displayLevel >= 4)
#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); }
#define DISPLAYUPDATE(l, ...) { \
if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \
if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \
if (READY_FOR_UPDATE()) { \
DELAY_NEXT_UPDATE(); \
DISPLAY(__VA_ARGS__); \
if (g_display_prefs.displayLevel>=4) fflush(stderr); \
Expand Down
8 changes: 8 additions & 0 deletions tests/cli-tests/compression/adapt.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ set -e

# Test --adapt
zstd -f file --adapt -c | zstd -t

datagen -g100M > file100M

# Pick parameters to force fast adaptation, even on slow systems
zstd --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"

# Adaption still happens with --no-progress
zstd --no-progress --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"

0 comments on commit dd38093

Please sign in to comment.