Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL CDC sync fails because starting binlog position not found in DB #6425 #9514

Merged
merged 12 commits into from
Feb 10, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ protected static List<AirbyteRecordMessage> filterRecords(final Collection<Airby
.collect(Collectors.toList());
}

private ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
protected ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
final ConfiguredAirbyteCatalog clone = Jsons.clone(catalog);
for (final ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getSyncMode() == INCREMENTAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper.checkBinlog;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -21,8 +22,10 @@
import io.airbyte.integrations.base.ssh.SshWrappedSource;
import io.airbyte.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
Expand Down Expand Up @@ -97,9 +100,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
// Builds the list of connection checks for this source: the checks supplied by the parent class,
// plus the binlog-related server variable checks when the config selects CDC replication.
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
// Copy the parent's checks into a fresh ArrayList so more checks can be appended below.
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(super.getCheckOperations(config));
if (isCdc(config)) {
// NOTE(review): this diff view appears to show both sides of the change. The inline
// getCheckOperation(...) list looks like the removed side and the CdcConfigurationHelper call
// like its replacement; both verify log_bin, binlog_format and binlog_row_image. Confirm only
// one of the two addAll calls remains in the merged file.
checkOperations.addAll(List.of(getCheckOperation("log_bin", "ON"),
getCheckOperation("binlog_format", "ROW"),
getCheckOperation("binlog_row_image", "FULL")));
checkOperations.addAll(CdcConfigurationHelper.getCheckOperations());
}
return checkOperations;
}
Expand Down Expand Up @@ -180,8 +181,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(),
catalog, true);

