Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Python specific checks for parallel evaluation of formulas #2813

Merged
merged 2 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,6 @@ default <TYPE> ColumnSource<TYPE> cast(Class<? extends TYPE> clazz) {
return (ColumnSource<TYPE>) this;
}

/**
* Can this column source be evaluated on an arbitrary thread?
*
* Most column sources can be evaluated on an arbitrary thread, however those that do call into Python can not be
* evaluated on an arbitrary thread as the calling thread may already have the GIL, which would result in a deadlock
* when the column source takes the GIL to evaluate formulas.
*
* @return true if this column prevents parallelization
*/
default boolean preventsParallelism() {
return false;
}

/**
* Most column sources will return the same value for a given row without respect to the order that the rows are
* read. Those columns sources are considered "stateless" and should return true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,6 @@ public void fillChunk(@NotNull final FillContext context,
destination.setSize(longChunk.size());
}

@Override
public boolean preventsParallelism() {
return symbolSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return symbolSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,6 @@ public final void fillPrevChunk(@NotNull final FillContext context,
throw new UnsupportedOperationException();
}

@Override
public boolean preventsParallelism() {
return originalSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return originalSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ public <ALTERNATE_DATA_TYPE> ColumnSource<ALTERNATE_DATA_TYPE> doReinterpret(
return (ColumnSource<ALTERNATE_DATA_TYPE>) alternateColumnSource;
}

@Override
public boolean preventsParallelism() {
return alternateColumnSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return alternateColumnSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public List<String> getColumns() {
@NotNull
@Override
public ColumnSource<?> getDataView() {
return new ViewColumnSource<>(Table.class, new OutputFormula(), false, true);
return new ViewColumnSource<>(Table.class, new OutputFormula(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public List<String> getColumns() {
@NotNull
@Override
public ColumnSource<?> getDataView() {
return new ViewColumnSource<>(long.class, new OutputFormula(), false, true);
return new ViewColumnSource<>(long.class, new OutputFormula(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public List<String> getColumns() {
@NotNull
@Override
public ColumnSource<?> getDataView() {
return new ViewColumnSource<>(Table.class, new OutputFormula(), false, true);
return new ViewColumnSource<>(Table.class, new OutputFormula(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ public ColumnSource<?> getLazyView() {

@NotNull
private ColumnSource<?> getViewColumnSource(boolean lazy) {
final boolean preventsParallelization = preventsParallelization();
final boolean isStateless = isStateless();

final SecurityManager sm = System.getSecurityManager();
Expand All @@ -219,18 +218,16 @@ private ColumnSource<?> getViewColumnSource(boolean lazy) {
final Formula formula = getFormula(lazy, columnSources, params);
// noinspection unchecked,rawtypes
return new ViewColumnSource((returnedType == boolean.class ? Boolean.class : returnedType), formula,
preventsParallelization, isStateless);
isStateless);
}, context);
} else {
final Formula formula = getFormula(lazy, columnSources, params);
// noinspection unchecked,rawtypes
return new ViewColumnSource((returnedType == boolean.class ? Boolean.class : returnedType), formula,
preventsParallelization, isStateless);
isStateless);
}
}

public abstract boolean preventsParallelization();

private Formula getFormula(boolean initLazyMap,
Map<String, ? extends ColumnSource<?>> columnsToData,
QueryScopeParam<?>... params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,52 +831,15 @@ private static boolean isImmutableType(QueryScopeParam<?> param) {
return TypeUtils.isBoxedType(type);
}

/**
* Is this parameter possibly a Python type?
*
* Immutable types are not Python, known Python wrappers are Python, and anything else from a PythonScope is Python.
*
* @return true if this query scope parameter may be a Python type
*/
private static boolean isPythonType(QueryScopeParam<?> param) {
if (isImmutableType(param)) {
return false;
}

// we want to catch PyObjects, and CallableWrappers even if they were hand inserted into a scope
final Object value = param.getValue();
if (value instanceof PyObject || value instanceof PythonScopeJpyImpl.CallableWrapper
|| value instanceof PyListWrapper || value instanceof PyDictWrapper) {
return true;
}

// beyond the immutable types, we must assume that anything coming from Python is python
return ExecutionContext.getContext().getQueryScope() instanceof PythonScope;
}

private boolean isUsedColumnStateless(String columnName) {
return columnSources.get(columnName).isStateless();
}

private boolean usedColumnUsesPython(String columnName) {
return columnSources.get(columnName).preventsParallelism();
}

@Override
public boolean isStateless() {
return Arrays.stream(params).allMatch(DhFormulaColumn::isImmutableType)
&& usedColumns.stream().allMatch(this::isUsedColumnStateless)
&& usedColumnArrays.stream().allMatch(this::isUsedColumnStateless);
}

/**
* Does this formula column use Python (which would cause us to hang the GIL if we evaluate it off thread?)
*
* @return true if this column has the potential to hang the gil
*/
public boolean preventsParallelization() {
return Arrays.stream(params).anyMatch(DhFormulaColumn::isPythonType)
|| usedColumns.stream().anyMatch(this::usedColumnUsesPython)
|| usedColumnArrays.stream().anyMatch(this::usedColumnUsesPython);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void fillPrevChunk(@NotNull FillContext fillContext,
final FunctionalColumnFillContext ctx = (FunctionalColumnFillContext) fillContext;
ctx.chunkFiller.fillByIndices(this, rowSequence, destination);
}
}, sourceColumnSource.preventsParallelism(), false);
}, false);
}

