Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSSQL Source : Standardize spec.json for DB connectors that support log-based CDC replication #16215

Merged
merged 27 commits into from
Sep 3, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
979fe1f
Fixed bucket naming for S3
VitaliiMaltsev Aug 1, 2022
7d2963a
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 1, 2022
e921180
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 2, 2022
0e6f7df
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 7, 2022
7924657
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 8, 2022
b5c1106
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 10, 2022
fc34cfb
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 11, 2022
8ae35a5
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 11, 2022
e5e1d12
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 11, 2022
f003cc6
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 15, 2022
a526c81
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 15, 2022
b9a4c3d
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 17, 2022
3917237
Merge remote-tracking branch 'origin/master'
VitaliiMaltsev Aug 17, 2022
aeae719
removed redundant configs
VitaliiMaltsev Aug 17, 2022
9ec1b7a
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 19, 2022
ebbcedf
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Aug 22, 2022
c2a123b
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 1, 2022
0b2cad7
MSSQL Source : Standardize spec.json for DB connectors that support l…
VitaliiMaltsev Sep 1, 2022
28875bd
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 1, 2022
749b8ca
Merge branch 'master' into vmaltsev/12917-mssql-source-standartize-spec
VitaliiMaltsev Sep 1, 2022
b3c7e13
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 1, 2022
e9b8486
Merge branch 'master' of https://github.com/airbytehq/airbyte
VitaliiMaltsev Sep 3, 2022
f9a14a5
Merge branch 'master' into vmaltsev/12917-mssql-source-standartize-spec
VitaliiMaltsev Sep 3, 2022
25929ff
bump version
VitaliiMaltsev Sep 3, 2022
7826346
Merge remote-tracking branch 'origin/vmaltsev/12917-mssql-source-stan…
VitaliiMaltsev Sep 3, 2022
d4a3b60
bump version
VitaliiMaltsev Sep 3, 2022
91a3dc2
auto-bump connector version [ci skip]
octavia-squidington-iii Sep 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
}
]
},
"replication": {
"replication_method": {
"type": "object",
"title": "Replication Method",
"description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
Expand All @@ -97,9 +97,9 @@
{
"title": "Standard",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
Expand All @@ -111,9 +111,9 @@
{
"title": "Logical Replication (CDC)",
"description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class MssqlCdcHelper {
// it is an oneOf object
private static final String REPLICATION_FIELD = "replication";
private static final String REPLICATION_TYPE_FIELD = "replication_type";
private static final String METHOD_FIELD = "method";
private static final String CDC_SNAPSHOT_ISOLATION_FIELD = "snapshot_isolation";
private static final String CDC_DATA_TO_SYNC_FIELD = "data_to_sync";

Expand Down Expand Up @@ -91,14 +92,19 @@ public static DataToSync from(final String value) {
@VisibleForTesting
static boolean isCdc(final JsonNode config) {
// new replication method config since version 0.4.0
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC;
}
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) {
return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC;
}
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
}
]
},
"replication": {
"replication_method": {
"type": "object",
"title": "Replication Method",
"description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
Expand All @@ -110,9 +110,9 @@
{
"title": "Standard",
"description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "STANDARD",
"enum": ["STANDARD"],
Expand All @@ -124,9 +124,9 @@
{
"title": "Logical Replication (CDC)",
"description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"required": ["replication_type"],
"required": ["method"],
"properties": {
"replication_type": {
"method": {
"type": "string",
"const": "CDC",
"enum": ["CDC"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Int
container.start();

final JsonNode replicationConfig = Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"));

Expand All @@ -107,7 +107,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Int
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
.put(JdbcUtils.USERNAME_KEY, TEST_USER_NAME)
.put(JdbcUtils.PASSWORD_KEY, TEST_USER_PASSWORD)
.put("replication", replicationConfig)
.put("replication_method", replicationConfig)
.build());

dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected Database setupDatabase() throws Exception {
container.start();

final JsonNode replicationConfig = Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"));

Expand All @@ -42,7 +42,7 @@ protected Database setupDatabase() throws Exception {
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put("replication", replicationConfig)
.put("replication_method", replicationConfig)
.build());

dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected String getImageName() {
@Override
protected Database setupDatabase(final String dbName) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("replication_type", "Standard")
.put("method", "Standard")
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -49,7 +49,7 @@ protected Database setupDatabase(final String dbName) {
.put(JdbcUtils.DATABASE_KEY, dbName) // set your db name
.put(JdbcUtils.USERNAME_KEY, "your_username")
.put(JdbcUtils.PASSWORD_KEY, "your_pass")
.put("replication", replicationMethod)
.put("replication_method", replicationMethod)
.build());

dslContext = DSLContextFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private void init() {
source = new MssqlSource();

final JsonNode replicationConfig = Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"));
config = Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -96,7 +96,7 @@ private void init() {
.put(JdbcUtils.SCHEMAS_KEY, List.of(MODELS_SCHEMA, MODELS_SCHEMA + "_random"))
.put(JdbcUtils.USERNAME_KEY, TEST_USER_NAME)
.put(JdbcUtils.PASSWORD_KEY, TEST_USER_PASSWORD)
.put("replication", replicationConfig)
.put("replication_method", replicationConfig)
.build());

dataSource = DataSourceFactory.create(
Expand Down Expand Up @@ -279,7 +279,7 @@ void testAssertSnapshotIsolationAllowed() {
@Test
void testAssertSnapshotIsolationDisabled() {
final JsonNode replicationConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("replication_type", "CDC")
.put("method", "CDC")
.put("data_to_sync", "New Changes Only")
// set snapshot_isolation level to "Read Committed" to disable snapshot
.put("snapshot_isolation", "Read Committed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,30 @@ public void testIsCdc() {
assertTrue(MssqlCdcHelper.isCdc(LEGACY_CDC_CONFIG));

// new replication method config since version 0.4.0
final JsonNode newNonCdc = Jsons.jsonNode(Map.of("replication",
Jsons.jsonNode(Map.of("replication_type", "STANDARD"))));
final JsonNode newNonCdc = Jsons.jsonNode(Map.of("replication_method",
Jsons.jsonNode(Map.of("method", "STANDARD"))));
assertFalse(MssqlCdcHelper.isCdc(newNonCdc));

final JsonNode newCdc = Jsons.jsonNode(Map.of("replication",
final JsonNode newCdc = Jsons.jsonNode(Map.of("replication_method",
Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(newCdc));

// migration from legacy to new config
final JsonNode mixNonCdc = Jsons.jsonNode(Map.of(
"replication_method", "CDC",
"replication", Jsons.jsonNode(Map.of("replication_type", "STANDARD"))));
"replication_method", Jsons.jsonNode(Map.of("method", "STANDARD")),
"replication", Jsons.jsonNode(Map.of("replication_type", "CDC"))));
assertFalse(MssqlCdcHelper.isCdc(mixNonCdc));

final JsonNode mixCdc = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"replication_type", "Standard",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot")),
"replication_method", Jsons.jsonNode(Map.of(
"method", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(mixCdc));
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----------------------------------------------------- |:-------------------------------------------------------------------------------------------------------|
| 0.4.17 | 2022-08-17 | [14910](https://github.com/airbytehq/airbyte/pull/14910) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. |
| 0.4.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field |
| 0.4.15 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state |
| 0.4.14 | 2022-08-10 | [15430](https://github.com/airbytehq/airbyte/pull/15430) | fixed a bug on handling special character on database name
Expand Down