From e75169916f653cddbfa042e35bd760a482c3c76f Mon Sep 17 00:00:00 2001
From: Pawel Leszczynski
Date: Tue, 12 Nov 2024 08:22:19 +0100
Subject: [PATCH] code review changes

Signed-off-by: Pawel Leszczynski
---
 .../lineage/DefaultKafkaDatasetFacet.java     |   2 +
 .../DefaultKafkaDatasetIdentifier.java        |   9 +-
 .../lineage/DefaultTypeDatasetFacet.java      |   2 +
 .../kafka/lineage/KafkaDatasetFacet.java      |   2 +
 .../lineage/KafkaDatasetFacetProvider.java    |   9 +-
 .../kafka/lineage/KafkaDatasetIdentifier.java |   8 +-
 .../KafkaDatasetIdentifierProvider.java       |   5 +-
 .../connector/kafka/lineage/LineageUtil.java  |   6 +-
 .../kafka/lineage/TypeDatasetFacet.java       |   2 +
 .../lineage/TypeDatasetFacetProvider.java     |   5 +-
 .../sink/KafkaRecordSerializationSchema.java  |   5 +-
 .../flink/connector/kafka/sink/KafkaSink.java |   2 +-
 .../subscriber/KafkaSubscriber.java           |   4 +
 .../subscriber/PartitionSetSubscriber.java    |   2 +-
 ...aRecordSerializationSchemaBuilderTest.java |  53 +++--
 .../kafka/source/KafkaSourceTest.java         | 206 ++++++++++--------
 16 files changed, 184 insertions(+), 138 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
index cb1a4671c..e1c682345 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
@@ -1,11 +1,13 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
 
 import java.util.Objects;
 import java.util.Properties;
 
 /** Default implementation of {@link KafkaDatasetFacet}. */
+@PublicEvolving
 public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {
 
     public static final String KAFKA_FACET_NAME = "kafka";
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
index bd05cfd52..cd97b7ff4 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
@@ -1,25 +1,28 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.regex.Pattern;
 
 /** Default implementation of {@link KafkaDatasetIdentifier}. */
+@PublicEvolving
 public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {
 
     @Nullable private final List<String> topics;
     @Nullable private final Pattern topicPattern;
 
-    public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
+    private DefaultKafkaDatasetIdentifier(
+            @Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
         this.topics = fixedTopics;
         this.topicPattern = topicPattern;
     }
 
     public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
-        return new DefaultKafkaDatasetIdentifier(Collections.emptyList(), pattern);
+        return new DefaultKafkaDatasetIdentifier(null, pattern);
     }
 
     public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
index 69183e3a1..d9475d77a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultTypeDatasetFacet.java
@@ -1,10 +1,12 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.util.Objects;
 
 /** Default implementation of {@link KafkaDatasetFacet}. */
+@PublicEvolving
 public class DefaultTypeDatasetFacet implements TypeDatasetFacet {
 
     public static final String TYPE_FACET_NAME = "type";
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
index 22d14dd2c..c0d3d0b73 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
@@ -1,10 +1,12 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
 
 import java.util.Properties;
 
 /** Facet definition to contain all Kafka specific information on Kafka sources and sinks. */
+@PublicEvolving
 public interface KafkaDatasetFacet extends LineageDatasetFacet {
     Properties getProperties();
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
index 0eed6f715..76fe41b82 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
@@ -1,15 +1,16 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.util.Optional;
 
 /** Contains method to extract {@link KafkaDatasetFacet}. */
+@PublicEvolving
 public interface KafkaDatasetFacetProvider {
 
     /**
-     * Returns a Kafka dataset facet or `Optional.empty` in case an implementing class is not able
-     * to identify a dataset.
-     *
-     * @return
+     * Returns a Kafka dataset facet or empty in case an implementing class is not able to identify
+     * a dataset.
      */
     Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
index 0c43f8be9..19f7082e2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
@@ -1,11 +1,15 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 /** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
+@PublicEvolving
 public interface KafkaDatasetIdentifier {
     @Nullable
     List<String> getTopics();
@@ -16,13 +20,11 @@ public interface KafkaDatasetIdentifier {
     /**
      * Assigns lineage dataset's name which is topic pattern if it is present or comma separated
      * list of topics.
-     *
-     * @return
      */
     default String toLineageName() {
         if (getTopicPattern() != null) {
             return getTopicPattern().toString();
         }
-        return String.join(",", getTopics());
+        return String.join(",", Objects.requireNonNull(getTopics()));
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
index 36f8c4f2e..1389fea58 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
@@ -1,15 +1,16 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.util.Optional;
 
 /** Contains method which allows extracting topic identifier. */
+@PublicEvolving
 public interface KafkaDatasetIdentifierProvider {
 
     /**
      * Gets Kafka dataset identifier or empty in case a class implementing is not able to extract
      * dataset identifier.
-     *
-     * @return
      */
     Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
index 779c167c6..748e6be05 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
@@ -66,7 +66,7 @@ public String namespace() {
 
             @Override
             public Map<String, LineageDatasetFacet> facets() {
-                Map<String, LineageDatasetFacet> facetMap = new HashMap();
+                Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
                 facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
                 facetMap.putAll(
                         facets.stream()
@@ -102,7 +102,7 @@ public Boundedness boundedness() {
 
             @Override
             public List<LineageDataset> datasets() {
-                return datasets.stream().collect(Collectors.toList());
+                return List.copyOf(datasets);
             }
         };
     }
@@ -111,7 +111,7 @@ public static LineageVertex lineageVertexOf(Collection<LineageDataset> datasets)
         return new LineageVertex() {
             @Override
             public List<LineageDataset> datasets() {
-                return datasets.stream().collect(Collectors.toList());
+                return List.copyOf(datasets);
             }
         };
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
index 4b4261c65..1e64f5819 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacet.java
@@ -1,9 +1,11 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
 
 /** Facet definition to contain type information of source and sink. */
+@PublicEvolving
 public interface TypeDatasetFacet extends LineageDatasetFacet {
     TypeInformation getTypeInformation();
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java
index b2f0ea831..016a1bb84 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/TypeDatasetFacetProvider.java
@@ -1,15 +1,16 @@
 package org.apache.flink.connector.kafka.lineage;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 import java.util.Optional;
 
 /** Contains method to extract {@link TypeDatasetFacet}. */
+@PublicEvolving
 public interface TypeDatasetFacetProvider {
 
     /**
      * Returns a type dataset facet or `Optional.empty` in case an implementing class is not able to
      * resolve type.
-     *
-     * @return
      */
     Optional<TypeDatasetFacet> getTypeDatasetFacet();
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
index 9d081c755..f56a7da54 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java
@@ -29,7 +29,10 @@
 
 /**
  * A serialization schema which defines how to convert a value of type {@code T} to {@link
- * ProducerRecord}.
+ * ProducerRecord}. {@link org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider} can
+ * be implemented to provide Kafka-specific lineage metadata, while {@link
+ * org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider} can be implemented to provide
+ * lineage metadata with type information.
  *
  * @param <T> the type of values being serialized
  */
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 0f89e5bb2..d3d3c89df 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -154,7 +154,7 @@ public LineageVertex getLineageVertex() {
                 ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
 
         if (!kafkaDatasetFacet.isPresent()) {
-            LOG.info("Provided did not return kafka dataset facet");
+            LOG.info("Provider did not return kafka dataset facet");
             return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
         }
         kafkaDatasetFacet.get().setProperties(this.kafkaProducerConfig);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
index 1b819fb23..37de884af 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
@@ -39,6 +39,10 @@
  *
  * <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
  * three types of subscribing mode.
+ *
+ * <p>A subscriber may additionally implement {@link
+ * org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider} to provide lineage
+ * metadata with source topics.
  */
 @PublicEvolving
 public interface KafkaSubscriber extends Serializable {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
index 3ea6f9a5a..9cd50fb20 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
@@ -36,7 +36,7 @@
 import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
 
 /** A subscriber for a partition set. */
-class PartitionSetSubscriber implements KafkaDatasetIdentifierProvider, KafkaSubscriber {
+class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
     private static final long serialVersionUID = 390970375272146036L;
     private static final Logger LOG = LoggerFactory.getLogger(PartitionSetSubscriber.class);
     private final Set<TopicPartition> subscribedPartitions;
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
index 50b1abfbe..4d1437288 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -39,6 +40,7 @@
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.assertj.core.api.InstanceOfAssertFactories;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -63,10 +65,11 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
     private static Map<String, ?> configurableConfiguration;
     private static Map<String, ?> configuration;
 
-    private interface TestingTopicSelector extends TopicSelector, KafkaDatasetIdentifierProvider {}
+    private interface TestingTopicSelector<T>
+            extends TopicSelector<T>, KafkaDatasetIdentifierProvider {}
 
-    private interface SerializationSchemaWithResultQueryable
-            extends SerializationSchema, ResultTypeQueryable {}
+    private interface SerializationSchemaWithResultQueryable<T>
+            extends SerializationSchema<T>, ResultTypeQueryable<T> {}
 
     private static boolean isKeySerializer;
 
@@ -281,7 +284,9 @@ public void testGetLineageDatasetFacetsWhenTopicSelectorNotKafkaTopicsIdentifier
                         .setKeySerializationSchema(serializationSchema)
                         .build();
 
-        assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty();
+        assertThat(schema)
+                .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
+                .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
     }
 
     @Test
@@ -290,7 +295,7 @@ public void testGetLineageDatasetFacetsWhenNoTopicsIdentifiersFound() {
         KafkaRecordSerializationSchema schema =
                 KafkaRecordSerializationSchema.builder()
                         .setTopicSelector(
-                                new TestingTopicSelector() {
+                                new TestingTopicSelector<Object>() {
                                     @Override
                                     public Optional<DefaultKafkaDatasetIdentifier>
                                             getDatasetIdentifier() {
@@ -305,22 +310,24 @@ public String apply(Object o) {
                 .setValueSerializationSchema(serializationSchema)
                 .setKeySerializationSchema(serializationSchema)
                 .build();
-        assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty();
+        assertThat(schema)
+                .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
+                .returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
     }
 
     @Test
     public void testGetLineageDatasetFacetsValueSerializationSchemaIsResultTypeQueryable() {
         TypeInformation<String> stringTypeInformation = TypeInformation.of(String.class);
-        SerializationSchemaWithResultQueryable serializationSchema =
-                new SerializationSchemaWithResultQueryable() {
+        SerializationSchemaWithResultQueryable<String> serializationSchema =
+                new SerializationSchemaWithResultQueryable<String>() {
 
                     @Override
-                    public TypeInformation getProducedType() {
+                    public TypeInformation<String> getProducedType() {
                         return stringTypeInformation;
                     }
 
                     @Override
-                    public byte[] serialize(Object o) {
+                    public byte[] serialize(String o) {
                         return new byte[0];
                     }
                 };
@@ -328,7 +335,7 @@ public byte[] serialize(Object o) {
         KafkaRecordSerializationSchema schema =
                 KafkaRecordSerializationSchema.builder()
                         .setTopicSelector(
-                                new TestingTopicSelector() {
+                                new TestingTopicSelector<Object>() {
                                     @Override
                                     public Optional<DefaultKafkaDatasetIdentifier>
                                             getDatasetIdentifier() {
@@ -338,7 +345,7 @@ public byte[] serialize(Object o) {
                                     }
 
                                     @Override
-                                    public Object apply(Object o) {
+                                    public String apply(Object o) {
                                         return DEFAULT_TOPIC;
                                     }
                                 })
@@ -352,11 +359,10 @@ public Object apply(Object o) {
         assertThat(kafkaDatasetFacet).isPresent();
         assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
                 .containsExactly("topic1", "topic2");
-        assertThat(
-                        ((TypeDatasetFacetProvider) schema)
-                                .getTypeDatasetFacet()
-                                .get()
-                                .getTypeInformation())
+        assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+                .isPresent()
+                .get()
+                .extracting(TypeDatasetFacet::getTypeInformation)
                 .isEqualTo(stringTypeInformation);
     }
 
@@ -363,9 +369,9 @@
     @Test
     public void testGetLineageDatasetFacets() {
         KafkaRecordSerializationSchema schema =
                 KafkaRecordSerializationSchema.builder()
                         .setTopicSelector(
-                                new TestingTopicSelector() {
+                                new TestingTopicSelector<Object>() {
                                     @Override
                                     public Optional<DefaultKafkaDatasetIdentifier>
                                             getDatasetIdentifier() {
@@ -375,7 +381,7 @@ public void testGetLineageDatasetFacets() {
                                     }
 
                                     @Override
-                                    public Object apply(Object o) {
+                                    public String apply(Object o) {
                                         return DEFAULT_TOPIC;
                                     }
                                 })
@@ -389,11 +395,10 @@ public Object apply(Object o) {
         assertThat(kafkaDatasetFacet).isPresent();
         assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
                 .containsExactly("topic1", "topic2");
-        assertThat(
-                        ((TypeDatasetFacetProvider) schema)
-                                .getTypeDatasetFacet()
-                                .get()
-                                .getTypeInformation())
+        assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
+                .isPresent()
+                .get()
+                .extracting(TypeDatasetFacet::getTypeInformation)
                 .isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
index f755edc9e..259668c5d 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTest.java
@@ -18,13 +18,14 @@
 package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
 import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.lineage.LineageDataset;
 import org.apache.flink.streaming.api.lineage.LineageVertex;
 import org.apache.flink.util.Collector;
 
@@ -46,9 +47,6 @@ public class KafkaSourceTest {
 
     Properties kafkaProperties;
 
-    private interface TestingKafkaSubscriber
-            extends KafkaSubscriber, KafkaDatasetIdentifierProvider {}
-
     @BeforeEach
     void setup() {
         kafkaProperties = new Properties();
@@ -57,109 +55,129 @@ void setup() {
 
     @Test
     public void testGetLineageVertexWhenSubscriberNotAnKafkaDatasetFacetProvider() {
-        KafkaSource source =
-                new KafkaSource(
-                        new KafkaSubscriber() {
-                            @Override
-                            public Set getSubscribedTopicPartitions(
-                                    AdminClient adminClient) {
-                                return null;
-                            }
-                        },
-                        null,
-                        null,
-                        Boundedness.CONTINUOUS_UNBOUNDED,
-                        null,
-                        kafkaProperties,
-                        null);
-        assertThat(source.getLineageVertex().datasets()).isEmpty();
+        KafkaSource<String> source =
+                new KafkaSourceBuilder<String>()
+                        .setKafkaSubscriber(
+                                new KafkaSubscriber() {
+                                    @Override
+                                    public Set<TopicPartition> getSubscribedTopicPartitions(
+                                            AdminClient adminClient) {
+                                        return null;
+                                    }
+                                })
+                        .setProperties(kafkaProperties)
+                        .setGroupId("")
+                        .setDeserializer(
+                                new KafkaRecordDeserializationSchema<String>() {
+                                    @Override
+                                    public TypeInformation<String> getProducedType() {
+                                        return null;
+                                    }
+
+                                    @Override
+                                    public void deserialize(
+                                            ConsumerRecord<byte[], byte[]> record,
+                                            Collector<String> out)
+                                            throws IOException {}
+                                })
+                        .setUnbounded(OffsetsInitializer.committedOffsets())
+                        .build();
+
+        assertThat(source.getLineageVertex())
+                .extracting(LineageVertex::datasets)
+                .asList()
+                .isEmpty();
     }
 
     @Test
     public void testGetLineageVertexWhenNoKafkaTopicsIdentifier() {
-        KafkaSource source =
-                new KafkaSource(
-                        new TestingKafkaSubscriber() {
-                            @Override
-                            public Optional getDatasetIdentifier() {
-                                return Optional.empty();
-                            }
-
-                            @Override
-                            public Set getSubscribedTopicPartitions(
-                                    AdminClient adminClient) {
-                                return null;
-                            }
-                        },
-                        null,
-                        null,
-                        Boundedness.CONTINUOUS_UNBOUNDED,
-                        null,
-                        kafkaProperties,
-                        null);
-        assertThat(source.getLineageVertex().datasets()).isEmpty();
-        assertThat(source.getLineageVertex().datasets()).isEmpty();
+        KafkaSource<String> source =
+                new KafkaSourceBuilder<String>()
+                        .setKafkaSubscriber(
+                                new TestingKafkaSubscriber() {
+                                    @Override
+                                    public Optional<DefaultKafkaDatasetIdentifier>
+                                            getDatasetIdentifier() {
+                                        return Optional.empty();
+                                    }
+                                })
+                        .setProperties(kafkaProperties)
+                        .setGroupId("")
+                        .setDeserializer(
+                                new KafkaRecordDeserializationSchema<String>() {
+                                    @Override
+                                    public void deserialize(
+                                            ConsumerRecord<byte[], byte[]> record,
+                                            Collector<String> out)
+                                            throws IOException {}
+
+                                    @Override
+                                    public TypeInformation<String> getProducedType() {
+                                        return TypeInformation.of(String.class);
+                                    }
+                                })
+                        .setUnbounded(OffsetsInitializer.committedOffsets())
+                        .build();
+        assertThat(source.getLineageVertex())
+                .extracting(LineageVertex::datasets)
+                .asList()
+                .isEmpty();
     }
 
     @Test
     public void testGetLineageVertex() {
         TypeInformation<String> typeInformation = TypeInformation.of(String.class);
-        KafkaSource source =
-                new KafkaSource(
-                        new TestingKafkaSubscriber() {
-                            @Override
-                            public Optional getDatasetIdentifier() {
-                                return Optional.of(
-                                        DefaultKafkaDatasetIdentifier.ofTopics(
-                                                Collections.singletonList("topic1")));
-                            }
-
-                            @Override
-                            public Set getSubscribedTopicPartitions(
-                                    AdminClient adminClient) {
-                                return null;
-                            }
-                        },
-                        null,
-                        null,
-                        Boundedness.CONTINUOUS_UNBOUNDED,
-                        new KafkaRecordDeserializationSchema() {
-                            @Override
-                            public void deserialize(ConsumerRecord record, Collector out)
-                                    throws IOException {}
-
-                            @Override
-                            public TypeInformation getProducedType() {
-                                return typeInformation;
-                            }
-                        },
-                        kafkaProperties,
-                        null);
+        KafkaSource<String> source =
+                new KafkaSourceBuilder<String>()
+                        .setKafkaSubscriber(new TestingKafkaSubscriber())
+                        .setProperties(kafkaProperties)
+                        .setGroupId("")
+                        .setDeserializer(
+                                new KafkaRecordDeserializationSchema<String>() {
+                                    @Override
+                                    public void deserialize(
+                                            ConsumerRecord<byte[], byte[]> record,
+                                            Collector<String> out)
+                                            throws IOException {}
+
+                                    @Override
+                                    public TypeInformation<String> getProducedType() {
+                                        return typeInformation;
+                                    }
+                                })
+                        .setUnbounded(OffsetsInitializer.committedOffsets())
+                        .build();
 
         LineageVertex lineageVertex = source.getLineageVertex();
         assertThat(lineageVertex.datasets()).hasSize(1);
+        LineageDataset dataset = lineageVertex.datasets().get(0);
+
+        assertThat(dataset.namespace()).isEqualTo("kafka://host1");
+        assertThat(dataset.name()).isEqualTo("topic1");
+
+        assertThat(dataset.facets()).containsKey(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
+        DefaultKafkaDatasetFacet kafkaFacet =
+                (DefaultKafkaDatasetFacet)
+                        dataset.facets().get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME);
 
-        assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("kafka://host1");
-        assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("topic1");
-
-        assertThat(
-                        lineageVertex
-                                .datasets()
-                                .get(0)
-                                .facets()
-                                .get(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME))
-                .hasFieldOrPropertyWithValue("properties", kafkaProperties)
-                .hasFieldOrPropertyWithValue(
-                        "topicIdentifier",
-                        DefaultKafkaDatasetIdentifier.ofTopics(
-                                Collections.singletonList("topic1")));
-
-        assertThat(
-                        lineageVertex
-                                .datasets()
-                                .get(0)
-                                .facets()
-                                .get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))
+        assertThat(kafkaFacet.getProperties()).containsEntry("bootstrap.servers", "host1;host2");
+
+        assertThat(dataset.facets()).containsKey(DefaultTypeDatasetFacet.TYPE_FACET_NAME);
+        assertThat(dataset.facets().get(DefaultTypeDatasetFacet.TYPE_FACET_NAME))
                 .hasFieldOrPropertyWithValue("typeInformation", TypeInformation.of(String.class));
     }
+
+    private static class TestingKafkaSubscriber
+            implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
+        @Override
+        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
+            return Optional.of(
+                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("topic1")));
+        }
+
+        @Override
+        public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
+            return null;
+        }
    }
 }
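
Usage note, not part of the patch: the lineage support touched above is opt-in. A
KafkaRecordSerializationSchema only contributes lineage metadata if it also implements
KafkaDatasetFacetProvider and/or TypeDatasetFacetProvider, which KafkaSink#getLineageVertex
queries at runtime; on the source side, a KafkaSubscriber contributes topics by implementing
KafkaDatasetIdentifierProvider. Below is a minimal sketch of a schema that opts in. The class
name, topic, and serializer body are hypothetical, and it assumes the identifier-only
DefaultKafkaDatasetFacet constructor; per the KafkaSink hunk above, the sink fills in the
producer properties itself via setProperties.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
    import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
    import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
    import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Optional;

    /** Hypothetical value-only schema writing String events to a fixed topic. */
    class MyEventSchema
            implements KafkaRecordSerializationSchema<String>,
                    KafkaDatasetFacetProvider,
                    TypeDatasetFacetProvider {

        private static final String TOPIC = "my-events"; // hypothetical topic name

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                String element, KafkaSinkContext context, Long timestamp) {
            return new ProducerRecord<>(TOPIC, element.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
            // Producer properties are attached later by KafkaSink#getLineageVertex.
            return Optional.of(
                    new DefaultKafkaDatasetFacet(
                            DefaultKafkaDatasetIdentifier.ofTopics(
                                    Collections.singletonList(TOPIC))));
        }

        @Override
        public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
            return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(String.class)));
        }
    }

Returning Optional.empty() from either provider simply yields no lineage datasets, which is
exactly the behavior the "...WhenNoTopicsIdentifiersFound" tests above exercise.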