Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A few cleanups and bug fixes #4636

Merged
merged 7 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@
* @param <T> A class that will extend us, to get RefCounted functionality.
*/
public abstract class RefCountedCow<T> {

private static final boolean debug =
Configuration.getInstance().getBooleanForClassWithDefault(RspArray.class, "debug", false)
|| Configuration.getInstance().getBooleanForClassWithDefault(RefCountedCow.class, "debug", false);
Configuration.getInstance().getBooleanForClassWithDefault(RefCountedCow.class, "debug", false);

/**
* Field updater for refCount, so we can avoid creating an {@link java.util.concurrent.atomic.AtomicInteger} for
* each instance.
*/
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<RefCountedCow> REFCOUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(RefCountedCow.class, "refCount");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private static RspArray wrapRspArray(final RspArray arr) {
final RspArray arr,
final int startIdx, final long startOffset, final long cardBeforeStartIdx,
final int endIdx, final long endOffset, final long cardBeforeEndIdx) {
if (RspBitmap.debug) {
if (RspArray.debug) {
if (endIdx < startIdx ||
(endIdx == startIdx && endOffset < startOffset)) {
throw new IllegalArgumentException("Empty " + RspRowSequence.class.getSimpleName() + " :" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.rowset.impl.singlerange;

import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.impl.OrderedLongSet;
import io.deephaven.engine.rowset.impl.OrderedLongSetBuilderSequential;
Expand All @@ -13,7 +14,6 @@
import io.deephaven.util.datastructures.LongAbortableConsumer;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.chunk.LongChunk;
import io.deephaven.engine.rowset.impl.rsp.RspArray;
import io.deephaven.engine.rowset.impl.rsp.RspBitmap;
import io.deephaven.engine.rowset.impl.sortedranges.SortedRanges;
import io.deephaven.util.datastructures.LongRangeAbortableConsumer;
Expand All @@ -22,6 +22,10 @@
import java.util.function.LongConsumer;

public abstract class SingleRange implements OrderedLongSet {

private static final boolean debug =
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
Configuration.getInstance().getBooleanForClassWithDefault(SingleRange.class, "debug", false);

public abstract long rangeStart();

public abstract long rangeEnd();
Expand Down Expand Up @@ -129,7 +133,7 @@ public final int ixRefCount() {

@SuppressWarnings("unused")
private void ifDebugValidate() {
if (RspArray.debug) {
if (debug) {
ixValidate();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,13 @@
import java.util.function.Function;

/**
* Base {@link SelectColumn} implementation to wrap transformer functions for
* {@link PartitionedTable#transform(Function)} and
* {@link PartitionedTable#partitionedTransform(PartitionedTable, BiFunction)}.
* Base {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#transform} and
* {@link PartitionedTable#partitionedTransform}.
*/
abstract class BaseTableTransformationColumn implements SelectColumn {

BaseTableTransformationColumn() {}

@Override
public final List<String> initInputs(@NotNull final Table table) {
return initInputs(table.getRowSet(), table.getColumnSourceMap());
}

@Override
public final Class<?> getReturnedType() {
return Table.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.select.Formula;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.engine.table.impl.sources.InMemoryColumnSource;
import io.deephaven.engine.table.impl.sources.SparseArrayColumnSource;
import io.deephaven.engine.table.impl.sources.LongSingleValueSource;
import io.deephaven.engine.table.impl.sources.ViewColumnSource;
import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -70,14 +69,9 @@ public SelectColumn copy() {
return new LongConstantColumn(outputColumnName, outputValue);
}

@Override
public final List<String> initInputs(@NotNull final Table table) {
return initInputs(table.getRowSet(), table.getColumnSourceMap());
}

@Override
public final Class<?> getReturnedType() {
return Table.class;
return long.class;
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -98,12 +92,12 @@ public final MatchPair getMatchPair() {

@Override
public final WritableColumnSource<?> newDestInstance(final long size) {
return SparseArrayColumnSource.getSparseMemoryColumnSource(size, Table.class);
return new LongSingleValueSource();
}

@Override
public final WritableColumnSource<?> newFlatDestInstance(final long size) {
return InMemoryColumnSource.getImmutableMemoryColumnSource(size, Table.class, null);
return new LongSingleValueSource();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.deephaven.engine.tablelogger.EngineTableLoggers;
import io.deephaven.engine.tablelogger.UpdatePerformanceLogLogger;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.engine.util.string.StringUtils;
import io.deephaven.internal.log.LoggerFactory;
Expand Down Expand Up @@ -87,18 +86,20 @@ private static class InternalState {
private boolean encounteredError = false;

private InternalState() {
final UpdateSourceRegistrar registrar =
final UpdateGraph publishingGraph =
PeriodicUpdateGraph.getInstance(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME);
Assert.neqNull(registrar, "The " + PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME + " UpdateGraph "
Assert.neqNull(publishingGraph, "The " + PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME + " UpdateGraph "
+ "must be created before UpdatePerformanceTracker can be initialized.");
tableLogger = EngineTableLoggers.get().updatePerformanceLogLogger();
publisher = new UpdatePerformanceStreamPublisher();
adapter = new StreamToBlinkTableAdapter(
UpdatePerformanceStreamPublisher.definition(),
publisher,
registrar,
UpdatePerformanceTracker.class.getName());
blink = adapter.table();
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(publishingGraph).open()) {
tableLogger = EngineTableLoggers.get().updatePerformanceLogLogger();
publisher = new UpdatePerformanceStreamPublisher();
adapter = new StreamToBlinkTableAdapter(
UpdatePerformanceStreamPublisher.definition(),
publisher,
publishingGraph,
UpdatePerformanceTracker.class.getName());
blink = adapter.table();
}
}

/**
Expand All @@ -124,7 +125,6 @@ private synchronized void publish(
private static final AtomicInteger entryIdCounter = new AtomicInteger(1);

private final UpdateGraph updateGraph;
private final ExecutionContext context;
private final PerformanceEntry aggregatedSmallUpdatesEntry;
private final PerformanceEntry flushEntry;
private final Queue<WeakReference<PerformanceEntry>> entries = new LinkedBlockingDeque<>();
Expand All @@ -135,9 +135,7 @@ private synchronized void publish(
private long intervalStartTimeNanos = QueryConstants.NULL_LONG;

public UpdatePerformanceTracker(final UpdateGraph updateGraph) {
this.updateGraph = updateGraph;
this.context = ExecutionContext.getContext()
.withUpdateGraph(Objects.requireNonNull(updateGraph));
this.updateGraph = Objects.requireNonNull(updateGraph);
this.aggregatedSmallUpdatesEntry = new PerformanceEntry(
QueryConstants.NULL_INT, QueryConstants.NULL_INT, QueryConstants.NULL_INT,
"Aggregated Small Updates", null, updateGraph.getName());
Expand Down Expand Up @@ -166,7 +164,9 @@ public void flush() {
}
final long intervalEndTimeMillis = System.currentTimeMillis();
final long intervalEndTimeNanos = System.nanoTime();
try (final SafeCloseable ignored1 = context.open()) {
// This happens on the primary refresh thread of this UPT's UpdateGraph. It should already have that UG
// installed in the ExecutionContext. If we need another UG, that's the responsibility of the publish callbacks.
try {
finishInterval(
getInternalState(),
intervalStartTimeMillis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ protected AbstractFormulaColumn(String columnName, String formulaString) {
this.columnName = NameValidator.validateColumnName(columnName);
}

@Override
public List<String> initInputs(Table table) {
return initInputs(table.getRowSet(), table.getColumnSourceMap());
}

@Override
public Class<?> getReturnedType() {
return returnedType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.deephaven.api.util.NameValidator;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.NoSuchColumnException;
import io.deephaven.engine.table.impl.sources.InMemoryColumnSource;
import io.deephaven.engine.table.impl.sources.SparseArrayColumnSource;
import io.deephaven.engine.table.impl.sources.ViewColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.chunk.*;
Expand Down Expand Up @@ -86,11 +88,6 @@ public String toString() {
return "function(" + sourceName + ',' + destName + ')';
}

@Override
public List<String> initInputs(Table table) {
throw new UnsupportedOperationException();
}

@Override
public List<String> initInputs(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
// noinspection unchecked
Expand Down Expand Up @@ -207,13 +204,13 @@ public MatchPair getMatchPair() {
}

@Override
public WritableColumnSource<?> newDestInstance(long size) {
throw new UnsupportedOperationException();
public final WritableColumnSource<?> newDestInstance(final long size) {
return SparseArrayColumnSource.getSparseMemoryColumnSource(size, destDataType);
}

@Override
public WritableColumnSource<?> newFlatDestInstance(long size) {
throw new UnsupportedOperationException();
public final WritableColumnSource<?> newFlatDestInstance(final long size) {
return InMemoryColumnSource.getImmutableMemoryColumnSource(size, destDataType, componentType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.NoSuchColumnException;
import io.deephaven.engine.table.impl.PrevColumnSource;
import io.deephaven.engine.table.impl.sources.InMemoryColumnSource;
import io.deephaven.engine.table.impl.sources.SparseArrayColumnSource;
import io.deephaven.engine.table.impl.sources.ViewColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.chunk.attributes.Values;
Expand Down Expand Up @@ -39,8 +41,6 @@ public class MultiSourceFunctionalColumn<D> implements SelectColumn {
@NotNull
private final Class<?> componentType;

boolean usesPython;

public MultiSourceFunctionalColumn(@NotNull List<String> sourceNames,
@NotNull String destName,
@NotNull Class<D> destDataType,
Expand Down Expand Up @@ -69,11 +69,6 @@ public String toString() {
return "function(" + String.join(",", sourceNames) + ',' + destName + ')';
}

@Override
public List<String> initInputs(Table table) {
throw new UnsupportedOperationException();
}

@Override
public List<String> initInputs(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
final List<ColumnSource<?>> localSources = new ArrayList<>(sourceNames.size());
Expand Down Expand Up @@ -203,13 +198,13 @@ public MatchPair getMatchPair() {
}

@Override
public WritableColumnSource<?> newDestInstance(long size) {
throw new UnsupportedOperationException();
public final WritableColumnSource<?> newDestInstance(final long size) {
return SparseArrayColumnSource.getSparseMemoryColumnSource(size, destDataType);
}

@Override
public WritableColumnSource<?> newFlatDestInstance(long size) {
throw new UnsupportedOperationException();
public final WritableColumnSource<?> newFlatDestInstance(final long size) {
return InMemoryColumnSource.getImmutableMemoryColumnSource(size, destDataType, componentType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ public NullSelectColumn(final Class<T> type, final Class<?> elementType, final S
this.name = name;
}

@Override
public List<String> initInputs(final Table table) {
return Collections.emptyList();
}

@Override
public List<String> initInputs(final TrackingRowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
Expand Down Expand Up @@ -80,12 +75,12 @@ public MatchPair getMatchPair() {

@Override
public WritableColumnSource<?> newDestInstance(final long size) {
return SparseArrayColumnSource.getSparseMemoryColumnSource(size, nvcs.getType(), nvcs.getComponentType());
return nvcs;
}

@Override
public WritableColumnSource<?> newFlatDestInstance(final long size) {
return InMemoryColumnSource.getImmutableMemoryColumnSource(size, nvcs.getType(), nvcs.getComponentType());
return nvcs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ public String toString() {
return "reinterpretAs(" + sourceName + ',' + destName + ')';
}

@Override
public List<String> initInputs(Table table) {
throw new UnsupportedOperationException();
}

@Override
public List<String> initInputs(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
// noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BaseTable;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -61,15 +60,6 @@ static Collection<SelectColumn> copyFrom(Collection<SelectColumn> selectColumns)
*/
SelectColumn[] ZERO_LENGTH_SELECT_COLUMN_ARRAY = new SelectColumn[0];

/**
* Initialize the SelectColumn using the input table and return a list of underlying columns that this SelectColumn
* is dependent upon.
*
* @param table the table to initialize internals from
* @return a list containing all columns from 'table' that the result depends on
*/
List<String> initInputs(Table table);

/**
* Initialize the column from the provided set of underlying columns and row set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ private SourceColumn(String sourceName, String destName, boolean unused) {
this.destName = destName;
}

@Override
public List<String> initInputs(Table table) {
this.sourceColumn = table.getColumnSource(sourceName);
if (sourceColumn == null) {
throw new NoSuchColumnException(table.getDefinition().getColumnNames(), sourceName);
}
return Collections.singletonList(sourceName);
}

@Override
public List<String> initInputs(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
this.sourceColumn = columnsOfInterest.get(sourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@ public SwitchColumn(String columnName, String expression, FormulaParserConfigura
this.parser = parserConfiguration;
}

@Override
public List<String> initInputs(Table table) {
if (realColumn == null) {
if (table.getDefinition().getColumn(expression) != null) {
realColumn = new SourceColumn(expression, columnName);
} else {
realColumn = FormulaColumn.createFormulaColumn(columnName, expression, parser);
}
}
return realColumn.initInputs(table);
}

@Override
public List<String> initInputs(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
if (realColumn == null) {
Expand Down
Loading
Loading