diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 95fb9f59e3f40..ffa11b3e215f2 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -484,6 +484,8 @@ class BeamModulePlugin implements Plugin { def jackson_version = "2.13.3" def jaxb_api_version = "2.3.3" def jsr305_version = "3.0.2" + def json_org_version = "20200518" + def everit_json_version = "1.14.1" def kafka_version = "2.4.1" def nemo_version = "0.1" def netty_version = "4.1.77.Final" @@ -678,6 +680,8 @@ class BeamModulePlugin implements Plugin { joda_time : "joda-time:joda-time:2.10.10", jsonassert : "org.skyscreamer:jsonassert:1.5.0", jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version", + json_org : "org.json:json:${json_org_version}", + everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}", junit : "junit:junit:4.13.1", kafka : "org.apache.kafka:kafka_2.11:$kafka_version", kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version", diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 85f0d23236cf4..db24145296543 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -91,6 +91,8 @@ dependencies { shadow library.java.avro shadow library.java.snappy_java shadow library.java.joda_time + shadow library.java.json_org + shadow library.java.everit_json_schema provided library.java.junit provided library.java.hamcrest provided 'io.airlift:aircompressor:0.18' diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java index 72dc92dcdc715..5def8ff4dfb0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java @@ -26,6 +26,11 @@ import org.apache.beam.sdk.util.RowJson; import org.apache.beam.sdk.util.RowJsonUtils; import org.apache.beam.sdk.values.Row; +import org.everit.json.schema.ArraySchema; +import org.everit.json.schema.NumberSchema; +import org.everit.json.schema.ObjectSchema; +import org.everit.json.schema.ReferenceSchema; +import org.json.JSONObject; /** Utils to convert JSON records to Beam {@link Row}. */ @Experimental(Kind.SCHEMAS) @@ -73,6 +78,77 @@ public String apply(Row input) { }; } + public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) { + org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr); + return beamSchemaFromJsonSchema(jsonSchema); + } + + private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) { + Schema.Builder beamSchemaBuilder = Schema.builder(); + for (String propertyName : jsonSchema.getPropertySchemas().keySet()) { + org.everit.json.schema.Schema propertySchema = + jsonSchema.getPropertySchemas().get(propertyName); + if (propertySchema == null) { + throw new IllegalArgumentException("Unable to parse schema " + jsonSchema.toString()); + } + if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) { + beamSchemaBuilder = + beamSchemaBuilder.addField( + Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); + } else if (propertySchema.getClass().equals(org.everit.json.schema.ArraySchema.class)) { + beamSchemaBuilder = + beamSchemaBuilder.addField( + Schema.Field.of( + propertyName, + Schema.FieldType.array( + beamTypeFromJsonSchemaType( + ((ArraySchema) propertySchema).getAllItemSchema())))); + } else { + try { + beamSchemaBuilder = + beamSchemaBuilder.addField( + Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema))); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unsupported field type in field " + propertyName, e); + } + } + } + return beamSchemaBuilder.build(); + } + + private static Schema.FieldType beamTypeFromJsonSchemaType( + org.everit.json.schema.Schema propertySchema) { + if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) { + return Schema.FieldType.row(beamSchemaFromJsonSchema((ObjectSchema) propertySchema)); + } else if (propertySchema.getClass().equals(org.everit.json.schema.BooleanSchema.class)) { + return Schema.FieldType.BOOLEAN; + } else if (propertySchema.getClass().equals(org.everit.json.schema.NumberSchema.class)) { + return ((NumberSchema) propertySchema).requiresInteger() + ? Schema.FieldType.INT64 + : Schema.FieldType.DOUBLE; + } + if (propertySchema.getClass().equals(org.everit.json.schema.StringSchema.class)) { + return Schema.FieldType.STRING; + } else if (propertySchema.getClass().equals(org.everit.json.schema.ReferenceSchema.class)) { + org.everit.json.schema.Schema sch = ((ReferenceSchema) propertySchema).getReferredSchema(); + return beamTypeFromJsonSchemaType(sch); + } else { + throw new IllegalArgumentException( + "Unsupported schema type: " + propertySchema.getClass().toString()); + } + } + + private static org.everit.json.schema.ObjectSchema jsonSchemaFromString(String jsonSchema) { + JSONObject parsedSchema = new JSONObject(jsonSchema); + org.everit.json.schema.Schema schemaValidator = + org.everit.json.schema.loader.SchemaLoader.load(parsedSchema); + if (!schemaValidator.getClass().equals(ObjectSchema.class)) { + throw new IllegalArgumentException( + String.format("The schema is not a valid object schema:\n%s", jsonSchema)); + } + return (org.everit.json.schema.ObjectSchema) schemaValidator; + } + private abstract static class JsonToRowFn extends SimpleFunction { final RowJson.RowJsonDeserializer deserializer; final ObjectMapper objectMapper; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java new file mode 100644 index 0000000000000..53615a010692e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.io.InputStream; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class JsonSchemaConversionTest { + + @Test + public void testBasicJsonSchemaToBeamSchema() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/schemas/json/basic_json_schema.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + + assertThat( + parsedSchema.getFieldNames(), + containsInAnyOrder("booleanProp", "integerProp", "numberProp", "stringProp")); + assertThat( + parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), + containsInAnyOrder( + Schema.FieldType.BOOLEAN, + Schema.FieldType.INT64, + Schema.FieldType.DOUBLE, + Schema.FieldType.STRING)); + } + } + + @Test + public void testNestedStructsJsonSchemaToBeamSchema() throws IOException { + try (InputStream inputStream = + getClass().getResourceAsStream("/schemas/json/nested_arrays_objects_json_schema.json")) { + String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8"); + Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema); + + assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("fruits", "vegetables")); + assertThat( + parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()), + containsInAnyOrder( + Schema.FieldType.array(Schema.FieldType.STRING), + Schema.FieldType.array( + Schema.FieldType.row( + Schema.of( + Schema.Field.of("veggieName", Schema.FieldType.STRING), + Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN)))))); + } + } +}