From 60f5c611353d72befa3872861220a86bb21cdb55 Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Thu, 15 Jun 2023 00:11:32 -0400 Subject: [PATCH 01/11] Add feature flag --- packages/shared/ReactFeatureFlags.js | 1 + packages/shared/forks/ReactFeatureFlags.native-fb.js | 1 + packages/shared/forks/ReactFeatureFlags.native-oss.js | 1 + packages/shared/forks/ReactFeatureFlags.test-renderer.js | 1 + .../shared/forks/ReactFeatureFlags.test-renderer.native-fb.js | 1 + packages/shared/forks/ReactFeatureFlags.test-renderer.www.js | 1 + packages/shared/forks/ReactFeatureFlags.www.js | 2 ++ 7 files changed, 8 insertions(+) diff --git a/packages/shared/ReactFeatureFlags.js b/packages/shared/ReactFeatureFlags.js index 3ed8f45f9a433..a0c3bc774eec5 100644 --- a/packages/shared/ReactFeatureFlags.js +++ b/packages/shared/ReactFeatureFlags.js @@ -81,6 +81,7 @@ export const enableLegacyCache = __EXPERIMENTAL__; export const enableFetchInstrumentation = true; export const enableBinaryFlight = __EXPERIMENTAL__; +export const enableFlightReadableStream = __EXPERIMENTAL__; export const enableTaint = __EXPERIMENTAL__; diff --git a/packages/shared/forks/ReactFeatureFlags.native-fb.js b/packages/shared/forks/ReactFeatureFlags.native-fb.js index c3aee2ab41bee..5c29c8b9a71d8 100644 --- a/packages/shared/forks/ReactFeatureFlags.native-fb.js +++ b/packages/shared/forks/ReactFeatureFlags.native-fb.js @@ -44,6 +44,7 @@ export const enableCache = true; export const enableLegacyCache = false; export const enableFetchInstrumentation = false; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const debugRenderPhaseSideEffectsForStrictMode = __DEV__; diff --git a/packages/shared/forks/ReactFeatureFlags.native-oss.js b/packages/shared/forks/ReactFeatureFlags.native-oss.js index 6f5293398b8c8..e1064dfa1bf59 100644 --- a/packages/shared/forks/ReactFeatureFlags.native-oss.js +++ b/packages/shared/forks/ReactFeatureFlags.native-oss.js @@ -52,6 +52,7 @@ export const enableTaint = __NEXT_RN_MAJOR__; export const enableUnifiedSyncLane = __NEXT_RN_MAJOR__; export const enableFizzExternalRuntime = __NEXT_RN_MAJOR__; // DOM-only export const enableBinaryFlight = __NEXT_RN_MAJOR__; // DOM-only +export const enableFlightReadableStream = __NEXT_RN_MAJOR__; // DOM-only export const enableServerComponentKeys = __NEXT_RN_MAJOR__; export const enableServerComponentLogs = __NEXT_RN_MAJOR__; diff --git a/packages/shared/forks/ReactFeatureFlags.test-renderer.js b/packages/shared/forks/ReactFeatureFlags.test-renderer.js index 1fe4a6e2d86d9..b0a0c7f5d5342 100644 --- a/packages/shared/forks/ReactFeatureFlags.test-renderer.js +++ b/packages/shared/forks/ReactFeatureFlags.test-renderer.js @@ -22,6 +22,7 @@ export const enableCache = true; export const enableLegacyCache = __EXPERIMENTAL__; export const enableFetchInstrumentation = true; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const disableCommentsAsDOMContainers = true; diff --git a/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js b/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js index e78740263ff90..40297ad5fa620 100644 --- a/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js +++ b/packages/shared/forks/ReactFeatureFlags.test-renderer.native-fb.js @@ -22,6 +22,7 @@ export const enableCache = true; export const enableLegacyCache = false; export const enableFetchInstrumentation = false; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const disableCommentsAsDOMContainers = true; diff --git a/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js b/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js index a222f31812407..0aa61e8b9bd4e 100644 --- a/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js +++ b/packages/shared/forks/ReactFeatureFlags.test-renderer.www.js @@ -22,6 +22,7 @@ export const enableCache = true; export const enableLegacyCache = true; export const enableFetchInstrumentation = false; export const enableBinaryFlight = true; +export const enableFlightReadableStream = true; export const enableTaint = true; export const enablePostpone = false; export const disableCommentsAsDOMContainers = true; diff --git a/packages/shared/forks/ReactFeatureFlags.www.js b/packages/shared/forks/ReactFeatureFlags.www.js index d05926e29bafa..82e3b08c7ecd0 100644 --- a/packages/shared/forks/ReactFeatureFlags.www.js +++ b/packages/shared/forks/ReactFeatureFlags.www.js @@ -69,6 +69,8 @@ export const enableLegacyCache = true; export const enableFetchInstrumentation = false; export const enableBinaryFlight = false; +export const enableFlightReadableStream = false; + export const enableTaint = false; export const enablePostpone = false; From 7dd8e6fab825084e05518be8c39d16be4e20e09d Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Thu, 15 Jun 2023 01:08:55 -0400 Subject: [PATCH 02/11] Serialize ReadableStream and AsyncIterable --- .eslintrc.js | 5 + .../react-client/src/ReactFlightClient.js | 131 +++++++++++++- .../__tests__/ReactFlightDOMBrowser-test.js | 94 ++++++++++ .../react-server/src/ReactFlightServer.js | 168 +++++++++++++++++- 4 files changed, 388 insertions(+), 10 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index 005727b5e14d4..370565eb8216a 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -499,7 +499,11 @@ module.exports = { DOMHighResTimeStamp: 'readonly', EventListener: 'readonly', Iterable: 'readonly', + AsyncIterable: 'readonly', + $AsyncIterable: 'readonly', + $AsyncIterator: 'readonly', Iterator: 'readonly', + AsyncIterator: 'readonly', JSONValue: 'readonly', JSResourceReference: 'readonly', MouseEventHandler: 'readonly', @@ -520,6 +524,7 @@ module.exports = { React$Portal: 'readonly', React$Ref: 'readonly', ReadableStreamController: 'readonly', + ReadableStreamReader: 'readonly', RequestInfo: 'readonly', RequestOptions: 'readonly', StoreAsGlobal: 'readonly', diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index b4739f4d720e3..c0c92ee31693d 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -42,6 +42,7 @@ import { enableBinaryFlight, enablePostpone, enableRefAsProp, + enableFlightReadableStream, } from 'shared/ReactFeatureFlags'; import { @@ -137,11 +138,18 @@ type ResolvedModuleChunk = { type InitializedChunk = { status: 'fulfilled', value: T, - reason: null, + reason: null | ReadableStreamController, _response: Response, _debugInfo?: null | ReactDebugInfo, then(resolve: (T) => mixed, reject: (mixed) => mixed): void, }; +type InitializedStreamChunk = { + status: 'fulfilled', + value: ReadableStream, + reason: ReadableStreamController, + _response: Response, + then(resolve: (ReadableStream) => mixed, reject: (mixed) => mixed): void, +}; type ErroredChunk = { status: 'rejected', value: null, @@ -312,7 +320,14 @@ function wakeChunkIfInitialized( function triggerErrorOnChunk(chunk: SomeChunk, error: mixed): void { if (chunk.status !== PENDING && chunk.status !== BLOCKED) { - // We already resolved. We didn't expect to see this. + if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + // $FlowFixMe[incompatible-call]: The error method should accept mixed. + controller.error(error); + } return; } const listeners = chunk.reason; @@ -356,12 +371,30 @@ function createInitializedBufferChunk( return new Chunk(INITIALIZED, value, null, response); } +function createInitializedStreamChunk( + response: Response, + value: ReadableStream, + controller: ReadableStreamController, +): InitializedChunk { + // We use the reason field to stash the controller since we already have that + // field. It's a bit of a hack but efficient. + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(INITIALIZED, value, controller, response); +} + function resolveModelChunk( chunk: SomeChunk, value: UninitializedModel, ): void { if (chunk.status !== PENDING) { - // We already resolved. We didn't expect to see this. + if (enableFlightReadableStream) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + const parsedValue: T = parseModel(chunk._response, value); + controller.enqueue(parsedValue); + } return; } const resolveListeners = chunk.value; @@ -1035,6 +1068,57 @@ function resolveModule( } } +function startReadableStream( + response: Response, + id: number, + type: void | 'bytes', +): void { + const chunks = response._chunks; + let controller: ReadableStreamController = (null: any); + const stream = new ReadableStream({ + type: type, + start(c) { + controller = c; + }, + }); + const chunk = chunks.get(id); + if (!chunk) { + chunks.set(id, createInitializedStreamChunk(response, stream, controller)); + return; + } + if (chunk.status !== PENDING) { + // We already resolved. We didn't expect to see this. + return; + } + const resolveListeners = chunk.value; + const resolvedChunk: InitializedStreamChunk = (chunk: any); + resolvedChunk.status = INITIALIZED; + resolvedChunk.value = stream; + resolvedChunk.reason = controller; + if (resolveListeners !== null) { + wakeChunk(resolveListeners, chunk.value); + } +} + +function startAsyncIterator(response: Response, id: number): void { + // TODO +} + +function startAsyncIterable(response: Response, id: number): void { + // TODO +} + +function stopStream(response: Response, id: number): void { + const chunks = response._chunks; + const chunk = chunks.get(id); + if (!chunk || chunk.status !== INITIALIZED) { + // We didn't expect not to have an existing stream; + } + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + controller.close(); +} + type ErrorWithDigest = Error & {digest?: string}; function resolveErrorProd( response: Response, @@ -1362,6 +1446,41 @@ function processFullRow( 'matching versions on the server and the client.', ); } + case 82 /* "R" */: { + if (enableFlightReadableStream) { + startReadableStream(response, id, undefined); + return; + } + } + // Fallthrough + case 114 /* "r" */: { + if (enableFlightReadableStream) { + startReadableStream(response, id, 'bytes'); + return; + } + } + // Fallthrough + case 88 /* "X" */: { + if (enableFlightReadableStream) { + startAsyncIterable(response, id); + return; + } + } + // Fallthrough + case 120 /* "x" */: { + if (enableFlightReadableStream) { + startAsyncIterator(response, id); + return; + } + } + // Fallthrough + case 67 /* "C" */: { + if (enableFlightReadableStream) { + stopStream(response, id); + return; + } + } + // Fallthrough case 80 /* "P" */: { if (enablePostpone) { if (__DEV__) { @@ -1433,7 +1552,11 @@ export function processBinaryChunk( rowTag = resolvedRowTag; rowState = ROW_LENGTH; i++; - } else if (resolvedRowTag > 64 && resolvedRowTag < 91 /* "A"-"Z" */) { + } else if ( + (resolvedRowTag > 64 && resolvedRowTag < 91) /* "A"-"Z" */ || + resolvedRowTag === 114 /* "r" */ || + resolvedRowTag === 120 /* "x" */ + ) { rowTag = resolvedRowTag; rowState = ROW_CHUNK_BY_NEWLINE; i++; diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js index 1f5034f4bf985..00b83aded58f0 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js @@ -1406,4 +1406,98 @@ describe('ReactFlightDOMBrowser', () => { expect(postponed).toBe('testing postpone'); expect(error).toBe(null); }); + + function passThrough(stream) { + // Simulate more realistic network by splitting up and rejoining some chunks. + // This lets us test that we don't accidentally rely on particular bounds of the chunks. + return new ReadableStream({ + async start(controller) { + const reader = stream.getReader(); + function push() { + reader.read().then(({done, value}) => { + if (done) { + controller.close(); + return; + } + controller.enqueue(value); + push(); + return; + }); + } + push(); + }, + }); + } + + // @gate enableFlightReadableStream + it('should supports streaming ReadableStream with objects', async () => { + const errors = []; + let controller1; + let controller2; + const s1 = new ReadableStream({ + start(c) { + controller1 = c; + }, + }); + const s2 = new ReadableStream({ + start(c) { + controller2 = c; + }, + }); + const rscStream = ReactServerDOMServer.renderToReadableStream( + { + s1, + s2, + }, + {}, + { + onError(x) { + errors.push(x); + return x; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + const reader1 = result.s1.getReader(); + const reader2 = result.s2.getReader(); + + controller1.enqueue({hello: 'world'}); + controller2.enqueue({hi: 'there'}); + expect(await reader1.read()).toEqual({ + value: {hello: 'world'}, + done: false, + }); + expect(await reader2.read()).toEqual({ + value: {hi: 'there'}, + done: false, + }); + + controller1.enqueue('text1'); + controller2.enqueue('text2'); + controller1.close(); + controller2.error('rejected'); + + expect(await reader1.read()).toEqual({ + value: 'text1', + done: false, + }); + expect(await reader1.read()).toEqual({ + value: undefined, + done: true, + }); + expect(await reader2.read()).toEqual({ + value: 'text2', + done: false, + }); + let error = null; + try { + await reader2.read(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('rejected'); + expect(errors).toEqual(['rejected']); + }); }); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 3c8ac8c1469a8..dc4cb5b1e7695 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -20,6 +20,8 @@ import { enableServerComponentLogs, } from 'shared/ReactFeatureFlags'; +import {enableFlightReadableStream} from 'shared/ReactFeatureFlags'; + import { scheduleWork, flushBuffered, @@ -199,6 +201,8 @@ if ( const ObjectPrototype = Object.prototype; +const ASYNC_ITERATOR = Symbol.asyncIterator; + type JSONValue = | string | boolean @@ -236,6 +240,8 @@ export type ReactClientValue = | null | void | bigint + | ReadableStream + | AsyncIterable | Iterable | Array | Map @@ -509,15 +515,151 @@ function serializeThenable( emitErrorChunk(request, newTask.id, digest, reason); } request.abortableTasks.delete(newTask); - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + enqueueFlush(request); }, ); return newTask.id; } +function serializeReadableStream( + request: Request, + stream: ReadableStream, +): string { + // Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the + // receiving side. It also implies that different chunks can be split up or merged as opposed + // to a readable stream that happens to have Uint8Array as the type which might expect it to be + // received in the same slices. + // $FlowFixMe: This is a Node.js extension. + let supportsBYOB: void | boolean = stream.supportsBYOB; + if (supportsBYOB === undefined) { + try { + // $FlowFixMe[extra-arg]: This argument is accepted. + stream.getReader({mode: 'byob'}).releaseLock(); + supportsBYOB = true; + } catch (x) { + supportsBYOB = false; + } + } + + const reader = stream.getReader(); + + request.pendingChunks += 2; // Start and Stop rows. + const streamId = request.nextChunkId++; + const startStreamRow = + streamId.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; + request.completedRegularChunks.push(stringToChunk(startStreamRow)); + + function progress(entry: {done: boolean, value: ReactClientValue, ...}) { + if (entry.done) { + const endStreamRow = streamId.toString(16) + ':C\n'; + request.pendingChunks++; + request.completedRegularChunks.push(stringToChunk(endStreamRow)); + enqueueFlush(request); + } else { + try { + const chunkId = outlineModel(request, entry.value); + const processedChunk = encodeReferenceChunk( + request, + streamId, + serializeByValueID(chunkId), + ); + request.pendingChunks++; + request.completedRegularChunks.push(processedChunk); + enqueueFlush(request); + reader.read().then(progress, error); + } catch (x) { + error(x); + } + } + } + function error(reason: mixed) { + if ( + enablePostpone && + typeof reason === 'object' && + reason !== null && + (reason: any).$$typeof === REACT_POSTPONE_TYPE + ) { + const postponeInstance: Postpone = (reason: any); + logPostpone(request, postponeInstance.message); + emitPostponeChunk(request, streamId, postponeInstance); + } else { + const digest = logRecoverableError(request, reason); + emitErrorChunk(request, streamId, digest, reason); + } + enqueueFlush(request); + } + // TODO: Handle cancellation when the Request is aborted. + reader.read().then(progress, error); + return serializeByValueID(streamId); +} + +function serializeAsyncIterable( + request: Request, + iteratable: $AsyncIterable, + iterator: $AsyncIterator, +): string { + // Generators/Iterators are Iterables but they're also their own iterator + // functions. If that's the case, we treat them as single-shot. Otherwise, + // we assume that this iterable might be a multi-shot and allow it to be + // iterated more than once on the client. + const isIterator = iteratable === iterator; + + request.pendingChunks += 2; // Start and Stop rows. + const streamId = request.nextChunkId++; + const startStreamRow = + streamId.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n'; + request.completedRegularChunks.push(stringToChunk(startStreamRow)); + + function progress( + entry: + | {done: false, +value: ReactClientValue, ...} + | {done: true, +value: ReactClientValue, ...}, + ) { + try { + const chunkId = outlineModel(request, entry.value); + const processedChunk = encodeReferenceChunk( + request, + streamId, + serializeByValueID(chunkId), + ); + request.pendingChunks++; + request.completedRegularChunks.push(processedChunk); + enqueueFlush(request); + iterator.next().then(progress, error); + } catch (x) { + error(x); + } + if (entry.done) { + // Unlike streams, the last entry is encoded as a row. Even if it's undefined, + // because it might not be. + const endStreamRow = streamId.toString(16) + ':C\n'; + request.pendingChunks++; + request.completedRegularChunks.push(stringToChunk(endStreamRow)); + enqueueFlush(request); + } + } + function error(reason: mixed) { + if ( + enablePostpone && + typeof reason === 'object' && + reason !== null && + (reason: any).$$typeof === REACT_POSTPONE_TYPE + ) { + const postponeInstance: Postpone = (reason: any); + logPostpone(request, postponeInstance.message); + emitPostponeChunk(request, streamId, postponeInstance); + } else { + const digest = logRecoverableError(request, reason); + emitErrorChunk(request, streamId, digest, reason); + } + enqueueFlush(request); + } + // TODO: Handle cancellation when the Request is aborted. + iterator.next().then(progress, error); + return serializeByValueID(streamId); +} + export function emitHint( request: Request, code: Code, @@ -1265,9 +1407,7 @@ function serializeBlob(request: Request, blob: Blob): string { const digest = logRecoverableError(request, reason); emitErrorChunk(request, newTask.id, digest, reason); request.abortableTasks.delete(newTask); - if (request.destination !== null) { - flushCompletedChunks(request, request.destination); - } + enqueueFlush(request); } // $FlowFixMe[incompatible-call] reader.read().then(progress).catch(error); @@ -1667,6 +1807,22 @@ function renderModelDestructive( return renderFragment(request, task, Array.from((value: any))); } + if (enableFlightReadableStream) { + // TODO: Blob is not available in old Node. Remove the typeof check later. + if ( + typeof ReadableStream === 'function' && + value instanceof ReadableStream + ) { + return serializeReadableStream(request, value); + } + const getAsyncIterator: void | (() => $AsyncIterator) = + (value: any)[ASYNC_ITERATOR]; + if (typeof getAsyncIterator === 'function') { + const asyncIterator = getAsyncIterator.call(value); + return serializeAsyncIterable(request, (value: any), asyncIterator); + } + } + // Verify that this is a simple plain object. const proto = getPrototypeOf(value); if ( From bed74ee3afce1020611b2872607911dfe208e2f5 Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Sat, 17 Jun 2023 21:13:46 -0400 Subject: [PATCH 03/11] Propagate a cancel/abort signal to the serialized streams --- .../__tests__/ReactFlightDOMBrowser-test.js | 85 +++++++++++++++++ .../src/__tests__/ReactFlightDOMNode-test.js | 94 +++++++++++++++++++ .../react-server/src/ReactFlightServer.js | 90 +++++++++++++++++- scripts/error-codes/codes.json | 3 +- 4 files changed, 266 insertions(+), 6 deletions(-) diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js index 00b83aded58f0..e4ffebab0901a 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js @@ -1500,4 +1500,89 @@ describe('ReactFlightDOMBrowser', () => { expect(error.digest).toBe('rejected'); expect(errors).toEqual(['rejected']); }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we are cancelled', async () => { + let controller; + let cancelReason; + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + let loggedReason; + const rscStream = ReactServerDOMServer.renderToReadableStream( + s, + {}, + { + onError(reason) { + loggedReason = reason; + }, + }, + ); + const reader = rscStream.getReader(); + controller.enqueue('hi'); + const reason = new Error('aborted'); + reader.cancel(reason); + await reader.read(); + expect(cancelReason).toBe(reason); + expect(loggedReason).toBe(reason); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we abort', async () => { + const errors = []; + let controller; + let cancelReason; + const abortController = new AbortController(); + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + const rscStream = ReactServerDOMServer.renderToReadableStream( + s, + {}, + { + signal: abortController.signal, + onError(x) { + errors.push(x); + return x.message; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + const reader = result.getReader(); + controller.enqueue('hi'); + + await 0; + + const reason = new Error('aborted'); + abortController.abort(reason); + + // We should be able to read the part we already emitted before the abort + expect(await reader.read()).toEqual({ + value: 'hi', + done: false, + }); + + expect(cancelReason).toBe(reason); + + let error = null; + try { + await reader.read(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('aborted'); + expect(errors).toEqual([reason]); + }); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js index 87fc83360018e..df1850896d827 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMNode-test.js @@ -9,6 +9,9 @@ 'use strict'; +global.ReadableStream = + require('web-streams-polyfill/ponyfill/es6').ReadableStream; + // Don't wait before processing work on the server. // TODO: we can replace this with FlightServer.act(). global.setImmediate = cb => cb(); @@ -258,4 +261,95 @@ describe('ReactFlightDOMNode', () => { 'Client Component', ); }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we are cancelled', async () => { + let controller; + let cancelReason; + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + + const rscStream = ReactServerDOMServer.renderToPipeableStream( + s, + {}, + { + onError(error) { + return error.message; + }, + }, + ); + + const writable = new Stream.PassThrough(); + rscStream.pipe(writable); + + controller.enqueue('hi'); + + const reason = new Error('aborted'); + writable.destroy(reason); + + await new Promise(resolve => { + writable.on('error', () => { + resolve(); + }); + }); + + expect(cancelReason.message).toBe( + 'The destination stream errored while writing data.', + ); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying ReadableStream when we abort', async () => { + const errors = []; + let controller; + let cancelReason; + const s = new ReadableStream({ + start(c) { + controller = c; + }, + cancel(r) { + cancelReason = r; + }, + }); + const rscStream = ReactServerDOMServer.renderToPipeableStream( + s, + {}, + { + onError(x) { + errors.push(x); + return x.message; + }, + }, + ); + + const readable = new Stream.PassThrough(); + rscStream.pipe(readable); + + const result = await ReactServerDOMClient.createFromNodeStream(readable, { + moduleMap: {}, + moduleLoading: webpackModuleLoading, + }); + const reader = result.getReader(); + controller.enqueue('hi'); + + const reason = new Error('aborted'); + rscStream.abort(reason); + + expect(cancelReason).toBe(reason); + + let error = null; + try { + await reader.read(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('aborted'); + expect(errors).toEqual([reason]); + }); }); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index dc4cb5b1e7695..67e60c1397380 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -288,6 +288,7 @@ export type Request = { nextChunkId: number, pendingChunks: number, hints: Hints, + abortListeners: Set<(reason: mixed) => void>, abortableTasks: Set, pingedTasks: Array, completedImportChunks: Array, @@ -384,6 +385,7 @@ export function createRequest( nextChunkId: 0, pendingChunks: 0, hints, + abortListeners: new Set(), abortableTasks: abortSet, pingedTasks: pingedTasks, completedImportChunks: ([]: Array), @@ -550,12 +552,21 @@ function serializeReadableStream( streamId.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); + // There's a race condition between when the stream is aborted and when the promise + // resolves so we track whether we already aborted it to avoid writing twice. + let aborted = false; function progress(entry: {done: boolean, value: ReactClientValue, ...}) { + if (aborted) { + return; + } + if (entry.done) { + request.abortListeners.delete(error); const endStreamRow = streamId.toString(16) + ':C\n'; request.pendingChunks++; request.completedRegularChunks.push(stringToChunk(endStreamRow)); enqueueFlush(request); + aborted = true; } else { try { const chunkId = outlineModel(request, entry.value); @@ -574,6 +585,11 @@ function serializeReadableStream( } } function error(reason: mixed) { + if (aborted) { + return; + } + aborted = true; + request.abortListeners.delete(error); if ( enablePostpone && typeof reason === 'object' && @@ -588,8 +604,10 @@ function serializeReadableStream( emitErrorChunk(request, streamId, digest, reason); } enqueueFlush(request); + // $FlowFixMe should be able to pass mixed + reader.cancel(reason).then(error, error); } - // TODO: Handle cancellation when the Request is aborted. + request.abortListeners.add(error); reader.read().then(progress, error); return serializeByValueID(streamId); } @@ -611,11 +629,18 @@ function serializeAsyncIterable( streamId.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); + // There's a race condition between when the stream is aborted and when the promise + // resolves so we track whether we already aborted it to avoid writing twice. + let aborted = false; function progress( entry: | {done: false, +value: ReactClientValue, ...} | {done: true, +value: ReactClientValue, ...}, ) { + if (aborted) { + return; + } + try { const chunkId = outlineModel(request, entry.value); const processedChunk = encodeReferenceChunk( @@ -626,20 +651,29 @@ function serializeAsyncIterable( request.pendingChunks++; request.completedRegularChunks.push(processedChunk); enqueueFlush(request); - iterator.next().then(progress, error); } catch (x) { error(x); + return; } if (entry.done) { // Unlike streams, the last entry is encoded as a row. Even if it's undefined, // because it might not be. + request.abortListeners.delete(error); const endStreamRow = streamId.toString(16) + ':C\n'; request.pendingChunks++; request.completedRegularChunks.push(stringToChunk(endStreamRow)); enqueueFlush(request); + aborted = true; + } else { + iterator.next().then(progress, error); } } function error(reason: mixed) { + if (aborted) { + return; + } + aborted = true; + request.abortListeners.delete(error); if ( enablePostpone && typeof reason === 'object' && @@ -654,8 +688,13 @@ function serializeAsyncIterable( emitErrorChunk(request, streamId, digest, reason); } enqueueFlush(request); + if (typeof (iterator: any).throw === 'function') { + // The iterator protocol doesn't necessarily include this but a generator do. + // $FlowFixMe should be able to pass mixed + iterator.throw(reason).then(error, error); + } } - // TODO: Handle cancellation when the Request is aborted. + request.abortListeners.add(error); iterator.next().then(progress, error); return serializeByValueID(streamId); } @@ -1390,10 +1429,16 @@ function serializeBlob(request: Request, blob: Blob): string { const reader = blob.stream().getReader(); + let aborted = false; function progress( entry: {done: false, value: Uint8Array} | {done: true, value: void}, ): Promise | void { + if (aborted) { + return; + } if (entry.done) { + request.abortListeners.delete(error); + aborted = true; pingTask(request, newTask); return; } @@ -1404,11 +1449,21 @@ function serializeBlob(request: Request, blob: Blob): string { } function error(reason: mixed) { + if (aborted) { + return; + } + aborted = true; + request.abortListeners.delete(error); const digest = logRecoverableError(request, reason); emitErrorChunk(request, newTask.id, digest, reason); request.abortableTasks.delete(newTask); enqueueFlush(request); + // $FlowFixMe should be able to pass mixed + reader.cancel(reason).then(error, error); } + + request.abortListeners.add(error); + // $FlowFixMe[incompatible-call] reader.read().then(progress).catch(error); @@ -2759,6 +2814,7 @@ function flushCompletedChunks( cleanupTaintQueue(request); } close(destination); + request.destination = null; } } @@ -2816,9 +2872,9 @@ export function stopFlowing(request: Request): void { export function abort(request: Request, reason: mixed): void { try { const abortableTasks = request.abortableTasks; + // We have tasks to abort. We'll emit one error row and then emit a reference + // to that row from every row that's still remaining. if (abortableTasks.size > 0) { - // We have tasks to abort. We'll emit one error row and then emit a reference - // to that row from every row that's still remaining. request.pendingChunks++; const errorId = request.nextChunkId++; if ( @@ -2843,6 +2899,30 @@ export function abort(request: Request, reason: mixed): void { abortableTasks.forEach(task => abortTask(task, request, errorId)); abortableTasks.clear(); } + const abortListeners = request.abortListeners; + if (abortListeners.size > 0) { + let error; + if ( + enablePostpone && + typeof reason === 'object' && + reason !== null && + (reason: any).$$typeof === REACT_POSTPONE_TYPE + ) { + // We aborted with a Postpone but since we're passing this to an + // external handler, passing this object would leak it outside React. + // We create an alternative reason for it instead. + error = new Error('The render was aborted due to being postponed.'); + } else { + error = + reason === undefined + ? new Error( + 'The render was aborted by the server without a reason.', + ) + : reason; + } + abortListeners.forEach(callback => callback(error)); + abortListeners.clear(); + } if (request.destination !== null) { flushCompletedChunks(request, request.destination); } diff --git a/scripts/error-codes/codes.json b/scripts/error-codes/codes.json index 2ccd0ee9f9a3e..da438e8403ef0 100644 --- a/scripts/error-codes/codes.json +++ b/scripts/error-codes/codes.json @@ -507,5 +507,6 @@ "519": "Hydration Mismatch Exception: This is not a real error, and should not leak into userspace. If you're seeing this, it's likely a bug in React.", "520": "There was an error during concurrent rendering but React was able to recover by instead synchronously rendering the entire root.", "521": "flushSyncWork should not be called from builds that support legacy mode. This is a bug in React.", - "522": "Invalid form element. requestFormReset must be passed a form that was rendered by React." + "522": "Invalid form element. requestFormReset must be passed a form that was rendered by React.", + "523": "The render was aborted due to being postponed." } From 65f017270ff4a1c2278fe1f350c68e819dd11088 Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Thu, 11 Apr 2024 23:32:03 -0400 Subject: [PATCH 04/11] Implement Consuming AsyncIterables --- .../react-client/src/ReactFlightClient.js | 194 ++++++++++++++--- .../__tests__/ReactFlightDOMBrowser-test.js | 202 ++++++++++++++++++ .../react-server/src/ReactFlightServer.js | 51 +++-- scripts/error-codes/codes.json | 3 +- 4 files changed, 401 insertions(+), 49 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index c0c92ee31693d..a30c1fe35d1b3 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -69,6 +69,12 @@ import { export type {CallServerCallback, EncodeFormActionCallback}; +interface StreamController { + close(returnValue: any): void; + enqueue(value: any): void; + error(error: Error): void; +} + type UninitializedModel = string; export type JSONValue = @@ -138,15 +144,17 @@ type ResolvedModuleChunk = { type InitializedChunk = { status: 'fulfilled', value: T, - reason: null | ReadableStreamController, + reason: null | StreamController, _response: Response, _debugInfo?: null | ReactDebugInfo, then(resolve: (T) => mixed, reject: (mixed) => mixed): void, }; -type InitializedStreamChunk = { +type InitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +> = { status: 'fulfilled', - value: ReadableStream, - reason: ReadableStreamController, + value: T, + reason: StreamController, _response: Response, then(resolve: (ReadableStream) => mixed, reject: (mixed) => mixed): void, }; @@ -323,7 +331,7 @@ function triggerErrorOnChunk(chunk: SomeChunk, error: mixed): void { if (enableFlightReadableStream) { // If we get more data to an already resolved ID, we assume that it's // a stream chunk since any other row shouldn't have more than one entry. - const streamChunk: InitializedStreamChunk = (chunk: any); + const streamChunk: InitializedStreamChunk = (chunk: any); const controller = streamChunk.reason; // $FlowFixMe[incompatible-call]: The error method should accept mixed. controller.error(error); @@ -371,11 +379,13 @@ function createInitializedBufferChunk( return new Chunk(INITIALIZED, value, null, response); } -function createInitializedStreamChunk( +function createInitializedStreamChunk< + T: ReadableStream | $AsyncIterable, +>( response: Response, - value: ReadableStream, - controller: ReadableStreamController, -): InitializedChunk { + value: T, + controller: StreamController, +): InitializedChunk { // We use the reason field to stash the controller since we already have that // field. It's a bit of a hack but efficient. // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors @@ -390,8 +400,9 @@ function resolveModelChunk( if (enableFlightReadableStream) { // If we get more data to an already resolved ID, we assume that it's // a stream chunk since any other row shouldn't have more than one entry. - const streamChunk: InitializedStreamChunk = (chunk: any); + const streamChunk: InitializedStreamChunk = (chunk: any); const controller = streamChunk.reason; + // TODO: This model might get blocked. const parsedValue: T = parseModel(chunk._response, value); controller.enqueue(parsedValue); } @@ -1068,19 +1079,13 @@ function resolveModule( } } -function startReadableStream( +function resolveStream>( response: Response, id: number, - type: void | 'bytes', + stream: T, + controller: StreamController, ): void { const chunks = response._chunks; - let controller: ReadableStreamController = (null: any); - const stream = new ReadableStream({ - type: type, - start(c) { - controller = c; - }, - }); const chunk = chunks.get(id); if (!chunk) { chunks.set(id, createInitializedStreamChunk(response, stream, controller)); @@ -1091,7 +1096,7 @@ function startReadableStream( return; } const resolveListeners = chunk.value; - const resolvedChunk: InitializedStreamChunk = (chunk: any); + const resolvedChunk: InitializedStreamChunk = (chunk: any); resolvedChunk.status = INITIALIZED; resolvedChunk.value = stream; resolvedChunk.reason = controller; @@ -1100,23 +1105,152 @@ function startReadableStream( } } -function startAsyncIterator(response: Response, id: number): void { - // TODO +function startReadableStream( + response: Response, + id: number, + type: void | 'bytes', +): void { + let controller: StreamController = (null: any); + const stream = new ReadableStream({ + type: type, + start(c) { + controller = c; + }, + }); + resolveStream(response, id, stream, controller); } -function startAsyncIterable(response: Response, id: number): void { - // TODO +type IteratorEntry = {done: false, value: T} | {done: true, value: T}; + +function asyncIterator(this: $AsyncIterator) { + // Self referencing iterator. + return this; } -function stopStream(response: Response, id: number): void { +function createIterator( + next: (arg: void) => SomeChunk>, +): $AsyncIterator { + const iterator: any = { + next: next, + // TODO: Add return/throw as options for aborting. + }; + // TODO: The iterator could inherit the AsyncIterator prototype which is not exposed as + // a global but exists as a prototype of an AsyncGenerator. However, it's not needed + // to satisfy the iterable protocol. + (iterator: any)[Symbol.asyncIterator] = asyncIterator; + return iterator; +} + +function initializeIteratorEntry( + chunk: SomeChunk>, + value: T, + done: boolean, +): void { + const entry: IteratorEntry = ({done, value}: any); + const pendingChunk: PendingChunk> = (chunk: any); + const resolveListeners = pendingChunk.value; + const initializedChunk: InitializedChunk> = + (pendingChunk: any); + initializedChunk.status = INITIALIZED; + initializedChunk.value = entry; + if (resolveListeners !== null) { + wakeChunk(resolveListeners, entry); + } +} + +function startAsyncIterable( + response: Response, + id: number, + iterator: boolean, +): void { + const buffer: Array>> = []; + let closed = false; + let nextWriteIndex = 0; + const controller: StreamController = { + enqueue(value: any): void { + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createPendingChunk>(response); + } + initializeIteratorEntry(buffer[nextWriteIndex++], value, false); + }, + close(value: T): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createPendingChunk>(response); + } + initializeIteratorEntry(buffer[nextWriteIndex++], value, true); + while (nextWriteIndex < buffer.length) { + // In generators, any extra reads from the iterator have the value undefined. + initializeIteratorEntry( + buffer[nextWriteIndex++], + (undefined: any), + true, + ); + } + }, + error(error: Error): void { + closed = true; + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createPendingChunk>(response); + } + while (nextWriteIndex < buffer.length) { + triggerErrorOnChunk(buffer[nextWriteIndex++], error); + } + }, + }; + const iterable: $AsyncIterable = { + [Symbol.asyncIterator](): $AsyncIterator { + let nextReadIndex = 0; + return createIterator(arg => { + if (arg !== undefined) { + throw new Error( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + } + if (nextReadIndex === buffer.length) { + if (closed) { + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk( + INITIALIZED, + {done: true, value: undefined}, + null, + response, + ); + } + buffer[nextReadIndex] = + createPendingChunk>(response); + } + return buffer[nextReadIndex++]; + }); + }, + }; + // TODO: If it's a single shot iterator we can optimize memory by cleaning up the buffer after + // reading through the end, but currently we favor code size over this optimization. + resolveStream( + response, + id, + iterator ? iterable[Symbol.asyncIterator]() : iterable, + controller, + ); +} + +function stopStream( + response: Response, + id: number, + row: UninitializedModel, +): void { const chunks = response._chunks; const chunk = chunks.get(id); if (!chunk || chunk.status !== INITIALIZED) { // We didn't expect not to have an existing stream; + return; } - const streamChunk: InitializedStreamChunk = (chunk: any); + const streamChunk: InitializedStreamChunk = (chunk: any); const controller = streamChunk.reason; - controller.close(); + // TODO: This model might get blocked. + const parsedValue: any = + row === '' ? undefined : parseModel(chunk._response, row); + controller.close(parsedValue); } type ErrorWithDigest = Error & {digest?: string}; @@ -1462,21 +1596,21 @@ function processFullRow( // Fallthrough case 88 /* "X" */: { if (enableFlightReadableStream) { - startAsyncIterable(response, id); + startAsyncIterable(response, id, false); return; } } // Fallthrough case 120 /* "x" */: { if (enableFlightReadableStream) { - startAsyncIterator(response, id); + startAsyncIterable(response, id, true); return; } } // Fallthrough case 67 /* "C" */: { if (enableFlightReadableStream) { - stopStream(response, id); + stopStream(response, id, row); return; } } diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js index e4ffebab0901a..c92f7799bb6e8 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMBrowser-test.js @@ -1585,4 +1585,206 @@ describe('ReactFlightDOMBrowser', () => { expect(error.digest).toBe('aborted'); expect(errors).toEqual([reason]); }); + + // @gate enableFlightReadableStream + it('should supports streaming AsyncIterables with objects', async () => { + let resolve; + const wait = new Promise(r => (resolve = r)); + const errors = []; + const multiShotIterable = { + async *[Symbol.asyncIterator]() { + const next = yield {hello: 'A'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'B'}; + return 'C'; + }, + }; + const singleShotIterator = (async function* () { + const next = yield {hello: 'D'}; + expect(next).toBe(undefined); + await wait; + yield {hi: 'E'}; + // eslint-disable-next-line no-throw-literal + throw 'F'; + })(); + + const rscStream = ReactServerDOMServer.renderToReadableStream( + { + multiShotIterable, + singleShotIterator, + }, + {}, + { + onError(x) { + errors.push(x); + return x; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + + const iterator1 = result.multiShotIterable[Symbol.asyncIterator](); + const iterator2 = result.singleShotIterator[Symbol.asyncIterator](); + + expect(iterator1).not.toBe(result.multiShotIterable); + expect(iterator2).toBe(result.singleShotIterator); + + expect(await iterator1.next()).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hello: 'D'}, + done: false, + }); + + await resolve(); + + expect(await iterator1.next()).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(await iterator2.next()).toEqual({ + value: {hi: 'E'}, + done: false, + }); + expect(await iterator1.next()).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(await iterator1.next()).toEqual({ + value: undefined, + done: true, + }); + + let error = null; + try { + await iterator2.next(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('F'); + expect(errors).toEqual(['F']); + + // Multi-shot iterables should be able to do the same thing again + const iterator3 = result.multiShotIterable[Symbol.asyncIterator](); + + expect(iterator3).not.toBe(iterator1); + + // We should be able to iterate over the iterable again and it should be + // synchronously available using instrumented promises so that React can + // rerender it synchronously. + expect(iterator3.next().value).toEqual({ + value: {hello: 'A'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: {hi: 'B'}, + done: false, + }); + expect(iterator3.next().value).toEqual({ + value: 'C', // Return value + done: true, + }); + expect(iterator3.next().value).toEqual({ + value: undefined, + done: true, + }); + + expect(() => iterator3.next('this is not allowed')).toThrow( + 'Values cannot be passed to next() of AsyncIterables passed to Client Components.', + ); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying AsyncIterable when we are cancelled', async () => { + let resolve; + const wait = new Promise(r => (resolve = r)); + let thrownReason; + const iterator = (async function* () { + try { + await wait; + yield 'a'; + yield 'b'; + } catch (x) { + thrownReason = x; + } + yield 'c'; + })(); + let loggedReason; + const rscStream = ReactServerDOMServer.renderToReadableStream( + iterator, + {}, + { + onError(reason) { + loggedReason = reason; + }, + }, + ); + const reader = rscStream.getReader(); + const reason = new Error('aborted'); + reader.cancel(reason); + await resolve(); + await reader.read(); + expect(thrownReason).toBe(reason); + expect(loggedReason).toBe(reason); + }); + + // @gate enableFlightReadableStream + it('should cancels the underlying AsyncIterable when we abort', async () => { + const errors = []; + const abortController = new AbortController(); + let resolve; + const wait = new Promise(r => (resolve = r)); + let thrownReason; + const iterator = (async function* () { + try { + yield 'a'; + await wait; + yield 'b'; + } catch (x) { + thrownReason = x; + } + yield 'c'; + })(); + const rscStream = ReactServerDOMServer.renderToReadableStream( + iterator, + {}, + { + signal: abortController.signal, + onError(x) { + errors.push(x); + return x.message; + }, + }, + ); + const result = await ReactServerDOMClient.createFromReadableStream( + passThrough(rscStream), + ); + + const reason = new Error('aborted'); + abortController.abort(reason); + + await resolve(); + + // We should be able to read the part we already emitted before the abort + expect(await result.next()).toEqual({ + value: 'a', + done: false, + }); + + expect(thrownReason).toBe(reason); + + let error = null; + try { + await result.next(); + } catch (x) { + error = x; + } + expect(error.digest).toBe('aborted'); + expect(errors).toEqual([reason]); + }); }); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 67e60c1397380..c1d07d371dec2 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -641,31 +641,46 @@ function serializeAsyncIterable( return; } - try { - const chunkId = outlineModel(request, entry.value); - const processedChunk = encodeReferenceChunk( - request, - streamId, - serializeByValueID(chunkId), - ); - request.pendingChunks++; - request.completedRegularChunks.push(processedChunk); - enqueueFlush(request); - } catch (x) { - error(x); - return; - } if (entry.done) { - // Unlike streams, the last entry is encoded as a row. Even if it's undefined, - // because it might not be. request.abortListeners.delete(error); - const endStreamRow = streamId.toString(16) + ':C\n'; + let endStreamRow; + if (entry.value === undefined) { + endStreamRow = streamId.toString(16) + ':C\n'; + } else { + // Unlike streams, the last value may not be undefined. If it's not + // we outline it and encode a reference to it in the closing instruction. + try { + const chunkId = outlineModel(request, entry.value); + endStreamRow = + streamId.toString(16) + + ':C' + + stringify(serializeByValueID(chunkId)) + + '\n'; + } catch (x) { + error(x); + return; + } + } request.pendingChunks++; request.completedRegularChunks.push(stringToChunk(endStreamRow)); enqueueFlush(request); aborted = true; } else { - iterator.next().then(progress, error); + try { + const chunkId = outlineModel(request, entry.value); + const processedChunk = encodeReferenceChunk( + request, + streamId, + serializeByValueID(chunkId), + ); + request.pendingChunks++; + request.completedRegularChunks.push(processedChunk); + enqueueFlush(request); + iterator.next().then(progress, error); + } catch (x) { + error(x); + return; + } } } function error(reason: mixed) { diff --git a/scripts/error-codes/codes.json b/scripts/error-codes/codes.json index da438e8403ef0..1fef8eb97413f 100644 --- a/scripts/error-codes/codes.json +++ b/scripts/error-codes/codes.json @@ -508,5 +508,6 @@ "520": "There was an error during concurrent rendering but React was able to recover by instead synchronously rendering the entire root.", "521": "flushSyncWork should not be called from builds that support legacy mode. This is a bug in React.", "522": "Invalid form element. requestFormReset must be passed a form that was rendered by React.", - "523": "The render was aborted due to being postponed." + "523": "The render was aborted due to being postponed.", + "524": "Values cannot be passed to next() of AsyncIterables passed to Client Components." } From 105fa03747585cdc399b9312c882500cb734164b Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Thu, 11 Apr 2024 23:44:19 -0400 Subject: [PATCH 05/11] Add a wrapper fragment if an AsyncIterable needs to be keyed This is consistent with other Iterables. --- .../src/__tests__/ReactFlight-test.js | 141 ++++++++++++++++++ .../react-server/src/ReactFlightServer.js | 70 ++++++++- 2 files changed, 208 insertions(+), 3 deletions(-) diff --git a/packages/react-client/src/__tests__/ReactFlight-test.js b/packages/react-client/src/__tests__/ReactFlight-test.js index 2164ca0b2781a..0052f73c4ac6a 100644 --- a/packages/react-client/src/__tests__/ReactFlight-test.js +++ b/packages/react-client/src/__tests__/ReactFlight-test.js @@ -2068,6 +2068,147 @@ describe('ReactFlight', () => { expect(ReactNoop).toMatchRenderedOutput(
Ba
); }); + it('shares state when moving keyed Server Components that render fragments', async () => { + function StatefulClient({name, initial}) { + const [state] = React.useState(initial); + return {state}; + } + const Stateful = clientReference(StatefulClient); + + function ServerComponent({item, initial}) { + return [ + , + , + ]; + } + + const transport = ReactNoopFlightServer.render( +
+ + +
, + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a1 + b1 + a2 + b2 +
, + ); + + // We swap the Server Components and the state of each child inside each fragment should move. + // Really the Fragment itself moves. + const transport2 = ReactNoopFlightServer.render( +
+ + +
, + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport2)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a2 + b2 + a1 + b1 +
, + ); + }); + + it('shares state when moving keyed Server Components that render async iterables', async () => { + function StatefulClient({name, initial}) { + const [state] = React.useState(initial); + return {state}; + } + const Stateful = clientReference(StatefulClient); + + function ServerComponent({item, initial}) { + // While the ServerComponent itself could be an async generator, single-shot iterables + // are not supported as React children since React might need to re-map them based on + // state updates. So we create an AsyncIterable instead. + return { + async *[Symbol.asyncIterator]() { + yield ; + yield ; + }, + }; + } + + function ListClient({children}) { + // TODO: Unwrap AsyncIterables natively in React. For now we do it in this wrapper. + let resolvedChildren = []; + for (let fragment of children) { + // We should've wrapped each child in a keyed Fragment. + expect(fragment.type).toBe(React.Fragment); + let fragmentChildren = []; + const iterator = fragment.props.children[Symbol.asyncIterator](); + for (let entry; !(entry = React.use(iterator.next())).done; ) { + fragmentChildren.push(entry.value); + } + resolvedChildren.push( + + {fragmentChildren} + , + ); + } + return
{resolvedChildren}
; + } + + const List = clientReference(ListClient); + + const transport = ReactNoopFlightServer.render( + + + + , + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a1 + b1 + a2 + b2 +
, + ); + + // We swap the Server Components and the state of each child inside each fragment should move. + // Really the Fragment itself moves. + const transport2 = ReactNoopFlightServer.render( + + + + , + ); + + await act(async () => { + ReactNoop.render(await ReactNoopFlightClient.read(transport2)); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ a2 + b2 + a1 + b1 +
, + ); + }); + it('preserves debug info for server-to-server pass through', async () => { function ThirdPartyLazyComponent() { return !; diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index c1d07d371dec2..154653c88eb44 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -885,7 +885,8 @@ function renderFunctionComponent( function renderFragment( request: Request, task: Task, - children: $ReadOnlyArray, + children: + | $ReadOnlyArray ): ReactJSONValue { if (__DEV__) { const debugInfo: ?ReactDebugInfo = (children: any)._debugInfo; @@ -899,6 +900,7 @@ function renderFragment( // Forward any debug info we have the first time we see it. // We do this after init so that we have received all the debug info // from the server by the time we emit it. + // TODO: We might see this fragment twice if it ends up wrapped below. forwardDebugInfo(request, debugID, debugInfo); } } @@ -940,6 +942,68 @@ function renderFragment( return children; } +function renderAsyncFragment( + request: Request, + task: Task, + children: $AsyncIterable, + getAsyncIterator: () => $AsyncIterator +): ReactJSONValue { + if (__DEV__) { + const debugInfo: ?ReactDebugInfo = (children: any)._debugInfo; + if (debugInfo) { + // If this came from Flight, forward any debug info into this new row. + if (debugID === null) { + // We don't have a chunk to assign debug info. We need to outline this + // component to assign it an ID. + return outlineTask(request, task); + } else { + // Forward any debug info we have the first time we see it. + // We do this after init so that we have received all the debug info + // from the server by the time we emit it. + // TODO: We might see this fragment twice if it ends up wrapped below. + forwardDebugInfo(request, debugID, debugInfo); + } + } + } + if (!enableServerComponentKeys) { + const asyncIterator = getAsyncIterator.call(children); + return serializeAsyncIterable(request, children, asyncIterator); + } + if (task.keyPath !== null) { + // We have a Server Component that specifies a key but we're now splitting + // the tree using a fragment. + const fragment = [ + REACT_ELEMENT_TYPE, + REACT_FRAGMENT_TYPE, + task.keyPath, + {children}, + ]; + if (!task.implicitSlot) { + // If this was keyed inside a set. I.e. the outer Server Component was keyed + // then we need to handle reorders of the whole set. To do this we need to wrap + // this array in a keyed Fragment. + return fragment; + } + // If the outer Server Component was implicit but then an inner one had a key + // we don't actually need to be able to move the whole set around. It'll always be + // in an implicit slot. The key only exists to be able to reset the state of the + // children. We could achieve the same effect by passing on the keyPath to the next + // set of components inside the fragment. This would also allow a keyless fragment + // reconcile against a single child. + // Unfortunately because of JSON.stringify, we can't call the recursive loop for + // each child within this context because we can't return a set with already resolved + // values. E.g. a string would get double encoded. Returning would pop the context. + // So instead, we wrap it with an unkeyed fragment and inner keyed fragment. + return [fragment]; + } + // Since we're yielding here, that implicitly resets the keyPath context on the + // way up. Which is what we want since we've consumed it. If this changes to + // be recursive serialization, we need to reset the keyPath and implicitSlot, + // before recursing here. + const asyncIterator = getAsyncIterator.call(children); + return serializeAsyncIterable(request, children, asyncIterator); +} + function renderClientElement( task: Task, type: any, @@ -1888,8 +1952,8 @@ function renderModelDestructive( const getAsyncIterator: void | (() => $AsyncIterator) = (value: any)[ASYNC_ITERATOR]; if (typeof getAsyncIterator === 'function') { - const asyncIterator = getAsyncIterator.call(value); - return serializeAsyncIterable(request, (value: any), asyncIterator); + // We treat AsyncIterables as a Fragment and as such we might need to key them. + return renderAsyncFragment(request, task, (value: any), getAsyncIterator); } } From e12febf7bc763fd888371088dedf98b05768b6bf Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Sat, 13 Apr 2024 19:25:05 -0400 Subject: [PATCH 06/11] Port sync resolution in thenables which are preinstrumented like Flight This was already done in Fiber but not in Fizz/Flight. --- .../react-server/src/ReactFizzThenable.js | 23 +++++++++++-------- .../react-server/src/ReactFlightThenable.js | 23 +++++++++++-------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/packages/react-server/src/ReactFizzThenable.js b/packages/react-server/src/ReactFizzThenable.js index b863d8b6f9f99..1991b9de1ca61 100644 --- a/packages/react-server/src/ReactFizzThenable.js +++ b/packages/react-server/src/ReactFizzThenable.js @@ -82,6 +82,9 @@ export function trackUsedThenable( // Only instrument the thenable if the status if not defined. If // it's defined, but an unknown value, assume it's been instrumented by // some custom userspace implementation. We treat it as "pending". + // Attach a dummy listener, to ensure that any lazy initialization can + // happen. Flight lazily parses JSON when the value is actually awaited. + thenable.then(noop, noop); } else { const pendingThenable: PendingThenable = (thenable: any); pendingThenable.status = 'pending'; @@ -101,17 +104,17 @@ export function trackUsedThenable( } }, ); + } - // Check one more time in case the thenable resolved synchronously - switch (thenable.status) { - case 'fulfilled': { - const fulfilledThenable: FulfilledThenable = (thenable: any); - return fulfilledThenable.value; - } - case 'rejected': { - const rejectedThenable: RejectedThenable = (thenable: any); - throw rejectedThenable.reason; - } + // Check one more time in case the thenable resolved synchronously + switch (thenable.status) { + case 'fulfilled': { + const fulfilledThenable: FulfilledThenable = (thenable: any); + return fulfilledThenable.value; + } + case 'rejected': { + const rejectedThenable: RejectedThenable = (thenable: any); + throw rejectedThenable.reason; } } diff --git a/packages/react-server/src/ReactFlightThenable.js b/packages/react-server/src/ReactFlightThenable.js index 852c13b2be4e4..cfda818f19ffc 100644 --- a/packages/react-server/src/ReactFlightThenable.js +++ b/packages/react-server/src/ReactFlightThenable.js @@ -82,6 +82,9 @@ export function trackUsedThenable( // Only instrument the thenable if the status if not defined. If // it's defined, but an unknown value, assume it's been instrumented by // some custom userspace implementation. We treat it as "pending". + // Attach a dummy listener, to ensure that any lazy initialization can + // happen. Flight lazily parses JSON when the value is actually awaited. + thenable.then(noop, noop); } else { const pendingThenable: PendingThenable = (thenable: any); pendingThenable.status = 'pending'; @@ -101,17 +104,17 @@ export function trackUsedThenable( } }, ); + } - // Check one more time in case the thenable resolved synchronously - switch (thenable.status) { - case 'fulfilled': { - const fulfilledThenable: FulfilledThenable = (thenable: any); - return fulfilledThenable.value; - } - case 'rejected': { - const rejectedThenable: RejectedThenable = (thenable: any); - throw rejectedThenable.reason; - } + // Check one more time in case the thenable resolved synchronously + switch (thenable.status) { + case 'fulfilled': { + const fulfilledThenable: FulfilledThenable = (thenable: any); + return fulfilledThenable.value; + } + case 'rejected': { + const rejectedThenable: RejectedThenable = (thenable: any); + throw rejectedThenable.reason; } } From f77a17192ca96c2cf8afd5b259837ccdc5cbb52b Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Sat, 13 Apr 2024 19:26:57 -0400 Subject: [PATCH 07/11] Forward debug info to async iterables --- .../react-client/src/ReactFlightClient.js | 9 +- .../src/__tests__/ReactFlight-test.js | 124 +++++++++++++++++- .../react-server/src/ReactFlightServer.js | 74 +++++------ 3 files changed, 158 insertions(+), 49 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index a30c1fe35d1b3..1be8b83821ec7 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -729,6 +729,7 @@ function getOutlinedModel( typeof chunkValue === 'object' && chunkValue !== null && (Array.isArray(chunkValue) || + typeof chunkValue[ASYNC_ITERATOR] === 'function' || chunkValue.$$typeof === REACT_ELEMENT_TYPE) && !chunkValue._debugInfo ) { @@ -1122,6 +1123,8 @@ function startReadableStream( type IteratorEntry = {done: false, value: T} | {done: true, value: T}; +const ASYNC_ITERATOR = Symbol.asyncIterator; + function asyncIterator(this: $AsyncIterator) { // Self referencing iterator. return this; @@ -1137,7 +1140,7 @@ function createIterator( // TODO: The iterator could inherit the AsyncIterator prototype which is not exposed as // a global but exists as a prototype of an AsyncGenerator. However, it's not needed // to satisfy the iterable protocol. - (iterator: any)[Symbol.asyncIterator] = asyncIterator; + (iterator: any)[ASYNC_ITERATOR] = asyncIterator; return iterator; } @@ -1199,7 +1202,7 @@ function startAsyncIterable( }, }; const iterable: $AsyncIterable = { - [Symbol.asyncIterator](): $AsyncIterator { + [ASYNC_ITERATOR](): $AsyncIterator { let nextReadIndex = 0; return createIterator(arg => { if (arg !== undefined) { @@ -1229,7 +1232,7 @@ function startAsyncIterable( resolveStream( response, id, - iterator ? iterable[Symbol.asyncIterator]() : iterable, + iterator ? iterable[ASYNC_ITERATOR]() : iterable, controller, ); } diff --git a/packages/react-client/src/__tests__/ReactFlight-test.js b/packages/react-client/src/__tests__/ReactFlight-test.js index 0052f73c4ac6a..5e876d41131fd 100644 --- a/packages/react-client/src/__tests__/ReactFlight-test.js +++ b/packages/react-client/src/__tests__/ReactFlight-test.js @@ -2125,6 +2125,7 @@ describe('ReactFlight', () => { ); }); + // @gate enableFlightReadableStream it('shares state when moving keyed Server Components that render async iterables', async () => { function StatefulClient({name, initial}) { const [state] = React.useState(initial); @@ -2146,11 +2147,12 @@ describe('ReactFlight', () => { function ListClient({children}) { // TODO: Unwrap AsyncIterables natively in React. For now we do it in this wrapper. - let resolvedChildren = []; - for (let fragment of children) { + const resolvedChildren = []; + // eslint-disable-next-line no-for-of-loops/no-for-of-loops + for (const fragment of children) { // We should've wrapped each child in a keyed Fragment. expect(fragment.type).toBe(React.Fragment); - let fragmentChildren = []; + const fragmentChildren = []; const iterator = fragment.props.children[Symbol.asyncIterator](); for (let entry; !(entry = React.use(iterator.next())).done; ) { fragmentChildren.push(entry.value); @@ -2222,6 +2224,10 @@ describe('ReactFlight', () => { return stranger; } + function ThirdPartyFragmentComponent() { + return [Who, ' ', dis?]; + } + function ServerComponent({transport}) { // This is a Server Component that receives other Server Components from a third party. const children = ReactNoopFlightClient.read(transport); @@ -2231,7 +2237,7 @@ describe('ReactFlight', () => { const promiseComponent = Promise.resolve(); const thirdPartyTransport = ReactNoopFlightServer.render( - [promiseComponent, lazy], + [promiseComponent, lazy, ], { environmentName: 'third-party', }, @@ -2264,6 +2270,17 @@ describe('ReactFlight', () => { ? [{name: 'ThirdPartyLazyComponent', env: 'third-party', owner: null}] : undefined, ); + expect(thirdPartyChildren[2]._debugInfo).toEqual( + __DEV__ + ? [ + { + name: 'ThirdPartyFragmentComponent', + env: 'third-party', + owner: null, + }, + ] + : undefined, + ); ReactNoop.render(result); }); @@ -2271,6 +2288,105 @@ describe('ReactFlight', () => {
Hello, stranger ! + Who dis? +
, + ); + }); + + // @gate enableFlightReadableStream + it('preserves debug info for server-to-server pass through of async iterables', async () => { + let resolve; + const iteratorPromise = new Promise(r => (resolve = r)); + + function ThirdPartyAsyncIterableComponent({item, initial}) { + // While the ServerComponent itself could be an async generator, single-shot iterables + // are not supported as React children since React might need to re-map them based on + // state updates. So we create an AsyncIterable instead. + return { + async *[Symbol.asyncIterator]() { + yield Who; + yield ' '; + yield dis?; + resolve(); + }, + }; + } + + function ListClient({children: fragment}) { + // TODO: Unwrap AsyncIterables natively in React. For now we do it in this wrapper. + const resolvedChildren = []; + const iterator = fragment.props.children[Symbol.asyncIterator](); + for (let entry; !(entry = React.use(iterator.next())).done; ) { + resolvedChildren.push(entry.value); + } + return
{resolvedChildren}
; + } + + const List = clientReference(ListClient); + + function Keyed({children}) { + // Keying this should generate a fragment. + return children; + } + + function ServerComponent({transport}) { + // This is a Server Component that receives other Server Components from a third party. + const children = ReactServer.use( + ReactNoopFlightClient.read(transport), + ).root; + return ( + + {children} + + ); + } + + const thirdPartyTransport = ReactNoopFlightServer.render( + {root: }, + { + environmentName: 'third-party', + }, + ); + + // Wait for the iterator to finish + await iteratorPromise; + await 0; // One more tick for the return value / closing. + + const transport = ReactNoopFlightServer.render( + , + ); + + await act(async () => { + const promise = ReactNoopFlightClient.read(transport); + expect(promise._debugInfo).toEqual( + __DEV__ + ? [{name: 'ServerComponent', env: 'Server', owner: null}] + : undefined, + ); + const result = await promise; + const thirdPartyFragment = await result.props.children; + expect(thirdPartyFragment._debugInfo).toEqual( + __DEV__ ? [{name: 'Keyed', env: 'Server', owner: null}] : undefined, + ); + // We expect the debug info to be transferred from the inner stream to the outer. + expect(thirdPartyFragment.props.children._debugInfo).toEqual( + __DEV__ + ? [ + { + name: 'ThirdPartyAsyncIterableComponent', + env: 'third-party', + owner: null, + }, + ] + : undefined, + ); + + ReactNoop.render(result); + }); + + expect(ReactNoop).toMatchRenderedOutput( +
+ Who dis?
, ); }); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 154653c88eb44..4ff9e3a1d5356 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -241,7 +241,7 @@ export type ReactClientValue = | void | bigint | ReadableStream - | AsyncIterable + | $AsyncIterable | Iterable | Array | Map @@ -614,14 +614,14 @@ function serializeReadableStream( function serializeAsyncIterable( request: Request, - iteratable: $AsyncIterable, + iterable: $AsyncIterable, iterator: $AsyncIterator, ): string { // Generators/Iterators are Iterables but they're also their own iterator // functions. If that's the case, we treat them as single-shot. Otherwise, // we assume that this iterable might be a multi-shot and allow it to be // iterated more than once on the client. - const isIterator = iteratable === iterator; + const isIterator = iterable === iterator; request.pendingChunks += 2; // Start and Stop rows. const streamId = request.nextChunkId++; @@ -629,6 +629,13 @@ function serializeAsyncIterable( streamId.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); + if (__DEV__) { + const debugInfo: ?ReactDebugInfo = (iterable: any)._debugInfo; + if (debugInfo) { + forwardDebugInfo(request, streamId, debugInfo); + } + } + // There's a race condition between when the stream is aborted and when the promise // resolves so we track whether we already aborted it to avoid writing twice. let aborted = false; @@ -885,30 +892,9 @@ function renderFunctionComponent( function renderFragment( request: Request, task: Task, - children: - | $ReadOnlyArray + children: $ReadOnlyArray, ): ReactJSONValue { - if (__DEV__) { - const debugInfo: ?ReactDebugInfo = (children: any)._debugInfo; - if (debugInfo) { - // If this came from Flight, forward any debug info into this new row. - if (debugID === null) { - // We don't have a chunk to assign debug info. We need to outline this - // component to assign it an ID. - return outlineTask(request, task); - } else { - // Forward any debug info we have the first time we see it. - // We do this after init so that we have received all the debug info - // from the server by the time we emit it. - // TODO: We might see this fragment twice if it ends up wrapped below. - forwardDebugInfo(request, debugID, debugInfo); - } - } - } - if (!enableServerComponentKeys) { - return children; - } - if (task.keyPath !== null) { + if (enableServerComponentKeys && task.keyPath !== null) { // We have a Server Component that specifies a key but we're now splitting // the tree using a fragment. const fragment = [ @@ -939,15 +925,6 @@ function renderFragment( // way up. Which is what we want since we've consumed it. If this changes to // be recursive serialization, we need to reset the keyPath and implicitSlot, // before recursing here. - return children; -} - -function renderAsyncFragment( - request: Request, - task: Task, - children: $AsyncIterable, - getAsyncIterator: () => $AsyncIterator -): ReactJSONValue { if (__DEV__) { const debugInfo: ?ReactDebugInfo = (children: any)._debugInfo; if (debugInfo) { @@ -960,16 +937,23 @@ function renderAsyncFragment( // Forward any debug info we have the first time we see it. // We do this after init so that we have received all the debug info // from the server by the time we emit it. - // TODO: We might see this fragment twice if it ends up wrapped below. forwardDebugInfo(request, debugID, debugInfo); } + // Since we're rendering this array again, create a copy that doesn't + // have the debug info so we avoid outlining or emitting debug info again. + children = Array.from(children); } } - if (!enableServerComponentKeys) { - const asyncIterator = getAsyncIterator.call(children); - return serializeAsyncIterable(request, children, asyncIterator); - } - if (task.keyPath !== null) { + return children; +} + +function renderAsyncFragment( + request: Request, + task: Task, + children: $AsyncIterable, + getAsyncIterator: () => $AsyncIterator, +): ReactJSONValue { + if (enableServerComponentKeys && task.keyPath !== null) { // We have a Server Component that specifies a key but we're now splitting // the tree using a fragment. const fragment = [ @@ -996,6 +980,7 @@ function renderAsyncFragment( // So instead, we wrap it with an unkeyed fragment and inner keyed fragment. return [fragment]; } + // Since we're yielding here, that implicitly resets the keyPath context on the // way up. Which is what we want since we've consumed it. If this changes to // be recursive serialization, we need to reset the keyPath and implicitSlot, @@ -1953,7 +1938,12 @@ function renderModelDestructive( (value: any)[ASYNC_ITERATOR]; if (typeof getAsyncIterator === 'function') { // We treat AsyncIterables as a Fragment and as such we might need to key them. - return renderAsyncFragment(request, task, (value: any), getAsyncIterator); + return renderAsyncFragment( + request, + task, + (value: any), + getAsyncIterator, + ); } } From 8e33e92aee3c00def90d2ab189eca8bca7380703 Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Mon, 15 Apr 2024 17:53:33 -0400 Subject: [PATCH 08/11] If a model is blocked on a direct dependency we need to wait to enqueue it For AsyncIterables, since we control the promises, we can instead let the consumer peak ahead in that case. --- .eslintrc.js | 1 + .../react-client/src/ReactFlightClient.js | 196 ++++++++++++------ .../src/__tests__/ReactFlightDOMEdge-test.js | 101 +++++++++ 3 files changed, 238 insertions(+), 60 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index 370565eb8216a..75c0b9fe8aa38 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -504,6 +504,7 @@ module.exports = { $AsyncIterator: 'readonly', Iterator: 'readonly', AsyncIterator: 'readonly', + IteratorResult: 'readonly', JSONValue: 'readonly', JSResourceReference: 'readonly', MouseEventHandler: 'readonly', diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index 1be8b83821ec7..9e44ff69a9798 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -69,9 +69,9 @@ import { export type {CallServerCallback, EncodeFormActionCallback}; -interface StreamController { - close(returnValue: any): void; - enqueue(value: any): void; +interface FlightStreamController { + enqueue(json: UninitializedModel): void; + close(json: UninitializedModel): void; error(error: Error): void; } @@ -107,7 +107,7 @@ type PendingChunk = { reason: null | Array<(mixed) => mixed>, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type BlockedChunk = { status: 'blocked', @@ -115,7 +115,7 @@ type BlockedChunk = { reason: null | Array<(mixed) => mixed>, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type CyclicChunk = { status: 'cyclic', @@ -123,7 +123,7 @@ type CyclicChunk = { reason: null | Array<(mixed) => mixed>, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type ResolvedModelChunk = { status: 'resolved_model', @@ -131,7 +131,7 @@ type ResolvedModelChunk = { reason: null, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type ResolvedModuleChunk = { status: 'resolved_module', @@ -139,24 +139,24 @@ type ResolvedModuleChunk = { reason: null, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type InitializedChunk = { status: 'fulfilled', value: T, - reason: null | StreamController, + reason: null | FlightStreamController, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type InitializedStreamChunk< T: ReadableStream | $AsyncIterable, > = { status: 'fulfilled', value: T, - reason: StreamController, + reason: FlightStreamController, _response: Response, - then(resolve: (ReadableStream) => mixed, reject: (mixed) => mixed): void, + then(resolve: (ReadableStream) => mixed, reject?: (mixed) => mixed): void, }; type ErroredChunk = { status: 'rejected', @@ -164,7 +164,7 @@ type ErroredChunk = { reason: mixed, _response: Response, _debugInfo?: null | ReactDebugInfo, - then(resolve: (T) => mixed, reject: (mixed) => mixed): void, + then(resolve: (T) => mixed, reject?: (mixed) => mixed): void, }; type SomeChunk = | PendingChunk @@ -191,7 +191,7 @@ Chunk.prototype = (Object.create(Promise.prototype): any); Chunk.prototype.then = function ( this: SomeChunk, resolve: (value: T) => mixed, - reject: (reason: mixed) => mixed, + reject?: (reason: mixed) => mixed, ) { const chunk: SomeChunk = this; // If we have resolved content, we try to initialize it first which @@ -226,7 +226,9 @@ Chunk.prototype.then = function ( } break; default: - reject(chunk.reason); + if (reject) { + reject(chunk.reason); + } break; } }; @@ -384,7 +386,7 @@ function createInitializedStreamChunk< >( response: Response, value: T, - controller: StreamController, + controller: FlightStreamController, ): InitializedChunk { // We use the reason field to stash the controller since we already have that // field. It's a bit of a hack but efficient. @@ -392,6 +394,29 @@ function createInitializedStreamChunk< return new Chunk(INITIALIZED, value, controller, response); } +function createResolvedIteratorResultChunk( + response: Response, + value: UninitializedModel, + done: boolean, +): ResolvedModelChunk> { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(RESOLVED_MODEL, iteratorResultJSON, null, response); +} + +function resolveIteratorResultChunk( + chunk: SomeChunk>, + value: UninitializedModel, + done: boolean, +): void { + // To reuse code as much code as possible we add the wrapper element as part of the JSON. + const iteratorResultJSON = + (done ? '{"done":true,"value":' : '{"done":false,"value":') + value + '}'; + resolveModelChunk(chunk, iteratorResultJSON); +} + function resolveModelChunk( chunk: SomeChunk, value: UninitializedModel, @@ -402,9 +427,7 @@ function resolveModelChunk( // a stream chunk since any other row shouldn't have more than one entry. const streamChunk: InitializedStreamChunk = (chunk: any); const controller = streamChunk.reason; - // TODO: This model might get blocked. - const parsedValue: T = parseModel(chunk._response, value); - controller.enqueue(parsedValue); + controller.enqueue(value); } return; } @@ -1084,7 +1107,7 @@ function resolveStream>( response: Response, id: number, stream: T, - controller: StreamController, + controller: FlightStreamController, ): void { const chunks = response._chunks; const chunk = chunks.get(id); @@ -1106,23 +1129,83 @@ function resolveStream>( } } -function startReadableStream( +function startReadableStream( response: Response, id: number, type: void | 'bytes', ): void { - let controller: StreamController = (null: any); + let controller: ReadableStreamController = (null: any); const stream = new ReadableStream({ type: type, start(c) { controller = c; }, }); - resolveStream(response, id, stream, controller); + let previousBlockedChunk: SomeChunk | null = null; + const flightController = { + enqueue(json: UninitializedModel): void { + if (previousBlockedChunk === null) { + // If we're not blocked on any other chunks, we can try to eagerly initialize + // this as a fast-path to avoid awaiting them. + const chunk: ResolvedModelChunk = createResolvedModelChunk( + response, + json, + ); + initializeModelChunk(chunk); + const initializedChunk: SomeChunk = chunk; + if (initializedChunk.status === INITIALIZED) { + controller.enqueue(initializedChunk.value); + } else { + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + } + } else { + // We're still waiting on a previous chunk so we can't enqueue quite yet. + const blockedChunk = previousBlockedChunk; + const chunk: SomeChunk = createPendingChunk(response); + chunk.then( + v => controller.enqueue(v), + e => controller.error((e: any)), + ); + previousBlockedChunk = chunk; + blockedChunk.then(function () { + if (previousBlockedChunk === chunk) { + // We were still the last chunk so we can now clear the queue and return + // to synchronous emitting. + previousBlockedChunk = null; + } + resolveModelChunk(chunk, json); + }); + } + }, + close(json: UninitializedModel): void { + if (previousBlockedChunk === null) { + controller.close(); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.close()); + } + }, + error(error: mixed): void { + if (previousBlockedChunk === null) { + // $FlowFixMe[incompatible-call] + controller.error(error); + } else { + const blockedChunk = previousBlockedChunk; + // We shouldn't get any more enqueues after this so we can set it back to null. + previousBlockedChunk = null; + blockedChunk.then(() => controller.error((error: any))); + } + }, + }; + resolveStream(response, id, stream, flightController); } -type IteratorEntry = {done: false, value: T} | {done: true, value: T}; - const ASYNC_ITERATOR = Symbol.asyncIterator; function asyncIterator(this: $AsyncIterator) { @@ -1131,7 +1214,7 @@ function asyncIterator(this: $AsyncIterator) { } function createIterator( - next: (arg: void) => SomeChunk>, + next: (arg: void) => SomeChunk>, ): $AsyncIterator { const iterator: any = { next: next, @@ -1144,49 +1227,44 @@ function createIterator( return iterator; } -function initializeIteratorEntry( - chunk: SomeChunk>, - value: T, - done: boolean, -): void { - const entry: IteratorEntry = ({done, value}: any); - const pendingChunk: PendingChunk> = (chunk: any); - const resolveListeners = pendingChunk.value; - const initializedChunk: InitializedChunk> = - (pendingChunk: any); - initializedChunk.status = INITIALIZED; - initializedChunk.value = entry; - if (resolveListeners !== null) { - wakeChunk(resolveListeners, entry); - } -} - function startAsyncIterable( response: Response, id: number, iterator: boolean, ): void { - const buffer: Array>> = []; + const buffer: Array>> = []; let closed = false; let nextWriteIndex = 0; - const controller: StreamController = { - enqueue(value: any): void { + const flightController = { + enqueue(value: UninitializedModel): void { if (nextWriteIndex === buffer.length) { - buffer[nextWriteIndex] = createPendingChunk>(response); + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + false, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, false); } - initializeIteratorEntry(buffer[nextWriteIndex++], value, false); + nextWriteIndex++; }, - close(value: T): void { + close(value: UninitializedModel): void { closed = true; if (nextWriteIndex === buffer.length) { - buffer[nextWriteIndex] = createPendingChunk>(response); + buffer[nextWriteIndex] = createResolvedIteratorResultChunk( + response, + value, + true, + ); + } else { + resolveIteratorResultChunk(buffer[nextWriteIndex], value, true); } - initializeIteratorEntry(buffer[nextWriteIndex++], value, true); + nextWriteIndex++; while (nextWriteIndex < buffer.length) { // In generators, any extra reads from the iterator have the value undefined. - initializeIteratorEntry( + resolveIteratorResultChunk( buffer[nextWriteIndex++], - (undefined: any), + '"$undefined"', true, ); } @@ -1194,7 +1272,8 @@ function startAsyncIterable( error(error: Error): void { closed = true; if (nextWriteIndex === buffer.length) { - buffer[nextWriteIndex] = createPendingChunk>(response); + buffer[nextWriteIndex] = + createPendingChunk>(response); } while (nextWriteIndex < buffer.length) { triggerErrorOnChunk(buffer[nextWriteIndex++], error); @@ -1221,7 +1300,7 @@ function startAsyncIterable( ); } buffer[nextReadIndex] = - createPendingChunk>(response); + createPendingChunk>(response); } return buffer[nextReadIndex++]; }); @@ -1233,7 +1312,7 @@ function startAsyncIterable( response, id, iterator ? iterable[ASYNC_ITERATOR]() : iterable, - controller, + flightController, ); } @@ -1250,10 +1329,7 @@ function stopStream( } const streamChunk: InitializedStreamChunk = (chunk: any); const controller = streamChunk.reason; - // TODO: This model might get blocked. - const parsedValue: any = - row === '' ? undefined : parseModel(chunk._response, row); - controller.close(parsedValue); + controller.close(row === '' ? '"$undefined"' : row); } type ErrorWithDigest = Error & {digest?: string}; diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index e7b4e06f49f87..2715ec5d7d507 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -80,6 +80,7 @@ describe('ReactFlightDOMEdge', () => { reader.read().then(({done, value}) => { if (done) { controller.enqueue(prevChunk); + prevChunk = new Uint8Array(0); controller.close(); return; } @@ -90,7 +91,18 @@ describe('ReactFlightDOMEdge', () => { controller.enqueue(chunk.subarray(0, chunk.length - 50)); prevChunk = chunk.subarray(chunk.length - 50); } else { + // Wait to see if we get some more bytes to join in. prevChunk = chunk; + // Flush if we don't get any more. + (async function flushAfterAFewTasks() { + for (let i = 0; i < 10; i++) { + await i; + } + if (prevChunk.byteLength > 0) { + controller.enqueue(prevChunk); + } + prevChunk = new Uint8Array(0); + })(); } push(); }); @@ -430,6 +442,95 @@ describe('ReactFlightDOMEdge', () => { expect(result.get('value')).toBe('hello'); }); + it('can pass an async import to a ReadableStream while enqueuing in order', async () => { + let resolve; + const promise = new Promise(r => (resolve = r)); + + const asyncClient = clientExports(promise); + + // We await the value on the servers so it's an async value that the client should wait for + const awaitedValue = await asyncClient; + + const s = new ReadableStream({ + start(c) { + c.enqueue('hello'); + c.enqueue(awaitedValue); + c.enqueue('!'); + c.close(); + }, + }); + + const stream = passThrough( + ReactServerDOMServer.renderToReadableStream(s, webpackMap), + ); + + const result = await ReactServerDOMClient.createFromReadableStream(stream, { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }); + + const reader = result.getReader(); + + expect(await reader.read()).toEqual({value: 'hello', done: false}); + + const readPromise = reader.read(); + // We resolve this after we've already received the '!' row. + await resolve('world'); + + expect(await readPromise).toEqual({value: 'world', done: false}); + expect(await reader.read()).toEqual({value: '!', done: false}); + expect(await reader.read()).toEqual({value: undefined, done: true}); + }); + + it('can pass an async import a AsyncIterable while allowing peaking at future values', async () => { + let resolve; + const promise = new Promise(r => (resolve = r)); + + const asyncClient = clientExports(promise); + + const multiShotIterable = { + async *[Symbol.asyncIterator]() { + yield 'hello'; + // We await the value on the servers so it's an async value that the client should wait for + yield await asyncClient; + yield '!'; + }, + }; + + const stream = passThrough( + ReactServerDOMServer.renderToReadableStream( + multiShotIterable, + webpackMap, + ), + ); + + // Parsing the root blocks because the module hasn't loaded yet + const result = await ReactServerDOMClient.createFromReadableStream(stream, { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }); + + const iterator = result[Symbol.asyncIterator](); + + expect(await iterator.next()).toEqual({value: 'hello', done: false}); + + const readPromise = iterator.next(); + + // While the previous promise didn't resolve yet, we should be able to peak at the next value + // by iterating past it. + expect(await iterator.next()).toEqual({value: '!', done: false}); + + // We resolve the previous row after we've already received the '!' row. + await resolve('world'); + expect(await readPromise).toEqual({value: 'world', done: false}); + + expect(await iterator.next()).toEqual({value: undefined, done: true}); + }); + it('warns if passing a this argument to bind() of a server reference', async () => { const ServerModule = serverExports({ greet: function () {}, From cecd16655acd64a0ebc0214d53bbb1b9f35c13c5 Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Mon, 15 Apr 2024 20:35:57 -0400 Subject: [PATCH 09/11] Try to synchronously emit a task if possible using the stream id This helps us avoid outlining a model if it's synchronously available. --- .../src/__tests__/ReactFlightDOMEdge-test.js | 133 ++++++++++++++++++ .../react-server/src/ReactFlightServer.js | 98 ++++++++----- 2 files changed, 197 insertions(+), 34 deletions(-) diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index 2715ec5d7d507..caa39146b0f21 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -124,6 +124,18 @@ describe('ReactFlightDOMEdge', () => { } } + async function readByteLength(stream) { + const reader = stream.getReader(); + let length = 0; + while (true) { + const {done, value} = await reader.read(); + if (done) { + return length; + } + length += value.byteLength; + } + } + it('should allow an alternative module mapping to be used for SSR', async () => { function ClientComponent() { return Client Component; @@ -557,4 +569,125 @@ describe('ReactFlightDOMEdge', () => { {withoutStack: true}, ); }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should supports ReadableStreams with typed arrays', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + buffer, + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This is not a binary stream, it's a stream that contain binary chunks. + const s = new ReadableStream({ + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const stream = ReactServerDOMServer.renderToReadableStream(s, {}); + + const [stream1, stream2] = passThrough(stream).tee(); + + const result = await ReactServerDOMClient.createFromReadableStream( + stream1, + { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + expect(await readByteLength(stream2)).toBeLessThan(400); + + const streamedBuffers = []; + const reader = result.getReader(); + let entry; + while (!(entry = await reader.read()).done) { + streamedBuffers.push(entry.value); + } + + expect(streamedBuffers).toEqual(buffers); + }); + + // @gate enableFlightReadableStream && enableBinaryFlight + it('should support BYOB binary ReadableStreams', async () => { + const buffer = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]).buffer; + const buffers = [ + new Int8Array(buffer, 1), + new Uint8Array(buffer, 2), + new Uint8ClampedArray(buffer, 2), + new Int16Array(buffer, 2), + new Uint16Array(buffer, 2), + new Int32Array(buffer, 4), + new Uint32Array(buffer, 4), + new Float32Array(buffer, 4), + new Float64Array(buffer, 0), + new BigInt64Array(buffer, 0), + new BigUint64Array(buffer, 0), + new DataView(buffer, 3), + ]; + + // This a binary stream where each chunk ends up as Uint8Array. + const s = new ReadableStream({ + type: 'bytes', + start(c) { + for (let i = 0; i < buffers.length; i++) { + c.enqueue(buffers[i]); + } + c.close(); + }, + }); + + const stream = ReactServerDOMServer.renderToReadableStream(s, {}); + + const [stream1, stream2] = passThrough(stream).tee(); + + const result = await ReactServerDOMClient.createFromReadableStream( + stream1, + { + ssrManifest: { + moduleMap: null, + moduleLoading: null, + }, + }, + ); + + expect(await readByteLength(stream2)).toBeLessThan(400); + + const streamedBuffers = []; + const reader = result.getReader({mode: 'byob'}); + let entry; + while (!(entry = await reader.read(new Uint8Array(10))).done) { + expect(entry.value instanceof Uint8Array).toBe(true); + streamedBuffers.push(entry.value); + } + + // The streamed buffers might be in different chunks and in Uint8Array form but + // the concatenated bytes should be the same. + expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual( + buffers.flatMap(c => + Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)), + ), + ); + }); }); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 4ff9e3a1d5356..0218bba97ab32 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -526,6 +526,7 @@ function serializeThenable( function serializeReadableStream( request: Request, + task: Task, stream: ReadableStream, ): string { // Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the @@ -546,10 +547,20 @@ function serializeReadableStream( const reader = stream.getReader(); - request.pendingChunks += 2; // Start and Stop rows. - const streamId = request.nextChunkId++; + // This task won't actually be retried. We just use it to attempt synchronous renders. + const streamTask = createTask( + request, + task.model, + task.keyPath, + task.implicitSlot, + request.abortableTasks, + ); + request.abortableTasks.delete(streamTask); + + request.pendingChunks++; // The task represents the Start row. This adds a Stop row. + const startStreamRow = - streamId.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; + streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); // There's a race condition between when the stream is aborted and when the promise @@ -562,21 +573,15 @@ function serializeReadableStream( if (entry.done) { request.abortListeners.delete(error); - const endStreamRow = streamId.toString(16) + ':C\n'; - request.pendingChunks++; + const endStreamRow = streamTask.id.toString(16) + ':C\n'; request.completedRegularChunks.push(stringToChunk(endStreamRow)); enqueueFlush(request); aborted = true; } else { try { - const chunkId = outlineModel(request, entry.value); - const processedChunk = encodeReferenceChunk( - request, - streamId, - serializeByValueID(chunkId), - ); + streamTask.model = entry.value; request.pendingChunks++; - request.completedRegularChunks.push(processedChunk); + tryStreamTask(request, streamTask); enqueueFlush(request); reader.read().then(progress, error); } catch (x) { @@ -598,10 +603,10 @@ function serializeReadableStream( ) { const postponeInstance: Postpone = (reason: any); logPostpone(request, postponeInstance.message); - emitPostponeChunk(request, streamId, postponeInstance); + emitPostponeChunk(request, streamTask.id, postponeInstance); } else { const digest = logRecoverableError(request, reason); - emitErrorChunk(request, streamId, digest, reason); + emitErrorChunk(request, streamTask.id, digest, reason); } enqueueFlush(request); // $FlowFixMe should be able to pass mixed @@ -609,11 +614,12 @@ function serializeReadableStream( } request.abortListeners.add(error); reader.read().then(progress, error); - return serializeByValueID(streamId); + return serializeByValueID(streamTask.id); } function serializeAsyncIterable( request: Request, + task: Task, iterable: $AsyncIterable, iterator: $AsyncIterator, ): string { @@ -623,16 +629,26 @@ function serializeAsyncIterable( // iterated more than once on the client. const isIterator = iterable === iterator; - request.pendingChunks += 2; // Start and Stop rows. - const streamId = request.nextChunkId++; + // This task won't actually be retried. We just use it to attempt synchronous renders. + const streamTask = createTask( + request, + task.model, + task.keyPath, + task.implicitSlot, + request.abortableTasks, + ); + request.abortableTasks.delete(streamTask); + + request.pendingChunks++; // The task represents the Start row. This adds a Stop row. + const startStreamRow = - streamId.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n'; + streamTask.id.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n'; request.completedRegularChunks.push(stringToChunk(startStreamRow)); if (__DEV__) { const debugInfo: ?ReactDebugInfo = (iterable: any)._debugInfo; if (debugInfo) { - forwardDebugInfo(request, streamId, debugInfo); + forwardDebugInfo(request, streamTask.id, debugInfo); } } @@ -652,14 +668,14 @@ function serializeAsyncIterable( request.abortListeners.delete(error); let endStreamRow; if (entry.value === undefined) { - endStreamRow = streamId.toString(16) + ':C\n'; + endStreamRow = streamTask.id.toString(16) + ':C\n'; } else { // Unlike streams, the last value may not be undefined. If it's not // we outline it and encode a reference to it in the closing instruction. try { const chunkId = outlineModel(request, entry.value); endStreamRow = - streamId.toString(16) + + streamTask.id.toString(16) + ':C' + stringify(serializeByValueID(chunkId)) + '\n'; @@ -668,20 +684,14 @@ function serializeAsyncIterable( return; } } - request.pendingChunks++; request.completedRegularChunks.push(stringToChunk(endStreamRow)); enqueueFlush(request); aborted = true; } else { try { - const chunkId = outlineModel(request, entry.value); - const processedChunk = encodeReferenceChunk( - request, - streamId, - serializeByValueID(chunkId), - ); + streamTask.model = entry.value; request.pendingChunks++; - request.completedRegularChunks.push(processedChunk); + tryStreamTask(request, streamTask); enqueueFlush(request); iterator.next().then(progress, error); } catch (x) { @@ -704,10 +714,10 @@ function serializeAsyncIterable( ) { const postponeInstance: Postpone = (reason: any); logPostpone(request, postponeInstance.message); - emitPostponeChunk(request, streamId, postponeInstance); + emitPostponeChunk(request, streamTask.id, postponeInstance); } else { const digest = logRecoverableError(request, reason); - emitErrorChunk(request, streamId, digest, reason); + emitErrorChunk(request, streamTask.id, digest, reason); } enqueueFlush(request); if (typeof (iterator: any).throw === 'function') { @@ -718,7 +728,7 @@ function serializeAsyncIterable( } request.abortListeners.add(error); iterator.next().then(progress, error); - return serializeByValueID(streamId); + return serializeByValueID(streamTask.id); } export function emitHint( @@ -986,7 +996,7 @@ function renderAsyncFragment( // be recursive serialization, we need to reset the keyPath and implicitSlot, // before recursing here. const asyncIterator = getAsyncIterator.call(children); - return serializeAsyncIterable(request, children, asyncIterator); + return serializeAsyncIterable(request, task, children, asyncIterator); } function renderClientElement( @@ -1932,7 +1942,7 @@ function renderModelDestructive( typeof ReadableStream === 'function' && value instanceof ReadableStream ) { - return serializeReadableStream(request, value); + return serializeReadableStream(request, task, value); } const getAsyncIterator: void | (() => $AsyncIterator) = (value: any)[ASYNC_ITERATOR]; @@ -2769,6 +2779,26 @@ function retryTask(request: Request, task: Task): void { } } +function tryStreamTask(request: Request, task: Task): void { + // This is used to try to emit something synchronously but if it suspends, + // we emit a reference to a new outlined task immediately instead. + const prevDebugID = debugID; + if (__DEV__) { + // We don't use the id of the stream task for debugID. Instead we leave it null + // so that we instead outline the row to get a new debugID if needed. + debugID = null; + } + try { + // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do + const json: string = stringify(task.model, task.toJSON); + emitModelChunk(request, task.id, json); + } finally { + if (__DEV__) { + debugID = prevDebugID; + } + } +} + function performWork(request: Request): void { const prevDispatcher = ReactSharedInternals.H; ReactSharedInternals.H = HooksDispatcher; From e8d5deabbe7e7ecbca944a3433e7acc703d85657 Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Mon, 15 Apr 2024 21:45:04 -0400 Subject: [PATCH 10/11] Emit root strings or typed arrays without outlining --- .../react-client/src/ReactFlightClient.js | 76 +++++++- .../src/__tests__/ReactFlight-test.js | 4 +- .../src/__tests__/ReactFlightDOMEdge-test.js | 4 +- .../react-server/src/ReactFlightServer.js | 168 ++++++++++++++---- 4 files changed, 207 insertions(+), 45 deletions(-) diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index 9e44ff69a9798..1fb6ca6c2b0d1 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -70,7 +70,8 @@ import { export type {CallServerCallback, EncodeFormActionCallback}; interface FlightStreamController { - enqueue(json: UninitializedModel): void; + enqueueValue(value: any): void; + enqueueModel(json: UninitializedModel): void; close(json: UninitializedModel): void; error(error: Error): void; } @@ -381,6 +382,15 @@ function createInitializedBufferChunk( return new Chunk(INITIALIZED, value, null, response); } +function createInitializedIteratorResultChunk( + response: Response, + value: T, + done: boolean, +): InitializedChunk> { + // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors + return new Chunk(INITIALIZED, {done: done, value: value}, null, response); +} + function createInitializedStreamChunk< T: ReadableStream | $AsyncIterable, >( @@ -427,7 +437,7 @@ function resolveModelChunk( // a stream chunk since any other row shouldn't have more than one entry. const streamChunk: InitializedStreamChunk = (chunk: any); const controller = streamChunk.reason; - controller.enqueue(value); + controller.enqueueModel(value); } return; } @@ -1034,8 +1044,17 @@ function resolveModel( function resolveText(response: Response, id: number, text: string): void { const chunks = response._chunks; - // We assume that we always reference large strings after they've been - // emitted. + if (enableFlightReadableStream) { + const chunk = chunks.get(id); + if (chunk && chunk.status !== PENDING) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + controller.enqueueValue(text); + return; + } + } chunks.set(id, createInitializedTextChunk(response, text)); } @@ -1045,7 +1064,17 @@ function resolveBuffer( buffer: $ArrayBufferView | ArrayBuffer, ): void { const chunks = response._chunks; - // We assume that we always reference buffers after they've been emitted. + if (enableFlightReadableStream) { + const chunk = chunks.get(id); + if (chunk && chunk.status !== PENDING) { + // If we get more data to an already resolved ID, we assume that it's + // a stream chunk since any other row shouldn't have more than one entry. + const streamChunk: InitializedStreamChunk = (chunk: any); + const controller = streamChunk.reason; + controller.enqueueValue(buffer); + return; + } + } chunks.set(id, createInitializedBufferChunk(response, buffer)); } @@ -1143,7 +1172,17 @@ function startReadableStream( }); let previousBlockedChunk: SomeChunk | null = null; const flightController = { - enqueue(json: UninitializedModel): void { + enqueueValue(value: T): void { + if (previousBlockedChunk === null) { + controller.enqueue(value); + } else { + // We're still waiting on a previous chunk so we can't enqueue quite yet. + previousBlockedChunk.then(function () { + controller.enqueue(value); + }); + } + }, + enqueueModel(json: UninitializedModel): void { if (previousBlockedChunk === null) { // If we're not blocked on any other chunks, we can try to eagerly initialize // this as a fast-path to avoid awaiting them. @@ -1236,7 +1275,30 @@ function startAsyncIterable( let closed = false; let nextWriteIndex = 0; const flightController = { - enqueue(value: UninitializedModel): void { + enqueueValue(value: T): void { + if (nextWriteIndex === buffer.length) { + buffer[nextWriteIndex] = createInitializedIteratorResultChunk( + response, + value, + false, + ); + } else { + const chunk: PendingChunk> = (buffer[ + nextWriteIndex + ]: any); + const resolveListeners = chunk.value; + const rejectListeners = chunk.reason; + const initializedChunk: InitializedChunk> = + (chunk: any); + initializedChunk.status = INITIALIZED; + initializedChunk.value = {done: false, value: value}; + if (resolveListeners !== null) { + wakeChunkIfInitialized(chunk, resolveListeners, rejectListeners); + } + } + nextWriteIndex++; + }, + enqueueModel(value: UninitializedModel): void { if (nextWriteIndex === buffer.length) { buffer[nextWriteIndex] = createResolvedIteratorResultChunk( response, diff --git a/packages/react-client/src/__tests__/ReactFlight-test.js b/packages/react-client/src/__tests__/ReactFlight-test.js index 5e876d41131fd..8f416189f87c0 100644 --- a/packages/react-client/src/__tests__/ReactFlight-test.js +++ b/packages/react-client/src/__tests__/ReactFlight-test.js @@ -2305,7 +2305,6 @@ describe('ReactFlight', () => { return { async *[Symbol.asyncIterator]() { yield Who; - yield ' '; yield dis?; resolve(); }, @@ -2386,7 +2385,8 @@ describe('ReactFlight', () => { expect(ReactNoop).toMatchRenderedOutput(
- Who dis? + Who + dis?
, ); }); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index caa39146b0f21..4057853c28c09 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -615,7 +615,7 @@ describe('ReactFlightDOMEdge', () => { }, ); - expect(await readByteLength(stream2)).toBeLessThan(400); + expect(await readByteLength(stream2)).toBeLessThan(300); const streamedBuffers = []; const reader = result.getReader(); @@ -672,7 +672,7 @@ describe('ReactFlightDOMEdge', () => { }, ); - expect(await readByteLength(stream2)).toBeLessThan(400); + expect(await readByteLength(stream2)).toBeLessThan(300); const streamedBuffers = []; const reader = result.getReader({mode: 'byob'}); diff --git a/packages/react-server/src/ReactFlightServer.js b/packages/react-server/src/ReactFlightServer.js index 0218bba97ab32..254e26e675bd0 100644 --- a/packages/react-server/src/ReactFlightServer.js +++ b/packages/react-server/src/ReactFlightServer.js @@ -1411,13 +1411,9 @@ function serializeTemporaryReference( } function serializeLargeTextString(request: Request, text: string): string { - request.pendingChunks += 2; + request.pendingChunks++; const textId = request.nextChunkId++; - const textChunk = stringToChunk(text); - const binaryLength = byteLengthOfChunk(textChunk); - const row = textId.toString(16) + ':T' + binaryLength.toString(16) + ','; - const headerChunk = stringToChunk(row); - request.completedRegularChunks.push(headerChunk, textChunk); + emitTextChunk(request, textId, text); return serializeByValueID(textId); } @@ -1467,27 +1463,9 @@ function serializeTypedArray( tag: string, typedArray: $ArrayBufferView, ): string { - if (enableTaint) { - if (TaintRegistryByteLengths.has(typedArray.byteLength)) { - // If we have had any tainted values of this length, we check - // to see if these bytes matches any entries in the registry. - const tainted = TaintRegistryValues.get( - binaryToComparableString(typedArray), - ); - if (tainted !== undefined) { - throwTaintViolation(tainted.message); - } - } - } - request.pendingChunks += 2; + request.pendingChunks++; const bufferId = request.nextChunkId++; - // TODO: Convert to little endian if that's not the server default. - const binaryChunk = typedArrayToBinaryChunk(typedArray); - const binaryLength = byteLengthOfBinaryChunk(binaryChunk); - const row = - bufferId.toString(16) + ':' + tag + binaryLength.toString(16) + ','; - const headerChunk = stringToChunk(row); - request.completedRegularChunks.push(headerChunk, binaryChunk); + emitTypedArrayChunk(request, bufferId, tag, typedArray); return serializeByValueID(bufferId); } @@ -2321,6 +2299,42 @@ function emitDebugChunk( request.completedRegularChunks.push(processedChunk); } +function emitTypedArrayChunk( + request: Request, + id: number, + tag: string, + typedArray: $ArrayBufferView, +): void { + if (enableTaint) { + if (TaintRegistryByteLengths.has(typedArray.byteLength)) { + // If we have had any tainted values of this length, we check + // to see if these bytes matches any entries in the registry. + const tainted = TaintRegistryValues.get( + binaryToComparableString(typedArray), + ); + if (tainted !== undefined) { + throwTaintViolation(tainted.message); + } + } + } + request.pendingChunks++; // Extra chunk for the header. + // TODO: Convert to little endian if that's not the server default. + const binaryChunk = typedArrayToBinaryChunk(typedArray); + const binaryLength = byteLengthOfBinaryChunk(binaryChunk); + const row = id.toString(16) + ':' + tag + binaryLength.toString(16) + ','; + const headerChunk = stringToChunk(row); + request.completedRegularChunks.push(headerChunk, binaryChunk); +} + +function emitTextChunk(request: Request, id: number, text: string): void { + request.pendingChunks++; // Extra chunk for the header. + const textChunk = stringToChunk(text); + const binaryLength = byteLengthOfChunk(textChunk); + const row = id.toString(16) + ':T' + binaryLength.toString(16) + ','; + const headerChunk = stringToChunk(row); + request.completedRegularChunks.push(headerChunk, textChunk); +} + function serializeEval(source: string): string { if (!__DEV__) { // These errors should never make it into a build so we don't need to encode them in codes.json @@ -2681,6 +2695,96 @@ function forwardDebugInfo( } } +function emitChunk( + request: Request, + task: Task, + value: ReactClientValue, +): void { + const id = task.id; + // For certain types we have special types, we typically outlined them but + // we can emit them directly for this row instead of through an indirection. + if (typeof value === 'string') { + if (enableTaint) { + const tainted = TaintRegistryValues.get(value); + if (tainted !== undefined) { + throwTaintViolation(tainted.message); + } + } + emitTextChunk(request, id, value); + return; + } + if (enableBinaryFlight) { + if (value instanceof ArrayBuffer) { + emitTypedArrayChunk(request, id, 'A', new Uint8Array(value)); + return; + } + if (value instanceof Int8Array) { + // char + emitTypedArrayChunk(request, id, 'O', value); + return; + } + if (value instanceof Uint8Array) { + // unsigned char + emitTypedArrayChunk(request, id, 'o', value); + return; + } + if (value instanceof Uint8ClampedArray) { + // unsigned clamped char + emitTypedArrayChunk(request, id, 'U', value); + return; + } + if (value instanceof Int16Array) { + // sort + emitTypedArrayChunk(request, id, 'S', value); + return; + } + if (value instanceof Uint16Array) { + // unsigned short + emitTypedArrayChunk(request, id, 's', value); + return; + } + if (value instanceof Int32Array) { + // long + emitTypedArrayChunk(request, id, 'L', value); + return; + } + if (value instanceof Uint32Array) { + // unsigned long + emitTypedArrayChunk(request, id, 'l', value); + return; + } + if (value instanceof Float32Array) { + // float + emitTypedArrayChunk(request, id, 'G', value); + return; + } + if (value instanceof Float64Array) { + // double + emitTypedArrayChunk(request, id, 'g', value); + return; + } + if (value instanceof BigInt64Array) { + // number + emitTypedArrayChunk(request, id, 'M', value); + return; + } + if (value instanceof BigUint64Array) { + // unsigned number + // We use "m" instead of "n" since JSON can start with "null" + emitTypedArrayChunk(request, id, 'm', value); + return; + } + if (value instanceof DataView) { + emitTypedArrayChunk(request, id, 'V', value); + return; + } + } + // For anything else we need to try to serialize it using JSON. + // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do + const json: string = stringify(value, task.toJSON); + emitModelChunk(request, task.id, json); +} + const emptyRoot = {}; function retryTask(request: Request, task: Task): void { @@ -2725,19 +2829,17 @@ function retryTask(request: Request, task: Task): void { task.keyPath = null; task.implicitSlot = false; - let json: string; if (typeof resolvedModel === 'object' && resolvedModel !== null) { // Object might contain unresolved values like additional elements. // This is simulating what the JSON loop would do if this was part of it. - // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do - json = stringify(resolvedModel, task.toJSON); + emitChunk(request, task, resolvedModel); } else { // If the value is a string, it means it's a terminal value and we already escaped it // We don't need to escape it again so it's not passed the toJSON replacer. // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do - json = stringify(resolvedModel); + const json: string = stringify(resolvedModel); + emitModelChunk(request, task.id, json); } - emitModelChunk(request, task.id, json); request.abortableTasks.delete(task); task.status = COMPLETED; @@ -2789,9 +2891,7 @@ function tryStreamTask(request: Request, task: Task): void { debugID = null; } try { - // $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do - const json: string = stringify(task.model, task.toJSON); - emitModelChunk(request, task.id, json); + emitChunk(request, task, task.model); } finally { if (__DEV__) { debugID = prevDebugID; From 0fddb4e8e215a0b0b9aa39d4cf741fef67771ca2 Mon Sep 17 00:00:00 2001 From: Sebastian Markbage Date: Mon, 15 Apr 2024 22:54:46 -0400 Subject: [PATCH 11/11] Fix gates --- .../src/__tests__/ReactFlight-test.js | 6 +- .../src/__tests__/ReactFlightDOMEdge-test.js | 2 + .../__tests__/ReactFlightDOMReplyEdge-test.js | 58 ++++++++++--------- 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/packages/react-client/src/__tests__/ReactFlight-test.js b/packages/react-client/src/__tests__/ReactFlight-test.js index 8f416189f87c0..4b8fb5c3c43c9 100644 --- a/packages/react-client/src/__tests__/ReactFlight-test.js +++ b/packages/react-client/src/__tests__/ReactFlight-test.js @@ -2347,8 +2347,10 @@ describe('ReactFlight', () => { }, ); - // Wait for the iterator to finish - await iteratorPromise; + if (gate(flag => flag.enableFlightReadableStream)) { + // Wait for the iterator to finish + await iteratorPromise; + } await 0; // One more tick for the return value / closing. const transport = ReactNoopFlightServer.render( diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js index 4057853c28c09..6e2e02047bbe6 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js @@ -454,6 +454,7 @@ describe('ReactFlightDOMEdge', () => { expect(result.get('value')).toBe('hello'); }); + // @gate enableFlightReadableStream it('can pass an async import to a ReadableStream while enqueuing in order', async () => { let resolve; const promise = new Promise(r => (resolve = r)); @@ -496,6 +497,7 @@ describe('ReactFlightDOMEdge', () => { expect(await reader.read()).toEqual({value: undefined, done: true}); }); + // @gate enableFlightReadableStream it('can pass an async import a AsyncIterable while allowing peaking at future values', async () => { let resolve; const promise = new Promise(r => (resolve = r)); diff --git a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js index d7000de7f3526..00a53a590c5e1 100644 --- a/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js +++ b/packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReplyEdge-test.js @@ -109,33 +109,35 @@ describe('ReactFlightDOMReplyEdge', () => { expect(await result.arrayBuffer()).toEqual(await blob.arrayBuffer()); }); - it('can transport FormData (blobs)', async () => { - const bytes = new Uint8Array([ - 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, - ]); - const blob = new Blob([bytes, bytes], { - type: 'application/x-test', + if (typeof FormData !== 'undefined' && typeof File !== 'undefined') { + it('can transport FormData (blobs)', async () => { + const bytes = new Uint8Array([ + 123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20, + ]); + const blob = new Blob([bytes, bytes], { + type: 'application/x-test', + }); + + const formData = new FormData(); + formData.append('hi', 'world'); + formData.append('file', blob, 'filename.test'); + + expect(formData.get('file') instanceof File).toBe(true); + expect(formData.get('file').name).toBe('filename.test'); + + const body = await ReactServerDOMClient.encodeReply(formData); + const result = await ReactServerDOMServer.decodeReply( + body, + webpackServerMap, + ); + + expect(result instanceof FormData).toBe(true); + expect(result.get('hi')).toBe('world'); + const resultBlob = result.get('file'); + expect(resultBlob instanceof Blob).toBe(true); + expect(resultBlob.name).toBe('filename.test'); // In this direction we allow file name to pass through but not other direction. + expect(resultBlob.size).toBe(bytes.length * 2); + expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer()); }); - - const formData = new FormData(); - formData.append('hi', 'world'); - formData.append('file', blob, 'filename.test'); - - expect(formData.get('file') instanceof File).toBe(true); - expect(formData.get('file').name).toBe('filename.test'); - - const body = await ReactServerDOMClient.encodeReply(formData); - const result = await ReactServerDOMServer.decodeReply( - body, - webpackServerMap, - ); - - expect(result instanceof FormData).toBe(true); - expect(result.get('hi')).toBe('world'); - const resultBlob = result.get('file'); - expect(resultBlob instanceof Blob).toBe(true); - expect(resultBlob.name).toBe('filename.test'); // In this direction we allow file name to pass through but not other direction. - expect(resultBlob.size).toBe(bytes.length * 2); - expect(await resultBlob.arrayBuffer()).toEqual(await blob.arrayBuffer()); - }); + } });