From 78d20cc8cdeb8328d2922d840dc76aa51eb1ddd6 Mon Sep 17 00:00:00 2001 From: Andrey Lunyov Date: Fri, 10 Jan 2020 07:56:58 -0800 Subject: [PATCH] An optimized batch responses processing in QueryExecutor Reviewed By: josephsavona Differential Revision: D19177805 fbshipit-source-id: e97ec057b6b5b436f35632809799c588dfafc47a --- .../store/RelayModernQueryExecutor.js | 522 +++++++++++------- .../store/RelayResponseNormalizer.js | 1 + .../relay-runtime/store/RelayStoreTypes.js | 1 + ...odernEnvironment-ExecuteWithStream-test.js | 48 +- ...ment-ExecuteWithStreamedConnection-test.js | 59 +- 5 files changed, 342 insertions(+), 289 deletions(-) diff --git a/packages/relay-runtime/store/RelayModernQueryExecutor.js b/packages/relay-runtime/store/RelayModernQueryExecutor.js index cd8f81ee047c3..c7dd4e1559c31 100644 --- a/packages/relay-runtime/store/RelayModernQueryExecutor.js +++ b/packages/relay-runtime/store/RelayModernQueryExecutor.js @@ -30,6 +30,7 @@ const {ROOT_TYPE, TYPENAME_KEY, getStorageKey} = require('./RelayStoreUtils'); import type { GraphQLResponse, + GraphQLResponseWithoutData, GraphQLResponseWithData, } from '../network/RelayNetworkTypes'; import type {Sink, Subscription} from '../network/RelayObservable'; @@ -263,39 +264,56 @@ class Executor { }); } - _handleNext(response: GraphQLResponse): void { - if (this._state === 'completed') { - return; - } - if (Array.isArray(response)) { - response.forEach(item => { - this._handleNext(item); - }); - return; - } + _handleErrorResponse( + responses: $ReadOnlyArray< + GraphQLResponseWithData | GraphQLResponseWithoutData, + >, + ): $ReadOnlyArray { + // Once thing to notice here: if one of the responses in array has errors + // All batch will be ignored. + return responses.map(response => { + if (response.data == null) { + const messages = response.errors + ? response.errors.map(({message}) => message).join('\n') + : '(No errors)'; + const error = RelayError.create( + 'RelayNetwork', + 'No data returned for operation `' + + this._operation.request.node.params.name + + '`, got error(s):\n' + + messages + + '\n\nSee the error `source` property for more information.', + ); + (error: $FlowFixMe).source = { + errors: response.errors, + operation: this._operation.request.node, + variables: this._operation.request.variables, + }; + throw error; + } + const responseWithData: GraphQLResponseWithData = (response: $FlowFixMe); + return responseWithData; + }); + } - if (response.data == null) { - const {errors} = response; - const messages = errors - ? errors.map(({message}) => message).join('\n') - : '(No errors)'; - const error = RelayError.create( - 'RelayNetwork', - 'No data returned for operation `' + - this._operation.request.node.params.name + - '`, got error(s):\n' + - messages + - '\n\nSee the error `source` property for more information.', - ); - (error: $FlowFixMe).source = { - errors, - operation: this._operation.request.node, - variables: this._operation.request.variables, - }; - throw error; + /** + * This method return boolean to indicate if the optimistic + * response has been handled + */ + _handleOptimisticResponses( + responses: $ReadOnlyArray, + ): boolean { + if (responses.length > 1) { + if ( + responses.some( + responsePart => responsePart.extensions?.isOptimistic === true, + ) + ) { + invariant(false, 'Optimistic responses cannot be batched.'); + } + return false; } - // Above check ensures that response.data != null - const responseWithData: GraphQLResponseWithData = (response: $FlowFixMe); + const response = responses[0]; const isOptimistic = response.extensions?.isOptimistic === true; if (isOptimistic && this._state !== 'started') { invariant( @@ -303,36 +321,63 @@ class Executor { 'RelayModernQueryExecutor: optimistic payload received after server payload.', ); } - const isFinal = response.extensions?.is_final === true; - this._state = isFinal ? 'loading_final' : 'loading_incremental'; - if (isFinal) { - this._incrementalPayloadsPending = false; + if (isOptimistic) { + this._processOptimisticResponse(response, null); + this._sink.next(response); + return true; + } + return false; + } + + _handleNext(response: GraphQLResponse): void { + if (this._state === 'completed') { + return; } + const responsesWithData = this._handleErrorResponse( + Array.isArray(response) ? response : [response], + ); + + // Next, handle optimistic responses + const isOptimistic = this._handleOptimisticResponses(responsesWithData); if (isOptimistic) { - this._processOptimisticResponse(responseWithData, null); - } else { - const {path, label} = response; - if (path != null || label != null) { - if (typeof label === 'string' && Array.isArray(path)) { - this._processIncrementalResponse({ - path, - label, - response: responseWithData, - }); - } else { - invariant( - false, - 'RelayModernQueryExecutor: invalid incremental payload, expected ' + - '`path` and `label` to either both be null/undefined, or ' + - '`path` to be an `Array` and `label` to be a ' + - '`string`.', - ); - } - } else { - this._processResponse(responseWithData); - } + return; } + + const [ + nonIncrementalResponses, + incrementalResponses, + ] = partitionGraphQLResponses(responsesWithData); + + // In theory this doesn't preserve the ordering of the batch. + // The idea is that a batch is always: + // * at-most one non-incremental payload + // * followed zero or more incremental payloads + // The non-incremental payload can appear if the server sends a batch + // w the initial payload followed by some early-to-resolve incremental + // payloads (although, can that even happen?) + if (nonIncrementalResponses.length > 0) { + const payloadFollowups = this._processResponses(nonIncrementalResponses); + // Please note, that we're passing `this._operation` to the publish + // queue here, which will later passed to the store (via notify) + // to indicate that this is an operation that cause the store to update + const updatedOwners = this._publishQueue.run(this._operation); + this._updateOperationTracker(updatedOwners); + this._processPayloadFollowups(payloadFollowups); + } + + if (incrementalResponses.length > 0) { + const payloadFollowups = this._processIncrementalResponses( + incrementalResponses, + ); + // For the incremental case, we're only handling follow-up responses + // for already initiated operation (and we're not passing it to + // the run(...) call) + const updatedOwners = this._publishQueue.run(); + this._updateOperationTracker(updatedOwners); + this._processPayloadFollowups(payloadFollowups); + } + this._sink.next(response); } @@ -377,6 +422,7 @@ class Executor { incrementalPlaceholders: null, moduleImportPayloads: null, source: RelayRecordSource.create(), + isFinal: false, }, updater: updater, }); @@ -489,80 +535,107 @@ class Executor { }); } - _processResponse(response: GraphQLResponseWithData): void { + _processResponses(responses: $ReadOnlyArray) { if (this._optimisticUpdates !== null) { this._optimisticUpdates.forEach(update => this._publishQueue.revertUpdate(update), ); this._optimisticUpdates = null; } - const payload = normalizeResponse( - response, - this._operation.root, - ROOT_TYPE, - {getDataID: this._getDataID, path: [], request: this._operation.request}, - ); this._incrementalPayloadsPending = false; this._incrementalResults.clear(); this._source.clear(); - this._publishQueue.commitPayload(this._operation, payload, this._updater); - const updatedOwners = this._publishQueue.run(this._operation); - this._updateOperationTracker(updatedOwners); - this._processPayloadFollowups(payload); + return responses.map(payloadPart => { + const relayPayload = normalizeResponse( + payloadPart, + this._operation.root, + ROOT_TYPE, + { + getDataID: this._getDataID, + path: [], + request: this._operation.request, + }, + ); + this._publishQueue.commitPayload( + this._operation, + relayPayload, + this._updater, + ); + return relayPayload; + }); } /** * Handles any follow-up actions for a Relay payload for @match, @defer, * and @stream directives. */ - _processPayloadFollowups(payload: RelayResponsePayload): void { + _processPayloadFollowups( + payloads: $ReadOnlyArray, + ): void { if (this._state === 'completed') { return; } - const {incrementalPlaceholders, moduleImportPayloads} = payload; - if (moduleImportPayloads && moduleImportPayloads.length !== 0) { - const operationLoader = this._operationLoader; - invariant( - operationLoader, - 'RelayModernEnvironment: Expected an operationLoader to be ' + - 'configured when using `@match`.', - ); - moduleImportPayloads.forEach(moduleImportPayload => { - this._processModuleImportPayload(moduleImportPayload, operationLoader); - }); - } - if (incrementalPlaceholders && incrementalPlaceholders.length !== 0) { - this._incrementalPayloadsPending = this._state !== 'loading_final'; - incrementalPlaceholders.forEach(incrementalPlaceholder => { - this._processIncrementalPlaceholder(payload, incrementalPlaceholder); - }); - - if (this._state === 'loading_final') { - // The query has defer/stream selections that are enabled, but the - // server indicated that this is a "final" payload: no incremental - // payloads will be delivered. If it's not a client payload, warn that - // the query was (likely) executed on the server in non-streaming mode, - // with incremental delivery disabled. - warning( - this._isClientPayload, - 'RelayModernEnvironment: Operation `%s` contains @defer/@stream ' + - 'directives but was executed in non-streaming mode. See ' + - 'https://fburl.com/relay-incremental-delivery-non-streaming-warning.', - this._operation.request.node.params.name, + payloads.forEach(payload => { + const {incrementalPlaceholders, moduleImportPayloads, isFinal} = payload; + this._state = isFinal ? 'loading_final' : 'loading_incremental'; + if (isFinal) { + this._incrementalPayloadsPending = false; + } + if (moduleImportPayloads && moduleImportPayloads.length !== 0) { + const operationLoader = this._operationLoader; + invariant( + operationLoader, + 'RelayModernEnvironment: Expected an operationLoader to be ' + + 'configured when using `@match`.', ); - // But eagerly process any deferred payloads - incrementalPlaceholders.forEach(placeholder => { - if (placeholder.kind === 'defer') { - this._processDeferResponse( - placeholder.label, - placeholder.path, - placeholder, - {data: placeholder.data}, - ); - } + moduleImportPayloads.forEach(moduleImportPayload => { + this._processModuleImportPayload( + moduleImportPayload, + operationLoader, + ); }); } - } + if (incrementalPlaceholders && incrementalPlaceholders.length !== 0) { + this._incrementalPayloadsPending = this._state !== 'loading_final'; + incrementalPlaceholders.forEach(incrementalPlaceholder => { + this._processIncrementalPlaceholder(payload, incrementalPlaceholder); + }); + + if (this._state === 'loading_final') { + // The query has defer/stream selections that are enabled, but the + // server indicated that this is a "final" payload: no incremental + // payloads will be delivered. If it's not a client payload, warn that + // the query was (likely) executed on the server in non-streaming mode, + // with incremental delivery disabled. + warning( + this._isClientPayload, + 'RelayModernEnvironment: Operation `%s` contains @defer/@stream ' + + 'directives but was executed in non-streaming mode. See ' + + 'https://fburl.com/relay-incremental-delivery-non-streaming-warning.', + this._operation.request.node.params.name, + ); + // But eagerly process any deferred payloads + const relayPayloads = []; + incrementalPlaceholders.forEach(placeholder => { + if (placeholder.kind === 'defer') { + relayPayloads.push( + this._processDeferResponse( + placeholder.label, + placeholder.path, + placeholder, + {data: placeholder.data}, + ), + ); + } + }); + if (relayPayloads.length > 0) { + const updatedOwners = this._publishQueue.run(); + this._updateOperationTracker(updatedOwners); + this._processPayloadFollowups(relayPayloads); + } + } + } + }); } _maybeCompleteSubscriptionOperationTracking() { @@ -653,7 +726,7 @@ class Executor { this._publishQueue.commitPayload(this._operation, relayPayload); const updatedOwners = this._publishQueue.run(); this._updateOperationTracker(updatedOwners); - this._processPayloadFollowups(relayPayload); + this._processPayloadFollowups([relayPayload]); } /** @@ -760,10 +833,13 @@ class Executor { // If there were any queued responses, process them now that placeholders // are in place if (pendingResponses != null) { - pendingResponses.forEach(incrementalResponse => { - this._schedule(() => { - this._processIncrementalResponse(incrementalResponse); - }); + this._schedule(() => { + const payloadFollowups = this._processIncrementalResponses( + pendingResponses, + ); + const updatedOwners = this._publishQueue.run(); + this._updateOperationTracker(updatedOwners); + this._processPayloadFollowups(payloadFollowups); }); } } @@ -773,78 +849,96 @@ class Executor { * response, normalize/publish it, and process any nested defer/match/stream * metadata. */ - _processIncrementalResponse( - incrementalResponse: IncrementalGraphQLResponse, - ): void { - const {label, path, response} = incrementalResponse; - let resultForLabel = this._incrementalResults.get(label); - if (resultForLabel == null) { - resultForLabel = new Map(); - this._incrementalResults.set(label, resultForLabel); - } - if (label.indexOf('$defer$') !== -1) { - const pathKey = path.map(String).join('.'); - let resultForPath = resultForLabel.get(pathKey); - if (resultForPath == null) { - resultForPath = {kind: 'response', responses: [incrementalResponse]}; - resultForLabel.set(pathKey, resultForPath); - return; - } else if (resultForPath.kind === 'response') { - resultForPath.responses.push(incrementalResponse); - return; - } - const placeholder = resultForPath.placeholder; - if (placeholder.kind === 'connection_page_info') { - this._processConnectionPageInfoResponse( - label, - path, - placeholder, - response, - ); - } else { - invariant( - placeholder.kind === 'defer', - 'RelayModernEnvironment: Expected data for path `%s` for label `%s` ' + - 'to be data for @defer, was `@%s`.', - pathKey, - label, - placeholder.kind, - ); - this._processDeferResponse(label, path, placeholder, response); - } - } else { - // @stream payload path values end in the field name and item index, - // but Relay records paths relative to the parent of the stream node: - // therefore we strip the last two elements just to lookup the path - // (the item index is used later to insert the element in the list) - const pathKey = path - .slice(0, -2) - .map(String) - .join('.'); - let resultForPath = resultForLabel.get(pathKey); - if (resultForPath == null) { - resultForPath = {kind: 'response', responses: [incrementalResponse]}; - resultForLabel.set(pathKey, resultForPath); - return; - } else if (resultForPath.kind === 'response') { - resultForPath.responses.push(incrementalResponse); - return; + _processIncrementalResponses( + incrementalResponses: $ReadOnlyArray, + ): $ReadOnlyArray { + const relayPayloads = []; + incrementalResponses.forEach(incrementalResponse => { + const {label, path, response} = incrementalResponse; + let resultForLabel = this._incrementalResults.get(label); + if (resultForLabel == null) { + resultForLabel = new Map(); + this._incrementalResults.set(label, resultForLabel); } - const placeholder = resultForPath.placeholder; - if (placeholder.kind === 'connection_edge') { - this._processConnectionEdgeResponse(label, path, placeholder, response); + + if (label.indexOf('$defer$') !== -1) { + const pathKey = path.map(String).join('.'); + let resultForPath = resultForLabel.get(pathKey); + if (resultForPath == null) { + resultForPath = {kind: 'response', responses: [incrementalResponse]}; + resultForLabel.set(pathKey, resultForPath); + return; + } else if (resultForPath.kind === 'response') { + resultForPath.responses.push(incrementalResponse); + return; + } + const placeholder = resultForPath.placeholder; + if (placeholder.kind === 'connection_page_info') { + relayPayloads.push( + this._processConnectionPageInfoResponse( + label, + path, + placeholder, + response, + ), + ); + } else { + invariant( + placeholder.kind === 'defer', + 'RelayModernEnvironment: Expected data for path `%s` for label `%s` ' + + 'to be data for @defer, was `@%s`.', + pathKey, + label, + placeholder.kind, + ); + relayPayloads.push( + this._processDeferResponse(label, path, placeholder, response), + ); + } } else { - invariant( - placeholder.kind === 'stream', - 'RelayModernEnvironment: Expected data for path `%s` for label `%s` ' + - 'to be data for @stream, was `@%s`.', - pathKey, - label, - placeholder.kind, - ); - this._processStreamResponse(label, path, placeholder, response); + // @stream payload path values end in the field name and item index, + // but Relay records paths relative to the parent of the stream node: + // therefore we strip the last two elements just to lookup the path + // (the item index is used later to insert the element in the list) + const pathKey = path + .slice(0, -2) + .map(String) + .join('.'); + let resultForPath = resultForLabel.get(pathKey); + if (resultForPath == null) { + resultForPath = {kind: 'response', responses: [incrementalResponse]}; + resultForLabel.set(pathKey, resultForPath); + return; + } else if (resultForPath.kind === 'response') { + resultForPath.responses.push(incrementalResponse); + return; + } + const placeholder = resultForPath.placeholder; + if (placeholder.kind === 'connection_edge') { + relayPayloads.push( + this._processConnectionEdgeResponse( + label, + path, + placeholder, + response, + ), + ); + } else { + invariant( + placeholder.kind === 'stream', + 'RelayModernEnvironment: Expected data for path `%s` for label `%s` ' + + 'to be data for @stream, was `@%s`.', + pathKey, + label, + placeholder.kind, + ); + relayPayloads.push( + this._processStreamResponse(label, path, placeholder, response), + ); + } } - } + }); + return relayPayloads; } _processConnectionPageInfoResponse( @@ -852,7 +946,7 @@ class Executor { path: $ReadOnlyArray, placeholder: ConnectionPageInfoPlaceholder, response: GraphQLResponseWithData, - ): void { + ): RelayResponsePayload { let relayPayload: RelayResponsePayload = normalizeResponse( response, placeholder.selector, @@ -904,10 +998,7 @@ class Executor { }), }; this._publishQueue.commitPayload(this._operation, relayPayload); - - const updatedOwners = this._publishQueue.run(); - this._updateOperationTracker(updatedOwners); - this._processPayloadFollowups(relayPayload); + return relayPayload; } _processDeferResponse( @@ -915,7 +1006,7 @@ class Executor { path: $ReadOnlyArray, placeholder: DeferPlaceholder, response: GraphQLResponseWithData, - ): void { + ): RelayResponsePayload { const {dataID: parentID} = placeholder.selector; const relayPayload = normalizeResponse( response, @@ -947,15 +1038,14 @@ class Executor { incrementalPlaceholders: null, moduleImportPayloads: null, source: RelayRecordSource.create(), + isFinal: response.extensions?.is_final === true, }; this._publishQueue.commitPayload( this._operation, handleFieldsRelayPayload, ); } - const updatedOwners = this._publishQueue.run(); - this._updateOperationTracker(updatedOwners); - this._processPayloadFollowups(relayPayload); + return relayPayload; } _processConnectionEdgeResponse( @@ -963,7 +1053,7 @@ class Executor { path: $ReadOnlyArray, placeholder: ConnectionEdgePlaceholder, response: GraphQLResponseWithData, - ): void { + ): RelayResponsePayload { const {parentID, node, variables} = placeholder; let {relayPayload, itemID, itemIndex} = this._normalizeStreamItem( response, @@ -986,9 +1076,7 @@ class Executor { }; this._publishQueue.commitPayload(this._operation, relayPayload); - const updatedOwners = this._publishQueue.run(); - this._updateOperationTracker(updatedOwners); - this._processPayloadFollowups(relayPayload); + return relayPayload; } /** @@ -999,7 +1087,7 @@ class Executor { path: $ReadOnlyArray, placeholder: StreamPlaceholder, response: GraphQLResponseWithData, - ): void { + ): RelayResponsePayload { const {parentID, node, variables} = placeholder; // Find the LinkedField where @stream was applied const field = node.selections[0]; @@ -1064,15 +1152,14 @@ class Executor { incrementalPlaceholders: null, moduleImportPayloads: null, source: RelayRecordSource.create(), + isFinal: false, }; this._publishQueue.commitPayload( this._operation, handleFieldsRelayPayload, ); } - const updatedOwners = this._publishQueue.run(); - this._updateOperationTracker(updatedOwners); - this._processPayloadFollowups(relayPayload); + return relayPayload; } _normalizeStreamItem( @@ -1209,6 +1296,38 @@ class Executor { } } +function partitionGraphQLResponses( + responses: $ReadOnlyArray, +): [ + $ReadOnlyArray, + $ReadOnlyArray, +] { + const nonIncrementalResponses: Array = []; + const incrementalResponses: Array = []; + responses.forEach(response => { + if (response.path != null || response.label != null) { + const {label, path} = response; + if (label == null || path == null) { + invariant( + false, + 'RelayModernQueryExecutor: invalid incremental payload, expected ' + + '`path` and `label` to either both be null/undefined, or ' + + '`path` to be an `Array` and `label` to be a ' + + '`string`.', + ); + } + incrementalResponses.push({ + label, + path, + response, + }); + } else { + nonIncrementalResponses.push(response); + } + }); + return [nonIncrementalResponses, incrementalResponses]; +} + function normalizeResponse( response: GraphQLResponseWithData, selector: NormalizationSelector, @@ -1228,6 +1347,7 @@ function normalizeResponse( return { ...relayPayload, errors, + isFinal: response.extensions?.is_final === true, }; } diff --git a/packages/relay-runtime/store/RelayResponseNormalizer.js b/packages/relay-runtime/store/RelayResponseNormalizer.js index 3e013c787b743..8444672792626 100644 --- a/packages/relay-runtime/store/RelayResponseNormalizer.js +++ b/packages/relay-runtime/store/RelayResponseNormalizer.js @@ -152,6 +152,7 @@ class RelayResponseNormalizer { incrementalPlaceholders: this._incrementalPlaceholders, moduleImportPayloads: this._moduleImportPayloads, source: this._recordSource, + isFinal: false, }; } diff --git a/packages/relay-runtime/store/RelayStoreTypes.js b/packages/relay-runtime/store/RelayStoreTypes.js index 2b28c8452a3e8..fb0b4edc66397 100644 --- a/packages/relay-runtime/store/RelayStoreTypes.js +++ b/packages/relay-runtime/store/RelayStoreTypes.js @@ -854,6 +854,7 @@ export type RelayResponsePayload = {| +incrementalPlaceholders: ?Array, +moduleImportPayloads: ?Array, +source: MutableRecordSource, + +isFinal: boolean, |}; /** diff --git a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStream-test.js b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStream-test.js index 5448e0f23ebca..9790b3e754a4b 100644 --- a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStream-test.js +++ b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStream-test.js @@ -248,15 +248,9 @@ describe('execute() a query with @stream', () => { path: ['node', 'actors', 1], }, ]); - expect(next).toBeCalledTimes(2); - expect(callback).toBeCalledTimes(2); - let snapshot = callback.mock.calls[0][0]; - expect(snapshot.isMissingData).toBe(false); - expect(snapshot.data).toEqual({ - id: '1', - actors: [{name: 'ALICE'}], - }); - snapshot = callback.mock.calls[1][0]; + expect(next).toBeCalledTimes(1); + expect(callback).toBeCalledTimes(1); + const snapshot = callback.mock.calls[0][0]; expect(snapshot.isMissingData).toBe(false); expect(snapshot.data).toEqual({ id: '1', @@ -306,14 +300,9 @@ describe('execute() a query with @stream', () => { path: ['node', 'actors', 1], }, ]); - expect(next).toBeCalledTimes(1); - expect(callback).toBeCalledTimes(1); - const snapshot = callback.mock.calls[0][0]; - expect(snapshot.isMissingData).toBe(false); - expect(snapshot.data).toEqual({ - id: '1', - actors: [{name: 'ALICE'}], - }); + // All batch will be discareded if there an error in the batch + expect(next).toBeCalledTimes(0); + expect(callback).toBeCalledTimes(0); expect(complete).toBeCalledTimes(0); expect(error).toBeCalledTimes(1); @@ -344,7 +333,7 @@ describe('execute() a query with @stream', () => { path: ['node', 'actors', 0], }, ]); - expect(next).toBeCalledTimes(2); + expect(next).toBeCalledTimes(1); // Here is the nuance: For the mix of initial and incremental payloads // the subscribe callback will be called twice // (one for the initial payload) and for an incremental @@ -387,17 +376,11 @@ describe('execute() a query with @stream', () => { path: ['node', 'actors', 2], }, ]); - expect(next).toBeCalledTimes(2); - expect(callback).toBeCalledTimes(2); + expect(next).toBeCalledTimes(1); + expect(callback).toBeCalledTimes(1); const snapshot3 = callback.mock.calls[0][0]; expect(snapshot3.isMissingData).toBe(false); expect(snapshot3.data).toEqual({ - id: '1', - actors: [{name: 'ALICE'}, {name: 'BOB'}], - }); - const snapshot4 = callback.mock.calls[1][0]; - expect(snapshot4.isMissingData).toBe(false); - expect(snapshot4.data).toEqual({ id: '1', actors: [{name: 'ALICE'}, {name: 'BOB'}, {name: 'CLAIR'}], }); @@ -440,7 +423,7 @@ describe('execute() a query with @stream', () => { 'https://fburl.com/relay-incremental-delivery-non-streaming-warning.', 'FeedbackQuery', ); - expect(next).toBeCalledTimes(2); + expect(next).toBeCalledTimes(1); // Here is the nuance: For the mix of initial and incremental payloads // the subscribe callback will be called twice // (one for the initial payload) and for an incremental @@ -1257,21 +1240,14 @@ describe('execute() a query with @stream', () => { expect(error.mock.calls[0][0].message).toContain( 'No data returned for operation `FeedbackQuery`', ); - expect(next).toBeCalledTimes(2); - expect(callback).toBeCalledTimes(2); + expect(next).toBeCalledTimes(1); + expect(callback).toBeCalledTimes(1); const snapshot = callback.mock.calls[0][0]; expect(snapshot.isMissingData).toBe(false); expect(snapshot.data).toEqual({ id: '1', actors: [], }); - - const snapshot2 = callback.mock.calls[1][0]; - expect(snapshot2.isMissingData).toBe(false); - expect(snapshot2.data).toEqual({ - id: '1', - actors: [undefined, {name: 'BOB'}], - }); }); it('uses user-defined getDataID to generate ID from streamed payload.', () => { diff --git a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStreamedConnection-test.js b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStreamedConnection-test.js index 0b0ef32e4e6d6..19e4c686e0f4a 100644 --- a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStreamedConnection-test.js +++ b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-ExecuteWithStreamedConnection-test.js @@ -529,45 +529,11 @@ describe('execute() fetches a @stream-ed @connection', () => { }, ]); expect(error.mock.calls.map(call => call[0].stack)).toEqual([]); - expect(next).toBeCalledTimes(2); - expect(callback).toBeCalledTimes(2); + expect(next).toBeCalledTimes(1); + expect(callback).toBeCalledTimes(1); const snapshot = callback.mock.calls[0][0]; expect(snapshot.isMissingData).toBe(false); expect(snapshot.data).toEqual({ - newsFeed: { - edges: [ - { - cursor: 'cursor-1', - node: { - __typename: 'Story', - id: '1', - feedback: { - id: 'feedback-1', - actors: [{id: 'actor-1', name: 'ALICE'}], - }, - }, - }, - { - cursor: 'cursor-2', - node: { - __typename: 'Story', - id: '2', - feedback: { - id: 'feedback-2', - actors: [{id: 'actor-2', name: 'BOB'}], - }, - }, - }, - ], - pageInfo: { - endCursor: null, - hasNextPage: false, - }, - }, - }); - const snapshot2 = callback.mock.calls[1][0]; - expect(snapshot2.isMissingData).toBe(false); - expect(snapshot2.data).toEqual({ newsFeed: { edges: [ { @@ -699,10 +665,10 @@ describe('execute() fetches a @stream-ed @connection', () => { }, ]); expect(error.mock.calls.map(call => call[0].stack)).toEqual([]); - expect(next).toBeCalledTimes(3); - expect(callback).toBeCalledTimes(3); - const snapshot1 = callback.mock.calls[1][0]; - expect(snapshot1.isMissingData).toBe(true); + expect(next).toBeCalledTimes(2); + expect(callback).toBeCalledTimes(2); + const snapshot1 = callback.mock.calls[0][0]; + expect(snapshot1.isMissingData).toBe(false); expect(snapshot1.data).toEqual({ newsFeed: { edges: [ @@ -718,17 +684,6 @@ describe('execute() fetches a @stream-ed @connection', () => { }, }, }, - { - cursor: 'cursor-3', - node: { - __typename: 'Story', - id: '3', - feedback: { - id: 'feedback-3', - actors: [{id: 'actor-3', name: 'CLAIR'}], - }, - }, - }, ], pageInfo: { endCursor: null, @@ -736,7 +691,7 @@ describe('execute() fetches a @stream-ed @connection', () => { }, }, }); - const snapshot2 = callback.mock.calls[2][0]; + const snapshot2 = callback.mock.calls[1][0]; expect(snapshot2.isMissingData).toBe(false); expect(snapshot2.data).toEqual({ newsFeed: {