diff --git a/syncthrough.js b/syncthrough.js index f76017d..2c9aa97 100644 --- a/syncthrough.js +++ b/syncthrough.js @@ -149,10 +149,13 @@ function doEnd (that) { that.writable = false if (that._destination) { that._endEmitted = true - that.emit('end') + const toFlush = that._flush() || null if (that._destinationNeedsEnd) { - that._destination.end(that._flush() || null) + that._destination.end(toFlush) + } else if (toFlush !== null) { + that._destination.write(toFlush) } + that.emit('end') } } } diff --git a/test.js b/test.js index e3875cf..7d02525 100644 --- a/test.js +++ b/test.js @@ -8,6 +8,7 @@ const Writable = require('readable-stream').Writable const fs = require('fs') const eos = require('end-of-stream') const pump = require('pump') +const { pipeline } = require('node:stream') const through = require('through') function stringFrom (chunks) { @@ -721,3 +722,111 @@ test('pipe with through', async function (_t) { th.resume() await t.completed }) + +test('works with pipeline', async function (_t) { + const t = tspl(_t, { plan: 3 }) + + const stream = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toUpperCase()) + }) + + const stream2 = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toLowerCase()) + }) + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]) + const sink = stringSink(t, [Buffer.from('foo'), Buffer.from('bar')]) + + pipeline(from, stream, stream2, sink, function (err) { + t.equal(err, null, 'pipeline finished without error') + }) + await t.completed +}) + +test('works with pipeline and handles errors', async function (_t) { + const t = tspl(_t, { plan: 3 }) + + const stream = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toUpperCase()) + }) + + stream.on('close', function () { + t.ok('stream closed prematurely') + }) + + const stream2 = syncthrough(function (chunk) { + return Buffer.from(chunk.toString().toLowerCase()) + }) + + stream2.on('close', function () { + t.ok('stream2 closed prematurely') + }) + + const from = stringFrom([Buffer.from('foo'), Buffer.from('bar')]) + const sink = new Writable({ + write: function (chunk, enc, cb) { + cb(new Error('kaboom')) + } + }) + + pipeline(from, stream, stream2, sink, function (err) { + t.ok(err, 'pipeline finished with error') + }) + await t.completed +}) + +test('works with pipeline and calls flush', async function (_t) { + const t = tspl(_t, { plan: 3 }) + const expected = 'hello world!' + let actual = '' + pipeline( + Readable.from('hello world'), + syncthrough( + undefined, + function flush () { + t.ok('flush called') + this.push('!') + } + ), + new Writable({ + write (chunk, enc, cb) { + actual += chunk.toString() + cb() + } + }), + (err) => { + t.equal(err, null, 'pipeline finished without error') + t.equal(actual, expected, 'actual matches expected') + } + ) + + await t.completed +}) + +test('works with pipeline and calls flush / 2', async function (_t) { + const t = tspl(_t, { plan: 3 }) + const expected = 'hello world!' + let actual = '' + pipeline( + Readable.from('hello world'), + syncthrough( + undefined, + function flush () { + t.ok('flush called') + return '!' + } + ), + new Writable({ + write (chunk, enc, cb) { + actual += chunk.toString() + cb() + } + }), + (err) => { + t.equal(err, null, 'pipeline finished without error') + t.equal(actual, expected, 'actual matches expected') + } + ) + + await t.completed +})