Skip to content

Commit

Permalink
MSSQL Source : Standardize spec.json for DB connectors that support l…
Browse files Browse the repository at this point in the history
…og-based CDC replication (airbytehq#16215)

* Fixed bucket naming for S3

* removed redundant configs

* MSSQL Source : Standardize spec.json for DB connectors that support log-based CDC replication

* bump version

* bump version
  • Loading branch information
VitaliiMaltsev authored and jhammarstedt committed Oct 31, 2022
1 parent b9d5101 commit 0416bd2
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.17
dockerImageTag: 0.4.18
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
12 changes: 6 additions & 6 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5273,7 +5273,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.17"
- dockerImage: "airbyte/source-mssql:0.4.18"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -5374,7 +5374,7 @@
description: "Specifies the host name of the server. The value of\
\ this property must match the subject property of the certificate."
order: 7
replication:
replication_method:
type: "object"
title: "Replication Method"
description: "The replication method used for extracting data from the database.\
Expand All @@ -5389,9 +5389,9 @@
description: "Standard replication requires no setup on the DB side but\
\ will not be able to represent deletions incrementally."
required:
- "replication_type"
- "method"
properties:
replication_type:
method:
type: "string"
const: "STANDARD"
enum:
Expand All @@ -5402,9 +5402,9 @@
description: "CDC uses {TBC} to detect inserts, updates, and deletes.\
\ This needs to be configured on the source database itself."
required:
- "replication_type"
- "method"
properties:
replication_type:
method:
type: "string"
const: "CDC"
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.17
LABEL io.airbyte.version=0.4.18
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
}
]
},
"replication": {
"replication_method": {
"type": "object",
"title": "Replication Method",
"description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
Expand All @@ -97,9 +97,9 @@
{
"title": "Standard",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
Expand All @@ -111,9 +111,9 @@
{
"title": "Logical Replication (CDC)",
"description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.17
LABEL io.airbyte.version=0.4.18
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class MssqlCdcHelper {
// it is an oneOf object
private static final String REPLICATION_FIELD = "replication";
private static final String REPLICATION_TYPE_FIELD = "replication_type";
private static final String METHOD_FIELD = "method";
private static final String CDC_SNAPSHOT_ISOLATION_FIELD = "snapshot_isolation";
private static final String CDC_DATA_TO_SYNC_FIELD = "data_to_sync";

Expand Down Expand Up @@ -91,14 +92,19 @@ public static DataToSync from(final String value) {
@VisibleForTesting
static boolean isCdc(final JsonNode config) {
// new replication method config since version 0.4.0
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
}
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
}
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
}
]
},
"replication": {
"replication_method": {
"type": "object",
"title": "Replication Method",
"description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
Expand All @@ -110,9 +110,9 @@
{
"title": "Standard",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
Expand All @@ -124,9 +124,9 @@
{
"title": "Logical Replication (CDC)",
"description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Int
container.start();

final JsonNode replicationConfig = Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"));

Expand All @@ -107,7 +107,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Int
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
.put(JdbcUtils.USERNAME_KEY, TEST_USER_NAME)
.put(JdbcUtils.PASSWORD_KEY, TEST_USER_PASSWORD)
.put("replication", replicationConfig)
.put("replication_method", replicationConfig)
.build());

dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected Database setupDatabase() throws Exception {
container.start();

final JsonNode replicationConfig = Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"));

Expand All @@ -42,7 +42,7 @@ protected Database setupDatabase() throws Exception {
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication", replicationConfig)
.put("replication_method", replicationConfig)
.build());

dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected String getImageName() {
@Override
protected Database setupDatabase(final String dbName) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("replication_type", "Standard")
.put("method", "Standard")
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -49,7 +49,7 @@ protected Database setupDatabase(final String dbName) {
.put(JdbcUtils.DATABASE_KEY, dbName) // set your db name
.put(JdbcUtils.USERNAME_KEY, "your_username")
.put(JdbcUtils.PASSWORD_KEY, "your_pass")
.put("replication", replicationMethod)
.put("replication_method", replicationMethod)
.build());

dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private void init() {
source = new MssqlSource();

final JsonNode replicationConfig = Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"));
config = Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -96,7 +96,7 @@ private void init() {
.put(JdbcUtils.SCHEMAS_KEY, List.of(MODELS_SCHEMA, MODELS_SCHEMA + "_random"))
.put(JdbcUtils.USERNAME_KEY, TEST_USER_NAME)
.put(JdbcUtils.PASSWORD_KEY, TEST_USER_PASSWORD)
.put("replication", replicationConfig)
.put("replication_method", replicationConfig)
.build());

dataSource = DataSourceFactory.create(
Expand Down Expand Up @@ -279,7 +279,7 @@ void testAssertSnapshotIsolationAllowed() {
@Test
void testAssertSnapshotIsolationDisabled() {
final JsonNode replicationConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("replication_type", "CDC")
.put("method", "CDC")
.put("data_to_sync", "New Changes Only")
// set snapshot_isolation level to "Read Committed" to disable snapshot
.put("snapshot_isolation", "Read Committed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,30 @@ public void testIsCdc() {
assertTrue(MssqlCdcHelper.isCdc(LEGACY_CDC_CONFIG));

// new replication method config since version 0.4.0
final JsonNode newNonCdc = Jsons.jsonNode(Map.of("replication",
Jsons.jsonNode(Map.of("replication_type", "STANDARD"))));
final JsonNode newNonCdc = Jsons.jsonNode(Map.of("replication_method",
Jsons.jsonNode(Map.of("method", "STANDARD"))));
assertFalse(MssqlCdcHelper.isCdc(newNonCdc));

final JsonNode newCdc = Jsons.jsonNode(Map.of("replication",
final JsonNode newCdc = Jsons.jsonNode(Map.of("replication_method",
Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(newCdc));

// migration from legacy to new config
final JsonNode mixNonCdc = Jsons.jsonNode(Map.of(
"replication_method", "CDC",
"replication", Jsons.jsonNode(Map.of("replication_type", "STANDARD"))));
"replication_method", Jsons.jsonNode(Map.of("method", "STANDARD")),
"replication", Jsons.jsonNode(Map.of("replication_type", "CDC"))));
assertFalse(MssqlCdcHelper.isCdc(mixNonCdc));

final JsonNode mixCdc = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"replication_type", "Standard",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot")),
"replication_method", Jsons.jsonNode(Map.of(
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(mixCdc));
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----------------------------------------------------- |:-------------------------------------------------------------------------------------------------------|
| 0.4.18 | 2022-09-03 | [14910](https://github.com/airbytehq/airbyte/pull/14910) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. |
| 0.4.17 | 2022-09-01 | [16261](https://github.com/airbytehq/airbyte/pull/16261) | Emit state messages more frequently |
| 0.4.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
| 0.4.15 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state |
Expand Down

0 comments on commit 0416bd2

Please sign in to comment.