Skip to content

Commit

Permalink
improve zlib for nodejs_compat
Browse files Browse the repository at this point in the history
  • Loading branch information
anonrig committed Aug 24, 2024
1 parent 0cce828 commit eecd02d
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 37 deletions.
32 changes: 18 additions & 14 deletions src/node/internal/internal_zlib_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ function processCallback(this: zlibUtil.ZlibStream): void {
return;
}

const [availOutAfter, availInAfter] = state as unknown as [number, number];
const availOutAfter = state[0] as number;
const availInAfter = state[1] as number;

const inDelta = handle.availInBefore - availInAfter;
self.bytesWritten += inDelta;
Expand Down Expand Up @@ -119,9 +120,10 @@ function processCallback(this: zlibUtil.ZlibStream): void {
handle.availInBefore = availInAfter;

if (!streamBufferIsFull) {
ok(this.buffer, 'Buffer should not have been null');
this.write(
handle.flushFlag,
this.buffer as NodeJS.TypedArray, // in
this.buffer, // in
handle.inOff, // in_off
handle.availInBefore, // in_len
self._outBuffer, // out
Expand All @@ -132,10 +134,11 @@ function processCallback(this: zlibUtil.ZlibStream): void {
// eslint-disable-next-line @typescript-eslint/unbound-method
const oldRead = self._read;
self._read = (n): void => {
ok(this.buffer, 'Buffer should not have been null');
self._read = oldRead;
this.write(
handle.flushFlag,
this.buffer as NodeJS.TypedArray, // in
this.buffer, // in
handle.inOff, // in_off
handle.availInBefore, // in_len
self._outBuffer, // out
Expand Down Expand Up @@ -171,7 +174,7 @@ function processCallback(this: zlibUtil.ZlibStream): void {
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
const flushiness: number[] = [];
const kFlushFlagList = [
const kFlushFlagList: number[] = [
CONST_Z_NO_FLUSH,
CONST_Z_BLOCK,
CONST_Z_PARTIAL_FLUSH,
Expand Down Expand Up @@ -239,16 +242,16 @@ function processChunkSync(
});

while (true) {
// TODO(soon): This was `writeSync` before, but it's not anymore.
handle?.write(
ok(handle, 'Handle should have been defined');
handle.writeSync(
flushFlag,
chunk, // in
inOff, // in_off
availInBefore, // in_len
buffer, // out
offset, // out_off
availOutBefore
); // out_len
availOutBefore // out_len
);
if (error) throw error;
else if (self[kError]) throw self[kError];

Expand Down Expand Up @@ -461,8 +464,7 @@ export class ZlibBase extends Transform {
}
} else if (this.writableEnded) {
if (callback) {
/* eslint-disable-next-line @typescript-eslint/no-unsafe-call */
queueMicrotask(callback);
this.once('end', callback);
}
} else {
this.write(kFlushBuffers[kind as number], 'utf8', callback);
Expand Down Expand Up @@ -509,6 +511,7 @@ export class ZlibBase extends Transform {
flushFlag: number,
cb?: () => void
): Buffer | Uint8Array | undefined {
// _processChunk is left for backwards compatibility
if (cb != null && typeof cb === 'function') {
this.#processChunk(chunk, flushFlag, cb);
return;
Expand All @@ -523,10 +526,11 @@ export class ZlibBase extends Transform {
return;
}

this._handle.buffer = null;
this._handle.buffer = chunk;
this._handle.cb = cb;
this._handle.availOutBefore = this._chunkSize - this._outOffset;
this._handle.availInBefore = chunk.byteLength;
this._handle.inOff = 0;
this._handle.flushFlag = flushFlag;

this._handle.write(
Expand Down Expand Up @@ -603,7 +607,7 @@ export class Zlib extends ZlibBase {
);
dictionary = options.dictionary;

if (dictionary != null && !isArrayBufferView(dictionary)) {
if (dictionary !== undefined && !isArrayBufferView(dictionary)) {
if (isAnyArrayBuffer(dictionary)) {
dictionary = Buffer.from(dictionary);
} else {
Expand All @@ -624,7 +628,7 @@ export class Zlib extends ZlibBase {
memLevel,
strategy,
writeState,
processCallback,
processCallback.bind(handle),
dictionary
);
super(options ?? {}, mode, handle);
Expand All @@ -635,7 +639,7 @@ export class Zlib extends ZlibBase {
this._writeState = writeState;
}

public params(level: number, strategy: number, callback: () => never): void {
public params(level: number, strategy: number, callback: () => void): void {
checkRangesOrGetDefault(
level,
'level',
Expand Down
9 changes: 9 additions & 0 deletions src/node/internal/zlib.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ export class ZlibStream {
outputOffset: number,
outputLength: number
): void;
public writeSync(
flushFlag: number,
inputBuffer: NodeJS.TypedArray,
inputOffset: number,
inputLength: number,
outputBuffer: NodeJS.TypedArray,
outputOffset: number,
outputLength: number
): void;
public params(level: number, strategy: number): void;
public reset(): void;

Expand Down
68 changes: 64 additions & 4 deletions src/workerd/api/node/tests/zlib-nodejs-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,12 @@ export const testFailedInit = {

{
const stream = zlib.createGzip({ level: NaN });
assert.strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION);
strictEqual(stream._level, zlib.constants.Z_DEFAULT_COMPRESSION);
}

{
const stream = zlib.createGzip({ strategy: NaN });
assert.strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY);
strictEqual(stream._strategy, zlib.constants.Z_DEFAULT_STRATEGY);
}
},
};
Expand All @@ -622,7 +622,7 @@ export const zlibDestroyTest = {
{
const ts = zlib.createGzip();
ts.destroy();
assert.strictEqual(ts._handle, null);
strictEqual(ts._handle, null);

const { promise, resolve, reject } = Promise.withResolvers();
promises.push(promise);
Expand All @@ -643,7 +643,7 @@ export const zlibDestroyTest = {
decompress.on('error', (err) => {
errorCount++;
decompress.close();
assert.strictEqual(errorCount, 1, 'Error should only be emitted once');
strictEqual(errorCount, 1, 'Error should only be emitted once');
resolve();
});

Expand Down Expand Up @@ -700,6 +700,66 @@ export const closeAfterError = {
// },
// };

// Tests are taken from:
// https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/test/parallel/test-zlib-bytes-read.js
export const testZlibBytesRead = {
async test() {
const expectStr = 'abcdefghijklmnopqrstuvwxyz'.repeat(2);
const expectBuf = Buffer.from(expectStr);

function createWriter(target, buffer) {
const writer = { size: 0 };
const write = () => {
target.write(Buffer.from([buffer[writer.size]]), () => {
writer.size += 1;
if (writer.size < buffer.length) {
target.flush(write);
} else {
target.end();
}
});
};
write();
return writer;
}

// This test is simplified a lot because of test runner limitations.
// TODO(soon): Add createBrotliCompress once it is implemented.
for (const method of ['createGzip', 'createDeflate', 'createDeflateRaw']) {
assert(method in zlib, `${method} is not available in "node:zlib"`);
const { promise, resolve, reject } = Promise.withResolvers();
let compData = Buffer.alloc(0);
const comp = zlib[method]();
const compWriter = createWriter(comp, expectBuf);
comp.on('data', function (d) {
compData = Buffer.concat([compData, d]);
strictEqual(
this.bytesWritten,
compWriter.size,
`Should get write size on ${method} data.`
);
});
comp.on('error', reject);
comp.on('end', function () {
strictEqual(
this.bytesWritten,
compWriter.size,
`Should get write size on ${method} end.`
);
strictEqual(
this.bytesWritten,
expectStr.length,
`Should get data size on ${method} end.`
);

resolve();
});

await promise;
}
},
};

// Node.js tests relevant to zlib
//
// - [ ] test-zlib-brotli-16GB.js
Expand Down
82 changes: 66 additions & 16 deletions src/workerd/api/node/zlib-util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ kj::Maybe<CompressionError> ZlibContext::setDictionary() {
return kj::none;
}

err = Z_OK;

switch (mode) {
case ZlibMode::DEFLATE:
case ZlibMode::DEFLATERAW:
err = deflateSetDictionary(&stream, dictionary.begin(), dictionary.size());
;
break;
case ZlibMode::INFLATERAW:
err = inflateSetDictionary(&stream, dictionary.begin(), dictionary.size());
Expand Down Expand Up @@ -173,8 +174,6 @@ void ZlibContext::work() {
}
const Bytef* next_expected_header_byte = nullptr;

err = Z_OK;

// If the avail_out is left at 0, then it means that it ran out
// of room. If there was avail_out left over, then it means
// that all the input was consumed.
Expand Down Expand Up @@ -271,7 +270,7 @@ kj::Maybe<CompressionError> ZlibContext::setParams(int _level, int _strategy) {
switch (mode) {
case ZlibMode::DEFLATE:
case ZlibMode::DEFLATERAW:
err = deflateParams(&stream, level, strategy);
err = deflateParams(&stream, _level, _strategy);
break;
default:
break;
Expand Down Expand Up @@ -348,6 +347,7 @@ void CompressionStream<CompressionContext>::emitError(
}

template <typename CompressionContext>
template <bool async>
void CompressionStream<CompressionContext>::writeStream(jsg::Lock& js,
int flush,
kj::ArrayPtr<kj::byte> input,
Expand All @@ -362,11 +362,34 @@ void CompressionStream<CompressionContext>::writeStream(jsg::Lock& js,
context.setBuffers(input, inputLength, output, outputLength);
context.setFlush(flush);

// This implementation always follow the sync version.
if constexpr (!async) {
context.work();
if (checkError(js)) {
updateWriteResult();
writing = false;
}
return;
}

// On Node.js, this is called as a result of `ScheduleWork()` call.
// Since, we implement the whole thing as sync, we're going to ahead and call the whole thing here.
context.work();
if (checkError(js)) {
context.getAfterWriteOffsets(writeResult);
writing = false;

// This is implemented slightly differently in Node.js
// Node.js calls AfterThreadPoolWork().
// Ref: https://github.com/nodejs/node/blob/9edf4a0856681a7665bd9dcf2ca7cac252784b98/src/node_zlib.cc#L402
writing = false;
if (!checkError(js)) return;
updateWriteResult();
KJ_IF_SOME(cb, writeCallback) {
// We only wrap the callback in the enqueue microtask.
// Rest of the implementation is still sync.
auto fn = js.wrapSimpleFunction(js.v8Context(), [&cb](jsg::Lock& js, auto&) { cb(js); });
js.v8Isolate->EnqueueMicrotask(kj::mv(fn));
}

if (pending_close) {
close();
}
}

Expand Down Expand Up @@ -413,12 +436,13 @@ void ZlibUtil::ZlibStream::initialize(int windowBits,
context.initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary));
}

void ZlibUtil::ZlibStream::write(jsg::Lock& js,
template <bool async = false>
void ZlibUtil::ZlibStream::write_(jsg::Lock& js,
int flush,
kj::Array<kj::byte> input,
kj::ArrayPtr<kj::byte> input,
int inputOffset,
int inputLength,
kj::Array<kj::byte> output,
kj::ArrayPtr<kj::byte> output,
int outputOffset,
int outputLength) {
if (flush != Z_NO_FLUSH && flush != Z_PARTIAL_FLUSH && flush != Z_SYNC_FLUSH &&
Expand All @@ -427,17 +451,43 @@ void ZlibUtil::ZlibStream::write(jsg::Lock& js,
}

// Check bounds
JSG_REQUIRE(inputOffset >= 0 && inputOffset < input.size(), Error,
"Offset should be smaller than size and bigger than 0"_kj);
JSG_REQUIRE(inputOffset >= 0 && inputOffset <= input.size(), Error,
kj::str("Input offset should be smaller than size and bigger than 0, but received: ",
inputOffset));
JSG_REQUIRE(input.size() >= inputLength, Error, "Invalid inputLength"_kj);
JSG_REQUIRE(outputOffset >= 0 && outputOffset < output.size(), Error,
"Offset should be smaller than size and bigger than 0"_kj);
JSG_REQUIRE(outputOffset >= 0 && outputOffset <= output.size(), Error,
kj::str("Output offset should be smaller than size and bigger than 0 , but received: ",
outputOffset));
JSG_REQUIRE(output.size() >= outputLength, Error, "Invalid outputLength"_kj);

writeStream(
writeStream<async>(
js, flush, input.slice(inputOffset), inputLength, output.slice(outputOffset), outputLength);
}

void ZlibUtil::ZlibStream::write(jsg::Lock& js,
int flush,
kj::Array<kj::byte> input,
int inputOffset,
int inputLength,
kj::Array<kj::byte> output,
int outputOffset,
int outputLength) {
write_<true>(js, flush, input.asPtr(), inputOffset, inputLength, output.asPtr(), outputOffset,
outputLength);
}

void ZlibUtil::ZlibStream::writeSync(jsg::Lock& js,
int flush,
kj::Array<kj::byte> input,
int inputOffset,
int inputLength,
kj::Array<kj::byte> output,
int outputOffset,
int outputLength) {
write_<false>(js, flush, input.asPtr(), inputOffset, inputLength, output.asPtr(), outputOffset,
outputLength);
}

void ZlibUtil::ZlibStream::params(int level, int strategy) {
context.setParams(level, strategy);
}
Expand Down
Loading

0 comments on commit eecd02d

Please sign in to comment.