diff --git a/airbyte-integrations/bases/debezium/build.gradle b/airbyte-integrations/bases/debezium/build.gradle index 2d2b5e9ab0a8..50590aabec7a 100644 --- a/airbyte-integrations/bases/debezium/build.gradle +++ b/airbyte-integrations/bases/debezium/build.gradle @@ -12,6 +12,7 @@ dependencies { implementation 'io.debezium:debezium-embedded:1.4.2.Final' implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final' implementation 'io.debezium:debezium-connector-postgres:1.4.2.Final' + implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final' testFixturesImplementation project(':airbyte-db') testFixturesImplementation project(':airbyte-integrations:bases:base-java') diff --git a/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java b/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java index 383c2e63f65f..b88b24c95aa2 100644 --- a/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java +++ b/airbyte-integrations/bases/debezium/src/testFixtures/java/io/airbyte/integrations/debezium/CdcSourceTest.java @@ -59,6 +59,8 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -78,9 +80,8 @@ public abstract class CdcSourceTest { protected static final String COL_ID = "id"; protected static final String COL_MAKE_ID = "make_id"; protected static final String COL_MODEL = "model"; - protected static final String DB_NAME = MODELS_SCHEMA; - private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( + protected static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( CatalogHelpers.createAirbyteStream( MODELS_STREAM_NAME, MODELS_SCHEMA, @@ -124,6 +125,24 @@ protected void 
executeQuery(String query) { } } + /** + * Builds the column list of a CREATE TABLE statement: "name TYPE" pairs joined by ", " in the map's + * iteration order, followed by a PRIMARY KEY clause when a key column is given. + * + * @param columnsWithDataType column name mapped to its SQL data type + * @param primaryKey name of the primary key column, if the table has one + * @return the column clause, without surrounding parentheses + */ + public String columnClause(Map<String, String> columnsWithDataType, Optional<String> primaryKey) { + final StringBuilder columnClause = new StringBuilder(columnsWithDataType.entrySet().stream() + .map(column -> String.join(" ", column.getKey(), column.getValue())) + .collect(Collectors.joining(", "))); + // the key constraint goes after the column definitions + primaryKey.ifPresent(s -> columnClause.append(", PRIMARY KEY (").append(s).append(")")); + + return columnClause.toString(); + } + public void createTable(String schemaName, String tableName, String columnClause) { executeQuery(createTableQuery(schemaName, tableName, columnClause)); } @@ -143,7 +162,7 @@ public String createSchemaQuery(String schemaName) { private void createAndPopulateActualTable() { createSchema(MODELS_SCHEMA); createTable(MODELS_SCHEMA, MODELS_STREAM_NAME, - String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID)); + columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID))); for (JsonNode recordJson : MODEL_RECORDS) { writeModelRecord(recordJson); } @@ -156,9 +175,8 @@ private void createAndPopulateActualTable() { private void createAndPopulateRandomTable() { createSchema(MODELS_SCHEMA + "_random"); createTable(MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", - String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID + "_random", - COL_MAKE_ID + "_random", - COL_MODEL + "_random", COL_ID + "_random")); + columnClause(ImmutableMap.of(COL_ID + "_random", "INTEGER", COL_MAKE_ID + "_random", "INTEGER", COL_MODEL + "_random", "VARCHAR(200)"), + Optional.of(COL_ID + "_random"))); final List MODEL_RECORDS_RANDOM = ImmutableList.of( Jsons .jsonNode(ImmutableMap @@ -448,7 +466,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2"))); createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", - String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s)", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID)); + columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.of(COL_ID))); for (JsonNode recordJson : MODEL_RECORDS_2) { writeRecords(recordJson, MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, @@ -571,7 +589,8 @@ void testDiscover() throws Exception { protected AirbyteCatalog expectedCatalogForDiscover() { final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); - createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", String.format("%s INTEGER, %s INTEGER, %s VARCHAR(200)", COL_ID, COL_MAKE_ID, COL_MODEL)); + createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", + columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty())); List streams = expectedCatalog.getStreams(); // stream with PK @@ -588,7 +607,19 @@ protected AirbyteCatalog expectedCatalogForDiscover() { streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); addCdcMetadataColumns(streamWithoutPK); + AirbyteStream randomStream = CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME + "_random", + MODELS_SCHEMA + "_random", + Field.of(COL_ID + "_random", JsonSchemaPrimitive.NUMBER), + Field.of(COL_MAKE_ID + "_random", JsonSchemaPrimitive.NUMBER), + Field.of(COL_MODEL + "_random", JsonSchemaPrimitive.STRING)) + .withSourceDefinedCursor(true) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random"))); + addCdcMetadataColumns(randomStream); + streams.add(streamWithoutPK); + streams.add(randomStream); expectedCatalog.withStreams(streams); return expectedCatalog; } diff --git 
a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java index edb2e1820224..f59f41d4d143 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java @@ -117,8 +117,9 @@ public TestDataHolderBuilder airbyteType(JsonSchemaPrimitive airbyteType) { /** * Set custom the create table script pattern. Use it if you source uses untypical table creation - * sql. Default patter described {@link #DEFAULT_CREATE_TABLE_SQL} Note! The patter should contains - * two String place holders for the table name and data type. + * sql. The default pattern is {@link #DEFAULT_CREATE_TABLE_SQL}. Note! The pattern should contain + * four String placeholders for: - the namespace.table name (as one placeholder) - the id + * column name - the test column name - the test column data type * * @param createTablePatternSql creation table sql pattern * @return builder diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index fdfc0183929c..3a299f7c4d48 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -13,18 +13,22 @@ dependencies { implementation project(':airbyte-db') implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:bases:debezium') implementation project(':airbyte-protocol:models') implementation project(':airbyte-integrations:connectors:source-jdbc') implementation project(':airbyte-integrations:connectors:source-relational-db') + implementation
'io.debezium:debezium-connector-sqlserver:1.4.2.Final' implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14' + testImplementation testFixtures(project(':airbyte-integrations:bases:debezium')) testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) testImplementation 'org.apache.commons:commons-lang3:3.11' testImplementation "org.testcontainers:mssqlserver:1.15.1" integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') + integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-mssql') implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcConnectorMetadataInjector.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcConnectorMetadataInjector.java new file mode 100644 index 000000000000..3b2e992c9dd3 --- /dev/null +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcConnectorMetadataInjector.java @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mssql; + +import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.integrations.debezium.CdcMetadataInjector; + +public class MssqlCdcConnectorMetadataInjector implements CdcMetadataInjector { + + @Override + public void addMetaData(ObjectNode event, JsonNode source) { + String commitLsn = source.get("commit_lsn").asText(); + event.put(CDC_LSN, commitLsn); + } + + @Override + public String namespace(JsonNode source) { + return source.get("schema").asText(); + } + +} diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcProperties.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcProperties.java new file mode 100644 index 000000000000..ca24e4352d5d --- /dev/null +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcProperties.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 
+ * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mssql; + +import java.util.Properties; + +public class MssqlCdcProperties { + + static Properties getDebeziumProperties() { + final Properties props = new Properties(); + props.setProperty("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector"); + + // snapshot config + // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-mode + props.setProperty("snapshot.mode", "initial"); + // https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15 + // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode + // we set this to avoid preventing other (non-Airbyte) transactions from updating table rows while + // we snapshot + props.setProperty("snapshot.isolation.mode", "snapshot"); + + // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-include-schema-changes + props.setProperty("include.schema.changes", "false"); + // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-property-provide-transaction-metadata + 
props.setProperty("provide.transaction.metadata", "false"); + + return props; + } + +} diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcSavedInfoFetcher.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcSavedInfoFetcher.java new file mode 100644 index 000000000000..3ac4a1db3bf2 --- /dev/null +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcSavedInfoFetcher.java @@ -0,0 +1,56 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.source.mssql; + +import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET; +import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.integrations.debezium.CdcSavedInfoFetcher; +import io.airbyte.integrations.source.relationaldb.models.CdcState; +import java.util.Optional; + +public class MssqlCdcSavedInfoFetcher implements CdcSavedInfoFetcher { + + private final JsonNode savedOffset; + private final JsonNode savedSchemaHistory; + + protected MssqlCdcSavedInfoFetcher(CdcState savedState) { + final boolean savedStatePresent = savedState != null && savedState.getState() != null; + this.savedOffset = savedStatePresent ? savedState.getState().get(MSSQL_CDC_OFFSET) : null; + this.savedSchemaHistory = savedStatePresent ? savedState.getState().get(MSSQL_DB_HISTORY) : null; + } + + @Override + public JsonNode getSavedOffset() { + return savedOffset; + } + + @Override + public Optional getSavedSchemaHistory() { + return Optional.ofNullable(savedSchemaHistory); + } + +} diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java new file mode 100644 index 000000000000..cc51e2e06038 --- /dev/null +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcStateHandler.java @@ -0,0 +1,68 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of 
the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mssql; + +import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET; +import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.debezium.CdcStateHandler; +import io.airbyte.integrations.source.relationaldb.StateManager; +import io.airbyte.integrations.source.relationaldb.models.CdcState; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MssqlCdcStateHandler implements CdcStateHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcStateHandler.class); + private final StateManager stateManager; + + public MssqlCdcStateHandler(StateManager stateManager) { + this.stateManager = stateManager; + } + + @Override + public AirbyteMessage saveState(Map offset, String dbHistory) { + Map state = new HashMap<>(); + state.put(MSSQL_CDC_OFFSET, offset); + 
state.put(MSSQL_DB_HISTORY, dbHistory); + + final JsonNode asJson = Jsons.jsonNode(state); + + LOGGER.info("debezium state: {}", asJson); + + final CdcState cdcState = new CdcState().withState(asJson); + stateManager.getCdcStateManager().setCdcState(cdcState); + final AirbyteStateMessage stateMessage = stateManager.emit(); + return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + } + +} diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java new file mode 100644 index 000000000000..35cf0d8198ec --- /dev/null +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -0,0 +1,106 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mssql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.debezium.CdcTargetPosition; +import io.airbyte.integrations.debezium.internals.SnapshotMetadata; +import io.debezium.connector.sqlserver.Lsn; +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MssqlCdcTargetPosition implements CdcTargetPosition { + + private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class); + public final Lsn targetLsn; + + public MssqlCdcTargetPosition(Lsn targetLsn) { + this.targetLsn = targetLsn; + } + + @Override + public boolean reachedTargetPosition(JsonNode valueAsJson) { + Lsn recordLsn = extractLsn(valueAsJson); + + if (targetLsn.compareTo(recordLsn) > 0) { + return false; + } else { + SnapshotMetadata snapshotMetadata = SnapshotMetadata.valueOf(valueAsJson.get("source").get("snapshot").asText().toUpperCase()); + // if not snapshot or is snapshot but last record in snapshot. 
+ return SnapshotMetadata.TRUE != snapshotMetadata; + } + } + + // a missing "source" or "commit_lsn" field maps to an empty Optional, so it surfaces as IllegalStateException rather than an NPE + private Lsn extractLsn(JsonNode valueAsJson) { + return Optional.ofNullable(valueAsJson.get("source")).map(source -> source.get("commit_lsn")) + .map(lsn -> Lsn.valueOf(lsn.asText())) + .orElseThrow(() -> new IllegalStateException("Could not find LSN")); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MssqlCdcTargetPosition that = (MssqlCdcTargetPosition) o; + return targetLsn.equals(that.targetLsn); + } + + @Override + public int hashCode() { + return targetLsn.hashCode(); + } + + public static MssqlCdcTargetPosition getTargetPosition(JdbcDatabase database, String dbName) { + try { + final List jsonNodes = database + .bufferedResultSetQuery(conn -> conn.createStatement().executeQuery( + "USE " + dbName + "; SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;"), JdbcUtils::rowToJson); + Preconditions.checkState(jsonNodes.size() == 1); + if (jsonNodes.get(0).get("max_lsn") != null) { + Lsn maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue()); + LOGGER.info("identified target lsn: " + maxLsn); + return new MssqlCdcTargetPosition(maxLsn); + } else { + throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running.
" + + "Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service?view=sql-server-ver15)"); + } + } catch (SQLException | IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index ec48ca2e5159..9d113b563207 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -24,15 +24,39 @@ package io.airbyte.integrations.source.mssql; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; +import static java.util.stream.Collectors.toList; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.debezium.AirbyteDebeziumHandler; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.relationaldb.StateManager; +import io.airbyte.integrations.source.relationaldb.TableInfo; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; +import 
io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.SyncMode; import java.io.File; +import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import org.slf4j.Logger; @@ -43,6 +67,9 @@ public class MssqlSource extends AbstractJdbcSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(MssqlSource.class); static final String DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; + public static final String MSSQL_CDC_OFFSET = "mssql_cdc_offset"; + public static final String MSSQL_DB_HISTORY = "mssql_db_history"; + public static final String CDC_LSN = "_ab_cdc_lsn"; public MssqlSource() { super(DRIVER_CLASS, new MssqlJdbcStreamingQueryConfiguration()); @@ -82,7 +109,185 @@ public Set getExcludedInternalNameSpaces() { "spt_values", "spt_fallback_usg", "MSreplication_options", - "spt_fallback_dev"); + "spt_fallback_dev", + "cdc"); // is this actually ok? what if the user wants cdc schema for some reason? 
+ } + + @Override + public AirbyteCatalog discover(JsonNode config) throws Exception { + AirbyteCatalog catalog = super.discover(config); + + if (isCdc(config)) { + final List streams = catalog.getStreams().stream() + .map(MssqlSource::removeIncrementalWithoutPk) + .map(MssqlSource::setIncrementalToSourceDefined) + .map(MssqlSource::addCdcMetadataColumns) + .collect(toList()); + + catalog.setStreams(streams); + } + + return catalog; + } + + @Override + public List> getCheckOperations(JsonNode config) throws Exception { + final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); + + if (isCdc(config)) { + checkOperations.add(database -> assertCdcEnabledInDb(config, database)); + checkOperations.add(database -> assertCdcSchemaQueryable(config, database)); + checkOperations.add(database -> assertSqlServerAgentRunning(database)); + checkOperations.add(database -> assertSnapshotIsolationAllowed(config, database)); + } + + return checkOperations; + } + + protected void assertCdcEnabledInDb(JsonNode config, JdbcDatabase database) throws SQLException { + List queryResponse = database.query(connection -> { + final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?"; + PreparedStatement ps = connection.prepareStatement(sql); + ps.setString(1, config.get("database").asText()); + LOGGER.info(String.format("Checking that cdc is enabled on database '%s' using the query: '%s'", + config.get("database").asText(), sql)); + return ps; + }, JdbcUtils::rowToJson).collect(toList()); + if (queryResponse.size() < 1) { + throw new RuntimeException(String.format( + "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).", + config.get("database").asText())); + } + if (!(queryResponse.get(0).get("is_cdc_enabled").asBoolean())) { + throw new RuntimeException(String.format( + "Detected that CDC is not enabled for database '%s'. 
Please check the documentation on how to enable CDC on MS SQL Server.",
+          config.get("database").asText()));
+    }
+  }
+
+  /**
+   * Checks that the configured user can read the cdc schema and that at least one table is
+   * CDC-enabled, by selecting from cdc.change_tables in the configured database.
+   *
+   * @param config connector config; "database" and "username" are read from it
+   * @param database JDBC handle used to run the query
+   * @throws RuntimeException if the query returns no rows
+   * @throws SQLException if the query itself fails
+   */
+  protected void assertCdcSchemaQueryable(JsonNode config, JdbcDatabase database) throws SQLException {
+    // NOTE(review): the database name is concatenated unquoted into the USE statement, so names
+    // that need [bracket] quoting will presumably fail here - confirm against supported names.
+    List<JsonNode> queryResponse = database.query(connection -> {
+      final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables";
+      PreparedStatement ps = connection.prepareStatement(sql);
+      LOGGER.info(String.format("Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'",
+          config.get("username").asText(), sql));
+      return ps;
+    }, JdbcUtils::rowToJson).collect(toList());
+    // Ensure at least one available CDC table
+    if (queryResponse.size() < 1) {
+      throw new RuntimeException("No cdc-enabled tables found. Please check the documentation on how to enable CDC on MS SQL Server.");
+    }
+  }
+
+  /**
+   * Checks that the SQL Server Agent service reports a "Running" status in
+   * sys.dm_server_services.
+   *
+   * The check is best-effort: when the failure is caused by a SQLServerException the check is
+   * skipped with a warning instead of failing.
+   *
+   * @param database JDBC handle used to run the query
+   * @throws RuntimeException if no agent row is returned or the agent is not running
+   * @throws SQLException if the query fails for a reason that is not skipped
+   */
+  // todo: ensure this works for Azure managed SQL (since it uses different sql server agent)
+  protected void assertSqlServerAgentRunning(JdbcDatabase database) throws SQLException {
+    try {
+      List<JsonNode> queryResponse = database.query(connection -> {
+        final String sql = "SELECT status_desc FROM sys.dm_server_services WHERE [servicename] LIKE 'SQL Server Agent%'";
+        PreparedStatement ps = connection.prepareStatement(sql);
+        LOGGER.info(String.format("Checking that the SQL Server Agent is running using the query: '%s'", sql));
+        return ps;
+      }, JdbcUtils::rowToJson).collect(toList());
+      // Without this guard an empty result surfaced as a bare IndexOutOfBoundsException.
+      if (queryResponse.isEmpty()) {
+        throw new RuntimeException("Could not determine the state of the SQL Server Agent: no matching service was found in "
+            + "sys.dm_server_services. Please check the documentation on ensuring SQL Server Agent is running.");
+      }
+      // asText() gives the raw column value; toString() would wrap it in JSON quotes in the message.
+      final String agentStatus = queryResponse.get(0).get("status_desc").asText();
+      if (!agentStatus.contains("Running")) {
+        throw new RuntimeException(String.format(
+            "The SQL Server Agent is not running. Current state: '%s'. Please check the documentation on ensuring SQL Server Agent is running.",
+            agentStatus));
+      }
+    } catch (Exception e) {
+      // Only failures caused directly by the driver are tolerated; everything else (including the
+      // RuntimeExceptions thrown above, which have no cause) is rethrown.
+      if (e.getCause() != null && e.getCause().getClass().equals(com.microsoft.sqlserver.jdbc.SQLServerException.class)) {
+        LOGGER.warn(String.format("Skipping check for whether the SQL Server Agent is running, SQLServerException thrown: '%s'",
+            e.getMessage()));
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Checks that snapshot isolation is allowed on the configured database by reading
+   * snapshot_isolation_state from sys.databases.
+   *
+   * @param config connector config; "database" is read from it
+   * @param database JDBC handle used to run the query
+   * @throws RuntimeException if the database is not listed or its snapshot_isolation_state is not 1
+   * @throws SQLException if the query itself fails
+   */
+  protected void assertSnapshotIsolationAllowed(JsonNode config, JdbcDatabase database) throws SQLException {
+    List<JsonNode> queryResponse = database.query(connection -> {
+      final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?";
+      PreparedStatement ps = connection.prepareStatement(sql);
+      ps.setString(1, config.get("database").asText());
+      LOGGER.info(String.format("Checking that snapshot isolation is enabled on database '%s' using the query: '%s'",
+          config.get("database").asText(), sql));
+      return ps;
+    }, JdbcUtils::rowToJson).collect(toList());
+    if (queryResponse.size() < 1) {
+      throw new RuntimeException(String.format(
+          "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
+          config.get("database").asText()));
+    }
+    if (queryResponse.get(0).get("snapshot_isolation_state").asInt() != 1) {
+      throw new RuntimeException(String.format(
+          "Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. "
+              + "Please check the documentation on how to enable snapshot isolation on MS SQL Server.",
+          config.get("database").asText()));
+    }
+  }
+
+  /**
+   * Returns Debezium-backed iterators when the config selects CDC and at least one configured
+   * stream is incremental; otherwise delegates to the superclass implementation.
+   */
+  // NOTE(review): the generic type arguments of the return type and of tableNameToTable are
+  // missing in this copy of the patch; restore them from the overridden signature.
+  @Override
+  public List> getIncrementalIterators(JdbcDatabase database,
+      ConfiguredAirbyteCatalog catalog,
+      Map>> tableNameToTable,
+      StateManager stateManager,
+      Instant emittedAt) {
+    JsonNode sourceConfig = database.getSourceConfig();
+    if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
+      LOGGER.info("using CDC: {}", true);
+      AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig,
+          MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get("database").asText()),
+          MssqlCdcProperties.getDebeziumProperties(), catalog, true);
+      return handler.getIncrementalIterators(new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
+          new MssqlCdcStateHandler(stateManager), new MssqlCdcConnectorMetadataInjector(), emittedAt);
+    } else {
+      LOGGER.info("using CDC: {}", false);
+      return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, emittedAt);
+    }
+  }
+
+  /**
+   * Returns true when the config has a non-null "replication_method" equal to CDC.
+   * ReplicationMethod.valueOf throws IllegalArgumentException for any value that is not the
+   * exact name of an enum constant.
+   */
+  private static boolean isCdc(JsonNode config) {
+    return config.hasNonNull("replication_method")
+        && ReplicationMethod.valueOf(config.get("replication_method").asText())
+            .equals(ReplicationMethod.CDC);
+  }
+
+  /**
+   * Returns true when at least one configured stream uses the INCREMENTAL sync mode.
+   */
+  private static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {
+    return catalog.getStreams().stream()
+        .map(ConfiguredAirbyteStream::getSyncMode)
+        .anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
+  }
+
+  // Note: in place mutation.
+  // Drops INCREMENTAL from the supported sync modes of a stream that has no source-defined primary key.
+  private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) {
+    if (stream.getSourceDefinedPrimaryKey().isEmpty()) {
+      stream.getSupportedSyncModes().remove(SyncMode.INCREMENTAL);
+    }
+
+    return stream;
+  }
+
+  // Note: in place mutation.
+  private static AirbyteStream setIncrementalToSourceDefined(AirbyteStream stream) {
+    // Streams that still support INCREMENTAL are marked as having a source-defined cursor.
+    if (stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL)) {
+      stream.setSourceDefinedCursor(true);
+    }
+
+    return stream;
+  }
+
+  // Note: in place mutation.
+  private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) {
+
+    // Adds the three CDC metadata properties to the stream's JSON schema "properties" object.
+    ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema();
+    ObjectNode properties = (ObjectNode) jsonSchema.get("properties");
+
+    final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number"));
+    final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string"));
+    // The LSN is declared as a string; the two timestamps are declared as numbers.
+    properties.set(CDC_LSN, stringType);
+    properties.set(CDC_UPDATED_AT, numberType);
+    properties.set(CDC_DELETED_AT, numberType);
+
+    return stream;
   }
 
   private void readSsl(JsonNode sslMethod, List additionalParameters) {
@@ -124,4 +329,9 @@ public static void main(String[] args) throws Exception {
     LOGGER.info("completed source: {}", MssqlSource.class);
   }
 
+  // Values accepted for the "replication_method" config field (mirrors the enum in spec.json).
+  public enum ReplicationMethod {
+    STANDARD,
+    CDC
+  }
+
 }
diff --git a/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json
index fd584a1dd6b7..05c09c53abf7 100644
--- a/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json
@@ -84,6 +84,13 @@
           }
         }
       ]
