From b230543ba24ec4e83905e0fbce9b8e13882c919e Mon Sep 17 00:00:00 2001 From: mkhokh-33 Date: Thu, 10 Feb 2022 16:41:52 +0200 Subject: [PATCH] MySQL CDC sync fails because starting binlog position not found in DB #6425 (#9514) * Check binlog position on mysql server before run sync job, add error description into log * fix MySqlStrictEncryptSourceAcceptanceTest * fix formatting * fix review comments * added java docs and fixed few minor comments * fix formatting * update versions * update source_specs.yaml --- .../435bb9a5-7887-4809-aa58-28c27df0d7ad.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../source/SourceAcceptanceTest.java | 2 +- .../source-mysql-strict-encrypt/Dockerfile | 2 +- .../connectors/source-mysql/Dockerfile | 2 +- .../source/mysql/MySqlSource.java | 32 ++--- .../mysql/helpers/CdcConfigurationHelper.java | 120 ++++++++++++++++++ .../mysql/CdcMySqlSourceAcceptanceTest.java | 43 ++++++- docs/integrations/sources/mysql.md | 1 + 10 files changed, 174 insertions(+), 34 deletions(-) create mode 100644 airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index b0aa147e3715..b56e2aa2b9d3 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.5.1", + "dockerImageTag": "0.5.2", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 6ee5feda60cf..b7774b0ce986 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -474,7 +474,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.5.1 + dockerImageTag: 0.5.2 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ed4efe78cbfb..9fb1316fe936 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -4963,7 +4963,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.5.1" +- dockerImage: "airbyte/source-mysql:0.5.2" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java index 6d858261fc46..f1569cfea32f 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java @@ -286,7 +286,7 @@ protected static List filterRecords(final Collection> getCheckOperations(final JsonNode config) throws Exception { final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); if (isCdc(config)) { - checkOperations.addAll(List.of(getCheckOperation("log_bin", "ON"), - getCheckOperation("binlog_format", "ROW"), - getCheckOperation("binlog_row_image", "FULL"))); + checkOperations.addAll(CdcConfigurationHelper.getCheckOperations()); } return checkOperations; } @@ -180,8 +181,10 @@ public List> getIncrementalIterators(final new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(), catalog, true); - return handler.getIncrementalIterators(new MySqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()), - new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt); + Optional cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); + MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); + cdcState.ifPresent(cdc -> checkBinlog(cdc.getState(), database)); + return handler.getIncrementalIterators(fetcher, new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt); } else { LOGGER.info("using CDC: {}", false); return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager, @@ -210,23 +213,4 @@ public enum ReplicationMethod { CDC } - private CheckedConsumer getCheckOperation(String name, String value) { - return database -> { - final List result = database.resultSetQuery(connection -> { - final String sql = String.format("show variables where Variable_name = '%s'", name); - - return connection.createStatement().executeQuery(sql); - }, resultSet -> resultSet.getString("Value")).collect(toList()); - - if (result.size() != 1) { - throw new RuntimeException(String.format("Could not query the variable %s", name)); - } - - final String resultValue = result.get(0); - if (!resultValue.equalsIgnoreCase(value)) { - throw new RuntimeException(String.format("The variable %s should be set to %s, but it is : %s", name, value, resultValue)); - } - }; - } - } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java new file mode 100644 index 000000000000..e20c2fc5f790 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.mysql.helpers; + +import static java.util.stream.Collectors.toList; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for MySqlSource used to check cdc configuration in case of: + *

+ * 1. adding new source and checking operations #getCheckOperations method. + *

+ *

+ * 2. checking whether binlog required from saved cdc offset is available on mysql server + * #checkBinlog method + *

