Commit b25b5b7
[FLINK-34466] create KafkaDatasetFacet
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
pawel-big-lebowski committed Oct 28, 2024
1 parent ca14634 commit b25b5b7
Showing 28 changed files with 698 additions and 691 deletions.
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetFacet.java
@@ -0,0 +1,77 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;

import javax.annotation.Nullable;

import java.util.Objects;
import java.util.Properties;

/** Default implementation of {@link KafkaDatasetFacet}. */
public class DefaultKafkaDatasetFacet implements KafkaDatasetFacet {

    public static final String KAFKA_FACET_NAME = "kafka";

    private Properties properties;

    @Nullable private final TypeInformation typeInformation;
    private final KafkaDatasetIdentifier topicIdentifier;

    public DefaultKafkaDatasetFacet(
            KafkaDatasetIdentifier topicIdentifier,
            Properties properties,
            @Nullable TypeInformation typeInformation) {
        this(topicIdentifier, typeInformation);

        this.properties = new Properties();
        KafkaPropertiesUtil.copyProperties(properties, this.properties);
    }

    public DefaultKafkaDatasetFacet(
            KafkaDatasetIdentifier topicIdentifier, @Nullable TypeInformation typeInformation) {
        this.topicIdentifier = topicIdentifier;
        this.typeInformation = typeInformation;
    }

    public void setProperties(Properties properties) {
        this.properties = new Properties();
        KafkaPropertiesUtil.copyProperties(properties, this.properties);
    }

    public Properties getProperties() {
        return properties;
    }

    public TypeInformation getTypeInformation() {
        return typeInformation;
    }

    public KafkaDatasetIdentifier getTopicIdentifier() {
        return topicIdentifier;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultKafkaDatasetFacet that = (DefaultKafkaDatasetFacet) o;
        return Objects.equals(properties, that.properties)
                && Objects.equals(typeInformation, that.typeInformation)
                && Objects.equals(topicIdentifier, that.topicIdentifier);
    }

    @Override
    public int hashCode() {
        return Objects.hash(properties, typeInformation, topicIdentifier);
    }

    @Override
    public String name() {
        return KAFKA_FACET_NAME;
    }
}
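
For illustration, a minimal construction sketch (not part of this commit; the topic name and element type are assumed). The three-argument constructor defensively copies the supplied properties rather than keeping a reference:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");

    DefaultKafkaDatasetFacet facet =
            new DefaultKafkaDatasetFacet(
                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("orders")),
                    props,
                    TypeInformation.of(String.class));

    facet.name(); // "kafka", the key under which the facet is attached to a dataset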
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/DefaultKafkaDatasetIdentifier.java
@@ -0,0 +1,56 @@
package org.apache.flink.connector.kafka.lineage;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/** Default implementation of {@link KafkaDatasetIdentifier}. */
public class DefaultKafkaDatasetIdentifier implements KafkaDatasetIdentifier {

    @Nullable private final List<String> topics;
    @Nullable private final Pattern topicPattern;

    public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
        this.topics = fixedTopics;
        this.topicPattern = topicPattern;
    }

    public static DefaultKafkaDatasetIdentifier ofPattern(Pattern pattern) {
        return new DefaultKafkaDatasetIdentifier(Collections.emptyList(), pattern);
    }

    public static DefaultKafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {
        return new DefaultKafkaDatasetIdentifier(fixedTopics, null);
    }

    @Nullable
    public List<String> getTopics() {
        return topics;
    }

    @Nullable
    public Pattern getTopicPattern() {
        return topicPattern;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultKafkaDatasetIdentifier that = (DefaultKafkaDatasetIdentifier) o;
        return Objects.equals(topics, that.topics)
                && Objects.equals(topicPattern, that.topicPattern);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topics, topicPattern);
    }
}
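
The two factory methods cover the fixed-topic and pattern-based cases; a short sketch of assumed usage (imports from java.util and java.util.regex):

    DefaultKafkaDatasetIdentifier byTopics =
            DefaultKafkaDatasetIdentifier.ofTopics(Arrays.asList("orders", "payments"));

    DefaultKafkaDatasetIdentifier byPattern =
            DefaultKafkaDatasetIdentifier.ofPattern(Pattern.compile("orders-.*"));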
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacet.java
@@ -0,0 +1,17 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;

import java.util.Properties;

/** Facet definition containing all Kafka-specific information on Kafka sources and sinks. */
public interface KafkaDatasetFacet extends LineageDatasetFacet {
    Properties getProperties();

    TypeInformation getTypeInformation();

    KafkaDatasetIdentifier getTopicIdentifier();

    void setProperties(Properties properties);
}
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetFacetProvider.java
@@ -0,0 +1,15 @@
package org.apache.flink.connector.kafka.lineage;

import java.util.Optional;

/** Contains a method to extract a {@link KafkaDatasetFacet}. */
public interface KafkaDatasetFacetProvider {

    /**
     * Returns a Kafka dataset facet, or {@code Optional.empty()} in case the implementing class is
     * not able to identify a dataset.
     *
     * @return the Kafka dataset facet, if one can be identified
     */
    Optional<KafkaDatasetFacet> getKafkaDatasetFacet();
}
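
A hypothetical implementation sketch (the ExampleSink class, its topic, and its properties are invented for illustration):

    class ExampleSink implements KafkaDatasetFacetProvider {
        private final Properties props = new Properties();

        @Override
        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
            // Return Optional.empty() when the target dataset cannot be determined statically.
            return Optional.of(
                    new DefaultKafkaDatasetFacet(
                            DefaultKafkaDatasetIdentifier.ofTopics(
                                    Collections.singletonList("orders")),
                            props,
                            null));
        }
    }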
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifier.java
@@ -0,0 +1,28 @@
package org.apache.flink.connector.kafka.lineage;

import javax.annotation.Nullable;

import java.util.List;
import java.util.regex.Pattern;

/** Kafka dataset identifier which can contain either a list of topics or a topic pattern. */
public interface KafkaDatasetIdentifier {
    @Nullable
    List<String> getTopics();

    @Nullable
    Pattern getTopicPattern();

    /**
     * Returns the lineage dataset's name: the topic pattern if it is present, otherwise a
     * comma-separated list of topics.
     *
     * @return the dataset name derived from this identifier
     */
    default String toLineageName() {
        if (getTopicPattern() != null) {
            return getTopicPattern().toString();
        }
        return String.join(",", getTopics());
    }
}
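
For example (topic names assumed for illustration), the default method resolves as follows:

    DefaultKafkaDatasetIdentifier.ofTopics(Arrays.asList("a", "b")).toLineageName();      // "a,b"
    DefaultKafkaDatasetIdentifier.ofPattern(Pattern.compile("topic-.*")).toLineageName(); // "topic-.*"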
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/KafkaDatasetIdentifierProvider.java
@@ -0,0 +1,15 @@
package org.apache.flink.connector.kafka.lineage;

import java.util.Optional;

/** Contains a method which allows extracting a topic identifier. */
public interface KafkaDatasetIdentifierProvider {

    /**
     * Gets the Kafka dataset identifier, or empty in case the implementing class is not able to
     * extract a dataset identifier.
     *
     * @return the dataset identifier, if one can be extracted
     */
    Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier();
}
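
Analogously, a sketch of a provider over a fixed topic list (the FixedTopicSubscriber class is invented for illustration):

    class FixedTopicSubscriber implements KafkaDatasetIdentifierProvider {
        @Override
        public Optional<DefaultKafkaDatasetIdentifier> getDatasetIdentifier() {
            return Optional.of(
                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("orders")));
        }
    }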

This file was deleted.

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/lineage/LineageUtil.java
@@ -20,89 +20,30 @@
package org.apache.flink.connector.kafka.lineage;

import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicListFacet;
-import org.apache.flink.connector.kafka.lineage.facets.KafkaTopicPatternFacet;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

-/** Utility class with useful methods for managing dataset facets. */
+/** Utility class with useful methods for managing lineage objects. */
public class LineageUtil {

    private static final String KAFKA_DATASET_PREFIX = "kafka://";
    private static final String COMMA = ",";
    private static final String SEMICOLON = ";";

-    /**
-     * Loads facet from any object implementing @link{DatasetFacetProvider} interface.
-     *
-     * @param object
-     * @return
-     */
-    public static Collection<LineageDatasetFacet> facetsFrom(Object object) {
-        return Optional.of(object)
-                .filter(LineageFacetProvider.class::isInstance)
-                .map(LineageFacetProvider.class::cast)
-                .map(LineageFacetProvider::getDatasetFacets)
-                .orElse(Collections.emptyList());
-    }
-
-    /**
-     * Creates dataset from a list of facets. Uses {@link KafkaTopicListFacet} to extract dataset
-     * name from. Dataset per each element of topic list is created
-     *
-     * @param facets
-     * @return
-     */
-    public static Collection<LineageDataset> datasetsFrom(
-            String namespace, Collection<LineageDatasetFacet> facets) {
-        // Check if topic list facet is available -> if so explode the list of facets
-        Optional<KafkaTopicListFacet> topicList =
-                facets.stream()
-                        .filter(KafkaTopicListFacet.class::isInstance)
-                        .map(KafkaTopicListFacet.class::cast)
-                        .findAny();
-
-        List<LineageDataset> datasets = new ArrayList<>();
-
-        // Explode list of other facets
-        if (topicList.isPresent()) {
-            List<LineageDatasetFacet> facetsWithoutTopicList =
-                    facets.stream().filter(f -> !f.equals(topicList)).collect(Collectors.toList());
-
-            datasets.addAll(
-                    topicList.get().topics.stream()
-                            .map(t -> datasetOf(namespace, t, facetsWithoutTopicList))
-                            .collect(Collectors.toList()));
-        }
-
-        // Check if topic pattern is present
-        // If so topic pattern will be used as a dataset name
-        datasets.addAll(
-                facets.stream()
-                        .filter(KafkaTopicPatternFacet.class::isInstance)
-                        .map(KafkaTopicPatternFacet.class::cast)
-                        .map(f -> datasetOf(namespace, f.pattern.toString(), facets))
-                        .collect(Collectors.toList()));
-        return datasets;
-    }

-    private static LineageDataset datasetOf(
-            String namespace, String name, Collection<LineageDatasetFacet> facets) {
+    public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
        return new LineageDataset() {
            @Override
            public String name() {
-                return name;
+                return kafkaDatasetFacet.getTopicIdentifier().toLineageName();
            }

            @Override
@@ -112,16 +53,19 @@ public String namespace() {

            @Override
            public Map<String, LineageDatasetFacet> facets() {
-                return facets.stream()
-                        .distinct()
-                        .collect(Collectors.toMap(LineageDatasetFacet::name, item -> item));
+                return Collections.singletonMap(
+                        DefaultKafkaDatasetFacet.KAFKA_FACET_NAME, kafkaDatasetFacet);
            }
        };
    }

-    public static String datasetNamespaceOf(Properties properties) {
+    public static String namespaceOf(Properties properties) {
        String bootstrapServers = properties.getProperty("bootstrap.servers");

        if (bootstrapServers == null) {
            return KAFKA_DATASET_PREFIX;
        }

        if (bootstrapServers.contains(COMMA)) {
            bootstrapServers = bootstrapServers.split(COMMA)[0];
        } else if (bootstrapServers.contains(SEMICOLON)) {
            bootstrapServers = bootstrapServers.split(SEMICOLON)[0];
        }

        return KAFKA_DATASET_PREFIX + bootstrapServers;
    }
}
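
Composing the pieces end to end, a sketch of assumed usage (the broker addresses, topic, and element type are illustrative, not from this commit):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");

    KafkaDatasetFacet facet =
            new DefaultKafkaDatasetFacet(
                    DefaultKafkaDatasetIdentifier.ofTopics(Collections.singletonList("orders")),
                    props,
                    TypeInformation.of(String.class));

    LineageDataset dataset = LineageUtil.datasetOf(LineageUtil.namespaceOf(props), facet);
    // dataset.name()      -> "orders"
    // dataset.namespace() -> "kafka://broker1:9092" (first broker from the list)
    // dataset.facets()    -> {"kafka" -> facet}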