Add message translation to GetSpec #18130

Merged: 12 commits, Oct 21, 2022
@@ -75,6 +75,10 @@ public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVe
return (CurrentVersion) result;
}

public Version getMostRecentVersion() {
return new Version(mostRecentVersion, "0", "0");
Review comment (Contributor): Nit: extract "0" to a named constant to make it easy to understand what it represents.

Reply (Contributor Author): I renamed mostRecentVersion to mostRecentMajorVersion, which is what this attribute actually is. That should already make it clearer. If you think the extra constants still help, I'm happy to add them as well.
}

private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) {
final Collection<AirbyteMessageMigration<?, ?>> results = migrations.tailMap(version.getMajorVersion()).values();
if (results.isEmpty()) {
@@ -23,4 +23,8 @@ public <T> AirbyteMessageVersionedMigrator<T> getVersionedMigrator(final Version
return new AirbyteMessageVersionedMigrator<>(this.migrator, version);
}

public Version getMostRecentVersion() {
return migrator.getMostRecentVersion();
}

}
@@ -5,6 +5,7 @@
package io.airbyte.workers.internal;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
@@ -13,7 +14,12 @@
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -26,10 +32,17 @@
public class VersionedAirbyteStreamFactory<T> extends DefaultAirbyteStreamFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteStreamFactory.class);
private static final Version fallbackVersion = new Version("0.2.0");
private static final int BUFFER_READ_AHEAD_LIMIT = 4096;
private static final int MESSAGES_LOOK_AHEAD_FOR_DETECTION = 10;

private final AirbyteMessageDeserializer<T> deserializer;
private final AirbyteMessageVersionedMigrator<T> migrator;
private final Version protocolVersion;
private final AirbyteMessageSerDeProvider serDeProvider;
private final AirbyteMessageVersionedMigratorFactory migratorFactory;
private AirbyteMessageDeserializer<T> deserializer;
private AirbyteMessageVersionedMigrator<T> migrator;
private Version protocolVersion;

private boolean shouldDetectVersion = false;

public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider,
final AirbyteMessageVersionedMigratorFactory migratorFactory,
@@ -43,6 +56,79 @@ public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProv
final MdcScope.Builder containerLogMdcBuilder) {
// TODO AirbyteProtocolPredicate needs to be updated to be protocol version aware
super(new AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder);
Preconditions.checkNotNull(protocolVersion);
this.serDeProvider = serDeProvider;
this.migratorFactory = migratorFactory;
this.initializeForProtocolVersion(protocolVersion);
}

/**
* Create the AirbyteMessage stream.
*
* If detectVersion is set to true, the factory decides which protocol version to use from the
* content of the stream rather than from the version passed to the constructor.
*/
@SneakyThrows
@Override
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
if (shouldDetectVersion) {
final Optional<Version> versionMaybe = detectVersion(bufferedReader);
if (versionMaybe.isPresent()) {
logger.info("Detected Protocol Version {}", versionMaybe.get().serialize());
Review comment (Contributor): Nit: why use serialize here instead of toString?

Reply (Contributor Author): It comes from our Version implementation: serialize returns the traditional "<major>.<minor>.<patch>" string, while toString is a pretty print of the class. (An illustrative sketch of this difference follows the create method below.)
initializeForProtocolVersion(versionMaybe.get());
} else {
// No version found, use the default as a fallback
logger.info("Unable to detect Protocol Version, assuming protocol version {}", fallbackVersion.serialize());
initializeForProtocolVersion(fallbackVersion);
}
}
return super.create(bufferedReader);
}
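
(Not part of this PR: a minimal, hypothetical sketch of the serialize/toString distinction discussed in the thread above. The real io.airbyte.commons.version.Version may differ.)

    // Illustration only: how serialize() and toString() can diverge on a version type.
    public final class VersionSketch {

      private final String major;
      private final String minor;
      private final String patch;

      public VersionSketch(final String major, final String minor, final String patch) {
        this.major = major;
        this.minor = minor;
        this.patch = patch;
      }

      // serialize: the traditional "<major>.<minor>.<patch>" form, e.g. "0.5.9".
      public String serialize() {
        return String.join(".", major, minor, patch);
      }

      // toString: a pretty print of the class, handier for debugging than for log output.
      @Override
      public String toString() {
        return "VersionSketch{major=" + major + ", minor=" + minor + ", patch=" + patch + "}";
      }
    }

With this sketch, new VersionSketch("0", "5", "9").serialize() yields "0.5.9", which matches the compact format logged in create above.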

/**
* Attempt to detect the protocol version by scanning the stream.
*
* Uses the BufferedReader mark/reset feature to look ahead. We attempt to find the first SPEC
* message and decide on a protocol version from that message.
*
* @param bufferedReader the stream to read
* @return The Version if found
* @throws IOException
*/
private Optional<Version> detectVersion(final BufferedReader bufferedReader) throws IOException {
bufferedReader.mark(BUFFER_READ_AHEAD_LIMIT);
// Cap detection to the first 10 messages
Review comment (Contributor): I don't understand why we cap at 10.

Reply (Contributor Author): I should probably add more details in the comments. Ideally, version detection should only need to read the first message. In practice, connectors send log messages before the actual SPEC message, so the idea behind the cap is that we should not read ahead too far just to detect the protocol version. Another constraint is how BufferedReader::mark/reset works: if we read ahead too far, it will throw an exception and we won't be able to reset. (A self-contained mark/reset sketch follows this method below.)

Reply (Contributor Author): Added more comments.
for (int i = 0; i < MESSAGES_LOOK_AHEAD_FOR_DETECTION; ++i) {
final String line = bufferedReader.readLine();
final Optional<JsonNode> jsonOpt = Jsons.tryDeserialize(line);
if (jsonOpt.isPresent()) {
final JsonNode json = jsonOpt.get();
if (isSpecMessage(json)) {
final JsonNode protocolVersionNode = json.at("/spec/protocol_version");
bufferedReader.reset();
return Optional.ofNullable(protocolVersionNode).filter(Predicate.not(JsonNode::isMissingNode)).map(node -> new Version(node.asText()));
}
}
}
bufferedReader.reset();
return Optional.empty();
}
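
(Not part of this PR: a self-contained sketch of the BufferedReader mark/reset constraint mentioned in the thread above. mark() records a position that stays valid only within the given read-ahead limit, so the look-ahead has to stay bounded for reset() to succeed.)

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;

    public class MarkResetSketch {

      public static void main(final String[] args) throws IOException {
        final BufferedReader reader = new BufferedReader(new StringReader("line1\nline2\nline3\n"));

        // Remember the current position; reset() is only guaranteed to work while we
        // stay within this read-ahead limit (4096 characters here).
        reader.mark(4096);

        // Bounded look-ahead: peek at a couple of lines without consuming them for good.
        for (int i = 0; i < 2; i++) {
          System.out.println("peeked: " + reader.readLine());
        }

        // Rewind to the mark so a downstream consumer still sees the full stream.
        reader.reset();
        System.out.println("back to the start: " + reader.readLine()); // prints "line1" again
      }
    }

Because connectors may emit several LOG lines before the SPEC message (as in the fixture at the bottom of this diff), capping at MESSAGES_LOOK_AHEAD_FOR_DETECTION bounds how far the factory reads past the mark; reading beyond BUFFER_READ_AHEAD_LIMIT could invalidate the mark and make the reset in detectVersion fail.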

private boolean isSpecMessage(final JsonNode json) {
final String typeFieldName = "type";
Review comment (Contributor): Consider extracting to constant.

Reply (Contributor Author): Done.

return json.has(typeFieldName) && "spec".equalsIgnoreCase(json.get(typeFieldName).asText());
}
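
(Not part of this PR: a hedged illustration of how the "/spec/protocol_version" JSON Pointer used in detectVersion resolves against a SPEC line versus a LOG line. It uses plain Jackson rather than Airbyte's Jsons helper, and the sample inputs are modeled on the fixture at the bottom of this diff.)

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SpecVersionSketch {

      public static void main(final String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();

        final JsonNode spec = mapper.readTree("{\"type\":\"SPEC\",\"spec\":{\"protocol_version\":\"0.5.9\"}}");
        final JsonNode log = mapper.readTree("{\"type\":\"LOG\",\"log\":{\"level\":\"INFO\",\"message\":\"Command: SPEC\"}}");

        // JsonNode.at never returns null: it returns a MissingNode when the pointer does not
        // resolve, which is why detectVersion filters with JsonNode::isMissingNode.
        System.out.println(spec.at("/spec/protocol_version").asText());        // 0.5.9
        System.out.println(log.at("/spec/protocol_version").isMissingNode());  // true
      }
    }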

public boolean setDetectVersion(final boolean detectVersion) {
return this.shouldDetectVersion = detectVersion;
}

public VersionedAirbyteStreamFactory<T> withDetectVersion(final boolean detectVersion) {
setDetectVersion(detectVersion);
return this;
}

final protected void initializeForProtocolVersion(final Version protocolVersion) {
this.deserializer = (AirbyteMessageDeserializer<T>) serDeProvider.getDeserializer(protocolVersion).orElseThrow();
this.migrator = migratorFactory.getVersionedMigrator(protocolVersion);
this.protocolVersion = protocolVersion;
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.internal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import io.airbyte.commons.protocol.AirbyteMessageMigrator;
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigrationV0;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.nio.charset.Charset;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.ClassLoaderUtils;

class VersionedAirbyteStreamFactoryTest {

AirbyteMessageSerDeProvider serDeProvider;
AirbyteMessageVersionedMigratorFactory migratorFactory;

final static Version defaultVersion = new Version("0.2.0");

@BeforeEach
void beforeEach() {
serDeProvider = spy(new AirbyteMessageSerDeProvider(
List.of(new AirbyteMessageV0Deserializer()),
List.of(new AirbyteMessageV0Serializer())));
serDeProvider.initialize();
final AirbyteMessageMigrator migrator = new AirbyteMessageMigrator(
List.of(new AirbyteMessageMigrationV0()));
migrator.initialize();
migratorFactory = spy(new AirbyteMessageVersionedMigratorFactory(migrator));
}

@Test
void testCreate() {
final Version initialVersion = new Version("0.1.2");
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion);

final BufferedReader bufferedReader = new BufferedReader(new StringReader(""));
streamFactory.create(bufferedReader);

verify(serDeProvider).getDeserializer(initialVersion);
verify(migratorFactory).getVersionedMigrator(initialVersion);
}

@Test
void testCreateWithVersionDetection() {
final Version initialVersion = new Version("0.0.0");
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion)
.withDetectVersion(true);

final BufferedReader bufferedReader =
getBuffereredReader("version-detection/logs-with-version.jsonl");
final Stream<AirbyteMessage> stream = streamFactory.create(bufferedReader);

final long messageCount = stream.toList().size();
verify(serDeProvider).getDeserializer(initialVersion);
verify(serDeProvider).getDeserializer(new Version("0.5.9"));
assertEquals(1, messageCount);
}

@Test
void testCreateWithVersionDetectionFallback() {
final Version initialVersion = new Version("0.0.6");
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion)
.withDetectVersion(true);

final BufferedReader bufferedReader =
getBuffereredReader("version-detection/logs-without-version.jsonl");
final Stream<AirbyteMessage> stream = streamFactory.create(bufferedReader);

final long messageCount = stream.toList().size();
verify(serDeProvider).getDeserializer(initialVersion);
verify(serDeProvider).getDeserializer(defaultVersion);
assertEquals(1, messageCount);
}

@Test
void testCreateWithVersionDetectionWithoutSpecMessage() {
final Version initialVersion = new Version("0.0.1");
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion)
.withDetectVersion(true);

final BufferedReader bufferedReader =
getBuffereredReader("version-detection/logs-without-spec-message.jsonl");
final Stream<AirbyteMessage> stream = streamFactory.create(bufferedReader);

final long messageCount = stream.toList().size();
verify(serDeProvider).getDeserializer(initialVersion);
verify(serDeProvider).getDeserializer(defaultVersion);
assertEquals(2, messageCount);
}

BufferedReader getBuffereredReader(final String resourceFile) {
return new BufferedReader(
new InputStreamReader(
ClassLoaderUtils.getDefaultClassLoader().getResourceAsStream(resourceFile),
Charset.defaultCharset()));
}

}
@@ -0,0 +1,5 @@
{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}}
{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}}
{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}}
{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}}
{"type":"SPEC","spec":{"protocol_version":"0.5.9","documentationUrl":"https://docs.airbyte.io/integrations/destinations/bigquery","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"BigQuery Destination Spec","type":"object","required":["project_id","dataset_location","dataset_id"],"additionalProperties":true,"properties":{"project_id":{"type":"string","description":"The GCP project ID for the project containing the target BigQuery dataset. Read more <a href=\"https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects\">here</a>.","title":"Project ID","order":0},"dataset_location":{"type":"string","description":"The location of the dataset. Warning: Changes made after creation will not be applied. Read more <a href=\"https://cloud.google.com/bigquery/docs/locations\">here</a>.","title":"Dataset Location","order":1,"enum":["US","EU","asia-east1","asia-east2","asia-northeast1","asia-northeast2","asia-northeast3","asia-south1","asia-south2","asia-southeast1","asia-southeast2","australia-southeast1","australia-southeast2","europe-central2","europe-north1","europe-west1","europe-west2","europe-west3","europe-west4","europe-west6","northamerica-northeast1","northamerica-northeast2","southamerica-east1","southamerica-west1","us-central1","us-east1","us-east4","us-west1","us-west2","us-west3","us-west4"]},"dataset_id":{"type":"string","description":"The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more <a href=\"https://cloud.google.com/bigquery/docs/datasets#create-dataset\">here</a>.","title":"Default Dataset ID","order":2},"loading_method":{"type":"object","title":"Loading Method","description":"Loading method used to send select the way data will be uploaded to BigQuery. <br/><b>Standard Inserts</b> - Direct uploading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. In almost all cases, you should use staging. <br/><b>GCS Staging</b> - Writes large batches of records to a file, uploads the file to GCS, then uses <b>COPY INTO table</b> to upload the file. Recommended for most workloads for better speed and scalability. Read more about GCS Staging <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#gcs-staging\">here</a>.","order":3,"oneOf":[{"title":"Standard Inserts","required":["method"],"properties":{"method":{"type":"string","const":"Standard"}}},{"title":"GCS Staging","required":["method","gcs_bucket_name","gcs_bucket_path","credential"],"properties":{"method":{"type":"string","const":"GCS Staging","order":0},"credential":{"title":"Credential","description":"An HMAC key is a type of credential and can be associated with a service account or a user account in Cloud Storage. Read more <a href=\"https://cloud.google.com/storage/docs/authentication/hmackeys\">here</a>.","type":"object","order":1,"oneOf":[{"title":"HMAC key","required":["credential_type","hmac_key_access_id","hmac_key_secret"],"properties":{"credential_type":{"type":"string","const":"HMAC_KEY","order":0},"hmac_key_access_id":"**********","type":"string","description":"HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.","title":"HMAC Key Access ID","airbyte_secret":true,"examples":["1234567890abcdefghij1234"],"order":1},"hmac_key_secret":"**********","type":"string","description":"The corresponding secret for the access ID. 
It is a 40-character base-64 encoded string.","title":"HMAC Key Secret","airbyte_secret":true,"examples":["1234567890abcdefghij1234567890ABCDEFGHIJ"],"order":2}]},"gcs_bucket_name":{"title":"GCS Bucket Name","type":"string","description":"The name of the GCS bucket. Read more <a href=\"https://cloud.google.com/storage/docs/naming-buckets\">here</a>.","examples":["airbyte_sync"],"order":2},"gcs_bucket_path":{"title":"GCS Bucket Path","description":"Directory under the GCS bucket where data will be written.","type":"string","examples":["data_sync/test"],"order":3},"keep_files_in_gcs-bucket":{"type":"string","description":"This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.","title":"GCS Tmp Files Afterward Processing (Optional)","default":"Delete all tmp files from GCS","enum":["Delete all tmp files from GCS","Keep all tmp files in GCS"],"order":4}}}]},"credentials_json":"**********","type":"string","description":"The contents of the JSON service account key. Check out the <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\">docs</a> if you need help generating this key. Default credentials will be used if this field is left empty.","title":"Service Account Key JSON (Required for cloud, optional for open-source)","airbyte_secret":true,"order":4},"transformation_priority":{"type":"string","description":"Interactive run type means that the query is executed as soon as possible, and these queries count towards concurrent rate limit and daily limit. Read more about interactive run type <a href=\"https://cloud.google.com/bigquery/docs/running-queries#queries\">here</a>. Batch queries are queued and started as soon as idle resources are available in the BigQuery shared resource pool, which usually occurs within a few minutes. Batch queries don’t count towards your concurrent rate limit. Read more about batch queries <a href=\"https://cloud.google.com/bigquery/docs/running-queries#batch\">here</a>. The default \"interactive\" value is used if not set explicitly.","title":"Transformation Query Run Type (Optional)","default":"interactive","enum":["interactive","batch"],"order":5},"big_query_client_buffer_size_mb":{"title":"Google BigQuery Client Chunk Size (Optional)","description":"Google BigQuery client's chunk (buffer) size (MIN=1, MAX = 15) for each table. The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. The default 15MB value is used if not set explicitly. Read more <a href=\"https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html\">here</a>.","type":"integer","minimum":1,"maximum":15,"default":15,"examples":["15"],"order":6}}},"supportsIncremental":true,"supportsNormalization":true,"supportsDBT":true,"supported_destination_sync_modes":["overwrite","append","append_dedup"]}