Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DH-18433: SourcePartitionedTable needs to check the size of pending locations even if there are no added or removed locations #6570

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,42 @@ private void processPendingLocations(final boolean notifyListeners) {
try (final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate =
subscriptionBuffer.processPending()) {
if (locationUpdate == null) {
return;
removed = null;
} else {
removed = processRemovals(locationUpdate);
processAdditions(locationUpdate);
}
removed = processRemovals(locationUpdate);
added = processAdditions(locationUpdate);
added = checkPendingLocations();
}

resultRows.update(added, removed);
if (removed == null) {
if (added == null) {
return;
}
resultRows.insert(added);
} else if (added == null) {
resultRows.remove(removed);
} else {
resultRows.update(added, removed);
}
if (notifyListeners) {
result.notifyListeners(new TableUpdateImpl(
added,
removed,
added == null ? RowSetFactory.empty() : added,
removed == null ? RowSetFactory.empty() : removed,
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
ModifiedColumnSet.EMPTY));
} else {
added.close();
removed.close();
if (added != null) {
added.close();
}
if (removed != null) {
removed.close();
}
}
}

private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
private void processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
/*
* This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in
* RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to
Expand All @@ -280,13 +295,18 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
if (locationUpdate != null) {
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
}
}

private RowSet checkPendingLocations() {
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (pendingLocationState.exists()) {
Expand All @@ -296,7 +316,7 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
}

if (readyLocationStates.isEmpty()) {
return RowSetFactory.empty();
return null;
}

final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void close() {
* reset). No order is maintained internally. If a pending exception is thrown, this signals that the subscription
* is no longer valid and no subsequent location keys will be returned.
*
* @return The collection of pending location keys.
* @return A {@link LocationUpdate} collecting pending added and removed location keys, or {@code null} if there are
* none; the caller must {@link LocationUpdate#close() close} the returned object when done.
*/
public synchronized LocationUpdate processPending() {
if (!subscribed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,42 @@ public void tearDown() throws Exception {
private QueryTable p2;
private QueryTable p3;
private QueryTable p4;
private QueryTable p5;

private DependentRegistrar registrar;
private TableBackedTableLocationProvider tlp;

private SourcePartitionedTable setUpData() {
p1 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p1 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "aa", "bb", "aa", "bb"),
intCol("intCol", 10, 20, 40, 60),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p1.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p2 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p2 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "cc", "dd", "cc", "dd"),
intCol("intCol", 100, 200, 400, 600),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p2.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p3 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p3 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "ee", "ff", "ee", "ff"),
intCol("intCol", 1000, 2000, 4000, 6000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p3.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p4 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
p4 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "gg", "hh", "gg", "hh"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
p4.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

p5 = testRefreshingTable(i().toTracking(), // Initially empty
stringCol("Sym"),
intCol("intCol"),
doubleCol("doubleCol"));
p5.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);

registrar = new DependentRegistrar();
tlp = new TableBackedTableLocationProvider(
registrar,
Expand Down Expand Up @@ -238,7 +245,7 @@ public void testAddAndRemoveLocations() {
*/
final TableLocation location5;
try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(), true)) {
final QueryTable p5 = testRefreshingTable(i(0, 1, 2, 3).toTracking(),
final QueryTable p5 = testRefreshingTable(ir(0, 3).toTracking(),
stringCol("Sym", "ii", "jj", "ii", "jj"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
Expand Down Expand Up @@ -407,4 +414,27 @@ public void testCantReadPrev() {
TableLocationRemovedException.class).isPresent()));
getUpdateErrors().clear();
}

@Test
public void testInitiallyEmptyLocation() {
final SourcePartitionedTable spt = setUpData();
final Table ptSummary = spt.merge().selectDistinct("Sym");
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
tlp.add(p5);
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
// We refreshed the source first, so it won't see a new size for the location backed by p5 on this cycle.
addToTable(p5, ir(0, 3),
stringCol("Sym", "ii", "jj", "kk", "ll"),
intCol("intCol", 10000, 20000, 40000, 60000),
doubleCol("doubleCol", 0.1, 0.2, 0.4, 0.6));
}, true);
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd");
updateGraph.getDelegate().runWithinUnitTestCycle(() -> {
updateGraph.refreshSources();
// Now the source has been refreshed, so it should see the new size of the location backed by p5, and
// include it in the result.
}, true);
verifyStringColumnContents(ptSummary, "Sym", "aa", "bb", "cc", "dd", "ii", "jj", "kk", "ll");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ public static WritableRowSet i(long... keys) {
return RowSetFactory.fromKeys(keys);
}

/**
* A shorthand for {@link RowSetFactory#fromRange(long, long)} for use in unit tests.
*
* @param firstRowKey the first key of the new RowSet
* @param lastRowKey the last key (inclusive) of the new RowSet
* @return a new RowSet with the given key range
*/
public static WritableRowSet ir(final long firstRowKey, final long lastRowKey) {
return RowSetFactory.fromRange(firstRowKey, lastRowKey);
}

public static void addToTable(final Table table, final RowSet rowSet, final ColumnHolder<?>... columnHolders) {
if (rowSet.isEmpty()) {
return;
Expand Down