Skip to content

Commit

Permalink
Update Keen destination to use outputRecordCollector to properly stor…
Browse files Browse the repository at this point in the history
…e state (#15291)

* Update Keen destination to use outputRecordCollector to properly store state

* Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
suhomud and octavia-squidington-iii authored Aug 4, 2022
1 parent 3861f0f commit e28038e
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
- name: Chargify (Keen)
destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae
dockerRepository: airbyte/destination-keen
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/keen
icon: chargify.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-keen:0.2.3"
- dockerImage: "airbyte/destination-keen:0.2.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/keen"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-keen

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.name=airbyte/destination-keen
2 changes: 2 additions & 0 deletions airbyte-integrations/connectors/destination-keen/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ dependencies {
implementation 'org.apache.kafka:kafka-clients:2.8.0'
implementation 'com.joestelmach:natty:0.11'

testImplementation project(':airbyte-integrations:bases:standard-destination-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-keen')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class KeenRecordsConsumer extends FailureTrackingAirbyteMessageConsumer {
private String projectId;
private String apiKey;
private KafkaProducer<String, String> kafkaProducer;
private AirbyteMessage lastStateMessage;
private Set<String> streamNames;

public KeenRecordsConsumer(final JsonNode config,
Expand All @@ -52,7 +51,6 @@ public KeenRecordsConsumer(final JsonNode config,
this.outputRecordCollector = outputRecordCollector;
this.kafkaProducer = null;
this.streamNames = Set.of();
this.lastStateMessage = null;
LOGGER.info("initializing consumer.");
}

Expand All @@ -72,8 +70,7 @@ protected void startTracked() throws IOException, InterruptedException {
@Override
protected void acceptTracked(final AirbyteMessage msg) {
if (msg.getType() == Type.STATE) {
lastStateMessage = msg;
outputRecordCollector.accept(lastStateMessage);
outputRecordCollector.accept(msg);
return;
} else if (msg.getType() != Type.RECORD) {
return;
Expand Down Expand Up @@ -128,9 +125,6 @@ private String getStreamName(final AirbyteRecordMessage recordMessage) {
protected void close(final boolean hasFailed) {
kafkaProducer.flush();
kafkaProducer.close();
if (!hasFailed) {
outputRecordCollector.accept(lastStateMessage);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.keen;

import static io.airbyte.integrations.destination.keen.KeenDestination.CONFIG_API_KEY;
import static io.airbyte.integrations.destination.keen.KeenDestination.CONFIG_PROJECT_ID;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.List;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@DisplayName("KafkaRecordConsumer")
@ExtendWith(MockitoExtension.class)
public class KeenRecordConsumerTest extends PerStreamStateMessageTest {

private static final String SCHEMA_NAME = "public";
private static final String STREAM_NAME = "id_and_name";

private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(List.of(
CatalogHelpers.createConfiguredAirbyteStream(
STREAM_NAME,
SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))));
@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

private KeenRecordsConsumer consumer;

@BeforeEach
public void init() {
final JsonNode config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, "test_project")
.put(CONFIG_API_KEY, "test_apikey")
.build());
consumer = new KeenRecordsConsumer(config, CATALOG, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

}

0 comments on commit e28038e

Please sign in to comment.