Skip to content

Commit

Permalink
Update Pulsar destination to use outputRecordCollector to properly st…
Browse files Browse the repository at this point in the history
…ore state (#15349)

* Update Pulsar destination to use outputRecordCollector to properly store state

* Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
suhomud and octavia-squidington-iii authored Aug 5, 2022
1 parent 964b226 commit f0e1fbf
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@
- name: Pulsar
destinationDefinitionId: 2340cbba-358e-11ec-8d3d-0242ac130203
dockerRepository: airbyte/destination-pulsar
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/pulsar
icon: pulsar.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3479,7 +3479,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-pulsar:0.1.2"
- dockerImage: "airbyte/destination-pulsar:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/pulsar"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-pulsar

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/destination-pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation 'org.apache.pulsar:pulsar-client:2.8.1'

testImplementation libs.connectors.testcontainers.pulsar
testImplementation project(':airbyte-integrations:bases:standard-destination-test')

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-pulsar')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ public AirbyteConnectionStatus check(final JsonNode config) {
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return new PulsarRecordConsumer(PulsarDestinationConfig.getPulsarDestinationConfig(config),
final PulsarDestinationConfig pulsarConfig = PulsarDestinationConfig.getPulsarDestinationConfig(config);
return new PulsarRecordConsumer(pulsarConfig,
catalog,
PulsarUtils.buildClient(pulsarConfig.getServiceUrl()),
outputRecordCollector,
namingResolver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@ public class PulsarRecordConsumer extends FailureTrackingAirbyteMessageConsumer
private final NamingConventionTransformer nameTransformer;
private final PulsarClient client;

private AirbyteMessage lastStateMessage = null;

public PulsarRecordConsumer(final PulsarDestinationConfig pulsarDestinationConfig,
final ConfiguredAirbyteCatalog catalog,
final PulsarClient pulsarClient,
final Consumer<AirbyteMessage> outputRecordCollector,
final NamingConventionTransformer nameTransformer) {
this.config = pulsarDestinationConfig;
this.producerMap = new HashMap<>();
this.catalog = catalog;
this.outputRecordCollector = outputRecordCollector;
this.nameTransformer = nameTransformer;
this.client = PulsarUtils.buildClient(this.config.getServiceUrl());
this.client = pulsarClient;
}

@Override
Expand All @@ -60,7 +59,7 @@ protected void startTracked() {
@Override
protected void acceptTracked(final AirbyteMessage airbyteMessage) {
if (airbyteMessage.getType() == AirbyteMessage.Type.STATE) {
lastStateMessage = airbyteMessage;
outputRecordCollector.accept(airbyteMessage);
} else if (airbyteMessage.getType() == AirbyteMessage.Type.RECORD) {
final AirbyteRecordMessage recordMessage = airbyteMessage.getRecord();
final Producer<GenericRecord> producer = producerMap.get(AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage));
Expand Down Expand Up @@ -100,9 +99,6 @@ private void sendRecord(final Producer<GenericRecord> producer, final GenericRec
LOGGER.error("Error sending message to topic.", e);
throw new RuntimeException("Cannot send message to Pulsar. Error: " + e.getMessage(), e);
}
if (lastStateMessage != null) {
outputRecordCollector.accept(lastStateMessage);
}
}
}

Expand All @@ -113,10 +109,6 @@ protected void close(final boolean hasFailed) {
Exceptions.swallow(producer::close);
});
Exceptions.swallow(client::close);

if (lastStateMessage != null) {
outputRecordCollector.accept(lastStateMessage);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
Expand All @@ -16,7 +15,9 @@
import com.google.common.net.InetAddresses;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.PerStreamStateMessageTest;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
Expand All @@ -41,21 +42,40 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

@DisplayName("PulsarRecordConsumer")
public class PulsarRecordConsumerTest {
@ExtendWith(MockitoExtension.class)
public class PulsarRecordConsumerTest extends PerStreamStateMessageTest {

@Mock
private Consumer<AirbyteMessage> outputRecordCollector;

private PulsarRecordConsumer consumer;

@Mock
private PulsarDestinationConfig config;

@Mock
private ConfiguredAirbyteCatalog catalog;

@Mock
private PulsarClient pulsarClient;

private static final StandardNameTransformer NAMING_RESOLVER = new StandardNameTransformer();

Expand All @@ -75,8 +95,8 @@ public void testBuildProducerMap(final ConfiguredAirbyteCatalog catalog,
.collect(Collectors.joining(","));
final PulsarDestinationConfig config = PulsarDestinationConfig
.getPulsarDestinationConfig(getConfig(brokers, topicPattern));

final PulsarRecordConsumer recordConsumer = new PulsarRecordConsumer(config, catalog, mock(Consumer.class), NAMING_RESOLVER);
final PulsarClient pulsarClient = PulsarUtils.buildClient(config.getServiceUrl());
final PulsarRecordConsumer recordConsumer = new PulsarRecordConsumer(config, catalog, pulsarClient, outputRecordCollector, NAMING_RESOLVER);
final Map<AirbyteStreamNameNamespacePair, Producer<GenericRecord>> producerMap = recordConsumer.buildProducerMap();
assertEquals(Sets.newHashSet(catalog.getStreams()).size(), producerMap.size());

Expand All @@ -98,7 +118,8 @@ void testCannotConnectToBrokers() throws Exception {
namespace,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))));
final PulsarRecordConsumer consumer = new PulsarRecordConsumer(config, catalog, mock(Consumer.class), NAMING_RESOLVER);
final PulsarClient pulsarClient = PulsarUtils.buildClient(config.getServiceUrl());
final PulsarRecordConsumer consumer = new PulsarRecordConsumer(config, catalog, pulsarClient, outputRecordCollector, NAMING_RESOLVER);
final List<AirbyteMessage> expectedRecords = getNRecords(10, streamName, namespace);

assertThrows(RuntimeException.class, consumer::start);
Expand Down Expand Up @@ -211,10 +232,22 @@ private List<Arguments> buildArgs(final ConfiguredAirbyteCatalog catalog, final

}

@Override
protected Consumer<AirbyteMessage> getMockedConsumer() {
return outputRecordCollector;
}

@Override
protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() {
return consumer;
}

@BeforeEach
void setup() {
// TODO: Unit tests should not use Testcontainers
PULSAR = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.8.1"));
PULSAR.start();
consumer = new PulsarRecordConsumer(config, catalog, pulsarClient, outputRecordCollector, NAMING_RESOLVER);
}

@AfterEach
Expand Down

0 comments on commit f0e1fbf

Please sign in to comment.