+    },
+    "replication_method": {
+      "type": "string",
+      "title": "Replication Method",
+      "description": "Replication method to use for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses SQL Server's change data capture feature to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
+      "default": "STANDARD",
+      "enum": ["STANDARD", "CDC"]
     }
   }
 }
diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java
new file mode 100644
index 000000000000..95ee5274b18b
--- /dev/null
+++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java
@@ -0,0 +1,221 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.source.mssql;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.resources.MoreResources;
+import io.airbyte.db.Database;
+import io.airbyte.db.Databases;
+import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
+import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
+import io.airbyte.protocol.models.CatalogHelpers;
+import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import io.airbyte.protocol.models.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.ConnectorSpecification;
+import io.airbyte.protocol.models.DestinationSyncMode;
+import io.airbyte.protocol.models.Field;
+import io.airbyte.protocol.models.JsonSchemaPrimitive;
+import io.airbyte.protocol.models.SyncMode;
+import java.util.Collections;
+import java.util.List;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+/**
+ * Source acceptance test for the MSSQL connector with replication_method set to CDC.
+ *
+ * Starts a SQL Server 2019 container with the agent enabled, creates a CDC-enabled database with
+ * two small tables, and connects as a dedicated test login that is granted only db_datareader,
+ * SELECT on the cdc schema and membership of the CDC gating role.
+ */
+public class CdcMssqlSourceAcceptanceTest extends SourceAcceptanceTest {
+
+  private static final String DB_NAME = "acceptance";
+  private static final String SCHEMA_NAME = "dbo";
+  private static final String STREAM_NAME = "id_and_name";
+  private static final String STREAM_NAME2 = "starships";
+  private static final String TEST_USER_NAME = "tester";
+  private static final String TEST_USER_PASSWORD = "testerjester[1]";
+  private static final String CDC_ROLE_NAME = "cdc_selector";
+  private MSSQLServerContainer container;
+  private JsonNode config;
+  private Database database;
+
+  @Override
+  protected String getImageName() {
+    return "airbyte/source-mssql:dev";
+  }
+
+  @Override
+  protected ConnectorSpecification getSpec() throws Exception {
+    return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
+  }
+
+  @Override
+  protected JsonNode getConfig() {
+    return config;
+  }
+
+  /**
+   * Both test tables are configured as incremental/append streams with a source-defined cursor
+   * and "id" as the source-defined primary key.
+   */
+  @Override
+  protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
+    return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
+        new ConfiguredAirbyteStream()
+            .withSyncMode(SyncMode.INCREMENTAL)
+            .withDestinationSyncMode(DestinationSyncMode.APPEND)
+            .withStream(CatalogHelpers.createAirbyteStream(
+                STREAM_NAME,
+                SCHEMA_NAME,
+                Field.of("id", JsonSchemaPrimitive.NUMBER),
+                Field.of("name", JsonSchemaPrimitive.STRING))
+                .withSourceDefinedCursor(true)
+                .withSourceDefinedPrimaryKey(List.of(List.of("id")))
+                .withSupportedSyncModes(
+                    Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
+        new ConfiguredAirbyteStream()
+            .withSyncMode(SyncMode.INCREMENTAL)
+            .withDestinationSyncMode(DestinationSyncMode.APPEND)
+            .withStream(CatalogHelpers.createAirbyteStream(
+                STREAM_NAME2,
+                SCHEMA_NAME,
+                Field.of("id", JsonSchemaPrimitive.NUMBER),
+                Field.of("name", JsonSchemaPrimitive.STRING))
+                .withSourceDefinedCursor(true)
+                .withSourceDefinedPrimaryKey(List.of(List.of("id")))
+                .withSupportedSyncModes(
+                    Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
+  }
+
+  @Override
+  protected JsonNode getState() {
+    return null;
+  }
+
+  @Override
+  protected List getRegexTests() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Starts the container, opens an admin connection, builds the connector config for the
+   * restricted test user, and prepares the CDC-enabled database, user, tables and grants.
+   */
+  @Override
+  protected void setupEnvironment(TestDestinationEnv environment) throws InterruptedException {
+    container = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense();
+    container.addEnv("MSSQL_AGENT_ENABLED", "True"); // need this running for cdc to work
+    container.start();
+    database = Databases.createDatabase(
+        container.getUsername(),
+        container.getPassword(),
+        String.format("jdbc:sqlserver://%s:%s",
+            container.getHost(),
+            container.getFirstMappedPort()),
+        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
+        null);
+
+    config = Jsons.jsonNode(ImmutableMap.builder()
+        .put("host", container.getHost())
+        .put("port", container.getFirstMappedPort())
+        .put("database", DB_NAME)
+        .put("username", TEST_USER_NAME)
+        .put("password", TEST_USER_PASSWORD)
+        .put("replication_method", "CDC")
+        .build());
+
+    executeQuery("CREATE DATABASE " + DB_NAME + ";");
+    executeQuery("ALTER DATABASE " + DB_NAME + "\n\tSET ALLOW_SNAPSHOT_ISOLATION ON");
+    executeQuery("USE " + DB_NAME + "\n" + "EXEC sys.sp_cdc_enable_db");
+
+    setupTestUser();
+    revokeAllPermissions();
+    createAndPopulateTables();
+    grantCorrectPermissions();
+  }
+
+  // NOTE(review): the USE statement runs as its own query; whether the following statements see
+  // that database context depends on how Database.query hands out connections - confirm.
+  private void setupTestUser() {
+    executeQuery("USE " + DB_NAME);
+    executeQuery("CREATE LOGIN " + TEST_USER_NAME + " WITH PASSWORD = '" + TEST_USER_PASSWORD + "';");
+    executeQuery("CREATE USER " + TEST_USER_NAME + " FOR LOGIN " + TEST_USER_NAME + ";");
+  }
+
+  private void revokeAllPermissions() {
+    executeQuery("REVOKE ALL FROM " + TEST_USER_NAME + " CASCADE;");
+    executeQuery("EXEC sp_msforeachtable \"REVOKE ALL ON '?' TO " + TEST_USER_NAME + ";\"");
+  }
+
+  /**
+   * Creates and fills both test tables, then enables CDC on each of them.
+   *
+   * @throws InterruptedException if interrupted while waiting between CDC-enable retries
+   */
+  private void createAndPopulateTables() throws InterruptedException {
+    executeQuery(String.format("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));",
+        SCHEMA_NAME, STREAM_NAME));
+    executeQuery(String.format("INSERT INTO %s.%s (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');",
+        SCHEMA_NAME, STREAM_NAME));
+    executeQuery(String.format("CREATE TABLE %s.%s(id INTEGER PRIMARY KEY, name VARCHAR(200));",
+        SCHEMA_NAME, STREAM_NAME2));
+    executeQuery(String.format("INSERT INTO %s.%s (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');",
+        SCHEMA_NAME, STREAM_NAME2));
+
+    // enabling CDC on each table
+    String[] tables = {STREAM_NAME, STREAM_NAME2};
+    for (String table : tables) {
+      enableCdcOnTableWithRetry(table);
+    }
+  }
+
+  /**
+   * Enables CDC on one table, retrying while the SQL Server Agent is still spinning up.
+   *
+   * Retries are per table: once a table has been enabled it is not attempted again, so a failure
+   * on a later table no longer re-runs sp_cdc_enable_table against an already-enabled one.
+   *
+   * @param table name of the table in SCHEMA_NAME
+   * @throws InterruptedException if interrupted while sleeping between attempts
+   */
+  private void enableCdcOnTableWithRetry(String table) throws InterruptedException {
+    // sometimes seeing an error that we can't enable cdc on a table while sql server agent is still
+    // spinning up
+    // solving with a simple retry loop
+    final int maxRetries = 10;
+    for (int retryNum = 0;; retryNum++) {
+      try {
+        executeQuery(String.format(
+            "EXEC sys.sp_cdc_enable_table\n" +
+                "\t@source_schema = N'%s',\n" +
+                "\t@source_name = N'%s', \n" +
+                "\t@role_name = N'%s',\n" +
+                "\t@supports_net_changes = 0",
+            SCHEMA_NAME, table, CDC_ROLE_NAME));
+        return;
+      } catch (RuntimeException e) {
+        if (retryNum >= maxRetries) {
+          throw e;
+        }
+        Thread.sleep(10000); // 10 seconds
+      }
+    }
+  }
+
+  private void grantCorrectPermissions() {
+    executeQuery(String.format("EXEC sp_addrolemember N'%s', N'%s';", "db_datareader", TEST_USER_NAME));
+    executeQuery(String.format("USE %s;\n" + "GRANT SELECT ON SCHEMA :: [%s] TO %s", DB_NAME, "cdc", TEST_USER_NAME));
+    executeQuery(String.format("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, TEST_USER_NAME));
+  }
+
+  // Runs one statement through the admin connection; any failure is rethrown unchecked.
+  private void executeQuery(String query) {
+    try {
+      database.query(
+          ctx -> ctx
+              .execute(query));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Releases the admin database handle and then the container. The container is closed even if
+   * closing the database handle fails.
+   */
+  @Override
+  protected void tearDown(TestDestinationEnv testEnv) {
+    try {
+      if (database != null) {
+        database.close();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      container.close();
+    }
+  }
+
+}
diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceComprehensiveTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceComprehensiveTest.java
new file mode 100644
index 000000000000..e98972505280
--- /dev/null
+++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceComprehensiveTest.java
@@ -0,0 +1,549 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Airbyte
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the
Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.airbyte.integrations.source.mssql;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.db.Database;
+import io.airbyte.db.Databases;
+import io.airbyte.integrations.standardtest.source.SourceComprehensiveTest;
+import io.airbyte.integrations.standardtest.source.TestDataHolder;
+import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
+import io.airbyte.protocol.models.JsonSchemaPrimitive;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+/**
+ * Data-type coverage test for the MSSQL source with replication_method set to CDC.
+ *
+ * Runs against a SQL Server 2019 container with the agent enabled. Each entry registered in
+ * initTests() declares a source column type, the values to insert and, where known, the values
+ * expected back. After the superclass setup, CDC is enabled on every user table.
+ */
+public class CdcMssqlSourceComprehensiveTest extends SourceComprehensiveTest {
+
+  private MSSQLServerContainer container;
+  private JsonNode config;
+  private static final String DB_NAME = "comprehensive";
+  private static final String SCHEMA_NAME = "dbo";
+
+  // Format arguments %1$s..%4$s are filled in by the test framework.
+  // NOTE(review): presumably table name, id column, test column and column type - confirm
+  // against SourceComprehensiveTest.
+  private static final String CREATE_TABLE_SQL = "USE " + DB_NAME + "\nCREATE TABLE %1$s(%2$s INTEGER PRIMARY KEY, %3$s %4$s)";
+
+  @Override
+  protected JsonNode getConfig() {
+    return config;
+  }
+
+  @Override
+  protected void tearDown(TestDestinationEnv testEnv) {
+    container.close();
+  }
+
+  @Override
+  protected String getImageName() {
+    return "airbyte/source-mssql:dev";
+  }
+
+  /**
+   * Starts the container, builds the connector config (using the container's own credentials),
+   * creates the test database with snapshot isolation allowed and CDC enabled, and returns a
+   * Database handle connected to the server.
+   */
+  @Override
+  protected Database setupDatabase() throws Exception {
+    container = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense();
+    container.addEnv("MSSQL_AGENT_ENABLED", "True"); // need this running for cdc to work
+    container.start();
+
+    final Database database = Databases.createDatabase(
+        container.getUsername(),
+        container.getPassword(),
+        String.format("jdbc:sqlserver://%s:%s",
+            container.getHost(),
+            container.getFirstMappedPort()),
+        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
+        null);
+
+    config = Jsons.jsonNode(ImmutableMap.builder()
+        .put("host", container.getHost())
+        .put("port", container.getFirstMappedPort())
+        .put("database", DB_NAME)
+        .put("username", container.getUsername())
+        .put("password", container.getPassword())
+        .put("replication_method", "CDC")
+        .build());
+
+    executeQuery("CREATE DATABASE " + DB_NAME + ";");
+    executeQuery("ALTER DATABASE " + DB_NAME + "\n\tSET ALLOW_SNAPSHOT_ISOLATION ON");
+    executeQuery("USE " + DB_NAME + "\n" + "EXEC sys.sp_cdc_enable_db");
+
+    return database;
+  }
+
+  @Override
+  protected String getNameSpace() {
+    return SCHEMA_NAME;
+  }
+
+  // Opens a fresh Database handle for a single statement and closes it afterwards; any failure
+  // is rethrown unchecked.
+  private void executeQuery(String query) {
+    try (Database database = Databases.createDatabase(
+        container.getUsername(),
+        container.getPassword(),
+        String.format("jdbc:sqlserver://%s:%s",
+            container.getHost(),
+            container.getFirstMappedPort()),
+        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
+        null)) {
+      database.query(
+          ctx -> ctx
+              .execute(query));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // CDC is enabled only after the superclass setup has run.
+  // NOTE(review): presumably because that is where the test tables get created - confirm.
+  @Override
+  protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
+    super.setupEnvironment(environment);
+    enableCdcOnAllTables();
+  }
+
+  /**
+   * Registers one TestDataHolder per SQL Server column type under test. Entries whose expected
+   * values are commented out document known gaps (see the TODO notes next to them).
+   */
+  @Override
+  protected void initTests() {
+    // in SQL Server there is no boolean, BIT is the sole boolean-like column
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("bit")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "0", "1", "'true'", "'false'")
+            .addExpectedValues(null, "false", "true", "true", "false")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("tinyint")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "0", "255")
+            .addExpectedValues(null, "0", "255")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("smallint")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "-32768", "32767")
+            .addExpectedValues(null, "-32768", "32767")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("int")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "-2147483648", "2147483647")
+            .addExpectedValues(null, "-2147483648", "2147483647")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("bigint")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "-9223372036854775808", "9223372036854775807")
+            .addExpectedValues(null, "-9223372036854775808", "9223372036854775807")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("real")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "power(1e1, 38)*-3.4", "power(1e1, -38)*-1.18", "power(1e1, -38)*1.18", "power(1e1, 38)*3.4")
+            .addExpectedValues(null, String.valueOf(Math.pow(10, 38) * -3.4), String.valueOf(Math.pow(10, -38) * -1.18),
+                String.valueOf(Math.pow(10, -38) * 1.18), String.valueOf(Math.pow(10, 38) * 3.4))
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("float")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .fullSourceDataType("float(24)")
+            .addInsertValues("null", "power(1e1, 38)*-3.4", "power(1e1, -38)*-1.18", "power(1e1, -38)*1.18", "power(1e1, 38)*3.4")
+            .addExpectedValues(null, String.valueOf(Math.pow(10, 38) * -3.4), String.valueOf(Math.pow(10, -38) * -1.18),
+                String.valueOf(Math.pow(10, -38) * 1.18), String.valueOf(Math.pow(10, 38) * 3.4))
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("float")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .fullSourceDataType("float(53)")
+            .addInsertValues("null", "power(1e1, 308)*-1.79", "power(1e1, -308)*-2.23",
+                "power(1e1, -308)*2.23", "power(1e1, 308)*1.79")
+            .addExpectedValues(null, String.valueOf(Math.pow(10, 308) * -1.79), String.valueOf(Math.pow(10, -308) * -2.23),
+                String.valueOf(Math.pow(10, -308) * 2.23), String.valueOf(Math.pow(10, 308) * 1.79))
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("decimal")
+            .fullSourceDataType("DECIMAL(5,2)")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("999", "5.1", "0", "null")
+            .addExpectedValues("999.00", "5.10", "0.00", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("numeric")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("'99999'", "null")
+            .addExpectedValues("99999", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("money")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "'9990000.99'")
+            .addExpectedValues(null, "9990000.9900")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("smallmoney")
+            .airbyteType(JsonSchemaPrimitive.NUMBER)
+            .addInsertValues("null", "'-214748.3648'", "214748.3647")
+            .addExpectedValues(null, "-214748.3648", "214748.3647")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("char")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'a'", "'*'", "null")
+            .addExpectedValues("a", "*", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("char")
+            .fullSourceDataType("char(8)")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'{asb123}'", "'{asb12}'")
+            .addExpectedValues("{asb123}", "{asb12} ")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("varchar")
+            .fullSourceDataType("varchar(16)")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'a'", "'abc'", "'{asb123}'", "' '", "''", "null")
+            .addExpectedValues("a", "abc", "{asb123}", " ", "", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("varchar")
+            .fullSourceDataType("varchar(max) COLLATE Latin1_General_100_CI_AI_SC_UTF8")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'a'", "'abc'", "N'Миші йдуть на південь, не питай чому;'", "N'櫻花分店'",
+                "''", "null", "N'\\xF0\\x9F\\x9A\\x80'")
+            .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "",
+                null, "\\xF0\\x9F\\x9A\\x80")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("text")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'a'", "'abc'", "'Some test text 123$%^&*()_'", "''", "null")
+            .addExpectedValues("a", "abc", "Some test text 123$%^&*()_", "", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("nchar")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'a'", "'*'", "N'д'", "null")
+            .addExpectedValues("a", "*", "д", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("nvarchar")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .fullSourceDataType("nvarchar(max)")
+            .addInsertValues("'a'", "'abc'", "N'Миші ççуть на південь, не питай чому;'", "N'櫻花分店'",
+                "''", "null", "N'\\xF0\\x9F\\x9A\\x80'")
+            .addExpectedValues("a", "abc", "Миші ççуть на південь, не питай чому;", "櫻花分店", "",
+                null, "\\xF0\\x9F\\x9A\\x80")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("nvarchar")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .fullSourceDataType("nvarchar(24)")
+            .addInsertValues("'a'", "'abc'", "N'Миші йдуть;'", "N'櫻花分店'", "''", "null")
+            .addExpectedValues("a", "abc", "Миші йдуть;", "櫻花分店", "", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("ntext")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'a'", "'abc'", "N'Миші йдуть на південь, не питай чому;'", "N'櫻花分店'",
+                "''", "null", "N'\\xF0\\x9F\\x9A\\x80'")
+            .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "",
+                null, "\\xF0\\x9F\\x9A\\x80")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    // NOTE(review): the XML literal below looks as if its markup was stripped in this copy of
+    // the patch - restore it from the upstream file before relying on this entry.
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("xml")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues(
+                "CONVERT(XML, N'Manual...')",
+                "null", "''")
+            .addExpectedValues("Manual...", null, "")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("date")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'0001-01-01'", "'9999-12-31'", "'1999-01-08'",
+                "null")
+            // TODO: Debezium is returning DATE/DATETIME from mssql as integers (days or milli/micro/nanoseconds
+            // since the epoch)
+            // still useable but requires transformation if true date/datetime type required in destination
+            // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-data-types
+            // .addExpectedValues("0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z",
+            // "1999-01-08T00:00:00Z", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("smalldatetime")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'1900-01-01'", "'2079-06-06'", "null")
+            // TODO: Debezium is returning DATE/DATETIME from mssql as integers (days or milli/micro/nanoseconds
+            // since the epoch)
+            // still useable but requires transformation if true date/datetime type required in destination
+            // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-data-types
+            // .addExpectedValues("1900-01-01T00:00:00Z", "2079-06-06T00:00:00Z", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("datetime")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'1753-01-01'", "'9999-12-31'", "null")
+            // TODO: Debezium is returning DATE/DATETIME from mssql as integers (days or milli/micro/nanoseconds
+            // since the epoch)
+            // still useable but requires transformation if true date/datetime type required in destination
+            // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-data-types
+            // .addExpectedValues("1753-01-01T00:00:00Z", "9999-12-31T00:00:00Z", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("datetime2")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'0001-01-01'", "'9999-12-31'", "null")
+            // TODO: Debezium is returning DATE/DATETIME from mssql as integers (days or milli/micro/nanoseconds
+            // since the epoch)
+            // still useable but requires transformation if true date/datetime type required in destination
+            // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-data-types
+            // .addExpectedValues("0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("time")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("null")
+            // TODO: Debezium is returning DATE/DATETIME from mssql as integers (days or milli/micro/nanoseconds
+            // since the epoch)
+            // still useable but requires transformation if true date/datetime type required in destination
+            // https://debezium.io/documentation/reference/1.4/connectors/sqlserver.html#sqlserver-data-types
+            .addNullExpectedValue()
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("datetimeoffset")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'0001-01-10 00:00:00 +01:00'", "'9999-01-10 00:00:00 +01:00'", "null")
+            // TODO: BUG - seem to be getting back 0001-01-08T00:00:00+01:00 ... this is clearly wrong
+            // .addExpectedValues("0001-01-10 00:00:00.0000000 +01:00",
+            // "9999-01-10 00:00:00.0000000 +01:00", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    // TODO BUG Returns binary value instead of actual value
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("binary")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            // .addInsertValues("CAST( 'A' AS VARBINARY)", "null")
+            // .addExpectedValues("A")
+            .addInsertValues("null")
+            .addNullExpectedValue()
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    // TODO BUG Returns binary value instead of actual value
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("varbinary")
+            .fullSourceDataType("varbinary(30)")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            // .addInsertValues("CAST( 'ABC' AS VARBINARY)", "null")
+            // .addExpectedValues("A")
+            .addInsertValues("null")
+            .addNullExpectedValue()
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    // TODO BUG: airbyte returns binary representation instead of readable one
+    // create table dbo_1_hierarchyid1 (test_column hierarchyid);
+    // insert dbo_1_hierarchyid1 values ('/1/1/');
+    // select test_column ,test_column.ToString() AS [Node Text],test_column.GetLevel() [Node Level]
+    // from dbo_1_hierarchyid1;
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("hierarchyid")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("null")
+            .addNullExpectedValue()
+            // .addInsertValues("null","'/1/1/'")
+            // .addExpectedValues(null, "/1/1/")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("sql_variant")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'a'", "'abc'", "N'Миші йдуть на південь, не питай чому;'", "N'櫻花分店'",
+                "''", "null", "N'\\xF0\\x9F\\x9A\\x80'")
+            // TODO: BUG - These all come through as nulls, Debezium doesn't mention sql_variant at all so
+            // assume unsupported
+            // .addExpectedValues("a", "abc", "Миші йдуть на південь, не питай чому;", "櫻花分店", "",
+            // null, "\\xF0\\x9F\\x9A\\x80")
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    // TODO BUG: Airbyte returns binary representation instead of text one.
+    // Proper select query example: SELECT test_column.STAsText() from dbo_1_geometry;
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("geometry")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("null")
+            .addNullExpectedValue()
+            // .addInsertValues("geometry::STGeomFromText('LINESTRING (100 100, 20 180, 180 180)', 0)")
+            // .addExpectedValues("LINESTRING (100 100, 20 180, 180 180)",
+            // "POLYGON ((0 0, 150 0, 150 150, 0 150, 0 0)", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("uniqueidentifier")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("'375CFC44-CAE3-4E43-8083-821D2DF0E626'", "null")
+            .addExpectedValues("375CFC44-CAE3-4E43-8083-821D2DF0E626", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+    // TODO BUG: Airbyte returns binary representation instead of text one.
+    // Proper select query example: SELECT test_column.STAsText() from dbo_1_geography;
+    addDataTypeTestData(
+        TestDataHolder.builder()
+            .sourceType("geography")
+            .airbyteType(JsonSchemaPrimitive.STRING)
+            .addInsertValues("null")
+            .addNullExpectedValue()
+            // .addInsertValues("geography::STGeomFromText('LINESTRING(-122.360 47.656, -122.343 47.656 )',
+            // 4326)")
+            // .addExpectedValues("LINESTRING(-122.360 47.656, -122.343 47.656 )", null)
+            .createTablePatternSql(CREATE_TABLE_SQL)
+            .build());
+
+  }
+
+  /**
+   * Runs a T-SQL batch in the test database that walks every non-system user table with a cursor
+   * and calls sys.sp_cdc_enable_table (with a null role name) for each table that has no row in
+   * cdc.change_tables yet.
+   */
+  private void enableCdcOnAllTables() {
+    executeQuery("USE " + DB_NAME + "\n" + "DECLARE @TableName VARCHAR(100)\n" + "DECLARE @TableSchema VARCHAR(100)\n" + "DECLARE CDC_Cursor CURSOR FOR\n" + " SELECT * FROM ( \n" + " SELECT Name,SCHEMA_NAME(schema_id) AS TableSchema\n" + " FROM sys.objects\n" + " WHERE type = 'u'\n" + " AND is_ms_shipped <> 1\n" + " ) CDC\n" + "OPEN CDC_Cursor\n" + "FETCH NEXT FROM CDC_Cursor INTO @TableName,@TableSchema\n" + "WHILE @@FETCH_STATUS = 0\n" + " BEGIN\n" + " DECLARE @SQL NVARCHAR(1000)\n" + " DECLARE @CDC_Status TINYINT\n" + " SET @CDC_Status=(SELECT COUNT(*)\n" + " FROM cdc.change_tables\n" + " WHERE Source_object_id = OBJECT_ID(@TableSchema+'.'+@TableName))\n" + " --IF CDC is not enabled on Table, Enable CDC\n" + " IF @CDC_Status <> 1\n" + " BEGIN\n" + " SET @SQL='EXEC sys.sp_cdc_enable_table\n" + " @source_schema = '''+@TableSchema+''',\n" + " @source_name = ''' + @TableName\n" + " + ''',\n" + " @role_name = null;'\n" + " EXEC sp_executesql @SQL\n" + " END\n" + " FETCH NEXT FROM CDC_Cursor INTO @TableName,@TableSchema\n" + "END\n" + "CLOSE CDC_Cursor\n" + "DEALLOCATE CDC_Cursor");
+  }
+
+}
diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java
new file mode 100644
index 000000000000..1970f3133f35 --- /dev/null +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -0,0 +1,399 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.source.mssql; + +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; +import static io.airbyte.integrations.source.mssql.MssqlSource.CDC_LSN; +import static io.airbyte.integrations.source.mssql.MssqlSource.DRIVER_CLASS; +import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET; +import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.debezium.CdcSourceTest; +import io.airbyte.integrations.debezium.CdcTargetPosition; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.debezium.connector.sqlserver.Lsn; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MSSQLServerContainer; + 
+public class CdcMssqlSourceTest extends CdcSourceTest { + + private static final String CDC_ROLE_NAME = "cdc_selector"; + private static final String TEST_USER_NAME = "tester"; + private static final String TEST_USER_PASSWORD = "testerjester[1]"; + + private MSSQLServerContainer container; + + private String dbName; + private Database database; + private JdbcDatabase testJdbcDatabase; + private MssqlSource source; + private JsonNode config; + + @BeforeEach + public void setup() throws SQLException { + init(); + setupTestUser(); + revokeAllPermissions(); + super.setup(); + grantCorrectPermissions(); + } + + private void init() { + container = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense(); + container.addEnv("MSSQL_AGENT_ENABLED", "True"); // need this running for cdc to work + container.start(); + + dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); + source = new MssqlSource(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put("host", container.getHost()) + .put("port", container.getFirstMappedPort()) + .put("database", dbName) + .put("username", TEST_USER_NAME) + .put("password", TEST_USER_PASSWORD) + .put("replication_method", "CDC") + .build()); + + database = Databases.createDatabase( + container.getUsername(), + container.getPassword(), + String.format("jdbc:sqlserver://%s:%s", + container.getHost(), + container.getFirstMappedPort()), + DRIVER_CLASS, + null); + + testJdbcDatabase = Databases.createJdbcDatabase( + TEST_USER_NAME, + TEST_USER_PASSWORD, + String.format("jdbc:sqlserver://%s:%s", + container.getHost(), + container.getFirstMappedPort()), + DRIVER_CLASS); + + executeQuery("CREATE DATABASE " + dbName + ";"); + switchSnapshotIsolation(true, dbName); + } + + private void switchSnapshotIsolation(Boolean on, String db) { + String onOrOff = on ? 
"ON" : "OFF"; + executeQuery("ALTER DATABASE " + db + "\n\tSET ALLOW_SNAPSHOT_ISOLATION " + onOrOff); + } + + private void setupTestUser() { + executeQuery("USE " + dbName); + executeQuery("CREATE LOGIN " + TEST_USER_NAME + " WITH PASSWORD = '" + TEST_USER_PASSWORD + "';"); + executeQuery("CREATE USER " + TEST_USER_NAME + " FOR LOGIN " + TEST_USER_NAME + ";"); + } + + private void revokeAllPermissions() { + executeQuery("REVOKE ALL FROM " + TEST_USER_NAME + " CASCADE;"); + executeQuery("EXEC sp_msforeachtable \"REVOKE ALL ON '?' TO " + TEST_USER_NAME + ";\""); + } + + private void alterPermissionsOnSchema(Boolean grant, String schema) { + String grantOrRemove = grant ? "GRANT" : "REVOKE"; + executeQuery(String.format("USE %s;\n" + "%s SELECT ON SCHEMA :: [%s] TO %s", dbName, grantOrRemove, schema, TEST_USER_NAME)); + } + + private void grantCorrectPermissions() { + alterPermissionsOnSchema(true, MODELS_SCHEMA); + alterPermissionsOnSchema(true, MODELS_SCHEMA + "_random"); + alterPermissionsOnSchema(true, "cdc"); + executeQuery(String.format("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, TEST_USER_NAME)); + } + + @Override + public String createSchemaQuery(String schemaName) { + return "CREATE SCHEMA " + schemaName; + } + + private void switchCdcOnDatabase(Boolean enable, String db) { + String storedProc = enable ? 
"sys.sp_cdc_enable_db" : "sys.sp_cdc_disable_db"; + executeQuery("USE " + db + "\n" + "EXEC " + storedProc); + } + + @Override + public void createTable(String schemaName, String tableName, String columnClause) { + switchCdcOnDatabase(true, dbName); + super.createTable(schemaName, tableName, columnClause); + + // sometimes seeing an error that we can't enable cdc on a table while sql server agent is still + // spinning up + // solving with a simple while retry loop + boolean failingToStart = true; + int retryNum = 0; + int maxRetries = 10; + while (failingToStart) { + try { + executeQuery(String.format( + "EXEC sys.sp_cdc_enable_table\n" + + "\t@source_schema = N'%s',\n" + + "\t@source_name = N'%s', \n" + + "\t@role_name = N'%s',\n" + + "\t@supports_net_changes = 0", + schemaName, tableName, CDC_ROLE_NAME)); // enables cdc on MODELS_SCHEMA.MODELS_STREAM_NAME, giving CDC_ROLE_NAME select access + failingToStart = false; + } catch (Exception e) { + if (retryNum >= maxRetries) { + throw e; + } else { + retryNum++; + try { + Thread.sleep(10000); // 10 seconds + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + } + } + + @Override + public String columnClause(Map columnsWithDataType, Optional primaryKey) { + StringBuilder columnClause = new StringBuilder(); + int i = 0; + for (Map.Entry column : columnsWithDataType.entrySet()) { + columnClause.append(column.getKey()); + columnClause.append(" "); + columnClause.append(column.getValue()); + if (primaryKey.isPresent() && primaryKey.get().equals(column.getKey())) { + columnClause.append(" PRIMARY KEY"); + } + if (i < (columnsWithDataType.size() - 1)) { + columnClause.append(","); + columnClause.append(" "); + } + i++; + } + return columnClause.toString(); + } + + @AfterEach + public void tearDown() { + try { + database.close(); + container.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + void testAssertCdcEnabledInDb() { + // since we enable cdc in 
setup, assert that we successfully pass this first + assertDoesNotThrow(() -> source.assertCdcEnabledInDb(config, testJdbcDatabase)); + // then disable cdc and assert the check fails + switchCdcOnDatabase(false, dbName); + assertThrows(RuntimeException.class, () -> source.assertCdcEnabledInDb(config, testJdbcDatabase)); + } + + @Test + void testAssertCdcSchemaQueryable() { + // correct access granted by setup so assert check passes + assertDoesNotThrow(() -> source.assertCdcSchemaQueryable(config, testJdbcDatabase)); + // now revoke perms and assert that check fails + alterPermissionsOnSchema(false, "cdc"); + assertThrows(com.microsoft.sqlserver.jdbc.SQLServerException.class, () -> source.assertCdcSchemaQueryable(config, testJdbcDatabase)); + } + + private void switchSqlServerAgentAndWait(Boolean start) throws InterruptedException { + String startOrStop = start ? "START" : "STOP"; + executeQuery(String.format("EXEC xp_servicecontrol N'%s',N'SQLServerAGENT';", startOrStop)); + Thread.sleep(15 * 1000); // 15 seconds to wait for change of agent state + } + + @Test + void testAssertSqlServerAgentRunning() throws InterruptedException { + executeQuery(String.format("USE master;\n" + "GRANT VIEW SERVER STATE TO %s", TEST_USER_NAME)); + // assert expected failure if sql server agent stopped + switchSqlServerAgentAndWait(false); + assertThrows(RuntimeException.class, () -> source.assertSqlServerAgentRunning(testJdbcDatabase)); + // assert success if sql server agent running + switchSqlServerAgentAndWait(true); + assertDoesNotThrow(() -> source.assertSqlServerAgentRunning(testJdbcDatabase)); + } + + @Test + void testAssertSnapshotIsolationAllowed() { + // snapshot isolation enabled by setup so assert check passes + assertDoesNotThrow(() -> source.assertSnapshotIsolationAllowed(config, testJdbcDatabase)); + // now disable snapshot isolation and assert that check fails + switchSnapshotIsolation(false, dbName); + assertThrows(RuntimeException.class, () -> 
source.assertSnapshotIsolationAllowed(config, testJdbcDatabase)); + } + + // Ensure the CDC check operations are included when CDC is enabled + // todo: make this better by checking the returned checkOperations from source.getCheckOperations + @Test + void testCdcCheckOperations() throws Exception { + // assertCdcEnabledInDb + switchCdcOnDatabase(false, dbName); + AirbyteConnectionStatus status = getSource().check(getConfig()); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + switchCdcOnDatabase(true, dbName); + // assertCdcSchemaQueryable + alterPermissionsOnSchema(false, "cdc"); + status = getSource().check(getConfig()); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + alterPermissionsOnSchema(true, "cdc"); + // assertSqlServerAgentRunning + executeQuery(String.format("USE master;\n" + "GRANT VIEW SERVER STATE TO %s", TEST_USER_NAME)); + switchSqlServerAgentAndWait(false); + status = getSource().check(getConfig()); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + switchSqlServerAgentAndWait(true); + // assertSnapshotIsolationAllowed + switchSnapshotIsolation(false, dbName); + status = getSource().check(getConfig()); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + } + + // todo: check LSN returned is actually the max LSN + // todo: check we fail as expected under certain conditions + @Test + void testGetTargetPosition() throws InterruptedException { + Thread.sleep(10 * 1000); // Sleeping because sometimes the db is not yet completely ready and the lsn is not found + // check that getTargetPosition returns higher Lsn after inserting new row + Lsn firstLsn = MssqlCdcTargetPosition.getTargetPosition(testJdbcDatabase, dbName).targetLsn; + executeQuery(String.format("USE %s; INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", + dbName, MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, 910019, 1, "another car")); + Thread.sleep(15 * 
1000); // 15 seconds to wait for Agent capture job to log cdc change + Lsn secondLsn = MssqlCdcTargetPosition.getTargetPosition(testJdbcDatabase, dbName).targetLsn; + assertTrue(secondLsn.compareTo(firstLsn) > 0); + } + + @Override + protected void removeCDCColumns(ObjectNode data) { + data.remove(CDC_LSN); + data.remove(CDC_UPDATED_AT); + data.remove(CDC_DELETED_AT); + } + + @Override + protected CdcTargetPosition cdcLatestTargetPosition() { + try { + // Sleeping because sometimes the db is not yet completely ready and the lsn is not found + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + JdbcDatabase jdbcDatabase = Databases.createStreamingJdbcDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:sqlserver://%s:%s;databaseName=%s;", + config.get("host").asText(), + config.get("port").asInt(), + dbName), + DRIVER_CLASS, new MssqlJdbcStreamingQueryConfiguration(), null); + return MssqlCdcTargetPosition.getTargetPosition(jdbcDatabase, dbName); + } + + @Override + protected CdcTargetPosition extractPosition(JsonNode record) { + return new MssqlCdcTargetPosition(Lsn.valueOf(record.get(CDC_LSN).asText())); + } + + @Override + protected void assertNullCdcMetaData(JsonNode data) { + assertNull(data.get(CDC_LSN)); + assertNull(data.get(CDC_UPDATED_AT)); + assertNull(data.get(CDC_DELETED_AT)); + } + + @Override + protected void assertCdcMetaData(JsonNode data, boolean deletedAtNull) { + assertNotNull(data.get(CDC_LSN)); + assertNotNull(data.get(CDC_UPDATED_AT)); + if (deletedAtNull) { + assertTrue(data.get(CDC_DELETED_AT).isNull()); + } else { + assertFalse(data.get(CDC_DELETED_AT).isNull()); + } + } + + @Override + protected void addCdcMetadataColumns(AirbyteStream stream) { + ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); + ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + + final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", 
"number")); + final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); + properties.set(CDC_LSN, stringType); + properties.set(CDC_UPDATED_AT, numberType); + properties.set(CDC_DELETED_AT, numberType); + + } + + @Override + protected Source getSource() { + return new MssqlSource(); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected Database getDatabase() { + return database; + } + + @Override + protected void assertExpectedStateMessages(List stateMessages) { + assertEquals(1, stateMessages.size()); + assertNotNull(stateMessages.get(0).getData()); + assertNotNull(stateMessages.get(0).getData().get("cdc_state").get("state").get(MSSQL_CDC_OFFSET)); + assertNotNull(stateMessages.get(0).getData().get("cdc_state").get("state").get(MSSQL_DB_HISTORY)); + } + +} diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java index aa1d26203d08..880f8b7c5be9 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java @@ -125,7 +125,7 @@ private JsonNode getConfig(MSSQLServerContainer db) { .build()); } - private static Database getDatabase(JsonNode config) { + public static Database getDatabase(JsonNode config) { // todo (cgardens) - rework this abstraction so that we do not have to pass a null into the // constructor. at least explicitly handle it, even if the impl doesn't change. 
return Databases.createDatabase( diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java index c69f439c96f5..fe0d6e9a6e3a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java @@ -55,10 +55,14 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; import java.sql.SQLException; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.jooq.SQLDialect; @@ -69,6 +73,7 @@ public class CdcMySqlSourceTest extends CdcSourceTest { + private static final String DB_NAME = MODELS_SCHEMA; private MySQLContainer container; private Database database; private MySqlSource source; @@ -286,4 +291,31 @@ public void assertExpectedStateMessages(List stateMessages) } } + @Override + protected AirbyteCatalog expectedCatalogForDiscover() { + final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); + + createTable(MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", + columnClause(ImmutableMap.of(COL_ID, "INTEGER", COL_MAKE_ID, "INTEGER", COL_MODEL, "VARCHAR(200)"), Optional.empty())); + + List streams = expectedCatalog.getStreams(); + // stream with PK + streams.get(0).setSourceDefinedCursor(true); + addCdcMetadataColumns(streams.get(0)); + + AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME + "_2", 
+ MODELS_SCHEMA, + Field.of(COL_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MODEL, JsonSchemaPrimitive.STRING)); + streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList()); + streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); + addCdcMetadataColumns(streamWithoutPK); + + streams.add(streamWithoutPK); + expectedCatalog.withStreams(streams); + return expectedCatalog; + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 4a45f6b6ef11..195c59bd0a4f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -37,7 +37,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; @@ -48,14 +47,9 @@ import io.airbyte.integrations.base.Source; import io.airbyte.integrations.debezium.CdcSourceTest; import io.airbyte.integrations.debezium.CdcTargetPosition; -import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CatalogHelpers; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaPrimitive; -import io.airbyte.protocol.models.SyncMode; import 
io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.SQLException; import java.util.List; @@ -254,24 +248,4 @@ public String createSchemaQuery(String schemaName) { return "CREATE SCHEMA " + schemaName + ";"; } - @Override - protected AirbyteCatalog expectedCatalogForDiscover() { - AirbyteCatalog catalog = super.expectedCatalogForDiscover(); - List streams = catalog.getStreams(); - - AirbyteStream randomStream = CatalogHelpers.createAirbyteStream( - MODELS_STREAM_NAME + "_random", - MODELS_SCHEMA + "_random", - Field.of(COL_ID + "_random", JsonSchemaPrimitive.NUMBER), - Field.of(COL_MAKE_ID + "_random", JsonSchemaPrimitive.NUMBER), - Field.of(COL_MODEL + "_random", JsonSchemaPrimitive.STRING)) - .withSourceDefinedCursor(true) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) - .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID + "_random"))); - addCdcMetadataColumns(randomStream); - streams.add(randomStream); - catalog.withStreams(streams); - return catalog; - } - } diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 3d2b51893b8a..96a317ebd554 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -2,7 +2,7 @@ ## Overview -The MSSQL source supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. +The MSSQL source supports Full Refresh and Incremental syncs, including Change Data Capture. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. ### Resulting schema @@ -26,14 +26,16 @@ MSSQL data types are mapped to the following data types when synchronizing data: If you do not see a type in this list, assume that it is coerced into a string. 
We are happy to take feedback on preferred mappings. +Please see [this issue](https://github.com/airbytehq/airbyte/issues/4270) for description of unexpected behaviour for certain datatypes. + ### Features | Feature | Supported | Notes | | :--- | :--- | :--- | | Full Refresh Sync | Yes | | | Incremental Sync - Append | Yes | | -| Replicate Incremental Deletes | Coming soon | | -| Logical Replication \(WAL\) | Coming soon | | +| Replicate Incremental Deletes | Yes | | +| CDC (Change Data Capture) | Yes | | | SSL Support | Yes | | | SSH Tunnel Connection | Coming soon | | | Namespaces | Yes | Enabled by default | @@ -44,6 +46,7 @@ If you do not see a type in this list, assume that it is coerced into a string. 1. MSSQL Server `Azure SQL Database`, `Azure Synapse Analytics`, `Azure SQL Managed Instance`, `SQL Server 2019`, `SQL Server 2017`, `SQL Server 2016`, `SQL Server 2014`, `SQL Server 2012`, `PDW 2008R2 AU34`. 2. Create a dedicated read-only Airbyte user with access to all tables needed for replication +3. If you want to use CDC, please see [the relevant section below](mssql.md#Change-Data-Capture-:-CDC) for further setup requirements ### Setup guide @@ -59,10 +62,140 @@ _Coming soon: suggestions on how to create this user._ Your database user should now be ready for use with Airbyte. +## Change Data Capture : CDC + +We use [SQL Server's change data capture feature](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-2017) +to capture row-level `INSERT`, `UPDATE` and `DELETE` operations that occur on cdc-enabled tables. + +Some extra setup requiring at least *db_owner* permissions on the database(s) you intend to sync from will be required (detailed [below](mssql.md#Setting-up-CDC-for-MSSQL)). + +Please read the [CDC docs](../../understanding-airbyte/cdc.md) for an overview of how Airbyte approaches CDC. + +### Should I use CDC for MSSQL? 
+ +* If you need a record of deletions and can accept the limitations posted below, CDC is the way to go! +* If your data set is small and/or you just want a snapshot of your table in the destination, consider using Full Refresh replication for your table instead of CDC. +* If the limitations below prevent you from using CDC and your goal is to maintain a snapshot of your table in the destination, consider using non-CDC incremental and occasionally reset the data and re-sync. +* If your table has a primary key but doesn't have a reasonable cursor field for incremental syncing \(i.e. `updated_at`\), CDC allows you to sync your table incrementally. + +### CDC Limitations + +* Make sure to read our [CDC docs](../../understanding-airbyte/cdc.md) to see limitations that impact all databases using CDC replication. +* There are some critical issues regarding certain datatypes. Please find detailed info in [this GitHub issue](https://github.com/airbytehq/airbyte/issues/4542). +* CDC is only available for SQL Server 2016 Service Pack 1 (SP1) and later. +* *db_owner* (or higher) permissions are required to perform the [necessary setup](mssql.md#Setting-up-CDC-for-MSSQL) for CDC. +* You must enable [snapshot isolation mode](https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server) on the database(s) you want to sync. This is used for retrieving an initial snapshot without locking tables. +* On Linux, CDC is not supported on versions earlier than SQL Server 2017 CU18 (SQL Server 2019 is supported). +* Change data capture cannot be enabled on tables with a clustered columnstore index. (It can be enabled on tables with a *non-clustered* columnstore index). +* The SQL Server CDC feature processes changes that occur in user-created tables only. You cannot enable CDC on the SQL Server master database. +* Using variables with partition switching on databases or tables with change data capture (CDC) is not supported for the `ALTER TABLE` ... 
`SWITCH TO` ... `PARTITION` ... statement +* Our implementation has not been tested with managed instances, such as Azure SQL Database (we welcome any feedback from users who try this!) + * If you do want to try this, CDC can only be enabled on Azure SQL database tiers at or above Standard 3 (S3+). Basic, S0, S1 and S2 tiers are not supported for CDC. +* Our CDC implementation uses at-least-once delivery for all change records. +* Read more on CDC limitations in the [Microsoft docs](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-2017#limitations). + +### Setting up CDC for MSSQL + +#### Enable CDC on database and tables + +MS SQL Server provides some built-in stored procedures to enable CDC. + +- To enable CDC, a SQL Server administrator with the necessary privileges (*db_owner* or *sysadmin*) must first run a query to enable CDC at the database level. +```text + USE {database name} + GO + EXEC sys.sp_cdc_enable_db + GO + ``` +- The administrator must then enable CDC for each table that you want to capture. Here's an example: +```text + USE {database name} + GO + + EXEC sys.sp_cdc_enable_table + @source_schema = N'{schema name}', + @source_name = N'{table name}', + @role_name = N'{role name}', [*] + @filegroup_name = N'{filegroup name}', [**] + @supports_net_changes = 0 [***] + GO +``` + - [*] Specifies a role which will gain `SELECT` permission on the captured columns of the source table. We suggest putting a value here so you can use this role in the next step but you can also set the value of @role_name to `NULL` to allow only *sysadmin* and *db_owner* to have access. Be sure that the credentials used to connect to the source in Airbyte align with this role so that Airbyte can access the cdc tables. + - [**] Specifies the filegroup where SQL Server places the change table. We recommend creating a separate filegroup for CDC but you can leave this parameter out to use the default filegroup. 
+ - [***] If 0, only the support functions to query for all changes are generated. If 1, the functions that are needed to query for net changes are also generated. If supports_net_changes is set to 1, index_name must be specified, or the source table must have a defined primary key. + +- (For more details on parameters, see the [Microsoft doc page](https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-enable-table-transact-sql?view=sql-server-ver15) for this stored procedure). + + +- If you have many tables to enable CDC on and would like to avoid having to run this query one-by-one for every table, [this script](http://www.techbrothersit.com/2013/06/change-data-capture-cdc-sql-server_69.html) might help! + +For further detail, see the [Microsoft docs on enabling and disabling CDC](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-ver15). + +#### Enabling snapshot isolation + +- When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. To avoid acquiring table locks, Airbyte uses *snapshot isolation*, allowing simultaneous writes by other database clients. This must be enabled on the database like so: +```text + ALTER DATABASE {database name} + SET ALLOW_SNAPSHOT_ISOLATION ON; +``` + +#### Create a user and grant appropriate permissions +- Rather than use *sysadmin* or *db_owner* credentials, we recommend creating a new user with the relevant CDC access for use with Airbyte. 
First let's create the login and user and add to the [db_datareader](https://docs.microsoft.com/en-us/sql/relational-databases/security/authentication-access/database-level-roles?view=sql-server-ver15) role: +```text + USE {database name}; + CREATE LOGIN {user name} + WITH PASSWORD = '{password}'; + CREATE USER {user name} FOR LOGIN {user name}; + EXEC sp_addrolemember 'db_datareader', '{user name}'; +``` + - Add the user to the role specified earlier when enabling cdc on the table(s): +```text + EXEC sp_addrolemember '{role name}', '{user name}'; +``` + - This should be enough access, but if you run into problems, try also directly granting the user `SELECT` access on the cdc schema: +```text + USE {database name}; + GRANT SELECT ON SCHEMA :: [cdc] TO {user name}; +``` + - If feasible, granting this user 'VIEW SERVER STATE' permissions will allow Airbyte to check whether or not the [SQL Server Agent](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver15#relationship-with-log-reader-agent) is running. This is preferred as it ensures syncs will fail if the CDC tables are not being updated by the Agent in the source database. +```text + USE master; + GRANT VIEW SERVER STATE TO {user name}; +``` + +#### Extending the retention period of CDC data + +- In SQL Server, by default, only three days of data are retained in the change tables. Unless you are running very frequent syncs, we suggest increasing this retention so that in case of a failure in sync or if the sync is paused, there is still some bandwidth to start from the last point in incremental sync. 
+- These settings can be changed using the stored procedure [sys.sp_cdc_change_job](https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-change-job-transact-sql?view=sql-server-ver15) as below: +```text + -- we recommend 14400 minutes (10 days) as retention period + EXEC sp_cdc_change_job @job_type='cleanup', @retention = {minutes} +``` +- After making this change, a restart of the cleanup job is required: +```text + EXEC sys.sp_cdc_stop_job @job_type = 'cleanup'; + + EXEC sys.sp_cdc_start_job @job_type = 'cleanup'; +``` + +#### Ensuring the SQL Server Agent is running + +- MSSQL uses the SQL Server Agent to [run the jobs necessary](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver15#agent-jobs) for CDC. It is therefore vital that the Agent is operational in order for CDC to work effectively. You can check the status of the SQL Server Agent as follows: +```text + EXEC xp_servicecontrol 'QueryState', N'SQLServerAGENT'; +``` +- If you see something other than 'Running.' please follow the [Microsoft docs](https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service?view=sql-server-ver15) to start the service. + +#### Setting up CDC on managed versions of SQL Server + +We readily welcome [contributions to our docs](https://github.com/airbytehq/airbyte/tree/master/docs) providing setup instructions. Please consider contributing to expand our docs! 
+ + ## Changelog | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.3.3 | 2021-07-05 | [4689](https://github.com/airbytehq/airbyte/pull/4689) | Add CDC support | | 0.3.2 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add AIRBYTE_ENTRYPOINT for Kubernetes support | | 0.3.1 | 2021-06-08 | [3893](https://github.com/airbytehq/airbyte/pull/3893) | Enable SSL connection | | 0.3.0 | 2021-04-21 | [2990](https://github.com/airbytehq/airbyte/pull/2990) | Support namespaces | diff --git a/docs/understanding-airbyte/cdc.md b/docs/understanding-airbyte/cdc.md index 737ab07bccaf..506ba61ab116 100644 --- a/docs/understanding-airbyte/cdc.md +++ b/docs/understanding-airbyte/cdc.md @@ -14,7 +14,7 @@ The Airbyte Protocol outputs records from sources. Records from `UPDATE` stateme We add some metadata columns for CDC sources: -* `ab_cdc_lsn` (specific to postgres source) is the point in the log where the record was retrieved +* `ab_cdc_lsn` (postgres and sql server sources) is the point in the log where the record was retrieved * `ab_cdc_log_file` & `ab_cdc_log_pos` (specific to mysql source) is the file name and position in the file where the record was retrieved * `ab_cdc_updated_at` is the timestamp for the database transaction that resulted in this record change and is present for records from `DELETE`/`INSERT`/`UPDATE` statements * `ab_cdc_deleted_at` is the timestamp for the database transaction that resulted in this record change and is only present for records from `DELETE` statements @@ -32,10 +32,10 @@ We add some metadata columns for CDC sources: * [Postgres](../integrations/sources/postgres.md) (For a quick video overview of CDC on Postgres, click [here](https://www.youtube.com/watch?v=NMODvLgZvuE&ab_channel=Airbyte)) * [MySQL](../integrations/sources/mysql.md) +* [Microsoft SQL Server / MSSQL](../integrations/sources/mssql.md) ## Coming Soon -* [SQL Server / MSSQL](../integrations/sources/mssql.md) * Oracle 
DB * Please [create a ticket](https://github.com/airbytehq/airbyte/issues/new/choose) if you need CDC support on another database!