From d766c8eb34fb03c5ee63c51380f2b012924e970c Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Sat, 17 Jun 2023 13:51:55 -0400 Subject: [PATCH] introduce new IncrementalPublisher class (#3894) extracted from #3886 depends on #3903 more refactors from the without-branching branch, a bit more fundamental than #3891 [set as patch release because it does have an observable effect on the number of payloads, see below] = iterate only through completed items = remove extra ticks by making the publisher manage changes to its state synchronously. = use children array instead of promises to manage hierarchy = have IncrementalPublisher instantiate new IncrementalDataRecords = The new publisher sometimes cause an empty `{ hasNext: false }` to be emitted. In particular, because the publisher is faster than it was, it may emit a stream result before the stream's asynchronous iterator has completed. = The new publisher may sometimes reduce the number of `{ hasNext: false }` records that are emitted. For example, when errors on the initial result filter all subsequent results, this now happens synchronously, and so the publisher knows immediately that there are no subsequent results, such that there is no need for an empty final payload. --- src/execution/IncrementalPublisher.ts | 473 +++++++++++++++++-------- src/execution/__tests__/stream-test.ts | 37 +- src/execution/execute.ts | 254 +++++++------ 3 files changed, 454 insertions(+), 310 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 081214c09d..f48e62e6a2 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,7 +1,6 @@ import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; import { pathToArray } from '../jsutils/Path.js'; -import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { @@ -85,237 +84,401 @@ export type FormattedIncrementalResult< | FormattedIncrementalDeferResult | FormattedIncrementalStreamResult; -export function yieldSubsequentPayloads( - subsequentPayloads: Set, -): AsyncGenerator { - let isDone = false; +/** + * This class is used to publish incremental results to the client, enabling semi-concurrent + * execution while preserving result order. + * + * The internal publishing state is managed as follows: + * + * '_released': the set of Incremental Data records that are ready to be sent to the client, + * i.e. their parents have completed and they have also completed. + * + * `_pending`: the set of Incremental Data records that are definitely pending, i.e. their + * parents have completed so that they can no longer be filtered. This includes all Incremental + * Data records in `released`, as well as Incremental Data records that have not yet completed. + * + * `_initialResult`: a record containing the state of the initial result, as follows: + * `isCompleted`: indicates whether the initial result has completed. + * `children`: the set of Incremental Data records that can be be published when the initial + * result is completed. + * + * Each Incremental Data record also contains similar metadata, i.e. these records also contain + * similar `isCompleted` and `children` properties. + * + * @internal + */ +export class IncrementalPublisher { + private _initialResult: { + children: Set; + isCompleted: boolean; + }; + + private _released: Set; + private _pending: Set; + + // these are assigned within the Promise executor called synchronously within the constructor + private _signalled!: Promise; + private _resolve!: () => void; + + constructor() { + this._initialResult = { + children: new Set(), + isCompleted: false, + }; + this._released = new Set(); + this._pending = new Set(); + this._reset(); + } + + hasNext(): boolean { + return this._pending.size > 0; + } - async function next(): Promise< - IteratorResult + subscribe(): AsyncGenerator< + SubsequentIncrementalExecutionResult, + void, + void > { - if (isDone) { - return { value: undefined, done: true }; - } + let isDone = false; - await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise)); + const _next = async (): Promise< + IteratorResult + > => { + // eslint-disable-next-line no-constant-condition + while (true) { + if (isDone) { + return { value: undefined, done: true }; + } - if (isDone) { - // a different call to next has exhausted all payloads - return { value: undefined, done: true }; - } + for (const item of this._released) { + this._pending.delete(item); + } + const released = this._released; + this._released = new Set(); - const incremental = getCompletedIncrementalResults(subsequentPayloads); - const hasNext = subsequentPayloads.size > 0; + const result = this._getIncrementalResult(released); - if (!incremental.length && hasNext) { - return next(); - } + if (!this.hasNext()) { + isDone = true; + } - if (!hasNext) { - isDone = true; - } + if (result !== undefined) { + return { value: result, done: false }; + } - return { - value: incremental.length ? { incremental, hasNext } : { hasNext }, - done: false, + // eslint-disable-next-line no-await-in-loop + await this._signalled; + } }; - } - function returnStreamIterators() { - const promises: Array>> = []; - subsequentPayloads.forEach((incrementalDataRecord) => { - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - promises.push(incrementalDataRecord.asyncIterator.return()); - } - }); - return Promise.all(promises); - } + const returnStreamIterators = async (): Promise => { + const promises: Array>> = []; + this._pending.forEach((incrementalDataRecord) => { + if ( + isStreamItemsRecord(incrementalDataRecord) && + incrementalDataRecord.asyncIterator?.return + ) { + promises.push(incrementalDataRecord.asyncIterator.return()); + } + }); + await Promise.all(promises); + }; - return { - [Symbol.asyncIterator]() { - return this; - }, - next, - async return(): Promise< + const _return = async (): Promise< IteratorResult - > { - await returnStreamIterators(); + > => { isDone = true; + await returnStreamIterators(); return { value: undefined, done: true }; - }, - async throw( + }; + + const _throw = async ( error?: unknown, - ): Promise> { - await returnStreamIterators(); + ): Promise> => { isDone = true; + await returnStreamIterators(); return Promise.reject(error); - }, - }; -} + }; -function getCompletedIncrementalResults( - subsequentPayloads: Set, -): Array { - const incrementalResults: Array = []; - for (const incrementalDataRecord of subsequentPayloads) { - const incrementalResult: IncrementalResult = {}; - if (!incrementalDataRecord.isCompleted) { - continue; - } - subsequentPayloads.delete(incrementalDataRecord); - if (isStreamItemsRecord(incrementalDataRecord)) { - const items = incrementalDataRecord.items; - if (incrementalDataRecord.isCompletedAsyncIterator) { - // async iterable resolver just finished but there may be pending payloads - continue; - } - (incrementalResult as IncrementalStreamResult).items = items; + return { + [Symbol.asyncIterator]() { + return this; + }, + next: _next, + return: _return, + throw: _throw, + }; + } + + prepareNewDeferredFragmentRecord(opts: { + label: string | undefined; + path: Path | undefined; + parentContext: IncrementalDataRecord | undefined; + }): DeferredFragmentRecord { + const deferredFragmentRecord = new DeferredFragmentRecord(opts); + + const parentContext = opts.parentContext; + if (parentContext) { + parentContext.children.add(deferredFragmentRecord); } else { - const data = incrementalDataRecord.data; - (incrementalResult as IncrementalDeferResult).data = data ?? null; + this._initialResult.children.add(deferredFragmentRecord); } - incrementalResult.path = incrementalDataRecord.path; - if (incrementalDataRecord.label != null) { - incrementalResult.label = incrementalDataRecord.label; - } - if (incrementalDataRecord.errors.length > 0) { - incrementalResult.errors = incrementalDataRecord.errors; + return deferredFragmentRecord; + } + + prepareNewStreamItemsRecord(opts: { + label: string | undefined; + path: Path | undefined; + asyncIterator?: AsyncIterator; + parentContext: IncrementalDataRecord | undefined; + }): StreamItemsRecord { + const streamItemsRecord = new StreamItemsRecord(opts); + + const parentContext = opts.parentContext; + if (parentContext) { + parentContext.children.add(streamItemsRecord); + } else { + this._initialResult.children.add(streamItemsRecord); } - incrementalResults.push(incrementalResult); + + return streamItemsRecord; + } + + completeDeferredFragmentRecord( + deferredFragmentRecord: DeferredFragmentRecord, + data: ObjMap | null, + ): void { + deferredFragmentRecord.data = data; + deferredFragmentRecord.isCompleted = true; + this._release(deferredFragmentRecord); + } + + completeStreamItemsRecord( + streamItemsRecord: StreamItemsRecord, + items: Array | null, + ) { + streamItemsRecord.items = items; + streamItemsRecord.isCompleted = true; + this._release(streamItemsRecord); + } + + setIsCompletedAsyncIterator(streamItemsRecord: StreamItemsRecord) { + streamItemsRecord.isCompletedAsyncIterator = true; + } + + addFieldError( + incrementalDataRecord: IncrementalDataRecord, + error: GraphQLError, + ) { + incrementalDataRecord.errors.push(error); } - return incrementalResults; -} -export function filterSubsequentPayloads( - subsequentPayloads: Set, - nullPath: Path, - currentIncrementalDataRecord: IncrementalDataRecord | undefined, -): void { - const nullPathArray = pathToArray(nullPath); - subsequentPayloads.forEach((incrementalDataRecord) => { - if (incrementalDataRecord === currentIncrementalDataRecord) { - // don't remove payload from where error originates - return; + publishInitial() { + for (const child of this._initialResult.children) { + this._publish(child); } - for (let i = 0; i < nullPathArray.length; i++) { - if (incrementalDataRecord.path[i] !== nullPathArray[i]) { - // incrementalDataRecord points to a path unaffected by this payload - return; + } + + filter( + nullPath: Path, + erroringIncrementalDataRecord: IncrementalDataRecord | undefined, + ) { + const nullPathArray = pathToArray(nullPath); + + const asyncIterators = new Set>(); + + const children = + erroringIncrementalDataRecord === undefined + ? this._initialResult.children + : erroringIncrementalDataRecord.children; + + for (const child of this._getDescendants(children)) { + if (!this._matchesPath(child.path, nullPathArray)) { + continue; + } + + this._delete(child); + const parent = + child.parentContext === undefined + ? this._initialResult + : child.parentContext; + parent.children.delete(child); + + if (isStreamItemsRecord(child)) { + if (child.asyncIterator !== undefined) { + asyncIterators.add(child.asyncIterator); + } } } - // incrementalDataRecord path points to nulled error field - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - incrementalDataRecord.asyncIterator.return().catch(() => { + + asyncIterators.forEach((asyncIterator) => { + asyncIterator.return?.().catch(() => { // ignore error }); + }); + } + + private _trigger() { + this._resolve(); + this._reset(); + } + + private _reset() { + // promiseWithResolvers uses void only as a generic type parameter + // see: https://typescript-eslint.io/rules/no-invalid-void-type/ + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + const { promise: signalled, resolve } = promiseWithResolvers(); + this._resolve = resolve; + this._signalled = signalled; + } + + private _introduce(item: IncrementalDataRecord) { + this._pending.add(item); + } + + private _release(item: IncrementalDataRecord): void { + if (this._pending.has(item)) { + this._released.add(item); + this._trigger(); + } + } + + private _push(item: IncrementalDataRecord): void { + this._released.add(item); + this._pending.add(item); + this._trigger(); + } + + private _delete(item: IncrementalDataRecord) { + this._released.delete(item); + this._pending.delete(item); + this._trigger(); + } + + private _getIncrementalResult( + completedRecords: ReadonlySet, + ): SubsequentIncrementalExecutionResult | undefined { + const incrementalResults: Array = []; + let encounteredCompletedAsyncIterator = false; + for (const incrementalDataRecord of completedRecords) { + const incrementalResult: IncrementalResult = {}; + for (const child of incrementalDataRecord.children) { + this._publish(child); + } + if (isStreamItemsRecord(incrementalDataRecord)) { + const items = incrementalDataRecord.items; + if (incrementalDataRecord.isCompletedAsyncIterator) { + // async iterable resolver just finished but there may be pending payloads + encounteredCompletedAsyncIterator = true; + continue; + } + (incrementalResult as IncrementalStreamResult).items = items; + } else { + const data = incrementalDataRecord.data; + (incrementalResult as IncrementalDeferResult).data = data ?? null; + } + + incrementalResult.path = incrementalDataRecord.path; + if (incrementalDataRecord.label != null) { + incrementalResult.label = incrementalDataRecord.label; + } + if (incrementalDataRecord.errors.length > 0) { + incrementalResult.errors = incrementalDataRecord.errors; + } + incrementalResults.push(incrementalResult); + } + + return incrementalResults.length + ? { incremental: incrementalResults, hasNext: this.hasNext() } + : encounteredCompletedAsyncIterator && !this.hasNext() + ? { hasNext: false } + : undefined; + } + + private _publish(incrementalDataRecord: IncrementalDataRecord) { + if (incrementalDataRecord.isCompleted) { + this._push(incrementalDataRecord); + } else { + this._introduce(incrementalDataRecord); + } + } + + private _getDescendants( + children: ReadonlySet, + descendants = new Set(), + ): ReadonlySet { + for (const child of children) { + descendants.add(child); + this._getDescendants(child.children, descendants); + } + return descendants; + } + + private _matchesPath( + testPath: Array, + basePath: Array, + ): boolean { + for (let i = 0; i < basePath.length; i++) { + if (basePath[i] !== testPath[i]) { + // testPath points to a path unaffected at basePath + return false; + } } - subsequentPayloads.delete(incrementalDataRecord); - }); + return true; + } } /** @internal */ export class DeferredFragmentRecord { - type: 'defer'; errors: Array; label: string | undefined; path: Array; - promise: Promise; data: ObjMap | null; parentContext: IncrementalDataRecord | undefined; + children: Set; isCompleted: boolean; - _subsequentPayloads: Set; - _resolve?: (arg: PromiseOrValue | null>) => void; constructor(opts: { label: string | undefined; path: Path | undefined; parentContext: IncrementalDataRecord | undefined; - subsequentPayloads: Set; }) { - this.type = 'defer'; this.label = opts.label; this.path = pathToArray(opts.path); this.parentContext = opts.parentContext; this.errors = []; - this._subsequentPayloads = opts.subsequentPayloads; - this._subsequentPayloads.add(this); + this.children = new Set(); this.isCompleted = false; this.data = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((data) => { - this.data = data; - this.isCompleted = true; - }); - } - - addData(data: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => data)); - return; - } - this._resolve?.(data); } } /** @internal */ export class StreamItemsRecord { - type: 'stream'; errors: Array; label: string | undefined; path: Array; items: Array | null; - promise: Promise; parentContext: IncrementalDataRecord | undefined; + children: Set; asyncIterator: AsyncIterator | undefined; isCompletedAsyncIterator?: boolean; isCompleted: boolean; - _subsequentPayloads: Set; - _resolve?: (arg: PromiseOrValue | null>) => void; constructor(opts: { label: string | undefined; path: Path | undefined; asyncIterator?: AsyncIterator; parentContext: IncrementalDataRecord | undefined; - subsequentPayloads: Set; }) { - this.type = 'stream'; this.items = null; this.label = opts.label; this.path = pathToArray(opts.path); this.parentContext = opts.parentContext; this.asyncIterator = opts.asyncIterator; this.errors = []; - this._subsequentPayloads = opts.subsequentPayloads; - this._subsequentPayloads.add(this); + this.children = new Set(); this.isCompleted = false; this.items = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((items) => { - this.items = items; - this.isCompleted = true; - }); - } - - addItems(items: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => items)); - return; - } - this._resolve?.(items); - } - - setIsCompletedAsyncIterator() { - this.isCompletedAsyncIterator = true; } } @@ -324,5 +487,5 @@ export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord; function isStreamItemsRecord( incrementalDataRecord: IncrementalDataRecord, ): incrementalDataRecord is StreamItemsRecord { - return incrementalDataRecord.type === 'stream'; + return incrementalDataRecord instanceof StreamItemsRecord; } diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 9f61adac1b..ce3b920895 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -1165,9 +1165,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); @@ -1191,25 +1188,19 @@ describe('Execute: stream directive', () => { } /* c8 ignore stop */, }, }); - expectJSON(result).toDeepEqual([ - { - errors: [ - { - message: - 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', - locations: [{ line: 4, column: 11 }], - path: ['nestedObject', 'nonNullScalarField'], - }, - ], - data: { - nestedObject: null, + expectJSON(result).toDeepEqual({ + errors: [ + { + message: + 'Cannot return null for non-nullable field NestedObject.nonNullScalarField.', + locations: [{ line: 4, column: 11 }], + path: ['nestedObject', 'nonNullScalarField'], }, - hasNext: true, - }, - { - hasNext: false, + ], + data: { + nestedObject: null, }, - ]); + }); }); it('Filters payloads that are nulled by a later synchronous error', async () => { const document = parse(` @@ -1293,6 +1284,9 @@ describe('Execute: stream directive', () => { path: ['nestedObject', 'nestedFriendList', 0], }, ], + hasNext: true, + }, + { hasNext: false, }, ]); @@ -1350,9 +1344,6 @@ describe('Execute: stream directive', () => { ], }, ], - hasNext: true, - }, - { hasNext: false, }, ]); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 8c9d8f9668..1ec11f72cc 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -56,14 +56,10 @@ import type { FormattedIncrementalResult, IncrementalDataRecord, IncrementalResult, - SubsequentIncrementalExecutionResult, -} from './IncrementalPublisher.js'; -import { - DeferredFragmentRecord, - filterSubsequentPayloads, StreamItemsRecord, - yieldSubsequentPayloads, + SubsequentIncrementalExecutionResult, } from './IncrementalPublisher.js'; +import { IncrementalPublisher } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import { getArgumentValues, @@ -133,7 +129,7 @@ export interface ExecutionContext { typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; errors: Array; - subsequentPayloads: Set; + incrementalPublisher: IncrementalPublisher; } /** @@ -293,47 +289,46 @@ function executeImpl( // Errors from sub-fields of a NonNull type may propagate to the top level, // at which point we still log the error and null the parent field, which // in this case is the entire response. + const { incrementalPublisher, errors } = exeContext; try { const result = executeOperation(exeContext); if (isPromise(result)) { return result.then( (data) => { - const initialResult = buildResponse(data, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const initialResult = buildResponse(data, errors); + incrementalPublisher.publishInitial(); + if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads( - exeContext.subsequentPayloads, - ), + subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; }, (error) => { - exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + errors.push(error); + return buildResponse(null, errors); }, ); } - const initialResult = buildResponse(result, exeContext.errors); - if (exeContext.subsequentPayloads.size > 0) { + const initialResult = buildResponse(result, errors); + incrementalPublisher.publishInitial(); + if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads( - exeContext.subsequentPayloads, - ), + subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; } catch (error) { - exeContext.errors.push(error); - return buildResponse(null, exeContext.errors); + errors.push(error); + return buildResponse(null, errors); } } @@ -449,7 +444,7 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - subsequentPayloads: new Set(), + incrementalPublisher: new IncrementalPublisher(), errors: [], }; } @@ -461,7 +456,7 @@ function buildPerEventExecutionContext( return { ...exeContext, rootValue: payload, - subsequentPayloads: new Set(), + // no need to update incrementalPublisher, incremental delivery is not supported for subscriptions errors: [], }; } @@ -714,11 +709,7 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; }); } @@ -732,11 +723,7 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -937,11 +924,7 @@ async function completePromisedValue( path, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(path, incrementalDataRecord); return null; } } @@ -1217,8 +1200,7 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, + exeContext.incrementalPublisher.filter( itemPath, incrementalDataRecord, ); @@ -1239,11 +1221,7 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); completedResults.push(null); } @@ -1773,12 +1751,14 @@ function executeDeferredFragment( path?: Path, parentContext?: IncrementalDataRecord, ): void { - const incrementalDataRecord = new DeferredFragmentRecord({ - label, - path, - parentContext, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalPublisher = exeContext.incrementalPublisher; + const incrementalDataRecord = + incrementalPublisher.prepareNewDeferredFragmentRecord({ + label, + path, + parentContext, + }); + let promiseOrData; try { promiseOrData = executeFields( @@ -1791,16 +1771,33 @@ function executeDeferredFragment( ); if (isPromise(promiseOrData)) { - promiseOrData = promiseOrData.then(null, (e) => { - incrementalDataRecord.errors.push(e); - return null; - }); + promiseOrData = promiseOrData.then( + (resolved) => + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + resolved, + ), + (e) => { + incrementalPublisher.addFieldError(incrementalDataRecord, e); + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + null, + ); + }, + ); + } else { + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + promiseOrData, + ); } } catch (e) { - incrementalDataRecord.errors.push(e); - promiseOrData = null; + incrementalPublisher.addFieldError(incrementalDataRecord, e); + incrementalPublisher.completeDeferredFragmentRecord( + incrementalDataRecord, + null, + ); } - incrementalDataRecord.addData(promiseOrData); } function executeStreamField( @@ -1814,14 +1811,16 @@ function executeStreamField( label?: string, parentContext?: IncrementalDataRecord, ): IncrementalDataRecord { - const incrementalDataRecord = new StreamItemsRecord({ - label, - path: itemPath, - parentContext, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalPublisher = exeContext.incrementalPublisher; + const incrementalDataRecord = + incrementalPublisher.prepareNewStreamItemsRecord({ + label, + path: itemPath, + parentContext, + }); + if (isPromise(item)) { - const completedItems = completePromisedValue( + completePromisedValue( exeContext, itemType, fieldGroup, @@ -1830,19 +1829,21 @@ function executeStreamField( item, incrementalDataRecord, ).then( - (value) => [value], + (value) => + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + value, + ]), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); return null; }, ); - incrementalDataRecord.addItems(completedItems); return incrementalDataRecord; } @@ -1868,25 +1869,17 @@ function executeStreamField( incrementalDataRecord, ); completedItem = null; - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); } } catch (error) { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, - incrementalDataRecord, - ); - incrementalDataRecord.addItems(null); + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, null); return incrementalDataRecord; } if (isPromise(completedItem)) { - const completedItems = completedItem + completedItem .then(undefined, (rawError) => { handleFieldError( rawError, @@ -1896,31 +1889,31 @@ function executeStreamField( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return null; }) .then( - (value) => [value], + (value) => + incrementalPublisher.completeStreamItemsRecord( + incrementalDataRecord, + [value], + ), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - return null; }, ); - incrementalDataRecord.addItems(completedItems); return incrementalDataRecord; } - incrementalDataRecord.addItems([completedItem]); + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + completedItem, + ]); return incrementalDataRecord; } @@ -1937,9 +1930,12 @@ async function executeStreamAsyncIteratorItem( let item; try { const { value, done } = await asyncIterator.next(); + if (done) { - incrementalDataRecord.setIsCompletedAsyncIterator(); - return { done, value: undefined }; + exeContext.incrementalPublisher.setIsCompletedAsyncIterator( + incrementalDataRecord, + ); + return { done: true, value: undefined }; } item = value; } catch (rawError) { @@ -1967,11 +1963,7 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return null; }); } @@ -1985,11 +1977,7 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - itemPath, - incrementalDataRecord, - ); + exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); return { done: false, value: null }; } } @@ -2005,18 +1993,19 @@ async function executeStreamAsyncIterator( label?: string, parentContext?: IncrementalDataRecord, ): Promise { + const incrementalPublisher = exeContext.incrementalPublisher; let index = initialIndex; let previousIncrementalDataRecord = parentContext ?? undefined; // eslint-disable-next-line no-constant-condition while (true) { const itemPath = addPath(path, index, undefined); - const incrementalDataRecord = new StreamItemsRecord({ - label, - path: itemPath, - parentContext: previousIncrementalDataRecord, - asyncIterator, - subsequentPayloads: exeContext.subsequentPayloads, - }); + const incrementalDataRecord = + incrementalPublisher.prepareNewStreamItemsRecord({ + label, + path: itemPath, + parentContext: previousIncrementalDataRecord, + asyncIterator, + }); let iteration; try { @@ -2032,13 +2021,12 @@ async function executeStreamAsyncIterator( itemPath, ); } catch (error) { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - incrementalDataRecord.addItems(null); // entire stream has errored and bubbled upwards if (asyncIterator?.return) { asyncIterator.return().catch(() => { @@ -2050,26 +2038,28 @@ async function executeStreamAsyncIterator( const { done, value: completedItem } = iteration; - let completedItems: PromiseOrValue | null>; if (isPromise(completedItem)) { - completedItems = completedItem.then( - (value) => [value], + completedItem.then( + (value) => + incrementalPublisher.completeStreamItemsRecord( + incrementalDataRecord, + [value], + ), (error) => { - incrementalDataRecord.errors.push(error); - filterSubsequentPayloads( - exeContext.subsequentPayloads, - path, + incrementalPublisher.addFieldError(incrementalDataRecord, error); + incrementalPublisher.filter(path, incrementalDataRecord); + incrementalPublisher.completeStreamItemsRecord( incrementalDataRecord, + null, ); - return null; }, ); } else { - completedItems = [completedItem]; + incrementalPublisher.completeStreamItemsRecord(incrementalDataRecord, [ + completedItem, + ]); } - incrementalDataRecord.addItems(completedItems); - if (done) { break; }