private static class FunctionalColumnFillContext implements Formula.FillContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ public List<String> initInputs(TrackingRowSet rowSet, Map<String, ? extends Colu
sourceColumns = localSources.toArray(ColumnSource.ZERO_LENGTH_COLUMN_SOURCE_ARRAY);
prevSources = localPrev.toArray(ColumnSource.ZERO_LENGTH_COLUMN_SOURCE_ARRAY);

usesPython = Arrays.stream(sourceColumns).anyMatch(ColumnSource::preventsParallelism);

return getColumns();
}

Expand Down Expand Up @@ -175,7 +173,7 @@ public void fillPrevChunk(@NotNull FillContext fillContext,
final FunctionalColumnFillContext ctx = (FunctionalColumnFillContext) fillContext;
ctx.chunkFiller.fillByIndices(this, rowSequence, destination);
}
}, usesPython, false);
}, false);
}

private static class FunctionalColumnFillContext implements Formula.FillContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer {
private final boolean flattenedResult;
private final boolean alreadyFlattenedSources;
private final BitSet dependencyBitSet;
private final boolean canUseThreads;
private final boolean canParallelizeThisColumn;
private final boolean isSystemic;
private final boolean resultTypeIsLivenessReferent;
Expand Down Expand Up @@ -86,13 +85,9 @@ final public class SelectColumnLayer extends SelectOrViewColumnLayer {
this.flattenedResult = flattenedResult;
this.alreadyFlattenedSources = alreadyFlattenedSources;

// We can't use threads at all if we have column that uses a Python query scope, because we are likely operating
// under the GIL which will cause a deadlock
canUseThreads = !sc.getDataView().preventsParallelism();

// We can only parallelize this column if we are not redirected, our destination provides ensure previous, and
// the select column is stateless
canParallelizeThisColumn = canUseThreads && !isRedirected
canParallelizeThisColumn = !isRedirected
&& WritableSourceWithPrepareForParallelPopulation.supportsParallelPopulation(ws) && sc.isStateless();

// If we were created on a systemic thread, we want to be sure to make sure that any updates are also
Expand Down Expand Up @@ -573,6 +568,6 @@ public LogOutput append(LogOutput logOutput) {

@Override
public boolean allowCrossColumnParallelization() {
return canUseThreads && inner.allowCrossColumnParallelization();
return inner.allowCrossColumnParallelization();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ public boolean isStateless() {
return false;
}

@Override
public boolean preventsParallelization() {
return true;
}

@Override
protected final FormulaSourceDescriptor getSourceDescriptor() {
return new FormulaSourceDescriptor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,6 @@ private void doFillChunk(@NotNull final ColumnSource.FillContext context,
destination.setSize((int) sz);
}

@Override
public boolean preventsParallelism() {
return innerSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return innerSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,6 @@ private void doFillChunk(@NotNull final ColumnSource.FillContext context,
effectiveContext.shareable.runLengths);
}

@Override
public boolean preventsParallelism() {
return innerSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return innerSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ private static void convertToByte(@NotNull final WritableChunk<? super Values> d
byteDestination.setSize(booleanObjectChunk.size());
}

@Override
public boolean preventsParallelism() {
return alternateColumnSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return alternateColumnSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,6 @@ final void transformChunk(@NotNull final Chunk<? extends Values> source,
}
}

@Override
public boolean preventsParallelism() {
return originalSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return originalSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,6 @@ public void fillPrevChunkUnordered(@NotNull FillContext context, @NotNull Writab
convertToBoolean(dest, toBooleanFillContext.byteChunk);
}

@Override
public boolean preventsParallelism() {
return alternateColumnSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return alternateColumnSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,6 @@ private void doOrderedFillAndPermute(@NotNull final ColumnSource<?> innerSource,
}
}

@Override
public boolean preventsParallelism() {
return innerSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return innerSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ private static void convertToLong(@NotNull final WritableChunk<? super Values> d
longDestination.setSize(dateTimeChunk.size());
}

@Override
public boolean preventsParallelism() {
return alternateColumnSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return alternateColumnSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,6 @@ public short getPrevShort(long rowKey) {
return delegate.getPrevShort(rowKey);
}

@Override
public boolean preventsParallelism() {
return delegate.preventsParallelism();
}

@Override
public boolean isStateless() {
return delegate.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ private static void convertToDateTime(@NotNull final WritableChunk<? super Value
dateTimeObjectDestination.setSize(longChunk.size());
}

@Override
public boolean preventsParallelism() {
return alternateColumnSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return alternateColumnSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,11 +727,6 @@ private void doOrderedFillAscending(@NotNull final ColumnSource<?> innerSource,
}
}

@Override
public boolean preventsParallelism() {
return innerSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return innerSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ public void fillPrevChunk(@NotNull ColumnSource.FillContext _context,
context.reverseKernel.reverse(destination);
}

@Override
public boolean preventsParallelism() {
return innerSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return innerSource.isStateless();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,6 @@ private boolean prevInvalid() {
return prevValidityStep == -1 || prevValidityStep != LogicalClock.DEFAULT.currentStep();
}

@Override
public boolean preventsParallelism() {
return currentSource.preventsParallelism() || (!prevInvalid() && prevSource.preventsParallelism());
}

@Override
public boolean isStateless() {
return currentSource.isStateless() && (prevInvalid() || prevSource.isStateless());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ public boolean isImmutable() {
return innerSource.isImmutable();
}

@Override
public boolean preventsParallelism() {
return innerSource.preventsParallelism();
}

@Override
public boolean isStateless() {
return innerSource.isStateless();
Expand Down
Loading