diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java
index c784a10fedb..1fc9c6aa620 100644
--- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java
+++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java
@@ -820,12 +820,12 @@ RollupTable rollup(Collection extends Aggregation> aggregations, boolean inclu
*
* In some implementations, this call may also terminate in case of interrupt or spurious wakeup.
*
- * @param timeout The maximum time to wait in milliseconds.
+ * @param timeoutMillis The maximum time to wait in milliseconds.
* @return false if the timeout elapses without notification, true otherwise.
* @throws InterruptedException In the event this thread is interrupted
* @see java.util.concurrent.locks.Condition#await()
*/
- boolean awaitUpdate(long timeout) throws InterruptedException;
+ boolean awaitUpdate(long timeoutMillis) throws InterruptedException;
/**
* Subscribe for updates to this table. {@code listener} will be invoked via the {@link NotificationQueue}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java
index e0a1e4d1102..23d28d7e46b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java
@@ -40,7 +40,6 @@
import io.deephaven.util.annotations.ReferentialIntegrity;
import io.deephaven.util.datastructures.SimpleReferenceManager;
import io.deephaven.util.datastructures.hash.IdentityKeyedObjectKey;
-import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -73,13 +72,16 @@ public abstract class BaseTable> extends
private static final Logger log = LoggerFactory.getLogger(BaseTable.class);
+ @SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater CONDITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(BaseTable.class, Condition.class, "updateGraphCondition");
+ @SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater PARENTS_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(BaseTable.class, Collection.class, "parents");
private static final Collection EMPTY_PARENTS = Collections.emptyList();
+ @SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater CHILD_LISTENER_REFERENCES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
BaseTable.class, SimpleReferenceManager.class, "childListenerReferences");
@@ -521,16 +523,49 @@ public boolean satisfied(final long step) {
@Override
public void awaitUpdate() throws InterruptedException {
- updateGraph.exclusiveLock().doLocked(ensureCondition()::await);
+ final long startLastNotificationStep = lastNotificationStep;
+ if (isFailed) {
+ return;
+ }
+ updateGraph.exclusiveLock().doLocked(() -> {
+ while (!isFailed && startLastNotificationStep == lastNotificationStep) {
+ ensureCondition().await();
+ }
+ });
}
@Override
- public boolean awaitUpdate(long timeout) throws InterruptedException {
- final MutableBoolean result = new MutableBoolean(false);
- updateGraph.exclusiveLock().doLocked(
- () -> result.setValue(ensureCondition().await(timeout, TimeUnit.MILLISECONDS)));
+ public boolean awaitUpdate(final long timeoutMillis) throws InterruptedException {
+ // TODO: Think about this. Does it make sense to check notification steps from inside this method? Doesn't the
+ // caller need to check themselves? But then we can't know when to terminate early unless they provide the
+ // step, which is harder to use and represents an interface change. They can use a condition themselves,
+ // though, if they have exotic requirements. I think we just need a note outside that the caller should be
+ // holding the exclusive lock if they want this to be reliable.
+ final long startLastNotificationStep = lastNotificationStep;
+ long lastStartTime = System.nanoTime();
+ if (isFailed) {
+ return startLastNotificationStep != lastNotificationStep;
+ }
- return result.booleanValue();
+ long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
+ // No need to validate the input timeout: if remainingNanos <= 0, tryLock is guaranteed to not wait at all.
+ if (!updateGraph.exclusiveLock().tryLock(remainingNanos, TimeUnit.NANOSECONDS)) {
+ // Usually, users will already be holding the exclusive lock when calling this method. If they are not and
+ // cannot acquire the lock within the timeout, we should return false if no update has been observed.
+ return startLastNotificationStep != lastNotificationStep;
+ }
+ while (startLastNotificationStep == ())
+ remainingNanos -= System.nanoTime() - lastStartTime;
+ try {
+ if (remainingNanos <= 0) {
+ return false;
+ }
+ // Note that we must reacquire the exclusive lock before returning from await. This deadline may be
+ // exceeded if the thread must wait to reacquire the lock.
+ return ensureCondition().awaitNanos(remainingNanos);
+ } finally {
+ updateGraph.exclusiveLock().unlock();
+ }
}
private Condition ensureCondition() {
@@ -670,74 +705,74 @@ public final void notifyListeners(RowSet added, RowSet removed, RowSet modified)
* callers should pass a {@code copy} for updates they intend to further use.
*/
public final void notifyListeners(final TableUpdate update) {
- Assert.eqFalse(isFailed, "isFailed");
- final long currentStep = updateGraph.clock().currentStep();
- // tables may only be updated once per cycle
- Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "updateGraph.clock().currentStep()");
-
- Assert.eqTrue(update.valid(), "update.valid()");
- if (update.empty()) {
- update.release();
- return;
- }
-
- maybeSignal();
-
- final boolean hasNoListeners = !hasListeners();
- if (hasNoListeners) {
- lastNotificationStep = currentStep;
- update.release();
- return;
- }
+ try {
+ Assert.eqFalse(isFailed, "isFailed");
+ final long currentStep = updateGraph.clock().currentStep();
+ // tables may only be updated once per cycle
+ Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "updateGraph.clock().currentStep()");
+
+ Assert.eqTrue(update.valid(), "update.valid()");
+ if (update.empty()) {
+ return;
+ }
- Assert.neqNull(update.added(), "added");
- Assert.neqNull(update.removed(), "removed");
- Assert.neqNull(update.modified(), "modified");
- Assert.neqNull(update.shifted(), "shifted");
+ final boolean hasNoListeners = !hasListeners();
+ if (hasNoListeners) {
+ lastNotificationStep = currentStep;
+ maybeSignal();
+ return;
+ }
- if (isFlat()) {
- Assert.assertion(getRowSet().isFlat(), "getRowSet().isFlat()", getRowSet(), "getRowSet()");
- }
- if (isAppendOnly() || isAddOnly()) {
- Assert.assertion(update.removed().isEmpty(), "update.removed.empty()");
- Assert.assertion(update.modified().isEmpty(), "update.modified.empty()");
- Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
- }
- if (isAppendOnly()) {
- Assert.assertion(getRowSet().sizePrev() == 0 || getRowSet().lastRowKeyPrev() < update.added().firstRowKey(),
- "getRowSet().lastRowKeyPrev() < update.added().firstRowKey()");
- }
- if (isBlink()) {
- Assert.eq(update.added().size(), "added size", getRowSet().size(), "current table size");
- Assert.eq(update.removed().size(), "removed size", getRowSet().sizePrev(), "previous table size");
- Assert.assertion(update.modified().isEmpty(), "update.modified.isEmpty()");
- Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
- }
+ Assert.neqNull(update.added(), "added");
+ Assert.neqNull(update.removed(), "removed");
+ Assert.neqNull(update.modified(), "modified");
+ Assert.neqNull(update.shifted(), "shifted");
- // First validate that each rowSet is in a sane state.
- if (VALIDATE_UPDATE_INDICES) {
- update.added().validate();
- update.removed().validate();
- update.modified().validate();
- update.shifted().validate();
- Assert.eq(update.modified().isEmpty(), "update.modified.empty()", update.modifiedColumnSet().empty(),
- "update.modifiedColumnSet.empty()");
- }
+ if (isFlat()) {
+ Assert.assertion(getRowSet().isFlat(), "getRowSet().isFlat()", getRowSet(), "getRowSet()");
+ }
+ if (isAppendOnly() || isAddOnly()) {
+ Assert.assertion(update.removed().isEmpty(), "update.removed.empty()");
+ Assert.assertion(update.modified().isEmpty(), "update.modified.empty()");
+ Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
+ }
+ if (isAppendOnly()) {
+ Assert.assertion(getRowSet().sizePrev() == 0
+ || getRowSet().lastRowKeyPrev() < update.added().firstRowKey(),
+ "getRowSet().sizePrev() == 0 || getRowSet().lastRowKeyPrev() < update.added().firstRowKey()");
+ }
+ if (isBlink()) {
+ Assert.eq(update.added().size(), "added size", getRowSet().size(), "current table size");
+ Assert.eq(update.removed().size(), "removed size", getRowSet().sizePrev(), "previous table size");
+ Assert.assertion(update.modified().isEmpty(), "update.modified.isEmpty()");
+ Assert.assertion(update.shifted().empty(), "update.shifted.empty()");
+ }
- if (VALIDATE_UPDATE_OVERLAPS) {
- validateUpdateOverlaps(update);
- }
+ // First validate that each rowSet is in a sane state.
+ if (VALIDATE_UPDATE_INDICES) {
+ update.added().validate();
+ update.removed().validate();
+ update.modified().validate();
+ update.shifted().validate();
+ Assert.eq(update.modified().isEmpty(), "update.modified.empty()", update.modifiedColumnSet().empty(),
+ "update.modifiedColumnSet.empty()");
+ }
- // notify children
- synchronized (this) {
- lastNotificationStep = currentStep;
+ if (VALIDATE_UPDATE_OVERLAPS) {
+ validateUpdateOverlaps(update);
+ }
- final NotificationQueue notificationQueue = getNotificationQueue();
- childListenerReferences.forEach(
- (listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
+ // notify children
+ synchronized (this) {
+ lastNotificationStep = currentStep;
+ maybeSignal();
+ final NotificationQueue notificationQueue = getNotificationQueue();
+ childListenerReferences.forEach(
+ (listenerRef, listener) -> notificationQueue.addNotification(listener.getNotification(update)));
+ }
+ } finally {
+ update.release();
}
-
- update.release();
}
private void validateUpdateOverlaps(final TableUpdate update) {
@@ -841,11 +876,10 @@ public final void notifyListenersOnError(final Throwable e, @Nullable final Tabl
"updateGraph.clock().currentStep()");
isFailed = true;
- maybeSignal();
synchronized (this) {
lastNotificationStep = currentStep;
-
+ maybeSignal();
final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
.addNotification(listener.getErrorNotification(e, sourceEntry)));
@@ -889,7 +923,7 @@ public Table markSystemic() {
* Simplest appropriate legacy ShiftObliviousInstrumentedListener implementation for BaseTable and descendants. It's
* expected that most use-cases will require overriding onUpdate() - the default implementation simply passes rowSet
* updates through to the dependent's listeners.
- *
+ *
* It is preferred to use {@link ListenerImpl} over {@link ShiftObliviousListenerImpl}
*/
public static class ShiftObliviousListenerImpl extends ShiftObliviousInstrumentedListener {
@@ -1201,7 +1235,7 @@ void maybeCopyColumnDescriptions(final BaseTable> destination, final SelectCol
}
final Map sourceDescriptions = new HashMap<>(oldDescriptions);
- if (selectColumns != null && selectColumns.length != 0) {
+ if (selectColumns != null) {
for (final SelectColumn sc : selectColumns) {
sourceDescriptions.remove(sc.getName());
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java
index c14d5eaf20f..3bb78223d7b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java
@@ -313,7 +313,22 @@ public final boolean canExecute(final long step) {
void doRun(final Runnable invokeOnUpdate) {
try {
- doRunInternal(invokeOnUpdate);
+ final long currentStep = getUpdateGraph().clock().currentStep();
+ try {
+ beforeRunNotification(currentStep);
+ // Retain a reference during update processing to prevent interference from concurrent destroys
+ if (!tryRetainReference()) {
+ // This listener is no longer live, there's no point to doing any work for this notification
+ return;
+ }
+ try {
+ doRunInternal(invokeOnUpdate);
+ } finally {
+ dropReference();
+ }
+ } finally {
+ afterRunNotification(currentStep);
+ }
} finally {
update.release();
}
@@ -328,9 +343,7 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
entry.onUpdateStart(update.added(), update.removed(), update.modified(), update.shifted());
}
- final long currentStep = getUpdateGraph().clock().currentStep();
try {
- beforeRunNotification(currentStep);
invokeOnUpdate.run();
} catch (Exception e) {
final LogEntry en = log.error().append("Uncaught exception for entry= ");
@@ -363,7 +376,6 @@ private void doRunInternal(final Runnable invokeOnUpdate) {
failed = true;
onFailure(e, entry);
} finally {
- afterRunNotification(currentStep);
if (entry != null) {
entry.onUpdateEnd();
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java
index b4a2462f219..cb0c4d24113 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java
@@ -332,38 +332,15 @@ public MergedNotification() {
@Override
public void run() {
- final long currentStep = getUpdateGraph().clock().currentStep();
try {
- if (lastEnqueuedStep != currentStep) {
- // noinspection ConstantConditions
- throw Assert.statementNeverExecuted("Notification step mismatch: listener="
- + System.identityHashCode(MergedListener.this) + ": queuedNotificationStep="
- + lastEnqueuedStep + ", step=" + currentStep);
- }
-
- if (upstreamError != null) {
- propagateError(false, upstreamError, errorSourceEntry);
- return;
- }
-
- long added = 0;
- long removed = 0;
- long modified = 0;
- long shifted = 0;
-
- for (ListenerRecorder recorder : recorders) {
- if (recorder.getNotificationStep() == currentStep) {
- added += recorder.getAdded().size();
- removed += recorder.getRemoved().size();
- modified += recorder.getModified().size();
- shifted += recorder.getShifted().getEffectiveSize();
- }
- }
-
- if (entry != null) {
- entry.onUpdateStart(added, removed, modified, shifted);
- }
+ final long currentStep = getUpdateGraph().clock().currentStep();
try {
+ if (lastEnqueuedStep != currentStep) {
+ // noinspection ConstantConditions
+ throw Assert.statementNeverExecuted("Notification step mismatch: listener="
+ + System.identityHashCode(MergedListener.this) + ": queuedNotificationStep="
+ + lastEnqueuedStep + ", step=" + currentStep);
+ }
synchronized (MergedListener.this) {
if (notificationStep == lastEnqueuedStep) {
// noinspection ConstantConditions
@@ -373,23 +350,60 @@ public void run() {
}
notificationStep = lastEnqueuedStep;
}
- process();
- getUpdateGraph().logDependencies()
- .append("MergedListener has completed execution ")
- .append(this).endl();
- } finally {
- if (entry != null) {
- entry.onUpdateEnd();
+ // Retain a reference during update processing to prevent interference from concurrent destroys
+ if (!tryRetainReference()) {
+ // This listener is no longer live, there's no point to doing any work for this notification
+ return;
+ }
+ try {
+ runInternal(currentStep);
+ } catch (Exception updateException) {
+ handleUncaughtException(updateException);
+ } finally {
+ dropReference();
}
+ } finally {
+ StepUpdater.forceUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, MergedListener.this, currentStep);
}
- } catch (Exception updateException) {
- handleUncaughtException(updateException);
} finally {
- StepUpdater.forceUpdateRecordedStep(LAST_COMPLETED_STEP_UPDATER, MergedListener.this, currentStep);
releaseFromRecorders();
}
}
+ private void runInternal(final long currentStep) {
+ if (upstreamError != null) {
+ propagateError(false, upstreamError, errorSourceEntry);
+ return;
+ }
+ long added = 0;
+ long removed = 0;
+ long modified = 0;
+ long shifted = 0;
+
+ for (ListenerRecorder recorder : recorders) {
+ if (recorder.getNotificationStep() == currentStep) {
+ added += recorder.getAdded().size();
+ removed += recorder.getRemoved().size();
+ modified += recorder.getModified().size();
+ shifted += recorder.getShifted().getEffectiveSize();
+ }
+ }
+
+ if (entry != null) {
+ entry.onUpdateStart(added, removed, modified, shifted);
+ }
+ try {
+ process();
+ getUpdateGraph().logDependencies()
+ .append("MergedListener has completed execution ")
+ .append(this).endl();
+ } finally {
+ if (entry != null) {
+ entry.onUpdateEnd();
+ }
+ }
+ }
+
@Override
public LogOutput append(LogOutput logOutput) {
return logOutput.append("Merged Notification ").append(System.identityHashCode(MergedListener.this))
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java
index 171defa9953..54356a377fc 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java
@@ -357,7 +357,7 @@ default void awaitUpdate() throws InterruptedException {
@SuppressWarnings("RedundantThrows")
@Override
- default boolean awaitUpdate(long timeout) throws InterruptedException {
+ default boolean awaitUpdate(long timeoutMillis) throws InterruptedException {
return throwUnsupported();
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java
index bbcd8ad0dd4..00bfcfb808f 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java
@@ -459,8 +459,8 @@ public void awaitUpdate() throws InterruptedException {
}
@Override
- public boolean awaitUpdate(long timeout) throws InterruptedException {
- return coalesce().awaitUpdate(timeout);
+ public boolean awaitUpdate(long timeoutMillis) throws InterruptedException {
+ return coalesce().awaitUpdate(timeoutMillis);
}
@Override
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java
index d5cf3646f2c..232576e28c3 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestTimeSeriesFilter.java
@@ -5,6 +5,8 @@
import io.deephaven.api.filter.Filter;
import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.engine.liveness.LivenessReferent;
+import io.deephaven.engine.liveness.ReferenceCountedLivenessReferent;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.Table;
@@ -24,10 +26,7 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import static io.deephaven.engine.testutil.TstUtils.*;
@@ -153,7 +152,7 @@ public void testIncremental() throws ParseException {
final TimeSeriesFilter exclusionFilter =
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true)
.build();
- final ArrayList> filtersToRefresh = new ArrayList<>();
+ final List> filtersToRefresh = Collections.synchronizedList(new ArrayList<>());
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter);
@@ -192,7 +191,7 @@ public void testIncremental2() throws ParseException {
final TimeSeriesFilter exclusionFilter =
TimeSeriesFilter.newBuilder().columnName("Date").period("PT01:00:00").clock(testClock).invert(true)
.build();
- final ArrayList> filtersToRefresh = new ArrayList<>();
+ final List> filtersToRefresh = Collections.synchronizedList(new ArrayList<>());
EvalNugget[] en = makeNuggets(table, inclusionFilter, filtersToRefresh, updateGraph, exclusionFilter);
@@ -214,18 +213,36 @@ public void testIncremental2() throws ParseException {
}
private static EvalNugget @NotNull [] makeNuggets(QueryTable table, TimeSeriesFilter inclusionFilter,
- ArrayList> filtersToRefresh, ControlledUpdateGraph updateGraph,
+ List> filtersToRefresh, ControlledUpdateGraph updateGraph,
TimeSeriesFilter exclusionFilter) {
final Table withInstant = table.update("Date=DateTimeUtils.epochNanosToInstant(Date.getTime() * 1000000L)");
return new EvalNugget[] {
EvalNugget.from(() -> {
final TimeSeriesFilter inclusionCopy = inclusionFilter.copy();
- filtersToRefresh.add(new WeakReference<>(inclusionCopy));
+ final WeakReference filterRef = new WeakReference<>(inclusionCopy);
+ final LivenessReferent sentinel = new ReferenceCountedLivenessReferent() {
+ @Override
+ public void destroy() {
+ super.destroy();
+ filtersToRefresh.remove(filterRef);
+ }
+ };
+ inclusionCopy.manage(sentinel);
+ filtersToRefresh.add(filterRef);
return updateGraph.exclusiveLock().computeLocked(() -> withInstant.where(inclusionCopy));
}),
EvalNugget.from(() -> {
final TimeSeriesFilter exclusionCopy = exclusionFilter.copy();
- filtersToRefresh.add(new WeakReference<>(exclusionCopy));
+ final WeakReference filterRef = new WeakReference<>(exclusionCopy);
+ final LivenessReferent sentinel = new ReferenceCountedLivenessReferent() {
+ @Override
+ public void destroy() {
+ super.destroy();
+ filtersToRefresh.remove(filterRef);
+ }
+ };
+ exclusionCopy.manage(sentinel);
+ filtersToRefresh.add(filterRef);
return updateGraph.exclusiveLock().computeLocked(() -> withInstant.where(exclusionCopy));
}),
};