diff --git a/query-planner-js/src/buildPlan.ts b/query-planner-js/src/buildPlan.ts index 5a0a6452c..1b12aedd8 100644 --- a/query-planner-js/src/buildPlan.ts +++ b/query-planner-js/src/buildPlan.ts @@ -793,15 +793,6 @@ class QueryPlanningTraversal { } } -type UnhandledGroups = [FetchGroup, UnhandledParentRelations][]; -type UnhandledParentRelations = ParentRelation[]; - -function printUnhandled(u: UnhandledGroups): string { - return '[' + u.map(([g, relations]) => - `${g.index} (missing: [${relations.map((r) => r.group.index).join(', ')}])` - ).join(', ') + ']'; -} - /** * Used in `FetchDependencyGraph` to store, for a given group, information about one of its parent. * Namely, this structure stores: @@ -1906,6 +1897,112 @@ class DeferTracking { } } +/** + * UnhandledGroups is used while processing fetch groups in dependency order to track group for which + * one of the parent has been processed/handled but which has other parents. So it is a set of + * groups and for each group which parent(s) remains to be processed before the group itself can be + * processed. + */ +type UnhandledGroups = [FetchGroup, UnhandledParentRelations][]; +type UnhandledParentRelations = ParentRelation[]; + +function printUnhandled(u: UnhandledGroups): string { + return '[' + u.map(([g, relations]) => + `${g.index} (missing: [${relations.map((r) => r.group.index).join(', ')}])` + ).join(', ') + ']'; +} + +/* + * Used during the processing of fetch groups in dependency order. + */ +class ProcessingState { + private constructor( + // Groups that can be handled (because all their parents/dependencies have been processed before). + readonly next: readonly FetchGroup[], + // Groups that needs some parents/dependencies to be processed first because they can be themselves. + // Note that we make sure that this never hold group with no "edges". + readonly unhandled: UnhandledGroups, + ) { + } + + static empty(): ProcessingState { + return new ProcessingState([], []); + } + + static forChildrenOfProcessedGroup(processed: FetchGroup, children: FetchGroup[]): ProcessingState { + const ready: FetchGroup[] = []; + const unhandled: UnhandledGroups = []; + for (const c of children) { + const parents = c.parents(); + if (parents.length === 1) { + // The parent we have processed is the only one parent of that children; we can handle the children + ready.push(c); + } else { + unhandled.push([c, parents.filter((p) => p.group !== processed)]); + } + } + return new ProcessingState(ready, unhandled); + } + + static ofReadyGroups(groups: readonly FetchGroup[]): ProcessingState { + return new ProcessingState(groups, []); + } + + withOnlyUnhandled(): ProcessingState { + return new ProcessingState([], this.unhandled); + } + + mergeWith(that: ProcessingState): ProcessingState { + const next: FetchGroup[] = this.next.concat(that.next.filter((g) => !this.next.includes(g))); + const unhandled: UnhandledGroups = []; + + const thatUnhandled = that.unhandled.concat(); + for (const [g, edges] of this.unhandled) { + const newEdges = this.mergeRemaingsAndRemoveIfFound(g, edges, thatUnhandled); + if (newEdges.length == 0) { + if (!next.includes(g)) { + next.push(g); + } + } else { + unhandled.push([g, newEdges]) + } + } + // Anything remaining in `thatUnhandled` are groups that were not in `this` at all. + unhandled.push(...thatUnhandled); + return new ProcessingState(next, unhandled); + } + + private mergeRemaingsAndRemoveIfFound(group: FetchGroup, inEdges: UnhandledParentRelations, otherGroups: UnhandledGroups): UnhandledParentRelations { + const idx = otherGroups.findIndex(g => g[0] === group); + if (idx < 0) { + return inEdges; + } else { + const otherEdges = otherGroups[idx][1]; + otherGroups.splice(idx, 1); + // The uhandled are the one that are unhandled on both side. + return inEdges.filter(e => otherEdges.includes(e)) + } + } + + updateForProcessedGroups(processed: readonly FetchGroup[]): ProcessingState { + const next: FetchGroup[] = this.next.concat(); + const unhandled: UnhandledGroups = []; + for (const [g, edges] of this.unhandled) { + // Remove any of the processed groups from the unhandled edges of that group. + // And if there is no remaining edge, that group can be handled. + const newEdges = edges.filter((edge) => !processed.includes(edge.group)); + if (newEdges.length === 0) { + if (!next.includes(g)) { + next.push(g); + } + } else { + unhandled.push([g, newEdges]); + } + } + return new ProcessingState(next, unhandled); + } +} + /** * A Directed Acyclic Graph (DAG) of `FetchGroup` and their dependencies. * @@ -2459,7 +2556,7 @@ class FetchDependencyGraph { handledConditions: Conditions, ): { main: TProcessed, - unhandled: UnhandledGroups, + state: ProcessingState, deferredGroups: SetMultiMap, } { const conditions = updatedConditions(group.conditions(), handledConditions); @@ -2467,35 +2564,33 @@ class FetchDependencyGraph { const { children, deferredGroups } = this.extractChildrenAndDeferredDependencies(group); const processed = processor.onFetchGroup(group, newHandledConditions); if (children.length == 0) { - return { main: processor.onConditions(conditions, processed), unhandled: [], deferredGroups }; + return { main: processor.onConditions(conditions, processed), state: ProcessingState.empty(), deferredGroups }; } - const groupIsOnlyParentOfAllChildren = children.every(g => g.parents().length === 1); - if (groupIsOnlyParentOfAllChildren) { - // We process the children as if they were parallel roots (they are from `processed` + const state = ProcessingState.forChildrenOfProcessedGroup(group, children); + if (state.next.length > 0) { + // We process the ready children as if they were parallel roots (they are from `processed` // in a way), and then just add process at the beginning of the sequence. const { mainSequence, - unhandled, + newState, deferredGroups: allDeferredGroups, } = this.processRootMainGroups({ processor, - rootGroups: children, + state, rootsAreParallel: true, initialDeferredGroups: deferredGroups, handledConditions: newHandledConditions, }); return { main: processor.onConditions(conditions, processor.reduceSequence([processed].concat(mainSequence))), - unhandled, + state: newState, deferredGroups: allDeferredGroups, }; } else { - // We return just the group, with all other groups to be handled after, but remembering that - // this group edge has been handled. return { main: processor.onConditions(conditions, processed), - unhandled: children.map(g => [g, g.parents().filter((p) => p.group !== group)]), + state, deferredGroups, }; } @@ -2503,126 +2598,77 @@ class FetchDependencyGraph { private processGroups( processor: FetchGroupProcessor, - groups: readonly FetchGroup[], + state: ProcessingState, processInParallel: boolean, - remaining: UnhandledGroups, handledConditions: Conditions, ): { processed: TProcessed, - next: FetchGroup[], - unhandled: UnhandledGroups, + newState: ProcessingState, deferredGroups: SetMultiMap, } { const processedNodes: TProcessed[] = []; const allDeferredGroups = new SetMultiMap(); - let remainingNext = remaining; - let toHandleNext: FetchGroup[] = []; - for (const group of groups) { - const { main, deferredGroups, unhandled } = this.processGroup(processor, group, handledConditions); + let newState = state.withOnlyUnhandled(); + for (const group of state.next) { + const { main, deferredGroups, state: stateAfterGroup } = this.processGroup(processor, group, handledConditions); processedNodes.push(main); allDeferredGroups.addAll(deferredGroups); - remainingNext = this.mergeRemainings(remainingNext, unhandled, toHandleNext); + newState = newState.mergeWith(stateAfterGroup); } + // Note that `newState` is the merged result of everything after each individual group (anything that was _only_ depending + // on it), but the fact that groups themselves (`state.next`) have been handled has not necessarily be taking into + // account yet, so we do it below. Also note that this must be done outside of the `for` loop above, because any + // group that dependend on multiple of the input groups of this function must not be handled _within_ this function + // but rather after it, and this is what ensures it. return { processed: processInParallel ? processor.reduceParallel(processedNodes) : processor.reduceSequence(processedNodes), - next: toHandleNext, - unhandled: remainingNext, + newState: newState.updateForProcessedGroups(state.next), deferredGroups: allDeferredGroups, }; } - private mergeRemainings(r1: UnhandledGroups, r2: UnhandledGroups, readyToHandle: FetchGroup[]): UnhandledGroups { - const unhandled: UnhandledGroups = []; - for (const [g, edges] of r1) { - const newEdges = this.mergeRemaingsAndRemoveIfFound(g, edges, r2); - if (newEdges.length == 0) { - readyToHandle.push(g); - } else { - unhandled.push([g, newEdges]) - } - } - unhandled.push(...r2); - return unhandled; - } - - private mergeRemaingsAndRemoveIfFound(group: FetchGroup, inEdges: UnhandledParentRelations, otherGroups: UnhandledGroups): UnhandledParentRelations { - const idx = otherGroups.findIndex(g => g[0] === group); - if (idx < 0) { - return inEdges; - } else { - const otherEdges = otherGroups[idx][1]; - otherGroups.splice(idx, 1); - // The uhandled are the one that are unhandled on both side. - return inEdges.filter(e => otherEdges.includes(e)) - } - } - /** * Process the "main" (non-deferred) groups starting at the provided roots. The deferred groups are collected * by this method but not otherwise processed. */ private processRootMainGroups({ processor, - rootGroups, + state, rootsAreParallel, initialDeferredGroups, handledConditions, }: { processor: FetchGroupProcessor, - rootGroups: readonly FetchGroup[] + state: ProcessingState, rootsAreParallel: boolean, initialDeferredGroups?: SetMultiMap, handledConditions: Conditions, }): { mainSequence: TProcessed[], - unhandled: UnhandledGroups, + newState: ProcessingState, deferredGroups: SetMultiMap, } { - let nextGroups = rootGroups; - let remainingNext: UnhandledGroups = []; const mainSequence: TProcessed[] = []; const allDeferredGroups = initialDeferredGroups ? new SetMultiMap(initialDeferredGroups) : new SetMultiMap(); let processInParallel = rootsAreParallel; - while (nextGroups.length > 0) { - const { processed, next, unhandled, deferredGroups } = this.processGroups(processor, nextGroups, processInParallel, remainingNext, handledConditions); + while (state.next.length > 0) { + const { processed, newState, deferredGroups } = this.processGroups(processor, state, processInParallel, handledConditions); // After the root groups, handled on the first iteration, we can process everything in parallel. processInParallel = true; mainSequence.push(processed); - remainingNext = this.mergeRemainings(remainingNext, unhandled, next); - remainingNext = this.updateUnhandledOnProcessedGroups(nextGroups, remainingNext, next); - nextGroups = next; + state = newState; allDeferredGroups.addAll(deferredGroups); } return { mainSequence, - unhandled: remainingNext, + newState: state, deferredGroups: allDeferredGroups, }; } - private updateUnhandledOnProcessedGroups( - processed: readonly FetchGroup[], - currentUnhandled: UnhandledGroups, - readyToHandle: FetchGroup[], - ): UnhandledGroups { - const unhandled: UnhandledGroups = []; - for (const current of currentUnhandled) { - // Remove any of the processed groups from the unhandled edges of that group. - // And if there is no remaining edge, that group can be handled. - const newEdges = current[1].filter((edge) => !processed.includes(edge.group)); - if (newEdges.length === 0) { - readyToHandle.push(current[0]); - } else { - unhandled.push([current[0], newEdges]); - } - } - return unhandled; - } - - private processRootGroups({ processor, rootGroups, @@ -2644,12 +2690,13 @@ class FetchDependencyGraph { } { const { mainSequence, - unhandled, + newState, deferredGroups, - } = this.processRootMainGroups({ processor, rootsAreParallel, rootGroups, handledConditions }); + } = this.processRootMainGroups({ processor, rootsAreParallel, state: ProcessingState.ofReadyGroups(rootGroups), handledConditions }); + assert(newState.next.length === 0, () => `Should not have left some ready groups, but got ${newState.next}`); assert( - unhandled.length == 0, - () => `Root groups:\n${rootGroups.map((g) => ` - ${g}`).join('\n')}\nshould have no remaining groups unhandled, but got: ${printUnhandled(unhandled)}` + newState.unhandled.length == 0, + () => `Root groups:\n${rootGroups.map((g) => ` - ${g}`).join('\n')}\nshould have no remaining groups unhandled, but got: ${printUnhandled(newState.unhandled)}` ); const allDeferredGroups = new SetMultiMap(); if (otherDeferGroups) {