+ */ +public class CdcConfigurationHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigurationHelper.class); + private static final String CDC_OFFSET = "mysql_cdc_offset"; + private static final String LOG_BIN = "log_bin"; + private static final String BINLOG_FORMAT = "binlog_format"; + private static final String BINLOG_ROW_IMAGE = "binlog_row_image"; + + /** + * Method will check whether required binlog is available on mysql server + * + * @param offset - saved cdc offset with required binlog file + * @param database - database + */ + public static void checkBinlog(JsonNode offset, JdbcDatabase database) { + Optional binlogOptional = getBinlog(offset); + binlogOptional.ifPresent(binlog -> { + if (isBinlogAvailable(binlog, database)) { + LOGGER.info(""" + Binlog %s is available""".formatted(binlog)); + } else { + String error = + """ + Binlog %s is not available. This is a critical error, it means that requested binlog is not present on mysql server. To fix data synchronization you need to reset your data. Please check binlog retention policy configurations.""" + .formatted(binlog); + LOGGER.error(error); + throw new RuntimeException(""" + Binlog %s is not available.""".formatted(binlog)); + } + }); + } + + /** + * Method will get required configurations for cdc sync + * + * @return list of List> + */ + public static List> getCheckOperations() { + return List.of(getCheckOperation(LOG_BIN, "ON"), + getCheckOperation(BINLOG_FORMAT, "ROW"), + getCheckOperation(BINLOG_ROW_IMAGE, "FULL")); + + } + + private static boolean isBinlogAvailable(String binlog, JdbcDatabase database) { + try { + List binlogs = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"), + resultSet -> resultSet.getString("Log_name")).collect(Collectors.toList()); + + return !binlog.isEmpty() && binlogs.stream().anyMatch(e -> e.equals(binlog)); + } catch (SQLException e) { + LOGGER.error("Can not get binlog list. Error: ", e); + throw new RuntimeException(e); + } + } + + private static Optional getBinlog(JsonNode offset) { + JsonNode node = offset.get(CDC_OFFSET); + Iterator> fields = node.fields(); + while (fields.hasNext()) { + Map.Entry jsonField = fields.next(); + return Optional.ofNullable(Jsons.deserialize(jsonField.getValue().asText()).path("file").asText()); + } + return Optional.empty(); + } + + private static CheckedConsumer getCheckOperation(String name, String value) { + return database -> { + final List result = database.resultSetQuery(connection -> { + final String sql = """ + show variables where Variable_name = '%s'""".formatted(name); + + return connection.createStatement().executeQuery(sql); + }, resultSet -> resultSet.getString("Value")).collect(toList()); + + if (result.size() != 1) { + throw new RuntimeException(""" + Could not query the variable %s""".formatted(name)); + } + + final String resultValue = result.get(0); + if (!resultValue.equalsIgnoreCase(value)) { + throw new RuntimeException(""" + The variable %s should be set to %s, but it is : %s""".formatted(name, value, resultValue)); + } + }; + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java index 3bd6b2436746..5daf918707af 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java @@ -4,6 +4,10 @@ package io.airbyte.integrations.source.mysql; +import static io.airbyte.protocol.models.SyncMode.INCREMENTAL; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -14,6 +18,9 @@ import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -23,7 +30,9 @@ import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; import java.util.List; +import java.util.stream.Collectors; import org.jooq.SQLDialect; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.MySQLContainer; public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest { @@ -52,7 +61,7 @@ protected JsonNode getConfig() { protected ConfiguredAirbyteCatalog getConfiguredCatalog() { return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.INCREMENTAL) + .withSyncMode(INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( String.format("%s", STREAM_NAME), @@ -62,9 +71,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withSourceDefinedCursor(true) .withSourceDefinedPrimaryKey(List.of(List.of("id"))) .withSupportedSyncModes( - Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), + Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))), new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.INCREMENTAL) + .withSyncMode(INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( String.format("%s", STREAM_NAME2), @@ -74,7 +83,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() { .withSourceDefinedCursor(true) .withSourceDefinedPrimaryKey(List.of(List.of("id"))) .withSupportedSyncModes( - Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); + Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))))); } @Override @@ -143,4 +152,30 @@ protected void tearDown(final TestDestinationEnv testEnv) { container.close(); } + @Test + public void testIncrementalSyncFailedIfBinlogIsDeleted() throws Exception { + final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog()); + // only sync incremental streams + configuredCatalog.setStreams( + configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList())); + + final List airbyteMessages = runRead(configuredCatalog, getState()); + final List recordMessages = filterRecords(airbyteMessages); + final List stateMessages = airbyteMessages + .stream() + .filter(m -> m.getType() == AirbyteMessage.Type.STATE) + .map(AirbyteMessage::getState) + .collect(Collectors.toList()); + assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records"); + assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages"); + + // when we run incremental sync again there should be no new records. Run a sync with the latest + // state message and assert no records were emitted. + final JsonNode latestState = stateMessages.get(stateMessages.size() - 1).getData(); + // RESET MASTER removes all binary log files that are listed in the index file, + // leaving only a single, empty binary log file with a numeric suffix of .000001 + executeQuery("RESET MASTER;"); + assertThrows(Exception.class, () -> filterRecords(runRead(configuredCatalog, latestState))); + } + } diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 6c819d593ff3..55a4acb32df4 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -182,6 +182,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | |:--------| :--- | :--- | :--- | +| 0.5.2 | 2021-12-14 | [6425](https://github.com/airbytehq/airbyte/issues/6425) | MySQL CDC sync fails because starting binlog position not found in DB | | 0.5.1 | 2021-12-13 | [8582](https://github.com/airbytehq/airbyte/pull/8582) | Update connector fields title/description | | 0.5.0 | 2021-12-11 | [7970](https://github.com/airbytehq/airbyte/pull/7970) | Support all MySQL types | | 0.4.13 | 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication |