Commit

Try to synchronously emit a task if possible using the stream id
This helps us avoid outlining a model if it's synchronously available.
sebmarkbage committed Apr 16, 2024
1 parent 8e33e92 commit cecd166
Showing 2 changed files with 197 additions and 34 deletions.
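
An editor's sketch of the change in wire format (row ids here are illustrative): in the Flight protocol each row is "id:tag...". Before this commit, every chunk of a serialized stream was outlined as its own row, and the stream's row only carried a "$id" reference to it; after it, a chunk that serializes synchronously is emitted directly on the stream task's own row:

    2:R            // start row for a ReadableStream ('r' for BYOB streams)
    3:{"a":1}      // before: each chunk was outlined as its own row...
    2:"$3"         // ...and referenced from the stream's row
    2:C            // close row

    2:R            // after: a synchronously serializable chunk is
    2:{"a":1}      // emitted inline on the stream's own row
    2:C            // close row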
packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMEdge-test.js (133 additions & 0 deletions)

@@ -124,6 +124,18 @@ describe('ReactFlightDOMEdge', () => {
}
}

async function readByteLength(stream) {
const reader = stream.getReader();
let length = 0;
while (true) {
const {done, value} = await reader.read();
if (done) {
return length;
}
length += value.byteLength;
}
}

it('should allow an alternative module mapping to be used for SSR', async () => {
function ClientComponent() {
return <span>Client Component</span>;
@@ -557,4 +569,125 @@ describe('ReactFlightDOMEdge', () => {
{withoutStack: true},
);
});

// @gate enableFlightReadableStream && enableBinaryFlight
it('should support ReadableStreams with typed arrays', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
]).buffer;
const buffers = [
buffer,
new Int8Array(buffer, 1),
new Uint8Array(buffer, 2),
new Uint8ClampedArray(buffer, 2),
new Int16Array(buffer, 2),
new Uint16Array(buffer, 2),
new Int32Array(buffer, 4),
new Uint32Array(buffer, 4),
new Float32Array(buffer, 4),
new Float64Array(buffer, 0),
new BigInt64Array(buffer, 0),
new BigUint64Array(buffer, 0),
new DataView(buffer, 3),
];

// This is not a binary stream; it's a stream that contains binary chunks.
const s = new ReadableStream({
start(c) {
for (let i = 0; i < buffers.length; i++) {
c.enqueue(buffers[i]);
}
c.close();
},
});

const stream = ReactServerDOMServer.renderToReadableStream(s, {});

const [stream1, stream2] = passThrough(stream).tee();

const result = await ReactServerDOMClient.createFromReadableStream(
stream1,
{
ssrManifest: {
moduleMap: null,
moduleLoading: null,
},
},
);

expect(await readByteLength(stream2)).toBeLessThan(400);

const streamedBuffers = [];
const reader = result.getReader();
let entry;
while (!(entry = await reader.read()).done) {
streamedBuffers.push(entry.value);
}

expect(streamedBuffers).toEqual(buffers);
});

// @gate enableFlightReadableStream && enableBinaryFlight
it('should support BYOB binary ReadableStreams', async () => {
const buffer = new Uint8Array([
123, 4, 10, 5, 100, 255, 244, 45, 56, 67, 43, 124, 67, 89, 100, 20,
]).buffer;
const buffers = [
new Int8Array(buffer, 1),
new Uint8Array(buffer, 2),
new Uint8ClampedArray(buffer, 2),
new Int16Array(buffer, 2),
new Uint16Array(buffer, 2),
new Int32Array(buffer, 4),
new Uint32Array(buffer, 4),
new Float32Array(buffer, 4),
new Float64Array(buffer, 0),
new BigInt64Array(buffer, 0),
new BigUint64Array(buffer, 0),
new DataView(buffer, 3),
];

// This is a binary stream where each chunk ends up as a Uint8Array.
const s = new ReadableStream({
type: 'bytes',
start(c) {
for (let i = 0; i < buffers.length; i++) {
c.enqueue(buffers[i]);
}
c.close();
},
});

const stream = ReactServerDOMServer.renderToReadableStream(s, {});

const [stream1, stream2] = passThrough(stream).tee();

const result = await ReactServerDOMClient.createFromReadableStream(
stream1,
{
ssrManifest: {
moduleMap: null,
moduleLoading: null,
},
},
);

expect(await readByteLength(stream2)).toBeLessThan(400);

const streamedBuffers = [];
const reader = result.getReader({mode: 'byob'});
let entry;
while (!(entry = await reader.read(new Uint8Array(10))).done) {
expect(entry.value instanceof Uint8Array).toBe(true);
streamedBuffers.push(entry.value);
}

// The streamed buffers might arrive in different chunks and in Uint8Array form,
// but the concatenated bytes should be the same.
expect(streamedBuffers.flatMap(t => Array.from(t))).toEqual(
buffers.flatMap(c =>
Array.from(new Uint8Array(c.buffer, c.byteOffset, c.byteLength)),
),
);
});
});
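
The passThrough helper used by these tests is defined earlier in the test file and is not part of this diff. A minimal identity version, assuming it only needs to forward chunks so the Flight output can be tee'd and measured, might look like this (hypothetical sketch; the real helper may also re-chunk the data):

    function passThrough(stream) {
      // Identity transform: forward each chunk unchanged so the output can
      // be tee'd into one copy for the client and one for byte counting.
      const {readable, writable} = new TransformStream();
      stream.pipeTo(writable); // fire-and-forget pipe
      return readable;
    }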
packages/react-server/src/ReactFlightServer.js (98 changes: 64 additions & 34 deletions)
@@ -526,6 +526,7 @@ function serializeThenable(

function serializeReadableStream(
request: Request,
task: Task,
stream: ReadableStream,
): string {
// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
@@ -546,10 +547,20 @@

const reader = stream.getReader();

request.pendingChunks += 2; // Start and Stop rows.
const streamId = request.nextChunkId++;
// This task won't actually be retried. We just use it to attempt synchronous renders.
const streamTask = createTask(
request,
task.model,
task.keyPath,
task.implicitSlot,
request.abortableTasks,
);
request.abortableTasks.delete(streamTask);

request.pendingChunks++; // The task represents the Start row. This adds a Stop row.

const startStreamRow =
streamId.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n';
streamTask.id.toString(16) + ':' + (supportsBYOB ? 'r' : 'R') + '\n';
request.completedRegularChunks.push(stringToChunk(startStreamRow));

// There's a race condition between when the stream is aborted and when the promise
@@ -562,21 +573,15 @@

if (entry.done) {
request.abortListeners.delete(error);
const endStreamRow = streamId.toString(16) + ':C\n';
request.pendingChunks++;
const endStreamRow = streamTask.id.toString(16) + ':C\n';
request.completedRegularChunks.push(stringToChunk(endStreamRow));
enqueueFlush(request);
aborted = true;
} else {
try {
const chunkId = outlineModel(request, entry.value);
const processedChunk = encodeReferenceChunk(
request,
streamId,
serializeByValueID(chunkId),
);
streamTask.model = entry.value;
request.pendingChunks++;
request.completedRegularChunks.push(processedChunk);
tryStreamTask(request, streamTask);
enqueueFlush(request);
reader.read().then(progress, error);
} catch (x) {
@@ -598,22 +603,23 @@
) {
const postponeInstance: Postpone = (reason: any);
logPostpone(request, postponeInstance.message);
emitPostponeChunk(request, streamId, postponeInstance);
emitPostponeChunk(request, streamTask.id, postponeInstance);
} else {
const digest = logRecoverableError(request, reason);
emitErrorChunk(request, streamId, digest, reason);
emitErrorChunk(request, streamTask.id, digest, reason);
}
enqueueFlush(request);
// $FlowFixMe should be able to pass mixed
reader.cancel(reason).then(error, error);
}
request.abortListeners.add(error);
reader.read().then(progress, error);
return serializeByValueID(streamId);
return serializeByValueID(streamTask.id);
}

function serializeAsyncIterable(
request: Request,
task: Task,
iterable: $AsyncIterable<ReactClientValue, ReactClientValue, void>,
iterator: $AsyncIterator<ReactClientValue, ReactClientValue, void>,
): string {
@@ -623,16 +629,26 @@
// iterated more than once on the client.
const isIterator = iterable === iterator;

request.pendingChunks += 2; // Start and Stop rows.
const streamId = request.nextChunkId++;
// This task won't actually be retried. We just use it to attempt synchronous renders.
const streamTask = createTask(
request,
task.model,
task.keyPath,
task.implicitSlot,
request.abortableTasks,
);
request.abortableTasks.delete(streamTask);

request.pendingChunks++; // The task represents the Start row. This adds a Stop row.

const startStreamRow =
streamId.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n';
streamTask.id.toString(16) + ':' + (isIterator ? 'x' : 'X') + '\n';
request.completedRegularChunks.push(stringToChunk(startStreamRow));

if (__DEV__) {
const debugInfo: ?ReactDebugInfo = (iterable: any)._debugInfo;
if (debugInfo) {
forwardDebugInfo(request, streamId, debugInfo);
forwardDebugInfo(request, streamTask.id, debugInfo);
}
}

@@ -652,14 +668,14 @@
request.abortListeners.delete(error);
let endStreamRow;
if (entry.value === undefined) {
endStreamRow = streamId.toString(16) + ':C\n';
endStreamRow = streamTask.id.toString(16) + ':C\n';
} else {
// Unlike streams, the last value may not be undefined. If it's not
// we outline it and encode a reference to it in the closing instruction.
try {
const chunkId = outlineModel(request, entry.value);
endStreamRow =
streamId.toString(16) +
streamTask.id.toString(16) +
':C' +
stringify(serializeByValueID(chunkId)) +
'\n';
@@ -668,20 +684,14 @@
return;
}
}
request.pendingChunks++;
request.completedRegularChunks.push(stringToChunk(endStreamRow));
enqueueFlush(request);
aborted = true;
} else {
try {
const chunkId = outlineModel(request, entry.value);
const processedChunk = encodeReferenceChunk(
request,
streamId,
serializeByValueID(chunkId),
);
streamTask.model = entry.value;
request.pendingChunks++;
request.completedRegularChunks.push(processedChunk);
tryStreamTask(request, streamTask);
enqueueFlush(request);
iterator.next().then(progress, error);
} catch (x) {
@@ -704,10 +714,10 @@
) {
const postponeInstance: Postpone = (reason: any);
logPostpone(request, postponeInstance.message);
emitPostponeChunk(request, streamId, postponeInstance);
emitPostponeChunk(request, streamTask.id, postponeInstance);
} else {
const digest = logRecoverableError(request, reason);
emitErrorChunk(request, streamId, digest, reason);
emitErrorChunk(request, streamTask.id, digest, reason);
}
enqueueFlush(request);
if (typeof (iterator: any).throw === 'function') {
@@ -718,7 +728,7 @@
}
request.abortListeners.add(error);
iterator.next().then(progress, error);
return serializeByValueID(streamId);
return serializeByValueID(streamTask.id);
}

export function emitHint<Code: HintCode>(
@@ -986,7 +996,7 @@ function renderAsyncFragment(
// be recursive serialization, we need to reset the keyPath and implicitSlot,
// before recursing here.
const asyncIterator = getAsyncIterator.call(children);
return serializeAsyncIterable(request, children, asyncIterator);
return serializeAsyncIterable(request, task, children, asyncIterator);
}

function renderClientElement(
@@ -1932,7 +1942,7 @@ function renderModelDestructive(
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(request, value);
return serializeReadableStream(request, task, value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
@@ -2769,6 +2779,26 @@ function retryTask(request: Request, task: Task): void {
}
}

function tryStreamTask(request: Request, task: Task): void {
// This is used to try to emit something synchronously, but if it suspends,
// we emit a reference to a new outlined task immediately instead.
const prevDebugID = debugID;
if (__DEV__) {
// We don't use the id of the stream task for debugID. Instead we leave it null
// so that we outline the row to get a new debugID if needed.
debugID = null;
}
try {
// $FlowFixMe[incompatible-type] stringify can return null for undefined but we never do
const json: string = stringify(task.model, task.toJSON);
emitModelChunk(request, task.id, json);
} finally {
if (__DEV__) {
debugID = prevDebugID;
}
}
}

function performWork(request: Request): void {
const prevDispatcher = ReactSharedInternals.H;
ReactSharedInternals.H = HooksDispatcher;
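
An editor's note on why tryStreamTask can run synchronously: stringify(task.model, task.toJSON) drives serialization through JSON.stringify's replacer protocol, so a chunk either serializes in a single pass or, for values that would suspend, falls back to being outlined. A minimal sketch of that shape (all names illustrative, not React's internals):

    // Illustrative only: React's real renderModel/outlineModel are far more
    // involved. This just shows the synchronous-with-fallback pattern.
    function sketchSerialize(model, resolveSync, outline) {
      return JSON.stringify(model, function (key, value) {
        try {
          // Happy path: the value is available now and serializes in place.
          return resolveSync(value);
        } catch (thrownValue) {
          // Fallback: outline the value as its own row and emit a "$id"
          // reference to it instead (row ids are written in hex).
          const id = outline(value);
          return '$' + id.toString(16);
        }
      });
    }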
