Skip to content

Commit

Permalink
☝🏼Destinations supports destination sync mode (#2460)
Browse files Browse the repository at this point in the history
* Handle destination sync mode in destinations

* Source & Destination sync modes are required (#2500)

* Provide Migration script making sure it is always defined for previous sync configs
  • Loading branch information
ChristopheDuong authored Mar 26, 2021
1 parent 62263b6 commit 8a29584
Show file tree
Hide file tree
Showing 64 changed files with 812 additions and 225 deletions.
7 changes: 4 additions & 3 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1887,18 +1887,19 @@ components:
description: the mutable part of the stream to configure the destination
type: object
additionalProperties: false
required:
- syncMode
- destinationSyncMode
properties:
syncMode:
$ref: "#/components/schemas/SyncMode"
default: full_refresh
cursorField:
description: Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.
type: array
items:
type: string
destinationSyncMode:
$ref: "#/components/schemas/DestinationSyncMode"
default: append
primaryKey:
description: Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.
type: array
Expand Down Expand Up @@ -2180,7 +2181,7 @@ components:
- append
- overwrite
#- upsert_dedup # TODO chris: SCD Type 1 can be implemented later
- append_dedup # SCD Type 2
- append_dedup # SCD Type 1 & 2
AirbyteArchive:
type: string
format: binary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6",
"name": "Redshift",
"dockerRepository": "airbyte/source-redshift",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift"
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.0
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/r/airbyte/source-mssql
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand All @@ -51,7 +51,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
name: Salesforce
Expand Down Expand Up @@ -96,7 +96,7 @@
- sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
name: Redshift
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift
- sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06
name: Twilio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ enum:
- append
- overwrite
#- upsert_dedup # TODO chris: SCD Type 1 can be implemented later
- append_dedup # SCD Type 2
- append_dedup # SCD Type 1 & 2
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ class Config:
extra = Extra.allow

stream: AirbyteStream
sync_mode: Optional[SyncMode] = "full_refresh"
sync_mode: SyncMode
cursor_field: Optional[List[str]] = Field(
None,
description="Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.",
)
destination_sync_mode: Optional[DestinationSyncMode] = "append"
destination_sync_mode: DestinationSyncMode
primary_key: Optional[List[str]] = Field(
None,
description="Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@

package io.airbyte.integrations.destination;

import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;

public class WriteConfig {

private final String streamName;
private final String outputNamespaceName;
private final String tmpTableName;
private final String outputTableName;
private final SyncMode syncMode;
private final DestinationSyncMode syncMode;

public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, SyncMode syncMode) {
public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, DestinationSyncMode syncMode) {
this.streamName = streamName;
this.outputNamespaceName = outputNamespaceName;
this.tmpTableName = tmpTableName;
Expand All @@ -58,7 +58,7 @@ public String getOutputTableName() {
return outputTableName;
}

public SyncMode getSyncMode() {
public DestinationSyncMode getSyncMode() {
return syncMode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.DefaultCheckConnectionWorker;
Expand Down Expand Up @@ -292,7 +293,10 @@ public void testIncrementalSync(String messagesFilename, String catalogFilename)
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
configuredCatalog.getStreams().forEach(s -> s.withSyncMode(SyncMode.INCREMENTAL));
configuredCatalog.getStreams().forEach(s -> {
s.withSyncMode(SyncMode.INCREMENTAL);
s.withDestinationSyncMode(DestinationSyncMode.APPEND);
});
final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfig(), firstSyncMessages, configuredCatalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
Expand Down Expand Up @@ -378,6 +379,7 @@ private ConfiguredAirbyteCatalog withFullRefreshSyncModes(ConfiguredAirbyteCatal
for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getStream().getSupportedSyncModes().contains(FULL_REFRESH)) {
configuredStream.setSyncMode(FULL_REFRESH);
configuredStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
}
}
return clone;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -231,7 +231,7 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
.setFormatOptions(FormatOptions.json()).build(); // new-line delimited json.

final TableDataWriteChannel writer = bigquery.writer(JobId.of(UUID.randomUUID().toString()), writeChannelConfiguration);
final WriteDisposition syncMode = getWriteDisposition(stream.getSyncMode());
final WriteDisposition syncMode = getWriteDisposition(stream.getDestinationSyncMode());

writeConfigs.put(stream.getStream().getName(),
new WriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), writer, syncMode));
Expand All @@ -242,13 +242,18 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
return new RecordConsumer(bigquery, writeConfigs, catalog);
}

private static WriteDisposition getWriteDisposition(SyncMode syncMode) {
if (syncMode == null || syncMode == SyncMode.FULL_REFRESH) {
return WriteDisposition.WRITE_TRUNCATE;
} else if (syncMode == SyncMode.INCREMENTAL) {
return WriteDisposition.WRITE_APPEND;
} else {
throw new IllegalStateException("Unrecognized sync mode: " + syncMode);
private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode) {
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
switch (syncMode) {
case OVERWRITE -> {
return WriteDisposition.WRITE_TRUNCATE;
}
case APPEND, APPEND_DEDUP -> {
return WriteDisposition.WRITE_APPEND;
}
default -> throw new IllegalStateException("Unrecognized destination sync mode: " + syncMode);
}
}

Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -105,12 +105,16 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
final Path finalPath = destinationDir.resolve(tableName + ".csv");
CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL;
if (isIncremental && finalPath.toFile().exists()) {
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
if (isAppendMode && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
csvFormat = csvFormat.withSkipHeaderRecord();
}
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), isIncremental);
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), isAppendMode);
final CSVPrinter printer = new CSVPrinter(fileWriter, csvFormat);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-jdbc
Loading

0 comments on commit 8a29584

Please sign in to comment.