code review changes
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
pawel-big-lebowski committed Nov 12, 2024
1 parent 4bbff17 commit e751699
Showing 16 changed files with 184 additions and 138 deletions.
DefaultKafkaDatasetFacet.java
@@ -1,11 +1,13 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;

import java.util.Objects;
import java.util.Properties;

/** Default implementation of {@link KafkaDatasetFacet}. */
@PublicEvolving
public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {

public static final String KAFKA_FACET_NAME = "kafka";
DefaultKafkaDatasetIdentifier.java
@@ -1,25 +1,28 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/** Default implementation of {@link KafkaDatasetIdentifier}. */
@PublicEvolving
public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {

@Nullable private final List<String> topics;
@Nullable private final Pattern topicPattern;

public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
private DefaultKafkaDatasetIdentifier(
@Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {
this.topics = fixedTopics;
this.topicPattern = topicPattern;
}

public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
return new DefaultKafkaDatasetIdentifier(Collections.emptyList(), pattern);
return new DefaultKafkaDatasetIdentifier(null, pattern);
}

public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
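Usage sketch (illustrative, not part of this commit): with the constructor now private, identifiers are created only through the factory methods, and the pattern variant stores null instead of an empty topic list. The class and topic names below are made up.

```java
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;

import java.util.Arrays;
import java.util.regex.Pattern;

class IdentifierExamples {
    // Identifier backed by an explicit topic list.
    static DefaultKafkaDatasetIdentifier byTopics() {
        return DefaultKafkaDatasetIdentifier.ofTopics(Arrays.asList("orders", "payments"));
    }

    // Identifier backed by a topic pattern; after this change the topics side is null, not empty.
    static DefaultKafkaDatasetIdentifier byPattern() {
        return DefaultKafkaDatasetIdentifier.ofPattern(Pattern.compile("orders-.*"));
    }
}
```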
DefaultTypeDatasetFacet.java
@@ -1,10 +1,12 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Objects;

/** Default implementation of {@link KafkaDatasetFacet}. */
@PublicEvolving
public class DefaultTypeDatasetFacet implements TypeDatasetFacet {

public static final String TYPE_FACET_NAME = "type";
KafkaDatasetFacet.java
@@ -1,10 +1,12 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Properties;

/** Facet definition to contain all Kafka specific information on Kafka sources and sinks. */
@PublicEvolving
public interface KafkaDatasetFacet extends LineageDatasetFacet {
Properties getProperties();

KafkaDatasetFacetProvider.java
@@ -1,15 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Optional;

/** Contains method to extract {@link KafkaDatasetFacet}. */
@PublicEvolving
public interface KafkaDatasetFacetProvider {

/**
* Returns a Kafka dataset facet or `Optional.empty` in case an implementing class is not able
* to identify a dataset.
*
* @return
* Returns a Kafka dataset facet or empty in case an implementing class is not able to identify
* a dataset.
*/
Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
}
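A minimal sketch of an implementing class (hypothetical, for illustration only): the facet is assumed to be resolved elsewhere, since DefaultKafkaDatasetFacet's constructor is not part of this diff; only the getKafkaDatasetFacet contract comes from the interface above.

```java
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;

import javax.annotation.Nullable;

import java.util.Optional;

// Hypothetical provider: holds a facet resolved elsewhere, which may be absent.
class ResolvedFacetProvider implements KafkaDatasetFacetProvider {

    @Nullable private final KafkaDatasetFacet facet;

    ResolvedFacetProvider(@Nullable KafkaDatasetFacet facet) {
        this.facet = facet;
    }

    @Override
    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
        // Empty signals that this implementation cannot identify a dataset.
        return Optional.ofNullable(facet);
    }
}
```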
KafkaDatasetIdentifier.java
@@ -1,11 +1,15 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
@PublicEvolving
public interface KafkaDatasetIdentifier {
@Nullable
List<String> getTopics();
@@ -16,13 +20,11 @@ public interface KafkaDatasetIdentifier {
/**
* Assigns lineage dataset's name which is topic pattern if it is present or comma separated
* list of topics.
*
* @return
*/
default String toLineageName() {
if (getTopicPattern() != null) {
return getTopicPattern().toString();
}
return String.join(",", getTopics());
return String.join(",", Objects.requireNonNull(getTopics()));
}
}
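Illustration of the default method above (assuming ofTopics leaves the pattern unset; topic names are invented): a fixed topic list yields a comma-separated name, while a pattern yields the pattern string itself.

```java
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;

import java.util.Arrays;
import java.util.regex.Pattern;

class LineageNameExamples {
    static void examples() {
        // -> "orders,payments"
        String fromTopics =
                DefaultKafkaDatasetIdentifier.ofTopics(Arrays.asList("orders", "payments"))
                        .toLineageName();

        // -> "orders-.*"
        String fromPattern =
                DefaultKafkaDatasetIdentifier.ofPattern(Pattern.compile("orders-.*"))
                        .toLineageName();
    }
}
```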
KafkaDatasetIdentifierProvider.java
@@ -1,15 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Optional;

/** Contains method which allows extracting topic identifier. */
@PublicEvolving
public interface KafkaDatasetIdentifierProvider {

/**
* Gets Kafka dataset identifier or empty in case a class implementing is not able to extract
* dataset identifier.
*
* @return
*/
Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
}
LineageUtil.java
@@ -66,7 +66,7 @@ public String namespace() {

@Override
public Map<String, LineageDatasetFacet> facets() {
Map facetMap = new HashMap<String, LineageDatasetFacet>();
Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
facetMap.put(DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
facetMap.putAll(
facets.stream()
@@ -102,7 +102,7 @@ public Boundedness boundedness() {

@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());
return List.copyOf(datasets);
}
};
}
@@ -111,7 +111,7 @@ public static LineageVertex lineageVertexOf(Collection<LineageDataset> datasets)
return new LineageVertex() {
@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());
return List.copyOf(datasets);
}
};
}
TypeDatasetFacet.java
@@ -1,9 +1,11 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

/** Facet definition to contain type information of source and sink. */
@PublicEvolving
public interface TypeDatasetFacet extends LineageDatasetFacet {
TypeInformation getTypeInformation();
}
TypeDatasetFacetProvider.java
@@ -1,15 +1,16 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Optional;

/** Contains method to extract {@link TypeDatasetFacet}. */
@PublicEvolving
public interface TypeDatasetFacetProvider {

/**
* Returns a type dataset facet or `Optional.empty` in case an implementing class is not able to
* resolve type.
*
* @return
*/
Optional<TypeDatasetFacet> getTypeDatasetFacet();
}
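As the updated builder tests further below exercise, the schema produced by KafkaRecordSerializationSchema.builder() can expose a type facet when the value serialization schema reveals its produced type. A sketch of such a schema (the class is hypothetical; serialization is kept trivial):

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.nio.charset.StandardCharsets;

// A value serializer that also reports its produced type, which lets the built
// KafkaRecordSerializationSchema act as a TypeDatasetFacetProvider.
class TypedStringSchema implements SerializationSchema<String>, ResultTypeQueryable<String> {

    @Override
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```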
KafkaRecordSerializationSchema.java
@@ -29,7 +29,10 @@

/**
* A serialization schema which defines how to convert a value of type {@code T} to {@link
* ProducerRecord}.
* ProducerRecord}. {@link org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider} can
* be implemented to provide Kafka specific lineage metadata, while {@link
* org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider} can be implemented to provide
* lineage metadata with type information.
*
* @param <T> the type of values being serialized
*/
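To make the relationship concrete, a sketch in line with the updated builder tests further below: a topic selector that also implements KafkaDatasetIdentifierProvider lets the built schema act as a KafkaDatasetFacetProvider. Class and topic names are illustrative; package locations for the sink classes are assumed.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.TopicSelector;

import java.util.Collections;
import java.util.Optional;

// Topic selector that also exposes the dataset identifier for lineage.
class LineageAwareTopicSelector implements TopicSelector<Object>, KafkaDatasetIdentifierProvider {

    @Override
    public String apply(Object element) {
        return "orders"; // illustrative topic name
    }

    @Override
    public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
        return Optional.of(
                DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("orders")));
    }
}

class SchemaFactory {
    // Built this way, the resulting schema can provide a Kafka dataset facet.
    static KafkaRecordSerializationSchema<String> lineageAwareSchema() {
        return KafkaRecordSerializationSchema.builder()
                .setTopicSelector(new LineageAwareTopicSelector())
                .setValueSerializationSchema(new SimpleStringSchema())
                .build();
    }
}
```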
KafkaSink.java
@@ -154,7 +154,7 @@ public LineageVertex getLineageVertex() {
((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();

if (!kafkaDatasetFacet.isPresent()) {
LOG.info("Provided did not return kafka dataset facet");
LOG.info("Provider did not return kafka dataset facet");
return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
}
kafkaDatasetFacet.get().setProperties(this.kafkaProducerConfig);
KafkaSubscriber.java
@@ -39,6 +39,10 @@
*
* <p>The KafkaSubscriber provides a unified interface for the Kafka source to support all these
* three types of subscribing mode.
*
* <p>When implementing a subscriber, {@link
* org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider} can be implemented to
* provide lineage metadata with source topics.
*/
@PublicEvolving
public interface KafkaSubscriber extends Serializable {
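A sketch of a custom subscriber wired for lineage, mirroring what PartitionSetSubscriber does below (the class is hypothetical, and the getSubscribedTopicPartitions(AdminClient) signature is assumed from the existing KafkaSubscriber contract rather than shown in this diff):

```java
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;

// Hypothetical subscriber for a single, hard-coded topic.
class SingleTopicSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {

    private static final long serialVersionUID = 1L;

    private final String topic;

    SingleTopicSubscriber(String topic) {
        this.topic = topic;
    }

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
        // A real implementation would resolve partitions via the admin client.
        return Collections.singleton(new TopicPartition(topic, 0));
    }

    @Override
    public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
        // Exposes the subscribed topic so the source can build lineage metadata.
        return Optional.of(
                DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList(topic)));
    }
}
```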
PartitionSetSubscriber.java
@@ -36,7 +36,7 @@
import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;

/** A subscriber for a partition set. */
class PartitionSetSubscriber implements KafkaDatasetIdentifierProvider, KafkaSubscriber {
class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {
private static final long serialVersionUID = 390970375272146036L;
private static final Logger LOG = LoggerFactory.getLogger(PartitionSetSubscriber.class);
private final Set<TopicPartition> subscribedPartitions;
KafkaRecordSerializationSchemaBuilderTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -39,6 +40,7 @@
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.Before;
import org.junit.Test;

@@ -63,10 +65,11 @@ public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
private static Map<String, ?> configurableConfiguration;
private static Map<String, ?> configuration;

private interface TestingTopicSelector extends TopicSelector, KafkaDatasetIdentifierProvider {}
private interface TestingTopicSelector<T>
extends TopicSelector<T>, KafkaDatasetIdentifierProvider {}

private interface SerializationSchemaWithResultQueryable
extends SerializationSchema, ResultTypeQueryable {}
private interface SerializationSchemaWithResultQueryable<T>
extends SerializationSchema<T>, ResultTypeQueryable<T> {}

private static boolean isKeySerializer;

@@ -281,7 +284,9 @@ public void testGetLineageDatasetFacetsWhenTopicSelectorNotKafkaTopicsIdentifier
.setKeySerializationSchema(serializationSchema)
.build();

assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty();
assertThat(schema)
.asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
.returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
}

@Test
@@ -290,7 +295,7 @@ public void testGetLineageDatasetFacetsWhenNoTopicsIdentifiersFound() {
KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
.setTopicSelector(
new TestingTopicSelector() {
new TestingTopicSelector<Object>() {
@Override
public Optional<DefaultKafkaDatasetIdentifier>
getDatasetIdentifier() {
@@ -305,30 +310,32 @@ public String apply(Object o) {
.setValueSerializationSchema(serializationSchema)
.setKeySerializationSchema(serializationSchema)
.build();
assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty();
assertThat(schema)
.asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
.returns(Optional.empty(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);
}

@Test
public void testGetLineageDatasetFacetsValueSerializationSchemaIsResultTypeQueryable() {
TypeInformation<String> stringTypeInformation = TypeInformation.of(String.class);
SerializationSchemaWithResultQueryable serializationSchema =
new SerializationSchemaWithResultQueryable() {
SerializationSchemaWithResultQueryable<String> serializationSchema =
new SerializationSchemaWithResultQueryable<String>() {

@Override
public TypeInformation getProducedType() {
public TypeInformation<String> getProducedType() {
return stringTypeInformation;
}

@Override
public byte[] serialize(Object o) {
public byte[] serialize(String o) {
return new byte[0];
}
};

KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
.setTopicSelector(
new TestingTopicSelector() {
new TestingTopicSelector<Object>() {
@Override
public Optional<DefaultKafkaDatasetIdentifier>
getDatasetIdentifier() {
@@ -338,7 +345,7 @@ public byte[] serialize(Object o) {
}

@Override
public Object apply(Object o) {
public String apply(Object o) {
return DEFAULT_TOPIC;
}
})
@@ -352,11 +359,10 @@ public Object apply(Object o) {
assertThat(kafkaDatasetFacet).isPresent();
assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
.containsExactly("topic1", "topic2");
assertThat(
((TypeDatasetFacetProvider) schema)
.getTypeDatasetFacet()
.get()
.getTypeInformation())
assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
.isPresent()
.get()
.extracting(TypeDatasetFacet::getTypeInformation)
.isEqualTo(stringTypeInformation);
}

@@ -365,7 +371,7 @@ public void testGetLineageDatasetFacets() {
KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
.setTopicSelector(
new TestingTopicSelector() {
new TestingTopicSelector<Object>() {
@Override
public Optional<DefaultKafkaDatasetIdentifier>
getDatasetIdentifier() {
@@ -375,7 +381,7 @@ public void testGetLineageDatasetFacets() {
}

@Override
public Object apply(Object o) {
public String apply(Object o) {
return DEFAULT_TOPIC;
}
})
@@ -389,11 +395,10 @@ public Object apply(Object o) {
assertThat(kafkaDatasetFacet).isPresent();
assertThat(kafkaDatasetFacet.get().getTopicIdentifier().getTopics())
.containsExactly("topic1", "topic2");
assertThat(
((TypeDatasetFacetProvider) schema)
.getTypeDatasetFacet()
.get()
.getTypeInformation())
assertThat(((TypeDatasetFacetProvider) schema).getTypeDatasetFacet())
.isPresent()
.get()
.extracting(TypeDatasetFacet::getTypeInformation)
.isEqualTo(BasicTypeInfo.STRING_TYPE_INFO);
}
