From a704fe02887483ce91d2c0f30aabf23aeda28a7c Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 11 Jan 2024 16:38:36 +0100 Subject: [PATCH 1/2] fixes #8 Signed-off-by: Matteo Collina --- syncthrough.js | 6 +++- test.js | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/syncthrough.js b/syncthrough.js index f76017d..85896d4 100644 --- a/syncthrough.js +++ b/syncthrough.js @@ -149,9 +149,13 @@ function doEnd (that) { that.writable = false if (that._destination) { that._endEmitted = true + const toFlush = that._flush() || null that.emit('end') if (that._destinationNeedsEnd) { - that._destination.end(that._flush() || null) + that._destination.end(toFlush) + } else if (toFlush !== null) { + console.log(toFlush) + that._destination.write(toFlush) } } } diff --git a/test.js b/test.js index e3875cf..50047ab 100644 --- a/test.js +++ b/test.js @@ -8,6 +8,7 @@ const Writable = require('readable-stream').Writable const fs = require('fs') const eos = require('end-of-stream') const pump = require('pump') +const { pipeline } = require('node:stream') const through = require('through') function stringFrom (chunks) { @@ -721,3 +722,83 @@ test('pipe with through', async function (_t) { th.resume() await t.completed }) + +test('works with pipeline', async function (_t) { + const t = tspl(_t, { plan: 3 }) + + const stream = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toUpperCase()) + }) + + const stream2 = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toLowerCase()) + }) + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]) + const sink = stringSink(t, [Buffer.from('foo'), Buffer.from('bar')]) + + pipeline(from, stream, stream2, sink, function (err) { + t.equal(err, null, 'pipeline finished without error') + }) + await t.completed +}) + +test('works with pipeline and handles errors', async function (_t) { + const t = tspl(_t, { plan: 3 }) + + const stream = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toUpperCase()) + }) + + stream.on('close', function () { + t.ok('stream closed prematurely') + }) + + const stream2 = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toLowerCase()) + }) + + stream2.on('close', function () { + t.ok('stream2 closed prematurely') + }) + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]) + const sink = new Writable({ + write: function (chunk, enc, cb) { + cb(new Error('kaboom')) + } + }) + + pipeline(from, stream, stream2, sink, function (err) { + t.ok(err, 'pipeline finished with error') + }) + await t.completed +}) + +test('works with pipeline and calls flush', async function (_t) { + const t = tspl(_t, { plan: 3 }) + const expected = 'hello world!' + let actual = '' + pipeline( + Readable.from('hello world'), + syncthrough( + undefined, + function flush () { + t.ok('flush called') + this.push('!') + } + ), + new Writable({ + write (chunk, enc, cb) { + actual += chunk.toString() + cb() + } + }), + (err) => { + t.equal(err, null, 'pipeline finished without error') + t.equal(actual, expected, 'actual matches expected') + } + ) + + await t.completed +}) From e0cacfe1362dbf4b3830292004aebd82c9e2e6e9 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 11 Jan 2024 16:41:03 +0100 Subject: [PATCH 2/2] fixup Signed-off-by: Matteo Collina --- syncthrough.js | 3 +-- test.js | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/syncthrough.js b/syncthrough.js index 85896d4..2c9aa97 100644 --- a/syncthrough.js +++ b/syncthrough.js @@ -150,13 +150,12 @@ function doEnd (that) { if (that._destination) { that._endEmitted = true const toFlush = that._flush() || null - that.emit('end') if (that._destinationNeedsEnd) { that._destination.end(toFlush) } else if (toFlush !== null) { - console.log(toFlush) that._destination.write(toFlush) } + that.emit('end') } } } diff --git a/test.js b/test.js index 50047ab..7d02525 100644 --- a/test.js +++ b/test.js @@ -802,3 +802,31 @@ test('works with pipeline and calls flush', async function (_t) { await t.completed }) + +test('works with pipeline and calls flush / 2', async function (_t) { + const t = tspl(_t, { plan: 3 }) + const expected = 'hello world!' + let actual = '' + pipeline( + Readable.from('hello world'), + syncthrough( + undefined, + function flush () { + t.ok('flush called') + return '!' + } + ), + new Writable({ + write (chunk, enc, cb) { + actual += chunk.toString() + cb() + } + }), + (err) => { + t.equal(err, null, 'pipeline finished without error') + t.equal(actual, expected, 'actual matches expected') + } + ) + + await t.completed +})