Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure that most TableListener/MergedListener Notifications are processed while ensuring liveness; improve timeout handling in Table.awaitUpdate #6422

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -820,12 +820,12 @@ RollupTable rollup(Collection<? extends Aggregation> aggregations, boolean inclu
* <p>
* In some implementations, this call may also terminate in case of interrupt or spurious wakeup.
*
* @param timeout The maximum time to wait in milliseconds.
* @param timeoutMillis The maximum time to wait in milliseconds.
* @return false if the timeout elapses without notification, true otherwise.
* @throws InterruptedException In the event this thread is interrupted
* @see java.util.concurrent.locks.Condition#await()
*/
boolean awaitUpdate(long timeout) throws InterruptedException;
boolean awaitUpdate(long timeoutMillis) throws InterruptedException;

/**
* Subscribe for updates to this table. {@code listener} will be invoked via the {@link NotificationQueue}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,23 @@ public void awaitUpdate() throws InterruptedException {
}

@Override
public boolean awaitUpdate(long timeout) throws InterruptedException {
final MutableBoolean result = new MutableBoolean(false);
updateGraph.exclusiveLock().doLocked(
() -> result.setValue(ensureCondition().await(timeout, TimeUnit.MILLISECONDS)));
public boolean awaitUpdate(long timeoutMillis) throws InterruptedException {

return result.booleanValue();
final long startTime = System.nanoTime();
if (!updateGraph.exclusiveLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
// Usually, users will already be holding the exclusive lock when calling this method. If they are not and
// cannot acquire the lock within the timeout, we should return false now.
return false;
}
timeoutMillis -= TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would check for timeoutMillis <= 0 and bail (or clamp it). I see no javadoc for the await call on Condition, but the standard wait call has the note:
In all respects, this method behaves as if wait(0L, 0) had been called. See the specification of the wait(long, int) method for details.

That has bitten some of the Enterprise gRPC timeout handling in the past.

The point of that note is that if you pass a zero argument to the standard wait call, you are going to wait forever. I would prefer to have a check within the try to be very explicit about that edge case.

try {
// Note that we must reacquire the exclusive lock before returning from await. This deadline may be
// exceeded if the thread must wait to reacquire the lock.
return ensureCondition().await(timeoutMillis, TimeUnit.MILLISECONDS);
} finally {
updateGraph.exclusiveLock().unlock();
}
}

private Condition ensureCondition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,22 @@ public final boolean canExecute(final long step) {

void doRun(final Runnable invokeOnUpdate) {
try {
doRunInternal(invokeOnUpdate);
final long currentStep = getUpdateGraph().clock().currentStep();
try {
beforeRunNotification(currentStep);
// Retain a reference during update processing to prevent interference from concurrent destroys
if (!tryRetainReference()) {
// This listener is no longer live, there's no point to doing any work for this notification
return;
}
try {
doRunInternal(invokeOnUpdate);
} finally {
dropReference();
}
} finally {
afterRunNotification(currentStep);
}
} finally {
update.release();
}
Expand All @@ -328,9 +343,7 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
entry.onUpdateStart(update.added(), update.removed(), update.modified(), update.shifted());
}

final long currentStep = getUpdateGraph().clock().currentStep();
try {
beforeRunNotification(currentStep);
invokeOnUpdate.run();
} catch (Exception e) {
final LogEntry en = log.error().append("Uncaught exception for entry= ");
Expand Down Expand Up @@ -363,7 +376,6 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
failed = true;
onFailure(e, entry);
} finally {
afterRunNotification(currentStep);
if (entry != null) {
entry.onUpdateEnd();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,38 +332,15 @@ public MergedNotification() {

@Override
public void run() {
final long currentStep = getUpdateGraph().clock().currentStep();
try {
if (lastEnqueuedStep != currentStep) {
// noinspection ConstantConditions
throw Assert.statementNeverExecuted("Notification step mismatch: listener="
+ System.identityHashCode(MergedListener.this) + ": queuedNotificationStep="
+ lastEnqueuedStep + ", step=" + currentStep);
}

if (upstreamError != null) {
propagateError(false, upstreamError, errorSourceEntry);
return;
}

long added = 0;
long removed = 0;
long modified = 0;
long shifted = 0;

for (ListenerRecorder recorder : recorders) {
if (recorder.getNotificationStep() == currentStep) {
added += recorder.getAdded().size();
removed += recorder.getRemoved().size();
modified += recorder.getModified().size();
shifted += recorder.getShifted().getEffectiveSize();
}
}

if (entry != null) {
entry.onUpdateStart(added, removed, modified, shifted);
}
final long currentStep = getUpdateGraph().clock().currentStep();
try {
if (lastEnqueuedStep != currentStep) {
// noinspection ConstantConditions
throw Assert.statementNeverExecuted("Notification step mismatch: listener="
+ System.identityHashCode(MergedListener.this) + ": queuedNotificationStep="
+ lastEnqueuedStep + ", step=" + currentStep);
}
synchronized (MergedListener.this) {
if (notificationStep == lastEnqueuedStep) {
// noinspection ConstantConditions
Expand All @@ -373,23 +350,60 @@ public void run() {
}
notificationStep = lastEnqueuedStep;
}
process();
getUpdateGraph().logDependencies()
.append("MergedListener has completed execution ")
.append(this).endl();
} finally {
if (entry != null) {
entry.onUpdateEnd();
// Retain a reference during update processing to prevent interference from concurrent destroys
if (!tryRetainReference()) {
// This listener is no longer live, there's no point to doing any work for this notification
return;
}
try {
runInternal(currentStep);
} catch (Exception updateException) {
handleUncaughtException(updateException);
} finally {
dropReference();
}
} finally {
StepUpdater.forceUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, MergedListener.this, currentStep);
}
} catch (Exception updateException) {
handleUncaughtException(updateException);
} finally {
StepUpdater.forceUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, MergedListener.this, currentStep);
releaseFromRecorders();
}
}

/**
 * Performs the body of a merged notification for the given step: either forwards a recorded upstream error, or
 * reports the per-recorder update sizes to {@code entry} and then invokes {@link #process()}.
 *
 * @param currentStep the step value that each recorder's notification step is compared against when totalling
 *        update sizes
 */
private void runInternal(final long currentStep) {
if (upstreamError != null) {
// An upstream error has been recorded: hand it to propagateError and return without calling process() or
// either of the entry.onUpdateStart/onUpdateEnd callbacks.
propagateError(false, upstreamError, errorSourceEntry);
return;
}
// Running totals of the update sizes across the recorders that were notified on currentStep.
long added = 0;
long removed = 0;
long modified = 0;
long shifted = 0;

for (ListenerRecorder recorder : recorders) {
// Only recorders whose notification step equals currentStep contribute to the totals.
if (recorder.getNotificationStep() == currentStep) {
added += recorder.getAdded().size();
removed += recorder.getRemoved().size();
modified += recorder.getModified().size();
shifted += recorder.getShifted().getEffectiveSize();
}
}

// entry may be null; when present it is told the totals before processing starts.
if (entry != null) {
entry.onUpdateStart(added, removed, modified, shifted);
}
try {
process();
// Reached only when process() returns normally.
getUpdateGraph().logDependencies()
.append("MergedListener has completed execution ")
.append(this).endl();
} finally {
// Always paired with the onUpdateStart call above, even if process() throws.
if (entry != null) {
entry.onUpdateEnd();
}
}
}

@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("Merged Notification ").append(System.identityHashCode(MergedListener.this))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ default void awaitUpdate() throws InterruptedException {

@SuppressWarnings("RedundantThrows")
@Override
default boolean awaitUpdate(long timeout) throws InterruptedException {
default boolean awaitUpdate(long timeoutMillis) throws InterruptedException {
return throwUnsupported();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ public void awaitUpdate() throws InterruptedException {
}

@Override
public boolean awaitUpdate(long timeout) throws InterruptedException {
return coalesce().awaitUpdate(timeout);
public boolean awaitUpdate(long timeoutMillis) throws InterruptedException {
return coalesce().awaitUpdate(timeoutMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.api.filter.Filter;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.ReferenceCountedLivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.Table;
Expand All @@ -24,10 +26,7 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

import static io.deephaven.engine.testutil.TstUtils.*;
Expand Down Expand Up @@ -153,7 +152,7 @@ public void testIncremental() throws ParseException {
final TimeSeriesFilter exclusionFilter =
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true)
.build();
final ArrayList<WeakReference<TimeSeriesFilter>> filtersToRefresh = new ArrayList<>();
final List<WeakReference<TimeSeriesFilter>> filtersToRefresh = Collections.synchronizedList(new ArrayList<>());

final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter);
Expand Down Expand Up @@ -192,7 +191,7 @@ public void testIncremental2() throws ParseException {
final TimeSeriesFilter exclusionFilter =
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true)
.build();
final ArrayList<WeakReference<TimeSeriesFilter>> filtersToRefresh = new ArrayList<>();
final List<WeakReference<TimeSeriesFilter>> filtersToRefresh = Collections.synchronizedList(new ArrayList<>());

EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter);

Expand All @@ -214,18 +213,36 @@ public void testIncremental2() throws ParseException {
}

private static EvalNugget @NotNull [] makeNuggets(QueryTable table, TimeSeriesFilter inclusionFilter,
ArrayList<WeakReference<TimeSeriesFilter>> filtersToRefresh, ControlledUpdateGraph updateGraph,
List<WeakReference<TimeSeriesFilter>> filtersToRefresh, ControlledUpdateGraph updateGraph,
TimeSeriesFilter exclusionFilter) {
final Table withInstant = table.update("Date=DateTimeUtils.epochNanosToInstant(Date.getTime() * 1000000L)");
return new EvalNugget[] {
EvalNugget.from(() -> {
final TimeSeriesFilter inclusionCopy = inclusionFilter.copy();
filtersToRefresh.add(new WeakReference<>(inclusionCopy));
final WeakReference<TimeSeriesFilter> filterRef = new WeakReference<>(inclusionCopy);
final LivenessReferent sentinel = new ReferenceCountedLivenessReferent() {
@Override
public void destroy() {
super.destroy();
filtersToRefresh.remove(filterRef);
}
};
inclusionCopy.manage(sentinel);
filtersToRefresh.add(filterRef);
return updateGraph.exclusiveLock().computeLocked(() -> withInstant.where(inclusionCopy));
}),
EvalNugget.from(() -> {
final TimeSeriesFilter exclusionCopy = exclusionFilter.copy();
filtersToRefresh.add(new WeakReference<>(exclusionCopy));
final WeakReference<TimeSeriesFilter> filterRef = new WeakReference<>(exclusionCopy);
final LivenessReferent sentinel = new ReferenceCountedLivenessReferent() {
@Override
public void destroy() {
super.destroy();
filtersToRefresh.remove(filterRef);
}
};
exclusionCopy.manage(sentinel);
filtersToRefresh.add(filterRef);
return updateGraph.exclusiveLock().computeLocked(() -> withInstant.where(exclusionCopy));
}),
};
Expand Down