From 1e09cb6149fc0a45464f311dd13f2fbf4dc15206 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Sat, 10 Jul 2021 20:56:56 -0700 Subject: [PATCH] streams: add CompressionStream and DecompressionStream Signed-off-by: James M Snell --- doc/api/webstreams.md | 52 ++++++ lib/internal/webstreams/compression.js | 165 ++++++++++++++++++ lib/stream/web.js | 7 + .../test-whatwg-webstreams-compression.js | 59 +++++++ 4 files changed, 283 insertions(+) create mode 100644 lib/internal/webstreams/compression.js create mode 100644 test/parallel/test-whatwg-webstreams-compression.js diff --git a/doc/api/webstreams.md b/doc/api/webstreams.md index 407230f96812ee..317cc7788f5be3 100644 --- a/doc/api/webstreams.md +++ b/doc/api/webstreams.md @@ -1118,5 +1118,57 @@ added: v16.5.0 * `chunk` {any} * Returns: {number} +### Class: `CompressionStream` + + +#### `new CompressionStream(format)` + + +* `format` {string} One of either `'deflate'` or `'gzip'`. + +#### `compressionStream.readable` + + +* Type: {ReadableStream} + +#### `compressionStream.writable` + + +* Type: {WritableStream} + +### Class: `DecompressionStream` + + +#### `new DecompressionStream(format)` + + +* `format` {string} One of either `'deflate'` or `'gzip'`. + +#### `decompressionStream.readable` + + +* Type: {ReadableStream} + +#### `deccompressionStream.writable` + + +* Type: {WritableStream} + [Streams]: stream.md [WHATWG Streams Standard]: https://streams.spec.whatwg.org/ diff --git a/lib/internal/webstreams/compression.js b/lib/internal/webstreams/compression.js new file mode 100644 index 00000000000000..6dfddbce248b7a --- /dev/null +++ b/lib/internal/webstreams/compression.js @@ -0,0 +1,165 @@ +'use strict'; + +const { + ObjectDefineProperties, + Symbol, +} = primordials; + +const { + codes: { + ERR_INVALID_ARG_VALUE, + ERR_INVALID_THIS, + }, +} = require('internal/errors'); + +const { + newReadableWritablePairFromDuplex, +} = require('internal/webstreams/adapters'); + +const { + customInspect, + kEnumerableProperty, +} = require('internal/webstreams/util'); + +const { + customInspectSymbol: kInspect, +} = require('internal/util'); + +const { + createDeflate, + createInflate, + createGzip, + createGunzip, +} = require('zlib'); + +const kHandle = Symbol('kHandle'); +const kTransform = Symbol('kTransform'); +const kType = Symbol('kType'); + +/** + * @typedef {import('./readablestream').ReadableStream} ReadableStream + * @typedef {import('./writablestream').WritableStream} WritableStream + */ + +function isCompressionStream(value) { + return typeof value?.[kHandle] === 'object' && + value?.[kType] === 'CompressionStream'; +} + +function isDecompressionStream(value) { + return typeof value?.[kHandle] === 'object' && + value?.[kType] === 'DecompressionStream'; +} + +class CompressionStream { + /** + * @param {'deflate'|'gzip'} format + */ + constructor(format) { + this[kType] = 'CompressionStream'; + switch (format) { + case 'deflate': + this[kHandle] = createDeflate(); + break; + case 'gzip': + this[kHandle] = createGzip(); + break; + default: + throw new ERR_INVALID_ARG_VALUE('format', format); + } + this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]); + } + + /** + * @readonly + * @type {ReadableStream} + */ + get readable() { + if (!isCompressionStream(this)) + throw new ERR_INVALID_THIS('CompressionStream'); + return this[kTransform].readable; + } + + /** + * @readonly + * @type {WritableStream} + */ + get writable() { + if (!isCompressionStream(this)) + throw new ERR_INVALID_THIS('CompressionStream'); + return this[kTransform].writable; + } + + [kInspect](depth, options) { + if (!isCompressionStream(this)) + throw new ERR_INVALID_THIS('CompressionStream'); + customInspect(depth, options, 'CompressionStream', { + readable: this[kTransform].readable, + writable: this[kTransform].writable, + }); + } +} + +class DecompressionStream { + /** + * @param {'deflate'|'gzip'} format + */ + constructor(format) { + this[kType] = 'DecompressionStream'; + switch (format) { + case 'deflate': + this[kHandle] = createInflate(); + break; + case 'gzip': + this[kHandle] = createGunzip(); + break; + default: + throw new ERR_INVALID_ARG_VALUE('format', format); + } + this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]); + } + + /** + * @readonly + * @type {ReadableStream} + */ + get readable() { + if (!isDecompressionStream(this)) + throw new ERR_INVALID_THIS('DecompressionStream'); + return this[kTransform].readable; + } + + /** + * @readonly + * @type {WritableStream} + */ + get writable() { + if (!isDecompressionStream(this)) + throw new ERR_INVALID_THIS('DecompressionStream'); + return this[kTransform].writable; + } + + [kInspect](depth, options) { + if (!isDecompressionStream(this)) + throw new ERR_INVALID_THIS('DecompressionStream'); + customInspect(depth, options, 'DecompressionStream', { + readable: this[kTransform].readable, + writable: this[kTransform].writable, + }); + } +} + +ObjectDefineProperties(CompressionStream.prototype, { + readable: kEnumerableProperty, + writable: kEnumerableProperty, +}); + +ObjectDefineProperties(DecompressionStream.prototype, { + readable: kEnumerableProperty, + writable: kEnumerableProperty, +}); + +module.exports = { + CompressionStream, + DecompressionStream, +}; diff --git a/lib/stream/web.js b/lib/stream/web.js index 929abd19044458..3852ab97d841c7 100644 --- a/lib/stream/web.js +++ b/lib/stream/web.js @@ -31,6 +31,11 @@ const { CountQueuingStrategy, } = require('internal/webstreams/queuingstrategies'); +const { + CompressionStream, + DecompressionStream, +} = require('internal/webstreams/compression'); + module.exports = { ReadableStream, ReadableStreamDefaultReader, @@ -45,4 +50,6 @@ module.exports = { WritableStreamDefaultController, ByteLengthQueuingStrategy, CountQueuingStrategy, + CompressionStream, + DecompressionStream, }; diff --git a/test/parallel/test-whatwg-webstreams-compression.js b/test/parallel/test-whatwg-webstreams-compression.js new file mode 100644 index 00000000000000..6d3d6253bd1b86 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-compression.js @@ -0,0 +1,59 @@ +// Flags: --no-warnings +'use strict'; + +const common = require('../common'); + +const { + CompressionStream, + DecompressionStream, +} = require('stream/web'); + +const assert = require('assert'); +const dec = new TextDecoder(); + +async function test(format) { + const gzip = new CompressionStream(format); + const gunzip = new DecompressionStream(format); + + gzip.readable.pipeTo(gunzip.writable).then(common.mustCall()); + + const reader = gunzip.readable.getReader(); + const writer = gzip.writable.getWriter(); + + await Promise.all([ + reader.read().then(({ value, done }) => { + assert.strictEqual(dec.decode(value), 'hello'); + }), + reader.read().then(({ done }) => assert(done)), + writer.write('hello'), + writer.close(), + ]); +} + +Promise.all(['gzip', 'deflate'].map((i) => test(i))).then(common.mustCall()); + +[1, 'hello', false, {}].forEach((i) => { + assert.throws(() => new CompressionStream(i), { + code: 'ERR_INVALID_ARG_VALUE', + }); + assert.throws(() => new DecompressionStream(i), { + code: 'ERR_INVALID_ARG_VALUE', + }); +}); + +assert.throws( + () => Reflect.get(CompressionStream.prototype, 'readable', {}), { + code: 'ERR_INVALID_THIS', + }); +assert.throws( + () => Reflect.get(CompressionStream.prototype, 'writable', {}), { + code: 'ERR_INVALID_THIS', + }); +assert.throws( + () => Reflect.get(DecompressionStream.prototype, 'readable', {}), { + code: 'ERR_INVALID_THIS', + }); +assert.throws( + () => Reflect.get(DecompressionStream.prototype, 'writable', {}), { + code: 'ERR_INVALID_THIS', + });