Skip to content

Commit

Permalink
fixup: sync opt
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 17, 2021
1 parent e6c8d9f commit aabfd75
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module.exports.map = function map(stream, fn, options) {

let reading = false;

// TODO: What about hwm? This will cause some unecessary buffering.
// TODO: What about hwm? This will cause some unnecessary buffering.
const ret = new Readable({
objectMode: stream.readableObjectMode ?? stream.objectMode ?? true,
read () {
Expand All @@ -47,7 +47,12 @@ module.exports.map = function map(stream, fn, options) {
!state.destroyed &&
(!state.length || state.length < state.highWaterMark)
) {
const [err, val] = await queue.shift();
let entry = queue.shift();
if (typeof entry.then === 'function') {
entry = await entry;
}

const [err, val] = entry;
if (err) {
ret.destroy(err);
} else {
Expand All @@ -63,7 +68,7 @@ module.exports.map = function map(stream, fn, options) {

async function wrap (val) {
try {
return [null, await fn(val, { signal })];
return [null, await val];
} catch (err) {
return [err, null];
}
Expand All @@ -78,11 +83,20 @@ module.exports.map = function map(stream, fn, options) {

function pump () {
while (queue.length < concurrency) {
const val = stream.read();
let val = stream.read();
if (val === null) {
return;
}
enqueue(wrap(val));
try {
val = fn(val, { signal });
if (val && typeof val.then === 'function') {
enqueue(wrap(val));
} else {
enqueue([null, val]);
}
} catch (err) {
enqueue([err, null]);
}
}
}

Expand Down

0 comments on commit aabfd75

Please sign in to comment.