Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't split lines on LSEP unicode characters when reading lines in destinations #3327

Merged
merged 9 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.3.1",
"dockerImageTag": "0.3.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.3.2",
"dockerImageTag": "0.3.3",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres",
"icon": "postgresql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.4",
"dockerImageTag": "0.3.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.2.4",
"dockerImageTag": "0.2.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.2.4",
"dockerImageTag": "0.2.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.4",
"dockerImageTag": "0.2.5",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.3.5",
"dockerImageTag": "0.3.6",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"icon": "redshift.svg"
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
icon: postgresql.svg
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.3.1
dockerImageTag: 0.3.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.4
dockerImageTag: 0.3.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.5
dockerImageTag: 0.3.6
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.4
dockerImageTag: 0.2.5
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,13 @@ public void run(String[] args) throws Exception {

@VisibleForTesting
static void consumeWriteStream(AirbyteMessageConsumer consumer) throws Exception {
final Scanner input = new Scanner(System.in);
// use a Scanner that only processes new line characters to strictly abide with the
// https://jsonlines.org/ standard
final Scanner input = new Scanner(System.in).useDelimiter("[\r\n]+");
try (consumer) {
consumer.start();
while (input.hasNextLine()) {
final String inputString = input.nextLine();
while (input.hasNext()) {
final String inputString = input.next();
final Optional<AirbyteMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (singerMessageOptional.isPresent()) {
consumer.accept(singerMessageOptional.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,35 @@ public void testSecondSync() throws Exception {
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);
}

/**
* Tests that we are able to read over special characters properly when processing line breaks in
* destinations.
*/
@Test
public void testLineBreakCharacters() throws Exception {
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.catalogFile), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final JsonNode config = getConfig();

final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(catalog.getStreams().get(0).getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("currency", "USD\u2028")
.put("date", "2020-03-\n31T00:00:00Z\r")
.put("HKD", 10)
.put("NZD", 700)
.build()))));

runSync(config, secondSyncMessages, configuredCatalog);
final String defaultSchema = getDefaultSchema(config);
retrieveRawRecordsAndAssertSameMessages(catalog, secondSyncMessages, defaultSchema);
}

/**
* Verify that the integration successfully writes records incrementally. The second run should
* append records to the datastore instead of overwriting the previous run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.name=airbyte/destination-bigquery
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.name=airbyte/destination-jdbc
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/destination-local-json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.name=airbyte/destination-meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLIntegrationTest extends TestDestination {
Expand Down Expand Up @@ -161,4 +162,10 @@ protected void tearDown(TestDestinationEnv testEnv) {
db.close();
}

@Override
@Test
public void testLineBreakCharacters() {
// overrides test with a no-op until we handle full UTF-8 in the destination
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.name=airbyte/destination-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.version=0.3.6
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.version=0.3.5
LABEL io.airbyte.name=airbyte/destination-snowflake