diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 175eacde342a..8951b5276662 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.3.0", + "dockerImageTag": "0.3.1", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 6c7bf854524b..f749cc25d768 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -61,7 +61,7 @@ - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL dockerRepository: airbyte/source-mysql - dockerImageTag: 0.3.0 + dockerImageTag: 0.3.1 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg - sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77 diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java index 9b3aa7eaca6c..aa3598c86c4b 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java +++ 
b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java @@ -79,6 +79,8 @@ public abstract class SourceAcceptanceTest { public static final String CDC_LSN = "_ab_cdc_lsn"; public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at"; public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at"; + public static final String CDC_LOG_FILE = "_ab_cdc_log_file"; + public static final String CDC_LOG_POS = "_ab_cdc_log_pos"; private static final long JOB_ID = 0L; private static final int JOB_ATTEMPT = 0; @@ -472,6 +474,8 @@ private AirbyteRecordMessage pruneEmittedAt(AirbyteRecordMessage m) { private AirbyteRecordMessage pruneCdcMetadata(AirbyteRecordMessage m) { final AirbyteRecordMessage clone = Jsons.clone(m); ((ObjectNode) clone.getData()).remove(CDC_LSN); + ((ObjectNode) clone.getData()).remove(CDC_LOG_FILE); + ((ObjectNode) clone.getData()).remove(CDC_LOG_POS); ((ObjectNode) clone.getData()).remove(CDC_UPDATED_AT); ((ObjectNode) clone.getData()).remove(CDC_DELETED_AT); return clone; diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index e2cd9aeccadd..2913c6c8d386 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -87,6 +87,8 @@ public abstract class AbstractJdbcSource extends BaseConnector implements Source public static final String CDC_LSN = "_ab_cdc_lsn"; public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at"; public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at"; + public static final String CDC_LOG_FILE = "_ab_cdc_log_file"; + public static final String 
CDC_LOG_POS = "_ab_cdc_log_pos"; private static final String JDBC_COLUMN_DATABASE_NAME = "TABLE_CAT"; private static final String JDBC_COLUMN_SCHEMA_NAME = "TABLE_SCHEM"; diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index bb9a36c7dc3d..895fc0ad2130 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.0 +LABEL io.airbyte.version=0.3.1 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 6e2955a268e0..10c28b4d418c 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -11,18 +11,21 @@ application { dependencies { implementation project(':airbyte-db') implementation project(':airbyte-integrations:bases:base-java') - implementation project(':airbyte-protocol:models') implementation project(':airbyte-integrations:connectors:source-jdbc') + implementation project(':airbyte-protocol:models') + implementation 'io.debezium:debezium-api:1.4.2.Final' + implementation 'io.debezium:debezium-connector-mysql:1.4.2.Final' + implementation 'io.debezium:debezium-embedded:1.4.2.Final' implementation 'mysql:mysql-connector-java:8.0.22' implementation 'org.apache.commons:commons-lang3:3.11' testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) - testImplementation 'org.apache.commons:commons-lang3:3.11' testImplementation 'org.testcontainers:mysql:1.15.1' integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') + integrationTestJavaImplementation 
project(':airbyte-integrations:connectors:source-mysql') implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/AirbyteFileOffsetBackingStore.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/AirbyteFileOffsetBackingStore.java new file mode 100644 index 000000000000..33f490b7e32c --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/AirbyteFileOffsetBackingStore.java @@ -0,0 +1,178 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.source.mysql; + +import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.jdbc.JdbcStateManager; +import io.airbyte.integrations.source.jdbc.models.CdcState; +import java.io.EOFException; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.SafeObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class handles reading and writing a debezium offset file. In many cases it is duplicating + * logic in debezium because that logic is not exposed in the public API. We mostly treat the + * contents of this state file like a black box. We know it is a Map. We + * deserialize it to a Map so that the state file can be human readable. If we ever + * discover that any of the contents of these offset files is not string serializable we will likely + * have to drop the human readability support and just base64 encode it. 
+ */ +public class AirbyteFileOffsetBackingStore { + + private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class); + + private final Path offsetFilePath; + + public AirbyteFileOffsetBackingStore(final Path offsetFilePath) { + this.offsetFilePath = offsetFilePath; + } + + public Path getOffsetFilePath() { + return offsetFilePath; + } + + public CdcState read() { + final Map raw = load(); + + final Map mappedAsStrings = raw.entrySet().stream().collect(Collectors.toMap( + e -> byteBufferToString(e.getKey()), + e -> byteBufferToString(e.getValue()))); + final JsonNode asJson = Jsons.jsonNode(mappedAsStrings); + + LOGGER.info("debezium state: {}", asJson); + + return new CdcState().withState(asJson); + } + + public Map readMap() { + final Map raw = load(); + + return raw.entrySet().stream().collect(Collectors.toMap( + e -> byteBufferToString(e.getKey()), + e -> byteBufferToString(e.getValue()))); + } + + @SuppressWarnings("unchecked") + public void persist(CdcState cdcState) { + final Map mapAsString = + cdcState != null && cdcState.getState() != null ? Jsons.object(cdcState.getState().get(MYSQL_CDC_OFFSET), Map.class) : Collections.emptyMap(); + final Map mappedAsStrings = mapAsString.entrySet().stream().collect(Collectors.toMap( + e -> stringToByteBuffer(e.getKey()), + e -> stringToByteBuffer(e.getValue()))); + + FileUtils.deleteQuietly(offsetFilePath.toFile()); + save(mappedAsStrings); + } + + private static String byteBufferToString(ByteBuffer byteBuffer) { + Preconditions.checkNotNull(byteBuffer); + return new String(byteBuffer.array(), StandardCharsets.UTF_8); + } + + private static ByteBuffer stringToByteBuffer(String s) { + Preconditions.checkNotNull(s); + return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)); + } + + /** + * See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this + * method is not public. 
+ */ + @SuppressWarnings("unchecked") + private Map load() { + try (final SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(offsetFilePath))) { + final Object obj = is.readObject(); + if (!(obj instanceof HashMap)) + throw new ConnectException("Expected HashMap but found " + obj.getClass()); + final Map raw = (Map) obj; + final Map data = new HashMap<>(); + for (Map.Entry mapEntry : raw.entrySet()) { + final ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; + final ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null; + data.put(key, value); + } + + return data; + } catch (NoSuchFileException | EOFException e) { + // NoSuchFileException: Ignore, may be new. + // EOFException: Ignore, this means the file was missing or corrupt + return Collections.emptyMap(); + } catch (IOException | ClassNotFoundException e) { + throw new ConnectException(e); + } + } + + /** + * See FileOffsetBackingStore#save - logic is mostly borrowed from here. duplicated because this + * method is not public. + */ + private void save(Map data) { + try (ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(offsetFilePath))) { + Map raw = new HashMap<>(); + for (Map.Entry mapEntry : data.entrySet()) { + byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; + byte[] value = (mapEntry.getValue() != null) ? 
mapEntry.getValue().array() : null; + raw.put(key, value); + } + os.writeObject(raw); + } catch (IOException e) { + throw new ConnectException(e); + } + } + + static AirbyteFileOffsetBackingStore initializeState(JdbcStateManager stateManager) { + final Path cdcWorkingDir; + try { + cdcWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-state-offset"); + } catch (IOException e) { + throw new RuntimeException(e); + } + final Path cdcOffsetFilePath = cdcWorkingDir.resolve("offset.dat"); + + final AirbyteFileOffsetBackingStore offsetManager = new AirbyteFileOffsetBackingStore( + cdcOffsetFilePath); + offsetManager.persist(stateManager.getCdcStateManager().getCdcState()); + return offsetManager; + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/AirbyteSchemaHistoryStorage.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/AirbyteSchemaHistoryStorage.java new file mode 100644 index 000000000000..9a1aaf4f708e --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/AirbyteSchemaHistoryStorage.java @@ -0,0 +1,169 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mysql; + +import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.jdbc.JdbcStateManager; +import io.airbyte.integrations.source.jdbc.models.CdcState; +import io.debezium.document.Document; +import io.debezium.document.DocumentReader; +import io.debezium.document.DocumentWriter; +import io.debezium.relational.history.HistoryRecord; +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.function.Consumer; +import org.apache.commons.io.FileUtils; + +/** + * The purpose of this class is : to , 1. Read the contents of the file {@link #path} at the end of + * the sync so that it can be saved in state for future syncs. Check {@link #read()} 2. Write the + * saved content back to the file {@link #path} at the beginning of the sync so that debezium can + * function smoothly. Check {@link #persist(CdcState)}. 
To understand more about file, please refer + * {@link FilteredFileDatabaseHistory} + */ +public class AirbyteSchemaHistoryStorage { + + private final Path path; + private static final Charset UTF8 = StandardCharsets.UTF_8; + private final DocumentReader reader = DocumentReader.defaultReader(); + private final DocumentWriter writer = DocumentWriter.defaultWriter(); + + public AirbyteSchemaHistoryStorage(final Path path) { + this.path = path; + } + + public Path getPath() { + return path; + } + + /** + * This implementation is is kind of similar to + * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} + */ + public String read() { + StringBuilder fileAsString = new StringBuilder(); + try { + for (String line : Files.readAllLines(path, UTF8)) { + if (line != null && !line.isEmpty()) { + Document record = reader.read(line); + String recordAsString = writer.write(record); + fileAsString.append(recordAsString); + fileAsString.append(System.lineSeparator()); + } + } + return fileAsString.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * This implementation is is kind of similar to + * {@link io.debezium.relational.history.FileDatabaseHistory#start()} + */ + private void makeSureFileExists() { + try { + // Make sure the file exists ... + if (!Files.exists(path)) { + // Create parent directories if we have them ... + if (path.getParent() != null) { + Files.createDirectories(path.getParent()); + } + try { + Files.createFile(path); + } catch (FileAlreadyExistsException e) { + // do nothing + } + } + } catch (IOException e) { + throw new IllegalStateException( + "Unable to create history file at " + path + ": " + e.getMessage(), e); + } + } + + public void persist(CdcState cdcState) { + String fileAsString = cdcState != null && cdcState.getState() != null ? 
Jsons + .object(cdcState.getState().get(MYSQL_DB_HISTORY), String.class) : null; + + if (fileAsString == null || fileAsString.isEmpty()) { + return; + } + + FileUtils.deleteQuietly(path.toFile()); + makeSureFileExists(); + writeToFile(fileAsString); + } + + /** + * This implementation is kind of similar to + * {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)} + * + * @param fileAsString Represents the contents of the file saved in state from previous syncs + */ + private void writeToFile(String fileAsString) { + try { + String[] split = fileAsString.split(System.lineSeparator()); + for (String element : split) { + Document read = reader.read(element); + String line = writer.write(read); + + try (BufferedWriter historyWriter = Files + .newBufferedWriter(path, StandardOpenOption.APPEND)) { + try { + historyWriter.append(line); + historyWriter.newLine(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static AirbyteSchemaHistoryStorage initializeDBHistory(JdbcStateManager stateManager) { + final Path dbHistoryWorkingDir; + try { + dbHistoryWorkingDir = Files.createTempDirectory(Path.of("/tmp"), "cdc-db-history"); + } catch (IOException e) { + throw new RuntimeException(e); + } + final Path dbHistoryFilePath = dbHistoryWorkingDir.resolve("dbhistory.dat"); + + final AirbyteSchemaHistoryStorage schemaHistoryManager = new AirbyteSchemaHistoryStorage(dbHistoryFilePath); + schemaHistoryManager.persist(stateManager.getCdcStateManager().getCdcState()); + return schemaHistoryManager; + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumEventUtils.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumEventUtils.java new file mode 100644 index 000000000000..02db98401481 --- /dev/null +++ 
b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumEventUtils.java @@ -0,0 +1,82 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package io.airbyte.integrations.source.mysql; + +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_DELETED_AT; +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_FILE; +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_POS; +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_UPDATED_AT; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.debezium.engine.ChangeEvent; +import java.time.Instant; + +public class DebeziumEventUtils { + + public static AirbyteMessage toAirbyteMessage(ChangeEvent event, Instant emittedAt) { + final JsonNode debeziumRecord = Jsons.deserialize(event.value()); + final JsonNode before = debeziumRecord.get("before"); + final JsonNode after = debeziumRecord.get("after"); + final JsonNode source = debeziumRecord.get("source"); + + final JsonNode data = formatDebeziumData(before, after, source); + final String schemaName = source.get("db").asText(); + final String streamName = source.get("table").asText(); + + final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage() + .withStream(streamName) + .withNamespace(schemaName) + .withEmittedAt(emittedAt.toEpochMilli()) + .withData(data); + + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(airbyteRecordMessage); + } + + // warning mutates input args. + private static JsonNode formatDebeziumData(JsonNode before, JsonNode after, JsonNode source) { + final ObjectNode base = (ObjectNode) (after.isNull() ? 
before : after); + + long transactionMillis = source.get("ts_ms").asLong(); + + base.put(CDC_UPDATED_AT, transactionMillis); + base.put(CDC_LOG_FILE, source.get("file").asText()); + base.put(CDC_LOG_POS, source.get("pos").asLong()); + + if (after.isNull()) { + base.put(CDC_DELETED_AT, transactionMillis); + } else { + base.put("_ab_cdc_deleted_at", (Long) null); + } + + return base; + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java new file mode 100644 index 000000000000..f302f0f84dbd --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java @@ -0,0 +1,145 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mysql; + +import com.google.common.collect.AbstractIterator; +import io.airbyte.commons.concurrency.VoidCallable; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.lang.MoreBooleans; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.debezium.engine.ChangeEvent; +import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The record iterator is the consumer (in the producer / consumer relationship with debezium) + * responsible for 1. making sure every record produced by the record publisher is processed 2. + * signalling to the record publisher when it is time for it to stop producing records. It emits + * this signal either when the publisher had not produced a new record for a long time or when it + * has processed at least all of the records that were present in the database when the source was + * started. Because the publisher might publish more records between the consumer sending this + * signal and the publisher actually shutting down, the consumer must stay alive as long as the + * publisher is not closed. Even after the publisher is closed, the consumer will finish processing + * any produced records before closing. 
+ */ +public class DebeziumRecordIterator extends AbstractIterator> + implements AutoCloseableIterator> { + + private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordIterator.class); + + private static final TimeUnit SLEEP_TIME_UNIT = TimeUnit.SECONDS; + private static final int SLEEP_TIME_AMOUNT = 5; + + private final LinkedBlockingQueue> queue; + private final Optional targetFilePosition; + private final Supplier publisherStatusSupplier; + private final VoidCallable requestClose; + + public DebeziumRecordIterator(LinkedBlockingQueue> queue, + Optional targetFilePosition, + Supplier publisherStatusSupplier, + VoidCallable requestClose) { + this.queue = queue; + this.targetFilePosition = targetFilePosition; + this.publisherStatusSupplier = publisherStatusSupplier; + this.requestClose = requestClose; + } + + @Override + protected ChangeEvent computeNext() { + // keep trying until the publisher is closed or until the queue is empty. the latter case is + // possible when the publisher has shutdown but the consumer has not yet processed all messages it + // emitted. + while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) { + final ChangeEvent next; + try { + next = queue.poll(SLEEP_TIME_AMOUNT, SLEEP_TIME_UNIT); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // if within the timeout, the consumer could not get a record, it is time to tell the producer to + // shutdown. + if (next == null) { + requestClose(); + LOGGER.info("no record found. polling again."); + continue; + } + + // if the last record matches the target file position, it is time to tell the producer to shutdown. 
+ if (shouldSignalClose(next)) { + requestClose(); + } + + return next; + } + return endOfData(); + } + + @Override + public void close() throws Exception { + requestClose.call(); + } + + private boolean shouldSignalClose(ChangeEvent event) { + if (targetFilePosition.isEmpty()) { + return false; + } + + String file = Jsons.deserialize(event.value()).get("source").get("file").asText(); + int position = Jsons.deserialize(event.value()).get("source").get("pos").asInt(); + if (!file.equals(targetFilePosition.get().fileName)) { + return false; + } + + if (targetFilePosition.get().position >= position) { + return false; + } + + // if not snapshot or is snapshot but last record in snapshot. + return SnapshotMetadata.TRUE != SnapshotMetadata.valueOf( + Jsons.deserialize(event.value()).get("source").get("snapshot").asText() + .toUpperCase()); + } + + private void requestClose() { + try { + requestClose.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + enum SnapshotMetadata { + TRUE, + FALSE, + LAST + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java new file mode 100644 index 000000000000..cf93dc75c575 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java @@ -0,0 +1,201 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do 
so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.SyncMode; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import io.debezium.engine.spi.OffsetCommitPolicy; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.codehaus.plexus.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DebeziumRecordPublisher implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class); + private final ExecutorService executor; + private DebeziumEngine> engine; + + private final JsonNode config; + private final ConfiguredAirbyteCatalog catalog; + private final AirbyteFileOffsetBackingStore offsetManager; + private final 
AirbyteSchemaHistoryStorage schemaHistoryManager; + + private final AtomicBoolean hasClosed; + private final AtomicBoolean isClosing; + private final AtomicReference thrownError; + private final CountDownLatch engineLatch; + + public DebeziumRecordPublisher(JsonNode config, + ConfiguredAirbyteCatalog catalog, + AirbyteFileOffsetBackingStore offsetManager, + AirbyteSchemaHistoryStorage schemaHistoryManager) { + this.config = config; + this.catalog = catalog; + this.offsetManager = offsetManager; + this.schemaHistoryManager = schemaHistoryManager; + this.hasClosed = new AtomicBoolean(false); + this.isClosing = new AtomicBoolean(false); + this.thrownError = new AtomicReference<>(); + this.executor = Executors.newSingleThreadExecutor(); + this.engineLatch = new CountDownLatch(1); + } + + public void start(Queue> queue) { + engine = DebeziumEngine.create(Json.class) + .using(getDebeziumProperties(config, catalog, offsetManager)) + .using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy()) + .notifying(e -> { + // debezium outputs a tombstone event that has a value of null. this is an artifact of how it + // interacts with kafka. we want to ignore it. + // more on the tombstone: + // https://debezium.io/documentation/reference/configuration/event-flattening.html + if (e.value() != null) { + queue.add(e); + } + }) + .using((success, message, error) -> { + LOGGER.info("Debezium engine shutdown."); + thrownError.set(error); + engineLatch.countDown(); + }) + .build(); + + // Run the engine asynchronously ... + executor.execute(engine); + } + + public boolean hasClosed() { + return hasClosed.get(); + } + + public void close() throws Exception { + if (isClosing.compareAndSet(false, true)) { + // consumers should assume records can be produced until engine has closed. 
+ if (engine != null) { + engine.close(); + } + + // wait for closure before shutting down executor service + engineLatch.await(5, TimeUnit.MINUTES); + + // shut down and await for thread to actually go down + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.MINUTES); + + // after the engine is completely off, we can mark this as closed + hasClosed.set(true); + + if (thrownError.get() != null) { + throw new RuntimeException(thrownError.get()); + } + } + } + + protected Properties getDebeziumProperties(JsonNode config, + ConfiguredAirbyteCatalog catalog, + AirbyteFileOffsetBackingStore offsetManager) { + final Properties props = new Properties(); + + // debezium engine configuration + props.setProperty("name", "engine"); + props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector"); + props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore"); + props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); + props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer + + // snapshot config + // https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-mode + props.setProperty("snapshot.mode", "initial"); + // https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-locking-mode + // This is to make sure other database clients are allowed to write to a table while Airbyte is + // taking a snapshot. 
There is a risk involved that + // if any database client makes a schema change then the sync might break + props.setProperty("snapshot.locking.mode", "none"); + + // https://debezium.io/documentation/reference/1.4/operations/debezium-server.html#debezium-source-database-history-file-filename + // https://debezium.io/documentation/reference/development/engine.html#_in_the_code + // As mentioned in the documents above, debezium connector for MySQL needs to track the schema + // changes. If we don't do this, we can't fetch records for the table + // We have implemented our own implementation to filter out the schema information from other + // databases that the connector is not syncing + props.setProperty("database.history", + "io.airbyte.integrations.source.mysql.FilteredFileDatabaseHistory"); + props.setProperty("database.history.file.filename", + schemaHistoryManager.getPath().toString()); + + // https://debezium.io/documentation/reference/configuration/avro.html + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + + // https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-include-schema-changes + props.setProperty("include.schema.changes", "false"); + + // debezium names + props.setProperty("name", config.get("database").asText()); + props.setProperty("database.server.name", config.get("database").asText()); + + // db connection configuration + props.setProperty("database.hostname", config.get("host").asText()); + props.setProperty("database.port", config.get("port").asText()); + props.setProperty("database.user", config.get("username").asText()); + props.setProperty("database.dbname", config.get("database").asText()); + + if (config.has("password")) { + props.setProperty("database.password", config.get("password").asText()); + } + + // table selection + final String tableWhitelist = getTableWhitelist(catalog, config); + 
props.setProperty("table.include.list", tableWhitelist); + props.setProperty("database.include.list", config.get("database").asText()); + + return props; + } + + private static String getTableWhitelist(ConfiguredAirbyteCatalog catalog, JsonNode config) { + return catalog.getStreams().stream() + .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL) + .map(ConfiguredAirbyteStream::getStream) + .map(stream -> config.get("database").asText() + "." + stream.getName()) + // debezium needs commas escaped to split properly + .map(x -> StringUtils.escape(x, new char[] {','}, "\\,")) + .collect(Collectors.joining(",")); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/FilteredFileDatabaseHistory.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/FilteredFileDatabaseHistory.java new file mode 100644 index 000000000000..91307e679d91 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/FilteredFileDatabaseHistory.java @@ -0,0 +1,168 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.debezium.config.Configuration; +import io.debezium.relational.history.AbstractDatabaseHistory; +import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.DatabaseHistoryListener; +import io.debezium.relational.history.FileDatabaseHistory; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecord.Fields; +import io.debezium.relational.history.HistoryRecordComparator; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.function.Consumer; + +/** + * MySQL Debezium connector monitors the database schema evolution over the time and stores the data + * in a database history file. Without this file we can't fetch the records from binlog. We need to + * save the contents of the file. Debezium by default uses + * {@link io.debezium.relational.history.FileDatabaseHistory} class to write the schema information + * in the file. The problem is that the Debezium tracks the schema evolution of all the tables in + * all the databases, because of that the file content can grow. In order to make sure that debezium + * tracks only the schema of the tables that are present in the database that Airbyte is syncing, we + * created this class. In the method {@link #storeRecord(HistoryRecord)}, we introduced a check to + * make sure only those records are being saved whose database name matches the database Airbyte is + * syncing. 
We tell debezium to use this class by passing it as property in debezium engine. Look + * for "database.history" property in + * {@link DebeziumRecordPublisher#getDebeziumProperties(JsonNode, ConfiguredAirbyteCatalog, AirbyteFileOffsetBackingStore)} + * Ideally {@link FilteredFileDatabaseHistory} should have extended + * {@link io.debezium.relational.history.FileDatabaseHistory} and overridden the + * {@link #storeRecord(HistoryRecord)} method but {@link FilteredFileDatabaseHistory} is a final + * class and can not be inherited + */ +public class FilteredFileDatabaseHistory extends AbstractDatabaseHistory { + + private final FileDatabaseHistory fileDatabaseHistory; + private static String databaseName; + + public FilteredFileDatabaseHistory() { + this.fileDatabaseHistory = new FileDatabaseHistory(); + } + + /** + * Ideally the databaseName should have been initialized in the constructor of the class. But since + * we supply the class name to debezium and it uses reflection to construct the object of the class, + * we can't pass in the databaseName as a parameter to the constructor. That's why we had to take + * the static approach. 
+ * + * @param databaseName Name of the database that the connector is syncing + */ + static void setDatabaseName(String databaseName) { + if (FilteredFileDatabaseHistory.databaseName == null) { + FilteredFileDatabaseHistory.databaseName = databaseName; + } else if (!FilteredFileDatabaseHistory.databaseName.equals(databaseName)) { + throw new RuntimeException( + "Database name has already been set : " + FilteredFileDatabaseHistory.databaseName + + " can't set to : " + databaseName); + } + } + + @Override + public void configure(Configuration config, + HistoryRecordComparator comparator, + DatabaseHistoryListener listener, + boolean useCatalogBeforeSchema) { + fileDatabaseHistory.configure(config, comparator, listener, useCatalogBeforeSchema); + } + + @Override + public void start() { + fileDatabaseHistory.start(); + } + + @Override + public void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + if (record == null) { + return; + } + try { + String dbNameInRecord = record.document().getString(Fields.DATABASE_NAME); + if (databaseName != null && dbNameInRecord != null && !dbNameInRecord.equals(databaseName)) { + return; + } + + /** + * We are using reflection because the method + * {@link io.debezium.relational.history.FileDatabaseHistory#storeRecord(HistoryRecord)} is + * protected and can not be accessed from here + */ + final Method storeRecordMethod = fileDatabaseHistory.getClass() + .getDeclaredMethod("storeRecord", record.getClass()); + storeRecordMethod.setAccessible(true); + storeRecordMethod.invoke(fileDatabaseHistory, record); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public void stop() { + fileDatabaseHistory.stop(); + // this is just for tests + databaseName = null; + } + + @Override + protected void recoverRecords(Consumer records) { + try { + /** + * We are using reflection because the method + * {@link 
io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} is protected + * and can not be accessed from here + */ + final Method recoverRecords = fileDatabaseHistory.getClass() + .getDeclaredMethod("recoverRecords", Consumer.class); + recoverRecords.setAccessible(true); + recoverRecords.invoke(fileDatabaseHistory, records); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean storageExists() { + return fileDatabaseHistory.storageExists(); + } + + @Override + public void initializeStorage() { + fileDatabaseHistory.initializeStorage(); + } + + @Override + public boolean exists() { + return fileDatabaseHistory.exists(); + } + + @Override + public String toString() { + return fileDatabaseHistory.toString(); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 334e080acdbf..53372ed9ead3 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -24,13 +24,45 @@ package io.airbyte.integrations.source.mysql; +import static io.airbyte.integrations.source.mysql.AirbyteFileOffsetBackingStore.initializeState; +import static io.airbyte.integrations.source.mysql.AirbyteSchemaHistoryStorage.initializeDBHistory; +import static java.util.stream.Collectors.toList; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.util.AutoCloseableIterator; +import 
io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.commons.util.CompositeIterator; +import io.airbyte.commons.util.MoreIterators; +import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.jdbc.JdbcStateManager; +import io.airbyte.integrations.source.jdbc.models.CdcState; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.SyncMode; +import io.debezium.engine.ChangeEvent; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +71,121 @@ public class MySqlSource extends AbstractJdbcSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class); public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset"; + public static final String MYSQL_DB_HISTORY = "mysql_db_history"; public MySqlSource() { super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration()); } + private static AirbyteStream removeIncrementalWithoutPk(AirbyteStream stream) { + if (stream.getSourceDefinedPrimaryKey().isEmpty()) { + stream.getSupportedSyncModes().remove(SyncMode.INCREMENTAL); + } 
+ + return stream; + } + + private static AirbyteStream setIncrementalToSourceDefined(AirbyteStream stream) { + if (stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL)) { + stream.setSourceDefinedCursor(true); + } + + return stream; + } + + // Note: in place mutation. + private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) { + + ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); + ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + + final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); + final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); + properties.set(CDC_LOG_FILE, stringType); + properties.set(CDC_LOG_POS, numberType); + properties.set(CDC_UPDATED_AT, numberType); + properties.set(CDC_DELETED_AT, numberType); + + return stream; + } + + @Override + public List> getCheckOperations(JsonNode config) throws Exception { + final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); + if (isCdc(config)) { + checkOperations.add(database -> { + List matchingSlots = database.resultSetQuery(connection -> { + final String sql = "show variables where Variable_name = 'log_bin'"; + + return connection.createStatement().executeQuery(sql); + }, resultSet -> resultSet.getString("Value")).collect(toList()); + + if (matchingSlots.size() != 1) { + throw new RuntimeException("Could not query the variable log_bin"); + } + + String logBin = matchingSlots.get(0); + if (!logBin.equalsIgnoreCase("ON")) { + throw new RuntimeException("The variable log_bin should be set to ON, but it is : " + logBin); + } + }); + + checkOperations.add(database -> { + List matchingSlots = database.resultSetQuery(connection -> { + final String sql = "show variables where Variable_name = 'binlog_format'"; + + return connection.createStatement().executeQuery(sql); + }, resultSet -> resultSet.getString("Value")).collect(toList()); + + if (matchingSlots.size() != 1) { + throw new 
RuntimeException("Could not query the variable binlog_format"); + } + + String binlogFormat = matchingSlots.get(0); + if (!binlogFormat.equalsIgnoreCase("ROW")) { + throw new RuntimeException("The variable binlog_format should be set to ROW, but it is : " + binlogFormat); + } + }); + } + + checkOperations.add(database -> { + List matchingSlots = database.resultSetQuery(connection -> { + final String sql = "show variables where Variable_name = 'binlog_row_image'"; + + return connection.createStatement().executeQuery(sql); + }, resultSet -> resultSet.getString("Value")).collect(toList()); + + if (matchingSlots.size() != 1) { + throw new RuntimeException("Could not query the variable binlog_row_image"); + } + + String binlogRowImage = matchingSlots.get(0); + if (!binlogRowImage.equalsIgnoreCase("FULL")) { + throw new RuntimeException("The variable binlog_row_image should be set to FULL, but it is : " + binlogRowImage); + } + }); + + return checkOperations; + } + + @Override + public AirbyteCatalog discover(JsonNode config) throws Exception { + AirbyteCatalog catalog = super.discover(config); + + if (isCdc(config)) { + final List streams = catalog.getStreams().stream() + .map(MySqlSource::removeIncrementalWithoutPk) + .map(MySqlSource::setIncrementalToSourceDefined) + .map(MySqlSource::addCdcMetadataColumns) + .collect(toList()); + + catalog.setStreams(streams); + } + + return catalog; + } + @Override public JsonNode toJdbcConfig(JsonNode config) { final StringBuilder jdbc_url = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", @@ -66,6 +208,91 @@ public JsonNode toJdbcConfig(JsonNode config) { return Jsons.jsonNode(configBuilder.build()); } + private static boolean isCdc(JsonNode config) { + return config.hasNonNull("replication_method") + && ReplicationMethod.valueOf(config.get("replication_method").asText()) + .equals(ReplicationMethod.CDC); + } + + private static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) { + Optional any = 
catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode) + .filter(syncMode -> syncMode == SyncMode.INCREMENTAL).findAny(); + return any.isPresent(); + } + + @Override + public List> getIncrementalIterators(JsonNode config, + JdbcDatabase database, + ConfiguredAirbyteCatalog catalog, + Map tableNameToTable, + JdbcStateManager stateManager, + Instant emittedAt) { + if (isCdc(config) && shouldUseCDC(catalog)) { + LOGGER.info("using CDC: {}", true); + // TODO: Figure out how to set the isCDC of stateManager to true. Its always false + final AirbyteFileOffsetBackingStore offsetManager = initializeState(stateManager); + AirbyteSchemaHistoryStorage schemaHistoryManager = initializeDBHistory(stateManager); + FilteredFileDatabaseHistory.setDatabaseName(config.get("database").asText()); + final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(); + final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(config, catalog, offsetManager, schemaHistoryManager); + publisher.start(queue); + + Optional targetFilePosition = TargetFilePosition + .targetFilePosition(database); + + // handle state machine around pub/sub logic. + final AutoCloseableIterator> eventIterator = new DebeziumRecordIterator( + queue, + targetFilePosition, + publisher::hasClosed, + publisher::close); + + // convert to airbyte message. + final AutoCloseableIterator messageIterator = AutoCloseableIterators + .transform( + eventIterator, + (event) -> DebeziumEventUtils.toAirbyteMessage(event, emittedAt)); + + // our goal is to get the state at the time this supplier is called (i.e. 
after all message records + // have been produced) + final Supplier stateMessageSupplier = () -> { + Map offset = offsetManager.readMap(); + String dbHistory = schemaHistoryManager.read(); + + Map state = new HashMap<>(); + state.put(MYSQL_CDC_OFFSET, offset); + state.put(MYSQL_DB_HISTORY, dbHistory); + + final JsonNode asJson = Jsons.jsonNode(state); + + LOGGER.info("debezium state: {}", asJson); + + CdcState cdcState = new CdcState().withState(asJson); + stateManager.getCdcStateManager().setCdcState(cdcState); + final AirbyteStateMessage stateMessage = stateManager.emit(); + return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); + + }; + + // wrap the supplier in an iterator so that we can concat it to the message iterator. + final Iterator stateMessageIterator = MoreIterators + .singletonIteratorFromSupplier(stateMessageSupplier); + + // this structure guarantees that the debezium engine will be closed, before we attempt to emit the + // state file. we want this so that we have a guarantee that the debezium offset file (which we use + // to produce the state file) is up-to-date. 
+ final CompositeIterator messageIteratorWithStateDecorator = AutoCloseableIterators + .concatWithEagerClose(messageIterator, + AutoCloseableIterators.fromIterator(stateMessageIterator)); + + return Collections.singletonList(messageIteratorWithStateDecorator); + } else { + LOGGER.info("using CDC: {}", false); + return super.getIncrementalIterators(config, database, catalog, tableNameToTable, stateManager, + emittedAt); + } + } + @Override public Set getExcludedInternalSchemas() { return Set.of( @@ -82,4 +309,9 @@ public static void main(String[] args) throws Exception { LOGGER.info("completed source: {}", MySqlSource.class); } + public enum ReplicationMethod { + STANDARD, + CDC + } + } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/TargetFilePosition.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/TargetFilePosition.java new file mode 100644 index 000000000000..8e258ca432fe --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/TargetFilePosition.java @@ -0,0 +1,75 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mysql; + +import io.airbyte.db.jdbc.JdbcDatabase; +import java.sql.SQLException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TargetFilePosition { + + private static final Logger LOGGER = LoggerFactory.getLogger(TargetFilePosition.class); + public final String fileName; + public final Integer position; + + public TargetFilePosition(String fileName, Integer position) { + this.fileName = fileName; + this.position = position; + } + + @Override + public String toString() { + return "FileName: " + fileName + ", Position : " + position; + } + + public static Optional targetFilePosition(JdbcDatabase database) { + try { + List masterStatus = database.resultSetQuery( + connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"), + resultSet -> { + String file = resultSet.getString("File"); + int position = resultSet.getInt("Position"); + if (file == null || position == 0) { + return new TargetFilePosition(null, null); + } + return new TargetFilePosition(file, position); + }).collect(Collectors.toList()); + TargetFilePosition targetFilePosition = masterStatus.get(0); + LOGGER.info("Target File position : " + targetFilePosition); + if (targetFilePosition.fileName == null || targetFilePosition == null) { + return Optional.empty(); + } + return Optional.of(targetFilePosition); + } catch (SQLException e) { + throw 
new RuntimeException(e); + } + + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json index 9d76f8aadbaa..69adb708927b 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json @@ -1,15 +1,16 @@ { - "documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql", + "documentationUrl": "https://docs.airbyte.io/integrations/source/mysql", "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "MySql Source Spec", "type": "object", - "required": ["host", "port", "database", "username"], + "required": ["host", "port", "database", "username", "replication_method"], "additionalProperties": false, "properties": { "host": { "description": "Hostname of the database.", - "type": "string" + "type": "string", + "order": 0 }, "port": { "description": "Port of the database.", @@ -17,24 +18,37 @@ "minimum": 0, "maximum": 65536, "default": 3306, - "examples": ["3306"] + "examples": ["3306"], + "order": 1 }, "database": { "description": "Name of the database.", - "type": "string" + "type": "string", + "order": 2 }, "username": { "description": "Username to use to access the database.", - "type": "string" + "type": "string", + "order": 3 }, "password": { "description": "Password associated with the username.", "type": "string", - "airbyte_secret": true + "airbyte_secret": true, + "order": 4 }, "jdbc_url_params": { "description": "Additional properties to pass to the jdbc url string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. 
(example: key1=value1&key2=value2&key3=value3)", - "type": "string" + "type": "string", + "order": 5 + }, + "replication_method": { + "type": "string", + "title": "Replication Method", + "description": "Replication method to use for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses the Binlog to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", + "order": 6, + "default": "STANDARD", + "enum": ["STANDARD", "CDC"] } } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java new file mode 100644 index 000000000000..d5bdee740f22 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java @@ -0,0 +1,170 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mysql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; +import io.airbyte.protocol.models.SyncMode; +import java.util.Collections; +import java.util.List; +import org.jooq.SQLDialect; +import org.testcontainers.containers.MySQLContainer; + +public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest { + + private static final String STREAM_NAME = "id_and_name"; + private static final String STREAM_NAME2 = "starships"; + private MySQLContainer container; + private JsonNode config; + + @Override + protected String getImageName() { + return "airbyte/source-mysql:dev"; + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return new 
ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + String.format("%s", STREAM_NAME), + String.format("%s", config.get("database").asText()), + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withSupportedSyncModes( + Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + String.format("%s", STREAM_NAME2), + String.format("%s", config.get("database").asText()), + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withSupportedSyncModes( + Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); + } + + @Override + protected JsonNode getState() { + return null; + } + + @Override + protected List getRegexTests() { + return Collections.emptyList(); + } + + @Override + protected void setup(TestDestinationEnv testEnv) { + container = new MySQLContainer<>("mysql:8.0"); + container.start(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put("host", container.getHost()) + .put("port", container.getFirstMappedPort()) + .put("database", container.getDatabaseName()) + .put("username", container.getUsername()) + .put("password", container.getPassword()) + .put("replication_method", "CDC") + .build()); + + revokeAllPermissions(); + grantCorrectPermissions(); + createAndPopulateTables(); + } + + private void createAndPopulateTables() { + executeQuery("CREATE TABLE id_and_name(id INTEGER PRIMARY KEY, name VARCHAR(200));"); + 
executeQuery( + "INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + executeQuery("CREATE TABLE starships(id INTEGER PRIMARY KEY, name VARCHAR(200));"); + executeQuery( + "INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + } + + private void revokeAllPermissions() { + executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); + } + + private void grantCorrectPermissions() { + executeQuery( + "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + + container.getUsername() + "@'%';"); + } + + private void executeQuery(String query) { + try (Database database = Databases.createDatabase( + "root", + "test", + String.format("jdbc:mysql://%s:%s/%s", + container.getHost(), + container.getFirstMappedPort(), + container.getDatabaseName()), + MySqlSource.DRIVER_CLASS, + SQLDialect.MYSQL)) { + database.query( + ctx -> ctx + .execute(query)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + container.close(); + } + +} diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java new file mode 100644 index 000000000000..28982bb462f1 --- /dev/null +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java @@ -0,0 +1,668 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, 
sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.mysql; + +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_DELETED_AT; +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_FILE; +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_LOG_POS; +import static io.airbyte.integrations.source.jdbc.AbstractJdbcSource.CDC_UPDATED_AT; +import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS; +import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET; +import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import 
com.google.common.collect.Streams; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; +import io.airbyte.protocol.models.SyncMode; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.jooq.SQLDialect; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; + +public class CdcMySqlSourceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdcMySqlSourceTest.class); + + private static final String MODELS_SCHEMA = "models_schema"; + private static final String MODELS_STREAM_NAME = "models"; + private static final Set STREAM_NAMES = Sets + .newHashSet(MODELS_STREAM_NAME); + private static final String COL_ID = "id"; + private static final String COL_MAKE_ID = "make_id"; + private static final String COL_MODEL = 
"model"; + private static final String DB_NAME = MODELS_SCHEMA; + + private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( + CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME, + MODELS_SCHEMA, + Field.of(COL_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MODEL, JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))))); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers + .toDefaultConfiguredCatalog(CATALOG); + + // set all streams to incremental. + static { + CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); + } + + private static final List MODEL_RECORDS = ImmutableList.of( + Jsons.jsonNode(ImmutableMap.of(COL_ID, 11, COL_MAKE_ID, 1, COL_MODEL, "Fiesta")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 12, COL_MAKE_ID, 1, COL_MODEL, "Focus")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 13, COL_MAKE_ID, 1, COL_MODEL, "Ranger")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 14, COL_MAKE_ID, 2, COL_MODEL, "GLA")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 15, COL_MAKE_ID, 2, COL_MODEL, "A 220")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 16, COL_MAKE_ID, 2, COL_MODEL, "E 350"))); + + private MySQLContainer container; + private Database database; + private MySqlSource source; + private JsonNode config; + + @BeforeEach + public void setup() { + init(); + revokeAllPermissions(); + grantCorrectPermissions(); + createAndPopulateTables(); + } + + private void init() { + container = new MySQLContainer<>("mysql:8.0"); + container.start(); + source = new MySqlSource(); + database = Databases.createDatabase( + "root", + "test", + String.format("jdbc:mysql://%s:%s", + container.getHost(), + container.getFirstMappedPort()), + DRIVER_CLASS, + SQLDialect.MYSQL); + + config = Jsons.jsonNode(ImmutableMap.builder() + 
.put("host", container.getHost()) + .put("port", container.getFirstMappedPort()) + .put("database", CdcMySqlSourceTest.DB_NAME) + .put("username", container.getUsername()) + .put("password", container.getPassword()) + .put("replication_method", "CDC") + .build()); + } + + private void revokeAllPermissions() { + executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); + } + + private void grantCorrectPermissions() { + executeQuery( + "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + + container.getUsername() + "@'%';"); + } + + private void executeQuery(String query) { + try { + database.query( + ctx -> ctx + .execute(query)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void createAndPopulateTables() { + createAndPopulateActualTable(); + createAndPopulateRandomTable(); + } + + private void createAndPopulateActualTable() { + executeQuery("CREATE DATABASE " + MODELS_SCHEMA + ";"); + executeQuery(String + .format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", + MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID)); + for (JsonNode recordJson : MODEL_RECORDS) { + writeModelRecord(recordJson); + } + } + + /** + * This database and table is not part of Airbyte sync. 
It is being created just to make sure the + * databases not being synced by Airbyte are not causing issues with our debezium logic + */ + private void createAndPopulateRandomTable() { + executeQuery("CREATE DATABASE " + MODELS_SCHEMA + "_random" + ";"); + executeQuery(String + .format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", + MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", COL_ID + "_random", + COL_MAKE_ID + "_random", + COL_MODEL + "_random", COL_ID + "_random")); + final List MODEL_RECORDS_RANDOM = ImmutableList.of( + Jsons + .jsonNode(ImmutableMap + .of(COL_ID + "_random", 11000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", + "Fiesta-random")), + Jsons.jsonNode(ImmutableMap + .of(COL_ID + "_random", 12000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", + "Focus-random")), + Jsons + .jsonNode(ImmutableMap + .of(COL_ID + "_random", 13000, COL_MAKE_ID + "_random", 1, COL_MODEL + "_random", + "Ranger-random")), + Jsons.jsonNode(ImmutableMap + .of(COL_ID + "_random", 14000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", + "GLA-random")), + Jsons.jsonNode(ImmutableMap + .of(COL_ID + "_random", 15000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", + "A 220-random")), + Jsons + .jsonNode(ImmutableMap + .of(COL_ID + "_random", 16000, COL_MAKE_ID + "_random", 2, COL_MODEL + "_random", + "E 350-random"))); + for (JsonNode recordJson : MODEL_RECORDS_RANDOM) { + writeRecords(recordJson, MODELS_SCHEMA + "_random", MODELS_STREAM_NAME + "_random", + COL_ID + "_random", COL_MAKE_ID + "_random", COL_MODEL + "_random"); + } + } + + private void writeModelRecord(JsonNode recordJson) { + writeRecords(recordJson, CdcMySqlSourceTest.MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, + COL_MAKE_ID, + COL_MODEL); + } + + private void writeRecords( + JsonNode recordJson, + String dbName, + String streamName, + String idCol, + String makeIdCol, + String modelCol) { + executeQuery( + String.format("INSERT INTO %s.%s (%s, %s, 
%s) VALUES (%s, %s, '%s');", dbName, streamName, + idCol, makeIdCol, modelCol, + recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(), + recordJson.get(modelCol).asText())); + } + + @AfterEach + public void tearDown() { + try { + database.close(); + container.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + @DisplayName("On the first sync, produce returns records that exist in the database.") + void testExistingData() throws Exception { + final AutoCloseableIterator read = source + .read(config, CONFIGURED_CATALOG, null); + final List actualRecords = AutoCloseableIterators.toListAndClose(read); + + final Set recordMessages = extractRecordMessages(actualRecords); + final List stateMessages = extractStateMessages(actualRecords); + + assertExpectedRecords( + new HashSet<>(MODEL_RECORDS), recordMessages); + assertExpectedStateMessages(stateMessages); + } + + @Test + @DisplayName("When a record is deleted, produces a deletion record.") + void testDelete() throws Exception { + final AutoCloseableIterator read1 = source + .read(config, CONFIGURED_CATALOG, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + final List stateMessages1 = extractStateMessages(actualRecords1); + + assertExpectedStateMessages(stateMessages1); + + executeQuery(String + .format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, + 11)); + + final JsonNode state = stateMessages1.get(0).getData(); + final AutoCloseableIterator read2 = source + .read(config, CONFIGURED_CATALOG, state); + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + final List recordMessages2 = new ArrayList<>( + extractRecordMessages(actualRecords2)); + final List stateMessages2 = extractStateMessages(actualRecords2); + + assertExpectedStateMessages(stateMessages2); + assertEquals(1, recordMessages2.size()); + assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); + 
assertNotNull(recordMessages2.get(0).getData().get(CDC_LOG_FILE)); + assertNotNull(recordMessages2.get(0).getData().get(CDC_UPDATED_AT)); + assertNotNull(recordMessages2.get(0).getData().get(CDC_DELETED_AT)); + } + + @Test + @DisplayName("When a record is updated, produces an update record.") + void testUpdate() throws Exception { + final String updatedModel = "Explorer"; + final AutoCloseableIterator read1 = source + .read(config, CONFIGURED_CATALOG, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + final List stateMessages1 = extractStateMessages(actualRecords1); + + assertExpectedStateMessages(stateMessages1); + + executeQuery(String + .format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, + COL_MODEL, updatedModel, COL_ID, 11)); + + final JsonNode state = stateMessages1.get(0).getData(); + final AutoCloseableIterator read2 = source + .read(config, CONFIGURED_CATALOG, state); + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + final List recordMessages2 = new ArrayList<>( + extractRecordMessages(actualRecords2)); + final List stateMessages2 = extractStateMessages(actualRecords2); + + assertExpectedStateMessages(stateMessages2); + assertEquals(1, recordMessages2.size()); + assertEquals(11, recordMessages2.get(0).getData().get(COL_ID).asInt()); + assertEquals(updatedModel, recordMessages2.get(0).getData().get(COL_MODEL).asText()); + assertNotNull(recordMessages2.get(0).getData().get(CDC_LOG_FILE)); + assertNotNull(recordMessages2.get(0).getData().get(CDC_UPDATED_AT)); + assertTrue(recordMessages2.get(0).getData().get(CDC_DELETED_AT).isNull()); + } + + @SuppressWarnings({"BusyWait", "CodeBlock2Expr"}) + @Test + @DisplayName("Verify that when data is inserted into the database while a sync is happening and after the first sync, it all gets replicated.") + void testRecordsProducedDuringAndAfterSync() throws Exception { + + final int recordsToCreate = 20; + final int[] 
recordsCreated = {0}; + // first batch of records. 20 created here and 6 created in setup method. + while (recordsCreated[0] < recordsToCreate) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 100 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated[0])); + writeModelRecord(record); + recordsCreated[0]++; + } + + final AutoCloseableIterator firstBatchIterator = source + .read(config, CONFIGURED_CATALOG, null); + final List dataFromFirstBatch = AutoCloseableIterators + .toListAndClose(firstBatchIterator); + List stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch); + assertExpectedStateMessages(stateAfterFirstBatch); + Set recordsFromFirstBatch = extractRecordMessages( + dataFromFirstBatch); + assertEquals((MODEL_RECORDS.size() + 20), recordsFromFirstBatch.size()); + + // second batch of records again 20 being created + recordsCreated[0] = 0; + while (recordsCreated[0] < recordsToCreate) { + final JsonNode record = + Jsons.jsonNode(ImmutableMap + .of(COL_ID, 200 + recordsCreated[0], COL_MAKE_ID, 1, COL_MODEL, + "F-" + recordsCreated[0])); + writeModelRecord(record); + recordsCreated[0]++; + } + + final JsonNode state = stateAfterFirstBatch.get(0).getData(); + final AutoCloseableIterator secondBatchIterator = source + .read(config, CONFIGURED_CATALOG, state); + final List dataFromSecondBatch = AutoCloseableIterators + .toListAndClose(secondBatchIterator); + + List stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch); + assertExpectedStateMessages(stateAfterSecondBatch); + + Set recordsFromSecondBatch = extractRecordMessages( + dataFromSecondBatch); + assertEquals(20, recordsFromSecondBatch.size(), + "Expected 20 records to be replicated in the second sync."); + + // sometimes there can be more than one of these at the end of the snapshot and just before the + // first incremental. 
+ final Set recordsFromFirstBatchWithoutDuplicates = removeDuplicates( + recordsFromFirstBatch); + final Set recordsFromSecondBatchWithoutDuplicates = removeDuplicates( + recordsFromSecondBatch); + + final int recordsCreatedBeforeTestCount = MODEL_RECORDS.size(); + assertTrue(recordsCreatedBeforeTestCount < recordsFromFirstBatchWithoutDuplicates.size(), + "Expected first sync to include records created while the test was running."); + assertEquals(40 + recordsCreatedBeforeTestCount, + recordsFromFirstBatchWithoutDuplicates.size() + recordsFromSecondBatchWithoutDuplicates + .size()); + } + + private static Set removeDuplicates(Set messages) { + final Set existingDataRecordsWithoutUpdated = new HashSet<>(); + final Set output = new HashSet<>(); + + for (AirbyteRecordMessage message : messages) { + ObjectNode node = message.getData().deepCopy(); + node.remove("_ab_cdc_updated_at"); + + if (existingDataRecordsWithoutUpdated.contains(node)) { + LOGGER.info("Removing duplicate node: " + node); + } else { + output.add(message); + existingDataRecordsWithoutUpdated.add(node); + } + } + + return output; + } + + @Test + @DisplayName("When both incremental CDC and full refresh are configured for different streams in a sync, the data is replicated as expected.") + void testCdcAndFullRefreshInSameSync() throws Exception { + final ConfiguredAirbyteCatalog configuredCatalog = Jsons.clone(CONFIGURED_CATALOG); + + final List MODEL_RECORDS_2 = ImmutableList.of( + Jsons.jsonNode(ImmutableMap.of(COL_ID, 110, COL_MAKE_ID, 1, COL_MODEL, "Fiesta-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 120, COL_MAKE_ID, 1, COL_MODEL, "Focus-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 130, COL_MAKE_ID, 1, COL_MODEL, "Ranger-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 140, COL_MAKE_ID, 2, COL_MODEL, "GLA-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 150, COL_MAKE_ID, 2, COL_MODEL, "A 220-2")), + Jsons.jsonNode(ImmutableMap.of(COL_ID, 160, COL_MAKE_ID, 2, COL_MODEL, "E 350-2"))); + + 
executeQuery(String + .format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", + MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID)); + + for (JsonNode recordJson : MODEL_RECORDS_2) { + writeRecords(recordJson, CdcMySqlSourceTest.MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, + COL_MAKE_ID, COL_MODEL); + } + + ConfiguredAirbyteStream airbyteStream = new ConfiguredAirbyteStream() + .withStream(CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME + "_2", + MODELS_SCHEMA, + Field.of(COL_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MODEL, JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes( + Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))); + airbyteStream.setSyncMode(SyncMode.FULL_REFRESH); + + List streams = configuredCatalog.getStreams(); + streams.add(airbyteStream); + configuredCatalog.withStreams(streams); + + final AutoCloseableIterator read1 = source + .read(config, configuredCatalog, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + + final Set recordMessages1 = extractRecordMessages(actualRecords1); + final List stateMessages1 = extractStateMessages(actualRecords1); + HashSet names = new HashSet<>(STREAM_NAMES); + names.add(MODELS_STREAM_NAME + "_2"); + assertExpectedStateMessages(stateMessages1); + assertExpectedRecords(Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream()) + .collect(Collectors.toSet()), + recordMessages1, + Collections.singleton(MODELS_STREAM_NAME), + names); + + final JsonNode puntoRecord = Jsons + .jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto")); + writeModelRecord(puntoRecord); + + final JsonNode state = extractStateMessages(actualRecords1).get(0).getData(); + final AutoCloseableIterator read2 = source + .read(config, configuredCatalog, state); + final List 
actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + + final Set recordMessages2 = extractRecordMessages(actualRecords2); + final List stateMessages2 = extractStateMessages(actualRecords2); + + assertExpectedStateMessages(stateMessages2); + assertExpectedRecords( + Streams.concat(MODEL_RECORDS_2.stream(), Stream.of(puntoRecord)) + .collect(Collectors.toSet()), + recordMessages2, + Collections.singleton(MODELS_STREAM_NAME), + names); + } + + @Test + @DisplayName("When no records exist, no records are returned.") + void testNoData() throws Exception { + + executeQuery(String.format("DELETE FROM %s.%s", MODELS_SCHEMA, MODELS_STREAM_NAME)); + + final AutoCloseableIterator read = source + .read(config, CONFIGURED_CATALOG, null); + final List actualRecords = AutoCloseableIterators.toListAndClose(read); + + final Set recordMessages = extractRecordMessages(actualRecords); + final List stateMessages = extractStateMessages(actualRecords); + + assertExpectedRecords(Collections.emptySet(), recordMessages); + assertExpectedStateMessages(stateMessages); + } + + @Test + @DisplayName("When no changes have been made to the database since the previous sync, no records are returned.") + void testNoDataOnSecondSync() throws Exception { + final AutoCloseableIterator read1 = source + .read(config, CONFIGURED_CATALOG, null); + final List actualRecords1 = AutoCloseableIterators.toListAndClose(read1); + final JsonNode state = extractStateMessages(actualRecords1).get(0).getData(); + + final AutoCloseableIterator read2 = source + .read(config, CONFIGURED_CATALOG, state); + final List actualRecords2 = AutoCloseableIterators.toListAndClose(read2); + + final Set recordMessages2 = extractRecordMessages(actualRecords2); + final List stateMessages2 = extractStateMessages(actualRecords2); + + assertExpectedRecords(Collections.emptySet(), recordMessages2); + assertExpectedStateMessages(stateMessages2); + } + + @Test + void testCheck() { + final AirbyteConnectionStatus status = 
source.check(config); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.SUCCEEDED); + } + + @Test + void testDiscover() throws Exception { + final AirbyteCatalog expectedCatalog = Jsons.clone(CATALOG); + + executeQuery(String + .format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200));", + MODELS_SCHEMA, MODELS_STREAM_NAME + "_2", COL_ID, COL_MAKE_ID, COL_MODEL)); + + List streams = expectedCatalog.getStreams(); + // stream with PK + streams.get(0).setSourceDefinedCursor(true); + addCdcMetadataColumns(streams.get(0)); + + AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream( + MODELS_STREAM_NAME + "_2", + MODELS_SCHEMA, + Field.of(COL_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER), + Field.of(COL_MODEL, JsonSchemaPrimitive.STRING)); + streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList()); + streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); + addCdcMetadataColumns(streamWithoutPK); + + streams.add(streamWithoutPK); + expectedCatalog.withStreams(streams); + + final AirbyteCatalog actualCatalog = source.discover(config); + + assertEquals( + expectedCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) + .collect(Collectors.toList()), + actualCatalog.getStreams().stream().sorted(Comparator.comparing(AirbyteStream::getName)) + .collect(Collectors.toList())); + } + + private static AirbyteStream addCdcMetadataColumns(AirbyteStream stream) { + ObjectNode jsonSchema = (ObjectNode) stream.getJsonSchema(); + ObjectNode properties = (ObjectNode) jsonSchema.get("properties"); + + final JsonNode numberType = Jsons.jsonNode(ImmutableMap.of("type", "number")); + final JsonNode stringType = Jsons.jsonNode(ImmutableMap.of("type", "string")); + properties.set(CDC_LOG_FILE, stringType); + properties.set(CDC_LOG_POS, numberType); + properties.set(CDC_UPDATED_AT, numberType); + properties.set(CDC_DELETED_AT, numberType); + + return stream; + } + 
+ private Set extractRecordMessages(List messages) { + final List recordMessageList = messages + .stream() + .filter(r -> r.getType() == Type.RECORD).map(AirbyteMessage::getRecord) + .collect(Collectors.toList()); + final Set recordMessageSet = new HashSet<>(recordMessageList); + + assertEquals(recordMessageList.size(), recordMessageSet.size(), + "Expected no duplicates in airbyte record message output for a single sync."); + + return recordMessageSet; + } + + private List extractStateMessages(List messages) { + return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState) + .collect(Collectors.toList()); + } + + private static void assertExpectedStateMessages(List stateMessages) { + // TODO: add assertion for boolean cdc is true + assertEquals(1, stateMessages.size()); + assertNotNull(stateMessages.get(0).getData()); + assertNotNull( + stateMessages.get(0).getData().get("cdc_state").get("state").get(MYSQL_CDC_OFFSET)); + assertNotNull( + stateMessages.get(0).getData().get("cdc_state").get("state").get(MYSQL_DB_HISTORY)); + } + + private static void assertExpectedRecords(Set expectedRecords, + Set actualRecords) { + // assume all streams are cdc. 
+ assertExpectedRecords( + expectedRecords, + actualRecords, + actualRecords.stream().map(AirbyteRecordMessage::getStream).collect(Collectors.toSet())); + } + + private static void assertExpectedRecords(Set expectedRecords, + Set actualRecords, + Set cdcStreams) { + assertExpectedRecords(expectedRecords, actualRecords, cdcStreams, STREAM_NAMES); + } + + private static void assertExpectedRecords(Set expectedRecords, + Set actualRecords, + Set cdcStreams, + Set streamNames) { + final Set actualData = actualRecords + .stream() + .map(recordMessage -> { + assertTrue(streamNames.contains(recordMessage.getStream())); + assertNotNull(recordMessage.getEmittedAt()); + + assertEquals(MODELS_SCHEMA, recordMessage.getNamespace()); + + final JsonNode data = recordMessage.getData(); + + if (cdcStreams.contains(recordMessage.getStream())) { + assertNotNull(data.get(CDC_LOG_FILE)); + assertNotNull(data.get(CDC_LOG_POS)); + assertNotNull(data.get(CDC_UPDATED_AT)); + } else { + assertNull(data.get(CDC_LOG_FILE)); + assertNull(data.get(CDC_LOG_POS)); + assertNull(data.get(CDC_UPDATED_AT)); + assertNull(data.get(CDC_DELETED_AT)); + } + + ((ObjectNode) data).remove(CDC_LOG_FILE); + ((ObjectNode) data).remove(CDC_LOG_POS); + ((ObjectNode) data).remove(CDC_UPDATED_AT); + ((ObjectNode) data).remove(CDC_DELETED_AT); + + return data; + }) + .collect(Collectors.toSet()); + + assertEquals(expectedRecords, actualData); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 66fb76d101e2..6482eef659eb 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -219,6 +219,14 @@ 
public List> getIncrementalIterators(JsonN Map tableNameToTable, JdbcStateManager stateManager, Instant emittedAt) { + /** + * If a customer sets up a postgres source with cdc parameters (replication_slot and publication) + * but selects all the tables in FULL_REFRESH mode then we would still end up going through this + * path. We do have a check in place for debezium to make sure only tables in INCREMENTAL mode are + * synced {@link DebeziumRecordPublisher#getTableWhitelist(ConfiguredAirbyteCatalog)} but we should + * have a check here as well to make sure that if no table is in INCREMENTAL mode then we skip this + * part. + */ if (isCdc(config)) { // State works differently in CDC than it does in convention incremental. The state is written to an // offset file that debezium reads from. Then once all records are replicated, we read back that diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java index f112c0568e53..0760f59053c1 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java @@ -50,6 +50,10 @@ // todo (cgardens) - Sanity check that when configured for CDC that postgres performs like any other // incremental source. As we have more sources support CDC we will find a more reusable way of doing // this, but for now this is a solid sanity check. +/** + * None of the tests in this class use the cdc path (run the tests and search for `using CDC: false` + * in logs).
This is the exact same as {@link PostgresSourceAcceptanceTest} + */ public class CdcPostgresSourceAcceptanceTest extends SourceAcceptanceTest { private static final String SLOT_NAME_BASE = "debezium_slot"; @@ -66,6 +70,11 @@ protected void setup(TestDestinationEnv testEnv) throws Exception { .withCommand("postgres -c config_file=/etc/postgresql/postgresql.conf"); container.start(); + /** + * The publication is not being set as part of the config and because of it + * {@link io.airbyte.integrations.source.postgres.PostgresSource#isCdc(JsonNode)} returns false; as + * a result, no test in this class runs through the cdc path. + */ config = Jsons.jsonNode(ImmutableMap.builder() .put("host", container.getHost()) .put("port", container.getFirstMappedPort()) @@ -84,7 +93,10 @@ protected void setup(TestDestinationEnv testEnv) throws Exception { config.get("database").asText()), "org.postgresql.Driver", SQLDialect.POSTGRES); - + /** + * cdc expects the INCREMENTAL tables to contain a primary key; check out + * {@link io.airbyte.integrations.source.postgres.PostgresSource#removeIncrementalWithoutPk(AirbyteStream)} + */ database.query(ctx -> { ctx.execute("SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); ctx.execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); @@ -119,6 +131,14 @@ protected JsonNode getConfig() { @Override protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + /** + * This catalog config is incorrect for CDC replication.
We specify + * withCursorField(Lists.newArrayList("id")) but with CDC customers can't/shouldn't be able to + * specify a cursor field for INCREMENTAL tables. Take a look at + * {@link io.airbyte.integrations.source.postgres.PostgresSource#setIncrementalToSourceDefined(AirbyteStream)} + * We should also specify the primary keys for INCREMENTAL tables; check out + * {@link io.airbyte.integrations.source.postgres.PostgresSource#removeIncrementalWithoutPk(AirbyteStream)} + */ return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index a63cbb166d7d..3b3ed868bfda 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -37,8 +37,8 @@ If you do not see a type in this list, assume that it is coerced into a string. | :--- | :--- | :--- | | Full Refresh Sync | Yes | | | Incremental - Append Sync | Yes | | -| Replicate Incremental Deletes | Coming soon | | -| Logical Replication \(WAL\) | Coming soon | | +| Replicate Incremental Deletes | Yes | | +| CDC | Yes | | | SSL Support | Yes | | | SSH Tunnel Connection | Coming soon | | | Namespaces | Yes | Enabled by default | @@ -66,13 +66,69 @@ To create a dedicated database user, run the following commands against your dat CREATE USER 'airbyte'@'%' IDENTIFIED BY 'your_password_here'; ``` -Then give it access to the relevant schema: +The right set of permissions differs between the `STANDARD` and `CDC` replication methods. +For the `STANDARD` replication method, only the `SELECT` permission is required. ```sql GRANT SELECT ON .* TO 'airbyte'@'%'; ``` +For the `CDC` replication method, the `SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT` permissions are required.
+```sql +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'airbyte'@'%'; +``` + +Your database user should now be ready for use with Airbyte. + +#### 3. Set up CDC -You can limit this grant down to specific tables instead of the whole database. Note that to replicate data from multiple MySQL schemas, you can re-run the command above to grant access to all the relevant schemas, but you'll need to set up multiple sources connecting to the same db on multiple schemas. +This step does not apply to the `STANDARD` replication method; it is required only if you select the `CDC` replication method. Please read [the section on CDC below](mysql.md#setting-up-cdc-for-mysql) for more information. + +#### 4. That's it! Your database user should now be ready for use with Airbyte. +## Change Data Capture \(CDC\) + +* If you need a record of deletions and can accept the limitations posted below, you should be able to use CDC for MySQL. +* If your data set is small, and you just want a snapshot of your table in the destination, consider using Full Refresh replication for your table instead of CDC. +* If the limitations prevent you from using CDC and your goal is to maintain a snapshot of your table in the destination, consider using non-CDC incremental and occasionally resetting the data and re-syncing. +* If your table has a primary key but doesn't have a reasonable cursor field for incremental syncing \(e.g. `updated_at`\), CDC allows you to sync your table incrementally. + +### CDC Limitations + +* Make sure to read our [CDC docs](../../understanding-airbyte/cdc.md) to see limitations that impact all databases using CDC replication. +* Our CDC implementation uses at-least-once delivery for all change records. + +### Setting up CDC for MySQL + +You must enable binary logging for MySQL replication. The binary logs record transaction updates for replication tools to propagate changes.
+ +#### Enable binary logging + +You must enable binary logging for MySQL replication. The binary logs record transaction updates for replication tools to propagate changes. You can configure your MySQL server configuration file with the following properties, which are described below: +``` +server-id = 223344 +log_bin = mysql-bin +binlog_format = ROW +binlog_row_image = FULL +expire_logs_days = 10 +``` +* server-id : The value for the server-id must be unique for each server and replication client in the MySQL cluster. The `server-id` should be a non-zero value. If the `server-id` is already set to a non-zero value, you don't need to make any change. You can set the `server-id` to any value between 1 and 4294967295. For more information, refer to the [mysql doc](https://dev.mysql.com/doc/refman/8.0/en/replication-options.html#sysvar_server_id) +* log_bin : The value of log_bin is the base name of the sequence of binlog files. If the `log_bin` is already set, you don't need to make any change. For more information, refer to the [mysql doc](https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#option_mysqld_log-bin) +* binlog_format : The `binlog_format` must be set to `ROW`. For more information, refer to the [mysql doc](https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_format) +* binlog_row_image : The `binlog_row_image` must be set to `FULL`. It determines how row images are written to the binary log. For more information, refer to the [mysql doc](https://dev.mysql.com/doc/refman/5.7/en/replication-options-binary-log.html#sysvar_binlog_row_image) +* expire_logs_days : This is the number of days for automatic binlog file removal. We recommend 10 days so that in case of a failure in sync or if the sync is paused, we still have some bandwidth to start from the last point in incremental sync. We also recommend setting frequent syncs for CDC.
+ +#### Enable GTIDs \(Optional\) + +Global transaction identifiers (GTIDs) uniquely identify transactions that occur on a server within a cluster. +Though not required for the Airbyte MySQL connector, using GTIDs simplifies replication and enables you to more easily confirm if primary and replica servers are consistent. +For more information, refer to the [mysql doc](https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#option_mysqld_gtid-mode) +* Enable gtid_mode : Boolean that specifies whether GTID mode of the MySQL server is enabled or not. Enable it via `mysql> gtid_mode=ON` +* Enable enforce_gtid_consistency : Boolean that specifies whether the server enforces GTID consistency by allowing only the execution of statements that can be logged in a transactionally safe manner. Required when using GTIDs. Enable it via `mysql> enforce_gtid_consistency=ON` + +#### Note + +When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. +Airbyte doesn't acquire any table locks (for tables defined with the MyISAM engine, the tables would still be locked) while creating the snapshot to allow writes by other database clients. +But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running. \ No newline at end of file diff --git a/docs/understanding-airbyte/cdc.md b/docs/understanding-airbyte/cdc.md index fc20f286d3bf..737ab07bccaf 100644 --- a/docs/understanding-airbyte/cdc.md +++ b/docs/understanding-airbyte/cdc.md @@ -14,7 +14,8 @@ The Airbyte Protocol outputs records from sources.
Records from `UPDATE` stateme We add some metadata columns for CDC sources: -* `ab_cdc_lsn` is the point in the log where the record was retrieved +* `ab_cdc_lsn` (specific to the Postgres source) is the point in the log where the record was retrieved +* `ab_cdc_log_file` & `ab_cdc_log_pos` (specific to the MySQL source) are the file name and the position in the file where the record was retrieved * `ab_cdc_updated_at` is the timestamp for the database transaction that resulted in this record change and is present for records from `DELETE`/`INSERT`/`UPDATE` statements * `ab_cdc_deleted_at` is the timestamp for the database transaction that resulted in this record change and is only present for records from `DELETE` statements @@ -30,10 +31,10 @@ We add some metadata columns for CDC sources: ## Current Support * [Postgres](../integrations/sources/postgres.md) (For a quick video overview of CDC on Postgres, click [here](https://www.youtube.com/watch?v=NMODvLgZvuE&ab_channel=Airbyte)) +* [MySQL](../integrations/sources/mysql.md) ## Coming Soon -* [MySQL](../integrations/sources/mysql.md) * [SQL Server / MSSQL](../integrations/sources/mssql.md) * Oracle DB * Please [create a ticket](https://github.com/airbytehq/airbyte/issues/new/choose) if you need CDC support on another database!