Skip to content

Commit

Permalink
don't split lines on LSEP unicode characters when reading lines in de…
Browse files Browse the repository at this point in the history
…stinations (#3327)

* use strict JSONL definition of new lines in destinations

* failing test case

* use next instead of nextLine

* add \n in string for test

* bump destination versions

* bump to even newer version

* bump versions in dockerfiles as well

* force mysql test to pass
  • Loading branch information
jrhizor authored May 10, 2021
1 parent 4b79b2a commit 6fd8e00
Show file tree
Hide file tree
Showing 21 changed files with 66 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.3.1",
"dockerImageTag": "0.3.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.3.2",
"dockerImageTag": "0.3.3",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.4",
"dockerImageTag": "0.3.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.2.4",
"dockerImageTag": "0.2.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.2.4",
"dockerImageTag": "0.2.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.4",
"dockerImageTag": "0.2.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.3.5",
"dockerImageTag": "0.3.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"icon": "redshift.svg"
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.3.1
dockerImageTag: 0.3.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.4
dockerImageTag: 0.3.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.5
dockerImageTag: 0.3.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ public void run(String[] args) throws Exception {

@VisibleForTesting
static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception {
final Scanner input = new Scanner(System.in);
// use a Scanner that only processes new line characters to strictly abide with the
// https://jsonlines.org/ standard
final Scanner input = new Scanner(System.in).useDelimiter("[\r\n]+");
try (consumer) {
consumer.start();
while (input.hasNextLine()) {
final String inputString = input.nextLine();
while (input.hasNext()) {
final String inputString = input.next();
final Optional<AirbyteMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (singerMessageOptional.isPresent()) {
consumer.accept(singerMessageOptional.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,35 @@ public void testSecondSync() throws Exception {
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);
}

/**
* Tests that we are able to read over special characters properly when processing line breaks in
* destinations.
*/
@Test
public void testLineBreakCharacters() throws Exception {
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final JsonNode config = getConfig();

final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(catalog.getStreams().get(0).getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("currency", "USD\u2028")
.put("date", "2020-03-\n31T00:00:00Z\r")
.put("HKD", 10)
.put("NZD", 700)
.build()))));

runSync(config, secondSyncMessages, configuredCatalog);
final String defaultSchema = getDefaultSchema(config);
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);
}

/**
* Verify that the integration successfully writes records incrementally. The second run should
* append records to the datastore instead of overwriting the previous run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.name=airbyte/destination-bigquery
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.name=airbyte/destination-jdbc
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/destination-local-json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/destination-meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLIntegrationTest extends TestDestination {
Expand Down Expand Up @@ -161,4 +162,10 @@ protected void tearDown(TestDestinationEnv testEnv) {
db.close();
}

@Override
@Test
public void testLineBreakCharacters() {
// overrides test with a no-op until we handle full UTF-8 in the destination
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.name=airbyte/destination-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.version=0.3.6
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.name=airbyte/destination-snowflake

0 comments on commit 6fd8e00

Please sign in to comment.