Address dependency and liveness issues in KafkaTools.StreamPartitionedTable (deephaven#4968)

* Address dependency satisfaction issue for KafkaTools.StreamPartitionedTable when used with append-only or ring constituents

* Refactor KafkaTools.StreamPartitionedTable to be StreamPartitionedQueryTable: This allows us to address some referential integrity issues w.r.t. the UpdateSourceCombiner, and also makes the relationships more standard.

* Address liveness issue for the UpdateSourceCombiner used by KafkaTools.StreamPartitionedQueryTable: StreamToBlinkTableAdapter result tables must manage the registrar in case the StreamPartitionedQueryTable is leaked or destroyed before the constituents are.
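For context, a sketch of the user-facing call path this commit hardens: consuming a topic into a partitioned table whose constituents are ring tables. Only the consumeToPartitionedTable signature itself appears in the diff below; the spec factories, partition constants (Consume.simpleSpec, ALL_PARTITIONS, ALL_PARTITIONS_DONT_SEEK), and the helper class are assumptions about the public KafkaTools API, not part of this change.

import java.util.Properties;

import io.deephaven.engine.table.PartitionedTable;
import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.KafkaTools.Consume;
import io.deephaven.kafka.KafkaTools.TableType;

final class ConsumeToPartitionedTableSketch {
    static Table ringPerPartition(final Properties kafkaProps, final String topic) {
        // One constituent per Kafka partition, each a ring table over that partition's blink feed.
        final PartitionedTable byPartition = KafkaTools.consumeToPartitionedTable(
                kafkaProps,
                topic,
                KafkaTools.ALL_PARTITIONS,           // assumed constant: subscribe to every partition
                KafkaTools.ALL_PARTITIONS_DONT_SEEK, // assumed constant: start from current offsets
                Consume.simpleSpec("MessageKey", String.class),   // assumed spec factory
                Consume.simpleSpec("MessageValue", String.class), // assumed spec factory
                TableType.ring(10_000));             // ring constituents: one of the cases this fix targets
        // Merging the constituents yields a single table spanning all subscribed partitions.
        return byPartition.merge();
    }
}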
rcaudy committed Dec 20, 2023
1 parent 6d3702b commit 6d203ab
Showing 4 changed files with 89 additions and 44 deletions.
@@ -171,7 +171,7 @@ protected void instrumentedRefresh() {
}

if (refreshCombiner != null) {
result.getUpdateGraph().addSource(refreshCombiner);
refreshCombiner.install();
}
}

@@ -7,6 +7,7 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.ModifiedColumnSet;
@@ -167,6 +168,10 @@ public StreamToBlinkTableAdapter(
setAttribute(e.getKey(), e.getValue());
}
addParentReference(StreamToBlinkTableAdapter.this);
// Ensure that the UpdateSourceRegistrar remains alive while the blink table does.
if (updateSourceRegistrar instanceof LivenessReferent) {
manage((LivenessReferent) updateSourceRegistrar);
}
}

@Override
@@ -23,6 +23,14 @@ public UpdateSourceCombiner(final UpdateGraph updateGraph) {
this.updateGraph = updateGraph;
}

/**
* Add this UpdateSourceCombiner to the {@link UpdateGraph update graph} passed at construction. This should only be
* done once.
*/
public void install() {
updateGraph.addSource(this);
}

@Override
public void run() {
combinedTables.forEachValidReference(Runnable::run);
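Taken together with the StreamToBlinkTableAdapter change above, the new install() method supports a create-wire-install lifecycle along these lines. This is a minimal sketch: the helper class, method name, and use of the ambient ExecutionContext update graph are illustrative assumptions, while the constructor signatures and install() call match the diff.

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.stream.StreamPublisher;
import io.deephaven.stream.StreamToBlinkTableAdapter;

final class CombinerLifecycleSketch {
    static Table blinkTableFor(final TableDefinition definition, final StreamPublisher publisher) {
        // Create the combiner against the ambient update graph; nothing refreshes until install().
        final UpdateSourceCombiner combiner =
                new UpdateSourceCombiner(ExecutionContext.getContext().getUpdateGraph());
        // The adapter registers itself with the combiner. Because the combiner is a
        // LivenessReferent, the adapter's result table now manages it (per the change above),
        // so the combiner stays live for as long as the blink table does.
        final StreamToBlinkTableAdapter adapter =
                new StreamToBlinkTableAdapter(definition, publisher, combiner, "lifecycle-sketch");
        // Only after all sources are wired is the combiner added to the update graph.
        combiner.install();
        return adapter.table();
    }
}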
118 changes: 75 additions & 43 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -16,7 +16,6 @@
import io.deephaven.annotations.SimpleStyle;
import io.deephaven.annotations.SingletonStyle;
import io.deephaven.chunk.ChunkType;
import io.deephaven.processor.ObjectProcessor;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessManager;
import io.deephaven.engine.liveness.LivenessScope;
Expand All @@ -32,15 +31,16 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.BlinkTableTools;
import io.deephaven.engine.table.impl.ConstituentDependency;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.table.impl.partitioned.PartitionedTableImpl;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.kafka.AvroImpl.AvroConsume;
@@ -70,6 +70,7 @@
import io.deephaven.kafka.publish.KafkaPublisherException;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.kafka.publish.PublishToKafka;
import io.deephaven.processor.ObjectProcessor;
import io.deephaven.protobuf.ProtobufDescriptorParserOptions;
import io.deephaven.qst.column.header.ColumnHeader;
import io.deephaven.stream.StreamChunkUtils;
@@ -1000,7 +1001,7 @@ public static PartitionedTable consumeToPartitionedTable(
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec,
@NotNull final TableType tableType) {
final AtomicReference<StreamPartitionedTable> resultHolder = new AtomicReference<>();
final AtomicReference<StreamPartitionedQueryTable> resultHolder = new AtomicReference<>();
final ExecutionContext enclosingExecutionContext = ExecutionContext.getContext();
final LivenessManager enclosingLivenessManager = LivenessScopeStack.peek();

@@ -1009,12 +1010,12 @@ public static PartitionedTable consumeToPartitionedTable(
final StreamPublisher streamPublisher) -> {
try (final SafeCloseable ignored1 = enclosingExecutionContext.open();
final SafeCloseable ignored2 = LivenessScopeStack.open()) {
StreamPartitionedTable result = resultHolder.get();
StreamPartitionedQueryTable result = resultHolder.get();
if (result == null) {
synchronized (resultHolder) {
result = resultHolder.get();
if (result == null) {
result = new StreamPartitionedTable(tableDefinition);
result = new StreamPartitionedQueryTable(tableDefinition);
enclosingLivenessManager.manage(result);
resultHolder.set(result);
}
@@ -1025,7 +1026,7 @@ public static PartitionedTable consumeToPartitionedTable(
final StreamToBlinkTableAdapter streamToBlinkTableAdapter = new StreamToBlinkTableAdapter(
tableDefinition,
streamPublisher,
result.refreshCombiner,
result.getRegistrar(),
"Kafka-" + topic + '-' + topicPartition.partition());
final Table blinkTable = streamToBlinkTableAdapter.table();
final Table derivedTable = tableType.walk(new BlinkTableOperation(blinkTable));
@@ -1036,7 +1037,7 @@ public static PartitionedTable consumeToPartitionedTable(
consume(kafkaProperties, topic, partitionFilter,
InitialOffsetLookup.adapt(partitionToInitialOffset), keySpec, valueSpec,
StreamConsumerRegistrarProvider.perPartition(registrar), null);
return resultHolder.get();
return resultHolder.get().toPartitionedTable();
}

@FunctionalInterface
@@ -1438,7 +1439,9 @@ public static Runnable produceFromTable(KafkaPublishOptions options) {
final KeyOrValueSerializer<?> keySerializer = keySpec.getKeyOrValueSerializer(effectiveTable, keyColumns);
final KeyOrValueSerializer<?> valueSerializer =
valueSpec.getKeyOrValueSerializer(effectiveTable, valueColumns);
final PublishToKafka producer = new PublishToKafka(
// PublishToKafka is a LivenessArtifact; it will be kept reachable and alive by the publisherScope, since
// it is constructed with enforceStrongReachability=true.
new PublishToKafka(
options.config(),
effectiveTable,
options.topic(),
@@ -1458,58 +1461,85 @@ public static Runnable produceFromTable(KafkaPublishOptions options) {
}

/**
* @implNote The constructor publishes {@code this} to the {@link UpdateGraph} and cannot be subclassed.
* @implNote The constructor publishes {@code this} (indirectly) to the {@link UpdateGraph} and cannot be
* subclassed.
*/
private static final class StreamPartitionedTable extends PartitionedTableImpl implements Runnable {
private static final class StreamPartitionedQueryTable extends QueryTable implements Runnable {

private static final String PARTITION_COLUMN_NAME = "Partition";
private static final String CONSTITUENT_COLUMN_NAME = "Table";

@ReferentialIntegrity // We also access the combiner externally, but it must be referenced regardless
private final UpdateSourceCombiner refreshCombiner;
private final TableDefinition constituentDefinition;

private final WritableColumnSource<Integer> partitionColumn;
private final WritableColumnSource<Table> constituentColumn;

@ReferentialIntegrity
private final UpdateSourceCombiner refreshCombiner;

private volatile long lastAddedPartitionRowKey = -1L; // NULL_ROW_KEY

private StreamPartitionedTable(@NotNull final TableDefinition constituentDefinition) {
super(makeResultTable(), Set.of(PARTITION_COLUMN_NAME), true, CONSTITUENT_COLUMN_NAME,
constituentDefinition, true, false);
partitionColumn = (WritableColumnSource<Integer>) table().getColumnSource(PARTITION_COLUMN_NAME, int.class);
constituentColumn =
(WritableColumnSource<Table>) table().getColumnSource(CONSTITUENT_COLUMN_NAME, Table.class);
UpdateGraph updateGraph = table().getUpdateGraph();
refreshCombiner = new UpdateSourceCombiner(updateGraph);
manage(refreshCombiner);
refreshCombiner.addSource(this);
updateGraph.addSource(refreshCombiner);
private StreamPartitionedQueryTable(@NotNull final TableDefinition constituentDefinition) {
super(RowSetFactory.empty().toTracking(), makeSources());

setFlat();
setRefreshing(true);

this.constituentDefinition = constituentDefinition;

partitionColumn = (WritableColumnSource<Integer>) getColumnSource(PARTITION_COLUMN_NAME, int.class);
constituentColumn = (WritableColumnSource<Table>) getColumnSource(CONSTITUENT_COLUMN_NAME, Table.class);

/*
* Note: We do not need to use a ConstituentDependency here, because using the combiner effectively prevents
* delivery of the partitioned table's notification until its constituents have also had their chance to
* complete their update for the cycle. We could remove the combiner and use the "raw" UpdateGraph plus a
* We use an UpdateSourceCombiner to drive both the StreamPartitionedQueryTable and its constituents, with
* the StreamPartitionedQueryTable added first. This is to ensure that newly-added constituents cannot process
* their initial update until after they have been added to the StreamPartitionedQueryTable. The
* StreamPartitionedQueryTable creates the combiner, and constituent-building code is responsible for using
* it, accessed via getRegistrar(). We could remove the combiner and use the "raw" UpdateGraph plus a
* ConstituentDependency if we demanded a guarantee that new constituents would only be constructed in such
* a way that they were unable to fire before being added to the partitioned table. Without that guarantee,
* we risk data loss for blink or ring tables.
* we would risk data loss for blink or ring tables.
*/
refreshCombiner = new UpdateSourceCombiner(getUpdateGraph());
manage(refreshCombiner);
refreshCombiner.addSource(this);

/*
* We use a ConstituentDependency to ensure that our notification is not delivered before all of our
* constituents have become satisfied on this cycle. For "raw" (blink) constituents, the
* UpdateSourceCombiner trivially guarantees this. However, since constituents may be transformed to ring or
* append-only tables before they are added to the StreamPartitionedQueryTable, we must test constituent
* satisfaction before allowing the table to become satisfied on a given cycle.
*/
ConstituentDependency.install(this, refreshCombiner);

// Begin update processing
refreshCombiner.install();
}

/**
* @return The {@link UpdateSourceRegistrar} to be used for all constituent roots.
*/
public UpdateSourceRegistrar getRegistrar() {
return refreshCombiner;
}

@Override
public void run() {
final WritableRowSet rowSet = table().getRowSet().writableCast();
final WritableRowSet rowSet = getRowSet().writableCast();

final long newLastRowKey = lastAddedPartitionRowKey;
final long oldLastRowKey = rowSet.lastRowKey();

if (newLastRowKey != oldLastRowKey) {
final RowSet added = RowSetFactory.fromRange(oldLastRowKey + 1, newLastRowKey);
rowSet.insert(added);
((BaseTable<?>) table()).notifyListeners(new TableUpdateImpl(added,
notifyListeners(new TableUpdateImpl(added,
RowSetFactory.empty(), RowSetFactory.empty(), RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY));
}
}

public synchronized void enqueueAdd(final int partition, @NotNull final Table partitionTable) {
private synchronized void enqueueAdd(final int partition, @NotNull final Table partitionTable) {
manage(partitionTable);

final long partitionRowKey = lastAddedPartitionRowKey + 1;
@@ -1523,19 +1553,21 @@ public synchronized void enqueueAdd(final int partition, @NotNull final Table pa
lastAddedPartitionRowKey = partitionRowKey;
}

private static Table makeResultTable() {
final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(2);
resultSources.put(PARTITION_COLUMN_NAME,
ArrayBackedColumnSource.getMemoryColumnSource(int.class, null));
resultSources.put(CONSTITUENT_COLUMN_NAME,
ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null));
// noinspection resource
return new QueryTable(RowSetFactory.empty().toTracking(), resultSources) {
{
setFlat();
setRefreshing(true);
}
};
private PartitionedTable toPartitionedTable() {
return new PartitionedTableImpl(this,
Set.of(PARTITION_COLUMN_NAME),
true, // keys are unique
CONSTITUENT_COLUMN_NAME,
constituentDefinition,
true, // constituent changes are permitted
false); // validation not needed
}

private static Map<String, ColumnSource<?>> makeSources() {
final Map<String, ColumnSource<?>> sources = new LinkedHashMap<>(2);
sources.put(PARTITION_COLUMN_NAME, ArrayBackedColumnSource.getMemoryColumnSource(int.class, null));
sources.put(CONSTITUENT_COLUMN_NAME, ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null));
return sources;
}
}

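The explanatory comments in the StreamPartitionedQueryTable constructor refer to constituents that are transformed to append-only or ring tables before being added. For reference, those transformations correspond to the BlinkTableTools and RingTableTools imports visible above; the snippet below is a sketch of what the per-table-type transformation amounts to, with the exact overloads assumed rather than taken from this diff.

import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.BlinkTableTools;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;

final class ConstituentFlavorsSketch {
    static Table asAppendOnly(final Table blink) {
        return BlinkTableTools.blinkToAppendOnly(blink); // accumulate every row ever seen
    }

    static Table asRing(final Table blink, final int capacity) {
        return RingTableTools.of(blink, capacity); // retain only the most recent `capacity` rows
    }
}

Because these derived tables update via listeners on their blink parents rather than directly as sources of the combiner, the partitioned table relies on the ConstituentDependency installed in the constructor to avoid becoming satisfied on a cycle before they do.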
