Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream request body instead of buffering it in memory #8084

Merged
merged 10 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/lemon-lobsters-do.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@astrojs/node': patch
'astro': patch
---

Stream request body instead of buffering it in memory.
135 changes: 84 additions & 51 deletions packages/astro/src/core/app/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,84 +9,117 @@ import { App, type MatchOptions } from './index.js';

const clientAddressSymbol = Symbol.for('astro.clientAddress');

function createRequestFromNodeRequest(req: NodeIncomingMessage, body?: Uint8Array): Request {
type CreateNodeRequestOptions = {
emptyBody?: boolean;
};

type BodyProps = Partial<RequestInit>;

function createRequestFromNodeRequest(
req: NodeIncomingMessage,
options?: CreateNodeRequestOptions
): Request {
const protocol =
req.socket instanceof TLSSocket || req.headers['x-forwarded-proto'] === 'https'
? 'https'
: 'http';
const hostname = req.headers.host || req.headers[':authority'];
const url = `${protocol}://${hostname}${req.url}`;
const rawHeaders = req.headers as Record<string, any>;
const entries = Object.entries(rawHeaders);
const headers = makeRequestHeaders(req);
const method = req.method || 'GET';
let bodyProps: BodyProps = {};
const bodyAllowed = method !== 'HEAD' && method !== 'GET' && !options?.emptyBody;
if (bodyAllowed) {
bodyProps = makeRequestBody(req);
}
const request = new Request(url, {
method,
headers: new Headers(entries),
body: ['HEAD', 'GET'].includes(method) ? null : body,
headers,
...bodyProps,
});
if (req.socket?.remoteAddress) {
Reflect.set(request, clientAddressSymbol, req.socket.remoteAddress);
}
return request;
}

class NodeIncomingMessage extends IncomingMessage {
/**
* The read-only body property of the Request interface contains a ReadableStream with the body contents that have been added to the request.
*/
body?: unknown;
function makeRequestHeaders(req: NodeIncomingMessage): Headers {
const headers = new Headers();
for (const [name, value] of Object.entries(req.headers)) {
if (value === undefined) {
continue;
}
if (Array.isArray(value)) {
for (const item of value) {
headers.append(name, item);
}
} else {
headers.append(name, value);
}
}
return headers;
}

export class NodeApp extends App {
match(req: NodeIncomingMessage | Request, opts: MatchOptions = {}) {
return super.match(req instanceof Request ? req : createRequestFromNodeRequest(req), opts);
}
render(req: NodeIncomingMessage | Request, routeData?: RouteData, locals?: object) {
function makeRequestBody(req: NodeIncomingMessage): BodyProps {
if (req.body !== undefined) {
if (typeof req.body === 'string' && req.body.length > 0) {
return super.render(
req instanceof Request ? req : createRequestFromNodeRequest(req, Buffer.from(req.body)),
routeData,
locals
);
return { body: Buffer.from(req.body) };
}

if (typeof req.body === 'object' && req.body !== null && Object.keys(req.body).length > 0) {
return super.render(
req instanceof Request
? req
: createRequestFromNodeRequest(req, Buffer.from(JSON.stringify(req.body))),
routeData,
locals
);
return { body: Buffer.from(JSON.stringify(req.body)) };
}

if ('on' in req) {
let body = Buffer.from([]);
let reqBodyComplete = new Promise((resolve, reject) => {
req.on('data', (d) => {
body = Buffer.concat([body, d]);
});
req.on('end', () => {
resolve(body);
});
req.on('error', (err) => {
reject(err);
});
});
// This covers all async iterables including Readable and ReadableStream.
if (
typeof req.body === 'object' &&
req.body !== null &&
typeof (req.body as any)[Symbol.asyncIterator] !== 'undefined'
) {
return asyncIterableToBodyProps(req.body as AsyncIterable<any>);
}
}

// Return default body.
return asyncIterableToBodyProps(req);
}

function asyncIterableToBodyProps(iterable: AsyncIterable<any>): BodyProps {
return {
// Node uses undici for the Request implementation. Undici accepts
// a non-standard async iterable for the body.
// @ts-ignore
hbgl marked this conversation as resolved.
Show resolved Hide resolved
body: iterable,
// The duplex property is required when using a ReadableStream or async
// iterable for the body. The type definitions do not include the duplex
// property because they are not up-to-date.
Comment on lines +93 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an updated version that matches the types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is sadly no updated version with the correct types. There exists an issue DefinitelyTyped/DefinitelyTyped#60924 but no fix yet (see RequestInit type definition).

// @ts-ignore
duplex: 'half',
hbgl marked this conversation as resolved.
Show resolved Hide resolved
} satisfies BodyProps;
}

class NodeIncomingMessage extends IncomingMessage {
/**
* Allow the request body to be explicitly overridden. For example, this
* is used by the Express JSON middleware.
*/
body?: unknown;
}

return reqBodyComplete.then(() => {
return super.render(
req instanceof Request ? req : createRequestFromNodeRequest(req, body),
routeData,
locals
);
export class NodeApp extends App {
match(req: NodeIncomingMessage | Request, opts: MatchOptions = {}) {
if (!(req instanceof Request)) {
req = createRequestFromNodeRequest(req, {
emptyBody: true,
});
}
return super.render(
req instanceof Request ? req : createRequestFromNodeRequest(req),
routeData,
locals
);
return super.match(req, opts);
}
render(req: NodeIncomingMessage | Request, routeData?: RouteData, locals?: object) {
if (!(req instanceof Request)) {
req = createRequestFromNodeRequest(req);
}
return super.render(req, routeData, locals);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/integrations/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"chai": "^4.3.7",
"cheerio": "1.0.0-rc.12",
"mocha": "^9.2.2",
"node-mocks-http": "^1.12.2",
"node-mocks-http": "^1.13.0",
"undici": "^5.22.1"
}
}
45 changes: 42 additions & 3 deletions packages/integrations/node/test/api-route.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import nodejs from '../dist/index.js';
import { loadFixture, createRequestAndResponse } from './test-utils.js';
import { expect } from 'chai';
import crypto from 'node:crypto';

describe('API routes', () => {
/** @type {import('./test-utils').Fixture} */
Expand All @@ -22,9 +23,11 @@ describe('API routes', () => {
url: '/recipes',
});

handler(req, res);
req.once('async_iterator', () => {
req.send(JSON.stringify({ id: 2 }));
});

req.send(JSON.stringify({ id: 2 }));
handler(req, res);

let [buffer] = await done;

Expand All @@ -43,11 +46,47 @@ describe('API routes', () => {
url: '/binary',
});

req.once('async_iterator', () => {
req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5])));
});

handler(req, res);
req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5])));

let [out] = await done;
let arr = Array.from(new Uint8Array(out.buffer));
expect(arr).to.deep.equal([5, 4, 3, 2, 1]);
});

it('Can post large binary daya', async () => {
matthewp marked this conversation as resolved.
Show resolved Hide resolved
const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs');

let { req, res, done } = createRequestAndResponse({
method: 'POST',
url: '/hash',
});

handler(req, res);

let expectedDigest = null;
req.once('async_iterator', () => {
// Send 256MB of garbage data in 256KB chunks. This should be fast (< 1sec).
let remainingBytes = 128 * 1024 * 1024;
const chunkSize = 256 * 1024;

const hash = crypto.createHash('sha256');
while (remainingBytes > 0) {
const size = Math.min(remainingBytes, chunkSize);
const chunk = Buffer.alloc(size, Math.floor(Math.random() * 256));
hash.update(chunk);
req.emit('data', chunk);
remainingBytes -= size;
}

req.emit('end');
expectedDigest = hash.digest();
});

let [out] = await done;
expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import crypto from 'node:crypto';

export async function post({ request }: { request: Request }) {
const hash = crypto.createHash('sha256');

const iterable = request.body as unknown as AsyncIterable<Uint8Array>;
for await (const chunk of iterable) {
hash.update(chunk);
}

return new Response(hash.digest(), {
headers: {
'Content-Type': 'application/octet-stream'
}
});
}
20 changes: 18 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.