diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index dc02436cfdeb..3a75ee72eb9e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -505,7 +505,7 @@ - name: Kafka sourceDefinitionId: d917a47b-8537-4d0d-8c10-36a9928d4265 dockerRepository: airbyte/source-kafka - dockerImageTag: 0.1.7 + dockerImageTag: 0.2.0 documentationUrl: https://docs.airbyte.io/integrations/sources/kafka icon: kafka.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 8f9accc7be62..2822368e9273 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4759,7 +4759,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-kafka:0.1.7" +- dockerImage: "airbyte/source-kafka:0.2.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/kafka" connectionSpecification: @@ -4772,6 +4772,42 @@ - "protocol" additionalProperties: false properties: + MessageFormat: + title: "MessageFormat" + type: "object" + description: "The serialization format of the messages read from the topic" + oneOf: + - title: "JSON" + properties: + deserialization_type: + type: "string" + enum: + - "JSON" + default: "JSON" + - title: "AVRO" + properties: + deserialization_type: + type: "string" + enum: + - "AVRO" + default: "AVRO" + deserialization_strategy: + type: "string" + enum: + - "TopicNameStrategy" + - "RecordNameStrategy" + - "TopicRecordNameStrategy" + default: "TopicNameStrategy" + schema_registry_url: + type: "string" + examples: + - "http://localhost:8081" + schema_registry_username: + type: "string" + default: "" + schema_registry_password: + type: "string" + default: "" bootstrap_servers: title: "Bootstrap Servers" description: "A list of host/port pairs to use for establishing the initial\ @@ -5004,6 +5040,11 @@ \ received."
type: "integer" default: 3 + max_records_process: + title: "Maximum Records" + description: "The Maximum to be processed per execution" + type: "integer" + default: 100000 supportsIncremental: true supportsNormalization: false supportsDBT: false diff --git a/airbyte-integrations/connectors/source-kafka/Dockerfile b/airbyte-integrations/connectors/source-kafka/Dockerfile index b34d30c35565..c16cd48974bb 100644 --- a/airbyte-integrations/connectors/source-kafka/Dockerfile +++ b/airbyte-integrations/connectors/source-kafka/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-kafka COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.2.0 LABEL io.airbyte.name=airbyte/source-kafka diff --git a/airbyte-integrations/connectors/source-kafka/build.gradle b/airbyte-integrations/connectors/source-kafka/build.gradle index 028ea061692b..6ef80b0db86e 100644 --- a/airbyte-integrations/connectors/source-kafka/build.gradle +++ b/airbyte-integrations/connectors/source-kafka/build.gradle @@ -9,13 +9,24 @@ application { applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0'] } +repositories { + mavenLocal() + mavenCentral() + maven { + url "https://packages.confluent.io/maven" + } + +} + dependencies { implementation project(':airbyte-config:config-models') implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-integrations:bases:base-java') + implementation libs.connectors.testcontainers.kafka - implementation 'org.apache.kafka:kafka-clients:2.8.0' - implementation 'org.apache.kafka:connect-json:2.8.0' + implementation 'org.apache.kafka:kafka-clients:3.2.1' + implementation 'org.apache.kafka:connect-json:3.2.1' + implementation 'io.confluent:kafka-avro-serializer:7.2.1' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-kafka') diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java new file mode 100644 index 000000000000..43c6c73cc631 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaFormatFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.kafka; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.source.kafka.format.AvroFormat; +import io.airbyte.integrations.source.kafka.format.JsonFormat; +import io.airbyte.integrations.source.kafka.format.KafkaFormat; + +public class KafkaFormatFactory { + + public static KafkaFormat getFormat(final JsonNode config) { + + MessageFormat messageFormat = + config.has("MessageFormat") ? 
MessageFormat.valueOf(config.get("MessageFormat").get("deserialization_type").asText().toUpperCase()) + : MessageFormat.JSON; + + switch (messageFormat) { + case JSON -> { + return new JsonFormat(config); + } + case AVRO -> { + return new AvroFormat(config); + } + } + return new JsonFormat(config); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java index 4beeba15353b..103a50700b3d 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java @@ -5,36 +5,14 @@ package io.airbyte.integrations.source.kafka; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Lists; import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.integrations.source.kafka.format.KafkaFormat; +import io.airbyte.protocol.models.*; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.SyncMode; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,33 +24,19 @@ public KafkaSource() {} @Override public AirbyteConnectionStatus check(final JsonNode config) { - try { - final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; - if (!testTopic.isBlank()) { - final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config); - final KafkaConsumer consumer = kafkaSourceConfig.getCheckConsumer(); - consumer.subscribe(Pattern.compile(testTopic)); - consumer.listTopics(); - consumer.close(); - LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); - } + KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); + if (kafkaFormat.isAccessible()) { return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } catch (final Exception e) { - LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); - return new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage("Could not connect to the Kafka brokers with provided configuration. 
\n" + e.getMessage()); } + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage("Could not connect to the Kafka brokers with provided configuration. \n"); } @Override - public AirbyteCatalog discover(final JsonNode config) throws Exception { - - final Set topicsToSubscribe = KafkaSourceConfig.getKafkaSourceConfig(config).getTopicsToSubscribe(); - final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers - .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) - .collect(Collectors.toList()); + public AirbyteCatalog discover(final JsonNode config) { + KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); + final List streams = kafkaFormat.getStreams(); return new AirbyteCatalog().withStreams(streams); } @@ -83,51 +47,8 @@ public AutoCloseableIterator read(final JsonNode config, final C if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) { throw new RuntimeException("Unable establish a connection: " + check.getMessage()); } - - final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config); - final KafkaConsumer consumer = kafkaSourceConfig.getConsumer(); - final List> recordsList = new ArrayList<>(); - - final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; - int pollCount = 0; - final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; - while (true) { - final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); - if (consumerRecords.count() == 0) { - pollCount++; - if (pollCount > retry) { - break; - } - } - - consumerRecords.forEach(record -> { - LOGGER.info("Consumer Record: key - {}, value - {}, partition - {}, offset - {}", - record.key(), record.value(), record.partition(), record.offset()); - recordsList.add(record); - }); - consumer.commitAsync(); - } - consumer.close(); - final Iterator> iterator = recordsList.iterator(); - - return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { - - @Override - protected AirbyteMessage computeNext() { - if (iterator.hasNext()) { - final ConsumerRecord record = iterator.next(); - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withStream(record.topic()) - .withEmittedAt(Instant.now().toEpochMilli()) - .withData(record.value())); - } - - return endOfData(); - } - - }); + KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); + return kafkaFormat.read(); } public static void main(final String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java new file mode 100644 index 000000000000..8b94a1308edf --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaStrategy.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka; + +import io.confluent.kafka.serializers.subject.RecordNameStrategy; +import io.confluent.kafka.serializers.subject.TopicNameStrategy; +import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy; + +/** + * https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html + */ +public enum KafkaStrategy { + + TopicNameStrategy(TopicNameStrategy.class.getName()), + RecordNameStrategy(RecordNameStrategy.class.getName()), + TopicRecordNameStrategy(TopicRecordNameStrategy.class.getName()); + + String className; + + KafkaStrategy(String name) { + this.className = name; + } + + public static String getStrategyName(String name) { + for (KafkaStrategy value : KafkaStrategy.values()) { + if (value.name().equalsIgnoreCase(name)) { + return value.className; + } + } + throw new IllegalArgumentException("Unexpected data to strategy setting: " + name); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java new file mode 100644 index 000000000000..0e06fd784ddd --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.kafka; + +/** + * message format in kafka queue + * https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html + */ +public enum MessageFormat { + JSON, + AVRO +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java similarity index 54% rename from airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java rename to airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java index d92c6a31dedb..fd475d0d0f6e 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java @@ -2,49 +2,41 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java new file mode 100644 index 000000000000..0e06fd784ddd --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/MessageFormat.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.kafka; + +/** + * Message format of the records in the Kafka topic. + * https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html + */ +public enum MessageFormat { + JSON, + AVRO +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java similarity index 54% rename from airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java rename to airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java index d92c6a31dedb..fd475d0d0f6e 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSourceConfig.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AbstractFormat.java @@ -2,49 +2,41 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.source.kafka; +package io.airbyte.integrations.source.kafka.format; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; -import java.util.Arrays; +import io.airbyte.integrations.source.kafka.KafkaProtocol; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.connect.json.JsonDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaSourceConfig { +public abstract class AbstractFormat implements KafkaFormat { - protected static final Logger LOGGER = LoggerFactory.getLogger(KafkaSourceConfig.class); - private static KafkaSourceConfig instance; - private final JsonNode config; - private KafkaConsumer consumer; - private Set topicsToSubscribe; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFormat.class); - private KafkaSourceConfig(final JsonNode config) { + protected Set topicsToSubscribe; + protected JsonNode config; + + public AbstractFormat(JsonNode config) { this.config = config; - } - public static KafkaSourceConfig getKafkaSourceConfig(final JsonNode config) { - if (instance == null) { - instance = new KafkaSourceConfig(config); - } - return instance; } - private KafkaConsumer buildKafkaConsumer(final JsonNode config) { + protected abstract KafkaConsumer getConsumer(); + + protected abstract Set getTopicsToSubscribe(); + + protected Map getKafkaConfig() { + final Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText()); props.put(ConsumerConfig.GROUP_ID_CONFIG, @@ -66,14 +58,13 @@ private KafkaConsumer buildKafkaConsumer(final JsonNode config config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.has("auto_offset_reset") ?
config.get("auto_offset_reset").asText() : null); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); final Map filteredProps = props.entrySet().stream() .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - return new KafkaConsumer<>(filteredProps); + return filteredProps; + } private Map propertiesByProtocol(final JsonNode config) { @@ -95,47 +86,4 @@ private Map propertiesByProtocol(final JsonNode config) { return builder.build(); } - public KafkaConsumer getConsumer() { - if (consumer != null) { - return consumer; - } - consumer = buildKafkaConsumer(config); - - final JsonNode subscription = config.get("subscription"); - LOGGER.info("Kafka subscribe method: {}", subscription.toString()); - switch (subscription.get("subscription_type").asText()) { - case "subscribe" -> { - final String topicPattern = subscription.get("topic_pattern").asText(); - consumer.subscribe(Pattern.compile(topicPattern)); - topicsToSubscribe = consumer.listTopics().keySet().stream() - .filter(topic -> topic.matches(topicPattern)) - .collect(Collectors.toSet()); - } - case "assign" -> { - topicsToSubscribe = new HashSet<>(); - final String topicPartitions = subscription.get("topic_partitions").asText(); - final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); - final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { - final String[] pair = topicPartition.split(":"); - topicsToSubscribe.add(pair[0]); - return new TopicPartition(pair[0], Integer.parseInt(pair[1])); - }).collect(Collectors.toList()); - LOGGER.info("Topic-partition list: {}", topicPartitionList); - consumer.assign(topicPartitionList); - } - } - return consumer; - } - - public Set getTopicsToSubscribe() { - if (topicsToSubscribe == null) { - getConsumer(); - } - return topicsToSubscribe; - } - - public KafkaConsumer getCheckConsumer() { - return buildKafkaConsumer(config); - } - } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java new file mode 100644 index 000000000000..2b593278c848 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka.format; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.integrations.source.kafka.KafkaStrategy; +import io.airbyte.protocol.models.*; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AvroFormat extends AbstractFormat { + + private static final Logger LOGGER = LoggerFactory.getLogger(AvroFormat.class); + + private KafkaConsumer consumer; + + public AvroFormat(JsonNode jsonConfig) { + super(jsonConfig); + } + + @Override + protected Map getKafkaConfig() { + Map props = super.getKafkaConfig(); + final JsonNode avro_config = config.get("MessageFormat"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, + String.format("%s:%s", avro_config.get("schema_registry_username").asText(), avro_config.get("schema_registry_password").asText())); + props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); + props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, + KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); + return props; + } + + @Override + protected KafkaConsumer getConsumer() { + if (consumer != null) { + return consumer; + } + Map filteredProps = getKafkaConfig(); + consumer = new KafkaConsumer<>(filteredProps); + + final JsonNode subscription = config.get("subscription"); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type").asText()) { + case "subscribe" -> { + final String topicPattern = subscription.get("topic_pattern").asText(); + consumer.subscribe(Pattern.compile(topicPattern)); + topicsToSubscribe = consumer.listTopics().keySet().stream() + .filter(topic -> topic.matches(topicPattern)) + .collect(Collectors.toSet()); + LOGGER.info("Topic list: {}", topicsToSubscribe); + } + case "assign" -> { + topicsToSubscribe = new HashSet<>(); + final String 
topicPartitions = subscription.get("topic_partitions").asText(); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + topicsToSubscribe.add(pair[0]); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + } + } + return consumer; + } + + @Override + protected Set getTopicsToSubscribe() { + if (topicsToSubscribe == null) { + getConsumer(); + } + return topicsToSubscribe; + } + + @Override + public boolean isAccessible() { + try { + final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; + if (!testTopic.isBlank()) { + final KafkaConsumer consumer = getConsumer(); + consumer.subscribe(Pattern.compile(testTopic)); + consumer.listTopics(); + consumer.close(); + LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); + } + return true; + } catch (final Exception e) { + LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + return false; + } + } + + @Override + public List getStreams() { + final Set topicsToSubscribe = getTopicsToSubscribe(); + final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers + .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) + .collect(Collectors.toList()); + return streams; + } + + @Override + public AutoCloseableIterator read() { + + final KafkaConsumer consumer = getConsumer(); + final List> recordsList = new ArrayList<>(); + final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final int max_records = config.has("max_records_process") ? 
config.get("max_records_process").intValue() : 100000; + AtomicInteger record_count = new AtomicInteger(); + final Map poll_lookup = new HashMap<>(); + getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); + while (true) { + final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + if (consumerRecords.count() == 0) { + consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( + topic -> { + poll_lookup.put(topic, poll_lookup.get(topic) + 1); + }); + boolean is_complete = poll_lookup.entrySet().stream().allMatch( + e -> e.getValue() > retry); + if (is_complete) { + LOGGER.info("There is no new data in the queue!!"); + break; + } + } else if (record_count.get() > max_records) { + LOGGER.info("Max record count is reached !!"); + break; + } + + consumerRecords.forEach(record -> { + record_count.getAndIncrement(); + recordsList.add(record); + }); + consumer.commitAsync(); + } + consumer.close(); + final Iterator> iterator = recordsList.iterator(); + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + + @Override + protected AirbyteMessage computeNext() { + if (iterator.hasNext()) { + final ConsumerRecord record = iterator.next(); + GenericRecord avro_data = record.value(); + ObjectMapper mapper = new ObjectMapper(); + String namespace = avro_data.getSchema().getNamespace(); + String name = avro_data.getSchema().getName(); + JsonNode output; + try { + // Todo dynamic namespace is not supported now hence, adding avro schema name in the message + if (StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) { + String newString = String.format("{\"avro_schema\": \"%s\",\"name\":\"%s\"}", namespace, name); + JsonNode newNode = mapper.readTree(newString); + output = mapper.readTree(avro_data.toString()); + ((ObjectNode) output).set("_namespace_", newNode); + } else { + output = mapper.readTree(avro_data.toString()); + } + } catch (JsonProcessingException e) { + LOGGER.error("Exception whilst reading avro data from stream", e); + throw new RuntimeException(e); + } + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(record.topic()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(output)); + } + + return endOfData(); + } + + }); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java new file mode 100644 index 000000000000..c34fe80dd56e --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka.format; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Lists; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.protocol.models.*; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JsonFormat extends AbstractFormat { + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonFormat.class); + + private KafkaConsumer consumer; + + public JsonFormat(JsonNode jsonConfig) { + super(jsonConfig); + } + + @Override + protected KafkaConsumer getConsumer() { + if (consumer != null) { + return consumer; + } + Map filteredProps = getKafkaConfig(); + consumer = new KafkaConsumer<>(filteredProps); + + final JsonNode subscription = config.get("subscription"); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type").asText()) { + case "subscribe" -> { + final String topicPattern = subscription.get("topic_pattern").asText(); + consumer.subscribe(Pattern.compile(topicPattern)); + topicsToSubscribe = consumer.listTopics().keySet().stream() + .filter(topic -> topic.matches(topicPattern)) + .collect(Collectors.toSet()); + LOGGER.info("Topic list: {}", topicsToSubscribe); + } + case "assign" -> { + topicsToSubscribe = new HashSet<>(); + final String topicPartitions = subscription.get("topic_partitions").asText(); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + topicsToSubscribe.add(pair[0]); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + } + } + return consumer; + } + + @Override + protected Map getKafkaConfig() { + Map props = super.getKafkaConfig(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); + return props; + } + + public Set getTopicsToSubscribe() { + if (topicsToSubscribe == null) { + getConsumer(); + } + return topicsToSubscribe; + } + + @Override + public List getStreams() { + final Set topicsToSubscribe = getTopicsToSubscribe(); + final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers + .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) + .collect(Collectors.toList()); + return streams; + } + + @Override + public AutoCloseableIterator read() { + + final 
KafkaConsumer consumer = getConsumer(); + final List> recordsList = new ArrayList<>(); + final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final int max_records = config.has("max_records_process") ? config.get("max_records_process").intValue() : 100000; + AtomicInteger record_count = new AtomicInteger(); + final Map poll_lookup = new HashMap<>(); + getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); + while (true) { + final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + if (consumerRecords.count() == 0) { + consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( + topic -> { + poll_lookup.put(topic, poll_lookup.get(topic) + 1); + }); + boolean is_complete = poll_lookup.entrySet().stream().allMatch( + e -> e.getValue() > retry); + if (is_complete) { + LOGGER.info("No new data in the queue."); + break; + } + } else if (record_count.get() > max_records) { + LOGGER.info("Max record count reached."); + break; + } + + consumerRecords.forEach(record -> { + record_count.getAndIncrement(); + recordsList.add(record); + }); + consumer.commitAsync(); + } + consumer.close(); + final Iterator> iterator = recordsList.iterator(); + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + + @Override + protected AirbyteMessage computeNext() { + if (iterator.hasNext()) { + final ConsumerRecord record = iterator.next(); + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(record.topic()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(record.value())); + } + + return endOfData(); + } + + }); + } + + @Override + public boolean isAccessible() { + try { + final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; + if (!testTopic.isBlank()) { + final KafkaConsumer consumer = getConsumer(); + consumer.subscribe(Pattern.compile(testTopic)); + consumer.listTopics(); + consumer.close(); + LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); + } + return true; + } catch (final Exception e) { + LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + return false; + } + } + +}
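Both `JsonFormat.read()` and `AvroFormat.read()` share the same termination rules, which are easy to miss inline. A condensed, hedged sketch (the class and parameter names below are invented for illustration; the logic mirrors this PR's read() implementations):

```java
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Termination rules: poll every pollingTime ms; on an empty poll, bump a
// per-topic empty-poll counter and stop once every topic has been empty-polled
// more than repeatedCalls times; stop early once more than maxRecords records
// have been buffered.
final class PollLoopSketch {

  static <V> List<ConsumerRecord<String, V>> drain(KafkaConsumer<String, V> consumer,
      Set<String> topics, int repeatedCalls, int pollingTime, int maxRecords) {
    final List<ConsumerRecord<String, V>> buffered = new ArrayList<>();
    final Map<String, Integer> emptyPolls = new HashMap<>();
    topics.forEach(t -> emptyPolls.put(t, 0));
    final AtomicInteger count = new AtomicInteger();
    while (true) {
      final ConsumerRecords<String, V> batch = consumer.poll(Duration.ofMillis(pollingTime));
      if (batch.count() == 0) {
        topics.forEach(t -> emptyPolls.merge(t, 1, Integer::sum));
        if (emptyPolls.values().stream().allMatch(c -> c > repeatedCalls)) {
          break; // every topic has exhausted its retries
        }
      } else if (count.get() > maxRecords) {
        break; // record budget exceeded
      }
      batch.forEach(record -> {
        count.getAndIncrement();
        buffered.add(record);
      });
      consumer.commitAsync();
    }
    consumer.close();
    return buffered;
  }

}
```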
+ */ + +package io.airbyte.integrations.source.kafka.format; + +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; +import java.util.List; + +public interface KafkaFormat { + + boolean isAccessible(); + + List getStreams(); + + AutoCloseableIterator read(); + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json index 1a64203d5a34..4759f724440c 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json @@ -11,6 +11,54 @@ "required": ["bootstrap_servers", "subscription", "protocol"], "additionalProperties": false, "properties": { + "MessageFormat": { + "title": "MessageFormat", + "type": "object", + "description": "The serialization used based on this ", + "oneOf": [ + { + "title": "JSON", + "properties": { + "deserialization_type": { + "type": "string", + "enum": ["JSON"], + "default": "JSON" + } + } + }, + { + "title": "AVRO", + "properties": { + "deserialization_type": { + "type": "string", + "enum": ["AVRO"], + "default": "AVRO" + }, + "deserialization_strategy": { + "type": "string", + "enum": [ + "TopicNameStrategy", + "RecordNameStrategy", + "TopicRecordNameStrategy" + ], + "default": "TopicNameStrategy" + }, + "schema_registry_url": { + "type": "string", + "examples": ["http://localhost:8081"] + }, + "schema_registry_username": { + "type": "string", + "default": "" + }, + "schema_registry_password": { + "type": "string", + "default": "" + } + } + } + ] + }, "bootstrap_servers": { "title": "Bootstrap Servers", "description": "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).", @@ -225,6 +273,12 @@ "description": "The number of repeated calls to poll() if no messages were received.", "type": "integer", "default": 3 + }, + "max_records_process": { + "title": "Maximum Records", + "description": "The Maximum to be processed per execution", + "type": "integer", + "default": 100000 } } } diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java new file mode 100644 index 000000000000..454cf6334450 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.integrations.source.kafka.format.AvroFormat; +import io.airbyte.integrations.source.kafka.format.KafkaFormat; +import java.io.IOException; +import org.junit.jupiter.api.Test; + +public class KafkaSourceTest { + + @Test + public void testAvroformat() throws IOException { + final JsonNode configJson = Jsons.deserialize(MoreResources.readResource("test_config.json")); + final KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(configJson); + assertInstanceOf(AvroFormat.class, kafkaFormat); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json new file mode 100644 index 000000000000..d53590514996 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/test_config.json @@ -0,0 +1,32 @@ +{ + "group_id": "login", + "protocol": { + "sasl_mechanism": "PLAIN", + "sasl_jaas_config": "org.apache.kafka.common.security.plain.PlainLoginModule ;", + "security_protocol": "SASL_SSL" + }, + "client_id": "airbyte-login-consumer", + "test_topic": "", + "polling_time": 100, + "subscription": { + "topic_pattern": "dev-accounts-lms-transaction-created", + "subscription_type": "subscribe" + }, + "MessageFormat": { + "schema_registry_url": "http://localhost", + "deserialization_type": "AVRO", + "deserialization_strategy": "TopicRecordNameStrategy", + "schema_registry_password": "password", + "schema_registry_username": "username" + }, + "repeated_calls": 3, + "max_poll_records": 500, + "retry_backoff_ms": 100, + "auto_offset_reset": "earliest", + "bootstrap_servers": "localhost:9092", + "client_dns_lookup": "use_all_dns_ips", + "enable_auto_commit": true, + "request_timeout_ms": 30000, + "receive_buffer_bytes": 32768, + "auto_commit_interval_ms": 5000 +} diff --git a/docs/integrations/sources/kafka.md b/docs/integrations/sources/kafka.md index 48815e18389f..7dff6e3cced5 100644 --- a/docs/integrations/sources/kafka.md +++ b/docs/integrations/sources/kafka.md @@ -22,6 +22,7 @@ You'll need the following information to configure the Kafka source: * **Subscription Method** - You can choose to manually assign a list of partitions, or subscribe to all topics matching specified pattern to get dynamically assigned partitions. * **List of topic** * **Bootstrap Servers** - A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. +* **Schema Registry** - Host/port to connect schema registry server. Note: It supports for AVRO format only. ### For Airbyte Cloud: @@ -40,10 +41,17 @@ The Kafka source connector supports the following[sync modes](https://docs.airby | Incremental - Append Sync | Yes | | | Namespaces | No | | +## Supported Format + JSON - Json value messages. It does not support schema registry now. + + AVRO - deserialize Using confluent API. 
## Changelog | Version | Date | Pull Request | Subject | | :------ | :-------- | :------------------------------------------------------| :---------------------------------------- | +| 0.2.0 | 2022-08-22 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Added AVRO format support and a configurable maximum number of records to process | | 0.1.7 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors | | 0.1.6 | 2022-05-29 | [12903](https://github.com/airbytehq/airbyte/pull/12903) | Add Polling Time to Specification (default 100 ms) | | 0.1.5 | 2022-04-19 | [12134](https://github.com/airbytehq/airbyte/pull/12134) | Add PLAIN Auth |