Skip to content

Commit

Permalink
Destination:PubSub Add Batching and Ordering configuration (#15705)
Browse files Browse the repository at this point in the history
* Add Batching and Ordering configuration

* Add changelog

* fix config for tests

* format files

* auto-bump connector version

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
4 people authored Dec 2, 2022
1 parent bd9eedf commit b7d2681
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@
- name: Google PubSub
destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692
dockerRepository: airbyte/destination-pubsub
dockerImageTag: 0.1.6
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/pubsub
icon: googlepubsub.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2734,7 +2734,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-pubsub:0.1.6"
- dockerImage: "airbyte/destination-pubsub:0.2.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/pubsub"
connectionSpecification:
Expand All @@ -2745,6 +2745,8 @@
- "project_id"
- "topic_id"
- "credentials_json"
- "ordering_enabled"
- "batching_enabled"
additionalProperties: true
properties:
project_id:
Expand All @@ -2762,6 +2764,37 @@
>docs</a> if you need help generating this key."
title: "Credentials JSON"
airbyte_secret: true
ordering_enabled:
title: "Message Ordering Enabled"
description: "If TRUE PubSub publisher will have <a href=\"https://cloud.google.com/pubsub/docs/ordering\"\
>message ordering</a> enabled. Every message will have an ordering key\
\ of stream"
type: "boolean"
default: false
batching_enabled:
type: "boolean"
title: "Message Batching Enabled"
description: "If TRUE messages will be buffered instead of sending them\
\ one by one"
default: false
batching_delay_threshold:
type: "integer"
title: "Message Batching: Delay Threshold"
description: "Number of ms before the buffer is flushed"
default: 1
minimum: 1
batching_element_count_threshold:
type: "integer"
title: "Message Batching: Element Count Threshold"
description: "Number of messages before the buffer is flushed"
default: 1
minimum: 1
batching_request_bytes_threshold:
type: "integer"
title: "Message Batching: Request Bytes Threshold"
description: "Number of bytes before the buffer is flushed"
default: 1
minimum: 1
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-pubsub

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/destination-pubsub
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -24,7 +21,6 @@
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.ByteArrayInputStream;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
Expand All @@ -34,13 +30,13 @@
public class PubsubConsumer extends FailureTrackingAirbyteMessageConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(PubsubConsumer.class);
private final JsonNode config;
private final PubsubDestinationConfig config;
private final ConfiguredAirbyteCatalog catalog;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Map<AirbyteStreamNameNamespacePair, Map<String, String>> attributes;
private Publisher publisher;

public PubsubConsumer(final JsonNode config,
public PubsubConsumer(final PubsubDestinationConfig config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
this.outputRecordCollector = outputRecordCollector;
Expand All @@ -54,18 +50,11 @@ public PubsubConsumer(final JsonNode config,
@Override
protected void startTracked() throws Exception {
// get publisher
final String projectId = config.get(PubsubDestination.CONFIG_PROJECT_ID).asText();
final String topicName = config.get(PubsubDestination.CONFIG_TOPIC_ID).asText();
final TopicName topic = TopicName.of(projectId, topicName);
final String credentialsString =
config.get(PubsubDestination.CONFIG_CREDS).isObject() ? Jsons.serialize(config.get(
PubsubDestination.CONFIG_CREDS))
: config.get(PubsubDestination.CONFIG_CREDS).asText();
final ServiceAccountCredentials credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));
publisher = Publisher.newBuilder(topic)
.setEnableMessageOrdering(true)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
publisher = Publisher.newBuilder(config.getTopic())
.setBatchingSettings(config.getBatchingSettings())
.setEnableMessageOrdering(config.isOrderingEnabled())
.setCredentialsProvider(FixedCredentialsProvider.create(config.getCredentials()))
.build();
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
final Map<String, String> attrs = Maps.newHashMap();
final var key = AirbyteStreamNameNamespacePair.fromAirbyteStream(configStream.getStream());
Expand Down Expand Up @@ -100,10 +89,13 @@ protected void acceptTracked(final AirbyteMessage msg) throws Exception {
JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getData(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()));

publisher.publish(
PubsubMessage.newBuilder().putAllAttributes(attributes.get(streamKey))
.setOrderingKey(streamKey.toString())
.setData(ByteString.copyFromUtf8(Jsons.serialize(data))).build());
var messageBuilder = PubsubMessage.newBuilder()
.putAllAttributes(attributes.get(streamKey))
.setData(ByteString.copyFromUtf8(Jsons.serialize(data)));
if (config.isOrderingEnabled()) {
messageBuilder.setOrderingKey(streamKey.toString());
}
publisher.publish(messageBuilder.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.pubsub.v1.TopicName;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
Expand All @@ -23,17 +19,14 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubDestination extends BaseConnector implements Destination {

static final String CONFIG_TOPIC_ID = "topic_id";
static final String CONFIG_PROJECT_ID = "project_id";
static final String CONFIG_CREDS = "credentials_json";
static final String STREAM = "_stream";
static final String NAMESPACE = "_namespace";
private static final Logger LOGGER = LoggerFactory.getLogger(PubsubDestination.class);
Expand All @@ -45,23 +38,15 @@ public static void main(final String[] args) throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try {
final String projectId = config.get(CONFIG_PROJECT_ID).asText();
final String topicId = config.get(CONFIG_TOPIC_ID).asText();
final String credentialsString =
config.get(CONFIG_CREDS).isObject() ? Jsons.serialize(config.get(CONFIG_CREDS))
: config.get(CONFIG_CREDS).asText();
final ServiceAccountCredentials credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));

var pubsubDestinationConfig = PubsubDestinationConfig.fromJsonNode(config);
final TopicAdminClient adminClient = TopicAdminClient
.create(TopicAdminSettings.newBuilder().setCredentialsProvider(
FixedCredentialsProvider.create(credentials)).build());
FixedCredentialsProvider.create(pubsubDestinationConfig.getCredentials())).build());

// check if topic is present and the service account has necessary permissions on it
final TopicName topicName = TopicName.of(projectId, topicId);
final List<String> requiredPermissions = List.of("pubsub.topics.publish");
final TestIamPermissionsResponse response = adminClient.testIamPermissions(
TestIamPermissionsRequest.newBuilder().setResource(topicName.toString())
TestIamPermissionsRequest.newBuilder().setResource(pubsubDestinationConfig.getTopic().toString())
.addAllPermissions(requiredPermissions).build());
Preconditions.checkArgument(response.getPermissionsList().containsAll(requiredPermissions),
"missing required permissions " + requiredPermissions);
Expand All @@ -77,8 +62,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return new PubsubConsumer(config, configuredCatalog, outputRecordCollector);
final Consumer<AirbyteMessage> outputRecordCollector)
throws IOException {
return new PubsubConsumer(PubsubDestinationConfig.fromJsonNode(config), configuredCatalog, outputRecordCollector);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.pubsub;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.batching.BatchingSettings;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.pubsub.v1.TopicName;
import io.airbyte.commons.json.Jsons;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.threeten.bp.Duration;

public class PubsubDestinationConfig {

static final String CONFIG_TOPIC_ID = "topic_id";
static final String CONFIG_PROJECT_ID = "project_id";
static final String CONFIG_CREDS = "credentials_json";
static final String CONFIG_ORDERING_ENABLED = "ordering_enabled";
static final String CONFIG_BATCHING_ENABLED = "batching_enabled";
static final String CONFIG_BATCHING_DELAY_THRESHOLD = "batching_delay_threshold";
static final String CONFIG_BATCHING_ELEMENT_COUNT_THRESHOLD = "batching_element_count_threshold";
static final String CONFIG_BATCHING_REQUEST_BYTES_THRESHOLD = "batching_request_bytes_threshold";
private final TopicName topic;
private final ServiceAccountCredentials credentials;
private final boolean orderingEnabled;
private final BatchingSettings batchingSettings;

private PubsubDestinationConfig(TopicName topic,
ServiceAccountCredentials credentials,
boolean orderingEnabled,
BatchingSettings batchingSettings) {
this.topic = topic;
this.credentials = credentials;
this.orderingEnabled = orderingEnabled;
this.batchingSettings = batchingSettings;
}

public static PubsubDestinationConfig fromJsonNode(final JsonNode config) throws IOException {
final String projectId = config.get(CONFIG_PROJECT_ID).asText();
final String topicName = config.get(CONFIG_TOPIC_ID).asText();
final TopicName topic = TopicName.of(projectId, topicName);
final String credentialsString = config.get(CONFIG_CREDS).isObject()
? Jsons.serialize(config.get(CONFIG_CREDS))
: config.get(CONFIG_CREDS).asText();
final ServiceAccountCredentials credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));

final boolean orderingEnabled = config.get(CONFIG_ORDERING_ENABLED).asBoolean();

final var batchingSetting = BatchingSettings.newBuilder()
.setIsEnabled(config.get(CONFIG_BATCHING_ENABLED).asBoolean())
.setDelayThreshold(Duration.ofMillis(getOrDefault(config, CONFIG_BATCHING_DELAY_THRESHOLD, JsonNode::asLong, 1L)))
.setRequestByteThreshold(getOrDefault(config, CONFIG_BATCHING_REQUEST_BYTES_THRESHOLD, JsonNode::asLong, 1L))
.setElementCountThreshold(getOrDefault(config, CONFIG_BATCHING_ELEMENT_COUNT_THRESHOLD, JsonNode::asLong, 1L))
.build();

return new PubsubDestinationConfig(topic, credentials, orderingEnabled, batchingSetting);
}

private static <T> T getOrDefault(JsonNode node, String key, Function<JsonNode, T> consumer, T defaultValue) {
var value = node.get(key);
if (value != null) {
return consumer.apply(value);
} else {
return defaultValue;
}
}

public BatchingSettings getBatchingSettings() {
return batchingSettings;
}

public boolean isOrderingEnabled() {
return orderingEnabled;
}

public ServiceAccountCredentials getCredentials() {
return credentials;
}

public TopicName getTopic() {
return topic;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Google PubSub Destination Spec",
"type": "object",
"required": ["project_id", "topic_id", "credentials_json"],
"required": [
"project_id",
"topic_id",
"credentials_json",
"ordering_enabled",
"batching_enabled"
],
"additionalProperties": true,
"properties": {
"project_id": {
Expand All @@ -26,6 +32,39 @@
"description": "The contents of the JSON service account key. Check out the <a href=\"https://docs.airbyte.com/integrations/destinations/pubsub\">docs</a> if you need help generating this key.",
"title": "Credentials JSON",
"airbyte_secret": true
},
"ordering_enabled": {
"title": "Message Ordering Enabled",
"description": "If TRUE PubSub publisher will have <a href=\"https://cloud.google.com/pubsub/docs/ordering\">message ordering</a> enabled. Every message will have an ordering key of stream",
"type": "boolean",
"default": false
},
"batching_enabled": {
"type": "boolean",
"title": "Message Batching Enabled",
"description": "If TRUE messages will be buffered instead of sending them one by one",
"default": false
},
"batching_delay_threshold": {
"type": "integer",
"title": "Message Batching: Delay Threshold",
"description": "Number of ms before the buffer is flushed",
"default": 1,
"minimum": 1
},
"batching_element_count_threshold": {
"type": "integer",
"title": "Message Batching: Element Count Threshold",
"description": "Number of messages before the buffer is flushed",
"default": 1,
"minimum": 1
},
"batching_request_bytes_threshold": {
"type": "integer",
"title": "Message Batching: Request Bytes Threshold",
"description": "Number of bytes before the buffer is flushed",
"default": 1,
"minimum": 1
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package io.airbyte.integrations.destination.pubsub;

import static com.google.common.base.Strings.nullToEmpty;
import static io.airbyte.integrations.destination.pubsub.PubsubDestination.CONFIG_CREDS;
import static io.airbyte.integrations.destination.pubsub.PubsubDestination.CONFIG_PROJECT_ID;
import static io.airbyte.integrations.destination.pubsub.PubsubDestination.CONFIG_TOPIC_ID;
import static io.airbyte.integrations.destination.pubsub.PubsubDestination.NAMESPACE;
import static io.airbyte.integrations.destination.pubsub.PubsubDestination.STREAM;
import static io.airbyte.integrations.destination.pubsub.PubsubDestinationConfig.CONFIG_BATCHING_ENABLED;
import static io.airbyte.integrations.destination.pubsub.PubsubDestinationConfig.CONFIG_CREDS;
import static io.airbyte.integrations.destination.pubsub.PubsubDestinationConfig.CONFIG_ORDERING_ENABLED;
import static io.airbyte.integrations.destination.pubsub.PubsubDestinationConfig.CONFIG_PROJECT_ID;
import static io.airbyte.integrations.destination.pubsub.PubsubDestinationConfig.CONFIG_TOPIC_ID;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -202,6 +204,8 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJsonString)
.put(CONFIG_TOPIC_ID, topicId)
.put(CONFIG_BATCHING_ENABLED, true)
.put(CONFIG_ORDERING_ENABLED, true)
.build());

credentials =
Expand Down
Loading

0 comments on commit b7d2681

Please sign in to comment.