🎉 New source: Kafka (#5906)
* new source: kafka

* refactored util and config classes

* fixed discover method for configs based on topic pattern
heade authored Sep 22, 2021
1 parent 0b82f7f commit c86403f
Showing 10 changed files with 812 additions and 0 deletions.
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/source-kafka/Dockerfile
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION source-kafka

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.name=airbyte/source-kafka
LABEL io.airbyte.version=0.1.0
45 changes: 45 additions & 0 deletions airbyte-integrations/connectors/source-kafka/README.md
@@ -0,0 +1,45 @@
# Kafka Source

This is the repository for the Kafka source connector.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/kafka).

## Local development

### Prerequisites
* If you are using Python for connector development, the minimum required version is `3.7.0`
* Valid credentials (see the "Create credentials" section below for instructions)
TODO: _which languages and tools does a user need to develop on this connector? add them to the bullet list above_

### Iteration
TODO: _which commands should a developer use to run this connector locally?_

### Testing
#### Unit Tests
TODO: _how can a user run unit tests?_

#### Integration Tests
TODO: _how can a user run integration tests?_
_this section is currently under construction -- please reach out to us on Slack for help with setting up Airbyte's standard test suite_


### Locally running the connector Docker image

First, make sure you build the latest Docker image:
```
docker build . -t airbyte/source-kafka:dev
```

Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-kafka:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-kafka:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-kafka:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-kafka:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json
```

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/kafka)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `spec.json` file. `secrets` is gitignored by default.

**If you are an Airbyte core member**, copy the credentials from Lastpass under the secret name `source kafka test creds`
and place them into `secrets/config.json`.
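
For local testing, a minimal `secrets/config.json` might look like the sketch below. Only `test_topic` and `repeated_calls` are read directly by `KafkaSource` in this commit; the remaining field names are illustrative assumptions, so treat the connector's `spec.json` as the authoritative schema.
```
{
  "bootstrap_servers": "localhost:9092",
  "protocol": "PLAINTEXT",
  "topic_pattern": "sample.topic.*",
  "test_topic": "sample.topic.test",
  "repeated_calls": 3
}
```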
24 changes: 24 additions & 0 deletions airbyte-integrations/connectors/source-kafka/build.gradle
@@ -0,0 +1,24 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.source.kafka.KafkaSource'
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')

implementation 'org.apache.kafka:kafka-clients:2.8.0'
implementation 'org.apache.kafka:connect-json:2.8.0'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-kafka')
integrationTestJavaImplementation "org.testcontainers:kafka:1.15.3"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
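
The `org.testcontainers:kafka` dependency suggests the integration tests run against a disposable broker. A minimal sketch of how such a test could bootstrap Kafka follows; the class name and the Confluent image tag are illustrative assumptions, not taken from this commit.
```
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerSketch {

  public static void main(String[] args) {
    // Start a throwaway broker; the image tag is an assumption for illustration.
    try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"))) {
      kafka.start();
      // Point the connector's bootstrap servers at the ephemeral broker.
      System.out.println(kafka.getBootstrapServers());
    }
  }
}
```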
33 changes: 33 additions & 0 deletions airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaProtocol.java
@@ -0,0 +1,33 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.kafka;

public enum KafkaProtocol {

PLAINTEXT,
SASL_PLAINTEXT,
SASL_SSL

}
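
The constants above mirror Kafka's own `security.protocol` values. The `KafkaSourceConfig` class that consumes this enum is not among the files shown here, but a minimal sketch of how it might translate the enum into consumer properties follows; the helper class, the `PLAIN` SASL mechanism, and the JAAS parameter are illustrative assumptions.
```
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

final class SecurityPropsSketch {

  // Hypothetical helper: map KafkaProtocol onto Kafka client security properties.
  static Properties securityProps(KafkaProtocol protocol, String jaasConfig) {
    final Properties props = new Properties();
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString());
    if (protocol != KafkaProtocol.PLAINTEXT) {
      props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // assumed mechanism
      props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
    }
    return props;
  }
}
```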
145 changes: 145 additions & 0 deletions airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java
@@ -0,0 +1,145 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.*;
import io.airbyte.protocol.models.*;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSource extends BaseConnector implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);

public KafkaSource() {}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : "";
if (!testTopic.isBlank()) {
final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config);
final KafkaConsumer<String, JsonNode> consumer = kafkaSourceConfig.getCheckConsumer();
consumer.subscribe(Pattern.compile(testTopic));
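// listTopics() forces a metadata round-trip to the brokers, so an unreachable or misconfigured cluster fails here.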
consumer.listTopics();
consumer.close();
LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText());
}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage("Could not connect to the Kafka brokers with provided configuration. \n" + e.getMessage());
}
}

@Override
public AirbyteCatalog discover(JsonNode config) throws Exception {

Set<String> topicsToSubscribe = KafkaSourceConfig.getKafkaSourceConfig(config).getTopicsToSubscribe();
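// Each matched topic becomes one Airbyte stream whose single string "value" field carries the raw message payload.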
List<AirbyteStream> streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers
.createAirbyteStream(topic, Field.of("value", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
.collect(Collectors.toList());
return new AirbyteCatalog().withStreams(streams);
}

@Override
public AutoCloseableIterator<AirbyteMessage> read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception {
final AirbyteConnectionStatus check = check(config);
if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) {
throw new RuntimeException("Unable establish a connection: " + check.getMessage());
}

final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config);
final KafkaConsumer<String, JsonNode> consumer = kafkaSourceConfig.getConsumer();
List<ConsumerRecord<String, JsonNode>> recordsList = new ArrayList<>();

int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0;
int pollCount = 0;
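// Buffer all records in memory first: poll in 100 ms slices and stop once more than
// repeated_calls empty polls have accumulated. Note pollCount is never reset, so it bounds
// the total number of empty polls rather than consecutive ones.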
while (true) {
final ConsumerRecords<String, JsonNode> consumerRecords = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
if (consumerRecords.count() == 0) {
pollCount++;
if (pollCount > retry) {
break;
}
}

consumerRecords.forEach(record -> {
LOGGER.info("Consumer Record: key - {}, value - {}, partition - {}, offset - {}",
record.key(), record.value(), record.partition(), record.offset());
recordsList.add(record);
});
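// Asynchronously commit the offsets of the records just polled so the next poll is not blocked.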
consumer.commitAsync();
}
consumer.close();
Iterator<ConsumerRecord<String, JsonNode>> iterator = recordsList.iterator();

return AutoCloseableIterators.fromIterator(new AbstractIterator<>() {

@Override
protected AirbyteMessage computeNext() {
if (iterator.hasNext()) {
ConsumerRecord<String, JsonNode> record = iterator.next();
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(record.topic())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(record.value()));
}

return endOfData();
}

});
}

public static void main(String[] args) throws Exception {
final Source source = new KafkaSource();
LOGGER.info("Starting source: {}", KafkaSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("Completed source: {}", KafkaSource.class);
}

}
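
Each consumed record is wrapped in an Airbyte RECORD message by `computeNext` above. Serialized, a single emitted message would look roughly like the following; the stream name, timestamp, and payload are illustrative values.
```
{
  "type": "RECORD",
  "record": {
    "stream": "sample.topic.test",
    "emitted_at": 1632268800000,
    "data": { "id": 42, "status": "ok" }
  }
}
```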