diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 08c196802780b8..b6e744250799c6 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -6,11 +6,13 @@ const { } = primordials; function isReadable(obj) { - return !!(obj && typeof obj.pipe === 'function'); + return !!(obj && typeof obj.pipe === 'function' && + typeof obj.on === 'function'); } function isWritable(obj) { - return !!(obj && typeof obj.write === 'function'); + return !!(obj && typeof obj.write === 'function' && + typeof obj.on === 'function'); } function isStream(obj) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 714a1945d78a7d..428f0fe89abe60 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1247,3 +1247,19 @@ const net = require('net'); () => common.mustNotCall(), ); } + +{ + const content = 'abc'; + pipeline(Buffer.from(content), PassThrough({ objectMode: true }), + common.mustSucceed(() => {})); + + let res = ''; + pipeline(Buffer.from(content), async function*(previous) { + for await (const val of previous) { + res += String.fromCharCode(val); + yield val; + } + }, common.mustSucceed(() => { + assert.strictEqual(res, content); + })); +}