Skip to content

Commit

Permalink
Fix bugs in moveColumns and renameColumns
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Feb 24, 2024
1 parent e29e7c6 commit 859086d
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 104 deletions.
66 changes: 60 additions & 6 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,49 @@ public interface Table extends
@ConcurrentMethod
Table dropColumnFormats();

/**
* Produce a new table with the specified columns renamed using the syntax {@code "NewColumnName=OldColumnName"}.
* <p>
* {@link IllegalArgumentException} will be thrown:
* <ul>
* <li>if a source column is used more than once</li>
* <li>if a destination column is used more than once</li>
* </ul>
*
* @param pairs The columns to rename
* @return The new table, with the columns renamed
*/
@ConcurrentMethod
Table renameColumns(Collection<Pair> pairs);

/**
* Produce a new table with the specified columns renamed using the syntax {@code "NewColumnName=OldColumnName"}.
* <p>
* {@link IllegalArgumentException} will be thrown:
* <ul>
* <li>if a source column is used more than once</li>
* <li>if a destination column is used more than once</li>
* </ul>
*
* @param pairs The columns to rename
* @return The new table, with the columns renamed
*/
@ConcurrentMethod
Table renameColumns(String... pairs);

/**
* Produce a new table with the specified columns renamed using the provided function.
* <p>
* {@link IllegalArgumentException} will be thrown:
* <ul>
* <li>if a source column is used more than once</li>
* <li>if a destination column is used more than once</li>
* </ul>
*
* @param renameFunction The function to apply to each column name
* @return The new table, with the columns renamed
*/
@ConcurrentMethod
Table renameAllColumns(UnaryOperator<String> renameFunction);

@ConcurrentMethod
Expand All @@ -342,8 +381,14 @@ public interface Table extends
Table formatColumnWhere(String columnName, String condition, String formula);

