Skip to content

Commit

Permalink
Destination S3 (avro/parquet format): handle empty schemas (#44933)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Aug 30, 2024
1 parent 9934979 commit 0050274
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 9 deletions.
9 changes: 5 additions & 4 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,12 @@ corresponds to that version.

### Java CDK

| Version | Date | Pull Request | Subject |
| :--------- | :--------- | :---------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb |
| Version | Date | Pull Request | Subject |
| :--------- | :--------- | :----------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays |
| 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb |
| 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events |
| 0.44.17 | 2024-08-27 | [\#44832](https://github.com/airbytehq/airbyte/pull/44832) | Fix issues where some error messages with upper cases do not get matched by the error translation framework. |
| 0.44.17 | 2024-08-27 | [\#44832](https://github.com/airbytehq/airbyte/pull/44832) | Fix issues where some error messages with upper cases do not get matched by the error translation framework. |
| 0.44.16 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Destinations: add sqlgenerator testing for mixed-case stream name |
| 0.44.15 | ?????????? | [\#?????](https://github.com/airbytehq/airbyte/pull/?????) | ????? |
| 0.44.14 | 2024-08-19 | [\#42503](https://github.com/airbytehq/airbyte/pull/42503) | Destinations (refreshes) - correctly detect existing raw/final table of the correct generation during truncate sync |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.19
version=0.44.20
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ class JsonRecordAvroPreprocessor : JsonRecordIdentityMapper() {
override fun mapArrayWithoutItems(record: JsonNode?, schema: ObjectNode): JsonNode? {
return serializeToJsonNode(record)
}

override fun mapUnknown(record: JsonNode?, schema: ObjectNode): JsonNode? {
return serializeToJsonNode(record)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ class JsonSchemaAvroPreprocessor : JsonSchemaIdentityMapper() {

return super.mapArrayWithItem(schema)
}

override fun mapUnknown(schema: ObjectNode): ObjectNode {
return STRING_TYPE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ enum class AirbyteJsonSchemaType {
OBJECT_WITHOUT_PROPERTIES,
OBJECT_WITH_PROPERTIES,
UNION,
COMBINED;
COMBINED,
UNKNOWN;

fun matchesValue(tree: JsonNode): Boolean {
return when (this) {
Expand All @@ -54,6 +55,7 @@ enum class AirbyteJsonSchemaType {
OBJECT_WITH_PROPERTIES -> tree.isObject
UNION,
COMBINED -> throw IllegalArgumentException("Union type cannot be matched")
UNKNOWN -> true
}
}

Expand Down Expand Up @@ -175,7 +177,7 @@ enum class AirbyteJsonSchemaType {
// Usually the root node
return OBJECT_WITH_PROPERTIES
} else {
throw IllegalArgumentException("Unspecified schema type")
return UNKNOWN
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,8 @@ open class JsonRecordIdentityMapper : JsonRecordMapper<JsonNode?>() {
val match = AirbyteJsonSchemaType.getMatchingValueForType(record, options)
return mapRecordWithSchema(record, match)
}

override fun mapUnknown(record: JsonNode?, schema: ObjectNode): JsonNode? {
return record?.deepCopy()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ abstract class JsonRecordMapper<R> {
AirbyteJsonSchemaType.OBJECT_WITH_PROPERTIES -> mapObjectWithProperties(record, schema)
AirbyteJsonSchemaType.UNION -> mapUnion(record, schema)
AirbyteJsonSchemaType.COMBINED -> mapCombined(record, schema)
AirbyteJsonSchemaType.UNKNOWN -> mapUnknown(record, schema)
}
}

Expand All @@ -53,4 +54,5 @@ abstract class JsonRecordMapper<R> {
abstract fun mapObjectWithProperties(record: JsonNode?, schema: ObjectNode): R
abstract fun mapUnion(record: JsonNode?, schema: ObjectNode): R
abstract fun mapCombined(record: JsonNode?, schema: ObjectNode): R
abstract fun mapUnknown(record: JsonNode?, schema: ObjectNode): R
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.s3.jsonschema

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons

open class JsonSchemaIdentityMapper : JsonSchemaMapper() {
private fun makeType(
Expand Down Expand Up @@ -146,4 +147,8 @@ open class JsonSchemaIdentityMapper : JsonSchemaMapper() {

return newUnionSchema
}

override fun mapUnknown(schema: ObjectNode): ObjectNode {
return Jsons.emptyObject() as ObjectNode
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ abstract class JsonSchemaMapper {
AirbyteJsonSchemaType.NUMBER -> mapNumber(schema)
AirbyteJsonSchemaType.COMBINED -> mapCombined(schema)
AirbyteJsonSchemaType.UNION -> mapUnion(schema)
AirbyteJsonSchemaType.UNKNOWN -> mapUnknown(schema)
}
}

Expand All @@ -50,4 +51,5 @@ abstract class JsonSchemaMapper {
abstract fun mapNumber(schema: ObjectNode): ObjectNode
abstract fun mapCombined(schema: ObjectNode): ObjectNode
abstract fun mapUnion(schema: ObjectNode): ObjectNode
abstract fun mapUnknown(schema: ObjectNode): ObjectNode
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ class JsonSchemaParquetPreprocessor : JsonSchemaIdentityMapper() {
AirbyteJsonSchemaType.UNION,
AirbyteJsonSchemaType.COMBINED ->
throw IllegalStateException("Nested unions are not supported")
// Parquet has a native JSON type, which we would ideally use here.
// Unfortunately, we're currently building parquet schemas via
// Avro schemas, and Avro doesn't have a native JSON type.
// So for now, we assume that the JsonSchemaAvroPreprocessor
// was invoked before this preprocessor.
AirbyteJsonSchemaType.UNKNOWN ->
throw IllegalStateException(
"JSON fields should be converted to string upstream of this processor"
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonRecordIdentityMapper
import io.airbyte.cdk.integrations.destination.s3.parquet.JsonRecordParquetPreprocessor
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

Expand Down Expand Up @@ -162,4 +163,50 @@ class JsonRecordTransformerTest {
Assertions.assertEquals(jsonExpectedOut[index], transformedRecord)
}
}

@Test
fun testStringifyJsonValues() {
val inputSchema =
Jsons.deserialize(
"""
{
"type": "object",
"properties": {
"foo": {},
"bar": {
"type": "array",
"items": {}
}
}
}
""".trimIndent()
) as ObjectNode
val inputRecord =
Jsons.deserialize(
"""
{
"foo": {"a": 42},
"bar": [1, null, {}]
}
""".trimIndent()
)

val avroMappedRecord =
JsonRecordAvroPreprocessor().mapRecordWithSchema(inputRecord, inputSchema)

Assertions.assertEquals(
Jsons.deserialize(
// The "foo" field is completely serialized
// and the "bar" field has each entry serialized,
// except for the null entry, which is unchanged.
"""
{
"foo": "{\"a\":42}",
"bar": ["1", null, "{}"]
}
""".trimIndent()
),
avroMappedRecord
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonSchemaIdentityM
import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonSchemaUnionMerger
import io.airbyte.cdk.integrations.destination.s3.parquet.JsonSchemaParquetPreprocessor
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons
import kotlin.test.assertEquals
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

Expand Down Expand Up @@ -139,4 +141,42 @@ class JsonSchemaTransformerTest {
Assertions.assertEquals(nullType, properties["redundant_null"])
Assertions.assertEquals(stringType, properties["combined_null_string"])
}

@Test
fun testJsonType() {
val inputSchema =
Jsons.deserialize(
"""
{
"type": "object",
"properties": {
"foo": {},
"bar": {
"type": "array",
"items": {}
}
}
}
""".trimIndent()
) as ObjectNode
val mapped = JsonSchemaAvroPreprocessor().mapSchema(inputSchema)

assertEquals(
Jsons.deserialize(
"""
{
"type": "object",
"properties": {
"foo": {"type": "string"},
"bar": {
"type": "array",
"items": {"type": "string"}
}
}
}
""".trimIndent()
),
mapped
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.19'
cdkVersionRequired = '0.44.20'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.0.3
dockerImageTag: 1.0.4
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists |
| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema |
| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. |
Expand Down

0 comments on commit 0050274

Please sign in to comment.