From f0e1fbfd2fe1384752a14dcf6f34221ebc262100 Mon Sep 17 00:00:00 2001 From: Yevhen Sukhomud Date: Fri, 5 Aug 2022 15:39:44 +0700 Subject: [PATCH] Update Pulsar destination to use outputRecordCollector to properly store state (#15349) * Update Pulsar destination to use outputRecordCollector to properly store state * Bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../connectors/destination-pulsar/Dockerfile | 2 +- .../destination-pulsar/build.gradle | 1 + .../destination/pulsar/PulsarDestination.java | 4 +- .../pulsar/PulsarRecordConsumer.java | 14 ++---- .../pulsar/PulsarRecordConsumerTest.java | 43 ++++++++++++++++--- 7 files changed, 48 insertions(+), 20 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 05be34b8d059..2cb599631b18 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -210,7 +210,7 @@ - name: Pulsar destinationDefinitionId: 2340cbba-358e-11ec-8d3d-0242ac130203 dockerRepository: airbyte/destination-pulsar - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/pulsar icon: pulsar.svg releaseStage: alpha diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index d07d6934f191..349d0913c5d8 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3479,7 +3479,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-pulsar:0.1.2" +- dockerImage: "airbyte/destination-pulsar:0.1.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/pulsar" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-pulsar/Dockerfile b/airbyte-integrations/connectors/destination-pulsar/Dockerfile index c96c68c6c3f2..1861288ae8c9 100644 --- a/airbyte-integrations/connectors/destination-pulsar/Dockerfile +++ b/airbyte-integrations/connectors/destination-pulsar/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-pulsar COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-pulsar diff --git a/airbyte-integrations/connectors/destination-pulsar/build.gradle b/airbyte-integrations/connectors/destination-pulsar/build.gradle index 4cd167744463..6e12ff7c64e5 100644 --- a/airbyte-integrations/connectors/destination-pulsar/build.gradle +++ b/airbyte-integrations/connectors/destination-pulsar/build.gradle @@ -19,6 +19,7 @@ dependencies { implementation 'org.apache.pulsar:pulsar-client:2.8.1' testImplementation libs.connectors.testcontainers.pulsar + testImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test') integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-pulsar') diff --git a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarDestination.java b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarDestination.java index 960a0060deee..001423b26236 100644 --- a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarDestination.java +++ b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarDestination.java @@ -80,8 +80,10 @@ public AirbyteConnectionStatus check(final JsonNode config) { public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { - return new PulsarRecordConsumer(PulsarDestinationConfig.getPulsarDestinationConfig(config), + final PulsarDestinationConfig pulsarConfig = PulsarDestinationConfig.getPulsarDestinationConfig(config); + return new PulsarRecordConsumer(pulsarConfig, catalog, + PulsarUtils.buildClient(pulsarConfig.getServiceUrl()), outputRecordCollector, namingResolver); } diff --git a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java index 851d4ea179ee..fde2db986903 100644 --- a/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-pulsar/src/main/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumer.java @@ -38,10 +38,9 @@ public class PulsarRecordConsumer extends FailureTrackingAirbyteMessageConsumer private final NamingConventionTransformer nameTransformer; private final PulsarClient client; - private AirbyteMessage lastStateMessage = null; - public PulsarRecordConsumer(final PulsarDestinationConfig pulsarDestinationConfig, final ConfiguredAirbyteCatalog catalog, + final PulsarClient pulsarClient, final Consumer outputRecordCollector, final NamingConventionTransformer nameTransformer) { this.config = pulsarDestinationConfig; @@ -49,7 +48,7 @@ public PulsarRecordConsumer(final PulsarDestinationConfig pulsarDestinationConfi this.catalog = catalog; this.outputRecordCollector = outputRecordCollector; this.nameTransformer = nameTransformer; - this.client = PulsarUtils.buildClient(this.config.getServiceUrl()); + this.client = pulsarClient; } @Override @@ -60,7 +59,7 @@ protected void startTracked() { @Override protected void acceptTracked(final AirbyteMessage airbyteMessage) { if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) { - lastStateMessage = airbyteMessage; + outputRecordCollector.accept(airbyteMessage); } else if (airbyteMessage.getType() == AirbyteMessage.Type.RECORD) { final AirbyteRecordMessage recordMessage = airbyteMessage.getRecord(); final Producer producer = producerMap.get(AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage)); @@ -100,9 +99,6 @@ private void sendRecord(final Producer producer, final GenericRec LOGGER.error("Error sending message to topic.", e); throw new RuntimeException("Cannot send message to Pulsar. Error: " + e.getMessage(), e); } - if (lastStateMessage != null) { - outputRecordCollector.accept(lastStateMessage); - } } } @@ -113,10 +109,6 @@ protected void close(final boolean hasFailed) { Exceptions.swallow(producer::close); }); Exceptions.swallow(client::close); - - if (lastStateMessage != null) { - outputRecordCollector.accept(lastStateMessage); - } } } diff --git a/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java b/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java index 28725a066759..7542922b8bb7 100644 --- a/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java +++ b/airbyte-integrations/connectors/destination-pulsar/src/test/java/io/airbyte/integrations/destination/pulsar/PulsarRecordConsumerTest.java @@ -6,7 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; @@ -16,7 +15,9 @@ import com.google.common.net.InetAddresses; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; @@ -41,21 +42,40 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.schema.GenericRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.testcontainers.containers.PulsarContainer; import org.testcontainers.utility.DockerImageName; @DisplayName("PulsarRecordConsumer") -public class PulsarRecordConsumerTest { +@ExtendWith(MockitoExtension.class) +public class PulsarRecordConsumerTest extends PerStreamStateMessageTest { + + @Mock + private Consumer outputRecordCollector; + + private PulsarRecordConsumer consumer; + + @Mock + private PulsarDestinationConfig config; + + @Mock + private ConfiguredAirbyteCatalog catalog; + + @Mock + private PulsarClient pulsarClient; private static final StandardNameTransformer NAMING_RESOLVER = new StandardNameTransformer(); @@ -75,8 +95,8 @@ public void testBuildProducerMap(final ConfiguredAirbyteCatalog catalog, .collect(Collectors.joining(",")); final PulsarDestinationConfig config = PulsarDestinationConfig .getPulsarDestinationConfig(getConfig(brokers, topicPattern)); - - final PulsarRecordConsumer recordConsumer = new PulsarRecordConsumer(config, catalog, mock(Consumer.class), NAMING_RESOLVER); + final PulsarClient pulsarClient = PulsarUtils.buildClient(config.getServiceUrl()); + final PulsarRecordConsumer recordConsumer = new PulsarRecordConsumer(config, catalog, pulsarClient, outputRecordCollector, NAMING_RESOLVER); final Map> producerMap = recordConsumer.buildProducerMap(); assertEquals(Sets.newHashSet(catalog.getStreams()).size(), producerMap.size()); @@ -98,7 +118,8 @@ void testCannotConnectToBrokers() throws Exception { namespace, Field.of("id", JsonSchemaType.NUMBER), Field.of("name", JsonSchemaType.STRING)))); - final PulsarRecordConsumer consumer = new PulsarRecordConsumer(config, catalog, mock(Consumer.class), NAMING_RESOLVER); + final PulsarClient pulsarClient = PulsarUtils.buildClient(config.getServiceUrl()); + final PulsarRecordConsumer consumer = new PulsarRecordConsumer(config, catalog, pulsarClient, outputRecordCollector, NAMING_RESOLVER); final List expectedRecords = getNRecords(10, streamName, namespace); assertThrows(RuntimeException.class, consumer::start); @@ -211,10 +232,22 @@ private List buildArgs(final ConfiguredAirbyteCatalog catalog, final } + @Override + protected Consumer getMockedConsumer() { + return outputRecordCollector; + } + + @Override + protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { + return consumer; + } + @BeforeEach void setup() { + // TODO: Unit tests should not use Testcontainers PULSAR = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.8.1")); PULSAR.start(); + consumer = new PulsarRecordConsumer(config, catalog, pulsarClient, outputRecordCollector, NAMING_RESOLVER); } @AfterEach