Skip to content

Commit

Permalink
zlib: split JS code as prep for non-zlib-backed streams
Browse files Browse the repository at this point in the history
Split the `Zlib` class into `ZlibBase` and `Zlib` classes,
to facilitate introduction of similar streams with minor
implementation differences.

Backport-PR-URL: #25228
PR-URL: #24939
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
  • Loading branch information
addaleax committed May 13, 2019
1 parent 29c6530 commit ebe51a4
Showing 1 changed file with 128 additions and 111 deletions.
239 changes: 128 additions & 111 deletions lib/zlib.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,10 @@ function checkRangesOrGetDefault(number, name, lower, upper, def) {
return number;
}

// the Zlib class they all inherit from
// This thing manages the queue of requests, and returns
// true or false if there is anything in the queue when
// you call the .write() method.
function Zlib(opts, mode) {
// The base class for all Zlib-style streams.
function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
var chunkSize = Z_DEFAULT_CHUNK;
var flush = Z_NO_FLUSH;
var finishFlush = Z_FINISH;
var windowBits = Z_DEFAULT_WINDOWBITS;
var level = Z_DEFAULT_COMPRESSION;
var memLevel = Z_DEFAULT_MEMLEVEL;
var strategy = Z_DEFAULT_STRATEGY;
var dictionary;

// The Zlib class is not exported to user land, the mode should only be
// The ZlibBase class is not exported to user land, the mode should only be
// passed in by us.
assert(typeof mode === 'number');
assert(mode >= DEFLATE && mode <= UNZIP);
Expand All @@ -235,50 +224,11 @@ function Zlib(opts, mode) {

flush = checkRangesOrGetDefault(
opts.flush, 'options.flush',
Z_NO_FLUSH, Z_BLOCK, Z_NO_FLUSH);
Z_NO_FLUSH, Z_BLOCK, flush);

finishFlush = checkRangesOrGetDefault(
opts.finishFlush, 'options.finishFlush',
Z_NO_FLUSH, Z_BLOCK, Z_FINISH);

// windowBits is special. On the compression side, 0 is an invalid value.
// But on the decompression side, a value of 0 for windowBits tells zlib
// to use the window size in the zlib header of the compressed stream.
if ((opts.windowBits == null || opts.windowBits === 0) &&
(mode === INFLATE ||
mode === GUNZIP ||
mode === UNZIP)) {
windowBits = 0;
} else {
windowBits = checkRangesOrGetDefault(
opts.windowBits, 'options.windowBits',
Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_DEFAULT_WINDOWBITS);
}

level = checkRangesOrGetDefault(
opts.level, 'options.level',
Z_MIN_LEVEL, Z_MAX_LEVEL, Z_DEFAULT_COMPRESSION);

memLevel = checkRangesOrGetDefault(
opts.memLevel, 'options.memLevel',
Z_MIN_MEMLEVEL, Z_MAX_MEMLEVEL, Z_DEFAULT_MEMLEVEL);

strategy = checkRangesOrGetDefault(
opts.strategy, 'options.strategy',
Z_DEFAULT_STRATEGY, Z_FIXED, Z_DEFAULT_STRATEGY);

dictionary = opts.dictionary;
if (dictionary !== undefined && !isArrayBufferView(dictionary)) {
if (isAnyArrayBuffer(dictionary)) {
dictionary = Buffer.from(dictionary);
} else {
throw new ERR_INVALID_ARG_TYPE(
'options.dictionary',
['Buffer', 'TypedArray', 'DataView', 'ArrayBuffer'],
dictionary
);
}
}
Z_NO_FLUSH, Z_BLOCK, finishFlush);

if (opts.encoding || opts.objectMode || opts.writableObjectMode) {
opts = _extend({}, opts);
Expand All @@ -287,39 +237,29 @@ function Zlib(opts, mode) {
opts.writableObjectMode = false;
}
}

Transform.call(this, opts);
this._hadError = false;
this.bytesWritten = 0;
this._handle = new binding.Zlib(mode);
this._handle = handle;
handle[owner_symbol] = this;
// Used by processCallback() and zlibOnError()
this._handle[owner_symbol] = this;
this._handle.onerror = zlibOnError;
this._hadError = false;
this._writeState = new Uint32Array(2);

