diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 081214c09d..f48e62e6a2 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,7 +1,6 @@ import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; import { pathToArray } from '../jsutils/Path.js'; -import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { @@ -85,237 +84,401 @@ export type FormattedIncrementalResult< | FormattedIncrementalDeferResult | FormattedIncrementalStreamResult; -export function yieldSubsequentPayloads( - subsequentPayloads: Set, -): AsyncGenerator { - let isDone = false; +/** + * This class is used to publish incremental results to the client, enabling semi-concurrent + * execution while preserving result order. + * + * The internal publishing state is managed as follows: + * + * '_released': the set of Incremental Data records that are ready to be sent to the client, + * i.e. their parents have completed and they have also completed. + * + * `_pending`: the set of Incremental Data records that are definitely pending, i.e. their + * parents have completed so that they can no longer be filtered. This includes all Incremental + * Data records in `released`, as well as Incremental Data records that have not yet completed. + * + * `_initialResult`: a record containing the state of the initial result, as follows: + * `isCompleted`: indicates whether the initial result has completed. + * `children`: the set of Incremental Data records that can be be published when the initial + * result is completed. + * + * Each Incremental Data record also contains similar metadata, i.e. these records also contain + * similar `isCompleted` and `children` properties. + * + * @internal + */ +export class IncrementalPublisher { + private _initialResult: { + children: Set; + isCompleted: boolean; + }; + + private _released: Set; + private _pending: Set; + + // these are assigned within the Promise executor called synchronously within the constructor + private _signalled!: Promise; + private _resolve!: () => void; + + constructor() { + this._initialResult = { + children: new Set(), + isCompleted: false, + }; + this._released = new Set(); + this._pending = new Set(); + this._reset(); + } + + hasNext(): boolean { + return this._pending.size > 0; + } - async function next(): Promise< - IteratorResult + subscribe(): AsyncGenerator< + SubsequentIncrementalExecutionResult, + void, + void > { - if (isDone) { - return { value: undefined, done: true }; - } + let isDone = false; - await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise)); + const _next = async (): Promise< + IteratorResult + > => { + // eslint-disable-next-line no-constant-condition + while (true) { + if (isDone) { + return { value: undefined, done: true }; + } - if (isDone) { - // a different call to next has exhausted all payloads - return { value: undefined, done: true }; - } + for (const item of this._released) { + this._pending.delete(item); + } + const released = this._released; + this._released = new Set(); - const incremental = getCompletedIncrementalResults(subsequentPayloads); - const hasNext = subsequentPayloads.size > 0; + const result = this._getIncrementalResult(released); - if (!incremental.length && hasNext) { - return next(); - } + if (!this.hasNext()) { + isDone = true; + } - if (!hasNext) { - isDone = true; - } + if (result !== undefined) { + return { value: result, done: false }; + } - return { - value: incremental.length ? { incremental, hasNext } : { hasNext }, - done: false, + // eslint-disable-next-line no-await-in-loop + await this._signalled; + } }; - } - function returnStreamIterators() { - const promises: Array>> = []; - subsequentPayloads.forEach((incrementalDataRecord) => { - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - promises.push(incrementalDataRecord.asyncIterator.return()); - } - }); - return Promise.all(promises); - } + const returnStreamIterators = async (): Promise => { + const promises: Array>> = []; + this._pending.forEach((incrementalDataRecord) => { + if ( + isStreamItemsRecord(incrementalDataRecord) && + incrementalDataRecord.asyncIterator?.return + ) { + promises.push(incrementalDataRecord.asyncIterator.return()); + } + }); + await Promise.all(promises); + }; - return { - [Symbol.asyncIterator]() { - return this; - }, - next, - async return(): Promise< + const _return = async (): Promise< IteratorResult - > { - await returnStreamIterators(); + > => { isDone = true; + await returnStreamIterators(); return { value: undefined, done: true }; - }, - async throw( + }; + + const _throw = async ( error?: unknown, - ): Promise> { - await returnStreamIterators(); + ): Promise> => { isDone = true; + await returnStreamIterators(); return Promise.reject(error); - }, - }; -} + }; -function getCompletedIncrementalResults( - subsequentPayloads: Set, -): Array { - const incrementalResults: Array = []; - for (const incrementalDataRecord of subsequentPayloads) { - const incrementalResult: IncrementalResult = {}; - if (!incrementalDataRecord.isCompleted) { - continue; - } - subsequentPayloads.delete(incrementalDataRecord); - if (isStreamItemsRecord(incrementalDataRecord)) { - const items = incrementalDataRecord.items; - if (incrementalDataRecord.isCompletedAsyncIterator) { - // async iterable resolver just finished but there may be pending payloads - continue; - } - (incrementalResult as IncrementalStreamResult).items = items; + return { + [Symbol.asyncIterator]() { + return this; + }, + next: _next, + return: _return, + throw: _throw, + }; + } + + prepareNewDeferredFragmentRecord(opts: { + label: string | undefined; + path: Path | undefined; + parentContext: IncrementalDataRecord | undefined; + }): DeferredFragmentRecord { + const deferredFragmentRecord = new DeferredFragmentRecord(opts); + + const parentContext = opts.parentContext; + if (parentContext) { + parentContext.children.add(deferredFragmentRecord); } else { - const data = incrementalDataRecord.data; - (incrementalResult as IncrementalDeferResult).data = data ?? null; + this._initialResult.children.add(deferredFragmentRecord); } - incrementalResult.path = incrementalDataRecord.path; - if (incrementalDataRecord.label != null) { - incrementalResult.label = incrementalDataRecord.label; - } - if (incrementalDataRecord.errors.length > 0) { - incrementalResult.errors = incrementalDataRecord.errors; + return deferredFragmentRecord; + } + + prepareNewStreamItemsRecord(opts: { + label: string | undefined; + path: Path | undefined; + asyncIterator?: AsyncIterator; + parentContext: IncrementalDataRecord | undefined; + }): StreamItemsRecord { + const streamItemsRecord = new StreamItemsRecord(opts); + + const parentContext = opts.parentContext; + if (parentContext) { + parentContext.children.add(streamItemsRecord); + } else { + this._initialResult.children.add(streamItemsRecord); } - incrementalResults.push(incrementalResult); + + return streamItemsRecord; + } + + completeDeferredFragmentRecord( + deferredFragmentRecord: DeferredFragmentRecord, + data: ObjMap | null, + ): void { + deferredFragmentRecord.data = data; + deferredFragmentRecord.isCompleted = true; + this._release(deferredFragmentRecord); + } + + completeStreamItemsRecord( + streamItemsRecord: StreamItemsRecord, + items: Array | null, + ) { + streamItemsRecord.items = items; + streamItemsRecord.isCompleted = true; + this._release(streamItemsRecord); + } + + setIsCompletedAsyncIterator(streamItemsRecord: StreamItemsRecord) { + streamItemsRecord.isCompletedAsyncIterator = true; + } + + addFieldError( + incrementalDataRecord: IncrementalDataRecord, + error: GraphQLError, + ) { + incrementalDataRecord.errors.push(error); } - return incrementalResults; -} -export function filterSubsequentPayloads( - subsequentPayloads: Set, - nullPath: Path, - currentIncrementalDataRecord: IncrementalDataRecord | undefined, -): void { - const nullPathArray = pathToArray(nullPath); - subsequentPayloads.forEach((incrementalDataRecord) => { - if (incrementalDataRecord === currentIncrementalDataRecord) { - // don't remove payload from where error originates - return; + publishInitial() { + for (const child of this._initialResult.children) { + this._publish(child); } - for (let i = 0; i < nullPathArray.length; i++) { - if (incrementalDataRecord.path[i] !== nullPathArray[i]) { - // incrementalDataRecord points to a path unaffected by this payload - return; + } + + filter( + nullPath: Path, + erroringIncrementalDataRecord: IncrementalDataRecord | undefined, + ) { + const nullPathArray = pathToArray(nullPath); + + const asyncIterators = new Set>(); + + const children = + erroringIncrementalDataRecord === undefined + ? this._initialResult.children + : erroringIncrementalDataRecord.children; + + for (const child of this._getDescendants(children)) { + if (!this._matchesPath(child.path, nullPathArray)) { + continue; + } + + this._delete(child); + const parent = + child.parentContext === undefined + ? this._initialResult + : child.parentContext; + parent.children.delete(child); + + if (isStreamItemsRecord(child)) { + if (child.asyncIterator !== undefined) { + asyncIterators.add(child.asyncIterator); + } } } - // incrementalDataRecord path points to nulled error field - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - incrementalDataRecord.asyncIterator.return().catch(() => { + + asyncIterators.forEach((asyncIterator) => { + asyncIterator.return?.().catch(() => { // ignore error }); + }); + } + + private _trigger() { + this._resolve(); + this._reset(); + } + + private _reset() { + // promiseWithResolvers uses void only as a generic type parameter + // see: https://typescript-eslint.io/rules/no-invalid-void-type/ + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + const { promise: signalled, resolve } = promiseWithResolvers(); + this._resolve = resolve; + this._signalled = signalled; + } + + private _introduce(item: IncrementalDataRecord) { + this._pending.add(item); + } + + private _release(item: IncrementalDataRecord): void { + if (this._pending.has(item)) { + this._released.add(item); + this._trigger(); + } + } + + private _push(item: IncrementalDataRecord): void { + this._released.add(item); + this._pending.add(item); + this._trigger(); + } + + private _delete(item: IncrementalDataRecord) { + this._released.delete(item); + this._pending.delete(item); + this._trigger(); + } + + private _getIncrementalResult( + completedRecords: ReadonlySet, + ): SubsequentIncrementalExecutionResult | undefined { + const incrementalResults: Array = []; + let encounteredCompletedAsyncIterator = false; + for (const incrementalDataRecord of completedRecords) { + const incrementalResult: IncrementalResult = {}; + for (const child of incrementalDataRecord.children) { + this._publish(child); + } + if (isStreamItemsRecord(incrementalDataRecord)) { + const items = incrementalDataRecord.items; + if (incrementalDataRecord.isCompletedAsyncIterator) { + // async iterable resolver just finished but there may be pending payloads + encounteredCompletedAsyncIterator = true; + continue; + } + (incrementalResult as IncrementalStreamResult).items = items; + } else { + const data = incrementalDataRecord.data; + (incrementalResult as IncrementalDeferResult).data = data ?? null; + } + + incrementalResult.path = incrementalDataRecord.path; + if (incrementalDataRecord.label != null) { + incrementalResult.label = incrementalDataRecord.label; + } + if (incrementalDataRecord.errors.length > 0) { + incrementalResult.errors = incrementalDataRecord.errors; + } + incrementalResults.push(incrementalResult); + } + + return incrementalResults.length + ? { incremental: incrementalResults, hasNext: this.hasNext() } + : encounteredCompletedAsyncIterator && !this.hasNext() + ? { hasNext: false } + : undefined; + } + + private _publish(incrementalDataRecord: IncrementalDataRecord) { + if (incrementalDataRecord.isCompleted) { + this._push(incrementalDataRecord); + } else { + this._introduce(incrementalDataRecord); + } + } + + private _getDescendants( + children: ReadonlySet, + descendants = new Set(), + ): ReadonlySet { + for (const child of children) { + descendants.add(child); + this._getDescendants(child.children, descendants); + } + return descendants; + } + + private _matchesPath( + testPath: Array, + basePath: Array, + ): boolean { + for (let i = 0; i < basePath.length; i++) { + if (basePath[i] !== testPath[i]) { + // testPath points to a path unaffected at basePath + return false; + } } - subsequentPayloads.delete(incrementalDataRecord); - }); + return true; + } } /** @internal */ export class DeferredFragmentRecord { - type: 'defer'; errors: Array; label: string | undefined; path: Array; - promise: Promise; data: ObjMap | null; parentContext: IncrementalDataRecord | undefined; + children: Set; isCompleted: boolean; - _subsequentPayloads: Set; - _resolve?: (arg: PromiseOrValue | null>) => void; constructor(opts: { label: string | undefined; path: Path | undefined; parentContext: IncrementalDataRecord | undefined; - subsequentPayloads: Set; }) { - this.type = 'defer'; this.label = opts.label; this.path = pathToArray(opts.path); this.parentContext = opts.parentContext; this.errors = []; - this._subsequentPayloads = opts.subsequentPayloads; - this._subsequentPayloads.add(this); + this.children = new Set(); this.isCompleted = false; this.data = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((data) => { - this.data = data; - this.isCompleted = true; - }); - } - - addData(data: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => data)); - return; - } - this._resolve?.(data); } } /** @internal */ export class StreamItemsRecord { - type: 'stream'; errors: Array; label: string | undefined; path: Array; items: Array | null; - promise: Promise; parentContext: IncrementalDataRecord | undefined; + children: Set; asyncIterator: AsyncIterator | undefined; isCompletedAsyncIterator?: boolean; isCompleted: boolean; - _subsequentPayloads: Set; - _resolve?: (arg: PromiseOrValue | null>) => void; constructor(opts: { label: string | undefined; path: Path | undefined; asyncIterator?: AsyncIterator; parentContext: IncrementalDataRecord | undefined; - subsequentPayloads: Set; }) { - this.type = 'stream'; this.items = null; this.label = opts.label; this.path = pathToArray(opts.path); this.parentContext = opts.parentContext; this.asyncIterator = opts.asyncIterator; this.errors = []; - this._subsequentPayloads = opts.subsequentPayloads; - this._subsequentPayloads.add(this); + this.children = new Set(); this.isCompleted = false; this.items = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((items) => { - this.items = items; - this.isCompleted = true; - }); - } - - addItems(items: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => items)); - return; - } - this._resolve?.(items); - } - - setIsCompletedAsyncIterator() { - this.isCompletedAsyncIterator = true; } } @@ -324,5 +487,5 @@ export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord; function isStreamItemsRecord( incrementalDataRecord: IncrementalDataRecord, ): incrementalDataRecord is StreamItemsRecord { - return incrementalDataRecord.type === 'stream'; + return incrementalDataRecord instanceof StreamItemsRecord; } diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 9f61adac1b..ce3b920895 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1165,9 +1165,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1191,25 +1188,19 @@ describe('Execute: stream directive', () => { } /* c8 ignore stop */, }, }); - expectJSON(result).toDeepEqual([ - { - errors: [ - { - message: - 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', - locations: [{ line: 4, column: 11 }], - path: ['nestedObject', 'nonNullScalarField'], - }, - ], - data: { - nestedObject: null, + expectJSON(result).toDeepEqual({ + errors: [ + { + message: + 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', + locations: [{ line: 4, column: 11 }], + path: ['nestedObject', 'nonNullScalarField'], }, - hasNext: true, - }, - { - hasNext: false, + ], + data: { + nestedObject: null, }, - ]); + }); }); it('Filters payloads that are nulled by a later synchronous error', async () => { const document = parse(` @@ -1293,6 +1284,9 @@ describe('Execute: stream directive', () => { path: ['nestedObject', 'nestedFriendList', 0], }, ], + hasNext: true, + }, + { hasNext: false, }, ]); @@ -1350,9 +1344,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 8c9d8f9668..1ec11f72cc 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -56,14 +56,10 @@ import type { FormattedIncrementalResult, IncrementalDataRecord, IncrementalResult, - SubsequentIncrementalExecutionResult, -} from './IncrementalPublisher.js'; -import { - DeferredFragmentRecord, - filterSubsequentPayloads, StreamItemsRecord, - yieldSubsequentPayloads, + SubsequentIncrementalExecutionResult, } from './IncrementalPublisher.js'; +import { IncrementalPublisher } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import { getArgumentValues, @@ -133,7 +129,7 @@ export interface ExecutionContext { typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; errors: Array; - subsequentPayloads: Set; + incrementalPublisher: IncrementalPublisher; } /** @@ -293,47 +289,46 @@ function executeImpl( // Errors from sub-fields of a NonNull type may propagate to the top level, // at which point we still log the error and null the parent field, which // in this case is the entire response. + const { incrementalPublisher, errors } = exeContext; try { const result = executeOperation(exeContext); if (isPromise(result)) { return result.then( (data) => { - const initialResult = buildResponse(data, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const initialResult = buildResponse(data, errors); + incrementalPublisher.publishInitial(); + if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads( - exeContext.subsequentPayloads, - ), + subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; }, (error) => { - exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + errors.push(error); + return buildResponse(null, errors); }, ); } - const initialResult = buildResponse(result, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const initialResult = buildResponse(result, errors); + incrementalPublisher.publishInitial(); + if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads( - exeContext.subsequentPayloads, - ), + subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; } catch (error) { - exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + errors.push(error); + return buildResponse(null, errors); } } @@ -449,7 +444,7 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - subsequentPayloads: new Set(), + incrementalPublisher: new IncrementalPublisher(), errors: [], }; } @@ -461,7 +456,7 @@ function buildPerEventExecutionContext( return { ...exeContext, rootValue: payload, - subsequentPayloads: new Set(), + // no need to update incrementalPublisher, incremental delivery is not supported for subscriptions errors: [], }; } @@ -714,11 +709,7 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; }); } @@ -732,11 +723,7 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -937,11 +924,7 @@ async function completePromisedValue( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -1217,8 +1200,7 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, + exeContext.incrementalPublisher.filter( itemPath, incrementalDataRecord, ); @@ -1239,11 +1221,7 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); completedResults.push(null); } @@ -1773,12 +1751,14 @@ function executeDeferredFragment( path?: Path, parentContext?: IncrementalDataRecord, ): void { - const incrementalDataRecord = new DeferredFragmentRecord({ - label, - path, - parentContext, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalPublisher = exeContext.incrementalPublisher; + const incrementalDataRecord = + incrementalPublisher.prepareNewDeferredFragmentRecord({ + label, + path, + parentContext, + }); + let promiseOrData; try { promiseOrData = executeFields( @@ -1791,16 +1771,33 @@ function executeDeferredFragment( ); if (isPromise(promiseOrData)) { - promiseOrData = promiseOrData.then(null, (e) => { - incrementalDataRecord.errors.push(e); - return null; - }); + promiseOrData = promiseOrData.then( + (resolved) => + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + resolved, + ), + (e) => { + incrementalPublisher.addFieldError(incrementalDataRecord, e); + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + null, + ); + }, + ); + } else { + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + promiseOrData, + ); } } catch (e) { - incrementalDataRecord.errors.push(e); - promiseOrData = null; + incrementalPublisher.addFieldError(incrementalDataRecord, e); + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + null, + ); } - incrementalDataRecord.addData(promiseOrData); } function executeStreamField( @@ -1814,14 +1811,16 @@ function executeStreamField( label?: string, parentContext?: IncrementalDataRecord, ): IncrementalDataRecord { - const incrementalDataRecord = new StreamItemsRecord({ - label, - path: itemPath, - parentContext, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalPublisher = exeContext.incrementalPublisher; + const incrementalDataRecord = + incrementalPublisher.prepareNewStreamItemsRecord({ + label, + path: itemPath, + parentContext, + }); + if (isPromise(item)) { - const completedItems = completePromisedValue( + completePromisedValue( exeContext, itemType, fieldGroup, @@ -1830,19 +1829,21 @@ function executeStreamField( item, incrementalDataRecord, ).then( - (value) => [value], + (value) => + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + value, + ]), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); return null; }, ); - incrementalDataRecord.addItems(completedItems); return incrementalDataRecord; } @@ -1868,25 +1869,17 @@ function executeStreamField( incrementalDataRecord, ); completedItem = null; - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); } } catch (error) { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); - incrementalDataRecord.addItems(null); + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, null); return incrementalDataRecord; } if (isPromise(completedItem)) { - const completedItems = completedItem + completedItem .then(undefined, (rawError) => { handleFieldError( rawError, @@ -1896,31 +1889,31 @@ function executeStreamField( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return null; }) .then( - (value) => [value], + (value) => + incrementalPublisher.completeStreamItemsRecord( + incrementalDataRecord, + [value], + ), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - return null; }, ); - incrementalDataRecord.addItems(completedItems); return incrementalDataRecord; } - incrementalDataRecord.addItems([completedItem]); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + completedItem, + ]); return incrementalDataRecord; } @@ -1937,9 +1930,12 @@ async function executeStreamAsyncIteratorItem( let item; try { const { value, done } = await asyncIterator.next(); + if (done) { - incrementalDataRecord.setIsCompletedAsyncIterator(); - return { done, value: undefined }; + exeContext.incrementalPublisher.setIsCompletedAsyncIterator( + incrementalDataRecord, + ); + return { done: true, value: undefined }; } item = value; } catch (rawError) { @@ -1967,11 +1963,7 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return null; }); } @@ -1985,11 +1977,7 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return { done: false, value: null }; } } @@ -2005,18 +1993,19 @@ async function executeStreamAsyncIterator( label?: string, parentContext?: IncrementalDataRecord, ): Promise { + const incrementalPublisher = exeContext.incrementalPublisher; let index = initialIndex; let previousIncrementalDataRecord = parentContext ?? undefined; // eslint-disable-next-line no-constant-condition while (true) { const itemPath = addPath(path, index, undefined); - const incrementalDataRecord = new StreamItemsRecord({ - label, - path: itemPath, - parentContext: previousIncrementalDataRecord, - asyncIterator, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalDataRecord = + incrementalPublisher.prepareNewStreamItemsRecord({ + label, + path: itemPath, + parentContext: previousIncrementalDataRecord, + asyncIterator, + }); let iteration; try { @@ -2032,13 +2021,12 @@ async function executeStreamAsyncIterator( itemPath, ); } catch (error) { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - incrementalDataRecord.addItems(null); // entire stream has errored and bubbled upwards if (asyncIterator?.return) { asyncIterator.return().catch(() => { @@ -2050,26 +2038,28 @@ async function executeStreamAsyncIterator( const { done, value: completedItem } = iteration; - let completedItems: PromiseOrValue | null>; if (isPromise(completedItem)) { - completedItems = completedItem.then( - (value) => [value], + completedItem.then( + (value) => + incrementalPublisher.completeStreamItemsRecord( + incrementalDataRecord, + [value], + ), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - return null; }, ); } else { - completedItems = [completedItem]; + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + completedItem, + ]); } - incrementalDataRecord.addItems(completedItems); - if (done) { break; }