Skip to content

Commit

Permalink
🎉 Source MSSQL: implementation for CDC (#4689)
Browse files Browse the repository at this point in the history
* first few classes for mssql cdc

* wip

* mssql cdc working against unit tests

* increment version

* add cdc acceptance test

* tweaks

* add file

* working on comprehensive tests

* change isolation from snapshot to read_committed_snapshot

* finalised type tests

* Revert "change isolation from snapshot to read_committed_snapshot"

This reverts commit 20c6768.

* small docstring fix

* remove unused imports

* stress test fixes

* minor formatting improvements

* mssql cdc docs

* finish off cdc docs

* format fix

* update connector version

* add to changelog

* fix for sql server agent offline failing cdc enable on tables

* final structure

* few more updates

* undo unwanted changes

* add abstract test + more refinement

* remove CDC metadata to debezium

* use new cdc abstraction for mysql

* undo wanted change

* use cdc abstraction for postgres

* add files

* pull in latest changes

* ready

* rename class + add missing property

* use renamed class + move constants to MySqlSource

* use renamed class + move constants to PostgresSource

* move debezium to bases + upgrade debezium version + review comments

* downgrade version + minor fixes

* bring in latest changes from cdc abstraction

* reset to minutes

* bring in the latest changes

* format

* fix build

* address review comments

* bring in latest changes

* bring in latest changes

* use common abstraction for CDC via debezium for sql server

* remove debezium from build

* finalise PR

* should return Optional

* pull in latest changes

* pull in latest changes

* address review comments

* use common abstraction for CDC via debezium for mysql (#4604)

* use new cdc abstraction for mysql

* undo wanted change

* pull in latest changes

* use renamed class + move constants to MySqlSource

* bring in latest changes from cdc abstraction

* format

* bring in latest changes

* pull in latest changes

* use common abstraction for CDC via debezium for postgres (#4607)

* use cdc abstraction for postgres

* add files

* ready

* use renamed class + move constants to PostgresSource

* bring in the latest changes

* bring in latest changes

* pull in latest changes

* lower version for tests to run on CI

* format

* Update docs/integrations/sources/mssql.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* addressing review comments

* fix for testGetTargetPosition

* format changes

Co-authored-by: George Claireaux <george@claireaux.co.uk>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
3 people authored and gl-pix committed Jul 22, 2021
1 parent 9f98d42 commit 81467bb
Show file tree
Hide file tree
Showing 19 changed files with 1,933 additions and 43 deletions.
1 change: 1 addition & 0 deletions airbyte-integrations/bases/debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies {
implementation 'io.debezium:debezium-embedded:1.4.2.Final'
implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final'
implementation 'io.debezium:debezium-connector-postgres:1.4.2.Final'
implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'

testFixturesImplementation project(':airbyte-db')
testFixturesImplementation project(':airbyte-integrations:bases:base-java')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -78,9 +80,8 @@ public abstract class CdcSourceTest {
protected static final String COL_ID = "id";
protected static final String COL_MAKE_ID = "make_id";
protected static final String COL_MODEL = "model";
protected static final String DB_NAME = MODELS_SCHEMA;

private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
protected static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME,
MODELS_SCHEMA,
Expand Down Expand Up @@ -124,6 +125,24 @@ protected void executeQuery(String query) {
}
}

public String columnClause(Map<String, String> columnsWithDataType, Optional<String> primaryKey) {
StringBuilder columnClause = new StringBuilder();
int i = 0;
for (Map.Entry<String, String> column : columnsWithDataType.entrySet()) {
columnClause.append(column.getKey());
columnClause.append(" ");
columnClause.append(column.getValue());
if (i < (columnsWithDataType.size() - 1)) {
columnClause.append(",");
columnClause.append(" ");
}
i++;
}
primaryKey.ifPresent(s -> columnClause.append(", PRIMARY KEY (").append(s).append(")"));

return columnClause.toString();
}

public void createTable(String schemaName, String tableName, String columnClause) {
executeQuery(createTableQuery(schemaName, tableName, columnClause));
}
Expand All @@ -143,7 +162,7 @@ public String createSchemaQuery(String schemaName) {
private void createAndPopulateActualTable() {
createSchema(MODELS_SCHEMA);
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME,
String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));
for (JsonNode recordJson : MODEL_RECORDS) {
writeModelRecord(recordJson);
}
Expand All @@ -156,9 +175,8 @@ private void createAndPopulateActualTable() {
private void createAndPopulateRandomTable() {
createSchema(MODELS_SCHEMA + "_random");
createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random",
String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID + "_random",
COL_MAKE_ID + "_random",
COL_MODEL + "_random", COL_ID + "_random"));
columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"),
Optional.of(COL_ID + "_random")));
final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
Jsons
.jsonNode(ImmutableMap
Expand Down Expand Up @@ -448,7 +466,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2")));

createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID)));

