Skip to content

Commit

Permalink
streams: add CompressionStream and DecompressionStream
Browse files Browse the repository at this point in the history
Signed-off-by: James M Snell <jasnell@gmail.com>
  • Loading branch information
jasnell committed Jul 14, 2021
1 parent 1bb660e commit 1e09cb6
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 0 deletions.
52 changes: 52 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -1118,5 +1118,57 @@ added: v16.5.0
* `chunk` {any}
* Returns: {number}
### Class: `CompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new CompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `compressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `compressionStream.writable`
<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
### Class: `DecompressionStream`
<!-- YAML
added: REPLACEME
-->
#### `new DecompressionStream(format)`
<!-- YAML
added: REPLACEME
-->
* `format` {string} One of either `'deflate'` or `'gzip'`.
#### `decompressionStream.readable`
<!-- YAML
added: REPLACEME
-->
* Type: {ReadableStream}
#### `deccompressionStream.writable`
<!-- YAML
added: REPLACEME
-->
* Type: {WritableStream}
[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
165 changes: 165 additions & 0 deletions lib/internal/webstreams/compression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
'use strict';

const {
ObjectDefineProperties,
Symbol,
} = primordials;

const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_INVALID_THIS,
},
} = require('internal/errors');

const {
newReadableWritablePairFromDuplex,
} = require('internal/webstreams/adapters');

const {
customInspect,
kEnumerableProperty,
} = require('internal/webstreams/util');

const {
customInspectSymbol: kInspect,
} = require('internal/util');

const {
createDeflate,
createInflate,
createGzip,
createGunzip,
} = require('zlib');

const kHandle = Symbol('kHandle');
const kTransform = Symbol('kTransform');
const kType = Symbol('kType');

/**
* @typedef {import('./readablestream').ReadableStream} ReadableStream
* @typedef {import('./writablestream').WritableStream} WritableStream
*/

function isCompressionStream(value) {
return typeof value?.[kHandle] === 'object' &&
value?.[kType] === 'CompressionStream';
}

function isDecompressionStream(value) {
return typeof value?.[kHandle] === 'object' &&
value?.[kType] === 'DecompressionStream';
}

class CompressionStream {
/**
* @param {'deflate'|'gzip'} format
*/
constructor(format) {
this[kType] = 'CompressionStream';
switch (format) {
case 'deflate':
this[kHandle] = createDeflate();
break;
case 'gzip':
this[kHandle] = createGzip();
break;
default:
throw new ERR_INVALID_ARG_VALUE('format', format);
}
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
}

/**
* @readonly
* @type {ReadableStream}
*/
get readable() {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
return this[kTransform].readable;
}

/**
* @readonly
* @type {WritableStream}
*/
get writable() {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
return this[kTransform].writable;
}

[kInspect](depth, options) {
if (!isCompressionStream(this))
throw new ERR_INVALID_THIS('CompressionStream');
customInspect(depth, options, 'CompressionStream', {
readable: this[kTransform].readable,
writable: this[kTransform].writable,
});
}
}

class DecompressionStream {
/**
* @param {'deflate'|'gzip'} format
*/
constructor(format) {
this[kType] = 'DecompressionStream';
switch (format) {
case 'deflate':
this[kHandle] = createInflate();
break;
case 'gzip':
this[kHandle] = createGunzip();
break;
default:
throw new ERR_INVALID_ARG_VALUE('format', format);
}
this[kTransform] = newReadableWritablePairFromDuplex(this[kHandle]);
}

/**
* @readonly
* @type {ReadableStream}
*/
get readable() {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
return this[kTransform].readable;
}

/**
* @readonly
* @type {WritableStream}
*/
get writable() {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
return this[kTransform].writable;
}

[kInspect](depth, options) {
if (!isDecompressionStream(this))
throw new ERR_INVALID_THIS('DecompressionStream');
customInspect(depth, options, 'DecompressionStream', {
readable: this[kTransform].readable,
writable: this[kTransform].writable,
});
}
}

ObjectDefineProperties(CompressionStream.prototype, {
readable: kEnumerableProperty,
writable: kEnumerableProperty,
});

ObjectDefineProperties(DecompressionStream.prototype, {
readable: kEnumerableProperty,
writable: kEnumerableProperty,
});

module.exports = {
CompressionStream,
DecompressionStream,
};
7 changes: 7 additions & 0 deletions lib/stream/web.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
CompressionStream,
DecompressionStream,
} = require('internal/webstreams/compression');

module.exports = {
ReadableStream,
ReadableStreamDefaultReader,
Expand All @@ -45,4 +50,6 @@ module.exports = {
WritableStreamDefaultController,
ByteLengthQueuingStrategy,
CountQueuingStrategy,
CompressionStream,
DecompressionStream,
};
59 changes: 59 additions & 0 deletions test/parallel/test-whatwg-webstreams-compression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Flags: --no-warnings
'use strict';

const common = require('../common');

const {
CompressionStream,
DecompressionStream,
} = require('stream/web');

const assert = require('assert');
const dec = new TextDecoder();

async function test(format) {
const gzip = new CompressionStream(format);
const gunzip = new DecompressionStream(format);

gzip.readable.pipeTo(gunzip.writable).then(common.mustCall());

const reader = gunzip.readable.getReader();
const writer = gzip.writable.getWriter();

await Promise.all([
reader.read().then(({ value, done }) => {
assert.strictEqual(dec.decode(value), 'hello');
}),
reader.read().then(({ done }) => assert(done)),
writer.write('hello'),
writer.close(),
]);
}

Promise.all(['gzip', 'deflate'].map((i) => test(i))).then(common.mustCall());

[1, 'hello', false, {}].forEach((i) => {
assert.throws(() => new CompressionStream(i), {
code: 'ERR_INVALID_ARG_VALUE',
});
assert.throws(() => new DecompressionStream(i), {
code: 'ERR_INVALID_ARG_VALUE',
});
});

assert.throws(
() => Reflect.get(CompressionStream.prototype, 'readable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(CompressionStream.prototype, 'writable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(DecompressionStream.prototype, 'readable', {}), {
code: 'ERR_INVALID_THIS',
});
assert.throws(
() => Reflect.get(DecompressionStream.prototype, 'writable', {}), {
code: 'ERR_INVALID_THIS',
});

0 comments on commit 1e09cb6

Please sign in to comment.