Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

6339: error when attempting to use azure sql database within an elastic pool as source for cdc based replication #13866

Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.2
dockerImageTag: 0.4.3
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4824,7 +4824,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.2"
- dockerImage: "airbyte/source-mssql:0.4.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.2
LABEL io.airbyte.version=0.4.3
LABEL io.airbyte.name=airbyte/source-mssql
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.io.File;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -72,9 +74,9 @@ public static Source sshWrappedSource() {

@Override
public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName) {
final List<String> columnNames,
final String schemaName,
final String tableName) {
LOGGER.info("Queueing query for table: {}", tableName);

final List<String> newIdentifiersList = getWrappedColumn(database,
Expand All @@ -90,12 +92,12 @@ public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final JdbcDatabase

@Override
public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final String cursorField,
final JDBCType cursorFieldType,
final String cursor) {
final List<String> columnNames,
final String schemaName,
final String tableName,
final String cursorField,
final JDBCType cursorFieldType,
final String cursor) {
LOGGER.info("Queueing query for table: {}", tableName);
return AutoCloseableIterators.lazyIterator(() -> {
try {
Expand Down Expand Up @@ -126,19 +128,17 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase
}

/**
* There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be
* converted to a nvarchar(4000) data type by calling the ToString() method. So we make a separate
* query to get Table's MetaData, check is there any hierarchyid columns, and wrap required fields
* with the ToString() function in the final Select query. Reference:
* https://docs.microsoft.com/en-us/sql/t-sql/data-types/hierarchyid-data-type-method-reference?view=sql-server-ver15#data-type-conversion
* There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be converted to a nvarchar(4000) data type by
* calling the ToString() method. So we make a separate query to get Table's MetaData, check is there any hierarchyid columns, and wrap required
* fields with the ToString() function in the final Select query. Reference: https://docs.microsoft.com/en-us/sql/t-sql/data-types/hierarchyid-data-type-method-reference?view=sql-server-ver15#data-type-conversion
*
* @return the list with Column names updated to handle functions (if nay) properly
*/
private List<String> getWrappedColumn(final JdbcDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final String enquoteSymbol) {
final List<String> columnNames,
final String schemaName,
final String tableName,
final String enquoteSymbol) {
final List<String> hierarchyIdColumns = new ArrayList<>();
try {
final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database
Expand Down Expand Up @@ -275,7 +275,16 @@ protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase da
protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabase database)
throws SQLException {
final List<JsonNode> queryResponse = database.queryJsons(connection -> {
final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables";
boolean isAzureSQL = false;

try (Statement stmt = connection.createStatement();
ResultSet editionRS = stmt.executeQuery("SELECT ServerProperty('Edition')")) {
isAzureSQL = editionRS.next() && "SQL Azure".equals(editionRS.getString(1));
}

final String sql =
isAzureSQL ? "SELECT * FROM cdc.change_tables" : "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables";
kimerinn marked this conversation as resolved.
Show resolved Hide resolved

final PreparedStatement ps = connection.prepareStatement(sql);
LOGGER.info(String.format(
"Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'",
Expand Down Expand Up @@ -348,11 +357,11 @@ protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcD

@Override
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt) {
final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
if (MssqlCdcHelper.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
LOGGER.info("using CDC: {}", true);
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----------------------------------------------------- | :------------------------------------- |
| 0.4.3 | 2022-06-17 | [13866](https://github.com/airbytehq/airbyte/pull/13866) | Fixed connection to Azure SQL with CDC enabled |
| 0.4.2 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size |
| 0.4.1 | 2022-05-25 | [13419](https://github.com/airbytehq/airbyte/pull/13419) | Correct enum for Standard method. |
| 0.4.0 | 2022-05-25 | [12759](https://github.com/airbytehq/airbyte/pull/12759) [13168](https://github.com/airbytehq/airbyte/pull/13168) | For CDC, Add option to ignore existing data and only sync new changes from the database. |
Expand Down