From 1b4e323ab3322545262d26fc848b67f776ad7175 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Fri, 14 Oct 2022 10:32:05 -0700 Subject: [PATCH 1/7] Update SpecActivityImpl to build a VersionedStreamFactory --- .../workers/temporal/spec/SpecActivityImpl.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index 5cdb899fdfba..cd7d0fbca579 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -6,7 +6,10 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; +import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; import io.airbyte.commons.temporal.CancellationHandler; +import io.airbyte.commons.version.Version; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobGetSpecConfig; @@ -17,6 +20,8 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.config.WorkerMode; import io.airbyte.workers.general.DefaultGetSpecWorker; +import io.airbyte.workers.internal.AirbyteStreamFactory; +import io.airbyte.workers.internal.VersionedAirbyteStreamFactory; import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; @@ -41,6 +46,8 @@ public class SpecActivityImpl implements SpecActivity { private final LogConfigs logConfigs; private final AirbyteApiClient airbyteApiClient; private final String airbyteVersion; + private final AirbyteMessageSerDeProvider serDeProvider; + private final AirbyteMessageVersionedMigratorFactory migratorFactory; public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerConfigs, @Named("specProcessFactory") final ProcessFactory processFactory, @@ -48,7 +55,9 @@ public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerCo final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, final AirbyteApiClient airbyteApiClient, - @Value("${airbyte.version}") final String airbyteVersion) { + @Value("${airbyte.version}") final String airbyteVersion, + final AirbyteMessageSerDeProvider serDeProvider, + final AirbyteMessageVersionedMigratorFactory migratorFactory) { this.workerConfigs = workerConfigs; this.processFactory = processFactory; this.workspaceRoot = workspaceRoot; @@ -56,6 +65,8 @@ public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerCo this.logConfigs = logConfigs; this.airbyteApiClient = airbyteApiClient; this.airbyteVersion = airbyteVersion; + this.serDeProvider = serDeProvider; + this.migratorFactory = migratorFactory; } @Override @@ -82,6 +93,8 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final Integration private CheckedSupplier<Worker<JobGetSpecConfig, ConnectorJobOutput>, Exception> getWorkerFactory( final IntegrationLauncherConfig launcherConfig) { return () -> { + final Version protocolVersion = launcherConfig.getProtocolVersion(); + final AirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, protocolVersion); final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( launcherConfig.getJobId(),
launcherConfig.getAttemptId().intValue(), @@ -89,7 +102,7 @@ private CheckedSupplier<Worker<JobGetSpecConfig, ConnectorJobOutput>, Exception> processFactory, workerConfigs.getResourceRequirements()); - return new DefaultGetSpecWorker(integrationLauncher); + return new DefaultGetSpecWorker(integrationLauncher, streamFactory); }; } From 0002a1a03e791e6f7a36b2b3e1ee26d95f9592b5 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 18 Oct 2022 12:39:05 -0700 Subject: [PATCH 2/7] Enable Protocol Version detection from Stream for SPEC --- .../protocol/AirbyteMessageMigrator.java | 4 + ...irbyteMessageVersionedMigratorFactory.java | 4 + .../VersionedAirbyteStreamFactory.java | 92 +++++++++++++++- .../VersionedAirbyteStreamFactoryTest.java | 115 ++++++++++++++++++ .../version-detection/logs-with-version.jsonl | 5 + .../logs-without-spec-message.jsonl | 18 +++ .../logs-without-version.jsonl | 5 + .../temporal/spec/SpecActivityImpl.java | 10 +- 8 files changed, 248 insertions(+), 5 deletions(-) create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java create mode 100644 airbyte-commons-worker/src/test/resources/version-detection/logs-with-version.jsonl create mode 100644 airbyte-commons-worker/src/test/resources/version-detection/logs-without-spec-message.jsonl create mode 100644 airbyte-commons-worker/src/test/resources/version-detection/logs-without-version.jsonl diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java index 316bb7df71db..9683efc0082f 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java @@ -75,6 +75,10 @@ public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVe return (CurrentVersion) result; } + public Version getMostRecentVersion() { + return new Version(mostRecentVersion, "0", "0"); + } + private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) { final Collection<AirbyteMessageMigration<?, ?>> results = migrations.tailMap(version.getMajorVersion()).values(); if (results.isEmpty()) { diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java index afce6264089f..dec45297880e 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageVersionedMigratorFactory.java @@ -23,4 +23,8 @@ public <T> AirbyteMessageVersionedMigrator<T> getVersionedMigrator(final Version return new AirbyteMessageVersionedMigrator<>(this.migrator, version); } + public Version getMostRecentVersion() { + return migrator.getMostRecentVersion(); + } + } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index 88410da0c55d..212960f4d3ae 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -5,6 +5,7 @@ package io.airbyte.workers.internal; import com.fasterxml.jackson.databind.JsonNode; +import
com.google.common.base.Preconditions; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.MdcScope; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; @@ -13,7 +14,12 @@ import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer; import io.airbyte.commons.version.Version; import io.airbyte.protocol.models.AirbyteMessage; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Optional; +import java.util.function.Predicate; import java.util.stream.Stream; +import lombok.SneakyThrows; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,10 +32,17 @@ public class VersionedAirbyteStreamFactory<T> extends DefaultAirbyteStreamFactory { private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteStreamFactory.class); + private static final Version fallbackVersion = new Version("0.2.0"); + private static final int BUFFER_READ_AHEAD_LIMIT = 4096; + private static final int MESSAGES_LOOK_AHEAD_FOR_DETECTION = 10; - private final AirbyteMessageDeserializer<AirbyteMessage> deserializer; - private final AirbyteMessageVersionedMigrator<AirbyteMessage> migrator; - private final Version protocolVersion; + private final AirbyteMessageSerDeProvider serDeProvider; + private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private AirbyteMessageDeserializer<AirbyteMessage> deserializer; + private AirbyteMessageVersionedMigrator<AirbyteMessage> migrator; + private Version protocolVersion; + + private boolean shouldDetectVersion = false; public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider, final AirbyteMessageVersionedMigratorFactory migratorFactory, @@ -43,6 +56,79 @@ public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProv final MdcScope.Builder containerLogMdcBuilder) { // TODO AirbyteProtocolPredicate needs to be updated to be protocol version aware super(new AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder); + Preconditions.checkNotNull(protocolVersion); + this.serDeProvider = serDeProvider; + this.migratorFactory = migratorFactory; + this.initializeForProtocolVersion(protocolVersion); + } + + /** + * Create the AirbyteMessage stream. + * + * If detectVersion is set to true, it will decide which protocol version to use from the content of + * the stream rather than the one passed from the constructor. + */ + @SneakyThrows + @Override + public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) { + if (shouldDetectVersion) { + final Optional<Version> versionMaybe = detectVersion(bufferedReader); + if (versionMaybe.isPresent()) { + logger.info("Detected Protocol Version {}", versionMaybe.get().serialize()); + initializeForProtocolVersion(versionMaybe.get()); + } else { + // No version found, use the default as a fallback + logger.info("Unable to detect Protocol Version, assuming protocol version {}", fallbackVersion.serialize()); + initializeForProtocolVersion(fallbackVersion); + } } + return super.create(bufferedReader); + } + + /** + * Attempt to detect the version by scanning the stream + * + * Using the BufferedReader reset/mark feature to get a look-ahead. We will attempt to find the + * first SPEC message and decide on a protocol version from this message.
+ * + * @param bufferedReader the stream to read + * @return The Version if found + * @throws IOException + */ + private Optional<Version> detectVersion(final BufferedReader bufferedReader) throws IOException { + bufferedReader.mark(BUFFER_READ_AHEAD_LIMIT); + // Cap detection to the first 10 messages + for (int i = 0; i < MESSAGES_LOOK_AHEAD_FOR_DETECTION; ++i) { + final String line = bufferedReader.readLine(); + final Optional<JsonNode> jsonOpt = Jsons.tryDeserialize(line); + if (jsonOpt.isPresent()) { + final JsonNode json = jsonOpt.get(); + if (isSpecMessage(json)) { + final JsonNode protocolVersionNode = json.at("/spec/protocol_version"); + bufferedReader.reset(); + return Optional.ofNullable(protocolVersionNode).filter(Predicate.not(JsonNode::isMissingNode)).map(node -> new Version(node.asText())); + } + } + } + bufferedReader.reset(); + return Optional.empty(); + } + + private boolean isSpecMessage(final JsonNode json) { + final String typeFieldName = "type"; + return json.has(typeFieldName) && "spec".equalsIgnoreCase(json.get(typeFieldName).asText()); + } + + public boolean setDetectVersion(final boolean detectVersion) { + return this.shouldDetectVersion = detectVersion; + } + + public VersionedAirbyteStreamFactory<T> withDetectVersion(final boolean detectVersion) { + setDetectVersion(detectVersion); + return this; + } + + final protected void initializeForProtocolVersion(final Version protocolVersion) { this.deserializer = (AirbyteMessageDeserializer<AirbyteMessage>) serDeProvider.getDeserializer(protocolVersion).orElseThrow(); this.migrator = migratorFactory.getVersionedMigrator(protocolVersion); this.protocolVersion = protocolVersion; diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java new file mode 100644 index 000000000000..f6ac35de3a54 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactoryTest.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.workers.internal; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import io.airbyte.commons.protocol.AirbyteMessageMigrator; +import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; +import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; +import io.airbyte.commons.protocol.migrations.AirbyteMessageMigrationV0; +import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer; +import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer; +import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.AirbyteMessage; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.util.List; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.platform.commons.util.ClassLoaderUtils; + +class VersionedAirbyteStreamFactoryTest { + + AirbyteMessageSerDeProvider serDeProvider; + AirbyteMessageVersionedMigratorFactory migratorFactory; + + final static Version defaultVersion = new Version("0.2.0"); + + @BeforeEach + void beforeEach() { + serDeProvider = spy(new AirbyteMessageSerDeProvider( + List.of(new AirbyteMessageV0Deserializer()), + List.of(new AirbyteMessageV0Serializer()))); + serDeProvider.initialize(); + final AirbyteMessageMigrator migrator = new AirbyteMessageMigrator( + List.of(new AirbyteMessageMigrationV0())); + migrator.initialize(); + migratorFactory = spy(new AirbyteMessageVersionedMigratorFactory(migrator)); + } + + @Test + void testCreate() { + final Version initialVersion = new Version("0.1.2"); + final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion); + + final BufferedReader bufferedReader = new BufferedReader(new StringReader("")); + streamFactory.create(bufferedReader); + + verify(serDeProvider).getDeserializer(initialVersion); + verify(migratorFactory).getVersionedMigrator(initialVersion); + } + + @Test + void testCreateWithVersionDetection() { + final Version initialVersion = new Version("0.0.0"); + final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) + .withDetectVersion(true); + + final BufferedReader bufferedReader = + getBuffereredReader("version-detection/logs-with-version.jsonl"); + final Stream stream = streamFactory.create(bufferedReader); + + long messageCount = stream.toList().size(); + verify(serDeProvider).getDeserializer(initialVersion); + verify(serDeProvider).getDeserializer(new Version("0.5.9")); + assertEquals(1, messageCount); + } + + @Test + void testCreateWithVersionDetectionFallback() { + final Version initialVersion = new Version("0.0.6"); + final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) + .withDetectVersion(true); + + final BufferedReader bufferedReader = + getBuffereredReader("version-detection/logs-without-version.jsonl"); + final Stream stream = streamFactory.create(bufferedReader); + + final long messageCount = stream.toList().size(); + verify(serDeProvider).getDeserializer(initialVersion); + verify(serDeProvider).getDeserializer(defaultVersion); + assertEquals(1, messageCount); + } + + @Test + void testCreateWithVersionDetectionWithoutSpecMessage() { + final 
Version initialVersion = new Version("0.0.1"); + final VersionedAirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) + .withDetectVersion(true); + + final BufferedReader bufferedReader = + getBuffereredReader("version-detection/logs-without-spec-message.jsonl"); + final Stream stream = streamFactory.create(bufferedReader); + + final long messageCount = stream.toList().size(); + verify(serDeProvider).getDeserializer(initialVersion); + verify(serDeProvider).getDeserializer(defaultVersion); + assertEquals(2, messageCount); + } + + BufferedReader getBuffereredReader(final String resourceFile) { + return new BufferedReader( + new InputStreamReader( + ClassLoaderUtils.getDefaultClassLoader().getResourceAsStream(resourceFile), + Charset.defaultCharset())); + } + +} diff --git a/airbyte-commons-worker/src/test/resources/version-detection/logs-with-version.jsonl b/airbyte-commons-worker/src/test/resources/version-detection/logs-with-version.jsonl new file mode 100644 index 000000000000..9fab56f890af --- /dev/null +++ b/airbyte-commons-worker/src/test/resources/version-detection/logs-with-version.jsonl @@ -0,0 +1,5 @@ +{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}} +{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}} +{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}} +{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} +{"type":"SPEC","spec":{"protocol_version":"0.5.9","documentationUrl":"https://docs.airbyte.io/integrations/destinations/bigquery","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"BigQuery Destination Spec","type":"object","required":["project_id","dataset_location","dataset_id"],"additionalProperties":true,"properties":{"project_id":{"type":"string","description":"The GCP project ID for the project containing the target BigQuery dataset. Read more here.","title":"Project ID","order":0},"dataset_location":{"type":"string","description":"The location of the dataset. Warning: Changes made after creation will not be applied. Read more here.","title":"Dataset Location","order":1,"enum":["US","EU","asia-east1","asia-east2","asia-northeast1","asia-northeast2","asia-northeast3","asia-south1","asia-south2","asia-southeast1","asia-southeast2","australia-southeast1","australia-southeast2","europe-central2","europe-north1","europe-west1","europe-west2","europe-west3","europe-west4","europe-west6","northamerica-northeast1","northamerica-northeast2","southamerica-east1","southamerica-west1","us-central1","us-east1","us-east4","us-west1","us-west2","us-west3","us-west4"]},"dataset_id":{"type":"string","description":"The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more here.","title":"Default Dataset ID","order":2},"loading_method":{"type":"object","title":"Loading Method","description":"Loading method used to send select the way data will be uploaded to BigQuery.
Standard Inserts - Direct uploading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. In almost all cases, you should use staging.
GCS Staging - Writes large batches of records to a file, uploads the file to GCS, then uses COPY INTO table to upload the file. Recommended for most workloads for better speed and scalability. Read more about GCS Staging here.","order":3,"oneOf":[{"title":"Standard Inserts","required":["method"],"properties":{"method":{"type":"string","const":"Standard"}}},{"title":"GCS Staging","required":["method","gcs_bucket_name","gcs_bucket_path","credential"],"properties":{"method":{"type":"string","const":"GCS Staging","order":0},"credential":{"title":"Credential","description":"An HMAC key is a type of credential and can be associated with a service account or a user account in Cloud Storage. Read more here.","type":"object","order":1,"oneOf":[{"title":"HMAC key","required":["credential_type","hmac_key_access_id","hmac_key_secret"],"properties":{"credential_type":{"type":"string","const":"HMAC_KEY","order":0},"hmac_key_access_id":"**********","type":"string","description":"HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.","title":"HMAC Key Access ID","airbyte_secret":true,"examples":["1234567890abcdefghij1234"],"order":1},"hmac_key_secret":"**********","type":"string","description":"The corresponding secret for the access ID. It is a 40-character base-64 encoded string.","title":"HMAC Key Secret","airbyte_secret":true,"examples":["1234567890abcdefghij1234567890ABCDEFGHIJ"],"order":2}]},"gcs_bucket_name":{"title":"GCS Bucket Name","type":"string","description":"The name of the GCS bucket. Read more here.","examples":["airbyte_sync"],"order":2},"gcs_bucket_path":{"title":"GCS Bucket Path","description":"Directory under the GCS bucket where data will be written.","type":"string","examples":["data_sync/test"],"order":3},"keep_files_in_gcs-bucket":{"type":"string","description":"This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.","title":"GCS Tmp Files Afterward Processing (Optional)","default":"Delete all tmp files from GCS","enum":["Delete all tmp files from GCS","Keep all tmp files in GCS"],"order":4}}}]},"credentials_json":"**********","type":"string","description":"The contents of the JSON service account key. Check out the docs if you need help generating this key. Default credentials will be used if this field is left empty.","title":"Service Account Key JSON (Required for cloud, optional for open-source)","airbyte_secret":true,"order":4},"transformation_priority":{"type":"string","description":"Interactive run type means that the query is executed as soon as possible, and these queries count towards concurrent rate limit and daily limit. Read more about interactive run type here. Batch queries are queued and started as soon as idle resources are available in the BigQuery shared resource pool, which usually occurs within a few minutes. Batch queries don’t count towards your concurrent rate limit. Read more about batch queries here. The default \"interactive\" value is used if not set explicitly.","title":"Transformation Query Run Type (Optional)","default":"interactive","enum":["interactive","batch"],"order":5},"big_query_client_buffer_size_mb":{"title":"Google BigQuery Client Chunk Size (Optional)","description":"Google BigQuery client's chunk (buffer) size (MIN=1, MAX = 15) for each table. 
The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. The default 15MB value is used if not set explicitly. Read more here.","type":"integer","minimum":1,"maximum":15,"default":15,"examples":["15"],"order":6}}},"supportsIncremental":true,"supportsNormalization":true,"supportsDBT":true,"supported_destination_sync_modes":["overwrite","append","append_dedup"]} diff --git a/airbyte-commons-worker/src/test/resources/version-detection/logs-without-spec-message.jsonl b/airbyte-commons-worker/src/test/resources/version-detection/logs-without-spec-message.jsonl new file mode 100644 index 000000000000..267d60190925 --- /dev/null +++ b/airbyte-commons-worker/src/test/resources/version-detection/logs-without-spec-message.jsonl @@ -0,0 +1,18 @@ +{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}} +{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}} +{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}} +{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} +{"type":"RECORD","stream":"s","emitted_at":5,"data":{"protocol_version":"0.5.9","documentationUrl":"https://docs.airbyte.io/integrations/destinations/bigquery","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"BigQuery Destination Spec","type":"object","required":["project_id","dataset_location","dataset_id"],"additionalProperties":true,"properties":{"project_id":{"type":"string","description":"The GCP project ID for the project containing the target BigQuery dataset. Read more here.","title":"Project ID","order":0},"dataset_location":{"type":"string","description":"The location of the dataset. Warning: Changes made after creation will not be applied. Read more here.","title":"Dataset Location","order":1,"enum":["US","EU","asia-east1","asia-east2","asia-northeast1","asia-northeast2","asia-northeast3","asia-south1","asia-south2","asia-southeast1","asia-southeast2","australia-southeast1","australia-southeast2","europe-central2","europe-north1","europe-west1","europe-west2","europe-west3","europe-west4","europe-west6","northamerica-northeast1","northamerica-northeast2","southamerica-east1","southamerica-west1","us-central1","us-east1","us-east4","us-west1","us-west2","us-west3","us-west4"]},"dataset_id":{"type":"string","description":"The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more here.","title":"Default Dataset ID","order":2},"loading_method":{"type":"object","title":"Loading Method","description":"Loading method used to send select the way data will be uploaded to BigQuery.
Standard Inserts - Direct uploading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. In almost all cases, you should use staging.
GCS Staging - Writes large batches of records to a file, uploads the file to GCS, then uses COPY INTO table to upload the file. Recommended for most workloads for better speed and scalability. Read more about GCS Staging here.","order":3,"oneOf":[{"title":"Standard Inserts","required":["method"],"properties":{"method":{"type":"string","const":"Standard"}}},{"title":"GCS Staging","required":["method","gcs_bucket_name","gcs_bucket_path","credential"],"properties":{"method":{"type":"string","const":"GCS Staging","order":0},"credential":{"title":"Credential","description":"An HMAC key is a type of credential and can be associated with a service account or a user account in Cloud Storage. Read more here.","type":"object","order":1,"oneOf":[{"title":"HMAC key","required":["credential_type","hmac_key_access_id","hmac_key_secret"],"properties":{"credential_type":{"type":"string","const":"HMAC_KEY","order":0},"hmac_key_access_id":"**********","type":"string","description":"HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.","title":"HMAC Key Access ID","airbyte_secret":true,"examples":["1234567890abcdefghij1234"],"order":1},"hmac_key_secret":"**********","type":"string","description":"The corresponding secret for the access ID. It is a 40-character base-64 encoded string.","title":"HMAC Key Secret","airbyte_secret":true,"examples":["1234567890abcdefghij1234567890ABCDEFGHIJ"],"order":2}]},"gcs_bucket_name":{"title":"GCS Bucket Name","type":"string","description":"The name of the GCS bucket. Read more here.","examples":["airbyte_sync"],"order":2},"gcs_bucket_path":{"title":"GCS Bucket Path","description":"Directory under the GCS bucket where data will be written.","type":"string","examples":["data_sync/test"],"order":3},"keep_files_in_gcs-bucket":{"type":"string","description":"This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.","title":"GCS Tmp Files Afterward Processing (Optional)","default":"Delete all tmp files from GCS","enum":["Delete all tmp files from GCS","Keep all tmp files in GCS"],"order":4}}}]},"credentials_json":"**********","type":"string","description":"The contents of the JSON service account key. Check out the docs if you need help generating this key. Default credentials will be used if this field is left empty.","title":"Service Account Key JSON (Required for cloud, optional for open-source)","airbyte_secret":true,"order":4},"transformation_priority":{"type":"string","description":"Interactive run type means that the query is executed as soon as possible, and these queries count towards concurrent rate limit and daily limit. Read more about interactive run type here. Batch queries are queued and started as soon as idle resources are available in the BigQuery shared resource pool, which usually occurs within a few minutes. Batch queries don’t count towards your concurrent rate limit. Read more about batch queries here. The default \"interactive\" value is used if not set explicitly.","title":"Transformation Query Run Type (Optional)","default":"interactive","enum":["interactive","batch"],"order":5},"big_query_client_buffer_size_mb":{"title":"Google BigQuery Client Chunk Size (Optional)","description":"Google BigQuery client's chunk (buffer) size (MIN=1, MAX = 15) for each table. 
The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. The default 15MB value is used if not set explicitly. Read more here.","type":"integer","minimum":1,"maximum":15,"default":15,"examples":["15"],"order":6}}},"supportsIncremental":true,"supportsNormalization":true,"supportsDBT":true,"supported_destination_sync_modes":["overwrite","append","append_dedup"]} +{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}} +{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}} +{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}} +{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} +{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}} +{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}} +{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}} +{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} +{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}} +{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}} +{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}} +{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} +{"type":"RECORD","stream":"s","emitted_at":6,"data":{"protocol_version":"0.5.9","documentationUrl":"https://docs.airbyte.io/integrations/destinations/bigquery","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"BigQuery Destination Spec","type":"object","required":["project_id","dataset_location","dataset_id"],"additionalProperties":true,"properties":{"project_id":{"type":"string","description":"The GCP project ID for the project containing the target BigQuery dataset. Read more here.","title":"Project ID","order":0},"dataset_location":{"type":"string","description":"The location of the dataset. Warning: Changes made after creation will not be applied. Read more here.","title":"Dataset Location","order":1,"enum":["US","EU","asia-east1","asia-east2","asia-northeast1","asia-northeast2","asia-northeast3","asia-south1","asia-south2","asia-southeast1","asia-southeast2","australia-southeast1","australia-southeast2","europe-central2","europe-north1","europe-west1","europe-west2","europe-west3","europe-west4","europe-west6","northamerica-northeast1","northamerica-northeast2","southamerica-east1","southamerica-west1","us-central1","us-east1","us-east4","us-west1","us-west2","us-west3","us-west4"]},"dataset_id":{"type":"string","description":"The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more here.","title":"Default Dataset ID","order":2},"loading_method":{"type":"object","title":"Loading Method","description":"Loading method used to send select the way data will be uploaded to BigQuery.
Standard Inserts - Direct uploading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. In almost all cases, you should use staging.
GCS Staging - Writes large batches of records to a file, uploads the file to GCS, then uses COPY INTO table to upload the file. Recommended for most workloads for better speed and scalability. Read more about GCS Staging here.","order":3,"oneOf":[{"title":"Standard Inserts","required":["method"],"properties":{"method":{"type":"string","const":"Standard"}}},{"title":"GCS Staging","required":["method","gcs_bucket_name","gcs_bucket_path","credential"],"properties":{"method":{"type":"string","const":"GCS Staging","order":0},"credential":{"title":"Credential","description":"An HMAC key is a type of credential and can be associated with a service account or a user account in Cloud Storage. Read more here.","type":"object","order":1,"oneOf":[{"title":"HMAC key","required":["credential_type","hmac_key_access_id","hmac_key_secret"],"properties":{"credential_type":{"type":"string","const":"HMAC_KEY","order":0},"hmac_key_access_id":"**********","type":"string","description":"HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.","title":"HMAC Key Access ID","airbyte_secret":true,"examples":["1234567890abcdefghij1234"],"order":1},"hmac_key_secret":"**********","type":"string","description":"The corresponding secret for the access ID. It is a 40-character base-64 encoded string.","title":"HMAC Key Secret","airbyte_secret":true,"examples":["1234567890abcdefghij1234567890ABCDEFGHIJ"],"order":2}]},"gcs_bucket_name":{"title":"GCS Bucket Name","type":"string","description":"The name of the GCS bucket. Read more here.","examples":["airbyte_sync"],"order":2},"gcs_bucket_path":{"title":"GCS Bucket Path","description":"Directory under the GCS bucket where data will be written.","type":"string","examples":["data_sync/test"],"order":3},"keep_files_in_gcs-bucket":{"type":"string","description":"This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.","title":"GCS Tmp Files Afterward Processing (Optional)","default":"Delete all tmp files from GCS","enum":["Delete all tmp files from GCS","Keep all tmp files in GCS"],"order":4}}}]},"credentials_json":"**********","type":"string","description":"The contents of the JSON service account key. Check out the docs if you need help generating this key. Default credentials will be used if this field is left empty.","title":"Service Account Key JSON (Required for cloud, optional for open-source)","airbyte_secret":true,"order":4},"transformation_priority":{"type":"string","description":"Interactive run type means that the query is executed as soon as possible, and these queries count towards concurrent rate limit and daily limit. Read more about interactive run type here. Batch queries are queued and started as soon as idle resources are available in the BigQuery shared resource pool, which usually occurs within a few minutes. Batch queries don’t count towards your concurrent rate limit. Read more about batch queries here. The default \"interactive\" value is used if not set explicitly.","title":"Transformation Query Run Type (Optional)","default":"interactive","enum":["interactive","batch"],"order":5},"big_query_client_buffer_size_mb":{"title":"Google BigQuery Client Chunk Size (Optional)","description":"Google BigQuery client's chunk (buffer) size (MIN=1, MAX = 15) for each table. 
The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. The default 15MB value is used if not set explicitly. Read more here.","type":"integer","minimum":1,"maximum":15,"default":15,"examples":["15"],"order":6}}},"supportsIncremental":true,"supportsNormalization":true,"supportsDBT":true,"supported_destination_sync_modes":["overwrite","append","append_dedup"]} diff --git a/airbyte-commons-worker/src/test/resources/version-detection/logs-without-version.jsonl b/airbyte-commons-worker/src/test/resources/version-detection/logs-without-version.jsonl new file mode 100644 index 000000000000..ca5fdddcfa84 --- /dev/null +++ b/airbyte-commons-worker/src/test/resources/version-detection/logs-without-version.jsonl @@ -0,0 +1,5 @@ +{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}} +{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}} +{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}} +{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} +{"type":"SPEC","spec":{"documentationUrl":"https://docs.airbyte.io/integrations/destinations/bigquery","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"BigQuery Destination Spec","type":"object","required":["project_id","dataset_location","dataset_id"],"additionalProperties":true,"properties":{"project_id":{"type":"string","description":"The GCP project ID for the project containing the target BigQuery dataset. Read more here.","title":"Project ID","order":0},"dataset_location":{"type":"string","description":"The location of the dataset. Warning: Changes made after creation will not be applied. Read more here.","title":"Dataset Location","order":1,"enum":["US","EU","asia-east1","asia-east2","asia-northeast1","asia-northeast2","asia-northeast3","asia-south1","asia-south2","asia-southeast1","asia-southeast2","australia-southeast1","australia-southeast2","europe-central2","europe-north1","europe-west1","europe-west2","europe-west3","europe-west4","europe-west6","northamerica-northeast1","northamerica-northeast2","southamerica-east1","southamerica-west1","us-central1","us-east1","us-east4","us-west1","us-west2","us-west3","us-west4"]},"dataset_id":{"type":"string","description":"The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more here.","title":"Default Dataset ID","order":2},"loading_method":{"type":"object","title":"Loading Method","description":"Loading method used to send select the way data will be uploaded to BigQuery.
Standard Inserts - Direct uploading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. In almost all cases, you should use staging.
GCS Staging - Writes large batches of records to a file, uploads the file to GCS, then uses COPY INTO table to upload the file. Recommended for most workloads for better speed and scalability. Read more about GCS Staging here.","order":3,"oneOf":[{"title":"Standard Inserts","required":["method"],"properties":{"method":{"type":"string","const":"Standard"}}},{"title":"GCS Staging","required":["method","gcs_bucket_name","gcs_bucket_path","credential"],"properties":{"method":{"type":"string","const":"GCS Staging","order":0},"credential":{"title":"Credential","description":"An HMAC key is a type of credential and can be associated with a service account or a user account in Cloud Storage. Read more here.","type":"object","order":1,"oneOf":[{"title":"HMAC key","required":["credential_type","hmac_key_access_id","hmac_key_secret"],"properties":{"credential_type":{"type":"string","const":"HMAC_KEY","order":0},"hmac_key_access_id":"**********","type":"string","description":"HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.","title":"HMAC Key Access ID","airbyte_secret":true,"examples":["1234567890abcdefghij1234"],"order":1},"hmac_key_secret":"**********","type":"string","description":"The corresponding secret for the access ID. It is a 40-character base-64 encoded string.","title":"HMAC Key Secret","airbyte_secret":true,"examples":["1234567890abcdefghij1234567890ABCDEFGHIJ"],"order":2}]},"gcs_bucket_name":{"title":"GCS Bucket Name","type":"string","description":"The name of the GCS bucket. Read more here.","examples":["airbyte_sync"],"order":2},"gcs_bucket_path":{"title":"GCS Bucket Path","description":"Directory under the GCS bucket where data will be written.","type":"string","examples":["data_sync/test"],"order":3},"keep_files_in_gcs-bucket":{"type":"string","description":"This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.","title":"GCS Tmp Files Afterward Processing (Optional)","default":"Delete all tmp files from GCS","enum":["Delete all tmp files from GCS","Keep all tmp files in GCS"],"order":4}}}]},"credentials_json":"**********","type":"string","description":"The contents of the JSON service account key. Check out the docs if you need help generating this key. Default credentials will be used if this field is left empty.","title":"Service Account Key JSON (Required for cloud, optional for open-source)","airbyte_secret":true,"order":4},"transformation_priority":{"type":"string","description":"Interactive run type means that the query is executed as soon as possible, and these queries count towards concurrent rate limit and daily limit. Read more about interactive run type here. Batch queries are queued and started as soon as idle resources are available in the BigQuery shared resource pool, which usually occurs within a few minutes. Batch queries don’t count towards your concurrent rate limit. Read more about batch queries here. The default \"interactive\" value is used if not set explicitly.","title":"Transformation Query Run Type (Optional)","default":"interactive","enum":["interactive","batch"],"order":5},"big_query_client_buffer_size_mb":{"title":"Google BigQuery Client Chunk Size (Optional)","description":"Google BigQuery client's chunk (buffer) size (MIN=1, MAX = 15) for each table. 
The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. The default 15MB value is used if not set explicitly. Read more here.","type":"integer","minimum":1,"maximum":15,"default":15,"examples":["15"],"order":6}}},"supportsIncremental":true,"supportsNormalization":true,"supportsDBT":true,"supported_destination_sync_modes":["overwrite","append","append_dedup"]} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index cd7d0fbca579..44a31b7c812a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -93,8 +93,7 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig, final Integration private CheckedSupplier<Worker<JobGetSpecConfig, ConnectorJobOutput>, Exception> getWorkerFactory( final IntegrationLauncherConfig launcherConfig) { return () -> { - final Version protocolVersion = launcherConfig.getProtocolVersion(); - final AirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, protocolVersion); + final AirbyteStreamFactory streamFactory = getStreamFactory(launcherConfig); final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), @@ -106,4 +105,11 @@ private CheckedSupplier<Worker<JobGetSpecConfig, ConnectorJobOutput>, Exception> }; } + private AirbyteStreamFactory getStreamFactory(final IntegrationLauncherConfig launcherConfig) { + final Version protocolVersion = + launcherConfig.getProtocolVersion() != null ? launcherConfig.getProtocolVersion() : migratorFactory.getMostRecentVersion(); + // Try to detect version from the stream + return new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, protocolVersion).withDetectVersion(true); + } + } From 819a8a617693c12a1e3957126f2d51b2e099a2a4 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Wed, 19 Oct 2022 15:14:34 -0700 Subject: [PATCH 3/7] Print log before the action for better debugging --- .../java/io/airbyte/test/acceptance/BasicAcceptanceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 6bed007bbe68..b1584e3f2c0b 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -1152,8 +1152,8 @@ void testResetAllWhenSchemaIsModifiedForLegacySource() throws Exception { testHarness.assertSourceAndDestinationDbInSync(false); } finally { // Set source back to version it was set to at beginning of test - testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefinitionVersion); LOGGER.info("Set source connector back to per-stream state supported version {}.", currentSourceDefinitionVersion); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, currentSourceDefinitionVersion); } } From 13fc8455a0858ade3bc50f1aeea506344e7ed585 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Wed, 19 Oct 2022 15:18:22 -0700 Subject: [PATCH 4/7] Fix buffer size for protocol detection --- .../workers/internal/VersionedAirbyteStreamFactory.java | 7 ++++++- 1 file changed,
6 insertions(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index 212960f4d3ae..13a9c109100a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -33,7 +33,12 @@ public class VersionedAirbyteStreamFactory<T> extends DefaultAirbyteStreamFactor private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteStreamFactory.class); private static final Version fallbackVersion = new Version("0.2.0"); - private static final int BUFFER_READ_AHEAD_LIMIT = 4096; + + // Buffer size to use when detecting the protocol version. + // Given that BufferedReader::reset fails if we try to reset after reading past its buffer size, this + // buffer has to be big enough to contain our longest spec and whatever messages get emitted before + // the SPEC. + private static final int BUFFER_READ_AHEAD_LIMIT = 32000; private static final int MESSAGES_LOOK_AHEAD_FOR_DETECTION = 10; private final AirbyteMessageSerDeProvider serDeProvider; From 47caba3d6a13faf69b63f99f8e221a12e9d72496 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Wed, 19 Oct 2022 15:30:29 -0700 Subject: [PATCH 5/7] Improve detectVersion error handling --- .../VersionedAirbyteStreamFactory.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index 13a9c109100a..2b72b6060852 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -101,22 +101,34 @@ public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) { * @throws IOException */ private Optional<Version> detectVersion(final BufferedReader bufferedReader) throws IOException { + // Buffer size needs to be big enough to contain everything we need for the detection. Otherwise, + // the reset will fail. bufferedReader.mark(BUFFER_READ_AHEAD_LIMIT); - // Cap detection to the first 10 messages - for (int i = 0; i < MESSAGES_LOOK_AHEAD_FOR_DETECTION; ++i) { - final String line = bufferedReader.readLine(); - final Optional<JsonNode> jsonOpt = Jsons.tryDeserialize(line); - if (jsonOpt.isPresent()) { - final JsonNode json = jsonOpt.get(); - if (isSpecMessage(json)) { - final JsonNode protocolVersionNode = json.at("/spec/protocol_version"); - bufferedReader.reset(); - return Optional.ofNullable(protocolVersionNode).filter(Predicate.not(JsonNode::isMissingNode)).map(node -> new Version(node.asText())); + try { + // Cap detection to the first 10 messages. When doing the protocol detection, we expect the SPEC + // message to show up early in the stream. Ideally it should be the first message; however, we do + // not currently enforce this constraint, so connectors may send LOG messages before it.
+ for (int i = 0; i < MESSAGES_LOOK_AHEAD_FOR_DETECTION; ++i) { + final String line = bufferedReader.readLine(); + final Optional<JsonNode> jsonOpt = Jsons.tryDeserialize(line); + if (jsonOpt.isPresent()) { + final JsonNode json = jsonOpt.get(); + if (isSpecMessage(json)) { + final JsonNode protocolVersionNode = json.at("/spec/protocol_version"); + bufferedReader.reset(); + return Optional.ofNullable(protocolVersionNode).filter(Predicate.not(JsonNode::isMissingNode)).map(node -> new Version(node.asText())); + } } } + bufferedReader.reset(); + return Optional.empty(); + } catch (IOException e) { + logger.warn( + "Protocol version detection failed, it is likely that the connector sent more than {}B without a complete SPEC message." + + " A SPEC message that is too long could be the root cause here.", + BUFFER_READ_AHEAD_LIMIT); + throw e; } - bufferedReader.reset(); - return Optional.empty(); } From f3f86369fd333f02966a05e7e57e0c6c5fd6b020 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Thu, 20 Oct 2022 09:04:28 -0700 Subject: [PATCH 6/7] extract constant --- .../workers/internal/VersionedAirbyteStreamFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java index 2b72b6060852..b49190dc8658 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/VersionedAirbyteStreamFactory.java @@ -40,6 +40,7 @@ public class VersionedAirbyteStreamFactory<T> extends DefaultAirbyteStreamFactor // the SPEC.
private static final int BUFFER_READ_AHEAD_LIMIT = 32000; private static final int MESSAGES_LOOK_AHEAD_FOR_DETECTION = 10; + private static final String TYPE_FIELD_NAME = "type"; private final AirbyteMessageSerDeProvider serDeProvider; private final AirbyteMessageVersionedMigratorFactory migratorFactory; @@ -132,8 +133,7 @@ private Optional<Version> detectVersion(final BufferedReader bufferedReader) thr } private boolean isSpecMessage(final JsonNode json) { - final String typeFieldName = "type"; - return json.has(typeFieldName) && "spec".equalsIgnoreCase(json.get(typeFieldName).asText()); + return json.has(TYPE_FIELD_NAME) && "spec".equalsIgnoreCase(json.get(TYPE_FIELD_NAME).asText()); } public boolean setDetectVersion(final boolean detectVersion) { From 46490d4f6cb382f24a6e33ced19606324ce5a6f3 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Thu, 20 Oct 2022 09:08:37 -0700 Subject: [PATCH 7/7] Rename attribute for clarity --- .../commons/protocol/AirbyteMessageMigrator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java index 9683efc0082f..1906e143c33a 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/AirbyteMessageMigrator.java @@ -27,7 +27,7 @@ public class AirbyteMessageMigrator { private final List<AirbyteMessageMigration<?, ?>> migrationsToRegister; private final SortedMap<String, AirbyteMessageMigration<?, ?>> migrations = new TreeMap<>(); - private String mostRecentVersion = ""; + private String mostRecentMajorVersion = ""; public AirbyteMessageMigrator(List<AirbyteMessageMigration<?, ?>> migrations) { migrationsToRegister = migrations; @@ -47,7 +47,7 @@ public void initialize() { * required migrations */ public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final CurrentVersion message, final Version target) { - if (target.getMajorVersion().equals(mostRecentVersion)) { + if (target.getMajorVersion().equals(mostRecentMajorVersion)) { return (PreviousVersion) message; } @@ -64,7 +64,7 @@ public <PreviousVersion, CurrentVersion> PreviousVersion downgrade(final Current * migrations */ public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVersion message, final Version source) { - if (source.getMajorVersion().equals(mostRecentVersion)) { + if (source.getMajorVersion().equals(mostRecentMajorVersion)) { return (CurrentVersion) message; } @@ -76,7 +76,7 @@ public <PreviousVersion, CurrentVersion> CurrentVersion upgrade(final PreviousVe } public Version getMostRecentVersion() { - return new Version(mostRecentVersion, "0", "0"); + return new Version(mostRecentMajorVersion, "0", "0"); } private Collection<AirbyteMessageMigration<?, ?>> selectMigrations(final Version version) { @@ -111,8 +111,8 @@ void registerMigration(final AirbyteMessageMigration<?, ?> migration) { final String key = migration.getPreviousVersion().getMajorVersion(); if (!migrations.containsKey(key)) { migrations.put(key, migration); - if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentVersion) > 0) { - mostRecentVersion = migration.getCurrentVersion().getMajorVersion(); + if (migration.getCurrentVersion().getMajorVersion().compareTo(mostRecentMajorVersion) > 0) { + mostRecentMajorVersion = migration.getCurrentVersion().getMajorVersion(); } } else { throw new RuntimeException("Trying to register a duplicated migration " + migration.getClass().getName());