return handler.getIncrementalIterators(new MySqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
Optional<CdcState> cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState());
MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null));
cdcState.ifPresent(cdc -> checkBinlog(cdc.getState(), database));
return handler.getIncrementalIterators(fetcher, new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,
Expand Down Expand Up @@ -210,23 +213,4 @@ public enum ReplicationMethod {
CDC
}

/**
 * Creates a check that verifies a single MySQL server variable has the expected value.
 *
 * @param name name of the server variable to look up with {@code show variables}
 * @param value value the variable is expected to have (compared case-insensitively)
 * @return a check that throws a {@link RuntimeException} when the variable cannot be read as
 *         exactly one row or when its value differs from {@code value}
 */
private CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(String name, String value) {
  return database -> {
    final String query = String.format("show variables where Variable_name = '%s'", name);
    final List<String> values = database
        .resultSetQuery(
            connection -> connection.createStatement().executeQuery(query),
            row -> row.getString("Value"))
        .collect(toList());

    // Exactly one row is expected for a known variable name.
    if (values.size() != 1) {
      throw new RuntimeException(String.format("Could not query the variable %s", name));
    }

    final String actual = values.get(0);
    if (actual.equalsIgnoreCase(value)) {
      return;
    }
    throw new RuntimeException(String.format("The variable %s should be set to %s, but it is : %s", name, value, actual));
  };
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.helpers;

import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that validate the MySQL side of a CDC sync: the server variables required for
 * binlog-based replication and the availability of the binlog file referenced by a saved offset.
 */
public class CdcConfigurationHelper {

  private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigurationHelper.class);
  private static final String CDC_OFFSET = "mysql_cdc_offset";
  private static final String LOG_BIN = "log_bin";
  private static final String BINLOG_FORMAT = "binlog_format";
  private static final String BINLOG_ROW_IMAGE = "binlog_row_image";

  /**
   * Verifies that the binlog file referenced by the saved CDC offset still exists on the server.
   * Does nothing when the offset carries no {@code mysql_cdc_offset} entry.
   *
   * @param offset saved CDC state; may be {@code null} or lack the {@code mysql_cdc_offset} field
   * @param database database used to list the server's binary logs
   * @throws RuntimeException if the referenced binlog file is not listed by the server, or if the
   *         list of binary logs cannot be queried
   */
  public static void checkBinlog(JsonNode offset, JdbcDatabase database) {
    final Optional<String> binlogOptional = getBinlog(offset);
    if (binlogOptional.isEmpty()) {
      return;
    }

    final String binlog = binlogOptional.get();
    if (isBinlogAvailable(binlog, database)) {
      LOGGER.info("Binlog {} is available", binlog);
      return;
    }

    LOGGER.error("Binlog {} is not available. This is a critical error, it means that requested binlog is not present on mysql server. "
        + "To fix data synchronization you need to reset your data. Please check binlog retention policy configurations.", binlog);
    throw new RuntimeException("Binlog %s is not available.".formatted(binlog));
  }

  /**
   * Returns the checks that verify the server variables required for CDC: {@code log_bin} is
   * {@code ON}, {@code binlog_format} is {@code ROW} and {@code binlog_row_image} is {@code FULL}.
   *
   * @return an immutable list of checks, each throwing a {@link RuntimeException} on mismatch
   */
  public static List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations() {
    return List.of(getCheckOperation(LOG_BIN, "ON"),
        getCheckOperation(BINLOG_FORMAT, "ROW"),
        getCheckOperation(BINLOG_ROW_IMAGE, "FULL"));
  }

  /**
   * Tells whether {@code binlog} appears in the output of {@code SHOW BINARY LOGS}.
   *
   * @param binlog binlog file name to look for; an empty name is never available
   * @param database database used to run the query
   * @return {@code true} if a row's {@code Log_name} equals {@code binlog}
   * @throws RuntimeException wrapping the {@link SQLException} if the query fails
   */
  private static boolean isBinlogAvailable(String binlog, JdbcDatabase database) {
    if (binlog.isEmpty()) {
      // An empty name can never match a log file, so skip the round trip to the server.
      return false;
    }
    // The query result is a java.util.stream.Stream, which is AutoCloseable; close it when done.
    // NOTE(review): presumably the database wrapper releases its connection in the stream's close
    // handler - confirm against JdbcDatabase.
    try (var logNames = database.resultSetQuery(
        connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"),
        resultSet -> resultSet.getString("Log_name"))) {
      return logNames.collect(Collectors.toList()).contains(binlog);
    } catch (SQLException e) {
      LOGGER.error("Can not get binlog list. Error: ", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Extracts the binlog file name from the saved offset. Only the first entry of the
   * {@code mysql_cdc_offset} object is inspected: its value is parsed as a JSON string and the
   * text of its {@code file} field is returned (an empty string when that field is missing).
   *
   * @param offset saved CDC state; may be {@code null}
   * @return the binlog file name, or empty when there is no {@code mysql_cdc_offset} entry
   */
  private static Optional<String> getBinlog(JsonNode offset) {
    final JsonNode node = offset == null ? null : offset.get(CDC_OFFSET);
    if (node == null) {
      // No saved offset: nothing to validate.
      return Optional.empty();
    }
    final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
    if (!fields.hasNext()) {
      return Optional.empty();
    }
    final Map.Entry<String, JsonNode> jsonField = fields.next();
    return Optional.of(Jsons.deserialize(jsonField.getValue().asText()).path("file").asText());
  }

  /**
   * Creates a check that verifies a single server variable has the expected value.
   *
   * @param name server variable name; interpolated into the SQL text, so it must be a trusted
   *        constant
   * @param value expected value, compared case-insensitively
   * @return a check that throws a {@link RuntimeException} when the variable does not come back
   *         as exactly one row or when its value differs from {@code value}
   */
  private static CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(String name, String value) {
    return database -> {
      final String sql = "show variables where Variable_name = '%s'".formatted(name);
      final List<String> result;
      // Close the result stream once it has been drained into a list.
      try (var values = database.resultSetQuery(
          connection -> connection.createStatement().executeQuery(sql),
          resultSet -> resultSet.getString("Value"))) {
        result = values.collect(toList());
      }

      if (result.size() != 1) {
        throw new RuntimeException("Could not query the variable %s".formatted(name));
      }

      final String resultValue = result.get(0);
      if (!resultValue.equalsIgnoreCase(value)) {
        throw new RuntimeException("The variable %s should be set to %s, but it is : %s".formatted(name, value, resultValue));
      }
    };
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +18,9 @@
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -23,7 +30,9 @@
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
Expand Down Expand Up @@ -52,7 +61,7 @@ protected JsonNode getConfig() {
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME),
Expand All @@ -62,9 +71,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME2),
Expand All @@ -74,7 +83,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
}

@Override
Expand Down Expand Up @@ -143,4 +152,30 @@ protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}

/**
 * Runs one incremental sync to obtain a saved state, wipes the server's binary logs, and then
 * expects a second sync started from that state to fail instead of completing.
 */
@Test
public void testIncrementalSyncFailedIfBinlogIsDeleted() throws Exception {
  final ConfiguredAirbyteCatalog catalog = withSourceDefinedCursors(getConfiguredCatalog());
  // Keep only the streams configured for incremental sync.
  final List<ConfiguredAirbyteStream> incrementalStreams = catalog.getStreams().stream()
      .filter(stream -> stream.getSyncMode() == INCREMENTAL)
      .collect(Collectors.toList());
  catalog.setStreams(incrementalStreams);

  // First sync: must emit both records and at least one STATE message.
  final List<AirbyteMessage> firstSyncMessages = runRead(catalog, getState());
  final List<AirbyteRecordMessage> records = filterRecords(firstSyncMessages);
  final List<AirbyteStateMessage> states = firstSyncMessages.stream()
      .filter(message -> message.getType() == AirbyteMessage.Type.STATE)
      .map(message -> message.getState())
      .collect(Collectors.toList());
  assertFalse(records.isEmpty(), "Expected the first incremental sync to produce records");
  assertFalse(states.isEmpty(), "Expected incremental sync to produce STATE messages");

  // The last emitted state is what the next sync would resume from.
  final AirbyteStateMessage lastStateMessage = states.get(states.size() - 1);
  final JsonNode stateAfterFirstSync = lastStateMessage.getData();

  // RESET MASTER removes all binary log files that are listed in the index file,
  // leaving only a single, empty binary log file with a numeric suffix of .000001
  executeQuery("RESET MASTER;");

  // Resuming from the saved state after the binlogs were removed must fail.
  assertThrows(Exception.class, () -> filterRecords(runRead(catalog, stateAfterFirstSync)));
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.5.2 | 2021-12-14 | [6425](https://github.com/airbytehq/airbyte/issues/6425) | MySQL CDC sync fails because starting binlog position not found in DB |
| 0.5.1 | 2021-12-13 | [8582](https://github.com/airbytehq/airbyte/pull/8582) | Update connector fields title/description |
| 0.5.0 | 2021-12-11 | [7970](https://github.com/airbytehq/airbyte/pull/7970) | Support all MySQL types |
| 0.4.13 | 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication |
Expand Down