Skip to content

Commit

Permalink
Source & Destination sync modes are required (#2500)
Browse files Browse the repository at this point in the history
* Make sync modes required
* Provide Migration script making sure it is always defined for previous sync configs
  • Loading branch information
ChristopheDuong authored Mar 19, 2021
1 parent 35fcdd6 commit 231efae
Show file tree
Hide file tree
Showing 48 changed files with 590 additions and 168 deletions.
7 changes: 4 additions & 3 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1864,18 +1864,19 @@ components:
description: the mutable part of the stream to configure the destination
type: object
additionalProperties: false
required:
- syncMode
- destinationSyncMode
properties:
syncMode:
$ref: "#/components/schemas/SyncMode"
default: full_refresh
cursorField:
description: Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.
type: array
items:
type: string
destinationSyncMode:
$ref: "#/components/schemas/DestinationSyncMode"
default: append
primaryKey:
description: Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.
type: array
Expand Down Expand Up @@ -2155,7 +2156,7 @@ components:
- append
- overwrite
#- upsert_dedup # TODO chris: SCD Type 1 can be implemented later
- append_dedup # SCD Type 2
- append_dedup # SCD Type 1 & 2
AirbyteArchive:
type: string
format: binary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6",
"name": "Redshift",
"dockerRepository": "airbyte/source-redshift",
"dockerImageTag": "0.2.1",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift"
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/r/airbyte/source-mssql
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand All @@ -51,7 +51,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
name: Salesforce
Expand Down Expand Up @@ -96,7 +96,7 @@
- sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
name: Redshift
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift
- sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06
name: Twilio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ enum:
- append
- overwrite
#- upsert_dedup # TODO chris: SCD Type 1 can be implemented later
- append_dedup # SCD Type 2
- append_dedup # SCD Type 1 & 2
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ class Config:
extra = Extra.allow

stream: AirbyteStream
sync_mode: Optional[SyncMode] = "full_refresh"
sync_mode: SyncMode
cursor_field: Optional[List[str]] = Field(
None,
description="Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.",
)
destination_sync_mode: Optional[DestinationSyncMode] = "append"
destination_sync_mode: DestinationSyncMode
primary_key: Optional[List[str]] = Field(
None,
description="Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb

private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode) {
if (syncMode == null) {
return WriteDisposition.WRITE_APPEND;
throw new IllegalStateException("Undefined destination sync mode");
}
switch (syncMode) {
case OVERWRITE -> {
Expand All @@ -253,7 +253,7 @@ private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode
case APPEND, APPEND_DEDUP -> {
return WriteDisposition.WRITE_APPEND;
}
default -> throw new IllegalStateException("Unrecognized sync mode: " + syncMode);
default -> throw new IllegalStateException("Unrecognized destination sync mode: " + syncMode);
}
}

Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
final Path finalPath = destinationDir.resolve(tableName + ".csv");
CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
final boolean isAppendMode = stream.getDestinationSyncMode() != DestinationSyncMode.OVERWRITE;
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
if (isAppendMode && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
csvFormat = csvFormat.withSkipHeaderRecord();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer
final String schemaName = namingResolver.getIdentifier(config.get("schema").asText());
final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName));
final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode() != null ? stream.getDestinationSyncMode() : DestinationSyncMode.APPEND;
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
return new WriteConfig(streamName, schemaName, tmpTableName, tableName, syncMode);
}).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-local-json
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
final String streamName = stream.getStream().getName();
final Path finalPath = destinationDir.resolve(namingResolver.getRawTableName(streamName) + ".jsonl");
final Path tmpPath = destinationDir.resolve(namingResolver.getTmpTableName(streamName) + ".jsonl");

final boolean isAppendMode = stream.getDestinationSyncMode() != DestinationSyncMode.OVERWRITE;
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
final boolean isAppendMode = syncMode != DestinationSyncMode.OVERWRITE;
if (isAppendMode && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ private static Map<String, Index> createIndices(ConfiguredAirbyteCatalog catalog
final Map<String, Index> map = new HashMap<>();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String indexName = getIndexName(stream);

if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE && indexExists(client, indexName)) {
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
if (syncMode == null) {
throw new IllegalStateException("Undefined destination sync mode");
}
if (syncMode == DestinationSyncMode.OVERWRITE && indexExists(client, indexName)) {
client.deleteIndex(indexName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import unittest
from unittest.mock import Mock, patch

from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode
from airbyte_protocol.models.airbyte_protocol import DestinationSyncMode
from google_sheets_source.client import GoogleSheetsClient
from google_sheets_source.helpers import Helpers
from google_sheets_source.models import CellData, GridData, RowData, Sheet, SheetProperties, Spreadsheet
Expand Down Expand Up @@ -103,8 +104,16 @@ def test_parse_sheet_and_column_names_from_catalog(self):

catalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(stream=AirbyteStream(name=sheet1, json_schema=sheet1_schema)),
ConfiguredAirbyteStream(stream=AirbyteStream(name=sheet2, json_schema=sheet2_schema)),
ConfiguredAirbyteStream(
stream=AirbyteStream(name=sheet1, json_schema=sheet1_schema),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
),
ConfiguredAirbyteStream(
stream=AirbyteStream(name=sheet2, json_schema=sheet2_schema),
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.overwrite,
),
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,12 @@ private AutoCloseableIterator<AirbyteMessage> createReadIterator(JdbcDatabase da
cursorOptional.orElse(null),
cursorType),
airbyteMessageIterator);
} else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH || airbyteStream.getSyncMode() == null) {
} else if (airbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
iterator = getFullRefreshStream(database, streamName, selectedDatabaseFields, table, emittedAt);
} else if (airbyteStream.getSyncMode() == null) {
throw new IllegalArgumentException(String.format("%s requires a source sync mode", AbstractJdbcSource.class));
} else {
throw new IllegalArgumentException(String.format("%s does not support sync mode: %s.", airbyteStream.getSyncMode(), AbstractJdbcSource.class));
throw new IllegalArgumentException(String.format("%s does not support sync mode: %s.", AbstractJdbcSource.class, airbyteStream.getSyncMode()));
}

final AtomicLong recordCount = new AtomicLong();
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-mssql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/source-mysql
Loading

0 comments on commit 231efae

Please sign in to comment.