From 378f44b9abdc3c40e0f685c5e76aa0e33f0a9642 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Thu, 15 Oct 2020 11:31:48 -0400 Subject: [PATCH 1/3] support for experimental defer-stream --- integrationTests/ts/package.json | 2 +- package-lock.json | 6 +- package.json | 2 +- src/__tests__/http-test.ts | 308 ++++++++++++++++++++++++++++++- src/index.ts | 124 +++++++++++-- src/isAsyncIterable.ts | 9 + 6 files changed, 426 insertions(+), 25 deletions(-) create mode 100644 src/isAsyncIterable.ts diff --git a/integrationTests/ts/package.json b/integrationTests/ts/package.json index ef8f9cec..d1aef6f8 100644 --- a/integrationTests/ts/package.json +++ b/integrationTests/ts/package.json @@ -6,7 +6,7 @@ "dependencies": { "@types/node": "14.0.13", "express-graphql": "file:../express-graphql.tgz", - "graphql": "14.7.0", + "graphql": "15.4.0-experimental-stream-defer.1", "typescript-3.4": "npm:typescript@3.4.x", "typescript-3.5": "npm:typescript@3.5.x", "typescript-3.6": "npm:typescript@3.6.x", diff --git a/package-lock.json b/package-lock.json index 17b27e05..75ecb61d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3007,9 +3007,9 @@ } }, "graphql": { - "version": "15.4.0", - "resolved": "https://registry.npmjs.org/graphql/-/graphql-15.4.0.tgz", - "integrity": "sha512-EB3zgGchcabbsU9cFe1j+yxdzKQKAbGUWRb13DsrsMN1yyfmmIq+2+L5MqVWcDCE4V89R5AyUOi7sMOGxdsYtA==", + "version": "15.4.0-experimental-stream-defer.1", + "resolved": "https://registry.npmjs.org/graphql/-/graphql-15.4.0-experimental-stream-defer.1.tgz", + "integrity": "sha512-zlGgY7aLlIofjO0CfTpCYK/tMccnj+5jvjnkTnW5qOxYhgEltuCvpMNYOJ67gz6L1flTIigt5BVEM8JExgtW3w==", "dev": true }, "graphql-language-service-interface": { diff --git a/package.json b/package.json index 1e910800..7cffd704 100644 --- a/package.json +++ b/package.json @@ -83,7 +83,7 @@ "eslint-plugin-node": "11.1.0", "express": "4.17.1", "graphiql": "1.0.6", - "graphql": "15.4.0", + "graphql": "15.4.0-experimental-stream-defer.1", "mocha": "8.2.1", "multer": "1.4.2", "nyc": "15.1.0", diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index dfab1ebc..b6087192 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -25,6 +25,7 @@ import { } from 'graphql'; import { graphqlHTTP } from '../index'; +import { isAsyncIterable } from '../isAsyncIterable'; type Middleware = (req: any, res: any, next: () => void) => unknown; type Server = () => { @@ -1027,6 +1028,60 @@ function runTests(server: Server) { errors: [{ message: 'Must provide query string.' }], }); }); + + it('allows for streaming results with @defer', async () => { + const app = server(); + const fakeFlush = sinon.fake(); + + app.use((_, res, next) => { + res.flush = fakeFlush; + next(); + }); + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ ...frag @defer(label: "deferLabel") } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(fakeFlush.callCount).to.equal(2); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 26', + '', + '{"data":{},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 78', + '', + '{"data":{"test":"Hello World"},"path":[],"label":"deferLabel","hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); }); describe('Pretty printing', () => { @@ -1109,6 +1164,62 @@ function runTests(server: Server) { expect(unprettyResponse.text).to.equal('{"data":{"test":"Hello World"}}'); }); + it('supports pretty printing async iterable requests', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + pretty: true, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 35', + '', + ['{', ' "data": {},', ' "hasNext": true', '}'].join('\n'), + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 79', + '', + [ + '{', + ' "data": {', + ' "test": "Hello World"', + ' },', + ' "path": [],', + ' "hasNext": false', + '}', + ].join('\n'), + '', + '-----', + '', + ].join('\r\n'), + ); + }); }); it('will send request and response when using thunk', async () => { @@ -1260,6 +1371,108 @@ function runTests(server: Server) { }); }); + it('allows for custom error formatting in initial payload of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + customFormatErrorFn(error) { + return { message: 'Custom error format: ' + error.message }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ thrower, ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 94', + '', + '{"errors":[{"message":"Custom error format: Throws!"}],"data":{"thrower":null},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 57', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + + it('allows for custom error formatting in subsequent payloads of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + customFormatErrorFn(error) { + return { message: 'Custom error format: ' + error.message }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ test(who: "World"), ...frag @defer } fragment frag on QueryRoot { thrower }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 46', + '', + '{"data":{"test":"Hello World"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 105', + '', + '{"data":{"thrower":null},"path":[],"errors":[{"message":"Custom error format: Throws!"}],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + it('allows for custom error formatting to elaborate', async () => { const app = server(); @@ -2100,6 +2313,10 @@ function runTests(server: Server) { async customExecuteFn(args) { seenExecuteArgs = args; const result = await Promise.resolve(execute(args)); + // istanbul ignore if this test query will never return an async iterable + if (isAsyncIterable(result)) { + return result; + } return { ...result, data: { @@ -2253,6 +2470,57 @@ function runTests(server: Server) { }); }); + it('allows for custom extensions in initial and subsequent payloads of async iterator', async () => { + const app = server(); + + app.post( + urlString(), + graphqlHTTP({ + schema: TestSchema, + extensions({ result }) { + return { preservedResult: { ...result } }; + }, + }), + ); + + const req = app + .request() + .post(urlString()) + .send({ + query: + '{ hello: test(who: "Rob"), ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + }) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + const response = await req; + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 124', + '', + '{"data":{"hello":"Hello Rob"},"hasNext":true,"extensions":{"preservedResult":{"data":{"hello":"Hello Rob"},"hasNext":true}}}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 148', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false,"extensions":{"preservedResult":{"data":{"test":"Hello World"},"path":[],"hasNext":false}}}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); + it('extension function may be async', async () => { const app = server(); @@ -2293,12 +2561,44 @@ function runTests(server: Server) { const response = await app .request() - .get(urlString({ query: '{test}', raw: '' })) - .set('Accept', 'text/html'); + .get( + urlString({ + query: + '{ hello: test(who: "Rob"), ...frag @defer } fragment frag on QueryRoot { test(who: "World") }', + raw: '', + }), + ) + .set('Accept', 'text/html') + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); expect(response.status).to.equal(200); - expect(response.type).to.equal('application/json'); - expect(response.text).to.equal('{"data":{"test":"Hello World"}}'); + expect(response.type).to.equal('multipart/mixed'); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 45', + '', + '{"data":{"hello":"Hello Rob"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 57', + '', + '{"data":{"test":"Hello World"},"path":[],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); }); }); } diff --git a/src/index.ts b/src/index.ts index 10a74752..d9af3ae8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ -import type { IncomingMessage, ServerResponse } from 'http'; +import type { IncomingMessage } from 'http'; +import { ServerResponse } from 'http'; import type { ASTVisitor, @@ -8,6 +9,9 @@ import type { ExecutionArgs, ExecutionResult, FormattedExecutionResult, + ExecutionPatchResult, + FormattedExecutionPatchResult, + AsyncExecutionResult, GraphQLSchema, GraphQLFieldResolver, GraphQLTypeResolver, @@ -30,12 +34,16 @@ import { import type { GraphiQLOptions, GraphiQLData } from './renderGraphiQL'; import { parseBody } from './parseBody'; +import { isAsyncIterable } from './isAsyncIterable'; import { renderGraphiQL } from './renderGraphiQL'; // `url` is always defined for IncomingMessage coming from http.Server type Request = IncomingMessage & { url: string }; -type Response = ServerResponse & { json?: (data: unknown) => void }; +type Response = ServerResponse & { + json?: (data: unknown) => void; + flush?: () => void; +}; type MaybePromise = Promise | T; /** @@ -94,7 +102,9 @@ export interface OptionsData { * An optional function which will be used to execute instead of default `execute` * from `graphql-js`. */ - customExecuteFn?: (args: ExecutionArgs) => MaybePromise; + customExecuteFn?: ( + args: ExecutionArgs, + ) => MaybePromise>; /** * An optional function which will be used to format any errors produced by @@ -172,7 +182,7 @@ export interface RequestInfo { /** * The result of executing the operation. */ - result: FormattedExecutionResult; + result: AsyncExecutionResult; /** * A value to pass as the context to the graphql() function. @@ -198,7 +208,10 @@ export function graphqlHTTP(options: Options): Middleware { let showGraphiQL = false; let graphiqlOptions; let formatErrorFn = formatError; + let extensionsFn; let pretty = false; + let documentAST: DocumentNode; + let executeResult; let result: ExecutionResult; try { @@ -227,7 +240,6 @@ export function graphqlHTTP(options: Options): Middleware { const fieldResolver = optionsData.fieldResolver; const typeResolver = optionsData.typeResolver; const graphiql = optionsData.graphiql ?? false; - const extensionsFn = optionsData.extensions; const context = optionsData.context ?? request; const parseFn = optionsData.customParseFn ?? parse; const executeFn = optionsData.customExecuteFn ?? execute; @@ -259,6 +271,23 @@ export function graphqlHTTP(options: Options): Middleware { graphiqlOptions = graphiql; } + // Collect and apply any metadata extensions if a function was provided. + // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (optionsData.extensions) { + extensionsFn = (payload: AsyncExecutionResult) => { + /* istanbul ignore next condition not reachable, required for flow */ + if (optionsData.extensions) { + return optionsData.extensions({ + document: documentAST, + variables, + operationName, + result: payload, + context, + }); + } + }; + } + // If there is no query, but GraphiQL will be displayed, do not produce // a result, otherwise return a 400: Bad Request. if (query == null) { @@ -278,7 +307,6 @@ export function graphqlHTTP(options: Options): Middleware { } // Parse source to AST, reporting any syntax error. - let documentAST; try { documentAST = parseFn(new Source(query, 'GraphQL request')); } catch (syntaxError: unknown) { @@ -324,7 +352,7 @@ export function graphqlHTTP(options: Options): Middleware { // Perform the execution, reporting any errors creating the context. try { - result = await executeFn({ + executeResult = await executeFn({ schema, document: documentAST, rootValue, @@ -341,16 +369,18 @@ export function graphqlHTTP(options: Options): Middleware { }); } - // Collect and apply any metadata extensions if a function was provided. - // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (isAsyncIterable(executeResult)) { + // Get first payload from AsyncIterator. http status will reflect status + // of this payload. + const asyncIterator = getAsyncIterator(executeResult); + const { value } = await asyncIterator.next(); + result = value; + } else { + result = executeResult; + } + if (extensionsFn) { - const extensions = await extensionsFn({ - document: documentAST, - variables, - operationName, - result, - context, - }); + const extensions = await extensionsFn(result); if (extensions != null) { result = { ...result, extensions }; @@ -403,6 +433,31 @@ export function graphqlHTTP(options: Options): Middleware { errors: result.errors?.map(formatErrorFn), }; + if (isAsyncIterable(executeResult)) { + response.setHeader('Content-Type', 'multipart/mixed; boundary="-"'); + sendPartialResponse(pretty, response, formattedResult); + for await (let payload of executeResult) { + // Collect and apply any metadata extensions if a function was provided. + // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (extensionsFn) { + const extensions = await extensionsFn(payload); + + if (extensions != null) { + payload = { ...payload, extensions }; + } + } + const formattedPayload: FormattedExecutionPatchResult = { + // first payload is already consumed, all subsequent payloads typed as ExecutionPatchResult + ...(payload as ExecutionPatchResult), + errors: payload.errors?.map(formatErrorFn), + }; + sendPartialResponse(pretty, response, formattedPayload); + } + response.write('\r\n-----\r\n'); + response.end(); + return; + } + // If allowed to show GraphiQL, present it instead of JSON. if (showGraphiQL) { return respondWithGraphiQL( @@ -523,6 +578,36 @@ function canDisplayGraphiQL(request: Request, params: GraphQLParams): boolean { return !params.raw && accepts(request).types(['json', 'html']) === 'html'; } +/** + * Helper function for sending part of a multi-part response using only the core Node server APIs. + */ +function sendPartialResponse( + pretty: boolean, + response: Response, + result: FormattedExecutionResult | FormattedExecutionPatchResult, +): void { + const json = JSON.stringify(result, null, pretty ? 2 : 0); + const chunk = Buffer.from(json, 'utf8'); + const data = [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: ' + String(chunk.length), + '', + chunk, + '', + ].join('\r\n'); + response.write(data); + // flush response if compression middleware is used + if ( + typeof response.flush === 'function' && + // @ts-expect-error deprecated flush method is implemented on ServerResponse but not typed + response.flush !== ServerResponse.prototype.flush + ) { + response.flush(); + } +} + /** * Helper function for sending a response using only the core Node server APIs. */ @@ -539,3 +624,10 @@ function devAssert(condition: unknown, message: string): asserts condition { throw new Error(message); } } + +function getAsyncIterator( + asyncIterable: AsyncIterable, +): AsyncIterator { + const method = asyncIterable[Symbol.asyncIterator]; + return method.call(asyncIterable); +} diff --git a/src/isAsyncIterable.ts b/src/isAsyncIterable.ts new file mode 100644 index 00000000..7542281e --- /dev/null +++ b/src/isAsyncIterable.ts @@ -0,0 +1,9 @@ +export function isAsyncIterable( + maybeAsyncIterable: any, + // eslint-disable-next-line no-undef +): maybeAsyncIterable is AsyncIterable { + if (maybeAsyncIterable == null || typeof maybeAsyncIterable !== 'object') { + return false; + } + return typeof maybeAsyncIterable[Symbol.asyncIterator] === 'function'; +} From 928b129403db849e57bc10eb8a5760dbf1829776 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Wed, 11 Nov 2020 17:10:29 -0500 Subject: [PATCH 2/3] catch errors from AsyncIterable --- src/__tests__/http-test.ts | 80 +++++++++++++++++++++++++++ src/__tests__/isAsyncIterable-test.ts | 20 +++++++ src/index.ts | 80 ++++++++++++++++++--------- src/isAsyncIterable.ts | 1 - 4 files changed, 153 insertions(+), 28 deletions(-) create mode 100644 src/__tests__/isAsyncIterable-test.ts diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index b6087192..eb97113b 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -2356,6 +2356,86 @@ function runTests(server: Server) { '{"errors":[{"message":"I did something wrong"}]}', ); }); + + it('catches first error thrown from custom execute function that returns an AsyncIterable', async () => { + const app = server(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + customExecuteFn() { + return { + [Symbol.asyncIterator]: () => ({ + next: () => Promise.reject(new Error('I did something wrong')), + }), + }; + }, + })), + ); + + const response = await app.request().get(urlString({ query: '{test}' })); + expect(response.status).to.equal(400); + expect(response.text).to.equal( + '{"errors":[{"message":"I did something wrong"}]}', + ); + }); + + it('catches subsequent errors thrown from custom execute function that returns an AsyncIterable', async () => { + const app = server(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + async *customExecuteFn() { + await new Promise((r) => { + setTimeout(r, 1); + }); + yield { + data: { + test2: 'Modification', + }, + hasNext: true, + }; + throw new Error('I did something wrong'); + }, + })), + ); + + const response = await app + .request() + .get(urlString({ query: '{test}' })) + .parse((res, cb) => { + res.on('data', (data) => { + res.text = `${res.text || ''}${data.toString('utf8') as string}`; + }); + res.on('end', (err) => { + cb(err, null); + }); + }); + + expect(response.status).to.equal(200); + expect(response.text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 48', + '', + '{"data":{"test2":"Modification"},"hasNext":true}', + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 64', + '', + '{"errors":[{"message":"I did something wrong"}],"hasNext":false}', + '', + '-----', + '', + ].join('\r\n'), + ); + }); }); describe('Custom parse function', () => { diff --git a/src/__tests__/isAsyncIterable-test.ts b/src/__tests__/isAsyncIterable-test.ts new file mode 100644 index 00000000..6c5c2696 --- /dev/null +++ b/src/__tests__/isAsyncIterable-test.ts @@ -0,0 +1,20 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { isAsyncIterable } from '../isAsyncIterable'; + +describe('isAsyncIterable', () => { + it('returns false for null', () => { + expect(isAsyncIterable(null)).to.equal(false); + }); + it('returns false for non-object', () => { + expect(isAsyncIterable(1)).to.equal(false); + }); + it('returns true for async generator function', () => { + // istanbul ignore next: test function + // eslint-disable-next-line @typescript-eslint/no-empty-function + const myGen = async function* () {}; + const result = myGen(); + expect(isAsyncIterable(result)).to.equal(true); + }); +}); diff --git a/src/index.ts b/src/index.ts index d9af3ae8..7624e168 100644 --- a/src/index.ts +++ b/src/index.ts @@ -275,7 +275,7 @@ export function graphqlHTTP(options: Options): Middleware { // https://graphql.github.io/graphql-spec/#sec-Response-Format if (optionsData.extensions) { extensionsFn = (payload: AsyncExecutionResult) => { - /* istanbul ignore next condition not reachable, required for flow */ + /* istanbul ignore else: condition not reachable, required for typescript */ if (optionsData.extensions) { return optionsData.extensions({ document: documentAST, @@ -285,6 +285,8 @@ export function graphqlHTTP(options: Options): Middleware { context, }); } + /* istanbul ignore next: condition not reachable, required for typescript */ + return undefined; }; } @@ -362,6 +364,18 @@ export function graphqlHTTP(options: Options): Middleware { fieldResolver, typeResolver, }); + + if (isAsyncIterable(executeResult)) { + // Get first payload from AsyncIterator. http status will reflect status + // of this payload. + const asyncIterator = getAsyncIterator( + executeResult, + ); + const { value } = await asyncIterator.next(); + result = value; + } else { + result = executeResult; + } } catch (contextError: unknown) { // Return 400: Bad Request if any execution context errors exist. throw httpError(400, 'GraphQL execution context error.', { @@ -369,16 +383,6 @@ export function graphqlHTTP(options: Options): Middleware { }); } - if (isAsyncIterable(executeResult)) { - // Get first payload from AsyncIterator. http status will reflect status - // of this payload. - const asyncIterator = getAsyncIterator(executeResult); - const { value } = await asyncIterator.next(); - result = value; - } else { - result = executeResult; - } - if (extensionsFn) { const extensions = await extensionsFn(result); @@ -412,9 +416,12 @@ export function graphqlHTTP(options: Options): Middleware { undefined, error, ); - result = { data: undefined, errors: [graphqlError] }; + executeResult = result = { data: undefined, errors: [graphqlError] }; } else { - result = { data: undefined, errors: error.graphqlErrors }; + executeResult = result = { + data: undefined, + errors: error.graphqlErrors, + }; } } @@ -436,22 +443,41 @@ export function graphqlHTTP(options: Options): Middleware { if (isAsyncIterable(executeResult)) { response.setHeader('Content-Type', 'multipart/mixed; boundary="-"'); sendPartialResponse(pretty, response, formattedResult); - for await (let payload of executeResult) { - // Collect and apply any metadata extensions if a function was provided. - // https://graphql.github.io/graphql-spec/#sec-Response-Format - if (extensionsFn) { - const extensions = await extensionsFn(payload); - - if (extensions != null) { - payload = { ...payload, extensions }; + try { + for await (let payload of executeResult) { + // Collect and apply any metadata extensions if a function was provided. + // https://graphql.github.io/graphql-spec/#sec-Response-Format + if (extensionsFn) { + const extensions = await extensionsFn(payload); + + if (extensions != null) { + payload = { ...payload, extensions }; + } } + const formattedPayload: FormattedExecutionPatchResult = { + // first payload is already consumed, all subsequent payloads typed as ExecutionPatchResult + ...(payload as ExecutionPatchResult), + errors: payload.errors?.map(formatErrorFn), + }; + sendPartialResponse(pretty, response, formattedPayload); } - const formattedPayload: FormattedExecutionPatchResult = { - // first payload is already consumed, all subsequent payloads typed as ExecutionPatchResult - ...(payload as ExecutionPatchResult), - errors: payload.errors?.map(formatErrorFn), - }; - sendPartialResponse(pretty, response, formattedPayload); + } catch (rawError: unknown) { + /* istanbul ignore next: Thrown by underlying library. */ + const error = + rawError instanceof Error ? rawError : new Error(String(rawError)); + const graphqlError = new GraphQLError( + error.message, + undefined, + undefined, + undefined, + undefined, + error, + ); + sendPartialResponse(pretty, response, { + data: undefined, + errors: [formatErrorFn(graphqlError)], + hasNext: false, + }); } response.write('\r\n-----\r\n'); response.end(); diff --git a/src/isAsyncIterable.ts b/src/isAsyncIterable.ts index 7542281e..b5c8ce65 100644 --- a/src/isAsyncIterable.ts +++ b/src/isAsyncIterable.ts @@ -1,6 +1,5 @@ export function isAsyncIterable( maybeAsyncIterable: any, - // eslint-disable-next-line no-undef ): maybeAsyncIterable is AsyncIterable { if (maybeAsyncIterable == null || typeof maybeAsyncIterable !== 'object') { return false; From ce8429e5c15172b394e65d5a27491611b5fb354e Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Tue, 17 Nov 2020 15:55:21 -0500 Subject: [PATCH 3/3] call return on underlying async iterator when connection closes --- src/__tests__/http-test.ts | 142 ++++++++++++++++++++++++++++++++++++- src/index.ts | 47 +++++++++--- 2 files changed, 175 insertions(+), 14 deletions(-) diff --git a/src/__tests__/http-test.ts b/src/__tests__/http-test.ts index eb97113b..1fec0933 100644 --- a/src/__tests__/http-test.ts +++ b/src/__tests__/http-test.ts @@ -1,4 +1,5 @@ import zlib from 'zlib'; +import type http from 'http'; import type { Server as Restify } from 'restify'; import connect from 'connect'; @@ -81,6 +82,12 @@ function urlString(urlParams?: { [param: string]: string }): string { return string; } +function sleep(ms = 1) { + return new Promise((r) => { + setTimeout(r, ms); + }); +} + describe('GraphQL-HTTP tests for connect', () => { runTests(() => { const app = connect(); @@ -2389,9 +2396,7 @@ function runTests(server: Server) { graphqlHTTP(() => ({ schema: TestSchema, async *customExecuteFn() { - await new Promise((r) => { - setTimeout(r, 1); - }); + await sleep(); yield { data: { test2: 'Modification', @@ -2436,6 +2441,137 @@ function runTests(server: Server) { ].join('\r\n'), ); }); + + it('calls return on underlying async iterable when connection is closed', async () => { + const app = server(); + const fakeReturn = sinon.fake(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + // custom iterable keeps yielding until return is called + customExecuteFn() { + let returned = false; + return { + [Symbol.asyncIterator]: () => ({ + next: async () => { + await sleep(); + if (returned) { + return { value: undefined, done: true }; + } + return { + value: { data: { test: 'Hello, World' }, hasNext: true }, + done: false, + }; + }, + return: () => { + returned = true; + fakeReturn(); + return Promise.resolve({ value: undefined, done: true }); + }, + }), + }; + }, + })), + ); + + let text = ''; + const request = app + .request() + .get(urlString({ query: '{test}' })) + .parse((res, cb) => { + res.on('data', (data) => { + text = `${text}${data.toString('utf8') as string}`; + ((res as unknown) as http.IncomingMessage).destroy(); + cb(new Error('Aborted connection'), null); + }); + }); + + try { + await request; + } catch (e: unknown) { + // ignore aborted error + } + // sleep to allow time for return function to be called + await sleep(2); + expect(text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 47', + '', + '{"data":{"test":"Hello, World"},"hasNext":true}', + '', + ].join('\r\n'), + ); + expect(fakeReturn.callCount).to.equal(1); + }); + + it('handles return function on async iterable that throws', async () => { + const app = server(); + + app.get( + urlString(), + graphqlHTTP(() => ({ + schema: TestSchema, + // custom iterable keeps yielding until return is called + customExecuteFn() { + let returned = false; + return { + [Symbol.asyncIterator]: () => ({ + next: async () => { + await sleep(); + if (returned) { + return { value: undefined, done: true }; + } + return { + value: { data: { test: 'Hello, World' }, hasNext: true }, + done: false, + }; + }, + return: () => { + returned = true; + return Promise.reject(new Error('Throws!')); + }, + }), + }; + }, + })), + ); + + let text = ''; + const request = app + .request() + .get(urlString({ query: '{test}' })) + .parse((res, cb) => { + res.on('data', (data) => { + text = `${text}${data.toString('utf8') as string}`; + ((res as unknown) as http.IncomingMessage).destroy(); + cb(new Error('Aborted connection'), null); + }); + }); + + try { + await request; + } catch (e: unknown) { + // ignore aborted error + } + // sleep to allow return function to be called + await sleep(2); + expect(text).to.equal( + [ + '', + '---', + 'Content-Type: application/json; charset=utf-8', + 'Content-Length: 47', + '', + '{"data":{"test":"Hello, World"},"hasNext":true}', + '', + ].join('\r\n'), + ); + }); }); describe('Custom parse function', () => { diff --git a/src/index.ts b/src/index.ts index 7624e168..05161631 100644 --- a/src/index.ts +++ b/src/index.ts @@ -213,6 +213,7 @@ export function graphqlHTTP(options: Options): Middleware { let documentAST: DocumentNode; let executeResult; let result: ExecutionResult; + let finishedIterable = false; try { // Parse the Request to get GraphQL request parameters. @@ -371,6 +372,23 @@ export function graphqlHTTP(options: Options): Middleware { const asyncIterator = getAsyncIterator( executeResult, ); + + response.on('close', () => { + if ( + !finishedIterable && + typeof asyncIterator.return === 'function' + ) { + asyncIterator.return().then(null, (rawError: unknown) => { + const graphqlError = getGraphQlError(rawError); + sendPartialResponse(pretty, response, { + data: undefined, + errors: [formatErrorFn(graphqlError)], + hasNext: false, + }); + }); + } + }); + const { value } = await asyncIterator.next(); result = value; } else { @@ -398,6 +416,7 @@ export function graphqlHTTP(options: Options): Middleware { rawError instanceof Error ? rawError : String(rawError), ); + // eslint-disable-next-line require-atomic-updates response.statusCode = error.status; const { headers } = error; @@ -431,6 +450,7 @@ export function graphqlHTTP(options: Options): Middleware { // the resulting JSON payload. // https://graphql.github.io/graphql-spec/#sec-Data if (response.statusCode === 200 && result.data == null) { + // eslint-disable-next-line require-atomic-updates response.statusCode = 500; } @@ -462,17 +482,7 @@ export function graphqlHTTP(options: Options): Middleware { sendPartialResponse(pretty, response, formattedPayload); } } catch (rawError: unknown) { - /* istanbul ignore next: Thrown by underlying library. */ - const error = - rawError instanceof Error ? rawError : new Error(String(rawError)); - const graphqlError = new GraphQLError( - error.message, - undefined, - undefined, - undefined, - undefined, - error, - ); + const graphqlError = getGraphQlError(rawError); sendPartialResponse(pretty, response, { data: undefined, errors: [formatErrorFn(graphqlError)], @@ -481,6 +491,7 @@ export function graphqlHTTP(options: Options): Middleware { } response.write('\r\n-----\r\n'); response.end(); + finishedIterable = true; return; } @@ -657,3 +668,17 @@ function getAsyncIterator( const method = asyncIterable[Symbol.asyncIterator]; return method.call(asyncIterable); } + +function getGraphQlError(rawError: unknown) { + /* istanbul ignore next: Thrown by underlying library. */ + const error = + rawError instanceof Error ? rawError : new Error(String(rawError)); + return new GraphQLError( + error.message, + undefined, + undefined, + undefined, + undefined, + error, + ); +}