-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add message translation to GetSpec #18130
Changes from 8 commits
1b4e323
0002a1a
d8b3439
9911074
819a8a6
13fc845
47caba3
040c7a4
f3f8636
46490d4
8acd879
e980f04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
package io.airbyte.workers.internal; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.base.Preconditions; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.commons.logging.MdcScope; | ||
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; | ||
|
@@ -13,7 +14,12 @@ | |
import io.airbyte.commons.protocol.serde.AirbyteMessageDeserializer; | ||
import io.airbyte.commons.version.Version; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import java.io.BufferedReader; | ||
import java.io.IOException; | ||
import java.util.Optional; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Stream; | ||
import lombok.SneakyThrows; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -26,10 +32,22 @@ | |
public class VersionedAirbyteStreamFactory<T> extends DefaultAirbyteStreamFactory { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(VersionedAirbyteStreamFactory.class); | ||
private static final Version fallbackVersion = new Version("0.2.0"); | ||
|
||
private final AirbyteMessageDeserializer<T> deserializer; | ||
private final AirbyteMessageVersionedMigrator<T> migrator; | ||
private final Version protocolVersion; | ||
// Buffer size to use when detecting the protocol version. | ||
// Given that BufferedReader::reset fails if we try to reset after going past its buffer size, this | ||
// buffer has to be big enough to contain our longest spec and whatever messages get emitted before | ||
// the SPEC. | ||
private static final int BUFFER_READ_AHEAD_LIMIT = 32000; | ||
private static final int MESSAGES_LOOK_AHEAD_FOR_DETECTION = 10; | ||
|
||
private final AirbyteMessageSerDeProvider serDeProvider; | ||
private final AirbyteMessageVersionedMigratorFactory migratorFactory; | ||
private AirbyteMessageDeserializer<T> deserializer; | ||
private AirbyteMessageVersionedMigrator<T> migrator; | ||
private Version protocolVersion; | ||
|
||
private boolean shouldDetectVersion = false; | ||
|
||
public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProvider, | ||
final AirbyteMessageVersionedMigratorFactory migratorFactory, | ||
|
@@ -43,6 +61,91 @@ public VersionedAirbyteStreamFactory(final AirbyteMessageSerDeProvider serDeProv | |
final MdcScope.Builder containerLogMdcBuilder) { | ||
// TODO AirbyteProtocolPredicate needs to be updated to be protocol version aware | ||
super(new AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder); | ||
Preconditions.checkNotNull(protocolVersion); | ||
this.serDeProvider = serDeProvider; | ||
this.migratorFactory = migratorFactory; | ||
this.initializeForProtocolVersion(protocolVersion); | ||
} | ||
|
||
/** | ||
* Create the AirbyteMessage stream. | ||
* | ||
* If detectVersion is set to true, it will decide which protocol version to use from the content of | ||
* the stream rather than the one passed from the constructor. | ||
*/ | ||
@SneakyThrows | ||
@Override | ||
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) { | ||
if (shouldDetectVersion) { | ||
final Optional<Version> versionMaybe = detectVersion(bufferedReader); | ||
if (versionMaybe.isPresent()) { | ||
logger.info("Detected Protocol Version {}", versionMaybe.get().serialize()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: why use serialized here instead of toString? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It comes from our implementation, |
||
initializeForProtocolVersion(versionMaybe.get()); | ||
} else { | ||
// No version found, use the default as a fallback | ||
logger.info("Unable to detect Protocol Version, assuming protocol version {}", fallbackVersion.serialize()); | ||
initializeForProtocolVersion(fallbackVersion); | ||
} | ||
} | ||
return super.create(bufferedReader); | ||
} | ||
|
||
/** | ||
* Attempt to detect the version by scanning the stream | ||
* | ||
* Using the BufferedReader reset/mark feature to get a look-ahead. We will attempt to find the | ||
* first SPEC message and decide on a protocol version from this message. | ||
* | ||
* @param bufferedReader the stream to read | ||
* @return The Version if found | ||
* @throws IOException | ||
*/ | ||
private Optional<Version> detectVersion(final BufferedReader bufferedReader) throws IOException { | ||
// Buffersize needs to be big enough to containing everything we need for the detection. Otherwise, | ||
// the reset will fail. | ||
bufferedReader.mark(BUFFER_READ_AHEAD_LIMIT); | ||
try { | ||
// Cap detection to the first 10 messages. When doing the protocol detection, we expect the SPEC | ||
// message to show up early in the stream. Ideally it should be first message however we do not | ||
// enforce this constraint currently so connectors may send LOG messages before. | ||
for (int i = 0; i < MESSAGES_LOOK_AHEAD_FOR_DETECTION; ++i) { | ||
final String line = bufferedReader.readLine(); | ||
final Optional<JsonNode> jsonOpt = Jsons.tryDeserialize(line); | ||
if (jsonOpt.isPresent()) { | ||
final JsonNode json = jsonOpt.get(); | ||
if (isSpecMessage(json)) { | ||
final JsonNode protocolVersionNode = json.at("/spec/protocol_version"); | ||
bufferedReader.reset(); | ||
return Optional.ofNullable(protocolVersionNode).filter(Predicate.not(JsonNode::isMissingNode)).map(node -> new Version(node.asText())); | ||
} | ||
} | ||
} | ||
bufferedReader.reset(); | ||
return Optional.empty(); | ||
} catch (IOException e) { | ||
logger.warn( | ||
"Protocol version detection failed, it is likely than the connector sent more than {}B without an complete SPEC message." + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is meant for bytes. |
||
" A SPEC message that is too long could be the root cause here.", | ||
BUFFER_READ_AHEAD_LIMIT); | ||
throw e; | ||
} | ||
} | ||
|
||
private boolean isSpecMessage(final JsonNode json) { | ||
final String typeFieldName = "type"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider extracting to constant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
return json.has(typeFieldName) && "spec".equalsIgnoreCase(json.get(typeFieldName).asText()); | ||
} | ||
|
||
public boolean setDetectVersion(final boolean detectVersion) { | ||
return this.shouldDetectVersion = detectVersion; | ||
} | ||
|
||
public VersionedAirbyteStreamFactory<T> withDetectVersion(final boolean detectVersion) { | ||
setDetectVersion(detectVersion); | ||
return this; | ||
} | ||
|
||
final protected void initializeForProtocolVersion(final Version protocolVersion) { | ||
this.deserializer = (AirbyteMessageDeserializer<T>) serDeProvider.getDeserializer(protocolVersion).orElseThrow(); | ||
this.migrator = migratorFactory.getVersionedMigrator(protocolVersion); | ||
this.protocolVersion = protocolVersion; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.internal; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.verify; | ||
|
||
import io.airbyte.commons.protocol.AirbyteMessageMigrator; | ||
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; | ||
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; | ||
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigrationV0; | ||
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer; | ||
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer; | ||
import io.airbyte.commons.version.Version; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import java.io.BufferedReader; | ||
import java.io.InputStreamReader; | ||
import java.io.StringReader; | ||
import java.nio.charset.Charset; | ||
import java.util.List; | ||
import java.util.stream.Stream; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.platform.commons.util.ClassLoaderUtils; | ||
|
||
class VersionedAirbyteStreamFactoryTest { | ||
|
||
AirbyteMessageSerDeProvider serDeProvider; | ||
AirbyteMessageVersionedMigratorFactory migratorFactory; | ||
|
||
final static Version defaultVersion = new Version("0.2.0"); | ||
|
||
@BeforeEach | ||
void beforeEach() { | ||
serDeProvider = spy(new AirbyteMessageSerDeProvider( | ||
List.of(new AirbyteMessageV0Deserializer()), | ||
List.of(new AirbyteMessageV0Serializer()))); | ||
serDeProvider.initialize(); | ||
final AirbyteMessageMigrator migrator = new AirbyteMessageMigrator( | ||
List.of(new AirbyteMessageMigrationV0())); | ||
migrator.initialize(); | ||
migratorFactory = spy(new AirbyteMessageVersionedMigratorFactory(migrator)); | ||
} | ||
|
||
@Test | ||
void testCreate() { | ||
final Version initialVersion = new Version("0.1.2"); | ||
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion); | ||
|
||
final BufferedReader bufferedReader = new BufferedReader(new StringReader("")); | ||
streamFactory.create(bufferedReader); | ||
|
||
verify(serDeProvider).getDeserializer(initialVersion); | ||
verify(migratorFactory).getVersionedMigrator(initialVersion); | ||
} | ||
|
||
@Test | ||
void testCreateWithVersionDetection() { | ||
final Version initialVersion = new Version("0.0.0"); | ||
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) | ||
.withDetectVersion(true); | ||
|
||
final BufferedReader bufferedReader = | ||
getBuffereredReader("version-detection/logs-with-version.jsonl"); | ||
final Stream<AirbyteMessage> stream = streamFactory.create(bufferedReader); | ||
|
||
long messageCount = stream.toList().size(); | ||
verify(serDeProvider).getDeserializer(initialVersion); | ||
verify(serDeProvider).getDeserializer(new Version("0.5.9")); | ||
assertEquals(1, messageCount); | ||
} | ||
|
||
@Test | ||
void testCreateWithVersionDetectionFallback() { | ||
final Version initialVersion = new Version("0.0.6"); | ||
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) | ||
.withDetectVersion(true); | ||
|
||
final BufferedReader bufferedReader = | ||
getBuffereredReader("version-detection/logs-without-version.jsonl"); | ||
final Stream<AirbyteMessage> stream = streamFactory.create(bufferedReader); | ||
|
||
final long messageCount = stream.toList().size(); | ||
verify(serDeProvider).getDeserializer(initialVersion); | ||
verify(serDeProvider).getDeserializer(defaultVersion); | ||
assertEquals(1, messageCount); | ||
} | ||
|
||
@Test | ||
void testCreateWithVersionDetectionWithoutSpecMessage() { | ||
final Version initialVersion = new Version("0.0.1"); | ||
final VersionedAirbyteStreamFactory<?> streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, initialVersion) | ||
.withDetectVersion(true); | ||
|
||
final BufferedReader bufferedReader = | ||
getBuffereredReader("version-detection/logs-without-spec-message.jsonl"); | ||
final Stream<AirbyteMessage> stream = streamFactory.create(bufferedReader); | ||
|
||
final long messageCount = stream.toList().size(); | ||
verify(serDeProvider).getDeserializer(initialVersion); | ||
verify(serDeProvider).getDeserializer(defaultVersion); | ||
assertEquals(2, messageCount); | ||
} | ||
|
||
BufferedReader getBuffereredReader(final String resourceFile) { | ||
return new BufferedReader( | ||
new InputStreamReader( | ||
ClassLoaderUtils.getDefaultClassLoader().getResourceAsStream(resourceFile), | ||
Charset.defaultCharset())); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}} | ||
{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.destination.bigquery.BigQueryDestination"}} | ||
{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}} | ||
{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} | ||
{"type":"SPEC","spec":{"protocol_version":"0.5.9","documentationUrl":"https://docs.airbyte.io/integrations/destinations/bigquery","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"BigQuery Destination Spec","type":"object","required":["project_id","dataset_location","dataset_id"],"additionalProperties":true,"properties":{"project_id":{"type":"string","description":"The GCP project ID for the project containing the target BigQuery dataset. Read more <a href=\"https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects\">here</a>.","title":"Project ID","order":0},"dataset_location":{"type":"string","description":"The location of the dataset. Warning: Changes made after creation will not be applied. Read more <a href=\"https://cloud.google.com/bigquery/docs/locations\">here</a>.","title":"Dataset Location","order":1,"enum":["US","EU","asia-east1","asia-east2","asia-northeast1","asia-northeast2","asia-northeast3","asia-south1","asia-south2","asia-southeast1","asia-southeast2","australia-southeast1","australia-southeast2","europe-central2","europe-north1","europe-west1","europe-west2","europe-west3","europe-west4","europe-west6","northamerica-northeast1","northamerica-northeast2","southamerica-east1","southamerica-west1","us-central1","us-east1","us-east4","us-west1","us-west2","us-west3","us-west4"]},"dataset_id":{"type":"string","description":"The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more <a href=\"https://cloud.google.com/bigquery/docs/datasets#create-dataset\">here</a>.","title":"Default Dataset ID","order":2},"loading_method":{"type":"object","title":"Loading Method","description":"Loading method used to send select the way data will be uploaded to BigQuery. <br/><b>Standard Inserts</b> - Direct uploading using SQL INSERT statements. This method is extremely inefficient and provided only for quick testing. 
In almost all cases, you should use staging. <br/><b>GCS Staging</b> - Writes large batches of records to a file, uploads the file to GCS, then uses <b>COPY INTO table</b> to upload the file. Recommended for most workloads for better speed and scalability. Read more about GCS Staging <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#gcs-staging\">here</a>.","order":3,"oneOf":[{"title":"Standard Inserts","required":["method"],"properties":{"method":{"type":"string","const":"Standard"}}},{"title":"GCS Staging","required":["method","gcs_bucket_name","gcs_bucket_path","credential"],"properties":{"method":{"type":"string","const":"GCS Staging","order":0},"credential":{"title":"Credential","description":"An HMAC key is a type of credential and can be associated with a service account or a user account in Cloud Storage. Read more <a href=\"https://cloud.google.com/storage/docs/authentication/hmackeys\">here</a>.","type":"object","order":1,"oneOf":[{"title":"HMAC key","required":["credential_type","hmac_key_access_id","hmac_key_secret"],"properties":{"credential_type":{"type":"string","const":"HMAC_KEY","order":0},"hmac_key_access_id":"**********","type":"string","description":"HMAC key access ID. When linked to a service account, this ID is 61 characters long; when linked to a user account, it is 24 characters long.","title":"HMAC Key Access ID","airbyte_secret":true,"examples":["1234567890abcdefghij1234"],"order":1},"hmac_key_secret":"**********","type":"string","description":"The corresponding secret for the access ID. It is a 40-character base-64 encoded string.","title":"HMAC Key Secret","airbyte_secret":true,"examples":["1234567890abcdefghij1234567890ABCDEFGHIJ"],"order":2}]},"gcs_bucket_name":{"title":"GCS Bucket Name","type":"string","description":"The name of the GCS bucket. 
Read more <a href=\"https://cloud.google.com/storage/docs/naming-buckets\">here</a>.","examples":["airbyte_sync"],"order":2},"gcs_bucket_path":{"title":"GCS Bucket Path","description":"Directory under the GCS bucket where data will be written.","type":"string","examples":["data_sync/test"],"order":3},"keep_files_in_gcs-bucket":{"type":"string","description":"This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default \"Delete all tmp files from GCS\" value is used if not set explicitly.","title":"GCS Tmp Files Afterward Processing (Optional)","default":"Delete all tmp files from GCS","enum":["Delete all tmp files from GCS","Keep all tmp files in GCS"],"order":4}}}]},"credentials_json":"**********","type":"string","description":"The contents of the JSON service account key. Check out the <a href=\"https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key\">docs</a> if you need help generating this key. Default credentials will be used if this field is left empty.","title":"Service Account Key JSON (Required for cloud, optional for open-source)","airbyte_secret":true,"order":4},"transformation_priority":{"type":"string","description":"Interactive run type means that the query is executed as soon as possible, and these queries count towards concurrent rate limit and daily limit. Read more about interactive run type <a href=\"https://cloud.google.com/bigquery/docs/running-queries#queries\">here</a>. Batch queries are queued and started as soon as idle resources are available in the BigQuery shared resource pool, which usually occurs within a few minutes. Batch queries don’t count towards your concurrent rate limit. Read more about batch queries <a href=\"https://cloud.google.com/bigquery/docs/running-queries#batch\">here</a>. 
The default \"interactive\" value is used if not set explicitly.","title":"Transformation Query Run Type (Optional)","default":"interactive","enum":["interactive","batch"],"order":5},"big_query_client_buffer_size_mb":{"title":"Google BigQuery Client Chunk Size (Optional)","description":"Google BigQuery client's chunk (buffer) size (MIN=1, MAX = 15) for each table. The size that will be written by a single RPC. Written data will be buffered and only flushed upon reaching this size or closing the channel. The default 15MB value is used if not set explicitly. Read more <a href=\"https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html\">here</a>.","type":"integer","minimum":1,"maximum":15,"default":15,"examples":["15"],"order":6}}},"supportsIncremental":true,"supportsNormalization":true,"supportsDBT":true,"supported_destination_sync_modes":["overwrite","append","append_dedup"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: extract "0" to a named constant to make it easy to understand what it represents.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed `mostRecentVersion` to `mostRecentMajorVersion`, which is actually what this attribute is. This should make it clearer already. If you think it still helps to have the other constants, happy to change it as well.