
stream: Readable batched iteration #34207

Open
ronag opened this issue Jul 4, 2020 · 18 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@ronag
Member

ronag commented Jul 4, 2020

This is a continuation of #34035 and the promises session we had on OpenJS about async iteration performance of streams. One alternative discussed was to batch reading.

I was thinking we could do something along the lines of:

const nop = () => {};

async function* createBatchedAsyncIterator(stream, batchLen) {
  let callback = nop;

  // `next` is used both as an event listener (where `this` is the stream)
  // and as a promise executor (where `resolve` is the promise's resolver).
  function next(resolve) {
    if (this === stream) {
      callback();
      callback = nop;
    } else {
      callback = resolve;
    }
  }

  stream
    .on('readable', next)
    .on('error', next)
    .on('end', next)
    .on('close', next);

  try {
    const state = stream._readableState;
    while (true) {
      let buffer;
      while (true) {
        const chunk = stream.read();
        if (chunk === null) break;
        if (!buffer) buffer = [];
        buffer.push(chunk);
        if (batchLen && buffer.length >= batchLen) break;
      }
      if (buffer) {
        yield buffer;
      } else if (state.errored) {
        throw state.errored;
      } else if (state.ended) {
        break;
      } else if (state.closed) {
        // TODO(ronag): ERR_PREMATURE_CLOSE?
        break;
      } else {
        await new Promise(next);
      }
    }
  } catch (err) {
    destroyImpl.destroyer(stream, err);
    throw err;
  } finally {
    // destroyImpl is Node core's internal/streams/destroy helper.
    destroyImpl.destroyer(stream, null);
  }
}

Readable.batched = function (stream, batchLen) {
  return createBatchedAsyncIterator(stream, batchLen);
};

Which would make the following possible:

// Concurrency
for await (const requests of Readable.batched(stream, 128)) {
  // Process in parallel with concurrency limit of 128.
  await Promise.all(requests.map(dispatch))
}

// Speed
for await (const requests of Readable.batched(stream, 128)) {
  for (const request of requests) {
    // All in the same tick
  }
}

It's still not perfect, since a single slow element would reduce concurrency. However, it would still be a step forward, and it also reduces the async iteration overhead.

ronag added the stream label on Jul 4, 2020
@ronag
Member Author

ronag commented Jul 4, 2020

@nodejs/streams @benjamingr

@ronag
Member Author

ronag commented Jul 4, 2020

Another possible implementation with maybe better performance:

      
stream.read(0);
const buffer = state.length > 0 ? state.buffer.splice(0, batchLen) : null;
if (buffer) yield buffer;
else ...

Though then it becomes a question of whether it yields an array or just an iterable.
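For context, a fuller version of this variant might look like the following sketch. It reuses next and state from the first sketch, and it assumes state.buffer can be spliced like an array while ignoring internal bookkeeping such as state.length, both of which depend on Node internals:

// Sketch only: drain the internal buffer in batches instead of calling
// stream.read() once per chunk.
while (true) {
  stream.read(0); // trigger a read without consuming a chunk
  const buffer = state.length > 0 ? state.buffer.splice(0, batchLen) : null;
  if (buffer) {
    yield buffer;
  } else if (state.errored) {
    throw state.errored;
  } else if (state.ended || state.closed) {
    break;
  } else {
    await new Promise(next); // wait for more data, as in the first sketch
  }
}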

@mcollina
Member

mcollina commented Jul 5, 2020

I think this is great.

what do you think @jasnell?

@ronag
Member Author

ronag commented Jul 5, 2020

Giving this further thought: if you want there to always be max concurrency, we could also go for a pattern like this:

for await (const result of Readable.parallel(stream, async (chunk) => {
  // Process chunk
  return String(chunk).toUpperCase()
}, 128)) {
  // Results arrive in input order
}

for await (const result of Readable.unorderedParallel(stream, async (chunk) => {
  // Process chunk
  return String(chunk).toUpperCase()
}, 128)) {
  // Results arrive in completion order, not input order
}
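A minimal userland sketch of what the ordered Readable.parallel semantics could look like (hypothetical: the parallel name, the argument order, and the lack of error handling are all assumptions, not the proposed implementation):

// Keep up to `concurrency` results in flight and yield them in input order.
async function* parallel(stream, fn, concurrency) {
  const inflight = [];
  for await (const chunk of stream) {
    inflight.push(fn(chunk));
    if (inflight.length >= concurrency) {
      yield await inflight.shift();
    }
  }
  // Drain whatever is still in flight once the stream ends.
  while (inflight.length > 0) {
    yield await inflight.shift();
  }
}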

Or something...

for await (const chunks of pipeline(
  Readable.from([1,2,3]),
  Readable.parallel(async function* (chunk) {
    yield chunk * 2
  }, 128),
  // [2,4,6]
  Readable.batched(async function* (chunks) {
    yield chunks
  }, 128),
  // [[2,4,6]]
  Readable.parallel(async function* (chunks) {
    yield* chunks
    yield* chunks
  }, 128),
  // [2,4,6,2,4,6]
  Readable.batched(128),
  // [[2,4,6,2,4,6]]
)) {
  for (const chunk of chunks) {
    console.log(chunk)
  }
}

@ronag
Member Author

ronag commented Jul 5, 2020

Anyway, going back to the original proposal: I think it's a good idea to put the stream parameter last, i.e. Readable.batched(128, stream), to allow for extensions such as those in the previous post.

@mcollina
Member

mcollina commented Jul 5, 2020

I would use an options object instead.

@addaleax
Member

addaleax commented Jul 5, 2020

How would this effectively differ from increasing the readable HWM? The Readable.batched() here would trade off memory usage for speed, which is also what the HWM does, but increasing the HWM doesn’t increase latency whereas this would, right?

@ronag
Member Author

ronag commented Jul 5, 2020

How would this effectively differ from increasing the readable HWM?

It would differ quite a bit in objectMode.

  1. We would reduce the number of yield invocations, e.g. with batch=128 you would yield once instead of 128 times. Most of the overhead of async iterators comes from this. Also remember that between each item there has to be a tick, so we would also reduce the number of ticks from 128 to 1.

  2. It would allow users to work on several items in parallel. Currently async iteration serializes everything and forces the user to work on one item at a time.

What I suggest here would basically bring the HWM buffer from the stream directly into the iteration.
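To illustrate with the proposed API from above (handle is a placeholder for per-item work):

// Per-chunk iteration: one promise and one tick per item.
for await (const chunk of stream) {
  handle(chunk); // 128 items -> 128 awaits
}

// Batched iteration: one promise per batch; the inner loop runs in one tick.
for await (const chunks of Readable.batched(stream, 128)) {
  for (const chunk of chunks) handle(chunk); // 128 items -> 1 await
}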

@imcotton
Contributor

imcotton commented Jul 8, 2020

Just want to pitch in my 2 cents: instead of a fixed batch size, how about accepting the size dynamically?

The syntax is actually quite simple:

const { asyncReadable } = require('async-readable');

const stream = createReadStream('./sample.gif');
const { read, off } = asyncReadable(stream);

const [ G, I, F, EIGHT ] = await read(4);
const [ SEVEN_OR_NINE, A ] = await read(2);

const width = (await read(2)).readUInt16LE(0);
const height = (await read(2)).readUInt16LE(0);

off();

console.info({ width, height });

Based on that, async generators could also come into play:

const { toReadableStream } = require('async-readable');

const chop = toReadableStream(async function* ({ read }) {

    while (true) {
        const size = rand(1, 9); // rand(min, max): random integer helper (not shown)
        const chunk = await read(size);
        yield { size, chunk };
    }

});

for await (const { size, chunk } of chop(stream)) {
    console.info({ size, chunk });
}

More realistic examples would be 1) a bitcoin block parser, or 2) a SOCKS5 client negotiator.

If you're interested, the underlying implementation is only around 60 lines of code. I'm not sure it would be a good fit in core, but I would love any feedback, thanks.

edit: code sample correction

@ronag
Member Author

ronag commented Jul 8, 2020

Just want to pitch in my 2 cents, instead of fixed batch size, how about accepting size dynamically?

I'm not sure I see how that differs from my example?

for await (const { size, chunk } of chop(stream)) {
    console.info({ size, chunk }); // size === length, chunk == items
}

vs

for await (const chunks of Readable.batched(128, stream)) {
    console.info(chunks); // chunks.length === length, chunks == items
}

@imcotton
Contributor

imcotton commented Jul 8, 2020

It's the read :: number -> Promise<Buffer> part I'm referring to, i.e. the read signature.

@ronag
Member Author

ronag commented Jul 8, 2020

part I'm referring to, the read signature.

Ah, I think that's a different topic though?

@imcotton
Contributor

imcotton commented Jul 8, 2020

Sorry, I don't quite follow; the topic here has been about batch-reading methodology and its implementation, no?

@ronag
Member Author

ronag commented Jul 29, 2020

Here is another case where this would be useful:

const file = await fs.open()
try {
  for await (const chunks of Readable.batched(source)) {
    await file.writev(chunks)
  }
} finally {
  await file.close()
}

@jimmywarting

whatwg talked about having an extendable queuingStrategy

ReadableStream.from(asyncIterable, queuingStrategy = {})

@mcollina
Member

mcollina commented Dec 7, 2020

@jimmywarting that would not help in this case. A significant part of the cost is having a promise per chunk. Essentially we'll need to have an alternative async iteration implementation that returns an array.

Looking at the current async iterator implementation, you can see that we could have an alternate implementation that returns an array of chunks instead.
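For reference, the per-chunk promise cost comes from the async iterator protocol itself; roughly desugared, every chunk goes through its own next() promise (handle is a placeholder):

// What `for await (const chunk of stream)` boils down to: each next()
// call allocates and awaits a fresh promise, one per chunk.
const it = stream[Symbol.asyncIterator]();
let result;
while (!(result = await it.next()).done) {
  handle(result.value);
}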

@benjamingr
Member

I'm honestly fine with just landing one of the APIs Robert mentioned above as experimental and going from there; they seem adequate.

@mcollina
Member

mcollina commented Dec 7, 2020

I'm honestly fine with just landing one of the APIs Robert mentioned above as experimental and going from there; they seem adequate.

I think we should just add this in.