/**
* Produce a new table with the specified columns moved to the leftmost position. Columns can be renamed with the
* Produce a new table with the specified columns moved to the rightmost position. Columns can be renamed with the
* usual syntax, i.e. {@code "NewColumnName=OldColumnName")}.
* <p>
* {@link IllegalArgumentException} will be thrown:
* <ul>
* <li>if a source column is used more than once</li>
* <li>if a destination column is used more than once</li>
* </ul>
*
* @param columnsToMove The columns to move to the left (and, optionally, to rename)
* @return The new table, with the columns rearranged as explained above {@link #moveColumns(int, String...)}
Expand All @@ -354,6 +399,12 @@ public interface Table extends
/**
* Produce a new table with the specified columns moved to the rightmost position. Columns can be renamed with the
* usual syntax, i.e. {@code "NewColumnName=OldColumnName")}.
* <p>
* {@link IllegalArgumentException} will be thrown:
* <ul>
* <li>if a source column is used more than once</li>
* <li>if a destination column is used more than once</li>
* </ul>
*
* @param columnsToMove The columns to move to the right (and, optionally, to rename)
* @return The new table, with the columns rearranged as explained above {@link #moveColumns(int, String...)}
Expand All @@ -362,8 +413,14 @@ public interface Table extends
Table moveColumnsDown(String... columnsToMove);

/**
* Produce a new table with the specified columns moved to the specified {@code index}. Column indices begin at 0.
* Columns can be renamed with the usual syntax, i.e. {@code "NewColumnName=OldColumnName")}.
* Produce a new table with the specified columns moved to the rightmost position. Columns can be renamed with the
* usual syntax, i.e. {@code "NewColumnName=OldColumnName")}.
* <p>
* {@link IllegalArgumentException} will be thrown:
* <ul>
* <li>if a source column is used more than once</li>
* <li>if a destination column is used more than once</li>
* </ul>
*
* @param index The index to which the specified columns should be moved
* @param columnsToMove The columns to move to the specified index (and, optionally, to rename)
Expand All @@ -372,9 +429,6 @@ public interface Table extends
@ConcurrentMethod
Table moveColumns(int index, String... columnsToMove);

@ConcurrentMethod
Table moveColumns(int index, boolean moveToEnd, String... columnsToMove);

// -----------------------------------------------------------------------------------------------------------------
// Slice Operations
// -----------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/
package io.deephaven.engine.table.impl;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.AsOfJoinMatch;
import io.deephaven.api.AsOfJoinRule;
import io.deephaven.api.ColumnName;
Expand Down Expand Up @@ -48,7 +47,6 @@
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.rangejoin.RangeJoinOperation;
import io.deephaven.engine.table.impl.select.MatchPairFactory;
import io.deephaven.engine.table.impl.select.SelectColumnFactory;
import io.deephaven.engine.table.impl.updateby.UpdateBy;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
Expand Down Expand Up @@ -79,6 +77,7 @@
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.util.annotations.VisibleForTesting;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -924,54 +923,11 @@ private String getCastFormulaInternal(Class<?> dataType) {
}

@Override
public Table moveColumns(int index, boolean moveToEnd, String... columnsToMove) {
public Table moveColumns(final int index, String... columnsToMove) {
final UpdateGraph updateGraph = getUpdateGraph();
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
// Get the current columns
final List<String> currentColumns = getDefinition().getColumnNames();

// Create a Set from columnsToMove. This way, we can rename and rearrange columns at once.
final Set<String> leftColsToMove = new HashSet<>();
final Set<String> rightColsToMove = new HashSet<>();
int extraCols = 0;

for (final String columnToMove : columnsToMove) {
final String left = MatchPairFactory.getExpression(columnToMove).leftColumn;
final String right = MatchPairFactory.getExpression(columnToMove).rightColumn;

if (!leftColsToMove.add(left) || !currentColumns.contains(left) || (rightColsToMove.contains(left)
&& !left.equals(right) && leftColsToMove.stream().anyMatch(col -> col.equals(right)))) {
extraCols++;
}
if (currentColumns.stream().anyMatch(currentColumn -> currentColumn.equals(right))
&& !left.equals(right)
&& rightColsToMove.add(right) && !rightColsToMove.contains(left)) {
extraCols--;
}
}
index += moveToEnd ? extraCols : 0;

// vci for write, cci for currentColumns, ctmi for columnsToMove
final SelectColumn[] viewColumns = new SelectColumn[currentColumns.size() + extraCols];
for (int vci = 0, cci = 0, ctmi = 0; vci < viewColumns.length;) {
if (vci >= index && ctmi < columnsToMove.length) {
viewColumns[vci++] = SelectColumnFactory.getExpression(columnsToMove[ctmi++]);
} else {
// Don't add the column if it's one of the columns we're moving or if it has been renamed.
final String currentColumn = currentColumns.get(cci++);
if (!leftColsToMove.contains(currentColumn)
&& Arrays.stream(viewColumns).noneMatch(
viewCol -> viewCol != null
&& viewCol.getMatchPair().leftColumn.equals(currentColumn))
&& Arrays.stream(columnsToMove)
.noneMatch(colToMove -> MatchPairFactory.getExpression(colToMove).rightColumn
.equals(currentColumn))) {

viewColumns[vci++] = SelectColumnFactory.getExpression(currentColumn);
}
}
}
return viewOrUpdateView(Flavor.View, viewColumns);
final MatchPair[] pairsToMove = MatchPairFactory.getExpressions(columnsToMove);
return renameColumnsImpl("moveColumns(" + index + ", ", Math.max(0, index), pairsToMove);
}
}

Expand Down Expand Up @@ -1820,73 +1776,121 @@ public void onUpdate(final TableUpdate upstream) {
public Table renameColumns(Collection<io.deephaven.api.Pair> pairs) {
final UpdateGraph updateGraph = getUpdateGraph();
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return renameColumnsImpl(MatchPair.fromPairs(pairs));
return renameColumnsImpl("renameColumns(", -1, MatchPair.fromPairs(pairs));
}
}

private Table renameColumnsImpl(MatchPair... pairs) {
return QueryPerformanceRecorder.withNugget("renameColumns(" + matchString(pairs) + ")",
private Table renameColumnsImpl(
@NotNull final String methodNuggetPrefix,
final int movePosition,
@NotNull final MatchPair... pairs) {
return QueryPerformanceRecorder.withNugget(methodNuggetPrefix + matchString(pairs) + ")",
sizeForInstrumentation(), () -> {
if (pairs == null || pairs.length == 0) {
return prepareReturnThis();
}

checkInitiateOperation();

final Map<String, String> pairLookup = new HashMap<>();
final Set<String> newNames = new HashSet<>();
final Map<String, String> pairLookup = new LinkedHashMap<>();
for (final MatchPair pair : pairs) {
if (pair.leftColumn == null || pair.leftColumn.equals("")) {
if (pair.leftColumn == null || pair.leftColumn.isEmpty()) {
throw new IllegalArgumentException(
"Bad left column in rename pair \"" + pair + "\"");
}
if (null == columns.get(pair.rightColumn)) {
throw new IllegalArgumentException("Column \"" + pair.rightColumn + "\" not found");
}
pairLookup.put(pair.rightColumn, pair.leftColumn);
if (pairLookup.put(pair.rightColumn, pair.leftColumn) != null) {
throw new IllegalArgumentException(
"Duplicate source column \"" + pair.rightColumn + "\"");
}
if (!newNames.add(pair.leftColumn)) {
throw new IllegalArgumentException(
"Duplicate destination column \"" + pair.leftColumn + "\"");
}
}

int mcsPairIdx = 0;
final MutableInt mcsPairIdx = new MutableInt();
final MatchPair[] modifiedColumnSetPairs = new MatchPair[columns.size()];

final Map<String, ColumnSource<?>> newColumns = new LinkedHashMap<>();

final Runnable moveColumns = () -> {
for (final Map.Entry<String, String> rename : pairLookup.entrySet()) {
final String oldName = rename.getKey();
final String newName = rename.getValue();
final ColumnSource<?> columnSource = columns.get(oldName);
newColumns.put(newName, columnSource);
modifiedColumnSetPairs[mcsPairIdx.getAndIncrement()] = new MatchPair(newName, oldName);
}
};

for (final Map.Entry<String, ? extends ColumnSource<?>> entry : columns.entrySet()) {
final String oldName = entry.getKey();
final ColumnSource<?> columnSource = entry.getValue();
String newName = pairLookup.get(oldName);
if (newName == null) {
if (newNames.contains(oldName)) {
// this column is being replaced by a rename
continue;
}
newName = oldName;
} else if (movePosition >= 0) {
// we move this column when we get to the right position
continue;
}

if (mcsPairIdx.intValue() == movePosition) {
moveColumns.run();
}
modifiedColumnSetPairs[mcsPairIdx++] = new MatchPair(newName, oldName);

modifiedColumnSetPairs[mcsPairIdx.getAndIncrement()] = new MatchPair(newName, oldName);
newColumns.put(newName, columnSource);
}

final QueryTable queryTable = new QueryTable(rowSet, newColumns);
if (isRefreshing()) {
final ModifiedColumnSet.Transformer mcsTransformer =
newModifiedColumnSetTransformer(queryTable, modifiedColumnSetPairs);
addUpdateListener(new ListenerImpl("renameColumns(" + Arrays.deepToString(pairs) + ')',
this, queryTable) {
@Override
public void onUpdate(final TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
downstream.modifiedColumnSet = queryTable.getModifiedColumnSetForUpdates();
if (upstream.modified().isNonempty()) {
mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(),
downstream.modifiedColumnSet);
} else {
downstream.modifiedColumnSet.clear();
}
queryTable.notifyListeners(downstream);
}
});
if (mcsPairIdx.intValue() <= movePosition) {
moveColumns.run();
}
propagateFlatness(queryTable);

copyAttributes(queryTable, CopyAttributeOperation.RenameColumns);
copySortableColumns(queryTable, pairs);
maybeCopyColumnDescriptions(queryTable, pairs);
final Mutable<QueryTable> result = new MutableObject<>();

final OperationSnapshotControl snapshotControl =
createSnapshotControlIfRefreshing(OperationSnapshotControl::new);
initializeWithSnapshot("renameColumns", snapshotControl, (usePrev, beforeClockValue) -> {
final QueryTable resultTable = new QueryTable(rowSet, newColumns);
propagateFlatness(resultTable);

copyAttributes(resultTable, CopyAttributeOperation.RenameColumns);
copySortableColumns(resultTable, pairs);
maybeCopyColumnDescriptions(resultTable, pairs);

if (snapshotControl != null) {
final ModifiedColumnSet.Transformer mcsTransformer =
newModifiedColumnSetTransformer(resultTable, modifiedColumnSetPairs);
final ListenerImpl listener = new ListenerImpl(
methodNuggetPrefix + Arrays.deepToString(pairs) + ')', this, resultTable) {
@Override
public void onUpdate(final TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
downstream.modifiedColumnSet = resultTable.getModifiedColumnSetForUpdates();
if (upstream.modified().isNonempty()) {
mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(),
downstream.modifiedColumnSet);
} else {
downstream.modifiedColumnSet.clear();
}
resultTable.notifyListeners(downstream);
}
};
snapshotControl.setListenerAndResult(listener, resultTable);
}

result.setValue(resultTable);

return true;
});

return result.getValue();

return queryTable;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ default Table renameColumns(Collection<Pair> pairs) {
}

@Override
default Table moveColumns(int index, boolean moveToEnd, String... columnsToMove) {
default Table moveColumns(int index, String... columnsToMove) {
return throwUnsupported();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,7 @@ default Table moveColumnsUp(String... columnsToMove) {
@ConcurrentMethod
@FinalDefault
default Table moveColumnsDown(String... columnsToMove) {
return moveColumns(numColumns() - columnsToMove.length, true, columnsToMove);
}

@Override
@ConcurrentMethod
@FinalDefault
default Table moveColumns(int index, String... columnsToMove) {
return moveColumns(index, false, columnsToMove);
return moveColumns(numColumns() - columnsToMove.length, columnsToMove);
}

// -----------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ public Table renameColumns(Collection<Pair> pairs) {

@Override
@ConcurrentMethod
public Table moveColumns(int index, boolean moveToEnd, String... columnsToMove) {
return coalesce().moveColumns(index, moveToEnd, columnsToMove);
public Table moveColumns(int index, String... columnsToMove) {
return coalesce().moveColumns(index, columnsToMove);
}

@Override
Expand Down
Loading

0 comments on commit 859086d

Please sign in to comment.