
The stream is not in a state that permits enqueue; new Response(readable).blob().text() resolve to [object ReadableStream] not underlying source #95

Closed
guest271314 opened this issue Sep 26, 2021 · 10 comments

Comments

@guest271314

Consider this code:

Promise.allSettled([
  new Promise(async (resolve, reject) => {
    let controller;
    const rs = new ReadableStream(
      {
        start(_) {
          return (controller = _);
        },
      },
      { highWaterMark: 1 }
    );
    for (let chunk of Array.from({ length: 100 }, (_, i) => i)) {
      try {
        controller.enqueue(new Uint8Array([chunk]));
      } catch (err) {
        console.warn(
          `new Response().arrayBuffer() outside event handler: ${err.message}`
        );
        reject(err);
      }
    }
    controller.close();
    const buffer = await new Response(rs).arrayBuffer();
    resolve(buffer);
  }),
  new Promise(async (resolve, reject) => {
    let controller;
    const rs = new ReadableStream(
      {
        start(_) {
          return (controller = _);
        },
      },
      { highWaterMark: 1 }
    );
    const ac = new AudioContext();
    if (ac.state === 'suspended') {
      await ac.resume();
    }
    const msd = new MediaStreamAudioDestinationNode(ac);
    const osc = new OscillatorNode(ac);
    osc.connect(msd);
    osc.onended = () => {
      recorder.requestData();
      recorder.stop();
    };

    const recorder = new MediaRecorder(msd.stream, {
      audioBitrateMode: 'constant',
    });
    recorder.onstop = async (e) => {
      // console.log(e.type);
      const buffer = await new Response(rs).arrayBuffer();
      resolve(buffer);
    };
    recorder.ondataavailable = async (e) => {
      // console.log(e.type, e.data.size);
      if (e.data.size > 0) {
        try {
          controller.enqueue(new Uint8Array(await e.data.arrayBuffer()));
        } catch (err) {
          console.warn(`Response.arrayBuffer(): ${err.message}`);
          reject(err);
        }
      } else {
        controller.close();
        await ac.close();
      }
    };
    osc.start(ac.currentTime);
    recorder.start(1);
    osc.stop(ac.currentTime + 1.15);
  }),
  new Promise(async (resolve, reject) => {
    let controller;
    const rs = new ReadableStream(
      {
        start(_) {
          return (controller = _);
        },
      },
      { highWaterMark: 1 }
    );
    const ac = new AudioContext();
    if (ac.state === 'suspended') {
      await ac.resume();
    }
    const msd = new MediaStreamAudioDestinationNode(ac);
    const osc = new OscillatorNode(ac);
    osc.connect(msd);
    osc.onended = () => {
      recorder.requestData();
      recorder.stop();
    };

    const recorder = new MediaRecorder(msd.stream, {
      audioBitrateMode: 'constant',
    });
    recorder.onstop = async (e) => {
      // console.log(e.type);
      const buffer = await new Response(rs).blob();
      resolve(buffer);
    };
    recorder.ondataavailable = async (e) => {
      // console.log(e.type, e.data.size);
      if (e.data.size > 0) {
        try {
          controller.enqueue(new Uint8Array(await e.data.arrayBuffer()));
        } catch (err) {
          console.warn(`Response.blob(): ${err.message}`);
          reject(err);
        }
      } else {
        controller.close();
        await ac.close();
      }
    };
    osc.start(ac.currentTime);
    recorder.start(1);
    osc.stop(ac.currentTime + 1.15);
  }),
])
  .then(async (data) => {
    console.table(data);
    console.log(await data[2].value.text());
  })
  .catch((err) => console.error(err));

Using the Streams API shipped with Chromium 96.0.4651.0 (Developer Build) (64-bit), Revision aabda36a688c0883019e7762faa00db6342a7e37-refs/heads/main@{#924134}, this error is thrown:

TypeError: Failed to execute 'enqueue' on 'ReadableStreamDefaultController': Cannot enqueue a chunk into a readable stream that is closed or has been requested to be closed

This polyfill, included as

  <script src="https://unpkg.com/web-streams-polyfill/dist/polyfill.min.js"></script>

at https://plnkr.co/edit/XwtpbXt4aqTQTIhb?preview, throws:

The stream is not in a state that permits enqueue

Observe that the first element of the array passed to Promise.allSettled() uses a basic loop in which ReadableStreamDefaultController.enqueue() is called with a Uint8Array as the value, then close() is called, then the ReadableStream is passed to new Response() and read to completion using arrayBuffer().

The second element of the array follows the same pattern as the first, except that close() is called in the MediaRecorder dataavailable event handler and new Response(readable).arrayBuffer() is called in the stop event handler. The TypeError is still thrown and logged at the console, yet it is not caught by try..catch.

The third element of the array follows the same pattern as the first and second, but uses Response.blob(), which always throws the TypeError.

I do not see anywhere in the code where enqueue() is called after close().

Using the polyfill, I did observe a run in which the third element fulfilled rather than rejected, while the very next run of the same code threw an error from blob().

The Streams API shipped with Chromium always throws; see https://bugs.chromium.org/p/chromium/issues/detail?id=1253143.

With the polyfill, new Response(readable).blob() does not resolve to the underlying data that was enqueued; instead, it appears to resolve to the string

[object ReadableStream]

There should not be any error thrown. The sequence is close(), then new Response(readable) with arrayBuffer() or blob() chained.

This behaviour of both Chromium Streams API and this polyfill makes no sense to me. (This cannot be working as intended.) Kindly illuminate.

@guest271314
Author

(Screenshot attached: Screenshot_2021-09-26_08-07-41)

@guest271314
Author

More details:

[Violation] 'dataavailable' handler took 566ms

is printed to the console, which suggests the MediaRecorder dataavailable event handler is retaining references and is somehow being fired (internally) after stop() is called. That is the only explanation I have been able to deduce, given that enqueue() is not called after close().

@MattiasBuelens
Owner

MattiasBuelens commented Sep 26, 2021

The problem is that ondataavailable can be called again while the previous call is still inside await e.data.arrayBuffer():

recorder.ondataavailable = async (e) => {
  if (e.data.size > 0) {
    try {
      controller.enqueue(new Uint8Array(await e.data.arrayBuffer()));
    } catch (err) {
      console.warn(`Response.arrayBuffer(): ${err.message}`);
      reject(err);
    }
  } else {
    controller.close();
    await ac.close();
  }
};
  1. ondataavailable is called with a Blob of size 733.
  2. e.data.arrayBuffer() is called, asynchronously collecting the bytes of the blob.
  3. ondataavailable is called again with a Blob of size 0.
  4. You immediately call controller.close(), closing the stream.
  5. await e.data.arrayBuffer() from the previous event resolves, and you try to enqueue a chunk after the stream is closed. 💥

You can fix this by storing a promise between each call to ondataavailable and making sure that each call awaits the previous promise before attempting to enqueue or close.
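
A minimal sketch of that approach (added for illustration; it assumes the same controller, recorder and ac variables as in your snippet, and omits error handling via reject):

// Serialize the handler's asynchronous work: each invocation waits for the
// previous one to finish before it enqueues into or closes the stream.
let previous = Promise.resolve();
recorder.ondataavailable = (e) => {
  previous = previous.then(async () => {
    if (e.data.size > 0) {
      controller.enqueue(new Uint8Array(await e.data.arrayBuffer()));
    } else {
      controller.close();
      await ac.close();
    }
  });
};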

However, there's a much better solution, namely to use more streams! 😁 Instead of awaiting something before you enqueue it to the stream, enqueue it immediately and use a TransformStream to transform it asynchronously. That way, all the nasty queuing logic is handled for you by the streams implementation. 😉

new Promise(async (resolve, reject) => {
  let controller;
  const input = new ReadableStream(
    {
      start(_) {
        return (controller = _);
      },
    },
    { highWaterMark: 1 }
  );
  const output = input.pipeThrough(new TransformStream({
    async transform(blob, c) {
      try {
        c.enqueue(new Uint8Array(await blob.arrayBuffer()));
      } catch (err) {
        console.warn(`Response.arrayBuffer(): ${err.message}`);
        reject(err);
      }
    }
  }));
  const ac = new AudioContext();
  if (ac.state === 'suspended') {
    await ac.resume();
  }
  const msd = new MediaStreamAudioDestinationNode(ac);
  const osc = new OscillatorNode(ac);
  osc.connect(msd);
  osc.onended = () => {
    recorder.requestData();
    recorder.stop();
  };

  const recorder = new MediaRecorder(msd.stream, {
    audioBitrateMode: 'constant',
  });
  recorder.onstop = async (e) => {
    // console.log(e.type);
    const buffer = await new Response(output).arrayBuffer();
    resolve(buffer);
  };
  recorder.ondataavailable = async (e) => {
    // console.log(e.type, e.data.size);
    if (e.data.size > 0) {
      controller.enqueue(e.data);
    } else {
      controller.close();
      await ac.close();
    }
  };
  osc.start(ac.currentTime);
  recorder.start(1);
  osc.stop(ac.currentTime + 1.15);
})

(I suggest you close the Chromium bug, since this is not a problem with Chromium's streams implementation but rather with your code. Both Chromium and this polyfill behave as expected.)

@MattiasBuelens
Owner

The polyfill does not resolve to the underlying data enqueued when Response(readable).blob(), rather appears to resolve to a string

[object ReadableStream]

Ah, that's an unfortunate limitation of (the current version of) the polyfill. new Response(stream) expects stream to be a native ReadableStream, but you're giving it a polyfilled ReadableStream. Thus, rather than reading each chunk from the stream, new Response() will just stringify the object.

I'm planning to make the polyfill play better with native streams for version 4.0, but it's never going to be perfect. And even then, if the browser doesn't support native streams at all, then new Response(stream) wouldn't work either. (For example, older browsers which support fetch but don't yet support streaming response bodies.)
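
A minimal sketch of that stringification (added for illustration, using a plain object as a stand-in for a polyfilled stream):

// Not a native ReadableStream, so the native Response constructor falls back
// to converting the body to a string, i.e. String(object).
const fakeStream = { [Symbol.toStringTag]: 'ReadableStream' };
new Response(fakeStream).text().then((text) => {
  console.log(text); // "[object ReadableStream]"
});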

You can work around this by concatenating the Uint8Array chunks manually, instead of using new Response().arrayBuffer():

// Collect all chunks
let chunks = [];
for await (const chunk of output) {
  chunks.push(chunk);
}
// Concatenate into single Uint8Array
const size = chunks.reduce((acc, chunk) => acc + chunk.byteLength, 0);
const concatenated = new Uint8Array(size);
let offset = 0;
for (const chunk of chunks) {
  concatenated.set(chunk, offset);
  offset += chunk.byteLength;
}
resolve(concatenated.buffer);

Alternatively, you can wait for ReadableStream.prototype.arrayBuffer() to become a thing (whatwg/streams#1019). 😛

@guest271314
Author

  5. await e.data.arrayBuffer() from the previous event resolves, and you try to enqueue a chunk after the stream is closed.

Where does that occur? No data is enqueued when Blob size is 0.

I am testing your suggestion now. I will post my findings here in a few minutes.

Ah, that's an unfortunate limitation of (the current version of) the polyfill.

That should probably be included in https://github.com/MattiasBuelens/web-streams-polyfill#compliance list items?

@guest271314
Author

You can work around this by concatenating the Uint8Array chunks manually, instead of using new Response().arrayBuffer():

Yes. I was previously just pushing Blobs to an array then calling Blob(chunks). I have also utilized WebAssembly.Memory.grow() in related experiments to write to a single memory instance https://github.com/guest271314/webtransport/blob/14f7e3f0cf5a84368c9626aabcd9fdee71572b69/webTransportAudioWorkletWebAssemblyMemoryGrow.js#L203-L211

// Copy each byte of `value` into the Uint8Array view over the growable
// WebAssembly memory, growing the memory when the write offset runs past it.
for (; i < value.buffer.byteLength; i++, readOffset++) {
  if (readOffset + 1 >= memory.buffer.byteLength) {
    console.log(`memory.buffer.byteLength before grow() for loop: ${memory.buffer.byteLength}.`);
    memory.grow(3);
    console.log(`memory.buffer.byteLength after grow() for loop: ${memory.buffer.byteLength}`);
    uint8_sab = new Uint8Array(memory.buffer);
  }
  uint8_sab[readOffset] = value[i];
}

My goal with the current version is to use the Streams API without storing the values in an object.

@guest271314
Author

Your suggestion achieves the expected result. Thank you for the illumination.

I still do not see where I call enqueue() after close().

I posted this

INVALID

See #95 (comment)

at the Chromium bug. I have no way to close the bug that I am aware of.

@MattiasBuelens
Owner

MattiasBuelens commented Sep 26, 2021

  5. await e.data.arrayBuffer() from the previous event resolves, and you try to enqueue a chunk after the stream is closed.

Where does that occur? No data is enqueued when Blob size is 0.

Not that one; the one with blob.size = 733. The first ondataavailable call isn't "done" yet after step 2; it lasts all the way until step 5. Meanwhile, another ondataavailable event is fired in step 3 with blob.size = 0. That's what's causing the seemingly "out of order" execution.

The key insight is to realize that the event loop does not await the result of your ondataavailable handler, so it'll happily call it again as soon as the synchronous call is done.
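
A minimal sketch of that behaviour in isolation (added for illustration; the event name is hypothetical and unrelated to MediaRecorder):

const target = new EventTarget();
target.addEventListener('chunk', async () => {
  console.log('handler start');
  // Stands in for `await e.data.arrayBuffer()`.
  await new Promise((r) => setTimeout(r, 100));
  console.log('handler end');
});
target.dispatchEvent(new Event('chunk'));
target.dispatchEvent(new Event('chunk'));
// Logs "handler start" twice before either "handler end": the second
// invocation starts before the first one's awaited work has resolved.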

Ah, that's an unfortunate limitation of (the current version of) the polyfill.

That should probably be included in https://github.com/MattiasBuelens/web-streams-polyfill#compliance list items?

It's still technically true though: the polyfill faithfully implements the entire Streams API. What it doesn't do is implement all other web APIs that integrate with the Streams API, such as fetch.

I do see this sort of question coming up more frequently, so I'll try to explain this better in the README.

Yes. I was previously just pushing Blobs to an array then calling Blob(chunks).

I mean, that's probably even easier? You can pass other Blobs to the Blob constructor, and then call concatenatedBlob.arrayBuffer() at the end. If you're not interested in the concrete byte data of any individual chunk, you don't really need to turn each chunk into its own Uint8Array first.
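
A minimal sketch of that simpler approach (added for illustration; it reuses the recorder and resolve names from your snippets):

const blobs = [];
recorder.ondataavailable = (e) => {
  // No await needed here; just keep the Blob itself.
  if (e.data.size > 0) {
    blobs.push(e.data);
  }
};
recorder.onstop = async () => {
  // new Blob() accepts other Blobs as parts; read them all at once.
  const buffer = await new Blob(blobs).arrayBuffer();
  resolve(buffer);
};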

@guest271314
Author

The key insight is to realize that the event loop does not await the result of your ondataavailable handler, so it'll happily call it again as soon as the synchronous call is done.

The first ondataavailable call isn't "done" yet after step 2

How is that possible? The handler is async, and I use await inside the enqueue() call.

Is this a reason to make enqueue() return a Promise, and to provide a standardized means to stream from dataavailable, so that await is not meaningless within the dataavailable handler?

I mean, that's probably even easier? You can pass other Blobs to the Blob constructor, and then call concatenatedBlob.arrayBuffer() at the end. If you're not interested in the concrete byte data of any individual chunk, you don't really need to turn each chunk into its own Uint8Array first.

I have not performed an exhaustive analysis of the comparative expense of the Blob() constructor versus the Streams API. It is not infrequent that specification authors have noted that Blob() construction is expensive. WebAssembly.Memory.grow() does have a hard-coded limitation in Chromium.

Individual chunks are important. Developers are interested in partial audio decoding and in encoding to various codecs, e.g., WebAudio/web-audio-api#337 (comment). I am testing a MediaRecorder version, which can clip the ending second of the recording (this can also occur with other Chromium audio playback and capture APIs), and, for example, a WebCodecs version that serializes and deserializes chunks of media: https://github.com/guest271314/webcodecs/blob/main/serialize-to-json-deserialize-json-to-encodedaudiochunk-decode.js. If users do not actually test the claims and implementations of WebCodecs and the Web Audio API until they break, they will not discover that what you put in is not what you get out (https://bugs.chromium.org/p/chromium/issues/detail?id=1237633), in spite of "flexibility" being advertised as the rationale for the API. The discrete chunks are important to audio developers.

@guest271314
Author

Thank you again for your analysis and solution. I will re-read the code until I gather exactly what is occurring, though I am now reminded of promises.push(new Promise(resolve => {})) in a synchronous for loop, followed by Promise.all(promises).
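
For reference, a generic sketch of that pattern (unrelated to the code above): one promise per iteration of a synchronous loop, with the results awaited afterwards.

const promises = [];
for (let i = 0; i < 3; i++) {
  // Each promise settles on its own schedule; the loop itself stays synchronous.
  promises.push(new Promise((resolve) => setTimeout(() => resolve(i), 10 * i)));
}
Promise.all(promises).then((results) => console.log(results)); // [0, 1, 2]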
