Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement stream cancellation #52281

Merged
merged 19 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,27 @@ async function render(request: NextRequest, event: NextFetchEvent) {
response.headers.append('Vary', RSC_VARY_HEADER)

const writer = tranform.writable.getWriter()
result.pipe({

let innerClose: undefined | (() => void)
const target = {
write: (chunk: Uint8Array) => writer.write(chunk),
end: () => writer.close(),
destroy: (reason?: Error) => writer.abort(reason),
})

on(_event: 'close', cb: () => void) {
innerClose = cb
},
off(_event: 'close', _cb: () => void) {
innerClose = undefined
},
}
const onClose = () => {
innerClose?.()
}
// No, this cannot be replaced with `finally`, because early cancelling
// the stream will create a rejected promise, and finally will create an
// unhandled rejection.
writer.closed.then(onClose, onClose)
result.pipe(target)

return response
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import {
NodeNextResponse,
} from 'next/dist/server/base-http/node'
import { sendResponse } from 'next/dist/server/send-response'
import { NextRequestAdapter } from 'next/dist/server/web/spec-extension/adapters/next-request'
import {
NextRequestAdapter,
signalFromNodeResponse,
} from 'next/dist/server/web/spec-extension/adapters/next-request'
import { RouteHandlerManagerContext } from 'next/dist/server/future/route-handler-managers/route-handler-manager'

import { attachRequestMeta } from './next-request-helpers'
Expand Down Expand Up @@ -43,7 +46,10 @@ export default (routeModule: RouteModule) => {
}

const routeResponse = await routeModule.handle(
NextRequestAdapter.fromNodeNextRequest(req),
NextRequestAdapter.fromNodeNextRequest(
req,
signalFromNodeResponse(response)
),
context
)

Expand Down
8 changes: 6 additions & 2 deletions packages/next/src/export/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ import { NodeNextRequest } from '../server/base-http/node'
import { isAppRouteRoute } from '../lib/is-app-route-route'
import { toNodeOutgoingHttpHeaders } from '../server/web/utils'
import { RouteModuleLoader } from '../server/future/helpers/module-loader/route-module-loader'
import { NextRequestAdapter } from '../server/web/spec-extension/adapters/next-request'
import {
NextRequestAdapter,
signalFromNodeResponse,
} from '../server/web/spec-extension/adapters/next-request'
import * as ciEnvironment from '../telemetry/ci-info'

const envConfig = require('../shared/lib/runtime-config')
Expand Down Expand Up @@ -388,7 +391,8 @@ export default async function exportPage({
// Ensure that the url for the page is absolute.
req.url = `http://localhost:3000${req.url}`
const request = NextRequestAdapter.fromNodeNextRequest(
new NodeNextRequest(req)
new NodeNextRequest(req),
signalFromNodeResponse(res)
)

// Create the context for the handler. This contains the params from
Expand Down
8 changes: 7 additions & 1 deletion packages/next/src/server/base-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ import {
type RouteMatch,
} from './future/route-matches/route-match'
import { normalizeLocalePath } from '../shared/lib/i18n/normalize-locale-path'
import { signalFromNodeResponse } from './web/spec-extension/adapters/next-request'

export type FindComponentsResult = {
components: LoadComponentsReturnType
Expand Down Expand Up @@ -1837,7 +1838,12 @@ export default abstract class Server<ServerOptions extends Options = Options> {

try {
// Handle the match and collect the response if it's a static response.
const response = await this.handlers.handle(match, req, context)
const response = await this.handlers.handle(
match,
req,
context,
signalFromNodeResponse((res as NodeNextResponse).originalResponse)
)

;(req as any).fetchMetrics = (
context.staticGenerationContext as any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export class RouteHandlerManager {
public async handle(
match: AppRouteRouteMatch,
req: BaseNextRequest,
context: RouteHandlerManagerContext
context: RouteHandlerManagerContext,
signal: AbortSignal
): Promise<Response> {
// The module supports minimal mode, load the minimal module.
const module = await RouteModuleLoader.load<RouteModule>(
Expand All @@ -33,7 +34,7 @@ export class RouteHandlerManager {
)

// Convert the BaseNextRequest to a NextRequest.
const request = NextRequestAdapter.fromBaseNextRequest(req)
const request = NextRequestAdapter.fromBaseNextRequest(req, signal)

// Get the response from the handler and send it back.
return await module.handle(request, context)
Expand Down
21 changes: 13 additions & 8 deletions packages/next/src/server/lib/route-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import { proxyRequest } from './router-utils/proxy-request'
import { getResolveRoutes } from './router-utils/resolve-routes'
import { PERMANENT_REDIRECT_STATUS } from '../../shared/lib/constants'
import { splitCookiesString, toNodeOutgoingHttpHeaders } from '../web/utils'
import { signalFromNodeRequest } from '../web/spec-extension/adapters/next-request'
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'
import { getMiddlewareRouteMatcher } from '../../shared/lib/router/utils/middleware-route-matcher'
import { pipeReadable } from './server-ipc/invoke-request'
import { pipeReadable } from '../pipe-readable'

type RouteResult =
| {
Expand Down Expand Up @@ -132,7 +132,7 @@ export async function makeResolver(
serverAddr.port || 3000
}${req.url}`,
body: cloneableBody,
signal: signalFromNodeRequest(req),
signal: signalFromNodeResponse(res),
},
useCache: true,
onWarning: console.warn,
Expand Down Expand Up @@ -160,11 +160,11 @@ export async function makeResolver(
}
res.statusCode = result.response.status

for await (const chunk of result.response.body || ([] as any)) {
if (res.closed) break
res.write(chunk)
if (result.response.body) {
await pipeReadable(result.response.body, res)
} else {
res.end()
}
res.end()
} catch (err) {
console.error(err)
res.statusCode = 500
Expand Down Expand Up @@ -218,7 +218,12 @@ export async function makeResolver(
req: IncomingMessage,
res: ServerResponse
): Promise<RouteResult | void> {
const routeResult = await resolveRoutes(req, new Set(), false)
const routeResult = await resolveRoutes(
req,
new Set(),
false,
signalFromNodeResponse(res)
)
const {
matchedOutput,
bodyStream,
Expand Down
42 changes: 31 additions & 11 deletions packages/next/src/server/lib/router-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import { filterReqHeaders } from './server-ipc/utils'
import { findPagesDir } from '../../lib/find-pages-dir'
import { setupFsCheck } from './router-utils/filesystem'
import { proxyRequest } from './router-utils/proxy-request'
import { invokeRequest, pipeReadable } from './server-ipc/invoke-request'
import { invokeRequest } from './server-ipc/invoke-request'
import { isAbortError, pipeReadable } from '../pipe-readable'
import { createRequestResponseMocks } from './mock-request'
import { createIpcServer, createWorker } from './server-ipc'
import { UnwrapPromise } from '../../lib/coalesced-function'
Expand All @@ -29,6 +30,7 @@ import {
PHASE_DEVELOPMENT_SERVER,
PERMANENT_REDIRECT_STATUS,
} from '../../shared/lib/constants'
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'

let initializeResult:
| undefined
Expand Down Expand Up @@ -331,14 +333,26 @@ export async function initialize(opts: {

debug('invokeRender', renderUrl, invokeHeaders)

const invokeRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
let invokeRes
try {
invokeRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
signal: signalFromNodeResponse(res),
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
} catch (e) {
// If the client aborts before we can receive a response object (when
// the headers are flushed), then we can early exit without further
// processing.
if (isAbortError(e)) {
return
}
throw e
}

debug('invokeRender res', invokeRes.status, invokeRes.headers)

Expand Down Expand Up @@ -419,7 +433,12 @@ export async function initialize(opts: {
resHeaders,
bodyStream,
matchedOutput,
} = await resolveRoutes(req, matchedDynamicRoutes, false)
} = await resolveRoutes(
req,
matchedDynamicRoutes,
false,
signalFromNodeResponse(res)
)

if (devInstance && matchedOutput?.type === 'devVirtualFsItem') {
const origUrl = req.url || '/'
Expand Down Expand Up @@ -687,7 +706,8 @@ export async function initialize(opts: {
const { matchedOutput, parsedUrl } = await resolveRoutes(
req,
new Set(),
true
true,
signalFromNodeResponse(socket)
)

// TODO: allow upgrade requests to pages/app paths?
Expand Down
20 changes: 19 additions & 1 deletion packages/next/src/server/lib/router-utils/proxy-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ export async function proxyRequest(
await new Promise((proxyResolve, proxyReject) => {
let finished = false

// http-proxy does not properly detect a client disconnect in newer
// versions of Node.js. This is caused because it only listens for the
// `aborted` event on the our request object, but it also fully reads
// and closes the request object. Node **will not** fire `aborted` when
// the request is already closed. Listening for `close` on our response
// object will detect the disconnect, and we can abort the proxy's
// connection.
proxy.on('proxyReq', (proxyReq) => {
res.on('close', () => proxyReq.destroy())
})
proxy.on('proxyRes', (proxyRes) => {
if (res.destroyed) {
proxyRes.destroy()
} else {
res.on('close', () => proxyRes.destroy())
}
})

proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
const cleanup = (err: any) => {
// cleanup event listeners to allow clean garbage collection
Expand All @@ -59,7 +77,7 @@ export async function proxyRequest(
finished = true
proxyReject(err)

if (!res.closed) {
if (!res.destroyed) {
res.statusCode = 500
res.end('Internal Server Error')
}
Expand Down
37 changes: 28 additions & 9 deletions packages/next/src/server/lib/router-utils/resolve-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Header } from '../../../lib/load-custom-routes'
import { stringifyQuery } from '../../server-route-utils'
import { toNodeOutgoingHttpHeaders } from '../../web/utils'
import { invokeRequest } from '../server-ipc/invoke-request'
import { isAbortError } from '../../pipe-readable'
import { getCookieParser, setLazyProp } from '../../api-utils'
import { getHostname } from '../../../shared/lib/get-hostname'
import { UnwrapPromise } from '../../../lib/coalesced-function'
Expand Down Expand Up @@ -93,7 +94,8 @@ export function getResolveRoutes(
async function resolveRoutes(
req: IncomingMessage,
matchedDynamicRoutes: Set<string>,
isUpgradeReq?: boolean
isUpgradeReq: boolean,
signal: AbortSignal
): Promise<{
finished: boolean
statusCode?: number
Expand Down Expand Up @@ -453,14 +455,31 @@ export function getResolveRoutes(

debug('invoking middleware', renderUrl, invokeHeaders)

const middlewareRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
let middlewareRes
try {
middlewareRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
signal,
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
} catch (e) {
// If the client aborts before we can receive a response object
// (when the headers are flushed), then we can early exit without
// further processing.
if (isAbortError(e)) {
return {
parsedUrl,
resHeaders,
finished: true,
}
}
throw e
}

const middlewareHeaders = toNodeOutgoingHttpHeaders(
middlewareRes.headers
) as Record<string, string | string[] | undefined>
Expand Down
33 changes: 4 additions & 29 deletions packages/next/src/server/lib/server-ipc/invoke-request.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import '../../node-polyfill-fetch'

import type { IncomingMessage } from 'http'
import type { Writable, Readable } from 'stream'
import type { Readable } from 'stream'
import { filterReqHeaders } from './utils'

export const invokeRequest = async (
targetUrl: string,
requestInit: {
headers: IncomingMessage['headers']
method: IncomingMessage['method']
signal?: AbortSignal
},
readableBody?: Readable | ReadableStream
) => {
Expand All @@ -22,10 +23,11 @@ export const invokeRequest = async (
...requestInit.headers,
}) as IncomingMessage['headers']

const invokeRes = await fetch(parsedTargetUrl.toString(), {
return await fetch(parsedTargetUrl.toString(), {
headers: invokeHeaders as any as Headers,
method: requestInit.method,
redirect: 'manual',
signal: requestInit.signal,

...(requestInit.method !== 'GET' &&
requestInit.method !== 'HEAD' &&
Expand All @@ -41,31 +43,4 @@ export const invokeRequest = async (
internal: true,
},
})

return invokeRes
}

export async function pipeReadable(
readable: ReadableStream,
writable: Writable
) {
const reader = readable.getReader()

async function doRead() {
const item = await reader.read()

if (item?.value) {
writable.write(Buffer.from(item?.value))

if ('flush' in writable) {
;(writable as any).flush()
}
}

if (!item?.done) {
return doRead()
}
}
await doRead()
writable.end()
}
Loading