Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Phlair committed Jul 13, 2021
1 parent 12e6ed0 commit 2f87d62
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
public class MssqlCdcTargetPosition implements CdcTargetPosition {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class);
private final Lsn targetLsn;
public final Lsn targetLsn;

public MssqlCdcTargetPosition(Lsn targetLsn) {
this.targetLsn = targetLsn;
Expand Down Expand Up @@ -84,17 +84,19 @@ public int hashCode() {
return targetLsn.hashCode();
}

public static MssqlCdcTargetPosition getTargetPostion(JdbcDatabase database) {
public static MssqlCdcTargetPosition getTargetPosition(JdbcDatabase database, String dbName) {
try {
final List<JsonNode> jsonNodes = database
.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;"), JdbcUtils::rowToJson);
.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(
"USE " + dbName + "; SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;"), JdbcUtils::rowToJson);
Preconditions.checkState(jsonNodes.size() == 1);
if (jsonNodes.get(0).get("max_lsn") != null) {
Lsn maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue());
LOGGER.info("identified target lsn: " + maxLsn);
return new MssqlCdcTargetPosition(maxLsn);
} else {
throw new RuntimeException("Max LSN is null, see docs"); // todo: make this error way better
throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running. " +
"Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service?view=sql-server-ver15)");
}
} catch (SQLException | IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.io.File;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -134,104 +135,94 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(JsonNod
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(super.getCheckOperations(config));

if (isCdc(config)) {
checkOperations.add(database -> assertCdcEnabledInDb(config, database));
checkOperations.add(database -> assertCdcSchemaQueryable(config, database));
checkOperations.add(database -> assertSqlServerAgentRunning(database));
checkOperations.add(database -> assertSnapshotIsolationAllowed(config, database));
}

// note, it's possible these queries could fail if user doesn't have correct permissions
// hopefully in these cases it should be evident from the SQLServerException thrown

// check that cdc is enabled on database
checkOperations.add(database -> {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?";
PreparedStatement ps = connection.prepareStatement(sql);
ps.setString(1, config.get("database").asText());
LOGGER.info(String.format("Checking that cdc is enabled on database '%s' using the query: '%s'",
config.get("database").asText(), sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());

if (queryResponse.size() < 1) {
throw new RuntimeException(String.format(
"Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
config.get("database").asText()));
}
return checkOperations;
}

if (!(queryResponse.get(0).get("is_cdc_enabled").asBoolean())) {
throw new RuntimeException(String.format(
"Detected that CDC is not enabled for database '%s'. Please check the documentation on how to enable CDC on MS SQL Server.",
config.get("database").asText()));
}
});

// check that we can query cdc schema and check we have at least 1 table with cdc enabled that this
// user can see
checkOperations.add(database -> {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "SELECT * FROM cdc.change_tables";
PreparedStatement ps = connection.prepareStatement(sql);
LOGGER.info(String.format("Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'",
config.get("username").asText(), sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());

if (queryResponse.size() < 1) {
throw new RuntimeException("No cdc-enabled tables found. Please check the documentation on how to enable CDC on MS SQL Server.");
}
});

// check sql server agent is running
// todo: ensure this works for Azure managed SQL (since it uses different sql server agent)
checkOperations.add(database -> {
try {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "SELECT status_desc FROM sys.dm_server_services WHERE [servicename] LIKE 'SQL Server Agent%'";
PreparedStatement ps = connection.prepareStatement(sql);
LOGGER.info(String.format("Checking that the SQL Server Agent is running using the query: '%s'", sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());

if (!(queryResponse.get(0).get("status_desc").toString().contains("Running"))) {
throw new RuntimeException(String.format(
"The SQL Server Agent is not running. Current state: '%s'. Please check the documentation on ensuring SQL Server Agent is running.",
queryResponse.get(0).get("status_desc").toString()));
}
} catch (Exception e) {
if (e.getCause() != null && e.getCause().getClass().equals(com.microsoft.sqlserver.jdbc.SQLServerException.class)) {
LOGGER.warn(String.format("Skipping check for whether the SQL Server Agent is running, SQLServerException thrown: '%s'",
e.getMessage()));
} else {
throw e;
}
}
});

// check that snapshot isolation is allowed
checkOperations.add(database -> {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?";
PreparedStatement ps = connection.prepareStatement(sql);
ps.setString(1, config.get("database").asText());
LOGGER.info(String.format("Checking that snapshot isolation is enabled on database '%s' using the query: '%s'",
config.get("database").asText(), sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());

if (queryResponse.size() < 1) {
throw new RuntimeException(String.format(
"Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
config.get("database").asText()));
}
protected void assertCdcEnabledInDb(JsonNode config, JdbcDatabase database) throws SQLException {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?";
PreparedStatement ps = connection.prepareStatement(sql);
ps.setString(1, config.get("database").asText());
LOGGER.info(String.format("Checking that cdc is enabled on database '%s' using the query: '%s'",
config.get("database").asText(), sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());
if (queryResponse.size() < 1) {
throw new RuntimeException(String.format(
"Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
config.get("database").asText()));
}
if (!(queryResponse.get(0).get("is_cdc_enabled").asBoolean())) {
throw new RuntimeException(String.format(
"Detected that CDC is not enabled for database '%s'. Please check the documentation on how to enable CDC on MS SQL Server.",
config.get("database").asText()));
}
}

if (queryResponse.get(0).get("snapshot_isolation_state").asInt() != 1) {
throw new RuntimeException(String.format(
"Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. "
+ "Please check the documentation on how to enable snapshot isolation on MS SQL Server.",
config.get("database").asText()));
}
});
protected void assertCdcSchemaQueryable(JsonNode config, JdbcDatabase database) throws SQLException {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables";
PreparedStatement ps = connection.prepareStatement(sql);
LOGGER.info(String.format("Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'",
config.get("username").asText(), sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());
// Ensure at least one available CDC table
if (queryResponse.size() < 1) {
throw new RuntimeException("No cdc-enabled tables found. Please check the documentation on how to enable CDC on MS SQL Server.");
}
}

// todo: ensure this works for Azure managed SQL (since it uses different sql server agent)
protected void assertSqlServerAgentRunning(JdbcDatabase database) throws SQLException {
try {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "SELECT status_desc FROM sys.dm_server_services WHERE [servicename] LIKE 'SQL Server Agent%'";
PreparedStatement ps = connection.prepareStatement(sql);
LOGGER.info(String.format("Checking that the SQL Server Agent is running using the query: '%s'", sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());
if (!(queryResponse.get(0).get("status_desc").toString().contains("Running"))) {
throw new RuntimeException(String.format(
"The SQL Server Agent is not running. Current state: '%s'. Please check the documentation on ensuring SQL Server Agent is running.",
queryResponse.get(0).get("status_desc").toString()));
}
} catch (Exception e) {
if (e.getCause() != null && e.getCause().getClass().equals(com.microsoft.sqlserver.jdbc.SQLServerException.class)) {
LOGGER.warn(String.format("Skipping check for whether the SQL Server Agent is running, SQLServerException thrown: '%s'",
e.getMessage()));
} else {
throw e;
}
}
}

return checkOperations;
protected void assertSnapshotIsolationAllowed(JsonNode config, JdbcDatabase database) throws SQLException {
List<JsonNode> queryResponse = database.query(connection -> {
final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?";
PreparedStatement ps = connection.prepareStatement(sql);
ps.setString(1, config.get("database").asText());
LOGGER.info(String.format("Checking that snapshot isolation is enabled on database '%s' using the query: '%s'",
config.get("database").asText(), sql));
return ps;
}, JdbcUtils::rowToJson).collect(toList());
if (queryResponse.size() < 1) {
throw new RuntimeException(String.format(
"Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
config.get("database").asText()));
}
if (queryResponse.get(0).get("snapshot_isolation_state").asInt() != 1) {
throw new RuntimeException(String.format(
"Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. "
+ "Please check the documentation on how to enable snapshot isolation on MS SQL Server.",
config.get("database").asText()));
}
}

@Override
Expand All @@ -243,7 +234,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(JdbcD
JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
LOGGER.info("using CDC: {}", true);
AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, MssqlCdcTargetPosition.getTargetPostion(database),
AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig,
MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get("database").asText()),
MssqlCdcProperties.getDebeziumProperties(), catalog, true);
return handler.getIncrementalIterators(new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new MssqlCdcStateHandler(stateManager), new MssqlCdcConnectorMetadataInjector(), emittedAt);
Expand Down
Loading

0 comments on commit 2f87d62

Please sign in to comment.