Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Beam Schemas capability to parse json-schemas. This is the de-… #24271

Merged
merged 14 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ class BeamModulePlugin implements Plugin<Project> {
def jackson_version = "2.13.3"
def jaxb_api_version = "2.3.3"
def jsr305_version = "3.0.2"
def everit_json_version = "1.14.1"
def kafka_version = "2.4.1"
def nemo_version = "0.1"
def netty_version = "4.1.77.Final"
Expand Down Expand Up @@ -678,6 +679,8 @@ class BeamModulePlugin implements Plugin<Project> {
joda_time : "joda-time:joda-time:2.10.10",
jsonassert : "org.skyscreamer:jsonassert:1.5.0",
jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version",
json_org : "org.json:json", // Try to keep in sync with google_cloud_platform_libraries_bom transitive deps.
everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}",
junit : "junit:junit:4.13.1",
kafka : "org.apache.kafka:kafka_2.11:$kafka_version",
kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version",
Expand Down
9 changes: 9 additions & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ dependencies {
shadow library.java.avro
shadow library.java.snappy_java
shadow library.java.joda_time
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
provided library.java.json_org
// The everit-json-schema library (com.github.erosb) is used for json-schema.org validation.
// To avoid forcing the library onto users, we ask users to provide it rather than include
// it by default.
// It is only used for optional functionality in JsonUtils schema parsing and conversion.
provided library.java.everit_json_schema
shadowTest library.java.everit_json_schema
provided library.java.junit
provided library.java.hamcrest
provided 'io.airlift:aircompressor:0.18'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,58 @@
import org.apache.beam.sdk.util.RowJson;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.Row;
import org.everit.json.schema.ArraySchema;
import org.everit.json.schema.NumberSchema;
import org.everit.json.schema.ObjectSchema;
import org.everit.json.schema.ReferenceSchema;
import org.json.JSONObject;

/** Utils to convert JSON records to Beam {@link Row}. */
/**
* Utils to convert JSON records to Beam {@link Row}.
*
* <h2>JSON-Schema (https://json-schema.org) support</h2>
*
* <p>This class provides utility methods to parse, validate and translate between <b>JSON
* Schema</b>-formatted schemas and Beam Schemas. The support is based on the <code>
* everit-json-schema</code> package, which is <b>not provided by default</b>.
*
* <p>Therefore, functionality in {@link JsonUtils#beamSchemaFromJsonSchema} requires that you
* include {@code everit-json-schema} in your project like so:
*
* <pre>{@code
* <dependency>
* <groupId>com.github.erosb</groupId>
* <artifactId>everit-json-schema</artifactId>
* <version>1.14.1</version>
pabloem marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should point the person to use the same version that Beam was tested with and not hard-code the version in the documentation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

annoying but is fine

* </dependency>
* }</pre>
*
* <h3>JSON-Schema supported features</h3>
*
* <p>The current Beam implementation does not support all possible features of JSON-schema. The
* current implementation supports:
*
* <p>
*
* <ul>
* <li>String, boolean and numeric values (integer and floating-point).
* <li>Arrays, nested arrays and arbitrarily nested object types
pabloem marked this conversation as resolved.
Show resolved Hide resolved
* <li>Fields marked as required are non-null. Other fields are nullable.
* </ul>
*
* <p><b>The following JSON-schema features are not supported:</b>
*
* <p>
*
* <ul>
* <li>Tuple-like arrays (or arrays with multiple item types).
* <li>Validation of row regular expressions, enum values, etc.
pabloem marked this conversation as resolved.
Show resolved Hide resolved
* <li>Special annotations for types (e.g. <code>contentMediaType</code>) are ignored.
* <li>Composite schemas (schemas made out of a <a
* href="https://json-schema.org/understanding-json-schema/reference/combining.html#combining">combination
* of other schemas</a>)
* </ul>
*/
@Experimental(Kind.SCHEMAS)
public class JsonUtils {

Expand Down Expand Up @@ -73,6 +123,93 @@ public String apply(Row input) {
};
}

/**
 * Builds a Beam {@link Schema} from a JSON-Schema document supplied as text.
 *
 * <p>The text is parsed and loaded as an everit object schema, and each of its properties is then
 * translated into a Beam field.
 *
 * @param jsonSchemaStr the JSON-Schema document, as a JSON string
 * @return the Beam schema derived from the document's properties
 * @throws IllegalArgumentException if the document is not an object schema, or if one of its
 *     properties cannot be translated to a Beam field type
 */
public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
  return beamSchemaFromJsonSchema(jsonSchemaFromString(jsonSchemaStr));
}

private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
Schema.Builder beamSchemaBuilder = Schema.builder();
for (String propertyName : jsonSchema.getPropertySchemas().keySet()) {
org.everit.json.schema.Schema propertySchema =
jsonSchema.getPropertySchemas().get(propertyName);
pabloem marked this conversation as resolved.
Show resolved Hide resolved
if (propertySchema == null) {
throw new IllegalArgumentException("Unable to parse schema " + jsonSchema);
}
java.util.function.BiFunction<String, Schema.FieldType, Schema.Field> fieldConstructor =
jsonSchema.getRequiredProperties().contains(propertyName)
? Schema.Field::of
: Schema.Field::nullable;
if (propertySchema instanceof org.everit.json.schema.ArraySchema) {
if (((ArraySchema) propertySchema).getAllItemSchema() == null) {
throw new IllegalArgumentException(
"Array schema is not properly formatted or unsupported ("
+ propertyName
+ "). Note that JSON-schema's tuple-like arrays are not supported by Beam.");
pabloem marked this conversation as resolved.
Show resolved Hide resolved
}
beamSchemaBuilder =
beamSchemaBuilder.addField(
fieldConstructor.apply(
propertyName,
Schema.FieldType.array(
beamTypeFromJsonSchemaType(
pabloem marked this conversation as resolved.
Show resolved Hide resolved
((ArraySchema) propertySchema).getAllItemSchema()))));
pabloem marked this conversation as resolved.
Show resolved Hide resolved
} else {
try {
beamSchemaBuilder =
beamSchemaBuilder.addField(
fieldConstructor.apply(propertyName, beamTypeFromJsonSchemaType(propertySchema)));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Unsupported field type " + propertySchema.getClass() + " in field " + propertyName,
e);
}
}
}
return beamSchemaBuilder.build();
}

