Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 MySQL Source : Expose serverTimezone debezium option via MySQL Source spec for CDC #17815

Merged
merged 19 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.3
dockerImageTag: 1.0.4
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
11 changes: 9 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6964,7 +6964,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.3"
- dockerImage: "airbyte/source-mysql:1.0.4"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
connectionSpecification:
Expand Down Expand Up @@ -7194,9 +7194,16 @@
\ <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\"\
>initial waiting time</a>."
default: 300
order: 4
min: 120
max: 1200
order: 1
server_time_zone:
type: "string"
title: "Configured server timezone for the MySQL source (Advanced)"
description: "Enter the configured MySQL server timezone. This should\
\ only be done if the configured timezone in your MySQL instance\
\ does not conform to IANNA standard."
order: 2
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ ENV APPLICATION source-mysql-strict-encrypt
COPY --from=build /airbyte /airbyte


LABEL io.airbyte.version=1.0.3
LABEL io.airbyte.version=1.0.4

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,15 @@
"title": "Initial Waiting Time in Seconds (Advanced)",
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\">initial waiting time</a>.",
"default": 300,
"order": 4,
"min": 120,
"max": 1200
"max": 1200,
"order": 1
},
"server_time_zone": {
"type": "string",
"title": "Configured server timezone for the MySQL source (Advanced)",
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.3
LABEL io.airbyte.version=1.0.4

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ static Properties getDebeziumProperties(final JdbcDatabase database) {
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "when_needed");
}

return props;
}

Expand All @@ -52,6 +53,15 @@ private static Properties commonProperties(final JdbcDatabase database) {
props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter");
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter");

// For CDC mode, the user cannot provide timezone arguments as JDBC parameters - they are specifically defined in the replication_method
// config.
if (sourceConfig.get("replication_method").has("server_time_zone")) {
final String serverTimeZone = sourceConfig.get("replication_method").get("server_time_zone").asText();
if (!serverTimeZone.isEmpty()) {
props.setProperty("database.serverTimezone", serverTimeZone);
}
}

// Check params for SSL connection in config and add properties for CDC SSL connection
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode
if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) {
Expand Down Expand Up @@ -111,7 +121,6 @@ private static Properties commonProperties(final JdbcDatabase database) {
props.setProperty("database.include.list", sourceConfig.get("database").asText());

return props;

}

static Properties getSnapshotProperties(final JdbcDatabase database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J

checkOperations.add(database -> {
CdcConfigurationHelper.checkFirstRecordWaitTime(config);
CdcConfigurationHelper.checkServerTimeZoneConfig(config);
});
}
return checkOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.integrations.source.mysql.helpers;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.db.jdbc.JdbcDatabase;
import java.time.Duration;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
Expand Down Expand Up @@ -74,6 +76,26 @@ public static Optional<Integer> getFirstRecordWaitSeconds(final JsonNode config)
return Optional.empty();
}

private static Optional<String> getCdcServerTimezone(final JsonNode config) {
final JsonNode replicationMethod = config.get("replication_method");
if (replicationMethod != null && replicationMethod.has("server_time_zone")) {
final String serverTimeZone = config.get("replication_method").get("server_time_zone").asText();
return Optional.of(serverTimeZone);
}
return Optional.empty();
}

public static void checkServerTimeZoneConfig(final JsonNode config) {
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
final Optional<String> serverTimeZone = getCdcServerTimezone(config);
if (serverTimeZone.isPresent()) {
final String timeZone = serverTimeZone.get();
if (!timeZone.isEmpty() && !ZoneId.getAvailableZoneIds().contains((timeZone))) {
throw new IllegalArgumentException(String.format("Given timezone %s is not valid. The given timezone must conform to the IANNA standard. "
+ "See https://www.iana.org/time-zones for more details", serverTimeZone.get()));
}
}
}

public static void checkFirstRecordWaitTime(final JsonNode config) {
// we need to skip the check because in tests, we set initial_waiting_seconds
// to 5 seconds for performance reasons, which is shorter than the minimum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,15 @@
"title": "Initial Waiting Time in Seconds (Advanced)",
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\">initial waiting time</a>.",
"default": 300,
"order": 4,
"min": 120,
"max": 1200
"max": 1200,
"order": 1
},
"server_time_zone": {
"type": "string",
"title": "Configured server timezone for the MySQL source (Advanced)",
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
"order": 2
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,18 @@ void testGetFirstRecordWaitTime() {
assertEquals(MAX_FIRST_RECORD_WAIT_TIME, CdcConfigurationHelper.getFirstRecordWaitTime(tooLongConfig));
}

@Test
void testServerTimeConfig() {
final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap());
assertDoesNotThrow(() -> CdcConfigurationHelper.checkServerTimeZoneConfig(emptyConfig));

final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "server_time_zone", "America/Los_Angeles")));
assertDoesNotThrow(() -> CdcConfigurationHelper.checkServerTimeZoneConfig(normalConfig));

final JsonNode invalidConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "server_time_zone", "CEST")));
assertThrows(IllegalArgumentException.class, () -> CdcConfigurationHelper.checkServerTimeZoneConfig(invalidConfig));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private void init() {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("initial_waiting_seconds", INITIAL_WAITING_SECONDS)
.put("time_zone", "America/Los_Angeles")
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
Expand Down
12 changes: 12 additions & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ The connector waits for the default initial wait time of 5 minutes (300 seconds)

If you know there are database changes to be synced, but the connector cannot read those changes, the root cause may be insufficient waiting time. In that case, you can increase the waiting time (example: set to 600 seconds) to test if it is indeed the root cause. On the other hand, if you know there are no database changes, you can decrease the wait time to speed up the zero record syncs.

**4. Set up server timezone\(Optional\)**

:::warning
This is an advanced feature. Use it if absolutely necessary.
:::

In CDC mode, the MySQl connector may need a timezone configured if the existing MySQL database been set up with a system timezone that is not recognized by the [IANA Timezone Database](https://www.iana.org/time-zones).

In this case, you can configure the server timezone to the equivalent IANA timezone compliant timezone. (e.g. CEST -> Europe/Berlin).


**Note**

When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. Airbyte doesn't acquire any table locks \(for tables defined with MyISAM engine, the tables would still be locked\) while creating the snapshot to allow writes by other database clients. But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running.
Expand Down Expand Up @@ -240,6 +251,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
## Changelog
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.4 | 2022-10-11 | [17815](https://github.com/airbytehq/airbyte/pull/17815) | Expose setting server timezone for CDC syncs |
| 1.0.3 | 2022-10-07 | [17236](https://github.com/airbytehq/airbyte/pull/17236) | Fix large table issue by fetch size |
| 1.0.2 | 2022-10-03 | [17170](https://github.com/airbytehq/airbyte/pull/17170) | Make initial CDC waiting time configurable |
| 1.0.1 | 2022-10-01 | [17459](https://github.com/airbytehq/airbyte/pull/17459) | Upgrade debezium version to 1.9.6 from 1.9.2 |
Expand Down