if (!this._handle.init(windowBits,
level,
memLevel,
strategy,
this._writeState,
processCallback,
dictionary)) {
throw new ERR_ZLIB_INITIALIZATION_FAILED();
}

handle.onerror = zlibOnError;
this._outBuffer = Buffer.allocUnsafe(chunkSize);
this._outOffset = 0;
this._level = level;
this._strategy = strategy;

this._chunkSize = chunkSize;
this._defaultFlushFlag = flush;
this._finishFlushFlag = finishFlush;
this._nextFlush = -1;
this._info = opts && opts.info;
this._defaultFullFlushFlag = fullFlush;
this.once('end', this.close);
this._info = opts && opts.info;
}
inherits(Zlib, Transform);

Object.defineProperty(Zlib.prototype, '_closed', {
inherits(ZlibBase, Transform);

Object.defineProperty(ZlibBase.prototype, '_closed', {
configurable: true,
enumerable: true,
get() {
Expand All @@ -331,7 +271,7 @@ Object.defineProperty(Zlib.prototype, '_closed', {
// perspective, but it is inconsistent with all other streams exposed by Node.js
// that have this concept, where it stands for the number of bytes read
// *from* the stream (that is, net.Socket/tls.Socket & file system streams).
Object.defineProperty(Zlib.prototype, 'bytesRead', {
Object.defineProperty(ZlibBase.prototype, 'bytesRead', {
configurable: true,
enumerable: true,
get() {
Expand All @@ -342,41 +282,15 @@ Object.defineProperty(Zlib.prototype, 'bytesRead', {
}
});

// This callback is used by `.params()` to wait until a full flush happened
// before adjusting the parameters. In particular, the call to the native
// `params()` function should not happen while a write is currently in progress
// on the threadpool.
function paramsAfterFlushCallback(level, strategy, callback) {
assert(this._handle, 'zlib binding closed');
this._handle.params(level, strategy);
if (!this._hadError) {
this._level = level;
this._strategy = strategy;
if (callback) callback();
}
}

Zlib.prototype.params = function params(level, strategy, callback) {
checkRangesOrGetDefault(level, 'level', Z_MIN_LEVEL, Z_MAX_LEVEL);
checkRangesOrGetDefault(strategy, 'strategy', Z_DEFAULT_STRATEGY, Z_FIXED);

if (this._level !== level || this._strategy !== strategy) {
this.flush(Z_SYNC_FLUSH,
paramsAfterFlushCallback.bind(this, level, strategy, callback));
} else {
process.nextTick(callback);
}
};

Zlib.prototype.reset = function reset() {
ZlibBase.prototype.reset = function() {
if (!this._handle)
assert(false, 'zlib binding closed');
return this._handle.reset();
};

// This is the _flush function called by the transform class,
// internally, when the last chunk has been written.
Zlib.prototype._flush = function _flush(callback) {
ZlibBase.prototype._flush = function(callback) {
this._transform(Buffer.alloc(0), '', callback);
};

Expand All @@ -397,12 +311,12 @@ function maxFlush(a, b) {
}

const flushBuffer = Buffer.alloc(0);
Zlib.prototype.flush = function flush(kind, callback) {
ZlibBase.prototype.flush = function(kind, callback) {
var ws = this._writableState;

if (typeof kind === 'function' || (kind === undefined && !callback)) {
callback = kind;
kind = Z_FULL_FLUSH;
kind = this._defaultFullFlushFlag;
}

if (ws.ended) {
Expand All @@ -421,17 +335,17 @@ Zlib.prototype.flush = function flush(kind, callback) {
}
};

Zlib.prototype.close = function close(callback) {
ZlibBase.prototype.close = function(callback) {
_close(this, callback);
this.destroy();
};

Zlib.prototype._destroy = function _destroy(err, callback) {
ZlibBase.prototype._destroy = function(err, callback) {
_close(this);
callback(err);
};

Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
ZlibBase.prototype._transform = function(chunk, encoding, cb) {
var flushFlag = this._defaultFlushFlag;
// We use a 'fake' zero-length chunk to carry information about flushes from
// the public API to the actual stream implementation.
Expand All @@ -448,7 +362,7 @@ Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
processChunk(this, chunk, flushFlag, cb);
};

Zlib.prototype._processChunk = function _processChunk(chunk, flushFlag, cb) {
ZlibBase.prototype._processChunk = function(chunk, flushFlag, cb) {
// _processChunk() is left for backwards compatibility
if (typeof cb === 'function')
processChunk(this, chunk, flushFlag, cb);
Expand Down Expand Up @@ -638,6 +552,109 @@ function _close(engine, callback) {
engine._handle = null;
}

const zlibDefaultOpts = {
flush: Z_NO_FLUSH,
finishFlush: Z_FINISH,
fullFlush: Z_FULL_FLUSH
};
// Base class for all streams actually backed by zlib and using zlib-specific
// parameters.
function Zlib(opts, mode) {
var windowBits = Z_DEFAULT_WINDOWBITS;
var level = Z_DEFAULT_COMPRESSION;
var memLevel = Z_DEFAULT_MEMLEVEL;
var strategy = Z_DEFAULT_STRATEGY;
var dictionary;

if (opts) {
// windowBits is special. On the compression side, 0 is an invalid value.
// But on the decompression side, a value of 0 for windowBits tells zlib
// to use the window size in the zlib header of the compressed stream.
if ((opts.windowBits == null || opts.windowBits === 0) &&
(mode === INFLATE ||
mode === GUNZIP ||
mode === UNZIP)) {
windowBits = 0;
} else {
windowBits = checkRangesOrGetDefault(
opts.windowBits, 'options.windowBits',
Z_MIN_WINDOWBITS, Z_MAX_WINDOWBITS, Z_DEFAULT_WINDOWBITS);
}

level = checkRangesOrGetDefault(
opts.level, 'options.level',
Z_MIN_LEVEL, Z_MAX_LEVEL, Z_DEFAULT_COMPRESSION);

memLevel = checkRangesOrGetDefault(
opts.memLevel, 'options.memLevel',
Z_MIN_MEMLEVEL, Z_MAX_MEMLEVEL, Z_DEFAULT_MEMLEVEL);

strategy = checkRangesOrGetDefault(
opts.strategy, 'options.strategy',
Z_DEFAULT_STRATEGY, Z_FIXED, Z_DEFAULT_STRATEGY);

dictionary = opts.dictionary;
if (dictionary !== undefined && !isArrayBufferView(dictionary)) {
if (isAnyArrayBuffer(dictionary)) {
dictionary = Buffer.from(dictionary);
} else {
throw new ERR_INVALID_ARG_TYPE(
'options.dictionary',
['Buffer', 'TypedArray', 'DataView', 'ArrayBuffer'],
dictionary
);
}
}
}

const handle = new binding.Zlib(mode);
// Ideally, we could let ZlibBase() set up _writeState. I haven't been able
// to come up with a good solution that doesn't break our internal API,
// and with it all supported npm versions at the time of writing.
this._writeState = new Uint32Array(2);
if (!handle.init(windowBits,
level,
memLevel,
strategy,
this._writeState,
processCallback,
dictionary)) {
throw new ERR_ZLIB_INITIALIZATION_FAILED();
}

ZlibBase.call(this, opts, mode, handle, zlibDefaultOpts);

this._level = level;
this._strategy = strategy;
}
inherits(Zlib, ZlibBase);

// This callback is used by `.params()` to wait until a full flush happened
// before adjusting the parameters. In particular, the call to the native
// `params()` function should not happen while a write is currently in progress
// on the threadpool.
function paramsAfterFlushCallback(level, strategy, callback) {
assert(this._handle, 'zlib binding closed');
this._handle.params(level, strategy);
if (!this._hadError) {
this._level = level;
this._strategy = strategy;
if (callback) callback();
}
}

Zlib.prototype.params = function params(level, strategy, callback) {
checkRangesOrGetDefault(level, 'level', Z_MIN_LEVEL, Z_MAX_LEVEL);
checkRangesOrGetDefault(strategy, 'strategy', Z_DEFAULT_STRATEGY, Z_FIXED);

if (this._level !== level || this._strategy !== strategy) {
this.flush(Z_SYNC_FLUSH,
paramsAfterFlushCallback.bind(this, level, strategy, callback));
} else {
process.nextTick(callback);
}
};

// generic zlib
// minimal 2-byte header
function Deflate(opts) {
Expand Down

0 comments on commit ebe51a4

Please sign in to comment.