From 6add8d0c458127ac83351153c6ff5679f05083c0 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Fri, 18 Nov 2022 14:21:04 -0800 Subject: [PATCH 01/14] Adding Beam Schemas capability to parse json-schemas. This is the de-facto standard way to define JSON schemas --- .../beam/gradle/BeamModulePlugin.groovy | 4 + sdks/java/core/build.gradle | 2 + .../beam/sdk/schemas/utils/JsonUtils.java | 76 +++++++++++++++++++ .../sdk/schemas/JsonSchemaConversionTest.java | 74 ++++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 95fb9f59e3f40..ffa11b3e215f2 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -484,6 +484,8 @@ class BeamModulePlugin implements Plugin { def jackson_version = "2.13.3" def jaxb_api_version = "2.3.3" def jsr305_version = "3.0.2" + def json_org_version = "20200518" + def everit_json_version = "1.14.1" def kafka_version = "2.4.1" def nemo_version = "0.1" def netty_version = "4.1.77.Final" @@ -678,6 +680,8 @@ class BeamModulePlugin implements Plugin { joda_time : "joda-time:joda-time:2.10.10", jsonassert : "org.skyscreamer:jsonassert:1.5.0", jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version", + json_org : "org.json:json:${json_org_version}", + everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}", junit : "junit:junit:4.13.1", kafka : "org.apache.kafka:kafka_2.11:$kafka_version", kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version", diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 85f0d23236cf4..db24145296543 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -91,6 +91,8 @@ dependencies { shadow library.java.avro shadow library.java.snappy_java shadow library.java.joda_time + shadow library.java.json_org + shadow library.java.everit_json_schema provided library.java.junit provided library.java.hamcrest provided 'io.airlift:aircompressor:0.18' diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 72dc92dcdc715..5def8ff4dfb0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -26,6 +26,11 @@ import org.apache.beam.sdk.util.RowJson; import org.apache.beam.sdk.util.RowJsonUtils; import org.apache.beam.sdk.values.Row; +import org.everit.json.schema.ArraySchema; +import org.everit.json.schema.NumberSchema; +import org.everit.json.schema.ObjectSchema; +import org.everit.json.schema.ReferenceSchema; +import org.json.JSONObject; /** Utils to convert JSON records to Beam {@link Row}. */ @Experimental(Kind.SCHEMAS) @@ -73,6 +78,77 @@ public String apply(Row input) { }; } + public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) { + org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr); + return beamSchemaFromJsonSchema(jsonSchema); + } + + private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) { + Schema.Builder beamSchemaBuilder = Schema.builder(); + for (String propertyName : jsonSchema.getPropertySchemas().keySet()) { + org.everit.json.schema.Schema propertySchema = + jsonSchema.getPropertySchemas().get(propertyName); + if (propertySchema == null) { + throw new IllegalArgumentException("Unable to parse schema " + jsonSchema.toString()); + } + if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) { + beamSchemaBuilder = + beamSchemaBuilder.addField( + Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); + } else if (propertySchema.getClass().equals(org.everit.json.schema.ArraySchema.class)) { + beamSchemaBuilder = + beamSchemaBuilder.addField( + Schema.Field.of( + propertyName, + Schema.FieldType.array( + beamTypeFromJsonSchemaType( + ((ArraySchema) propertySchema).getAllItemSchema())))); + } else { + try { + beamSchemaBuilder = + beamSchemaBuilder.addField( + Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unsupported field type in field " + propertyName, e); + } + } + } + return beamSchemaBuilder.build(); + } + + private static Schema.FieldType beamTypeFromJsonSchemaType( + org.everit.json.schema.Schema propertySchema) { + if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) { + return Schema.FieldType.row(beamSchemaFromJsonSchema((ObjectSchema) propertySchema)); + } else if (propertySchema.getClass().equals(org.everit.json.schema.BooleanSchema.class)) { + return Schema.FieldType.BOOLEAN; + } else if (propertySchema.getClass().equals(org.everit.json.schema.NumberSchema.class)) { + return ((NumberSchema) propertySchema).requiresInteger() + ? Schema.FieldType.INT64 + : Schema.FieldType.DOUBLE; + } + if (propertySchema.getClass().equals(org.everit.json.schema.StringSchema.class)) { + return Schema.FieldType.STRING; + } else if (propertySchema.getClass().equals(org.everit.json.schema.ReferenceSchema.class)) { + org.everit.json.schema.Schema sch = ((ReferenceSchema) propertySchema).getReferredSchema(); + return beamTypeFromJsonSchemaType(sch); + } else { + throw new IllegalArgumentException( + "Unsupported schema type: " + propertySchema.getClass().toString()); + } + } + + private static org.everit.json.schema.ObjectSchema jsonSchemaFromString(String jsonSchema) { + JSONObject parsedSchema = new JSONObject(jsonSchema); + org.everit.json.schema.Schema schemaValidator = + org.everit.json.schema.loader.SchemaLoader.load(parsedSchema); + if (!schemaValidator.getClass().equals(ObjectSchema.class)) { + throw new IllegalArgumentException( + String.format("The schema is not a valid object schema:\n%s", jsonSchema)); + } + return (org.everit.json.schema.ObjectSchema) schemaValidator; + } + private abstract static class JsonToRowFn extends SimpleFunction { final RowJson.RowJsonDeserializer deserializer; final ObjectMapper objectMapper; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java new file mode 100644 index 0000000000000..53615a010692e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.io.InputStream; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class JsonSchemaConversionTest { + + @Test + public void testBasicJsonSchemaToBeamSchema() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/schemas/json/basic_json_schema.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + + assertThat( + parsedSchema.getFieldNames(), + containsInAnyOrder("booleanProp", "integerProp", "numberProp", "stringProp")); + assertThat( + parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), + containsInAnyOrder( + Schema.FieldType.BOOLEAN, + Schema.FieldType.INT64, + Schema.FieldType.DOUBLE, + Schema.FieldType.STRING)); + } + } + + @Test + public void testNestedStructsJsonSchemaToBeamSchema() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/schemas/json/nested_arrays_objects_json_schema.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + + assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("fruits", "vegetables")); + assertThat( + parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), + containsInAnyOrder( + Schema.FieldType.array(Schema.FieldType.STRING), + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.of("veggieName", Schema.FieldType.STRING), + Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN)))))); + } + } +} From 2d77e7a9401e63dd1b5c8ba4745f70e65debf254 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Fri, 18 Nov 2022 14:22:02 -0800 Subject: [PATCH 02/14] json sample schema files for tests --- .../schemas/json/basic_json_schema.json | 16 +++++++++ .../nested_arrays_objects_json_schema.json | 33 +++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 sdks/java/core/src/test/resources/schemas/json/basic_json_schema.json create mode 100644 sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json diff --git a/sdks/java/core/src/test/resources/schemas/json/basic_json_schema.json b/sdks/java/core/src/test/resources/schemas/json/basic_json_schema.json new file mode 100644 index 0000000000000..39e1d1d05fdcc --- /dev/null +++ b/sdks/java/core/src/test/resources/schemas/json/basic_json_schema.json @@ -0,0 +1,16 @@ +{ + "properties": { + "booleanProp": { + "type": "boolean" + }, + "integerProp": { + "type": "integer" + }, + "numberProp": { + "type": "number" + }, + "stringProp": { + "type": "string" + } + } +} \ No newline at end of file diff --git a/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json b/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json new file mode 100644 index 0000000000000..7fd44ada27d76 --- /dev/null +++ b/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json @@ -0,0 +1,33 @@ +{ + "$id": "https://example.com/arrays.schema.json", + "description": "A representation of a person, company, organization, or place", + "type": "object", + "properties": { + "fruits": { + "type": "array", + "items": { + "type": "string" + } + }, + "vegetables": { + "type": "array", + "items": { "$ref": "#/$defs/veggie" } + } + }, + "$defs": { + "veggie": { + "type": "object", + "required": [ "veggieName", "veggieLike" ], + "properties": { + "veggieName": { + "type": "string", + "description": "The name of the vegetable." + }, + "veggieLike": { + "type": "boolean", + "description": "Do I like this vegetable?" + } + } + } + } +} \ No newline at end of file From a14c8c4ae0d6876774c9e8e53c9f63ca9c81e10d Mon Sep 17 00:00:00 2001 From: Pablo E Date: Sat, 19 Nov 2022 19:55:51 -0800 Subject: [PATCH 03/14] addressing comments --- .../beam/gradle/BeamModulePlugin.groovy | 3 +- sdks/java/core/build.gradle | 7 +- .../beam/sdk/schemas/utils/JsonUtils.java | 32 ++++--- .../sdk/schemas/JsonSchemaConversionTest.java | 91 ++++++++++++++++++- .../json/array_nested_array_json_schema.json | 18 ++++ .../nested_arrays_objects_json_schema.json | 3 +- ...t_nested_object_and_array_json_schema.json | 40 ++++++++ .../json/unsupported_nested_tuple_array.json | 38 ++++++++ .../json/unsupported_tuple_arrays.json | 13 +++ 9 files changed, 228 insertions(+), 17 deletions(-) create mode 100644 sdks/java/core/src/test/resources/schemas/json/array_nested_array_json_schema.json create mode 100644 sdks/java/core/src/test/resources/schemas/json/object_nested_object_and_array_json_schema.json create mode 100644 sdks/java/core/src/test/resources/schemas/json/unsupported_nested_tuple_array.json create mode 100644 sdks/java/core/src/test/resources/schemas/json/unsupported_tuple_arrays.json diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index ffa11b3e215f2..3ebf748d0b310 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -484,7 +484,6 @@ class BeamModulePlugin implements Plugin { def jackson_version = "2.13.3" def jaxb_api_version = "2.3.3" def jsr305_version = "3.0.2" - def json_org_version = "20200518" def everit_json_version = "1.14.1" def kafka_version = "2.4.1" def nemo_version = "0.1" @@ -680,7 +679,7 @@ class BeamModulePlugin implements Plugin { joda_time : "joda-time:joda-time:2.10.10", jsonassert : "org.skyscreamer:jsonassert:1.5.0", jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version", - json_org : "org.json:json:${json_org_version}", + json_org : "org.json:json:20200518", // Try to keep in sync with google_cloud_platform_libraries_bom transitive deps. everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}", junit : "junit:junit:4.13.1", kafka : "org.apache.kafka:kafka_2.11:$kafka_version", diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index db24145296543..e8e8d814c37ec 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -92,7 +92,12 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time shadow library.java.json_org - shadow library.java.everit_json_schema + // com.github.everit JSON schema validation library is used for json-schema.org validation. + // to aoid forcing the library onto users, we ask users to provide it rather than include + // it by default. + // It is only used for optional functionality in JsonUtils schema parsing and conversion. + provided library.java.everit_json_schema + testImplementation library.java.everit_json_schema provided library.java.junit provided library.java.hamcrest provided 'io.airlift:aircompressor:0.18' diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 5def8ff4dfb0b..4caa77069d598 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -91,11 +91,15 @@ private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSche if (propertySchema == null) { throw new IllegalArgumentException("Unable to parse schema " + jsonSchema.toString()); } - if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) { + if (propertySchema instanceof org.everit.json.schema.ObjectSchema) { beamSchemaBuilder = beamSchemaBuilder.addField( Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); - } else if (propertySchema.getClass().equals(org.everit.json.schema.ArraySchema.class)) { + } else if (propertySchema instanceof org.everit.json.schema.ArraySchema) { + if (((ArraySchema) propertySchema).getAllItemSchema() == null) { + throw new IllegalArgumentException( + "Array schema is not properly formatted or unsupported: " + propertyName); + } beamSchemaBuilder = beamSchemaBuilder.addField( Schema.Field.of( @@ -118,23 +122,29 @@ private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSche private static Schema.FieldType beamTypeFromJsonSchemaType( org.everit.json.schema.Schema propertySchema) { - if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) { + if (propertySchema instanceof org.everit.json.schema.ObjectSchema) { return Schema.FieldType.row(beamSchemaFromJsonSchema((ObjectSchema) propertySchema)); - } else if (propertySchema.getClass().equals(org.everit.json.schema.BooleanSchema.class)) { + } else if (propertySchema instanceof org.everit.json.schema.BooleanSchema) { return Schema.FieldType.BOOLEAN; - } else if (propertySchema.getClass().equals(org.everit.json.schema.NumberSchema.class)) { + } else if (propertySchema instanceof org.everit.json.schema.NumberSchema) { return ((NumberSchema) propertySchema).requiresInteger() ? Schema.FieldType.INT64 : Schema.FieldType.DOUBLE; } - if (propertySchema.getClass().equals(org.everit.json.schema.StringSchema.class)) { + if (propertySchema instanceof org.everit.json.schema.StringSchema) { return Schema.FieldType.STRING; - } else if (propertySchema.getClass().equals(org.everit.json.schema.ReferenceSchema.class)) { + } else if (propertySchema instanceof org.everit.json.schema.ReferenceSchema) { org.everit.json.schema.Schema sch = ((ReferenceSchema) propertySchema).getReferredSchema(); return beamTypeFromJsonSchemaType(sch); + } else if (propertySchema instanceof org.everit.json.schema.ArraySchema) { + if (((ArraySchema) propertySchema).getAllItemSchema() == null) { + throw new IllegalArgumentException( + "Array schema is not properly formatted or unsupported: " + propertySchema); + } + return Schema.FieldType.array( + beamTypeFromJsonSchemaType(((ArraySchema) propertySchema).getAllItemSchema())); } else { - throw new IllegalArgumentException( - "Unsupported schema type: " + propertySchema.getClass().toString()); + throw new IllegalArgumentException("Unsupported schema type: " + propertySchema.getClass()); } } @@ -142,9 +152,9 @@ private static org.everit.json.schema.ObjectSchema jsonSchemaFromString(String j JSONObject parsedSchema = new JSONObject(jsonSchema); org.everit.json.schema.Schema schemaValidator = org.everit.json.schema.loader.SchemaLoader.load(parsedSchema); - if (!schemaValidator.getClass().equals(ObjectSchema.class)) { + if (!(schemaValidator instanceof ObjectSchema)) { throw new IllegalArgumentException( - String.format("The schema is not a valid object schema:\n%s", jsonSchema)); + String.format("The schema is not a valid object schema:%n %s", jsonSchema)); } return (org.everit.json.schema.ObjectSchema) schemaValidator; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java index 53615a010692e..b08ab5010d387 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java @@ -19,12 +19,14 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThrows; -import com.google.common.io.ByteStreams; import java.io.IOException; import java.io.InputStream; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -71,4 +73,91 @@ public void testNestedStructsJsonSchemaToBeamSchema() throws IOException { Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN)))))); } } + + @Test + public void testArrayNestedArrayObjectJsonSchemaToBeamSchema() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/schemas/json/array_nested_array_json_schema.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + + assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("complexMatrix")); + assertThat( + parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), + containsInAnyOrder( + Schema.FieldType.array( + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.of("imaginary", Schema.FieldType.DOUBLE), + Schema.Field.of("real", Schema.FieldType.DOUBLE))))))); + } + } + + @Test + public void testObjectNestedObjectArrayJsonSchemaToBeamSchema() throws IOException { + try (InputStream inputStream = + getClass() + .getResourceAsStream("/schemas/json/object_nested_object_and_array_json_schema.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + + assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("classroom")); + assertThat( + parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), + containsInAnyOrder( + Schema.FieldType.row( + Schema.of( + Schema.Field.of("teacher", Schema.FieldType.STRING), + Schema.Field.of( + "classroom", + Schema.FieldType.row( + Schema.of( + Schema.Field.of( + "students", + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.of("name", Schema.FieldType.STRING), + Schema.Field.of( + "age", Schema.FieldType.INT64))))), + Schema.Field.of("building", Schema.FieldType.STRING)))))))); + } + } + + @Test + public void testUnsupportedTupleArrays() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/schemas/json/unsupported_tuple_arrays.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + + IllegalArgumentException thrownException = + assertThrows( + IllegalArgumentException.class, + () -> { + JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + }); + + assertThat( + thrownException.getMessage(), containsString("Array schema is not properly formatted")); + } + } + + @Test + public void testUnsupportedNestedTupleArrays() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/schemas/json/unsupported_nested_tuple_array.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + + IllegalArgumentException thrownException = + assertThrows( + IllegalArgumentException.class, + () -> { + JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + }); + + assertThat( + thrownException.getMessage(), containsString("Array schema is not properly formatted")); + } + } } diff --git a/sdks/java/core/src/test/resources/schemas/json/array_nested_array_json_schema.json b/sdks/java/core/src/test/resources/schemas/json/array_nested_array_json_schema.json new file mode 100644 index 0000000000000..47da351668591 --- /dev/null +++ b/sdks/java/core/src/test/resources/schemas/json/array_nested_array_json_schema.json @@ -0,0 +1,18 @@ +{ + "type": "object", + "properties": { + "complexMatrix": { + "type": "array", + "items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "real": {"type": "number"}, + "imaginary": {"type": "number"} + } + } + } + } + } +} diff --git a/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json b/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json index 7fd44ada27d76..5abab6942265c 100644 --- a/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json +++ b/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json @@ -1,6 +1,5 @@ { "$id": "https://example.com/arrays.schema.json", - "description": "A representation of a person, company, organization, or place", "type": "object", "properties": { "fruits": { @@ -30,4 +29,4 @@ } } } -} \ No newline at end of file +} diff --git a/sdks/java/core/src/test/resources/schemas/json/object_nested_object_and_array_json_schema.json b/sdks/java/core/src/test/resources/schemas/json/object_nested_object_and_array_json_schema.json new file mode 100644 index 0000000000000..e68dd9c5bb98d --- /dev/null +++ b/sdks/java/core/src/test/resources/schemas/json/object_nested_object_and_array_json_schema.json @@ -0,0 +1,40 @@ +{ + "type": "object", + "properties": { + "classroom": { + "type": "object", + "properties": { + "teacher": { + "type": "string" + }, + "classroom": { + "type": "object", + "properties": { + "building": { + "type": "string" + }, + "students": { + "type": "array", + "items": { + "$ref": "#/$defs/student" + } + } + } + } + } + } + }, + "$defs": { + "student": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "age": { + "type": "integer" + } + } + } + } +} diff --git a/sdks/java/core/src/test/resources/schemas/json/unsupported_nested_tuple_array.json b/sdks/java/core/src/test/resources/schemas/json/unsupported_nested_tuple_array.json new file mode 100644 index 0000000000000..74d4413758adb --- /dev/null +++ b/sdks/java/core/src/test/resources/schemas/json/unsupported_nested_tuple_array.json @@ -0,0 +1,38 @@ +{ + "properties": { + "objectWithTuple": { + "type": "object", + "properties": { + "someString": { + "type": "string" + }, + "tupleArray": { + "type": "array", + "prefixItems": [ + { + "type": "number" + }, + { + "type": "string" + }, + { + "enum": [ + "Street", + "Avenue", + "Boulevard" + ] + }, + { + "enum": [ + "NW", + "NE", + "SW", + "SE" + ] + } + ] + } + } + } + } +} diff --git a/sdks/java/core/src/test/resources/schemas/json/unsupported_tuple_arrays.json b/sdks/java/core/src/test/resources/schemas/json/unsupported_tuple_arrays.json new file mode 100644 index 0000000000000..c6fbccf3820db --- /dev/null +++ b/sdks/java/core/src/test/resources/schemas/json/unsupported_tuple_arrays.json @@ -0,0 +1,13 @@ +{ + "properties": { + "tupleArray": { + "type": "array", + "prefixItems": [ + { "type": "number" }, + { "type": "string" }, + { "enum": ["Street", "Avenue", "Boulevard"] }, + { "enum": ["NW", "NE", "SW", "SE"] } + ] + } + } +} From 342edca5b0b2e5210efc7e468c566bf6379f102f Mon Sep 17 00:00:00 2001 From: Pablo E Date: Tue, 22 Nov 2022 15:37:13 -0800 Subject: [PATCH 04/14] fixup --- .../beam/gradle/BeamModulePlugin.groovy | 2 +- sdks/java/core/build.gradle | 6 ++- .../sdk/schemas/JsonSchemaConversionTest.java | 38 +++++++++++++--- .../array_nested_array_json_schema.json | 0 .../basic_json_schema.json | 0 .../nested_arrays_objects_json_schema.json | 0 ...t_nested_object_and_array_json_schema.json | 0 .../json-schema/ref_with_ref_json_schema.json | 43 +++++++++++++++++++ .../unsupported_nested_tuple_array.json | 0 .../unsupported_tuple_arrays.json | 0 .../sdk/tpcds/TableSchemaJSONLoaderTest.java | 17 +++----- 11 files changed, 87 insertions(+), 19 deletions(-) rename sdks/java/core/src/test/resources/{schemas/json => json-schema}/array_nested_array_json_schema.json (100%) rename sdks/java/core/src/test/resources/{schemas/json => json-schema}/basic_json_schema.json (100%) rename sdks/java/core/src/test/resources/{schemas/json => json-schema}/nested_arrays_objects_json_schema.json (100%) rename sdks/java/core/src/test/resources/{schemas/json => json-schema}/object_nested_object_and_array_json_schema.json (100%) create mode 100644 sdks/java/core/src/test/resources/json-schema/ref_with_ref_json_schema.json rename sdks/java/core/src/test/resources/{schemas/json => json-schema}/unsupported_nested_tuple_array.json (100%) rename sdks/java/core/src/test/resources/{schemas/json => json-schema}/unsupported_tuple_arrays.json (100%) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3ebf748d0b310..731065b680489 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -679,7 +679,7 @@ class BeamModulePlugin implements Plugin { joda_time : "joda-time:joda-time:2.10.10", jsonassert : "org.skyscreamer:jsonassert:1.5.0", jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version", - json_org : "org.json:json:20200518", // Try to keep in sync with google_cloud_platform_libraries_bom transitive deps. + json_org : "org.json:json", // Try to keep in sync with google_cloud_platform_libraries_bom transitive deps. everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}", junit : "junit:junit:4.13.1", kafka : "org.apache.kafka:kafka_2.11:$kafka_version", diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e8e8d814c37ec..987f1b213cb7d 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -91,9 +91,11 @@ dependencies { shadow library.java.avro shadow library.java.snappy_java shadow library.java.joda_time - shadow library.java.json_org + implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + provided library.java.json_org // com.github.everit JSON schema validation library is used for json-schema.org validation. - // to aoid forcing the library onto users, we ask users to provide it rather than include + // to avoid forcing the library onto users, we ask users to provide it rather than include // it by default. // It is only used for optional functionality in JsonUtils schema parsing and conversion. provided library.java.everit_json_schema diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java index b08ab5010d387..3da0344685004 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java @@ -37,7 +37,7 @@ public class JsonSchemaConversionTest { @Test public void testBasicJsonSchemaToBeamSchema() throws IOException { try (InputStream inputStream = - getClass().getResourceAsStream("/schemas/json/basic_json_schema.json")) { + getClass().getResourceAsStream("/json-schema/basic_json_schema.json")) { String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); @@ -57,7 +57,7 @@ public void testBasicJsonSchemaToBeamSchema() throws IOException { @Test public void testNestedStructsJsonSchemaToBeamSchema() throws IOException { try (InputStream inputStream = - getClass().getResourceAsStream("/schemas/json/nested_arrays_objects_json_schema.json")) { + getClass().getResourceAsStream("/json-schema/nested_arrays_objects_json_schema.json")) { String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); @@ -77,7 +77,7 @@ public void testNestedStructsJsonSchemaToBeamSchema() throws IOException { @Test public void testArrayNestedArrayObjectJsonSchemaToBeamSchema() throws IOException { try (InputStream inputStream = - getClass().getResourceAsStream("/schemas/json/array_nested_array_json_schema.json")) { + getClass().getResourceAsStream("/json-schema/array_nested_array_json_schema.json")) { String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); @@ -98,7 +98,7 @@ public void testArrayNestedArrayObjectJsonSchemaToBeamSchema() throws IOExceptio public void testObjectNestedObjectArrayJsonSchemaToBeamSchema() throws IOException { try (InputStream inputStream = getClass() - .getResourceAsStream("/schemas/json/object_nested_object_and_array_json_schema.json")) { + .getResourceAsStream("/json-schema/object_nested_object_and_array_json_schema.json")) { String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); @@ -125,10 +125,36 @@ public void testObjectNestedObjectArrayJsonSchemaToBeamSchema() throws IOExcepti } } + @Test + public void testArrayWithNestedRefsBeamSchema() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/json-schema/ref_with_ref_json_schema.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + + assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("vegetables")); + assertThat( + parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), + containsInAnyOrder( + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.of("veggieName", Schema.FieldType.STRING), + Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN), + Schema.Field.of( + "origin", + Schema.FieldType.row( + Schema.of( + Schema.Field.of("country", Schema.FieldType.STRING), + Schema.Field.of("town", Schema.FieldType.STRING), + Schema.Field.of("region", Schema.FieldType.STRING))))))))); + } + } + @Test public void testUnsupportedTupleArrays() throws IOException { try (InputStream inputStream = - getClass().getResourceAsStream("/schemas/json/unsupported_tuple_arrays.json")) { + getClass().getResourceAsStream("/json-schema/unsupported_tuple_arrays.json")) { String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); IllegalArgumentException thrownException = @@ -146,7 +172,7 @@ public void testUnsupportedTupleArrays() throws IOException { @Test public void testUnsupportedNestedTupleArrays() throws IOException { try (InputStream inputStream = - getClass().getResourceAsStream("/schemas/json/unsupported_nested_tuple_array.json")) { + getClass().getResourceAsStream("/json-schema/unsupported_nested_tuple_array.json")) { String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); IllegalArgumentException thrownException = diff --git a/sdks/java/core/src/test/resources/schemas/json/array_nested_array_json_schema.json b/sdks/java/core/src/test/resources/json-schema/array_nested_array_json_schema.json similarity index 100% rename from sdks/java/core/src/test/resources/schemas/json/array_nested_array_json_schema.json rename to sdks/java/core/src/test/resources/json-schema/array_nested_array_json_schema.json diff --git a/sdks/java/core/src/test/resources/schemas/json/basic_json_schema.json b/sdks/java/core/src/test/resources/json-schema/basic_json_schema.json similarity index 100% rename from sdks/java/core/src/test/resources/schemas/json/basic_json_schema.json rename to sdks/java/core/src/test/resources/json-schema/basic_json_schema.json diff --git a/sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json b/sdks/java/core/src/test/resources/json-schema/nested_arrays_objects_json_schema.json similarity index 100% rename from sdks/java/core/src/test/resources/schemas/json/nested_arrays_objects_json_schema.json rename to sdks/java/core/src/test/resources/json-schema/nested_arrays_objects_json_schema.json diff --git a/sdks/java/core/src/test/resources/schemas/json/object_nested_object_and_array_json_schema.json b/sdks/java/core/src/test/resources/json-schema/object_nested_object_and_array_json_schema.json similarity index 100% rename from sdks/java/core/src/test/resources/schemas/json/object_nested_object_and_array_json_schema.json rename to sdks/java/core/src/test/resources/json-schema/object_nested_object_and_array_json_schema.json diff --git a/sdks/java/core/src/test/resources/json-schema/ref_with_ref_json_schema.json b/sdks/java/core/src/test/resources/json-schema/ref_with_ref_json_schema.json new file mode 100644 index 0000000000000..cf044b0fed05d --- /dev/null +++ b/sdks/java/core/src/test/resources/json-schema/ref_with_ref_json_schema.json @@ -0,0 +1,43 @@ +{ + "$id": "https://example.com/arrays.schema.json", + "type": "object", + "properties": { + "vegetables": { + "type": "array", + "items": { "$ref": "#/$defs/veggie" } + } + }, + "$defs": { + "veggie": { + "type": "object", + "required": [ "veggieName", "veggieLike" ], + "properties": { + "veggieName": { + "type": "string", + "description": "The name of the vegetable." + }, + "veggieLike": { + "type": "boolean", + "description": "Do I like this vegetable?" + }, + "origin": { + "$ref": "#/$defs/location" + } + } + }, + "location": { + "type": "object", + "properties": { + "country": { + "type": "string" + }, + "region": { + "type": "string" + }, + "town": { + "type": "string" + } + } + } + } +} diff --git a/sdks/java/core/src/test/resources/schemas/json/unsupported_nested_tuple_array.json b/sdks/java/core/src/test/resources/json-schema/unsupported_nested_tuple_array.json similarity index 100% rename from sdks/java/core/src/test/resources/schemas/json/unsupported_nested_tuple_array.json rename to sdks/java/core/src/test/resources/json-schema/unsupported_nested_tuple_array.json diff --git a/sdks/java/core/src/test/resources/schemas/json/unsupported_tuple_arrays.json b/sdks/java/core/src/test/resources/json-schema/unsupported_tuple_arrays.json similarity index 100% rename from sdks/java/core/src/test/resources/schemas/json/unsupported_tuple_arrays.json rename to sdks/java/core/src/test/resources/json-schema/unsupported_tuple_arrays.json diff --git a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java index 45f6f91aac7b5..3eb84d32c57cc 100644 --- a/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java +++ b/sdks/java/testing/tpcds/src/test/java/org/apache/beam/sdk/tpcds/TableSchemaJSONLoaderTest.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk.tpcds; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import org.junit.Test; @@ -142,8 +143,10 @@ public void testWarehouseTable() throws Exception { public void testGetAllTableNames() throws IOException { List tableNames = TableSchemaJSONLoader.getAllTableNames(); Collections.sort(tableNames); - List expectedTableNames = - Arrays.asList( + + assertThat( + tableNames, + containsInAnyOrder( "call_center", "catalog_page", "catalog_returns", @@ -167,12 +170,6 @@ public void testGetAllTableNames() throws IOException { "web_page", "web_returns", "web_sales", - "web_site"); - - assertEquals(expectedTableNames.size(), tableNames.size()); - - for (int i = 0; i < tableNames.size(); i++) { - assertEquals(expectedTableNames.get(i), tableNames.get(i)); - } + "web_site")); } } From 0162d05c43f2b7c5dd9833f8e30aecf09b54cdd4 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Tue, 22 Nov 2022 16:47:06 -0800 Subject: [PATCH 05/14] fixup --- .../beam/sdk/schemas/utils/JsonUtils.java | 18 ++++++++++-------- .../sdk/schemas/JsonSchemaConversionTest.java | 3 ++- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 4caa77069d598..9e2af789ef9e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -91,14 +91,12 @@ private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSche if (propertySchema == null) { throw new IllegalArgumentException("Unable to parse schema " + jsonSchema.toString()); } - if (propertySchema instanceof org.everit.json.schema.ObjectSchema) { - beamSchemaBuilder = - beamSchemaBuilder.addField( - Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); - } else if (propertySchema instanceof org.everit.json.schema.ArraySchema) { + if (propertySchema instanceof org.everit.json.schema.ArraySchema) { if (((ArraySchema) propertySchema).getAllItemSchema() == null) { throw new IllegalArgumentException( - "Array schema is not properly formatted or unsupported: " + propertyName); + "Array schema is not properly formatted or unsupported (" + + propertyName + + "). Note that JSON-schema's tuple-like arrays are not supported by Beam."); } beamSchemaBuilder = beamSchemaBuilder.addField( @@ -113,7 +111,9 @@ private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSche beamSchemaBuilder.addField( Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Unsupported field type in field " + propertyName, e); + throw new IllegalArgumentException( + "Unsupported field type " + propertySchema.getClass() + " in field " + propertyName, + e); } } } @@ -139,7 +139,9 @@ private static Schema.FieldType beamTypeFromJsonSchemaType( } else if (propertySchema instanceof org.everit.json.schema.ArraySchema) { if (((ArraySchema) propertySchema).getAllItemSchema() == null) { throw new IllegalArgumentException( - "Array schema is not properly formatted or unsupported: " + propertySchema); + "Array schema is not properly formatted or unsupported (" + + propertySchema + + "). Note that JSON-schema's tuple-like arrays are not supported by Beam."); } return Schema.FieldType.array( beamTypeFromJsonSchemaType(((ArraySchema) propertySchema).getAllItemSchema())); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java index 3da0344685004..bcd12240a9132 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java @@ -183,7 +183,8 @@ public void testUnsupportedNestedTupleArrays() throws IOException { }); assertThat( - thrownException.getMessage(), containsString("Array schema is not properly formatted")); + thrownException.getCause().getMessage(), + containsString("Array schema is not properly formatted")); } } } From f0a029d335d7d3a98883a641d408608913db6f8a Mon Sep 17 00:00:00 2001 From: Pablo E Date: Wed, 23 Nov 2022 19:33:09 -0800 Subject: [PATCH 06/14] documenting and fixing nullable cases --- sdks/java/core/build.gradle | 2 +- .../beam/sdk/schemas/utils/JsonUtils.java | 51 ++++++++++- .../sdk/schemas/JsonSchemaConversionTest.java | 88 ++++++++++--------- 3 files changed, 96 insertions(+), 45 deletions(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 987f1b213cb7d..bc429a4647aca 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -99,7 +99,7 @@ dependencies { // it by default. // It is only used for optional functionality in JsonUtils schema parsing and conversion. provided library.java.everit_json_schema - testImplementation library.java.everit_json_schema + shadowTest library.java.everit_json_schema provided library.java.junit provided library.java.hamcrest provided 'io.airlift:aircompressor:0.18' diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 9e2af789ef9e6..605bbb12a1a8b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -32,7 +32,46 @@ import org.everit.json.schema.ReferenceSchema; import org.json.JSONObject; -/** Utils to convert JSON records to Beam {@link Row}. */ +/** + * Utils to convert JSON records to Beam {@link Row}. + * + *

JSON-Schema (https://json-schema.org) support

+ * + *

This class provides utility methods to parse, validate and translate between JSON + * Schema-formatted schemas and Beam Schemas. The support is based on the + * everit-json-schema package, which is not provided by default. + * + *

Therefore, functionality in {@link JsonUtils::beamSchemaFromJsonSchema} requires that you + * include everit-json-schema in your project like so: + * + *

{@code
+ * 
+ * 	com.github.erosb
+ * 	everit-json-schema
+ * 	1.14.1
+ * 
+ * }
+ * + *

JSON-Schema supported features

+ * + *

The current Beam implementation does not support all possible features of JSON-schema. The + * current implementation supports: + * + *

+ *

  • String, boolean and numeric values (integer and floating-point). + *
  • Arrays, nested arrays and arbitratily nested object types + *
  • Fields marked as required are non-null. Other fields are nullable. + * + *

    The following JSON-schema features are not supported: + * + *

    + *

  • Tuple-like arrays (or arrays with multiple item types). + *
  • Validation of row regular expressions, enum values, etc. + *
  • Special annotations for types (e.g. contentMediaType) are ignored. + *
  • Composite schemas (schemas made out of a combination + * of other schemas) + */ @Experimental(Kind.SCHEMAS) public class JsonUtils { @@ -89,8 +128,12 @@ private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSche org.everit.json.schema.Schema propertySchema = jsonSchema.getPropertySchemas().get(propertyName); if (propertySchema == null) { - throw new IllegalArgumentException("Unable to parse schema " + jsonSchema.toString()); + throw new IllegalArgumentException("Unable to parse schema " + jsonSchema); } + java.util.function.BiFunction fieldConstructor = + jsonSchema.getRequiredProperties().contains(propertyName) + ? Schema.Field::of + : Schema.Field::nullable; if (propertySchema instanceof org.everit.json.schema.ArraySchema) { if (((ArraySchema) propertySchema).getAllItemSchema() == null) { throw new IllegalArgumentException( @@ -100,7 +143,7 @@ private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSche } beamSchemaBuilder = beamSchemaBuilder.addField( - Schema.Field.of( + fieldConstructor.apply( propertyName, Schema.FieldType.array( beamTypeFromJsonSchemaType( @@ -109,7 +152,7 @@ private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSche try { beamSchemaBuilder = beamSchemaBuilder.addField( - Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); + fieldConstructor.apply(propertyName, beamTypeFromJsonSchemaType(propertySchema))); } catch (IllegalArgumentException e) { throw new IllegalArgumentException( "Unsupported field type " + propertySchema.getClass() + " in field " + propertyName, diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java index bcd12240a9132..5e65a57a0f73a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java @@ -47,10 +47,10 @@ public void testBasicJsonSchemaToBeamSchema() throws IOException { assertThat( parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), containsInAnyOrder( - Schema.FieldType.BOOLEAN, - Schema.FieldType.INT64, - Schema.FieldType.DOUBLE, - Schema.FieldType.STRING)); + Schema.FieldType.BOOLEAN.withNullable(true), + Schema.FieldType.INT64.withNullable(true), + Schema.FieldType.DOUBLE.withNullable(true), + Schema.FieldType.STRING.withNullable(true))); } } @@ -65,12 +65,13 @@ public void testNestedStructsJsonSchemaToBeamSchema() throws IOException { assertThat( parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), containsInAnyOrder( - Schema.FieldType.array(Schema.FieldType.STRING), + Schema.FieldType.array(Schema.FieldType.STRING).withNullable(true), Schema.FieldType.array( - Schema.FieldType.row( - Schema.of( - Schema.Field.of("veggieName", Schema.FieldType.STRING), - Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN)))))); + Schema.FieldType.row( + Schema.of( + Schema.Field.of("veggieName", Schema.FieldType.STRING), + Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN)))) + .withNullable(true))); } } @@ -86,11 +87,12 @@ public void testArrayNestedArrayObjectJsonSchemaToBeamSchema() throws IOExceptio parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), containsInAnyOrder( Schema.FieldType.array( - Schema.FieldType.array( - Schema.FieldType.row( - Schema.of( - Schema.Field.of("imaginary", Schema.FieldType.DOUBLE), - Schema.Field.of("real", Schema.FieldType.DOUBLE))))))); + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.nullable("imaginary", Schema.FieldType.DOUBLE), + Schema.Field.nullable("real", Schema.FieldType.DOUBLE))))) + .withNullable(true))); } } @@ -107,21 +109,25 @@ public void testObjectNestedObjectArrayJsonSchemaToBeamSchema() throws IOExcepti parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), containsInAnyOrder( Schema.FieldType.row( - Schema.of( - Schema.Field.of("teacher", Schema.FieldType.STRING), - Schema.Field.of( - "classroom", - Schema.FieldType.row( - Schema.of( - Schema.Field.of( - "students", - Schema.FieldType.array( - Schema.FieldType.row( - Schema.of( - Schema.Field.of("name", Schema.FieldType.STRING), - Schema.Field.of( - "age", Schema.FieldType.INT64))))), - Schema.Field.of("building", Schema.FieldType.STRING)))))))); + Schema.of( + Schema.Field.nullable("teacher", Schema.FieldType.STRING), + Schema.Field.nullable( + "classroom", + Schema.FieldType.row( + Schema.of( + Schema.Field.nullable( + "students", + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.nullable( + "name", Schema.FieldType.STRING), + Schema.Field.nullable( + "age", Schema.FieldType.INT64)))) + .withNullable(true)), + Schema.Field.nullable( + "building", Schema.FieldType.STRING)))))) + .withNullable(true))); } } @@ -137,17 +143,19 @@ public void testArrayWithNestedRefsBeamSchema() throws IOException { parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), containsInAnyOrder( Schema.FieldType.array( - Schema.FieldType.row( - Schema.of( - Schema.Field.of("veggieName", Schema.FieldType.STRING), - Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN), - Schema.Field.of( - "origin", - Schema.FieldType.row( - Schema.of( - Schema.Field.of("country", Schema.FieldType.STRING), - Schema.Field.of("town", Schema.FieldType.STRING), - Schema.Field.of("region", Schema.FieldType.STRING))))))))); + Schema.FieldType.row( + Schema.of( + Schema.Field.of("veggieName", Schema.FieldType.STRING), + Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN), + Schema.Field.nullable( + "origin", + Schema.FieldType.row( + Schema.of( + Schema.Field.nullable("country", Schema.FieldType.STRING), + Schema.Field.nullable("town", Schema.FieldType.STRING), + Schema.Field.nullable( + "region", Schema.FieldType.STRING))))))) + .withNullable(true))); } } From 88c9176f1c6e140fbbcf56edb59c4ad0c6d3cfb6 Mon Sep 17 00:00:00 2001 From: Pablo E Date: Wed, 23 Nov 2022 23:19:14 -0800 Subject: [PATCH 07/14] fixup --- .../beam/sdk/schemas/utils/JsonUtils.java | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 605bbb12a1a8b..05dc5f9cc472f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -41,15 +41,15 @@ * Schema-formatted schemas and Beam Schemas. The support is based on the * everit-json-schema package, which is not provided by default. * - *

    Therefore, functionality in {@link JsonUtils::beamSchemaFromJsonSchema} requires that you - * include everit-json-schema in your project like so: + *

    Therefore, functionality in {@link JsonUtils::beamSchemaFromJsonSchema} requires that you + * include {@code everit-json-schema} in your project like so: * *

    {@code
    - * 
    - * 	com.github.erosb
    - * 	everit-json-schema
    - * 	1.14.1
    - * 
    + *  
    + * 	  com.github.erosb < /groupId>
    + * 	  everit-json-schema < /artifactId>
    + * 	  1.14.1 < /version>
    + *  < /dependency>
      * }
    * *

    JSON-Schema supported features

    @@ -57,20 +57,26 @@ *

    The current Beam implementation does not support all possible features of JSON-schema. The * current implementation supports: * - *

    - *

  • String, boolean and numeric values (integer and floating-point). - *
  • Arrays, nested arrays and arbitratily nested object types - *
  • Fields marked as required are non-null. Other fields are nullable. + *

    * - *

    The following JSON-schema features are not supported: + *

      + *
    • String, boolean and numeric values (integer and floating-point). + *
    • Arrays, nested arrays and arbitratily nested object types + *
    • Fields marked as required are non-null. Other fields are nullable. + *
    * - *

    - *

  • Tuple-like arrays (or arrays with multiple item types). - *
  • Validation of row regular expressions, enum values, etc. - *
  • Special annotations for types (e.g. contentMediaType) are ignored. - *
  • Composite schemas (schemas made out of a combination - * of other schemas) + *

    The following JSON-schema features are not supported: + * + *

    + * + *

      + *
    • Tuple-like arrays (or arrays with multiple item types). + *
    • Validation of row regular expressions, enum values, etc. + *
    • Special annotations for types (e.g. contentMediaType) are ignored. + *
    • Composite schemas (schemas made out of a combination + * of other schemas) + *
    */ @Experimental(Kind.SCHEMAS) public class JsonUtils { From bcb19dc81aaf51120e3c7eead4089553d1d8ac0a Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 2 Dec 2022 13:46:24 -0500 Subject: [PATCH 08/14] Update sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java Co-authored-by: Lukasz Cwik --- .../java/org/apache/beam/sdk/schemas/utils/JsonUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 05dc5f9cc472f..e9db7ddc3b707 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -130,9 +130,9 @@ public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) { private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) { Schema.Builder beamSchemaBuilder = Schema.builder(); - for (String propertyName : jsonSchema.getPropertySchemas().keySet()) { - org.everit.json.schema.Schema propertySchema = - jsonSchema.getPropertySchemas().get(propertyName); + for (Map.Entry entry : jsonSchema.getPropertySchemas().entries()) { + String propertyName = entry.getKey(); + org.everit.json.schema.Schema propertySchema = entry.getValue(); if (propertySchema == null) { throw new IllegalArgumentException("Unable to parse schema " + jsonSchema); } From 1fe9e779e99dc20310840b81501f23dfb5f8d075 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 2 Dec 2022 13:46:39 -0500 Subject: [PATCH 09/14] Update sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java Co-authored-by: Lukasz Cwik --- .../main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index e9db7ddc3b707..454a47add4c52 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -61,7 +61,8 @@ * *
      *
    • String, boolean and numeric values (integer and floating-point). - *
    • Arrays, nested arrays and arbitratily nested object types + *
    • Arrays with any one of the supported types as elements. This includes nested arrays. + *
    • Objects with any of the supported types as fields. This includes nested objects. *
    • Fields marked as required are non-null. Other fields are nullable. *
    * From d7c787a99334ef9aa9c4c373c31687e02f7c4015 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 2 Dec 2022 13:46:51 -0500 Subject: [PATCH 10/14] Update sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java Co-authored-by: Lukasz Cwik --- .../main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 454a47add4c52..c71446a8b0624 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -180,8 +180,7 @@ private static Schema.FieldType beamTypeFromJsonSchemaType( return ((NumberSchema) propertySchema).requiresInteger() ? Schema.FieldType.INT64 : Schema.FieldType.DOUBLE; - } - if (propertySchema instanceof org.everit.json.schema.StringSchema) { + } else if (propertySchema instanceof org.everit.json.schema.StringSchema) { return Schema.FieldType.STRING; } else if (propertySchema instanceof org.everit.json.schema.ReferenceSchema) { org.everit.json.schema.Schema sch = ((ReferenceSchema) propertySchema).getReferredSchema(); From 426c48b4f36f37851566f33178943bc7c00fb335 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 2 Dec 2022 13:47:06 -0500 Subject: [PATCH 11/14] Update sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java Co-authored-by: Lukasz Cwik --- .../main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index c71446a8b0624..8fbee22aae377 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -72,7 +72,7 @@ * *