private static Schema.FieldType beamTypeFromJsonSchemaType(
org.everit.json.schema.Schema propertySchema) {
if (propertySchema instanceof org.everit.json.schema.ObjectSchema) {
return Schema.FieldType.row(beamSchemaFromJsonSchema((ObjectSchema) propertySchema));
} else if (propertySchema instanceof org.everit.json.schema.BooleanSchema) {
return Schema.FieldType.BOOLEAN;
} else if (propertySchema instanceof org.everit.json.schema.NumberSchema) {
return ((NumberSchema) propertySchema).requiresInteger()
? Schema.FieldType.INT64
: Schema.FieldType.DOUBLE;
}
if (propertySchema instanceof org.everit.json.schema.StringSchema) {
pabloem marked this conversation as resolved.
Show resolved Hide resolved
return Schema.FieldType.STRING;
} else if (propertySchema instanceof org.everit.json.schema.ReferenceSchema) {
org.everit.json.schema.Schema sch = ((ReferenceSchema) propertySchema).getReferredSchema();
return beamTypeFromJsonSchemaType(sch);
} else if (propertySchema instanceof org.everit.json.schema.ArraySchema) {
if (((ArraySchema) propertySchema).getAllItemSchema() == null) {
throw new IllegalArgumentException(
"Array schema is not properly formatted or unsupported ("
+ propertySchema
+ "). Note that JSON-schema's tuple-like arrays are not supported by Beam.");
pabloem marked this conversation as resolved.
Show resolved Hide resolved
}
return Schema.FieldType.array(
beamTypeFromJsonSchemaType(((ArraySchema) propertySchema).getAllItemSchema()));
} else {
throw new IllegalArgumentException("Unsupported schema type: " + propertySchema.getClass());
}
}

/**
 * Parses JSON-Schema text and loads it as an everit {@code ObjectSchema}.
 *
 * @param jsonSchema the JSON-Schema document, as a JSON string
 * @return the loaded object schema
 * @throws IllegalArgumentException if the loaded schema is not an object schema
 */
private static org.everit.json.schema.ObjectSchema jsonSchemaFromString(String jsonSchema) {
  org.everit.json.schema.Schema loadedSchema =
      org.everit.json.schema.loader.SchemaLoader.load(new JSONObject(jsonSchema));
  if (loadedSchema instanceof ObjectSchema) {
    return (org.everit.json.schema.ObjectSchema) loadedSchema;
  }
  throw new IllegalArgumentException(
      String.format("The schema is not a valid object schema:%n %s", jsonSchema));
}

private abstract static class JsonToRowFn<T> extends SimpleFunction<T, Row> {
final RowJson.RowJsonDeserializer deserializer;
final ObjectMapper objectMapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for converting JSON-Schema documents to Beam schemas via {@link JsonUtils}. */
@RunWith(JUnit4.class)
public class JsonSchemaConversionTest {

  /**
   * Reads a classpath resource fully and decodes it as UTF-8 text.
   *
   * @param resourcePath absolute classpath location of the resource
   * @return the resource content as a string
   * @throws IOException if the resource is missing or cannot be read
   */
  private static String readJsonSchemaResource(String resourcePath) throws IOException {
    try (InputStream inputStream =
        JsonSchemaConversionTest.class.getResourceAsStream(resourcePath)) {
      // getResourceAsStream returns null for a missing resource; fail with a clear message
      // instead of a NullPointerException further down.
      if (inputStream == null) {
        throw new IOException("Test resource not found on classpath: " + resourcePath);
      }
      return new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8);
    }
  }

