ring table for kafka #2290

Merged: 3 commits, Apr 20, 2022
2 changes: 2 additions & 0 deletions extensions/kafka/build.gradle
@@ -11,6 +11,8 @@ dependencies {
'org.apache.avro:avro:1.9.2',
'org.apache.httpcomponents:httpclient:4.5.13'

Classpaths.inheritImmutables(project)

// Alternative to confluent; requires source code changes for
// different avro serializer class names.
// api('io.apicurio:apicurio-registry-utils-serde:1.3.2.Final')
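The one functional change here, `Classpaths.inheritImmutables(project)`, wires the Immutables annotation processor into the kafka module; it is what generates the `ImmutableStream`, `ImmutableAppend`, and `ImmutableRing` factories used by the `TableType` rework below. A minimal sketch of the pattern (`ExampleRing` is an illustrative name, not part of this diff; the real types additionally use Deephaven's `@SimpleStyle`):

```java
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Parameter;

// Hand-written abstract value type, mirroring the pattern in this PR.
@Immutable
abstract class ExampleRing {
    @Parameter
    public abstract int capacity();

    @Parameter
    public abstract boolean isMap();
}

// The processor generates an ImmutableExampleRing companion with a positional
// factory, which is what the new TableType factory methods delegate to:
//   ExampleRing r = ImmutableExampleRing.of(1024, false);
```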
@@ -303,7 +303,7 @@ public static Table consumeToTable(
KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING,
KafkaTools.Consume.avroSpec(keySchema),
KafkaTools.Consume.avroSpec(valueSchema),
KafkaTools.TableType.Stream);
KafkaTools.TableType.stream());
final List<String> dbTableColumnNames = dbTableColumnNames(streamingIn);
List<String> allDroppedColumns = null;
if (dropColumns != null && dropColumns.size() > 0) {
175 changes: 128 additions & 47 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -12,21 +12,25 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.annotations.SimpleStyle;
import io.deephaven.base.Pair;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.sources.ring.RingTableTools;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.updategraph.UpdateSourceCombiner;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.kafka.KafkaTools.TableType.Append;
import io.deephaven.kafka.KafkaTools.TableType.Ring;
import io.deephaven.kafka.KafkaTools.TableType.Stream;
import io.deephaven.kafka.KafkaTools.TableType.Visitor;
import io.deephaven.time.DateTime;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.table.impl.LocalTableMap;
import io.deephaven.engine.table.impl.StreamTableTools;
import io.deephaven.engine.table.TableMap;
import io.deephaven.engine.table.TransformableTableMap;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
@@ -53,6 +57,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.*;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Parameter;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
@@ -1149,9 +1155,45 @@ public static KeyOrValueSpec avroSpec(
}

/**
* Type enumeration for the result {@link Table} returned by stream consumers.
* Type for the result {@link Table} returned by Kafka consumers.
*/
public enum TableType {
public interface TableType {
static Stream stream() {
return ImmutableStream.of(false);
}

static Stream streamMap() {
return ImmutableStream.of(true);
}

static Append append() {
return ImmutableAppend.of(false);
}

static Append appendMap() {
return ImmutableAppend.of(true);
}

static Ring ring(int capacity) {
return ImmutableRing.of(capacity, false);
}

static Ring ringMap(int capacity) {
return ImmutableRing.of(capacity, true);
}

<T, V extends Visitor<T>> T walk(V visitor);

boolean isMap();

interface Visitor<T> {
T visit(Stream stream);

T visit(Append append);

T visit(Ring ring);
}

/**
* <p>
* Consume all partitions into a single interleaved stream table, which will present only newly-available rows
@@ -1160,46 +1202,62 @@ public enum TableType {
* See {@link Table#STREAM_TABLE_ATTRIBUTE} for a detailed explanation of stream table semantics, and
* {@link io.deephaven.engine.table.impl.StreamTableTools} for related tooling.
*/
Stream(false, false),
@Immutable
@SimpleStyle
abstract class Stream implements TableType {
@Parameter
public abstract boolean isMap();

@Override
public final <T, V extends Visitor<T>> T walk(V visitor) {
return visitor.visit(this);
}
}

/**
* Consume all partitions into a single interleaved in-memory append-only table.
*
* @see StreamTableTools#streamToAppendOnlyTable(Table)
*/
Append(true, false),
/**
* <p>
* As in {@link #Stream}, but each partition is mapped to a distinct stream table.
* <p>
* The resulting per-partition tables are aggregated into a single {@link TableMap} keyed by {@link Integer}
* partition, which is then presented as a {@link Table} proxy via
* {@link TransformableTableMap#asTable(boolean, boolean, boolean) asTable} with {@code strictKeys=true},
* {@code allowCoalesce=true}, and {@code sanityCheckJoins=true}.
* <p>
* See {@link TransformableTableMap#asTableMap()} to explicitly work with the underlying {@link TableMap} and
* {@link TransformableTableMap#asTable(boolean, boolean, boolean)} for alternative proxy options.
*/
StreamMap(false, true),
@Immutable
@SimpleStyle
abstract class Append implements TableType {
@Parameter
public abstract boolean isMap();

@Override
public final <T, V extends Visitor<T>> T walk(V visitor) {
return visitor.visit(this);
}
}

/**
* <p>
* As in {@link #Append}, but each partition is mapped to a distinct in-memory append-only table.
* <p>
* The resulting per-partition tables are aggregated into a single {@link TableMap} keyed by {@link Integer}
* partition, which is then presented as a {@link Table} proxy via
* {@link TransformableTableMap#asTable(boolean, boolean, boolean) asTable} with {@code strictKeys=true},
* {@code allowCoalesce=true}, and {@code sanityCheckJoins=true}.
* <p>
* See {@link TransformableTableMap#asTableMap()} to explicitly work with the underlying {@link TableMap} and
* {@link TransformableTableMap#asTable(boolean, boolean, boolean)} for alternative proxy options.
* Consume all partitions into a single interleaved in-memory ring table.
*
* @see RingTableTools#of(Table, int)
*/
AppendMap(true, true);
@Immutable
@SimpleStyle
abstract class Ring implements TableType {
public static Ring of(int capacity) {
return ImmutableRing.of(capacity, false);
}

private final boolean isAppend;
private final boolean isMap;
public static Ring map(int capacity) {
return ImmutableRing.of(capacity, true);
}

TableType(final boolean isAppend, final boolean isMap) {
this.isAppend = isAppend;
this.isMap = isMap;
}
@Parameter
public abstract int capacity();

@Parameter
public abstract boolean isMap();

@Override
public final <T, V extends Visitor<T>> T walk(V visitor) {
return visitor.visit(this);
}
}
}
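For readers skimming the diff: the old enum constants are replaced by static factories plus a visitor, so call sites construct a `TableType` and the engine dispatches on its concrete class instead of switching on an enum. A usage sketch against the API above (class and method names here are illustrative):

```java
import io.deephaven.kafka.KafkaTools.TableType;

class TableTypeExamples {
    // Dispatch on the concrete TableType without instanceof chains.
    static String describe(TableType type) {
        return type.walk(new TableType.Visitor<String>() {
            @Override
            public String visit(TableType.Stream stream) {
                return stream.isMap() ? "stream_map" : "stream";
            }

            @Override
            public String visit(TableType.Append append) {
                return append.isMap() ? "append_map" : "append";
            }

            @Override
            public String visit(TableType.Ring ring) {
                return "ring(" + ring.capacity() + ")";
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(describe(TableType.stream()));   // stream
        System.out.println(describe(TableType.ring(1024))); // ring(1024)
    }
}
```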

/**
@@ -1212,11 +1270,11 @@ public enum TableType {
public static TableType friendlyNameToTableType(@NotNull final String typeName) {
// @formatter:off
switch (typeName) {
case "stream" : return TableType.Stream;
case "append" : return TableType.Append;
case "stream_map": return TableType.StreamMap;
case "append_map": return TableType.AppendMap;
default : return null;
case "stream" : return TableType.stream();
case "append" : return TableType.append();
case "stream_map": return TableType.streamMap();
case "append_map": return TableType.appendMap();
default : return null;
}
// @formatter:on
}
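One consequence worth flagging: `ring` and `ring_map` get no friendly-name entry in the switch above, presumably because they cannot be constructed without a capacity. A caller wanting a ring table goes through the factory directly:

```java
// No string shorthand for ring tables; a capacity is required:
TableType ringType = TableType.ring(4096);   // or TableType.ringMap(4096) for per-partition rings
```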
@@ -1279,7 +1337,7 @@ public static Table consumeToTable(

final TableDefinition tableDefinition = new TableDefinition(columnDefinitions);

final StreamTableMap streamTableMap = resultType.isMap ? new StreamTableMap(tableDefinition) : null;
final StreamTableMap streamTableMap = resultType.isMap() ? new StreamTableMap(tableDefinition) : null;
final UpdateSourceRegistrar updateSourceRegistrar =
streamTableMap == null ? UpdateGraphProcessor.DEFAULT : streamTableMap.refreshCombiner;

@@ -1312,26 +1370,26 @@ public static Table consumeToTable(
};

final MutableObject<KafkaIngester> kafkaIngesterHolder = new MutableObject<>();
final UnaryOperator<Table> tableConversion =
resultType.isAppend ? StreamTableTools::streamToAppendOnlyTable : UnaryOperator.identity();
final Table result;
final IntFunction<KafkaStreamConsumer> partitionToConsumer;
if (resultType.isMap) {
if (resultType.isMap()) {
result = streamTableMap.asTable(true, true, true);
partitionToConsumer = (final int partition) -> {
final Pair<StreamToTableAdapter, ConsumerRecordToStreamPublisherAdapter> partitionAdapterPair =
adapterFactory.get();
partitionAdapterPair.getFirst().setShutdownCallback(
() -> kafkaIngesterHolder.getValue().shutdownPartition(partition));
final Table partitionTable = tableConversion.apply(partitionAdapterPair.getFirst().table());
final Table streamTable = partitionAdapterPair.getFirst().table();
final Table partitionTable = resultType.walk(new StreamTableOperation(streamTable));
streamTableMap.enqueueUpdate(() -> Assert.eqNull(streamTableMap.put(partition, partitionTable),
"streamTableMap.put(partition, partitionTable)"));
return new SimpleKafkaStreamConsumer(partitionAdapterPair.getSecond(), partitionAdapterPair.getFirst());
};
} else {
final Pair<StreamToTableAdapter, ConsumerRecordToStreamPublisherAdapter> singleAdapterPair =
adapterFactory.get();
result = tableConversion.apply(singleAdapterPair.getFirst().table());
final Table streamTable = singleAdapterPair.getFirst().table();
result = resultType.walk(new StreamTableOperation(streamTable));
partitionToConsumer = (final int partition) -> {
singleAdapterPair.getFirst().setShutdownCallback(() -> kafkaIngesterHolder.getValue().shutdown());
return new SimpleKafkaStreamConsumer(singleAdapterPair.getSecond(), singleAdapterPair.getFirst());
Expand Down Expand Up @@ -1369,6 +1427,29 @@ private static KeyOrValueSerializer<?> getJsonSerializer(
jsonSpec.timestampFieldName, jsonSpec.nestedObjectDelimiter, jsonSpec.outputNulls);
}

private static class StreamTableOperation implements Visitor<Table> {
private final Table streamTable;

public StreamTableOperation(Table streamTable) {
this.streamTable = Objects.requireNonNull(streamTable);
}

@Override
public Table visit(Stream stream) {
return streamTable;
}

@Override
public Table visit(Append append) {
return StreamTableTools.streamToAppendOnlyTable(streamTable);
}

@Override
public Table visit(Ring ring) {
return RingTableTools.of(streamTable, ring.capacity());
}
}
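This private visitor is the single place where the requested `TableType` becomes a concrete engine call. A sketch of the three conversions it performs, assuming `stream` is the raw stream table obtained from `StreamToTableAdapter#table()` as in the diff above:

```java
// Equivalents of the three visit(...) branches, spelled out:
Table streamFlavor = stream;                                           // TableType.stream(): pass-through
Table appendFlavor = StreamTableTools.streamToAppendOnlyTable(stream); // TableType.append(): grows without bound
Table ringFlavor = RingTableTools.of(stream, 1024);                    // TableType.ring(1024): keeps the newest 1024 rows
```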

private static KeyOrValueSerializer<?> getSerializer(
@NotNull final Table t,
@NotNull final Produce.KeyOrValueSpec spec,
4 changes: 2 additions & 2 deletions py/server/deephaven/stream/kafka/cdc.py
@@ -73,7 +73,7 @@ def consume_raw(
kafka_config: dict,
cdc_spec: CDCSpec,
partitions=None,
table_type: TableType = TableType.Stream,
table_type: TableType = TableType.stream(),
) -> Table:
""" Consume the raw events from a Change Data Capture (CDC) Kafka stream to a Deephaven table.

@@ -84,7 +84,7 @@
and/or value Avro necessary schemas are stored.
cdc_spec (CDCSpec): a CDCSpec obtained from calling either the cdc_long_spec or the cdc_short_spec function
partitions (List[int]): a list of integer partition numbers; default is None, indicating all partitions
table_type (TableType): a TableType enum, default is TableType.Stream
table_type (TableType): a TableType, default is TableType.stream()

Returns:
a Deephaven live table for the raw CDC events