Skip to content

Commit

Permalink
Fix SourceTable coalesce bug when indexed columns are dropped (#5578)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcaudy committed Jun 5, 2024
1 parent 6881afb commit 081793d
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.api.ColumnName;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.table.ColumnSource;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class TableBackedDataIndex extends AbstractDataIndex {
public TableBackedDataIndex(
@NotNull final QueryTable sourceTable,
@NotNull final String... keyColumnNames) {
this.keyColumnNames = List.of(keyColumnNames);
this.keyColumnNames = List.of(Require.elementsNeqNull(keyColumnNames, "keyColumnNames"));

// Create an in-order reverse lookup map for the key column names.
keyColumnNamesByIndexedColumn = Collections.unmodifiableMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ class MergedDataIndex extends AbstractDataIndex {
@NotNull final String[] keyColumnNames,
@NotNull final ColumnSource<?>[] keySources,
@NotNull final RegionedColumnSourceManager columnSourceManager) {

Require.eq(keyColumnNames.length, "keyColumnNames.length", keySources.length, "keySources.length");
Require.elementsNeqNull(keyColumnNames, "keyColumnNames");
Require.elementsNeqNull(keySources, "keySources");

this.keyColumnNames = List.of(keyColumnNames);
this.columnSourceManager = columnSourceManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import gnu.trove.map.hash.TObjectIntHashMap;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderRandom;
Expand Down Expand Up @@ -73,7 +74,9 @@ class PartitioningColumnDataIndex<KEY_TYPE> extends AbstractDataIndex {
@NotNull final String keyColumnName,
@NotNull final ColumnSource<KEY_TYPE> keySource,
@NotNull final RegionedColumnSourceManager columnSourceManager) {
this.keyColumnName = keyColumnName;
this.keyColumnName = Require.neqNull(keyColumnName, "keyColumnName");
Require.neqNull(keySource, "keySource");
Require.neqNull(columnSourceManager, "columnSourceManager");

keyColumnNamesByIndexedColumn = Map.of(keySource, keyColumnName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ public synchronized TrackingWritableRowSet initialize() {
final ColumnSource<?>[] keySources = Arrays.stream(keyColumnNames)
.map(columnSources::get)
.toArray(ColumnSource[]::new);
if (Arrays.stream(keySources).anyMatch(Objects::isNull)) {
// If any of the key sources are missing (e.g. because the user supplied a TableDefinition that
// excludes them, or has used a RedefinableTable operation to drop them), we can't create the
// DataIndex.
continue;
}
final DataIndex mergedIndex = new MergedDataIndex(keyColumnNames, keySources, this);
retainedDataIndexes.add(mergedIndex);
// Not refreshing, so no need to manage mergedIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ private void doTest(final boolean missingIndexes) {

final TableDefinition partitionedMissingDataDefinition = TableDefinition.of(
ColumnDefinition.ofString("Part").withPartitioning(),
ColumnDefinition.ofChar("Sym"),
ColumnDefinition.ofLong("Other"));

final String tableName = "TestTable";
Expand Down Expand Up @@ -173,6 +172,21 @@ private void doTest(final boolean missingIndexes) {
TstUtils.assertTableEquals(expected.groupBy("Sym").ungroup(), actual.groupBy("Sym").ungroup());
}

@Test
public void testDroppedIndexColumn() {
final Table raw = TableTools.emptyTable(26 * 10 * 1000).update("Part=String.format(`%04d`, (long)(ii/1000))",
"Sym=(char)('A' + ii % 26)", "Other=ii");
DataIndexer.getOrCreateDataIndex(raw, "Sym");

final String path =
dataDirectory.getAbsolutePath() + File.separator + "TestTable2" + File.separator + PARQUET_FILE_NAME;

ParquetTools.writeTable(raw, path);

TestCase.assertFalse(DataIndexer.hasDataIndex(
ParquetTools.readTable(path).dropColumns("Sym").coalesce(), "Sym"));
}

@Test
public void testParallelCollection() {
final List<Integer> observedOrder = Collections.synchronizedList(new ArrayList<>());
Expand Down

0 comments on commit 081793d

Please sign in to comment.