🐛 Destination S3: avro and parquet formats have issues with JsonToAvroSchemaConverter #8574

Merged: 32 commits, Dec 21, 2021
Commits
fff9beb
add namespace to avro record type
Dec 5, 2021
2f259cd
refactoring
Dec 6, 2021
487360d
Add unit tests
Dec 7, 2021
f75c85e
added CHANGELOG
Dec 7, 2021
0add1d8
Merge branch 'master' into vmaltsev/8242-destination-s3-support-names…
Dec 7, 2021
8ca1165
fix typo in method name
Dec 8, 2021
dcde062
fix typo in method name
Dec 8, 2021
93d53ac
fix for jdk 17
Dec 15, 2021
ede8d38
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
27be3fe
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 15, 2021
6ea465f
Merge branch 'master' into vmaltsev/8242-destination-s3-support-names…
Dec 15, 2021
8ad0a65
created recursive keys adding
Dec 16, 2021
ca75033
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
72ab46f
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
f4bb8cd
Merge branch 'master' into vmaltsev/8242-destination-s3-support-names…
Dec 16, 2021
1aed974
refactoring
Dec 16, 2021
5c5bbb1
format code
Dec 16, 2021
2575dcf
cleanup Dockerfile
Dec 16, 2021
4409405
refactoring
Dec 16, 2021
aec3384
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 16, 2021
e2816e5
removed unneeded test case
Dec 20, 2021
f35e1e3
updated namespace generation
Dec 20, 2021
21895ff
removed unneeded method from AvroNameTransformer
Dec 20, 2021
7efd5aa
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 20, 2021
6a773f9
Merge branch 'master' of github.com:airbytehq/airbyte
Dec 21, 2021
7a1db5b
Merge branch 'master' into vmaltsev/8242-destination-s3-support-names…
Dec 21, 2021
6f0d91f
resolved merge conflicts
Dec 21, 2021
7879fd2
removed unused imports
Dec 21, 2021
1e16cb2
reformat the code
alexandr-shegeda Dec 21, 2021
0af1281
Merge remote-tracking branch 'origin/vmaltsev/8242-destination-s3-sup…
alexandr-shegeda Dec 21, 2021
9d0d0ad
bump version
Dec 21, 2021
affce39
bump Bigquery Denormalized version
Dec 21, 2021
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
"destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
"name": "BigQuery (denormalized typed struct)",
"dockerRepository": "airbyte/destination-bigquery-denormalized",
"dockerImageTag": "0.1.8",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
"icon": "bigquery.svg"
}
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
"destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.14",
"dockerImageTag": "0.2.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3",
"icon": "s3.svg"
}
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
"destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
"name": "Google Cloud Storage (GCS)",
"dockerRepository": "airbyte/destination-gcs",
"dockerImageTag": "0.1.14",
"dockerImageTag": "0.1.17",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs",
"icon": "googlecloudstorage.svg"
}
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.11
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
- name: Cassandra
@@ -60,7 +60,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.1.16
dockerImageTag: 0.1.17
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
- name: Google PubSub
@@ -167,7 +167,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
- name: SFTP-JSON
Original file line number Diff line number Diff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
Original file line number Diff line number Diff line change
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.16
LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.name=airbyte/destination-gcs
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ public GcsAvroWriter(final GcsDestinationConfig config,

Schema schema = (airbyteSchema == null ? GcsUtils.getDefaultAvroSchema(stream.getName(), stream.getNamespace(), true)
: new JsonToAvroSchemaConverter().getAvroSchema(airbyteSchema, stream.getName(),
stream.getNamespace(), true, false, false));
stream.getNamespace(), true, false, false, true));
LOGGER.info("Avro schema : {}", schema);
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.AVRO);
objectKey = String.join("/", outputPrefix, outputFilename);
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ public S3Writer create(final GcsDestinationConfig config,
return new GcsAvroWriter(config, s3Client, configuredStream, uploadTimestamp, AvroConstants.JSON_CONVERTER, stream.getJsonSchema());
} else {
final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true);
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true, true);

LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false));
return new GcsParquetWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, AvroConstants.JSON_CONVERTER);
Original file line number Diff line number Diff line change
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/destination-s3
Original file line number Diff line number Diff line change
@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.s3.avro;

import static io.airbyte.integrations.destination.s3.util.AvroRecordHelper.obtainPaths;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Preconditions;
@@ -46,6 +48,7 @@ public class JsonToAvroSchemaConverter {
.addToSchema(Schema.create(Schema.Type.LONG));

private final Map<String, String> standardizedNames = new HashMap<>();
private final Map<JsonNode, String> jsonNodePathMap = new HashMap<>();

static List<JsonSchemaType> getNonNullTypes(final String fieldName, final JsonNode fieldDefinition) {
return getTypes(fieldName, fieldDefinition).stream()
@@ -96,8 +99,9 @@ public Map<String, String> getStandardizedNames() {
public Schema getAvroSchema(final JsonNode jsonSchema,
final String name,
@Nullable final String namespace,
final boolean appendAirbyteFields) {
return getAvroSchema(jsonSchema, name, namespace, appendAirbyteFields, true, true);
final boolean appendAirbyteFields,
final boolean isRootNode) {
return getAvroSchema(jsonSchema, name, namespace, appendAirbyteFields, true, true, isRootNode);
}

/**
@@ -108,9 +112,13 @@ public Schema getAvroSchema(final JsonNode jsonSchema,
@Nullable final String namespace,
final boolean appendAirbyteFields,
final boolean appendExtraProps,
final boolean addStringToLogicalTypes) {
final boolean addStringToLogicalTypes,
final boolean isRootNode) {
final String stdName = AvroConstants.NAME_TRANSFORMER.getIdentifier(name);
RecordBuilder<Schema> builder = SchemaBuilder.record(stdName);
if (isRootNode) {
obtainPaths("", jsonSchema, jsonNodePathMap);
}
if (!stdName.equals(name)) {
standardizedNames.put(name, stdName);
LOGGER.warn("Schema name contains illegal character(s) and is standardized: {} -> {}", name,
@@ -220,7 +228,7 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType
String.format("Array field %s has invalid items property: %s", fieldName, items));
}
}
case OBJECT -> fieldSchema = getAvroSchema(fieldDefinition, fieldName, null, false, appendExtraProps, addStringToLogicalTypes);
case OBJECT -> fieldSchema = getAvroSchema(fieldDefinition, fieldName, jsonNodePathMap.get(fieldDefinition), false, appendExtraProps, addStringToLogicalTypes, false);
default -> throw new IllegalStateException(
String.format("Unexpected type for field %s: %s", fieldName, fieldType));
}
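The `OBJECT` case now passes a per-field namespace where it previously passed `null`. In Avro, a named type is identified by its full name (namespace plus name), so two nested records that share a field name get distinct full names only if their namespaces differ. The following is a minimal sketch of that naming rule using plain JDK collections rather than the Avro library. `FullNameSketch`, `fullName`, and `allDistinct` are hypothetical names for illustration, the field names are made up, and the model is simplified: it ignores Avro's rule that an unset namespace is inherited from the enclosing type, which leads to the same collision.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration class; not part of the PR and not the Avro API.
final class FullNameSketch {

  // Simplified Avro naming rule: "<namespace>.<name>", or just "<name>"
  // when no namespace is given.
  static String fullName(final String namespace, final String name) {
    return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
  }

  // Each entry is {namespace, name}. Returns true when every record
  // ends up with a distinct full name.
  static boolean allDistinct(final String[][] records) {
    final Set<String> seen = new HashSet<>();
    for (final String[] record : records) {
      if (!seen.add(fullName(record[0], record[1]))) {
        return false;
      }
    }
    return true;
  }

  public static void main(final String[] args) {
    // Two nested "address" records without a namespace share one full name.
    final String[][] before = {{null, "address"}, {null, "address"}};
    // With the parent path as namespace, the full names differ.
    final String[][] after = {{"user", "address"}, {"company", "address"}};

    System.out.println(allDistinct(before)); // prints "false"
    System.out.println(allDistinct(after));  // prints "true"
  }
}
```

With the parent path as namespace, `user.address` and `company.address` no longer clash, which appears to be what the `jsonNodePathMap.get(fieldDefinition)` argument is for.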
Original file line number Diff line number Diff line change
@@ -5,11 +5,16 @@
package io.airbyte.integrations.destination.s3.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Helper methods for unit tests. This is needed by multiple modules, so it is in the src directory.
@@ -18,7 +23,7 @@ public class AvroRecordHelper {

public static JsonFieldNameUpdater getFieldNameUpdater(final String streamName, final String namespace, final JsonNode streamSchema) {
final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
schemaConverter.getAvroSchema(streamSchema, streamName, namespace, true);
schemaConverter.getAvroSchema(streamSchema, streamName, namespace, true, true);
return new JsonFieldNameUpdater(schemaConverter.getStandardizedNames());
}

@@ -47,4 +52,32 @@ public static JsonNode pruneAirbyteJson(final JsonNode input) {
return output;
}

public static void obtainPaths(String currentPath, JsonNode jsonNode, Map<JsonNode, String> jsonNodePathMap) {
if (jsonNode.isObject()) {
ObjectNode objectNode = (ObjectNode) jsonNode;
Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
String pathPrefix = currentPath.isEmpty() ? "" : currentPath + "/";
String[] pathFieldsArray = currentPath.split("/");
String parent = Arrays.stream(pathFieldsArray)
.filter(x -> !x.equals("items"))
.filter(x -> !x.equals("properties"))
.filter(x -> !x.equals(pathFieldsArray[pathFieldsArray.length - 1]))
.collect(Collectors.joining("."));
if (!parent.isEmpty()) {
jsonNodePathMap.put(jsonNode, parent);
}
while (iter.hasNext()) {
Map.Entry<String, JsonNode> entry = iter.next();
obtainPaths(pathPrefix + entry.getKey(), entry.getValue(), jsonNodePathMap);
}
} else if (jsonNode.isArray()) {
ArrayNode arrayNode = (ArrayNode) jsonNode;

for (int i = 0; i < arrayNode.size(); i++) {
String arrayPath = currentPath + "/" + i;
obtainPaths(arrayPath, arrayNode.get(i), jsonNodePathMap);
}
}
}

}
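The parent-namespace derivation in `obtainPaths` can be tried in isolation. The following is a minimal sketch that reproduces only the string handling from the method above. The class `NamespaceFromPath` and the method `parentNamespace` are hypothetical names, not part of the PR, and Jackson is left out so the example runs with the JDK alone.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper class for illustration; not part of the PR.
final class NamespaceFromPath {

  // Same filtering as in obtainPaths: drop the JSON-schema keywords
  // "items" and "properties", drop every segment equal to the last one,
  // and join what remains with dots.
  static String parentNamespace(final String currentPath) {
    final String[] segments = currentPath.split("/");
    final String last = segments[segments.length - 1];
    return Arrays.stream(segments)
        .filter(s -> !s.equals("items"))
        .filter(s -> !s.equals("properties"))
        .filter(s -> !s.equals(last))
        .collect(Collectors.joining("."));
  }

  public static void main(final String[] args) {
    // Top-level field: the result is empty, so obtainPaths stores nothing.
    System.out.println("[" + parentNamespace("properties/user") + "]"); // prints "[]"
    // Nested object: the parent field becomes the namespace.
    System.out.println(parentNamespace("properties/user/properties/address")); // prints "user"
    // Deeper nesting: ancestors are joined with dots.
    System.out.println(parentNamespace("properties/user/properties/address/properties/city")); // prints "user.address"
    // Object inside an array: the "items" keyword is skipped.
    System.out.println(parentNamespace("properties/orders/items/properties/address")); // prints "orders"
  }
}
```

One behaviour worth noting: the last filter compares by value, so an ancestor segment that shares the leaf's name is dropped as well. For example, `properties/id/properties/meta/properties/id` yields `meta` rather than `id.meta`. Whether that matters for real schemas is not something the diff settles.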
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ public S3Writer create(final S3DestinationConfig config,
LOGGER.info("Json schema for stream {}: {}", stream.getName(), stream.getJsonSchema());

final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true);
final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true, true);

LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false));

Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ public void testJsonAvroConversion(final String schemaName,
final JsonNode avroSchema,
final JsonNode avroObject)
throws Exception {
final Schema actualAvroSchema = SCHEMA_CONVERTER.getAvroSchema(jsonSchema, schemaName, namespace, appendAirbyteFields);
final Schema actualAvroSchema = SCHEMA_CONVERTER.getAvroSchema(jsonSchema, schemaName, namespace, appendAirbyteFields, true);
assertEquals(
avroSchema,
Jsons.deserialize(actualAvroSchema.toString()),