diff --git a/src/node/internal/internal_zlib_base.ts b/src/node/internal/internal_zlib_base.ts index 443fd43c01e..ebcdf4393d1 100644 --- a/src/node/internal/internal_zlib_base.ts +++ b/src/node/internal/internal_zlib_base.ts @@ -83,7 +83,8 @@ function processCallback(this: zlibUtil.ZlibStream): void { return; } - const [availOutAfter, availInAfter] = state as unknown as [number, number]; + const availOutAfter = state[0] as number; + const availInAfter = state[1] as number; const inDelta = handle.availInBefore - availInAfter; self.bytesWritten += inDelta; @@ -119,9 +120,10 @@ function processCallback(this: zlibUtil.ZlibStream): void { handle.availInBefore = availInAfter; if (!streamBufferIsFull) { + ok(this.buffer, 'Buffer should not have been null'); this.write( handle.flushFlag, - this.buffer as NodeJS.TypedArray, // in + this.buffer, // in handle.inOff, // in_off handle.availInBefore, // in_len self._outBuffer, // out @@ -132,10 +134,11 @@ function processCallback(this: zlibUtil.ZlibStream): void { // eslint-disable-next-line @typescript-eslint/unbound-method const oldRead = self._read; self._read = (n): void => { + ok(this.buffer, 'Buffer should not have been null'); self._read = oldRead; this.write( handle.flushFlag, - this.buffer as NodeJS.TypedArray, // in + this.buffer, // in handle.inOff, // in_off handle.availInBefore, // in_len self._outBuffer, // out @@ -171,7 +174,7 @@ function processCallback(this: zlibUtil.ZlibStream): void { // Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < // Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH const flushiness: number[] = []; -const kFlushFlagList = [ +const kFlushFlagList: number[] = [ CONST_Z_NO_FLUSH, CONST_Z_BLOCK, CONST_Z_PARTIAL_FLUSH, @@ -239,16 +242,16 @@ function processChunkSync( }); while (true) { - // TODO(soon): This was `writeSync` before, but it's not anymore. 
- handle?.write( + ok(handle, 'Handle should have been defined'); + handle.writeSync( flushFlag, chunk, // in inOff, // in_off availInBefore, // in_len buffer, // out offset, // out_off - availOutBefore - ); // out_len + availOutBefore // out_len + ); if (error) throw error; else if (self[kError]) throw self[kError]; @@ -461,8 +464,7 @@ export class ZlibBase extends Transform { } } else if (this.writableEnded) { if (callback) { - /* eslint-disable-next-line @typescript-eslint/no-unsafe-call */ - queueMicrotask(callback); + this.once('end', callback); } } else { this.write(kFlushBuffers[kind as number], 'utf8', callback); @@ -509,6 +511,7 @@ export class ZlibBase extends Transform { flushFlag: number, cb?: () => void ): Buffer | Uint8Array | undefined { + // _processChunk is left for backwards compatibility if (cb != null && typeof cb === 'function') { this.#processChunk(chunk, flushFlag, cb); return; @@ -523,10 +526,11 @@ export class ZlibBase extends Transform { return; } - this._handle.buffer = null; + this._handle.buffer = chunk; this._handle.cb = cb; this._handle.availOutBefore = this._chunkSize - this._outOffset; this._handle.availInBefore = chunk.byteLength; + this._handle.inOff = 0; this._handle.flushFlag = flushFlag; this._handle.write( @@ -603,7 +607,7 @@ export class Zlib extends ZlibBase { ); dictionary = options.dictionary; - if (dictionary != null && !isArrayBufferView(dictionary)) { + if (dictionary !== undefined && !isArrayBufferView(dictionary)) { if (isAnyArrayBuffer(dictionary)) { dictionary = Buffer.from(dictionary); } else { @@ -618,13 +622,14 @@ export class Zlib extends ZlibBase { const writeState = new Uint32Array(2); const handle = new zlibUtil.ZlibStream(mode); + handle.initialize( windowBits, level, memLevel, strategy, writeState, - processCallback, + () => queueMicrotask(processCallback.bind(handle)), dictionary ); super(options ?? 
{}, mode, handle); @@ -635,7 +640,7 @@ export class Zlib extends ZlibBase { this._writeState = writeState; } - public params(level: number, strategy: number, callback: () => never): void { + public params(level: number, strategy: number, callback: () => void): void { checkRangesOrGetDefault( level, 'level', diff --git a/src/node/internal/zlib.d.ts b/src/node/internal/zlib.d.ts index ce050f1110b..52346749034 100644 --- a/src/node/internal/zlib.d.ts +++ b/src/node/internal/zlib.d.ts @@ -164,6 +164,15 @@ export class ZlibStream { outputOffset: number, outputLength: number ): void; + public writeSync( + flushFlag: number, + inputBuffer: NodeJS.TypedArray, + inputOffset: number, + inputLength: number, + outputBuffer: NodeJS.TypedArray, + outputOffset: number, + outputLength: number + ): void; public params(level: number, strategy: number): void; public reset(): void; diff --git a/src/workerd/api/node/tests/zlib-nodejs-test.js b/src/workerd/api/node/tests/zlib-nodejs-test.js index 56fa27a5e7e..b55663ac4b6 100644 --- a/src/workerd/api/node/tests/zlib-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-nodejs-test.js @@ -602,12 +602,12 @@ export const testFailedInit = { { const stream = zlib.createGzip({ level: NaN }); - assert.strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION); + strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION); } { const stream = zlib.createGzip({ strategy: NaN }); - assert.strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY); + strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY); } }, }; @@ -622,7 +622,7 @@ export const zlibDestroyTest = { { const ts = zlib.createGzip(); ts.destroy(); - assert.strictEqual(ts._handle, null); + strictEqual(ts._handle, null); const { promise, resolve, reject } = Promise.withResolvers(); promises.push(promise); @@ -643,7 +643,7 @@ export const zlibDestroyTest = { decompress.on('error', (err) => { errorCount++; decompress.close(); - assert.strictEqual(errorCount, 1, 
'Error should only be emitted once'); + strictEqual(errorCount, 1, 'Error should only be emitted once'); resolve(); }); @@ -700,6 +700,66 @@ export const closeAfterError = { // }, // }; +// Tests are taken from: +// https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/test/parallel/test-zlib-bytes-read.js +export const testZlibBytesRead = { + async test() { + const expectStr = 'abcdefghijklmnopqrstuvwxyz'.repeat(2); + const expectBuf = Buffer.from(expectStr); + + function createWriter(target, buffer) { + const writer = { size: 0 }; + const write = () => { + target.write(Buffer.from([buffer[writer.size]]), () => { + writer.size += 1; + if (writer.size < buffer.length) { + target.flush(write); + } else { + target.end(); + } + }); + }; + write(); + return writer; + } + + // This test is simplified a lot because of test runner limitations. + // TODO(soon): Add createBrotliCompress once it is implemented. + for (const method of ['createGzip', 'createDeflate', 'createDeflateRaw']) { + assert(method in zlib, `${method} is not available in "node:zlib"`); + const { promise, resolve, reject } = Promise.withResolvers(); + let compData = Buffer.alloc(0); + const comp = zlib[method](); + const compWriter = createWriter(comp, expectBuf); + comp.on('data', function (d) { + compData = Buffer.concat([compData, d]); + strictEqual( + this.bytesWritten, + compWriter.size, + `Should get write size on ${method} data.` + ); + }); + comp.on('error', reject); + comp.on('end', function () { + strictEqual( + this.bytesWritten, + compWriter.size, + `Should get write size on ${method} end.` + ); + strictEqual( + this.bytesWritten, + expectStr.length, + `Should get data size on ${method} end.` + ); + + resolve(); + }); + + await promise; + } + }, +}; + // Node.js tests relevant to zlib // // - [ ] test-zlib-brotli-16GB.js diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index c5558425989..32c9f0c7a2e 100644 --- 
a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -88,11 +88,12 @@ kj::Maybe ZlibContext::setDictionary() { return kj::none; } + err = Z_OK; + switch (mode) { case ZlibMode::DEFLATE: case ZlibMode::DEFLATERAW: err = deflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); - ; break; case ZlibMode::INFLATERAW: err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size()); @@ -173,8 +174,6 @@ void ZlibContext::work() { } const Bytef* next_expected_header_byte = nullptr; - err = Z_OK; - // If the avail_out is left at 0, then it means that it ran out // of room. If there was avail_out left over, then it means // that all the input was consumed. @@ -271,7 +270,7 @@ kj::Maybe ZlibContext::setParams(int _level, int _strategy) { switch (mode) { case ZlibMode::DEFLATE: case ZlibMode::DEFLATERAW: - err = deflateParams(&stream, level, strategy); + err = deflateParams(&stream, _level, _strategy); break; default: break; @@ -348,6 +347,7 @@ void CompressionStream::emitError( } template +template void CompressionStream::writeStream(jsg::Lock& js, int flush, kj::ArrayPtr input, @@ -362,11 +362,31 @@ void CompressionStream::writeStream(jsg::Lock& js, context.setBuffers(input, inputLength, output, outputLength); context.setFlush(flush); - // This implementation always follow the sync version. + if constexpr (!async) { + context.work(); + if (checkError(js)) { + updateWriteResult(); + writing = false; + } + return; + } + + // On Node.js, this is called as a result of a `ScheduleWork()` call. + // Since we implement the whole thing as sync, we're going to go ahead and call the whole thing here. context.work(); - if (checkError(js)) { - context.getAfterWriteOffsets(writeResult); - writing = false; + + // This is implemented slightly differently in Node.js. + // Node.js calls AfterThreadPoolWork(). 
+ // Ref: https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/src/node_zlib.cc#L402 + writing = false; + if (!checkError(js)) return; + updateWriteResult(); + KJ_IF_SOME(cb, writeCallback) { + cb(js); + } + + if (pending_close) { + close(); } } @@ -413,12 +433,13 @@ void ZlibUtil::ZlibStream::initialize(int windowBits, context.initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary)); } -void ZlibUtil::ZlibStream::write(jsg::Lock& js, +template +void ZlibUtil::ZlibStream::write_(jsg::Lock& js, int flush, - kj::Array input, + kj::ArrayPtr input, int inputOffset, int inputLength, - kj::Array output, + kj::ArrayPtr output, int outputOffset, int outputLength) { if (flush != Z_NO_FLUSH && flush != Z_PARTIAL_FLUSH && flush != Z_SYNC_FLUSH && @@ -427,17 +448,43 @@ void ZlibUtil::ZlibStream::write(jsg::Lock& js, } // Check bounds - JSG_REQUIRE(inputOffset >= 0 && inputOffset < input.size(), Error, - "Offset should be smaller than size and bigger than 0"_kj); + JSG_REQUIRE(inputOffset >= 0 && inputOffset <= input.size(), Error, + kj::str("Input offset should be smaller than size and bigger than 0, but received: ", + inputOffset)); JSG_REQUIRE(input.size() >= inputLength, Error, "Invalid inputLength"_kj); - JSG_REQUIRE(outputOffset >= 0 && outputOffset < output.size(), Error, - "Offset should be smaller than size and bigger than 0"_kj); + JSG_REQUIRE(outputOffset >= 0 && outputOffset <= output.size(), Error, + kj::str("Output offset should be smaller than size and bigger than 0 , but received: ", + outputOffset)); JSG_REQUIRE(output.size() >= outputLength, Error, "Invalid outputLength"_kj); - writeStream( + writeStream( js, flush, input.slice(inputOffset), inputLength, output.slice(outputOffset), outputLength); } +void ZlibUtil::ZlibStream::write(jsg::Lock& js, + int flush, + kj::Array input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength) { + write_(js, flush, input.asPtr(), inputOffset, 
inputLength, output.asPtr(), outputOffset, + outputLength); +} + +void ZlibUtil::ZlibStream::writeSync(jsg::Lock& js, + int flush, + kj::Array input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength) { + write_(js, flush, input.asPtr(), inputOffset, inputLength, output.asPtr(), outputOffset, + outputLength); +} + void ZlibUtil::ZlibStream::params(int level, int strategy) { context.setParams(level, strategy); } diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index ecf34a2aa37..e4129a4b7ca 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -8,6 +8,7 @@ #include #include "zlib.h" + #include #include #include @@ -99,15 +100,20 @@ class ZlibContext final { void setFlush(int value) { flush = value; }; - void getAfterWriteOffsets(kj::ArrayPtr writeResult) const { - writeResult[0] = stream.avail_out; - writeResult[1] = stream.avail_in; + // Function signature is same as Node.js implementation. + // Ref: https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/src/node_zlib.cc#L880 + void getAfterWriteResult(kj::byte* availIn, kj::byte* availOut) const { + *availIn = stream.avail_in; + *availOut = stream.avail_out; } void setMode(ZlibMode value) { mode = value; }; kj::Maybe resetStream(); kj::Maybe getError() const; + + // Equivalent to Node.js' `DoThreadPoolWork` function. 
+ // Ref: https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/src/node_zlib.cc#L760 void work(); uint getAvailIn() const { @@ -167,6 +173,7 @@ class CompressionStream { void close(); bool checkError(jsg::Lock& js); void emitError(jsg::Lock& js, const CompressionError& error); + template void writeStream(jsg::Lock& js, int flush, kj::ArrayPtr input, @@ -177,6 +184,9 @@ class CompressionStream { errorHandler = kj::mv(handler); }; void initializeStream(kj::ArrayPtr _write_result, jsg::Function writeCallback); + void updateWriteResult() { + context.getAfterWriteResult(&writeResult[1], &writeResult[0]); + } protected: CompressionContext context; @@ -213,6 +223,17 @@ class ZlibUtil final: public jsg::Object { kj::Array writeState, jsg::Function writeCallback, jsg::Optional> dictionary); + template + void write_(jsg::Lock& js, + int flush, + kj::ArrayPtr input, + int inputOffset, + int inputLength, + kj::ArrayPtr output, + int outputOffset, + int outputLength); + + // TODO(soon): Find a way to expose functions with templates using JSG_METHOD. void write(jsg::Lock& js, int flush, kj::Array input, @@ -221,6 +242,14 @@ class ZlibUtil final: public jsg::Object { kj::Array output, int outputOffset, int outputLength); + void writeSync(jsg::Lock& js, + int flush, + kj::Array input, + int inputOffset, + int inputLength, + kj::Array output, + int outputOffset, + int outputLength); void params(int level, int strategy); void reset(jsg::Lock& js); @@ -228,6 +257,7 @@ class ZlibUtil final: public jsg::Object { JSG_METHOD(initialize); JSG_METHOD(close); JSG_METHOD(write); + JSG_METHOD(writeSync); JSG_METHOD(params); JSG_METHOD(setErrorHandler); JSG_METHOD(reset);