diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 87caf4557938..f0c18b3ffc1d 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.3.7", + "dockerImageTag": "0.3.8", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 25f6500cdcd6..0f6ec030e564 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -81,7 +81,7 @@ - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL dockerRepository: airbyte/source-mysql - dockerImageTag: 0.3.7 + dockerImageTag: 0.3.8 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg - sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77 diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index a5c11eda3505..9eab9754c5c3 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.7 +LABEL io.airbyte.version=0.3.8 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java index 07bb6738454a..ea2ce50ae20f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java @@ -86,7 +86,17 @@ public void start(Queue> queue) { // more on the tombstone: // https://debezium.io/documentation/reference/configuration/event-flattening.html if (e.value() != null) { - queue.add(e); + boolean inserted = false; + while (!inserted) { + inserted = queue.offer(e); + if (!inserted) { + try { + Thread.sleep(10); + } catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + } + } } }) .using((success, message, error) -> { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 79c772b96a81..29c7f3dbcec6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -233,7 +233,12 @@ public List> getIncrementalIterators(JsonN final AirbyteFileOffsetBackingStore offsetManager = initializeState(stateManager); AirbyteSchemaHistoryStorage schemaHistoryManager = initializeDBHistory(stateManager); FilteredFileDatabaseHistory.setDatabaseName(config.get("database").asText()); - final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(); + /** + * We use 10000 as capacity cause the default queue size and batch size of debezium is : + * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE} is 2048 + * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192 + */ + final LinkedBlockingQueue> queue = new LinkedBlockingQueue<>(10000); final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(config, catalog, offsetManager, schemaHistoryManager); publisher.start(queue);