Skip to content

Commit

Permalink
Update Cassandra destination to use outputRecordCollector to properly…
Browse files Browse the repository at this point in the history
… store state (#15294)

* Update Cassandra destination to use outputRecordCollector to properly store state

* Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
suhomud and octavia-squidington-iii authored Aug 4, 2022
1 parent 117c346 commit 39db316
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
- name: Cassandra
destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5
dockerRepository: airbyte/destination-cassandra
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/cassandra
icon: cassandra.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-cassandra:0.1.2"
- dockerImage: "airbyte/destination-cassandra:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/cassandra"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-cassandra

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-cassandra
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
// https://mvnrepository.com/artifact/org.assertj/assertj-core
testImplementation "org.assertj:assertj-core:${assertVersion}"
testImplementation libs.connectors.testcontainers.cassandra
testImplementation project(':airbyte-integrations:bases:standard-destination-test')


integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception {
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
public AirbyteConnectionStatus check(final JsonNode config) {
var cassandraConfig = new CassandraConfig(config);
// add random uuid to avoid conflicts with existing tables.
String tableName = "table_" + UUID.randomUUID().toString().replace("-", "");
Expand Down Expand Up @@ -55,10 +55,12 @@ public AirbyteConnectionStatus check(JsonNode config) {
}

@Override
public AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
return new CassandraMessageConsumer(new CassandraConfig(config), configuredCatalog, outputRecordCollector);
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final CassandraConfig cassandraConfig = new CassandraConfig(config);
final CassandraCqlProvider cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
return new CassandraMessageConsumer(cassandraConfig, configuredCatalog, cassandraCqlProvider, outputRecordCollector);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ class CassandraMessageConsumer extends FailureTrackingAirbyteMessageConsumer {

private final CassandraCqlProvider cassandraCqlProvider;

private AirbyteMessage lastMessage = null;

public CassandraMessageConsumer(CassandraConfig cassandraConfig,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
public CassandraMessageConsumer(final CassandraConfig cassandraConfig,
final ConfiguredAirbyteCatalog configuredCatalog,
final CassandraCqlProvider provider,
final Consumer<AirbyteMessage> outputRecordCollector) {
this.cassandraConfig = cassandraConfig;
this.outputRecordCollector = outputRecordCollector;
this.cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
this.cassandraCqlProvider = provider;
var nameTransformer = new CassandraNameTransformer(cassandraConfig);
this.cassandraStreams = configuredCatalog.getStreams().stream()
.collect(Collectors.toUnmodifiableMap(
Expand All @@ -55,7 +54,7 @@ protected void startTracked() {
}

@Override
protected void acceptTracked(AirbyteMessage message) {
protected void acceptTracked(final AirbyteMessage message) {
if (message.getType() == AirbyteMessage.Type.RECORD) {
var messageRecord = message.getRecord();
var streamConfig =
Expand All @@ -66,14 +65,14 @@ protected void acceptTracked(AirbyteMessage message) {
var data = Jsons.serialize(messageRecord.getData());
cassandraCqlProvider.insert(streamConfig.getKeyspace(), streamConfig.getTempTableName(), data);
} else if (message.getType() == AirbyteMessage.Type.STATE) {
this.lastMessage = message;
outputRecordCollector.accept(message);
} else {
LOGGER.warn("Unsupported airbyte message type: {}", message.getType());
}
}

@Override
protected void close(boolean hasFailed) {
protected void close(final boolean hasFailed) {
if (!hasFailed) {
cassandraStreams.forEach((k, v) -> {
try {
Expand All @@ -88,17 +87,16 @@ protected void close(boolean hasFailed) {
}
default -> throw new UnsupportedOperationException();
}
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Error while copying data to table {}: : ", v.getTableName(), e);
}
});
outputRecordCollector.accept(lastMessage);
}

cassandraStreams.forEach((k, v) -> {
try {
cassandraCqlProvider.dropTableIfExists(v.getKeyspace(), v.getTempTableName());
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Error while deleting temp table {} with reason: ", v.getTempTableName(), e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ void setup() {

var catalog = TestDataFactory.createConfiguredAirbyteCatalog(cStream1, cStream2);

cassandraMessageConsumer = new CassandraMessageConsumer(cassandraConfig, catalog, message -> {});
cassandraCqlProvider = new CassandraCqlProvider(cassandraConfig);
cassandraMessageConsumer = new CassandraMessageConsumer(cassandraConfig, catalog, cassandraCqlProvider, message -> {});
nameTransformer = new CassandraNameTransformer(cassandraConfig);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.cassandra;

import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
public class CassandraRecordConsumerTest extends PerStreamStateMessageTest {

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

@InjectMocks
private CassandraMessageConsumer consumer;
@Mock
private CassandraConfig config;
@Mock
private ConfiguredAirbyteCatalog catalog;
@Mock
private CassandraCqlProvider provider;

@BeforeEach
public void init() {
consumer = new CassandraMessageConsumer(config, catalog, provider, outputRecordCollector);
}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

}

0 comments on commit 39db316

Please sign in to comment.