Skip to content

Commit

Permalink
Address review feedback through slight refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmanus committed Jun 14, 2023
1 parent ac2f5fa commit 847e95a
Showing 1 changed file with 139 additions and 92 deletions.
231 changes: 139 additions & 92 deletions query-planner-js/src/buildPlan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -793,15 +793,6 @@ class QueryPlanningTraversal<RV extends Vertex> {
}
}

type UnhandledGroups = [FetchGroup, UnhandledParentRelations][];
type UnhandledParentRelations = ParentRelation[];

function printUnhandled(u: UnhandledGroups): string {
return '[' + u.map(([g, relations]) =>
`${g.index} (missing: [${relations.map((r) => r.group.index).join(', ')}])`
).join(', ') + ']';
}

/**
* Used in `FetchDependencyGraph` to store, for a given group, information about one of its parent.
* Namely, this structure stores:
Expand Down Expand Up @@ -1906,6 +1897,112 @@ class DeferTracking {
}
}

/**
* UnhandledGroups is used while processing fetch groups in dependency order to track group for which
* one of the parent has been processed/handled but which has other parents. So it is a set of
* groups and for each group which parent(s) remains to be processed before the group itself can be
* processed.
*/
type UnhandledGroups = [FetchGroup, UnhandledParentRelations][];
type UnhandledParentRelations = ParentRelation[];

function printUnhandled(u: UnhandledGroups): string {
return '[' + u.map(([g, relations]) =>
`${g.index} (missing: [${relations.map((r) => r.group.index).join(', ')}])`
).join(', ') + ']';
}

/*
* Used during the processing of fetch groups in dependency order.
*/
class ProcessingState {
private constructor(
// Groups that can be handled (because all their parents/dependencies have been processed before).
readonly next: readonly FetchGroup[],
// Groups that needs some parents/dependencies to be processed first because they can be themselves.
// Note that we make sure that this never hold group with no "edges".
readonly unhandled: UnhandledGroups,
) {
}

static empty(): ProcessingState {
return new ProcessingState([], []);
}

static forChildrenOfProcessedGroup(processed: FetchGroup, children: FetchGroup[]): ProcessingState {
const ready: FetchGroup[] = [];
const unhandled: UnhandledGroups = [];
for (const c of children) {
const parents = c.parents();
if (parents.length === 1) {
// The parent we have processed is the only one parent of that children; we can handle the children
ready.push(c);
} else {
unhandled.push([c, parents.filter((p) => p.group !== processed)]);
}
}
return new ProcessingState(ready, unhandled);
}

static ofReadyGroups(groups: readonly FetchGroup[]): ProcessingState {
return new ProcessingState(groups, []);
}

withOnlyUnhandled(): ProcessingState {
return new ProcessingState([], this.unhandled);
}

mergeWith(that: ProcessingState): ProcessingState {
const next: FetchGroup[] = this.next.concat(that.next.filter((g) => !this.next.includes(g)));
const unhandled: UnhandledGroups = [];

const thatUnhandled = that.unhandled.concat();
for (const [g, edges] of this.unhandled) {
const newEdges = this.mergeRemaingsAndRemoveIfFound(g, edges, thatUnhandled);
if (newEdges.length == 0) {
if (!next.includes(g)) {
next.push(g);
}
} else {
unhandled.push([g, newEdges])
}
}
// Anything remaining in `thatUnhandled` are groups that were not in `this` at all.
unhandled.push(...thatUnhandled);
return new ProcessingState(next, unhandled);
}

private mergeRemaingsAndRemoveIfFound(group: FetchGroup, inEdges: UnhandledParentRelations, otherGroups: UnhandledGroups): UnhandledParentRelations {
const idx = otherGroups.findIndex(g => g[0] === group);
if (idx < 0) {
return inEdges;
} else {
const otherEdges = otherGroups[idx][1];
otherGroups.splice(idx, 1);
// The uhandled are the one that are unhandled on both side.
return inEdges.filter(e => otherEdges.includes(e))
}
}

updateForProcessedGroups(processed: readonly FetchGroup[]): ProcessingState {
const next: FetchGroup[] = this.next.concat();
const unhandled: UnhandledGroups = [];
for (const [g, edges] of this.unhandled) {
// Remove any of the processed groups from the unhandled edges of that group.
// And if there is no remaining edge, that group can be handled.
const newEdges = edges.filter((edge) => !processed.includes(edge.group));
if (newEdges.length === 0) {
if (!next.includes(g)) {
next.push(g);
}
} else {
unhandled.push([g, newEdges]);
}
}
return new ProcessingState(next, unhandled);
}
}

