Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL CDC sync fails because starting binlog position not found in DB #6425 #9514

Merged
merged 12 commits into from
Feb 10, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,14 @@ public void testEntrypointEnvVar() throws Exception {
checkEntrypointEnvVariable();
}

private List<AirbyteRecordMessage> filterRecords(final Collection<AirbyteMessage> messages) {
protected List<AirbyteRecordMessage> filterRecords(final Collection<AirbyteMessage> messages) {
return messages.stream()
.filter(m -> m.getType() == Type.RECORD)
.map(AirbyteMessage::getRecord)
.collect(Collectors.toList());
}

private ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
protected ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
final ConfiguredAirbyteCatalog clone = Jsons.clone(catalog);
for (final ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getSyncMode() == INCREMENTAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
"additionalProperties": false,
"properties": {
"host": {
"description": "Hostname of the database.",
"description": "The host name of the database.",
"title": "Host",
"type": "string",
"order": 0
},
"port": {
"description": "Port of the database.",
"description": "The port to connect to.",
"title": "Port",
"type": "integer",
"minimum": 0,
"maximum": 65536,
Expand All @@ -22,30 +24,34 @@
"order": 1
},
"database": {
"description": "Name of the database.",
"description": "The database name.",
"title": "Database",
"type": "string",
"order": 2
},
"username": {
"description": "Username to use to access the database.",
"description": "The username which is used to access the database.",
"title": "Username",
"type": "string",
"order": 3
},
"password": {
"description": "Password associated with the username.",
"description": "The password associated with the username.",
"title": "Password",
"type": "string",
"airbyte_secret": true,
"order": 4
},
"jdbc_url_params": {
"description": "Additional properties to pass to the jdbc url string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)",
"description": "Additional properties to pass to the jdbc url string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 5
},
"replication_method": {
"type": "string",
"title": "Replication Method",
"description": "Replication method to use for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"description": "Replication method which is used for data extraction from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.",
"order": 7,
"default": "STANDARD",
"enum": ["STANDARD", "CDC"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.StateManager;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,6 +52,7 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
public static final String MYSQL_DB_HISTORY = "mysql_db_history";
public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
public static final String CDC_OFFSET = "mysql_cdc_offset";
public static final List<String> SSL_PARAMETERS = List.of(
"useSSL=true",
"requireSSL=true",
Expand Down Expand Up @@ -180,8 +185,12 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(),
catalog, true);

return handler.getIncrementalIterators(new MySqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
CdcState cdcState = stateManager.getCdcStateManager().getCdcState();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could stateManager.getCdcStateManager() return null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, from what I can see in the code:
public StateManager(final DbState serialized, final ConfiguredAirbyteCatalog catalog) {
this.cdcStateManager = new CdcStateManager(serialized.getCdcState());
this.isCdc = serialized.getCdc();
if (serialized.getCdc() == null) {
this.isCdc = false;
}

MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState);
if (cdcState != null) {
Copy link
Contributor

@alexandertsukanov alexandertsukanov Jan 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use Optional type here?

checkBinlog(cdcState.getState(), database);
}
return handler.getIncrementalIterators(fetcher, new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
} else {
LOGGER.info("using CDC: {}", false);
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,
Expand Down Expand Up @@ -229,4 +238,42 @@ private CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(String name,
};
}

private void checkBinlog(JsonNode offset, JdbcDatabase database) {
String binlog = getBinlog(offset);
if (binlog != null && !binlog.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use Optional type here?

if (isBinlogAvailable(binlog, database)) {
LOGGER.info(String.format("Binlog %s is available", binlog));
} else {
String error =
String.format("Binlog %s is not available. This is a critical error, it means that requested binlog is not present on mysql server. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we can use a text block here instead of string concatenation (https://openjdk.java.net/jeps/378), as we are migrating to Java 17.

"To fix data synchronization you need to reset your data. " +
"Please check binlog retention policy configurations.", binlog);
LOGGER.error(error);
throw new RuntimeException(String.format("Binlog %s is not available.", binlog));
}
}
}

/**
 * Checks whether the given binlog file is listed by the MySQL server.
 *
 * <p>
 * Runs {@code SHOW BINARY LOGS} and compares every returned {@code Log_name} with {@code binlog}.
 *
 * @param binlog name of the binlog file to look for
 * @param database database used to run the query
 * @return {@code true} if one of the listed log names equals {@code binlog}; {@code false}
 *         otherwise, including when {@code binlog} is empty
 * @throws RuntimeException wrapping the {@link SQLException} if the binlog list cannot be fetched
 */
private boolean isBinlogAvailable(String binlog, JdbcDatabase database) {
  // An empty name can never match a listed log, so skip the round trip to the server.
  if (binlog.isEmpty()) {
    return false;
  }
  // Close the stream once the lookup is done so that whatever JDBC resources back it are
  // released. NOTE(review): assumes resultSetQuery returns a java.util.stream.Stream whose
  // close handler frees the connection - confirm against JdbcDatabase.
  try (java.util.stream.Stream<String> binlogNames = database.resultSetQuery(
      connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"),
      resultSet -> resultSet.getString("Log_name"))) {
    return binlogNames.anyMatch(binlog::equals);
  } catch (SQLException e) {
    LOGGER.error("Can not get binlog list. Error: ", e);
    throw new RuntimeException(e);
  }
}

private String getBinlog(JsonNode offset) {
JsonNode node = offset.get(CDC_OFFSET);
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> jsonField = fields.next();
return Jsons.deserialize(jsonField.getValue().asText()).path("file").asText();
}
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we return null here?

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand All @@ -14,6 +18,9 @@
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -24,7 +31,9 @@
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
Expand Down Expand Up @@ -53,7 +62,7 @@ protected JsonNode getConfig() {
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME),
Expand All @@ -63,9 +72,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME2),
Expand All @@ -75,7 +84,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
}

@Override
Expand Down Expand Up @@ -149,4 +158,30 @@ protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}

/**
 * Runs one incremental sync, wipes the server's binary logs with {@code RESET MASTER}, and then
 * expects a second sync started from the last emitted state to throw.
 */
@Test
public void testIncrementalSyncFailedIfBinlogIsDeleted() throws Exception {
  final ConfiguredAirbyteCatalog catalog = withSourceDefinedCursors(getConfiguredCatalog());
  // Keep only the streams configured for incremental sync.
  final List<ConfiguredAirbyteStream> incrementalStreams = catalog.getStreams()
      .stream()
      .filter(stream -> stream.getSyncMode() == INCREMENTAL)
      .collect(Collectors.toList());
  catalog.setStreams(incrementalStreams);

  // First sync: it has to emit records as well as at least one STATE message.
  final List<AirbyteMessage> firstSyncMessages = runRead(catalog, getState());
  final List<AirbyteRecordMessage> records = filterRecords(firstSyncMessages);
  final List<AirbyteStateMessage> states = firstSyncMessages
      .stream()
      .filter(message -> message.getType() == AirbyteMessage.Type.STATE)
      .map(AirbyteMessage::getState)
      .collect(Collectors.toList());
  assertFalse(records.isEmpty(), "Expected the first incremental sync to produce records");
  assertFalse(states.isEmpty(), "Expected incremental sync to produce STATE messages");

  // The state handed to the second sync is the data of the last STATE message emitted above.
  final AirbyteStateMessage lastStateMessage = states.get(states.size() - 1);
  final JsonNode latestState = lastStateMessage.getData();

  // RESET MASTER removes all binary log files that are listed in the index file,
  // leaving only a single, empty binary log file with a numeric suffix of .000001
  executeQuery("RESET MASTER;");

  // Reading from the saved state after the reset is expected to fail rather than return records.
  assertThrows(Exception.class, () -> filterRecords(runRead(catalog, latestState)));
}

}
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.5.2 | 2021-12-14 | [9514](https://github.com/airbytehq/airbyte/pull/9514) | MySQL CDC sync fails because starting binlog position not found in DB |
| 0.5.1 | 2021-12-13 | [8582](https://github.com/airbytehq/airbyte/pull/8582) | Update connector fields title/description |
| 0.5.0 | 2021-12-11 | [7970](https://github.com/airbytehq/airbyte/pull/7970) | Support all MySQL types |
| 0.4.13 | 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication |
Expand Down