Skip to content

Commit

Permalink
zlib: simplify flushing mechanism
Browse files Browse the repository at this point in the history
Previously, flushing on zlib streams was implemented through
stream 'drain' handlers. This has a number of downsides; in
particular, it is complex, and could lead to unpredictable
behaviour, since it meant that in a sequence like

```js
compressor.write('abc');
compressor.flush();
waitForMoreDataAsynchronously(() => {
  compressor.write('def');
});
```

it was not fully deterministic whether the flush happens after
the second chunk is written or the first one.

This commit replaces this mechanism by one that piggy-backs
along the stream’s write queue, using a “special” `Buffer`
instance that signals that a flush is currently due.

PR-URL: #23186
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax authored and targos committed Oct 10, 2018
1 parent 70abcf2 commit 18cbde5
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 31 deletions.
48 changes: 19 additions & 29 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,9 @@ function Zlib(opts, mode) {
this._level = level;
this._strategy = strategy;
this._chunkSize = chunkSize;
this._flushFlag = flush;
this._scheduledFlushFlag = Z_NO_FLUSH;
this._origFlushFlag = flush;
this._defaultFlushFlag = flush;
this._finishFlushFlag = finishFlush;
this._nextFlush = -1;
this._info = opts && opts.info;
this.once('end', this.close);
}
Expand Down Expand Up @@ -398,6 +397,7 @@ function maxFlush(a, b) {
return flushiness[a] > flushiness[b] ? a : b;
}

const flushBuffer = Buffer.alloc(0);
Zlib.prototype.flush = function flush(kind, callback) {
var ws = this._writableState;

Expand All @@ -412,21 +412,13 @@ Zlib.prototype.flush = function flush(kind, callback) {
} else if (ws.ending) {
if (callback)
this.once('end', callback);
} else if (ws.needDrain) {
const alreadyHadFlushScheduled = this._scheduledFlushFlag !== Z_NO_FLUSH;
this._scheduledFlushFlag = maxFlush(kind, this._scheduledFlushFlag);

// If a callback was passed, always register a new `drain` + flush handler,
// mostly because that's simpler and flush callbacks piling up is a rare
// thing anyway.
if (!alreadyHadFlushScheduled || callback) {
const drainHandler = () => this.flush(this._scheduledFlushFlag, callback);
this.once('drain', drainHandler);
}
} else if (this._nextFlush !== -1) {
// This means that there is a flush currently in the write queue.
// We currently coalesce this flush into the pending one.
this._nextFlush = maxFlush(this._nextFlush, kind);
} else {
this._flushFlag = kind;
this.write(Buffer.alloc(0), '', callback);
this._scheduledFlushFlag = Z_NO_FLUSH;
this._nextFlush = kind;
this.write(flushBuffer, '', callback);
}
};

Expand All @@ -436,20 +428,18 @@ Zlib.prototype.close = function close(callback) {
};

Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
// If it's the last chunk, or a final flush, we use the Z_FINISH flush flag
// (or whatever flag was provided using opts.finishFlush).
// If it's explicitly flushing at some other time, then we use
// Z_FULL_FLUSH. Otherwise, use the original opts.flush flag.
var flushFlag;
var flushFlag = this._defaultFlushFlag;
// We use a 'fake' zero-length chunk to carry information about flushes from
// the public API to the actual stream implementation.
if (chunk === flushBuffer) {
flushFlag = this._nextFlush;
this._nextFlush = -1;
}

// For the last chunk, also apply `_finishFlushFlag`.
var ws = this._writableState;
if ((ws.ending || ws.ended) && ws.length === chunk.byteLength) {
flushFlag = this._finishFlushFlag;
} else {
flushFlag = this._flushFlag;
// once we've flushed the last of the queue, stop flushing and
// go back to the normal behavior.
if (chunk.byteLength >= ws.length)
this._flushFlag = this._origFlushFlag;
flushFlag = maxFlush(flushFlag, this._finishFlushFlag);
}
processChunk(this, chunk, flushFlag, cb);
};
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-zlib-flush-drain.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ process.once('exit', function() {
assert.strictEqual(
drainCount, 1);
assert.strictEqual(
flushCount, 2);
flushCount, 1);
});
2 changes: 1 addition & 1 deletion test/parallel/test-zlib-write-after-flush.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ gunz.setEncoding('utf8');
gunz.on('data', (c) => output += c);
gunz.on('end', common.mustCall(() => {
assert.strictEqual(output, input);
assert.strictEqual(gzip._flushFlag, zlib.constants.Z_NO_FLUSH);
assert.strictEqual(gzip._nextFlush, -1);
}));

// make sure that flush/write doesn't trigger an assert failure
Expand Down

0 comments on commit 18cbde5

Please sign in to comment.