Skip to content

Commit

Permalink
🐛 Destination MySQL: fix problem if source has a column with json (#4825
Browse files Browse the repository at this point in the history
)

* [4583] Fixed MySQL destination of fails is source has a column with json data
  • Loading branch information
etsybaev authored and gl-pix committed Jul 22, 2021
1 parent 657b804 commit aa62bac
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private void loadDataIntoTable(JdbcDatabase database,
String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'";

String query = String.format(
"LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\r\\n'",
"LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'",
absoluteFile, schemaName, tmpTableName);

try (Statement stmt = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,21 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -204,6 +213,48 @@ public void testCustomDbtTransformations() throws Exception {
// super.testCustomDbtTransformations();
}

@Test
public void testJsonSync() throws Exception {
final String catalogAsText = "{\n"
+ " \"streams\": [\n"
+ " {\n"
+ " \"name\": \"exchange_rate\",\n"
+ " \"json_schema\": {\n"
+ " \"properties\": {\n"
+ " \"id\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"data\": {\n"
+ " \"type\": \"string\"\n"
+ " }"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}\n";

final AirbyteCatalog catalog = Jsons.deserialize(catalogAsText, AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> messages = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(catalog.getStreams().get(0).getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("data", "{\"name\":\"Conferência Faturamento - Custo - Taxas - Margem - Resumo ano inicial até -2\",\"description\":null}")
.build()))),
new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
}

@Override
@Test
public void testLineBreakCharacters() {
Expand Down

0 comments on commit aa62bac

Please sign in to comment.