🐛 Source Kafka: fix missing data (#19587)
* Kafka Source - fix missing data

* auto-bump connector version

* update doc

Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
4 people authored Dec 6, 2022
1 parent 23b35ba commit 3e6d5ac
Showing 6 changed files with 23 additions and 41 deletions.
airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -791,7 +791,7 @@
- name: Kafka
sourceDefinitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
dockerRepository: airbyte/source-kafka
-dockerImageTag: 0.2.2
+dockerImageTag: 0.2.3
documentationUrl: https://docs.airbyte.com/integrations/sources/kafka
icon: kafka.svg
sourceType: database
35 changes: 8 additions & 27 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -6733,7 +6733,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-kafka:0.2.2"
+- dockerImage: "airbyte/source-kafka:0.2.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/kafka"
connectionSpecification:
@@ -6744,7 +6744,7 @@
- "bootstrap_servers"
- "subscription"
- "protocol"
-additionalProperties: false
+additionalProperties: true
properties:
MessageFormat:
title: "MessageFormat"
@@ -6755,16 +6755,11 @@
properties:
deserialization_type:
type: "string"
-enum:
-- "JSON"
-default: "JSON"
+const: "JSON"
- title: "AVRO"
properties:
deserialization_type:
type: "string"
-enum:
-- "AVRO"
-default: "AVRO"
+const: "AVRO"
deserialization_strategy:
type: "string"
enum:
@@ -6815,9 +6810,6 @@
\ list of topic partitions is empty, it is treated the same as unsubscribe()."
type: "string"
const: "assign"
-enum:
-- "assign"
-default: "assign"
topic_partitions:
title: "List of topic:partition Pairs"
type: "string"
@@ -6832,9 +6824,6 @@
description: "The Topic pattern from which the records will be read."
type: "string"
const: "subscribe"
-enum:
-- "subscribe"
-default: "subscribe"
topic_pattern:
title: "Topic Pattern"
type: "string"
@@ -6876,9 +6865,7 @@
properties:
security_protocol:
type: "string"
-enum:
-- "PLAINTEXT"
-default: "PLAINTEXT"
+const: "PLAINTEXT"
- title: "SASL PLAINTEXT"
required:
- "security_protocol"
@@ -6887,17 +6874,13 @@
properties:
security_protocol:
type: "string"
-enum:
-- "SASL_PLAINTEXT"
-default: "SASL_PLAINTEXT"
+const: "SASL_PLAINTEXT"
sasl_mechanism:
title: "SASL Mechanism"
description: "The SASL mechanism used for client connections. This\
\ may be any mechanism for which a security provider is available."
type: "string"
default: "PLAIN"
enum:
- "PLAIN"
const: "PLAIN"
sasl_jaas_config:
title: "SASL JAAS Config"
description: "The JAAS login context parameters for SASL connections\
@@ -6913,9 +6896,7 @@
properties:
security_protocol:
type: "string"
-enum:
-- "SASL_SSL"
-default: "SASL_SSL"
+const: "SASL_SSL"
sasl_mechanism:
title: "SASL Mechanism"
description: "The SASL mechanism used for client connections. This\
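The spec changes above all apply the same JSON Schema pattern: a oneOf branch that is identified by a single fixed value drops the redundant one-element enum plus default in favor of a single const, and the top-level connection object now allows additionalProperties. Below is a minimal sketch of the new-style discriminator field, built with Jackson purely for illustration; it is not the connector's spec-generation code, and the field name simply mirrors the YAML above.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ConstDiscriminatorSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // New style: the branch's fixed value is expressed with "const".
    ObjectNode deserializationType = mapper.createObjectNode();
    deserializationType.put("type", "string");
    deserializationType.put("const", "JSON");
    // Old style, removed in this commit: "enum": ["JSON"] plus "default": "JSON".

    System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(deserializationType));
  }
}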
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-kafka/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-kafka

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.2.2
+LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/source-kafka
Avro reader (ConsumerRecords<String, GenericRecord>):
@@ -145,6 +145,12 @@ public AutoCloseableIterator<AirbyteMessage> read() {
getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0));
while (true) {
final ConsumerRecords<String, GenericRecord> consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS));
+consumerRecords.forEach(record -> {
+record_count.getAndIncrement();
+recordsList.add(record);
+});
+consumer.commitAsync();

if (consumerRecords.count() == 0) {
consumer.assignment().stream().map(record -> record.topic()).distinct().forEach(
topic -> {
@@ -160,12 +166,6 @@ public AutoCloseableIterator<AirbyteMessage> read() {
LOGGER.info("Max record count is reached !!");
break;
}

-consumerRecords.forEach(record -> {
-record_count.getAndIncrement();
-recordsList.add(record);
-});
-consumer.commitAsync();
}
consumer.close();
final Iterator<ConsumerRecord<String, GenericRecord>> iterator = recordsList.iterator();
JSON reader (ConsumerRecords<String, JsonNode>):
@@ -110,6 +110,12 @@ public AutoCloseableIterator<AirbyteMessage> read() {
getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0));
while (true) {
final ConsumerRecords<String, JsonNode> consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS));
+consumerRecords.forEach(record -> {
+record_count.getAndIncrement();
+recordsList.add(record);
+});
+consumer.commitAsync();

if (consumerRecords.count() == 0) {
consumer.assignment().stream().map(record -> record.topic()).distinct().forEach(
topic -> {
@@ -125,12 +131,6 @@ public AutoCloseableIterator<AirbyteMessage> read() {
LOGGER.info("Max record count is reached !!");
break;
}

-consumerRecords.forEach(record -> {
-record_count.getAndIncrement();
-recordsList.add(record);
-});
-consumer.commitAsync();
}
consumer.close();
final Iterator<ConsumerRecord<String, JsonNode>> iterator = recordsList.iterator();
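In both readers above, the reordering is the whole fix: previously the loop evaluated its exit conditions (empty-poll retries and the maximum record count) before the freshly polled batch was added to recordsList and committed, so a batch polled on the final iteration could be dropped before the consumer was closed. The sketch below shows the corrected ordering in a self-contained form, using a plain string consumer; the broker address, topic name, and the maxEmptyPolls/maxRecords thresholds are illustrative stand-ins rather than the connector's actual configuration.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {

  public static void main(String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
    props.put("group.id", "poll-loop-sketch");
    props.put("enable.auto.commit", "false");          // offsets are committed manually below
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    final int maxEmptyPolls = 3;    // stand-in for the connector's polling retry limit
    final int maxRecords = 100_000; // stand-in for the connector's max-records setting
    final List<ConsumerRecord<String, String>> recordsList = new ArrayList<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("example-topic")); // illustrative topic
      int emptyPolls = 0;

      while (true) {
        final ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));

        // The fix: buffer and commit the batch before evaluating any exit condition,
        // so records polled on the final iteration are never dropped.
        batch.forEach(recordsList::add);
        consumer.commitAsync();

        if (batch.count() == 0) {
          emptyPolls++;
          if (emptyPolls >= maxEmptyPolls) {
            break; // repeated empty polls: assume no more data for now
          }
        } else {
          emptyPolls = 0;
        }
        if (recordsList.size() >= maxRecords) {
          break; // record cap reached; the current batch is already buffered
        }
      }
    } // try-with-resources closes the consumer, mirroring consumer.close() above

    System.out.println("Buffered " + recordsList.size() + " records");
  }
}

Relative to the previous code, only the position of the buffering and the commitAsync() call changes; the exit checks themselves are untouched, which is exactly what the two hunks above show.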
1 change: 1 addition & 0 deletions docs/integrations/sources/kafka.md
@@ -50,6 +50,7 @@ The Kafka source connector supports the following [sync modes](https://docs.airb

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :------------------------------------------------------| :---------------------------------------- |
+| 0.2.3 | 2022-12-06 | [19587](https://github.com/airbytehq/airbyte/pull/19587) | Fix missing data before consumer is closed |
| 0.2.2 | 2022-11-04 | [18648](https://github.com/airbytehq/airbyte/pull/18648) | Add missing record_count increment for JSON|
| 0.2.1 | 2022-11-04 | This version was the same as 0.2.0 and was committed so using 0.2.2 next to keep versions in order|
| 0.2.0 | 2022-08-22 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Added AVRO format support and Support for maximum records to process|
