Skip to content

Commit

Permalink
MySQL CDC sync fails because starting binlog position not found in DB #…
Browse files Browse the repository at this point in the history
…6425 (#9514)

* Check binlog position on mysql server before run sync job, add error description into log

* fix MySqlStrictEncryptSourceAcceptanceTest

* fix formatting

* fix review comments

* added java docs and fixed few minor comments

* fix formatting

* update versions

* update source_specs.yaml
  • Loading branch information
mkhokh-33 authored Feb 10, 2022
1 parent e3e05d7 commit b230543
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.5.1",
"dockerImageTag": "0.5.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
"icon": "mysql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.5.1
dockerImageTag: 0.5.2
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4963,7 +4963,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.5.1"
- dockerImage: "airbyte/source-mysql:0.5.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ protected static List<AirbyteRecordMessage> filterRecords(final Collection<Airby
.collect(Collectors.toList());
}

private ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
protected ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
final ConfiguredAirbyteCatalog clone = Jsons.clone(catalog);
for (final ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getSyncMode() == INCREMENTAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.5.1
LABEL io.airbyte.version=0.5.2
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper.checkBinlog;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -21,8 +22,10 @@
import io.airbyte.integrations.base.ssh.SshWrappedSource;
import io.airbyte.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
Expand Down Expand Up @@ -97,9 +100,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(super.getCheckOperations(config));
if (isCdc(config)) {
checkOperations.addAll(List.of(getCheckOperation("log_bin", "ON"),
getCheckOperation("binlog_format", "ROW"),
getCheckOperation("binlog_row_image", "FULL")));
checkOperations.addAll(CdcConfigurationHelper.getCheckOperations());
}
return checkOperations;
}
Expand Down Expand Up @@ -180,8 +181,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(),
catalog, true);

return handler.getIncrementalIterators(new MySqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
Optional<CdcState> cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState());
MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null));
cdcState.ifPresent(cdc -> checkBinlog(cdc.getState(), database));
return handler.getIncrementalIterators(fetcher, new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,
Expand Down Expand Up @@ -210,23 +213,4 @@ public enum ReplicationMethod {
CDC
}

private CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(String name, String value) {
return database -> {
final List<String> result = database.resultSetQuery(connection -> {
final String sql = String.format("show variables where Variable_name = '%s'", name);

return connection.createStatement().executeQuery(sql);
}, resultSet -> resultSet.getString("Value")).collect(toList());

if (result.size() != 1) {
throw new RuntimeException(String.format("Could not query the variable %s", name));
}

final String resultValue = result.get(0);
if (!resultValue.equalsIgnoreCase(value)) {
throw new RuntimeException(String.format("The variable %s should be set to %s, but it is : %s", name, value, resultValue));
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql.helpers;

import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Helper class for MySqlSource used to check cdc configuration in case of:
* <p>
* 1. adding new source and checking operations #getCheckOperations method.
* </p>
* <p>
* 2. checking whether binlog required from saved cdc offset is available on mysql server
* #checkBinlog method
* </p>
*/
public class CdcConfigurationHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigurationHelper.class);
private static final String CDC_OFFSET = "mysql_cdc_offset";
private static final String LOG_BIN = "log_bin";
private static final String BINLOG_FORMAT = "binlog_format";
private static final String BINLOG_ROW_IMAGE = "binlog_row_image";

/**
* Method will check whether required binlog is available on mysql server
*
* @param offset - saved cdc offset with required binlog file
* @param database - database
*/
public static void checkBinlog(JsonNode offset, JdbcDatabase database) {
Optional<String> binlogOptional = getBinlog(offset);
binlogOptional.ifPresent(binlog -> {
if (isBinlogAvailable(binlog, database)) {
LOGGER.info("""
Binlog %s is available""".formatted(binlog));
} else {
String error =
"""
Binlog %s is not available. This is a critical error, it means that requested binlog is not present on mysql server. To fix data synchronization you need to reset your data. Please check binlog retention policy configurations."""
.formatted(binlog);
LOGGER.error(error);
throw new RuntimeException("""
Binlog %s is not available.""".formatted(binlog));
}
});
}

/**
* Method will get required configurations for cdc sync
*
* @return list of List<CheckedConsumer<JdbcDatabase, Exception>>
*/
public static List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations() {
return List.of(getCheckOperation(LOG_BIN, "ON"),
getCheckOperation(BINLOG_FORMAT, "ROW"),
getCheckOperation(BINLOG_ROW_IMAGE, "FULL"));

}

private static boolean isBinlogAvailable(String binlog, JdbcDatabase database) {
try {
List<String> binlogs = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"),
resultSet -> resultSet.getString("Log_name")).collect(Collectors.toList());

return !binlog.isEmpty() && binlogs.stream().anyMatch(e -> e.equals(binlog));
} catch (SQLException e) {
LOGGER.error("Can not get binlog list. Error: ", e);
throw new RuntimeException(e);
}
}

private static Optional<String> getBinlog(JsonNode offset) {
JsonNode node = offset.get(CDC_OFFSET);
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> jsonField = fields.next();
return Optional.ofNullable(Jsons.deserialize(jsonField.getValue().asText()).path("file").asText());
}
return Optional.empty();
}

private static CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(String name, String value) {
return database -> {
final List<String> result = database.resultSetQuery(connection -> {
final String sql = """
show variables where Variable_name = '%s'""".formatted(name);

return connection.createStatement().executeQuery(sql);
}, resultSet -> resultSet.getString("Value")).collect(toList());

if (result.size() != 1) {
throw new RuntimeException("""
Could not query the variable %s""".formatted(name));
}

final String resultValue = result.get(0);
if (!resultValue.equalsIgnoreCase(value)) {
throw new RuntimeException("""
The variable %s should be set to %s, but it is : %s""".formatted(name, value, resultValue));
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +18,9 @@
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -23,7 +30,9 @@
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
Expand Down Expand Up @@ -52,7 +61,7 @@ protected JsonNode getConfig() {
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME),
Expand All @@ -62,9 +71,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME2),
Expand All @@ -74,7 +83,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
}

@Override
Expand Down Expand Up @@ -143,4 +152,30 @@ protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}

@Test
public void testIncrementalSyncFailedIfBinlogIsDeleted() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog());
// only sync incremental streams
configuredCatalog.setStreams(
configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList()));

final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalog, getState());
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
final List<AirbyteStateMessage> stateMessages = airbyteMessages
.stream()
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
.map(AirbyteMessage::getState)
.collect(Collectors.toList());
assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records");
assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages");

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
final JsonNode latestState = stateMessages.get(stateMessages.size() - 1).getData();
// RESET MASTER removes all binary log files that are listed in the index file,
// leaving only a single, empty binary log file with a numeric suffix of .000001
executeQuery("RESET MASTER;");
assertThrows(Exception.class, () -> filterRecords(runRead(configuredCatalog, latestState)));
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.5.2 | 2021-12-14 | [6425](https://github.com/airbytehq/airbyte/issues/6425) | MySQL CDC sync fails because starting binlog position not found in DB |
| 0.5.1 | 2021-12-13 | [8582](https://github.com/airbytehq/airbyte/pull/8582) | Update connector fields title/description |
| 0.5.0 | 2021-12-11 | [7970](https://github.com/airbytehq/airbyte/pull/7970) | Support all MySQL types |
| 0.4.13 | 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication |
Expand Down

0 comments on commit b230543

Please sign in to comment.