From 55819fd76b54d5e1e0ec0ffb6a8e14b96a4f1f8b Mon Sep 17 00:00:00 2001 From: Shu Ding Date: Fri, 21 Jul 2023 13:05:40 +0200 Subject: [PATCH 1/3] refactor stream utils --- .../next/src/server/app-render/app-render.tsx | 49 ++++++------------- .../app-render/server-inserted-html.tsx | 29 +++++++++++ .../stream-utils/node-web-streams-helper.ts | 42 +++++++--------- .../app/streaming-rsc/inserted-html.js | 11 +++++ .../rsc-basic/app/streaming-rsc/page.js | 2 + 5 files changed, 77 insertions(+), 56 deletions(-) create mode 100644 packages/next/src/server/app-render/server-inserted-html.tsx create mode 100644 test/e2e/app-dir/rsc-basic/app/streaming-rsc/inserted-html.js diff --git a/packages/next/src/server/app-render/app-render.tsx b/packages/next/src/server/app-render/app-render.tsx index 1a5b951c8a52d..76bb97566e498 100644 --- a/packages/next/src/server/app-render/app-render.tsx +++ b/packages/next/src/server/app-render/app-render.tsx @@ -35,7 +35,6 @@ import { canSegmentBeOverridden, matchSegment, } from '../../client/components/match-segments' -import { ServerInsertedHTMLContext } from '../../shared/lib/server-inserted-html' import { stripInternalQueries } from '../internal-utils' import { NEXT_ROUTER_PREFETCH, @@ -80,6 +79,7 @@ import { warn } from '../../build/output/log' import { appendMutableCookies } from '../web/spec-extension/adapters/request-cookies' import { ComponentsType } from '../../build/webpack/loaders/next-app-loader' import { ModuleReference } from '../../build/webpack/loaders/metadata/types' +import { createServerInsertedHTML } from './server-inserted-html' export const isEdgeRuntime = process.env.NEXT_RUNTIME === 'edge' @@ -1435,29 +1435,11 @@ export async function renderToHTMLOrFlight( const { HeadManagerContext } = require('../../shared/lib/head-manager-context') as typeof import('../../shared/lib/head-manager-context') - const serverInsertedHTMLCallbacks: Set<() => React.ReactNode> = new Set() - function InsertedHTML({ children }: { children: JSX.Element }) { - // Reset addInsertedHtmlCallback on each render - const addInsertedHtml = React.useCallback( - (handler: () => React.ReactNode) => { - serverInsertedHTMLCallbacks.add(handler) - }, - [] - ) - return ( - - - {children} - - - ) - } + // On each render, create a new `ServerInsertedHTML` context to capture + // injected nodes from user code (`useServerInsertedHTML`). + const { ServerInsertedHTMLProvider, renderServerInsertedHTML } = + createServerInsertedHTML() getTracer().getRootSpanAttributes()?.set('next.route', pagePath) const bodyResult = getTracer().wrap( @@ -1491,9 +1473,16 @@ export async function renderToHTMLOrFlight( })) const content = ( - - - + + + + + ) let polyfillsFlushed = false @@ -1530,13 +1519,6 @@ export async function renderToHTMLOrFlight( ReactDOMServer: require('react-dom/server.edge'), element: ( <> - {Array.from(serverInsertedHTMLCallbacks).map( - (callback, index) => ( - - {callback()} - - ) - )} {polyfillsFlushed ? null : polyfills?.map((polyfill) => { @@ -1550,6 +1532,7 @@ export async function renderToHTMLOrFlight( /> ) })} + {renderServerInsertedHTML()} {errorMetaTags} ), diff --git a/packages/next/src/server/app-render/server-inserted-html.tsx b/packages/next/src/server/app-render/server-inserted-html.tsx new file mode 100644 index 0000000000000..f044c24feaba3 --- /dev/null +++ b/packages/next/src/server/app-render/server-inserted-html.tsx @@ -0,0 +1,29 @@ +// Provider for the `useServerInsertedHTML` API to register callbacks to insert +// elements into the HTML stream. + +import React from 'react' +import { ServerInsertedHTMLContext } from '../../shared/lib/server-inserted-html' + +export function createServerInsertedHTML() { + const serverInsertedHTMLCallbacks: (() => React.ReactNode)[] = [] + const addInsertedHtml = (handler: () => React.ReactNode) => { + serverInsertedHTMLCallbacks.push(handler) + } + + return { + ServerInsertedHTMLProvider({ children }: { children: JSX.Element }) { + return ( + + {children} + + ) + }, + renderServerInsertedHTML() { + return serverInsertedHTMLCallbacks.map((callback, index) => ( + + {callback()} + + )) + }, + } +} diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index 7f6312965902e..d1ee865f38ac6 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -39,17 +39,14 @@ export function readableStreamTee( const writer2 = transformStream2.writable.getWriter() const reader = readable.getReader() - function read() { - reader.read().then(({ done, value }) => { - if (done) { - writer.close() - writer2.close() - return - } - writer.write(value) - writer2.write(value) - read() - }) + async function read() { + const { done, value } = await reader.read() + if (done) { + await Promise.all([writer.close(), writer2.close()]) + return + } + await Promise.all([writer.write(value), writer2.write(value)]) + await read() } read() @@ -72,15 +69,14 @@ export function chainStreams( } export function streamFromArray(strings: string[]): ReadableStream { - // Note: we use a TransformStream here instead of instantiating a ReadableStream - // because the built-in ReadableStream polyfill runs strings through TextEncoder. - const { readable, writable } = new TransformStream() - - const writer = writable.getWriter() - strings.forEach((str) => writer.write(encodeText(str))) - writer.close() - - return readable + return new ReadableStream({ + start(controller) { + for (const str of strings) { + controller.enqueue(encodeText(str)) + } + controller.close() + }, + }) } export async function streamToString( @@ -218,7 +214,7 @@ function createHeadInsertionTransformStream( // Suffix after main body content - scripts before , // but wait for the major chunks to be enqueued. -export function createDeferredSuffixStream( +function createDeferredSuffixStream( suffix: string ): TransformStream { let suffixFlushed = false @@ -227,7 +223,7 @@ export function createDeferredSuffixStream( return new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk) - if (!suffixFlushed && suffix) { + if (!suffixFlushed && suffix.length) { suffixFlushed = true suffixFlushTask = new Promise((res) => { // NOTE: streaming flush @@ -242,7 +238,7 @@ export function createDeferredSuffixStream( }, flush(controller) { if (suffixFlushTask) return suffixFlushTask - if (!suffixFlushed && suffix) { + if (!suffixFlushed && suffix.length) { suffixFlushed = true controller.enqueue(encodeText(suffix)) } diff --git a/test/e2e/app-dir/rsc-basic/app/streaming-rsc/inserted-html.js b/test/e2e/app-dir/rsc-basic/app/streaming-rsc/inserted-html.js new file mode 100644 index 0000000000000..7b44e7d268e7a --- /dev/null +++ b/test/e2e/app-dir/rsc-basic/app/streaming-rsc/inserted-html.js @@ -0,0 +1,11 @@ +'use client' + +import { useServerInsertedHTML } from 'next/navigation' + +export function Inserted() { + useServerInsertedHTML(() => { + return

inserted_data

+ }) + + return null +} diff --git a/test/e2e/app-dir/rsc-basic/app/streaming-rsc/page.js b/test/e2e/app-dir/rsc-basic/app/streaming-rsc/page.js index 9c700ba2b0294..fa332945959a0 100644 --- a/test/e2e/app-dir/rsc-basic/app/streaming-rsc/page.js +++ b/test/e2e/app-dir/rsc-basic/app/streaming-rsc/page.js @@ -1,6 +1,7 @@ import { Suspense } from 'react' import { createDataFetcher } from '../../lib/data' import Nav from '../../components/nav' +import { Inserted } from './inserted-html' const Data = createDataFetcher('next_streaming_data', { timeout: 500, @@ -9,6 +10,7 @@ const Data = createDataFetcher('next_streaming_data', { export default function Page() { return (
+
From 4fb871ebdc1f21acd077186cd9c33950979c6579 Mon Sep 17 00:00:00 2001 From: Shu Ding Date: Fri, 21 Jul 2023 15:28:59 +0200 Subject: [PATCH 2/3] optimize streams --- .../stream-utils/node-web-streams-helper.ts | 81 ++++++++++--------- .../app/streaming-rsc/inserted-html.js | 11 --- .../rsc-basic/app/streaming-rsc/page.js | 2 - 3 files changed, 45 insertions(+), 49 deletions(-) delete mode 100644 test/e2e/app-dir/rsc-basic/app/streaming-rsc/inserted-html.js diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index d1ee865f38ac6..135829da2cb6b 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -98,19 +98,19 @@ export async function streamToString( } } -export function createBufferedTransformStream( - transform: (v: string) => string | Promise = (v) => v -): TransformStream { - let bufferedString = '' +export function createBufferedTransformStream(): TransformStream< + Uint8Array, + Uint8Array +> { + let bufferedBytes: Uint8Array = new Uint8Array() let pendingFlush: Promise | null = null const flushBuffer = (controller: TransformStreamDefaultController) => { if (!pendingFlush) { pendingFlush = new Promise((resolve) => { setTimeout(async () => { - const buffered = await transform(bufferedString) - controller.enqueue(encodeText(buffered)) - bufferedString = '' + controller.enqueue(bufferedBytes) + bufferedBytes = new Uint8Array() pendingFlush = null resolve() }, 0) @@ -119,11 +119,14 @@ export function createBufferedTransformStream( return pendingFlush } - const textDecoder = new TextDecoder() - return new TransformStream({ transform(chunk, controller) { - bufferedString += decodeText(chunk, textDecoder) + const newBufferedBytes = new Uint8Array( + bufferedBytes.length + chunk.byteLength + ) + newBufferedBytes.set(bufferedBytes) + newBufferedBytes.set(chunk, bufferedBytes.length) + bufferedBytes = newBufferedBytes flushBuffer(controller) }, @@ -141,7 +144,6 @@ export function createInsertedHTMLStream( return new TransformStream({ async transform(chunk, controller) { const insertedHTMLChunk = encodeText(await getServerInsertedHTML()) - controller.enqueue(insertedHTMLChunk) controller.enqueue(chunk) }, @@ -264,6 +266,12 @@ export function createInlineDataStream( // the safe timing to pipe the data stream, this extra tick is // necessary. dataStreamFinished = new Promise((res) => + // We use `setTimeout` here to ensure that it's inserted after flushing + // the shell. Note that this implementation might get stale if impl + // details of Fizz change in the future. + // Also we are not using `setImmediate` here because it's not available + // broadly in all runtimes, for example some edge workers might not + // have it. setTimeout(async () => { try { while (true) { @@ -289,7 +297,12 @@ export function createInlineDataStream( }) } -export function createSuffixStream( +/** + * This transform stream moves the suffix to the end of the stream, so results + * like `` will be transformed to + * ``. + */ +function createMoveSuffixStream( suffix: string ): TransformStream { let foundSuffix = false @@ -319,34 +332,34 @@ export function createSuffixStream( }) } +const BYTES_HTML_TAG = new TextEncoder().encode(' FlightRouterState ): TransformStream { let foundHtml = false let foundBody = false - const textDecoder = new TextDecoder() return new TransformStream({ async transform(chunk, controller) { - if (!foundHtml || !foundBody) { - const content = decodeText(chunk, textDecoder) - if (!foundHtml && content.includes(' v === chunk[i])) { + foundHtml = true + } + if (!foundBody && BYTES_BODY_TAG.every((v, i) => v === chunk[i])) { + foundBody = true } controller.enqueue(chunk) }, flush(controller) { - const missingTags = [ - foundHtml ? null : 'html', - foundBody ? null : 'body', - ].filter(nonNullable) + // If html or body tag is missing, we need to inject a script to notify + // the client. + if (!foundHtml || !foundBody) { + const missingTags = [ + foundHtml ? null : 'html', + foundBody ? null : 'body', + ].filter(nonNullable) - if (missingTags.length > 0) { controller.enqueue( encodeText( `