for (JsonNode recordJson : MODEL_RECORDS_2) {
writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID,
Expand Down Expand Up @@ -571,7 +589,8 @@ void testDiscover() throws Exception {
protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG);

createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200)", COL_ID, COL_MAKE_ID, COL_MODEL));
createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2",
columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty()));

List<AirbyteStream> streams = expectedCatalog.getStreams();
// stream with PK
Expand All @@ -588,7 +607,19 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
addCdcMetadataColumns(streamWithoutPK);

AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
MODELS_SCHEMA + "_random",
Field.of(COL_ID + "_random", JsonSchemaPrimitive.NUMBER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaPrimitive.NUMBER),
Field.of(COL_MODEL + "_random", JsonSchemaPrimitive.STRING))
.withSourceDefinedCursor(true)
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random")));
addCdcMetadataColumns(randomStream);

streams.add(streamWithoutPK);
streams.add(randomStream);
expectedCatalog.withStreams(streams);
return expectedCatalog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ public TestDataHolderBuilder airbyteType(JsonSchemaPrimitive airbyteType) {

/**
* Set custom the create table script pattern. Use it if you source uses untypical table creation
* sql. Default patter described {@link #DEFAULT_CREATE_TABLE_SQL} Note! The patter should contains
* two String place holders for the table name and data type.
* sql. Default patter described {@link #DEFAULT_CREATE_TABLE_SQL} Note! The patter should contain
* four String place holders for the: - namespace.table name (as one placeholder together) - id
* column name - test column name - test column data type
*
* @param createTablePatternSql creation table sql pattern
* @return builder
Expand Down
4 changes: 4 additions & 0 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@ dependencies {

implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'
implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14'

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation "org.testcontainers:mssqlserver:1.15.1"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mssql')

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.mssql;

import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.debezium.CdcMetadataInjector;

public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector {

@Override
public void addMetaData(ObjectNode event, JsonNode source) {
String commitLsn = source.get("commit_lsn").asText();
event.put(CDC_LSN, commitLsn);
}

@Override
public String namespace(JsonNode source) {
return source.get("schema").asText();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.mssql;

import java.util.Properties;

public class MssqlCdcProperties {

static Properties getDebeziumProperties() {
final Properties props = new Properties();
props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");

// snapshot config
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode
props.setProperty("snapshot.mode", "initial");
// https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
// we set this to avoid preventing other (non-Airbyte) transactions from updating table rows while
// we snapshot
props.setProperty("snapshot.isolation.mode", "snapshot");

// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-include-schema-changes
props.setProperty("include.schema.changes", "false");
// https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata
props.setProperty("provide.transaction.metadata", "false");

return props;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.mssql;

import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.debezium.CdcSavedInfoFetcher;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import java.util.Optional;

public class MssqlCdcSavedInfoFetcher implements CdcSavedInfoFetcher {

private final JsonNode savedOffset;
private final JsonNode savedSchemaHistory;

protected MssqlCdcSavedInfoFetcher(CdcState savedState) {
final boolean savedStatePresent = savedState != null && savedState.getState() != null;
this.savedOffset = savedStatePresent ? savedState.getState().get(MSSQL_CDC_OFFSET) : null;
this.savedSchemaHistory = savedStatePresent ? savedState.getState().get(MSSQL_DB_HISTORY) : null;
}

@Override
public JsonNode getSavedOffset() {
return savedOffset;
}

@Override
public Optional<JsonNode> getSavedSchemaHistory() {
return Optional.ofNullable(savedSchemaHistory);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.mssql;

import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.debezium.CdcStateHandler;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MssqlCdcStateHandler implements CdcStateHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcStateHandler.class);
private final StateManager stateManager;

public MssqlCdcStateHandler(StateManager stateManager) {
this.stateManager = stateManager;
}

@Override
public AirbyteMessage saveState(Map<String, String> offset, String dbHistory) {
Map<String, Object> state = new HashMap<>();
state.put(MSSQL_CDC_OFFSET, offset);
state.put(MSSQL_DB_HISTORY, dbHistory);

final JsonNode asJson = Jsons.jsonNode(state);

LOGGER.info("debezium state: {}", asJson);

final CdcState cdcState = new CdcState().withState(asJson);
stateManager.getCdcStateManager().setCdcState(cdcState);
final AirbyteStateMessage stateMessage = stateManager.emit();
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
}

}
Loading

0 comments on commit 81467bb

Please sign in to comment.