Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance for ConstituentDependency, tighter double-notification testing, MergeListener paranoia #3142

Merged
merged 17 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.base.reference;

import org.jetbrains.annotations.NotNull;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* {@link SimpleReference} implementation that delegates to an internal {@link SimpleReference} which can be replaced
* using the {@link #swapDelegate(SimpleReference, SimpleReference)} method.
*/
public class SwappableDelegatingReference<T> implements SimpleReference<T> {

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SwappableDelegatingReference, SimpleReference> DELEGATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
SwappableDelegatingReference.class, SimpleReference.class, "delegate");

private volatile SimpleReference<T> delegate;

public SwappableDelegatingReference(@NotNull final SimpleReference<T> delegate) {
this.delegate = Objects.requireNonNull(delegate);
}

/**
* Swap the delegate assigned to this SwappableDelegatingReference.
*
* @param oldDelegate The delegate to swap out
* @param newDelegate The delegate to swap in
* @throws IllegalArgumentException if {@code oldDelegate} is not the current delegate value
*/
public void swapDelegate(
@NotNull final SimpleReference<T> oldDelegate,
@NotNull final SimpleReference<T> newDelegate) {
if (!DELEGATE_UPDATER.compareAndSet(this, oldDelegate, newDelegate)) {
throw new IllegalArgumentException(
"Previous delegate mismatch: found " + delegate + ", expected " + oldDelegate);
}
}

@Override
public T get() {
return delegate.get();
}

@Override
public void clear() {
delegate.clear();
delegate = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ private static Table makeDownsampledQueryTable(final QueryTable wholeQueryTable,

if (swapListener != null) {
swapListener.setListenerAndResult(downsampleListener, downsampleListener.resultTable);
downsampleListener.resultTable.addParentReference(swapListener);
downsampleListener.resultTable.addParentReference(downsampleListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,11 +677,15 @@ public void addUpdateListener(final TableUpdateListener listener) {
private SimpleReferenceManager<TableUpdateListener, ? extends SimpleReference<TableUpdateListener>> ensureChildListenerReferences() {
// noinspection unchecked
return ensureField(CHILD_LISTENER_REFERENCES_UPDATER, EMPTY_CHILD_LISTENER_REFERENCES,
() -> new SimpleReferenceManager<>((
final TableUpdateListener tableUpdateListener) -> tableUpdateListener instanceof LegacyListenerAdapter
? (LegacyListenerAdapter) tableUpdateListener
: new WeakSimpleReference<>(tableUpdateListener),
true));
() -> new SimpleReferenceManager<>((final TableUpdateListener tableUpdateListener) -> {
if (tableUpdateListener instanceof LegacyListenerAdapter) {
return (LegacyListenerAdapter) tableUpdateListener;
} else if (tableUpdateListener instanceof SwapListener) {
return ((SwapListener) tableUpdateListener).getReferenceForSource();
} else {
return new WeakSimpleReference<>(tableUpdateListener);
}
}, true));
}

@Override
Expand Down Expand Up @@ -739,6 +743,11 @@ public final void notifyListeners(RowSet added, RowSet removed, RowSet modified)
* callers should pass a {@code copy} for updates they intend to further use.
*/
public final void notifyListeners(final TableUpdate update) {
Assert.eqFalse(isFailed, "isFailed");
final long currentStep = LogicalClock.DEFAULT.currentStep();
// tables may only be updated once per cycle
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");

Assert.eqTrue(update.valid(), "update.valid()");
if (update.empty()) {
update.release();
Expand All @@ -749,8 +758,6 @@ public final void notifyListeners(final TableUpdate update) {

final boolean hasNoListeners = !hasListeners();
if (hasNoListeners) {
final long currentStep = LogicalClock.DEFAULT.currentStep();
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");
lastNotificationStep = currentStep;
update.release();
return;
Expand Down Expand Up @@ -784,10 +791,6 @@ public final void notifyListeners(final TableUpdate update) {
validateUpdateOverlaps(update);
}

// tables may only be updated once per cycle
final long currentStep = LogicalClock.DEFAULT.currentStep();
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");

lastNotificationStep = currentStep;

// notify children
Expand Down Expand Up @@ -892,11 +895,14 @@ private void validateUpdateOverlaps(final TableUpdate update) {
* @param e error
* @param sourceEntry performance tracking
*/
public final void notifyListenersOnError(final Throwable e,
@Nullable final TableListener.Entry sourceEntry) {
public final void notifyListenersOnError(final Throwable e, @Nullable final TableListener.Entry sourceEntry) {
Assert.eqFalse(isFailed, "isFailed");
final long currentStep = LogicalClock.DEFAULT.currentStep();
Assert.lt(lastNotificationStep, "lastNotificationStep", currentStep, "LogicalClock.DEFAULT.currentStep()");

isFailed = true;
maybeSignal();
lastNotificationStep = LogicalClock.DEFAULT.currentStep();
lastNotificationStep = currentStep;

final NotificationQueue notificationQueue = getNotificationQueue();
childListenerReferences.forEach((listenerRef, listener) -> notificationQueue
Expand Down Expand Up @@ -1390,7 +1396,6 @@ public Table copy() {
if (swapListener != null) {
final ListenerImpl listener = new ListenerImpl("copy()", this, resultTable);
swapListener.setListenerAndResult(listener, resultTable);
resultTable.addParentReference(swapListener);
}

result.setValue(resultTable);
Expand Down Expand Up @@ -1422,16 +1427,16 @@ public Table setColumnRenderers(String directive) {
return result;
}

public static <SL extends SwapListenerBase<?>> void initializeWithSnapshot(
String logPrefix, SL swapListener, ConstructSnapshot.SnapshotFunction snapshotFunction) {
public static void initializeWithSnapshot(
String logPrefix, SwapListener swapListener, ConstructSnapshot.SnapshotFunction snapshotFunction) {
if (swapListener == null) {
snapshotFunction.call(false, LogicalClock.DEFAULT.currentValue());
return;
}
ConstructSnapshot.callDataSnapshotFunction(logPrefix, swapListener.makeSnapshotControl(), snapshotFunction);
}

public interface SwapListenerFactory<T extends SwapListenerBase<?>> {
public interface SwapListenerFactory<T extends SwapListener> {
T newListener(BaseTable sourceTable);
}

Expand All @@ -1443,7 +1448,7 @@ public interface SwapListenerFactory<T extends SwapListenerBase<?>> {
* @return a swap listener for this table (or null)
*/
@Nullable
public <T extends SwapListenerBase<?>> T createSwapListenerIfRefreshing(final SwapListenerFactory<T> factory) {
public <T extends SwapListener> T createSwapListenerIfRefreshing(final SwapListenerFactory<T> factory) {
if (!isRefreshing()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
Expand Down Expand Up @@ -58,6 +59,7 @@ public static void install(
private final ColumnSource<? extends Dependency>[] dependencyColumns;

private volatile long lastSatisfiedStep;
private long firstUnsatisfiedRowPosition = 0;

private ConstituentDependency(
@NotNull final Dependency resultUpdatedDependency,
Expand Down Expand Up @@ -90,41 +92,55 @@ public boolean satisfied(final long step) {
lastSatisfiedStep = step;
return true;
}
final int chunkSize = Math.toIntExact(Math.min(ColumnIterator.DEFAULT_CHUNK_SIZE, resultRows.size()));
final int numColumns = dependencyColumns.length;
final ChunkSource.GetContext[] contexts = new ChunkSource.GetContext[numColumns];
try (final SharedContext sharedContext = numColumns > 1 ? SharedContext.makeSharedContext() : null;
final SafeCloseable ignored = new SafeCloseableArray<>(contexts);
final RowSequence.Iterator rows = resultRows.getRowSequenceIterator()) {
for (int ci = 0; ci < numColumns; ++ci) {
contexts[ci] = dependencyColumns[ci].makeGetContext(chunkSize, sharedContext);
synchronized (this) {
if (lastSatisfiedStep == step) {
return true;
}
while (rows.hasMore()) {
final RowSequence sliceRows = rows.getNextRowSequenceWithLength(chunkSize);
final int numConstituents = sliceRows.intSize();
final int chunkSize = Math.toIntExact(Math.min(ColumnIterator.DEFAULT_CHUNK_SIZE, resultRows.size()));
final int numColumns = dependencyColumns.length;
final ChunkSource.GetContext[] contexts = new ChunkSource.GetContext[numColumns];
try (final SharedContext sharedContext = numColumns > 1 ? SharedContext.makeSharedContext() : null;
final SafeCloseable ignored = new SafeCloseableArray<>(contexts);
final RowSequence.Iterator rows = resultRows.getRowSequenceIterator()) {
if (firstUnsatisfiedRowPosition > 0) {
rows.advance(resultRows.get(firstUnsatisfiedRowPosition));
}
for (int ci = 0; ci < numColumns; ++ci) {
final ObjectChunk<? extends Dependency, ? extends Values> dependencies =
dependencyColumns[ci].getChunk(contexts[ci], sliceRows).asObjectChunk();
for (int di = 0; di < numConstituents; ++di) {
final Dependency constituent = dependencies.get(di);
if (constituent != null && !constituent.satisfied(step)) {
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("Constituent dependencies not satisfied for ")
.append(this).append(", constituent=").append(constituent)
.endl();
return false;
contexts[ci] = dependencyColumns[ci].makeGetContext(chunkSize, sharedContext);
}
while (rows.hasMore()) {
final RowSequence sliceRows = rows.getNextRowSequenceWithLength(chunkSize);
final int numConstituents = sliceRows.intSize();
for (int ci = 0; ci < numColumns; ++ci) {
final ObjectChunk<? extends Dependency, ? extends Values> dependencies =
dependencyColumns[ci].getChunk(contexts[ci], sliceRows).asObjectChunk();
for (int di = 0; di < numConstituents; ++di) {
final Dependency constituent = dependencies.get(di);
if (constituent != null && !constituent.satisfied(step)) {
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("Constituent dependencies not satisfied for ")
.append(this).append(", constituent=").append(constituent)
.endl();
firstUnsatisfiedRowPosition += di;
return false;
}
}
}
}
if (sharedContext != null) {
sharedContext.reset();
firstUnsatisfiedRowPosition += numConstituents;
if (sharedContext != null) {
sharedContext.reset();
}
}
}
Assert.eq(firstUnsatisfiedRowPosition, "firstUnsatisfiedRowPosition", resultRows.size(),
"resultRows.size()");
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("All constituent dependencies satisfied for ").append(this)
.endl();
lastSatisfiedStep = step;
firstUnsatisfiedRowPosition = 0; // Re-initialize for next cycle

return true;
}
UpdateGraphProcessor.DEFAULT.logDependencies()
.append("All constituent dependencies satisfied for ").append(this)
.endl();
lastSatisfiedStep = step;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ private <T> T throwUnsupported(String opName) {
final ListenerImpl listener =
new ListenerImpl("hierarchicalTable()", rootTable, table);
swapListener.setListenerAndResult(listener, table);
table.addParentReference(swapListener);
}

resultHolder.setValue(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public InstrumentedTableUpdateListenerAdapter(@Nullable final String description
if (this.retain = retain) {
RETENTION_CACHE.retain(this);
if (Liveness.DEBUG_MODE_ENABLED) {
Liveness.log.info().append("LivenessDebug: ShiftObliviousInstrumentedListenerAdapter ")
Liveness.log.info().append("LivenessDebug: InstrumentedTableUpdateListenerAdapter ")
.append(Utils.REFERENT_FORMATTER, this)
.append(" created with retention enabled").endl();
}
Expand Down
Loading