Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Destination MySQL: fix problem if source has a column with json #4825

Merged
merged 5 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private void loadDataIntoTable(JdbcDatabase database,
String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'";

String query = String.format(
"LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\r\\n'",
"LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'",
absoluteFile, schemaName, tmpTableName);

try (Statement stmt = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,21 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -204,6 +213,48 @@ public void testCustomDbtTransformations() throws Exception {
// super.testCustomDbtTransformations();
}

@Test
public void testJsonSync() throws Exception {
final String catalogAsText = "{\n"
+ " \"streams\": [\n"
+ " {\n"
+ " \"name\": \"exchange_rate\",\n"
+ " \"json_schema\": {\n"
+ " \"properties\": {\n"
+ " \"id\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"data\": {\n"
+ " \"type\": \"string\"\n"
+ " }"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}\n";

final AirbyteCatalog catalog = Jsons.deserialize(catalogAsText, AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> messages = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(catalog.getStreams().get(0).getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("data", "{\"name\":\"Conferência Faturamento - Custo - Taxas - Margem - Resumo ano inicial até -2\",\"description\":null}")
.build()))),
new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
}

@Override
@Test
public void testLineBreakCharacters() {
Expand Down