From f1333d72f60164a1bca602a1896c745cc276e96a Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Fri, 15 Apr 2022 14:35:09 -0700 Subject: [PATCH 1/2] ring table for kafka --- extensions/kafka/build.gradle | 2 + .../java/io/deephaven/kafka/CdcTools.java | 2 +- .../java/io/deephaven/kafka/KafkaTools.java | 175 +++++++++++++----- py/server/deephaven/stream/kafka/cdc.py | 4 +- py/server/deephaven/stream/kafka/consumer.py | 80 ++++++-- 5 files changed, 194 insertions(+), 69 deletions(-) diff --git a/extensions/kafka/build.gradle b/extensions/kafka/build.gradle index 935cc5e7f69..4830852a3b2 100644 --- a/extensions/kafka/build.gradle +++ b/extensions/kafka/build.gradle @@ -11,6 +11,8 @@ dependencies { 'org.apache.avro:avro:1.9.2', 'org.apache.httpcomponents:httpclient:4.5.13' + Classpaths.inheritImmutables(project) + // Alternative to confluent; requires source code changes for // different avro serializer class names. // api('io.apicurio:apicurio-registry-utils-serde:1.3.2.Final') diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/CdcTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/CdcTools.java index 744a0647e87..94f9001178c 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/CdcTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/CdcTools.java @@ -303,7 +303,7 @@ public static Table consumeToTable( KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING, KafkaTools.Consume.avroSpec(keySchema), KafkaTools.Consume.avroSpec(valueSchema), - KafkaTools.TableType.Stream); + KafkaTools.TableType.stream()); final List dbTableColumnNames = dbTableColumnNames(streamingIn); List allDroppedColumns = null; if (dropColumns != null && dropColumns.size() > 0) { diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 239772f2541..7eed198680f 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -12,21 +12,25 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.deephaven.UncheckedDeephavenException; +import io.deephaven.annotations.SimpleStyle; import io.deephaven.base.Pair; import io.deephaven.base.verify.Assert; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.sources.ring.RingTableTools; import io.deephaven.engine.updategraph.UpdateGraphProcessor; import io.deephaven.engine.updategraph.UpdateSourceCombiner; import io.deephaven.engine.updategraph.UpdateSourceRegistrar; +import io.deephaven.kafka.KafkaTools.TableType.Append; +import io.deephaven.kafka.KafkaTools.TableType.Ring; +import io.deephaven.kafka.KafkaTools.TableType.Stream; +import io.deephaven.kafka.KafkaTools.TableType.Visitor; import io.deephaven.time.DateTime; import io.deephaven.engine.liveness.LivenessScope; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.impl.LocalTableMap; import io.deephaven.engine.table.impl.StreamTableTools; -import io.deephaven.engine.table.TableMap; -import io.deephaven.engine.table.TransformableTableMap; import io.deephaven.engine.util.BigDecimalUtils; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; @@ -53,6 +57,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.*; +import org.immutables.value.Value.Immutable; +import org.immutables.value.Value.Parameter; import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -1149,9 +1155,45 @@ public static KeyOrValueSpec avroSpec( } /** - * Type enumeration for the result {@link Table} returned by stream consumers. + * Type for the result {@link Table} returned by kafka consumers. */ - public enum TableType { + public interface TableType { + static Stream stream() { + return ImmutableStream.of(false); + } + + static Stream streamMap() { + return ImmutableStream.of(true); + } + + static Append append() { + return ImmutableAppend.of(false); + } + + static Append appendMap() { + return ImmutableAppend.of(true); + } + + static Ring ring(int capacity) { + return ImmutableRing.of(capacity, false); + } + + static Ring ringMap(int capacity) { + return ImmutableRing.of(capacity, true); + } + + > T walk(V visitor); + + boolean isMap(); + + interface Visitor { + T visit(Stream stream); + + T visit(Append append); + + T visit(Ring ring); + } + /** *

* Consume all partitions into a single interleaved stream table, which will present only newly-available rows @@ -1160,46 +1202,62 @@ public enum TableType { * See {@link Table#STREAM_TABLE_ATTRIBUTE} for a detailed explanation of stream table semantics, and * {@link io.deephaven.engine.table.impl.StreamTableTools} for related tooling. */ - Stream(false, false), + @Immutable + @SimpleStyle + abstract class Stream implements TableType { + @Parameter + public abstract boolean isMap(); + + @Override + public final > T walk(V visitor) { + return visitor.visit(this); + } + } + /** * Consume all partitions into a single interleaved in-memory append-only table. + * + * @see StreamTableTools#streamToAppendOnlyTable(Table) */ - Append(true, false), - /** - *

- * As in {@link #Stream}, but each partition is mapped to a distinct stream table. - *

- * The resulting per-partition tables are aggregated into a single {@link TableMap} keyed by {@link Integer} - * partition, which is then presented as a {@link Table} proxy via - * {@link TransformableTableMap#asTable(boolean, boolean, boolean) asTable} with {@code strictKeys=true}, - * {@code allowCoalesce=true}, and {@code sanityCheckJoins=true}. - *

- * See {@link TransformableTableMap#asTableMap()} to explicitly work with the underlying {@link TableMap} and - * {@link TransformableTableMap#asTable(boolean, boolean, boolean)} for alternative proxy options. - */ - StreamMap(false, true), + @Immutable + @SimpleStyle + abstract class Append implements TableType { + @Parameter + public abstract boolean isMap(); + + @Override + public final > T walk(V visitor) { + return visitor.visit(this); + } + } + /** - *

- * As in {@link #Append}, but each partition is mapped to a distinct in-memory append-only table. - *

- * The resulting per-partition tables are aggregated into a single {@link TableMap} keyed by {@link Integer} - * partition, which is then presented as a {@link Table} proxy via - * {@link TransformableTableMap#asTable(boolean, boolean, boolean) asTable} with {@code strictKeys=true}, - * {@code allowCoalesce=true}, and {@code sanityCheckJoins=true}. - *

- * See {@link TransformableTableMap#asTableMap()} to explicitly work with the underlying {@link TableMap} and - * {@link TransformableTableMap#asTable(boolean, boolean, boolean)} for alternative proxy options. + * Consume all partitions into a single interleaved in-memory ring table. + * + * @see RingTableTools#of(Table, int) */ - AppendMap(true, true); + @Immutable + @SimpleStyle + abstract class Ring implements TableType { + public static Ring of(int capacity) { + return ImmutableRing.of(capacity, false); + } - private final boolean isAppend; - private final boolean isMap; + public static Ring map(int capacity) { + return ImmutableRing.of(capacity, true); + } - TableType(final boolean isAppend, final boolean isMap) { - this.isAppend = isAppend; - this.isMap = isMap; - } + @Parameter + public abstract int capacity(); + @Parameter + public abstract boolean isMap(); + + @Override + public final > T walk(V visitor) { + return visitor.visit(this); + } + } } /** @@ -1212,11 +1270,11 @@ public enum TableType { public static TableType friendlyNameToTableType(@NotNull final String typeName) { // @formatter:off switch (typeName) { - case "stream" : return TableType.Stream; - case "append" : return TableType.Append; - case "stream_map": return TableType.StreamMap; - case "append_map": return TableType.AppendMap; - default : return null; + case "stream" : return TableType.stream(); + case "append" : return TableType.append(); + case "stream_map": return TableType.streamMap(); + case "append_map": return TableType.appendMap(); + default : return null; } // @formatter:on } @@ -1279,7 +1337,7 @@ public static Table consumeToTable( final TableDefinition tableDefinition = new TableDefinition(columnDefinitions); - final StreamTableMap streamTableMap = resultType.isMap ? new StreamTableMap(tableDefinition) : null; + final StreamTableMap streamTableMap = resultType.isMap() ? new StreamTableMap(tableDefinition) : null; final UpdateSourceRegistrar updateSourceRegistrar = streamTableMap == null ? UpdateGraphProcessor.DEFAULT : streamTableMap.refreshCombiner; @@ -1312,18 +1370,17 @@ public static Table consumeToTable( }; final MutableObject kafkaIngesterHolder = new MutableObject<>(); - final UnaryOperator tableConversion = - resultType.isAppend ? StreamTableTools::streamToAppendOnlyTable : UnaryOperator.identity(); final Table result; final IntFunction partitionToConsumer; - if (resultType.isMap) { + if (resultType.isMap()) { result = streamTableMap.asTable(true, true, true); partitionToConsumer = (final int partition) -> { final Pair partitionAdapterPair = adapterFactory.get(); partitionAdapterPair.getFirst().setShutdownCallback( () -> kafkaIngesterHolder.getValue().shutdownPartition(partition)); - final Table partitionTable = tableConversion.apply(partitionAdapterPair.getFirst().table()); + final Table streamTable = partitionAdapterPair.getFirst().table(); + final Table partitionTable = resultType.walk(new StreamTableOperation(streamTable)); streamTableMap.enqueueUpdate(() -> Assert.eqNull(streamTableMap.put(partition, partitionTable), "streamTableMap.put(partition, partitionTable)")); return new SimpleKafkaStreamConsumer(partitionAdapterPair.getSecond(), partitionAdapterPair.getFirst()); @@ -1331,7 +1388,8 @@ public static Table consumeToTable( } else { final Pair singleAdapterPair = adapterFactory.get(); - result = tableConversion.apply(singleAdapterPair.getFirst().table()); + final Table streamTable = singleAdapterPair.getFirst().table(); + result = resultType.walk(new StreamTableOperation(streamTable)); partitionToConsumer = (final int partition) -> { singleAdapterPair.getFirst().setShutdownCallback(() -> kafkaIngesterHolder.getValue().shutdown()); return new SimpleKafkaStreamConsumer(singleAdapterPair.getSecond(), singleAdapterPair.getFirst()); @@ -1369,6 +1427,29 @@ private static KeyOrValueSerializer getJsonSerializer( jsonSpec.timestampFieldName, jsonSpec.nestedObjectDelimiter, jsonSpec.outputNulls); } + private static class StreamTableOperation implements Visitor
{ + private final Table streamTable; + + public StreamTableOperation(Table streamTable) { + this.streamTable = Objects.requireNonNull(streamTable); + } + + @Override + public Table visit(Stream stream) { + return streamTable; + } + + @Override + public Table visit(Append append) { + return StreamTableTools.streamToAppendOnlyTable(streamTable); + } + + @Override + public Table visit(Ring ring) { + return RingTableTools.of(streamTable, ring.capacity()); + } + } + private static KeyOrValueSerializer getSerializer( @NotNull final Table t, @NotNull final Produce.KeyOrValueSpec spec, diff --git a/py/server/deephaven/stream/kafka/cdc.py b/py/server/deephaven/stream/kafka/cdc.py index b62fa371c8d..b06be5f0db2 100644 --- a/py/server/deephaven/stream/kafka/cdc.py +++ b/py/server/deephaven/stream/kafka/cdc.py @@ -73,7 +73,7 @@ def consume_raw( kafka_config: dict, cdc_spec: CDCSpec, partitions=None, - table_type: TableType = TableType.Stream, + table_type: TableType = TableType.stream(), ) -> Table: """ Consume the raw events from a Change Data Capture (CDC) Kafka stream to a Deephaven table. @@ -84,7 +84,7 @@ def consume_raw( and/or value Avro necessary schemas are stored. cdc_spec (CDCSpec): a CDCSpec obtained from calling either the cdc_long_spec or the cdc_short_spec function partitions (List[int]: a list of integer partition numbers, default is None indicating all partitions - table_type (TableType): a TableType enum, default is TableType.Stream + table_type (TableType): a TableType enum, default is TableType.stream() Returns: a Deephaven live table for the raw CDC events diff --git a/py/server/deephaven/stream/kafka/consumer.py b/py/server/deephaven/stream/kafka/consumer.py index 48e2cf5f897..577ad5f2420 100644 --- a/py/server/deephaven/stream/kafka/consumer.py +++ b/py/server/deephaven/stream/kafka/consumer.py @@ -18,24 +18,8 @@ _JKafkaTools = jpy.get_type("io.deephaven.kafka.KafkaTools") _JKafkaTools_Consume = jpy.get_type("io.deephaven.kafka.KafkaTools$Consume") _JPythonTools = jpy.get_type("io.deephaven.integrations.python.PythonTools") -_JTableType = jpy.get_type("io.deephaven.kafka.KafkaTools$TableType") ALL_PARTITIONS = _JKafkaTools.ALL_PARTITIONS - -class TableType(Enum): - """An Enum that defines the supported Table Type for consuming Kafka.""" - - Stream = _JTableType.Stream - """ Consume all partitions into a single interleaved stream table, which will present only newly-available rows - to downstream operations and visualizations.""" - Append = _JTableType.Append - """ Consume all partitions into a single interleaved in-memory append-only table.""" - StreamMap = _JTableType.StreamMap - """ Similar to Stream, but each partition is mapped to a distinct stream table.""" - AppendMap = _JTableType.AppendMap - """ Similar to Append, but each partition is mapped to a distinct in-memory append-only table. """ - - SEEK_TO_BEGINNING = _JKafkaTools.SEEK_TO_BEGINNING """ Start consuming at the beginning of a partition. """ DONT_SEEK = _JKafkaTools.DONT_SEEK @@ -74,6 +58,64 @@ def j_object(self) -> jpy.JType: in the properties as "key.column.name" or "value.column.name" in the config, and otherwise default to "key" or "value". """ +class TableType(JObjectWrapper): + """An Enum that defines the supported Table Type for consuming Kafka.""" + + j_object_type = jpy.get_type("io.deephaven.kafka.KafkaTools$TableType") + + @staticmethod + def stream(): + """ Consume all partitions into a single interleaved stream table, which will present only newly-available rows + to downstream operations and visualizations.""" + return TableType(TableType.j_object_type.stream()) + + @staticmethod + def stream_map(): + """ Similar to stream(), but each partition is mapped to a distinct stream table.""" + return TableType(TableType.j_object_type.streamMap()) + + @staticmethod + def append(): + """ Consume all partitions into a single interleaved in-memory append-only table.""" + return TableType(TableType.j_object_type.append()) + + @staticmethod + def append_map(): + """ Similar to append(), but each partition is mapped to a distinct in-memory append-only table. """ + return TableType(TableType.j_object_type.appendMap()) + + @staticmethod + def ring(capacity : int): + """ Consume all partitions into a single in-memory ring table.""" + return TableType(TableType.j_object_type.ring(capacity)) + + @staticmethod + def ring_map(capacity : int): + """ Similar to ring(capacity), but each partition is mapped to a distinct in-memory ring table. """ + return TableType(TableType.j_object_type.ringMap(capacity)) + + def __init__(self, j_spec: jpy.JType): + self._j_spec = j_spec + + @property + def j_object(self) -> jpy.JType: + return self._j_spec + + +TableType.Stream = TableType.stream() +""" Deprecated, prefer TableType.stream(). Consume all partitions into a single interleaved stream table, which will +present only newly-available rows to downstream operations and visualizations.""" + +TableType.Append = TableType.append() +""" Deprecated, prefer TableType.append(). Consume all partitions into a single interleaved in-memory append-only table.""" + +TableType.StreamMap = TableType.stream_map() +""" Deprecated, prefer TableType.stream_map(). Similar to Stream, but each partition is mapped to a distinct stream table.""" + +TableType.AppendMap = TableType.append_map() +""" Deprecated, prefer TableType.append_map(). Similar to Append, but each partition is mapped to a distinct in-memory +append-only table. """ + def j_partitions(partitions): if partitions is None: @@ -106,7 +148,7 @@ def consume( offsets: Dict[int, int] = None, key_spec: KeyValueSpec = None, value_spec: KeyValueSpec = None, - table_type: TableType = TableType.Stream, + table_type: TableType = TableType.stream(), ) -> Table: """Consume from Kafka to a Deephaven table. @@ -132,7 +174,7 @@ def consume( works the same as KeyValueSpec.FROM_PROPERTIES, in which case, the kafka_config param should include values for dictionary keys 'deephaven.key.column.name' and 'deephaven.key.column.type', for the single resulting column name and type - table_type (TableType): a TableType enum, default is TableType.Stream + table_type (TableType): a TableType enum, default is TableType.stream() Returns: a Deephaven live table that will update based on Kafka messages consumed for the given topic @@ -172,7 +214,7 @@ def consume( offsets, key_spec.j_object, value_spec.j_object, - table_type.value, + table_type.j_object, ) ) except Exception as e: From ab4acad8cab98b6cd40427a5fe1a94de0f5c3f30 Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Wed, 20 Apr 2022 13:29:05 -0700 Subject: [PATCH 2/2] review response --- py/server/deephaven/stream/kafka/consumer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/py/server/deephaven/stream/kafka/consumer.py b/py/server/deephaven/stream/kafka/consumer.py index 577ad5f2420..4f663b7753a 100644 --- a/py/server/deephaven/stream/kafka/consumer.py +++ b/py/server/deephaven/stream/kafka/consumer.py @@ -94,12 +94,12 @@ def ring_map(capacity : int): """ Similar to ring(capacity), but each partition is mapped to a distinct in-memory ring table. """ return TableType(TableType.j_object_type.ringMap(capacity)) - def __init__(self, j_spec: jpy.JType): - self._j_spec = j_spec + def __init__(self, j_table_type: jpy.JType): + self._j_table_type = j_table_type @property def j_object(self) -> jpy.JType: - return self._j_spec + return self._j_table_type TableType.Stream = TableType.stream()