/**
* A Directed Acyclic Graph (DAG) of `FetchGroup` and their dependencies.
*
Expand Down Expand Up @@ -2459,170 +2556,119 @@ class FetchDependencyGraph {
handledConditions: Conditions,
): {
main: TProcessed,
unhandled: UnhandledGroups,
state: ProcessingState,
deferredGroups: SetMultiMap<string, FetchGroup>,
} {
const conditions = updatedConditions(group.conditions(), handledConditions);
const newHandledConditions = mergeConditions(conditions, handledConditions);
const { children, deferredGroups } = this.extractChildrenAndDeferredDependencies(group);
const processed = processor.onFetchGroup(group, newHandledConditions);
if (children.length == 0) {
return { main: processor.onConditions(conditions, processed), unhandled: [], deferredGroups };
return { main: processor.onConditions(conditions, processed), state: ProcessingState.empty(), deferredGroups };
}

const groupIsOnlyParentOfAllChildren = children.every(g => g.parents().length === 1);
if (groupIsOnlyParentOfAllChildren) {
// We process the children as if they were parallel roots (they are from `processed`
const state = ProcessingState.forChildrenOfProcessedGroup(group, children);
if (state.next.length > 0) {
// We process the ready children as if they were parallel roots (they are from `processed`
// in a way), and then just add process at the beginning of the sequence.
const {
mainSequence,
unhandled,
newState,
deferredGroups: allDeferredGroups,
} = this.processRootMainGroups({
processor,
rootGroups: children,
state,
rootsAreParallel: true,
initialDeferredGroups: deferredGroups,
handledConditions: newHandledConditions,
});
return {
main: processor.onConditions(conditions, processor.reduceSequence([processed].concat(mainSequence))),
unhandled,
state: newState,
deferredGroups: allDeferredGroups,
};
} else {
// We return just the group, with all other groups to be handled after, but remembering that
// this group edge has been handled.
return {
main: processor.onConditions(conditions, processed),
unhandled: children.map(g => [g, g.parents().filter((p) => p.group !== group)]),
state,
deferredGroups,
};
}
}

private processGroups<TProcessed, TDeferred>(
processor: FetchGroupProcessor<TProcessed, TDeferred>,
groups: readonly FetchGroup[],
state: ProcessingState,
processInParallel: boolean,
remaining: UnhandledGroups,
handledConditions: Conditions,
): {
processed: TProcessed,
next: FetchGroup[],
unhandled: UnhandledGroups,
newState: ProcessingState,
deferredGroups: SetMultiMap<string, FetchGroup>,
} {
const processedNodes: TProcessed[] = [];
const allDeferredGroups = new SetMultiMap<string, FetchGroup>();
let remainingNext = remaining;
let toHandleNext: FetchGroup[] = [];
for (const group of groups) {
const { main, deferredGroups, unhandled } = this.processGroup(processor, group, handledConditions);
let newState = state.withOnlyUnhandled();
for (const group of state.next) {
const { main, deferredGroups, state: stateAfterGroup } = this.processGroup(processor, group, handledConditions);
processedNodes.push(main);
allDeferredGroups.addAll(deferredGroups);
remainingNext = this.mergeRemainings(remainingNext, unhandled, toHandleNext);
newState = newState.mergeWith(stateAfterGroup);
}

// Note that `newState` is the merged result of everything after each individual group (anything that was _only_ depending
// on it), but the fact that groups themselves (`state.next`) have been handled has not necessarily be taking into
// account yet, so we do it below. Also note that this must be done outside of the `for` loop above, because any
// group that dependend on multiple of the input groups of this function must not be handled _within_ this function
// but rather after it, and this is what ensures it.
return {
processed: processInParallel ? processor.reduceParallel(processedNodes) : processor.reduceSequence(processedNodes),
next: toHandleNext,
unhandled: remainingNext,
newState: newState.updateForProcessedGroups(state.next),
deferredGroups: allDeferredGroups,
};
}

private mergeRemainings(r1: UnhandledGroups, r2: UnhandledGroups, readyToHandle: FetchGroup[]): UnhandledGroups {
const unhandled: UnhandledGroups = [];
for (const [g, edges] of r1) {
const newEdges = this.mergeRemaingsAndRemoveIfFound(g, edges, r2);
if (newEdges.length == 0) {
readyToHandle.push(g);
} else {
unhandled.push([g, newEdges])
}
}
unhandled.push(...r2);
return unhandled;
}

private mergeRemaingsAndRemoveIfFound(group: FetchGroup, inEdges: UnhandledParentRelations, otherGroups: UnhandledGroups): UnhandledParentRelations {
const idx = otherGroups.findIndex(g => g[0] === group);
if (idx < 0) {
return inEdges;
} else {
const otherEdges = otherGroups[idx][1];
otherGroups.splice(idx, 1);
// The uhandled are the one that are unhandled on both side.
return inEdges.filter(e => otherEdges.includes(e))
}
}

/**
* Process the "main" (non-deferred) groups starting at the provided roots. The deferred groups are collected
* by this method but not otherwise processed.
*/
private processRootMainGroups<TProcessed, TDeferred>({
processor,
rootGroups,
state,
rootsAreParallel,
initialDeferredGroups,
handledConditions,
}: {
processor: FetchGroupProcessor<TProcessed, TDeferred>,
rootGroups: readonly FetchGroup[]
state: ProcessingState,
rootsAreParallel: boolean,
initialDeferredGroups?: SetMultiMap<string, FetchGroup>,
handledConditions: Conditions,
}): {
mainSequence: TProcessed[],
unhandled: UnhandledGroups,
newState: ProcessingState,
deferredGroups: SetMultiMap<string, FetchGroup>,
} {
let nextGroups = rootGroups;
let remainingNext: UnhandledGroups = [];
const mainSequence: TProcessed[] = [];
const allDeferredGroups = initialDeferredGroups
? new SetMultiMap<string, FetchGroup>(initialDeferredGroups)
: new SetMultiMap<string, FetchGroup>();
let processInParallel = rootsAreParallel;
while (nextGroups.length > 0) {
const { processed, next, unhandled, deferredGroups } = this.processGroups(processor, nextGroups, processInParallel, remainingNext, handledConditions);
while (state.next.length > 0) {
const { processed, newState, deferredGroups } = this.processGroups(processor, state, processInParallel, handledConditions);
// After the root groups, handled on the first iteration, we can process everything in parallel.
processInParallel = true;
mainSequence.push(processed);
remainingNext = this.mergeRemainings(remainingNext, unhandled, next);
remainingNext = this.updateUnhandledOnProcessedGroups(nextGroups, remainingNext, next);
nextGroups = next;
state = newState;
allDeferredGroups.addAll(deferredGroups);
}
return {
mainSequence,
unhandled: remainingNext,
newState: state,
deferredGroups: allDeferredGroups,
};
}

private updateUnhandledOnProcessedGroups(
processed: readonly FetchGroup[],
currentUnhandled: UnhandledGroups,
readyToHandle: FetchGroup[],
): UnhandledGroups {
const unhandled: UnhandledGroups = [];
for (const current of currentUnhandled) {
// Remove any of the processed groups from the unhandled edges of that group.
// And if there is no remaining edge, that group can be handled.
const newEdges = current[1].filter((edge) => !processed.includes(edge.group));
if (newEdges.length === 0) {
readyToHandle.push(current[0]);
} else {
unhandled.push([current[0], newEdges]);
}
}
return unhandled;
}


private processRootGroups<TProcessed, TDeferred>({
processor,
rootGroups,
Expand All @@ -2644,12 +2690,13 @@ class FetchDependencyGraph {
} {
const {
mainSequence,
unhandled,
newState,
deferredGroups,
} = this.processRootMainGroups({ processor, rootsAreParallel, rootGroups, handledConditions });
} = this.processRootMainGroups({ processor, rootsAreParallel, state: ProcessingState.ofReadyGroups(rootGroups), handledConditions });
assert(newState.next.length === 0, () => `Should not have left some ready groups, but got ${newState.next}`);
assert(
unhandled.length == 0,
() => `Root groups:\n${rootGroups.map((g) => ` - ${g}`).join('\n')}\nshould have no remaining groups unhandled, but got: ${printUnhandled(unhandled)}`
newState.unhandled.length == 0,
() => `Root groups:\n${rootGroups.map((g) => ` - ${g}`).join('\n')}\nshould have no remaining groups unhandled, but got: ${printUnhandled(newState.unhandled)}`
);
const allDeferredGroups = new SetMultiMap<string, FetchGroup>();
if (otherDeferGroups) {
Expand Down

0 comments on commit 847e95a

Please sign in to comment.