From f59429862034c79c30670e9c16324b3cac3e26a1 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Thu, 16 Jun 2022 17:11:06 +0300 Subject: [PATCH 01/10] 6339: debug info --- .../connectors/source-mssql/Dockerfile | 2 +- .../integrations/source/mssql/MssqlSource.java | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index eadfd5211f0a..13e811944507 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -13,7 +13,7 @@ FROM airbyte/integration-base-java:dev WORKDIR /airbyte ENV APPLICATION source-mssql - +ENV JAVA_TOOL_OPTIONS -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005 COPY --from=build /airbyte /airbyte LABEL io.airbyte.version=0.4.2 diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 2a770d8e1ddd..49cff5902aa9 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -35,6 +35,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.SyncMode; import java.io.File; +import java.sql.DatabaseMetaData; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -102,6 +103,10 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); + DatabaseMetaData databaseMetaData = connection.getMetaData(); + String dbProductName = databaseMetaData.getDatabaseProductName(); + String dbProductVersion = databaseMetaData.getDatabaseProductVersion(); + System.out.println("1================================Product name/version: " + dbProductName + "/" + dbProductVersion); final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString(); final List newColumnNames = getWrappedColumn(database, columnNames, schemaName, tableName, identifierQuoteString); @@ -252,6 +257,10 @@ public List> getCheckOperations(final J protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase database) throws SQLException { final List queryResponse = database.queryJsons(connection -> { + DatabaseMetaData databaseMetaData = connection.getMetaData(); + String dbProductName = databaseMetaData.getDatabaseProductName(); + String dbProductVersion = databaseMetaData.getDatabaseProductVersion(); + System.out.println("2================================Product name/version: " + dbProductName + "/" + dbProductVersion); final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("database").asText()); @@ -275,6 +284,10 @@ protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase da protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabase database) throws SQLException { final List queryResponse = database.queryJsons(connection -> { + DatabaseMetaData databaseMetaData = connection.getMetaData(); + String dbProductName = databaseMetaData.getDatabaseProductName(); + String dbProductVersion = databaseMetaData.getDatabaseProductVersion(); + System.out.println("================================Product name/version: " + dbProductName + "/" + dbProductVersion); final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; final PreparedStatement ps = connection.prepareStatement(sql); LOGGER.info(String.format( From 3c17eb95848dcd895322651f7b550d8fd67fd47c Mon Sep 17 00:00:00 2001 From: kimerinn Date: Fri, 17 Jun 2022 02:50:30 +0300 Subject: [PATCH 02/10] 6339: not using 'USE' on Azure SQL servers --- .../source/mssql/MssqlSource.java | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 49cff5902aa9..567946a4f811 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -38,7 +38,9 @@ import java.sql.DatabaseMetaData; import java.sql.JDBCType; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -73,9 +75,9 @@ public static Source sshWrappedSource() { @Override public AutoCloseableIterator queryTableFullRefresh(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName) { + final List columnNames, + final String schemaName, + final String tableName) { LOGGER.info("Queueing query for table: {}", tableName); final List newIdentifiersList = getWrappedColumn(database, @@ -91,12 +93,12 @@ public AutoCloseableIterator queryTableFullRefresh(final JdbcDatabase @Override public AutoCloseableIterator queryTableIncremental(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final String cursorField, - final JDBCType cursorFieldType, - final String cursor) { + final List columnNames, + final String schemaName, + final String tableName, + final String cursorField, + final JDBCType cursorFieldType, + final String cursor) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { @@ -131,19 +133,17 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase } /** - * There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be - * converted to a nvarchar(4000) data type by calling the ToString() method. So we make a separate - * query to get Table's MetaData, check is there any hierarchyid columns, and wrap required fields - * with the ToString() function in the final Select query. Reference: - * https://docs.microsoft.com/en-us/sql/t-sql/data-types/hierarchyid-data-type-method-reference?view=sql-server-ver15#data-type-conversion + * There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be converted to a nvarchar(4000) data type by + * calling the ToString() method. So we make a separate query to get Table's MetaData, check is there any hierarchyid columns, and wrap required + * fields with the ToString() function in the final Select query. Reference: https://docs.microsoft.com/en-us/sql/t-sql/data-types/hierarchyid-data-type-method-reference?view=sql-server-ver15#data-type-conversion * * @return the list with Column names updated to handle functions (if nay) properly */ private List getWrappedColumn(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final String enquoteSymbol) { + final List columnNames, + final String schemaName, + final String tableName, + final String enquoteSymbol) { final List hierarchyIdColumns = new ArrayList<>(); try { final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database @@ -257,10 +257,6 @@ public List> getCheckOperations(final J protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase database) throws SQLException { final List queryResponse = database.queryJsons(connection -> { - DatabaseMetaData databaseMetaData = connection.getMetaData(); - String dbProductName = databaseMetaData.getDatabaseProductName(); - String dbProductVersion = databaseMetaData.getDatabaseProductVersion(); - System.out.println("2================================Product name/version: " + dbProductName + "/" + dbProductVersion); final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("database").asText()); @@ -284,11 +280,16 @@ protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase da protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabase database) throws SQLException { final List queryResponse = database.queryJsons(connection -> { - DatabaseMetaData databaseMetaData = connection.getMetaData(); - String dbProductName = databaseMetaData.getDatabaseProductName(); - String dbProductVersion = databaseMetaData.getDatabaseProductVersion(); - System.out.println("================================Product name/version: " + dbProductName + "/" + dbProductVersion); - final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; + boolean isAzureSQL = false; + + try (Statement stmt = connection.createStatement(); + ResultSet editionRS = stmt.executeQuery("SELECT ServerProperty('Edition')")) { + isAzureSQL = editionRS.next() && "SQL Azure".equals(editionRS.getString(1)); + } + + final String sql = + isAzureSQL ? "SELECT * FROM cdc.change_tables" : "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; + final PreparedStatement ps = connection.prepareStatement(sql); LOGGER.info(String.format( "Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'", @@ -361,11 +362,11 @@ protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcD @Override public List> getIncrementalIterators( - final JdbcDatabase database, - final ConfiguredAirbyteCatalog catalog, - final Map>> tableNameToTable, - final StateManager stateManager, - final Instant emittedAt) { + final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { final JsonNode sourceConfig = database.getSourceConfig(); if (MssqlCdcHelper.isCdc(sourceConfig) && shouldUseCDC(catalog)) { LOGGER.info("using CDC: {}", true); From 49b474ed459e919971b79e6de3a6e4444e83ae75 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Fri, 17 Jun 2022 02:51:17 +0300 Subject: [PATCH 03/10] 6339: cleanup --- airbyte-integrations/connectors/source-mssql/Dockerfile | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 13e811944507..445153e54104 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -13,7 +13,6 @@ FROM airbyte/integration-base-java:dev WORKDIR /airbyte ENV APPLICATION source-mssql -ENV JAVA_TOOL_OPTIONS -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005 COPY --from=build /airbyte /airbyte LABEL io.airbyte.version=0.4.2 From 54cf1654d5aa560fb0e78b2f3684fe947537cc51 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Fri, 17 Jun 2022 02:55:35 +0300 Subject: [PATCH 04/10] 6339: cleanup2 --- airbyte-integrations/connectors/source-mssql/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 445153e54104..eadfd5211f0a 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -13,6 +13,7 @@ FROM airbyte/integration-base-java:dev WORKDIR /airbyte ENV APPLICATION source-mssql + COPY --from=build /airbyte /airbyte LABEL io.airbyte.version=0.4.2 From 077314e639124ff2ca3b34620fc720b99f9310d7 Mon Sep 17 00:00:00 2001 From: kimerinn Date: Fri, 17 Jun 2022 02:56:50 +0300 Subject: [PATCH 05/10] 6339: cleanup3 --- .../io/airbyte/integrations/source/mssql/MssqlSource.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 567946a4f811..81199ad8e02f 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -105,10 +105,6 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); - DatabaseMetaData databaseMetaData = connection.getMetaData(); - String dbProductName = databaseMetaData.getDatabaseProductName(); - String dbProductVersion = databaseMetaData.getDatabaseProductVersion(); - System.out.println("1================================Product name/version: " + dbProductName + "/" + dbProductVersion); final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString(); final List newColumnNames = getWrappedColumn(database, columnNames, schemaName, tableName, identifierQuoteString); From 415f274ff80b8d55f490d01cd5346d57f11fb80e Mon Sep 17 00:00:00 2001 From: kimerinn Date: Fri, 17 Jun 2022 13:31:09 +0300 Subject: [PATCH 06/10] 6339: versions/changelogs updated --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- airbyte-integrations/connectors/source-mssql/Dockerfile | 2 +- .../java/io/airbyte/integrations/source/mssql/MssqlSource.java | 1 - docs/integrations/sources/mssql.md | 1 + 5 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7a0fa2a1ba64..1c61e38413e1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -525,7 +525,7 @@ - name: Microsoft SQL Server (MSSQL) sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 dockerRepository: airbyte/source-mssql - dockerImageTag: 0.4.2 + dockerImageTag: 0.4.3 documentationUrl: https://docs.airbyte.io/integrations/sources/mssql icon: mssql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 54ac042ae145..7d5a6c2dbcbf 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4824,7 +4824,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mssql:0.4.2" +- dockerImage: "airbyte/source-mssql:0.4.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index eadfd5211f0a..e52ba8240154 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.2 +LABEL io.airbyte.version=0.4.3 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 81199ad8e02f..3efa8f837590 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -35,7 +35,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.SyncMode; import java.io.File; -import java.sql.DatabaseMetaData; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 203b6ddc004b..1a0e730d5063 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -302,6 +302,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | |:--------|:-----------| :----------------------------------------------------- | :------------------------------------- | +| 0.4.3 | 2022-06-17 | [13866](https://github.com/airbytehq/airbyte/pull/13866) | Fixed connection to Azure SQL with CDC enabled | | 0.4.2 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size | | 0.4.1 | 2022-05-25 | [13419](https://github.com/airbytehq/airbyte/pull/13419) | Correct enum for Standard method. | | 0.4.0 | 2022-05-25 | [12759](https://github.com/airbytehq/airbyte/pull/12759) [13168](https://github.com/airbytehq/airbyte/pull/13168) | For CDC, Add option to ignore existing data and only sync new changes from the database. | From febf0877787fb0281c15560312366a6450e1e40b Mon Sep 17 00:00:00 2001 From: kimerinn Date: Mon, 20 Jun 2022 16:11:48 +0300 Subject: [PATCH 07/10] 6339: merge from master (consolidation issue) --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- airbyte-integrations/connectors/source-mssql/Dockerfile | 2 +- docs/integrations/sources/mssql.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7bdc4054c69f..871bceaf8424 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -525,7 +525,7 @@ - name: Microsoft SQL Server (MSSQL) sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 dockerRepository: airbyte/source-mssql - dockerImageTag: 0.4.3 + dockerImageTag: 0.4.4 documentationUrl: https://docs.airbyte.io/integrations/sources/mssql icon: mssql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 0dd75580c048..90852882060b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4829,7 +4829,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mssql:0.4.3" +- dockerImage: "airbyte/source-mssql:0.4.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index e52ba8240154..9b139b9580c4 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.3 +LABEL io.airbyte.version=0.4.4 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index e13158a1c23b..2712d2997e14 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -302,6 +302,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | |:--------|:-----------| :----------------------------------------------------- |:-------------------------------------------------------------------------------------------------------| +| 0.4.4 | 2022-07-20 | [13866](https://github.com/airbytehq/airbyte/pull/13866) | Omit using 'USE' keyword on Azure SQL with CDC | | 0.4.3 | 2022-07-17 | [13887](https://github.com/airbytehq/airbyte/pull/13887) | Increase version to include changes from [13854](https://github.com/airbytehq/airbyte/pull/13854) | | 0.4.2 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size | | 0.4.1 | 2022-05-25 | [13419](https://github.com/airbytehq/airbyte/pull/13419) | Correct enum for Standard method. | From 1ac4fd0e288cf0bf22faa352945d5d7cb39163db Mon Sep 17 00:00:00 2001 From: kimerinn Date: Mon, 20 Jun 2022 17:17:39 +0300 Subject: [PATCH 08/10] 6339: dev connector version (for testing in airbyte cloud) --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- airbyte-integrations/connectors/source-mssql/Dockerfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 871bceaf8424..7bdc4054c69f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -525,7 +525,7 @@ - name: Microsoft SQL Server (MSSQL) sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 dockerRepository: airbyte/source-mssql - dockerImageTag: 0.4.4 + dockerImageTag: 0.4.3 documentationUrl: https://docs.airbyte.io/integrations/sources/mssql icon: mssql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 90852882060b..0dd75580c048 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4829,7 +4829,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mssql:0.4.4" +- dockerImage: "airbyte/source-mssql:0.4.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 9b139b9580c4..dfeb0234865f 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.4 +LABEL io.airbyte.version=0.4.4-dev LABEL io.airbyte.name=airbyte/source-mssql From eda1a06702850d4e0a8db29559adb249f90b623c Mon Sep 17 00:00:00 2001 From: kimerinn Date: Wed, 22 Jun 2022 11:53:08 +0300 Subject: [PATCH 09/10] 6339: code review implementation --- airbyte-integrations/connectors/source-mssql/Dockerfile | 2 +- .../java/io/airbyte/integrations/source/mssql/MssqlSource.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index dfeb0234865f..9b139b9580c4 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.4-dev +LABEL io.airbyte.version=0.4.4 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 3efa8f837590..be087ab3867c 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -282,6 +282,7 @@ protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabas isAzureSQL = editionRS.next() && "SQL Azure".equals(editionRS.getString(1)); } + //Azure SQL does not support USE clause final String sql = isAzureSQL ? "SELECT * FROM cdc.change_tables" : "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; From 0026bd8cd840fa5659cb08ef537b6ef63e12deaf Mon Sep 17 00:00:00 2001 From: kimerinn Date: Wed, 22 Jun 2022 11:55:30 +0300 Subject: [PATCH 10/10] 6339: apply formatting --- .../source/mssql/MssqlSource.java | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index be087ab3867c..6f7531db0806 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -74,9 +74,9 @@ public static Source sshWrappedSource() { @Override public AutoCloseableIterator queryTableFullRefresh(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName) { + final List columnNames, + final String schemaName, + final String tableName) { LOGGER.info("Queueing query for table: {}", tableName); final List newIdentifiersList = getWrappedColumn(database, @@ -92,12 +92,12 @@ public AutoCloseableIterator queryTableFullRefresh(final JdbcDatabase @Override public AutoCloseableIterator queryTableIncremental(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final String cursorField, - final JDBCType cursorFieldType, - final String cursor) { + final List columnNames, + final String schemaName, + final String tableName, + final String cursorField, + final JDBCType cursorFieldType, + final String cursor) { LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { @@ -128,17 +128,19 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase } /** - * There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be converted to a nvarchar(4000) data type by - * calling the ToString() method. So we make a separate query to get Table's MetaData, check is there any hierarchyid columns, and wrap required - * fields with the ToString() function in the final Select query. Reference: https://docs.microsoft.com/en-us/sql/t-sql/data-types/hierarchyid-data-type-method-reference?view=sql-server-ver15#data-type-conversion + * There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be + * converted to a nvarchar(4000) data type by calling the ToString() method. So we make a separate + * query to get Table's MetaData, check is there any hierarchyid columns, and wrap required fields + * with the ToString() function in the final Select query. Reference: + * https://docs.microsoft.com/en-us/sql/t-sql/data-types/hierarchyid-data-type-method-reference?view=sql-server-ver15#data-type-conversion * * @return the list with Column names updated to handle functions (if nay) properly */ private List getWrappedColumn(final JdbcDatabase database, - final List columnNames, - final String schemaName, - final String tableName, - final String enquoteSymbol) { + final List columnNames, + final String schemaName, + final String tableName, + final String enquoteSymbol) { final List hierarchyIdColumns = new ArrayList<>(); try { final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database @@ -282,7 +284,7 @@ protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabas isAzureSQL = editionRS.next() && "SQL Azure".equals(editionRS.getString(1)); } - //Azure SQL does not support USE clause + // Azure SQL does not support USE clause final String sql = isAzureSQL ? "SELECT * FROM cdc.change_tables" : "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; @@ -358,11 +360,11 @@ protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcD @Override public List> getIncrementalIterators( - final JdbcDatabase database, - final ConfiguredAirbyteCatalog catalog, - final Map>> tableNameToTable, - final StateManager stateManager, - final Instant emittedAt) { + final JdbcDatabase database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { final JsonNode sourceConfig = database.getSourceConfig(); if (MssqlCdcHelper.isCdc(sourceConfig) && shouldUseCDC(catalog)) { LOGGER.info("using CDC: {}", true);