Skip to content

Commit

Permalink
implement support for per-stream state in mysql source (#16007)
Browse files Browse the repository at this point in the history
* implement support for per-stream state in mysql source

* enable flag in tests

* activate per stream in tests

* enable per stream state in test

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
subodh1810 and octavia-squidington-iii authored Sep 9, 2022
1 parent 8c56eeb commit cd6f997
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.9
dockerImageTag: 0.6.10
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6480,7 +6480,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.9"
- dockerImage: "airbyte/source-mysql:0.6.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.9
LABEL io.airbyte.version=0.6.10
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,9 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
Expand Down Expand Up @@ -68,6 +69,7 @@ static void init() throws SQLException {
.withEnv("MYSQL_ROOT_HOST", "%")
.withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD);
container.start();
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", container.getPassword());
connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n");
}
Expand Down Expand Up @@ -320,4 +322,9 @@ void testStrictSSLUnsecuredWithTunnel() throws Exception {

final Exception exception = assertThrows(NullPointerException.class, () -> source.check(config));
}

@Override
protected boolean supportsPerStream() {
return true;
}
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.9
LABEL io.airbyte.version=0.6.10
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
Expand All @@ -29,10 +31,15 @@
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.SyncMode;
Expand Down Expand Up @@ -63,13 +70,15 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source

public static final String SSL_PARAMETERS_WITH_CERTIFICATE_VALIDATION = "verifyServerCertificate=true";
public static final String SSL_PARAMETERS_WITHOUT_CERTIFICATE_VALIDATION = "verifyServerCertificate=false";
private final FeatureFlags featureFlags;

public static Source sshWrappedSource() {
return new SshWrappedSource(new MySqlSource(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

public MySqlSource() {
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new MySqlSourceOperations());
this.featureFlags = new EnvVariableFeatureFlags();
}

private static AirbyteStream overrideSyncModes(final AirbyteStream stream) {
Expand Down Expand Up @@ -184,6 +193,35 @@ private static boolean isCdc(final JsonNode config) {
return false;
}

@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
if (!featureFlags.useStreamCapableState()) {
return AirbyteStateType.LEGACY;
}

return isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

@Override
protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode config) {
if (!featureFlags.useStreamCapableState()) {
return List.of(new AirbyteStateMessage()
.withType(AirbyteStateType.LEGACY)
.withData(Jsons.jsonNode(new DbState())));
}

if (getSupportedStateType(config) == AirbyteStateType.GLOBAL) {
final AirbyteGlobalState globalState = new AirbyteGlobalState()
.withSharedState(Jsons.jsonNode(new CdcState()))
.withStreamStates(List.of());
return List.of(new AirbyteStateMessage().withType(AirbyteStateType.GLOBAL).withGlobal(globalState));
} else {
return List.of(new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()));
}
}

@Override
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,8 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -12,6 +13,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
Expand Down Expand Up @@ -101,6 +103,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.build());
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand Down Expand Up @@ -185,4 +188,9 @@ public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception
assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
Expand Down Expand Up @@ -43,6 +46,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "STANDARD")
.build());
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
Expand Down Expand Up @@ -121,4 +125,9 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest.setEnv;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS;
Expand All @@ -20,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -62,6 +64,7 @@ public class CdcMysqlSourceTest extends CdcSourceTest {

@BeforeEach
public void setup() throws SQLException {
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
init();
revokeAllPermissions();
grantCorrectPermissions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
Expand Down Expand Up @@ -55,6 +56,7 @@ class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {

@BeforeAll
static void init() throws Exception {
setEnv(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, "true");
container = new MySQLContainer<>("mysql:8.0")
.withUsername(TEST_USER)
.withPassword(TEST_PASSWORD.call())
Expand Down Expand Up @@ -220,4 +222,10 @@ protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String names
return expectedMessages;
}

@Override
protected boolean supportsPerStream() {
return true;
}


}
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ If you do not see a type in this list, assume that it is coerced into a string.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------|
| 0.6.10 | 2022-09-08 | [16007](https://github.com/airbytehq/airbyte/pull/16007) | Implement per stream state support. |
| 0.6.9 | 2022-09-03 | [16216](https://github.com/airbytehq/airbyte/pull/16216) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. |
| 0.6.8 | 2022-09-01 | [16259](https://github.com/airbytehq/airbyte/pull/16259) | Emit state messages more frequently |
| 0.6.7 | 2022-08-30 | [16114](https://github.com/airbytehq/airbyte/pull/16114) | Prevent traffic going on an unsecured channel in strict-encryption version of source mysql |
Expand Down

0 comments on commit cd6f997

Please sign in to comment.