  @Test
  public void testBasicJsonSchemaToBeamSchema() throws IOException {
    String stringJsonSchema = readJsonSchemaResource("/json-schema/basic_json_schema.json");
    Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);

    assertThat(
        parsedSchema.getFieldNames(),
        containsInAnyOrder("booleanProp", "integerProp", "numberProp", "stringProp"));
    assertThat(
        parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()),
        containsInAnyOrder(
            Schema.FieldType.BOOLEAN.withNullable(true),
            Schema.FieldType.INT64.withNullable(true),
            Schema.FieldType.DOUBLE.withNullable(true),
            Schema.FieldType.STRING.withNullable(true)));
  }

  @Test
  public void testNestedStructsJsonSchemaToBeamSchema() throws IOException {
    String stringJsonSchema =
        readJsonSchemaResource("/json-schema/nested_arrays_objects_json_schema.json");
    Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);

    assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("fruits", "vegetables"));
    assertThat(
        parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()),
        containsInAnyOrder(
            Schema.FieldType.array(Schema.FieldType.STRING).withNullable(true),
            Schema.FieldType.array(
                    Schema.FieldType.row(
                        Schema.of(
                            Schema.Field.of("veggieName", Schema.FieldType.STRING),
                            Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN))))
                .withNullable(true)));
  }

  @Test
  public void testArrayNestedArrayObjectJsonSchemaToBeamSchema() throws IOException {
    String stringJsonSchema =
        readJsonSchemaResource("/json-schema/array_nested_array_json_schema.json");
    Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);

    assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("complexMatrix"));
    assertThat(
        parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()),
        containsInAnyOrder(
            Schema.FieldType.array(
                    Schema.FieldType.array(
                        Schema.FieldType.row(
                            Schema.of(
                                Schema.Field.nullable("imaginary", Schema.FieldType.DOUBLE),
                                Schema.Field.nullable("real", Schema.FieldType.DOUBLE)))))
                .withNullable(true)));
  }

  @Test
  public void testObjectNestedObjectArrayJsonSchemaToBeamSchema() throws IOException {
    String stringJsonSchema =
        readJsonSchemaResource("/json-schema/object_nested_object_and_array_json_schema.json");
    Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);

    assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("classroom"));
    assertThat(
        parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()),
        containsInAnyOrder(
            Schema.FieldType.row(
                    Schema.of(
                        Schema.Field.nullable("teacher", Schema.FieldType.STRING),
                        Schema.Field.nullable(
                            "classroom",
                            Schema.FieldType.row(
                                Schema.of(
                                    Schema.Field.nullable(
                                        "students",
                                        Schema.FieldType.array(
                                                Schema.FieldType.row(
                                                    Schema.of(
                                                        Schema.Field.nullable(
                                                            "name", Schema.FieldType.STRING),
                                                        Schema.Field.nullable(
                                                            "age", Schema.FieldType.INT64))))
                                            .withNullable(true)),
                                    Schema.Field.nullable(
                                        "building", Schema.FieldType.STRING))))))
                .withNullable(true)));
  }

  @Test
  public void testArrayWithNestedRefsBeamSchema() throws IOException {
    String stringJsonSchema = readJsonSchemaResource("/json-schema/ref_with_ref_json_schema.json");
    Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);

    assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("vegetables"));
    assertThat(
        parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()),
        containsInAnyOrder(
            Schema.FieldType.array(
                    Schema.FieldType.row(
                        Schema.of(
                            Schema.Field.of("veggieName", Schema.FieldType.STRING),
                            Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN),
                            Schema.Field.nullable(
                                "origin",
                                Schema.FieldType.row(
                                    Schema.of(
                                        Schema.Field.nullable("country", Schema.FieldType.STRING),
                                        Schema.Field.nullable("town", Schema.FieldType.STRING),
                                        Schema.Field.nullable(
                                            "region", Schema.FieldType.STRING)))))))
                .withNullable(true)));
  }

  @Test
  public void testUnsupportedTupleArrays() throws IOException {
    String stringJsonSchema = readJsonSchemaResource("/json-schema/unsupported_tuple_arrays.json");

    IllegalArgumentException thrownException =
        assertThrows(
            IllegalArgumentException.class,
            () -> {
              JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);
            });

    assertThat(
        thrownException.getMessage(), containsString("Array schema is not properly formatted"));
  }

  @Test
  public void testUnsupportedNestedTupleArrays() throws IOException {
    String stringJsonSchema =
        readJsonSchemaResource("/json-schema/unsupported_nested_tuple_array.json");

    IllegalArgumentException thrownException =
        assertThrows(
            IllegalArgumentException.class,
            () -> {
              JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);
            });

    // The array failure is raised while translating a nested schema, so it surfaces as the
    // cause of the exception reported for the enclosing field.
    assertThat(
        thrownException.getCause().getMessage(),
        containsString("Array schema is not properly formatted"));
  }
}
Loading