Skip to content

Commit

Permalink
Adding Beam Schemas capability to parse json-schemas. This is the de-…
Browse files Browse the repository at this point in the history
…facto standard way to define JSON schemas
  • Loading branch information
pabloem committed Nov 22, 2022
1 parent 67e2008 commit 6add8d0
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ class BeamModulePlugin implements Plugin<Project> {
def jackson_version = "2.13.3"
def jaxb_api_version = "2.3.3"
def jsr305_version = "3.0.2"
def json_org_version = "20200518"
def everit_json_version = "1.14.1"
def kafka_version = "2.4.1"
def nemo_version = "0.1"
def netty_version = "4.1.77.Final"
Expand Down Expand Up @@ -678,6 +680,8 @@ class BeamModulePlugin implements Plugin<Project> {
joda_time : "joda-time:joda-time:2.10.10",
jsonassert : "org.skyscreamer:jsonassert:1.5.0",
jsr305 : "com.google.code.findbugs:jsr305:$jsr305_version",
json_org : "org.json:json:${json_org_version}",
everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}",
junit : "junit:junit:4.13.1",
kafka : "org.apache.kafka:kafka_2.11:$kafka_version",
kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version",
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ dependencies {
shadow library.java.avro
shadow library.java.snappy_java
shadow library.java.joda_time
shadow library.java.json_org
shadow library.java.everit_json_schema
provided library.java.junit
provided library.java.hamcrest
provided 'io.airlift:aircompressor:0.18'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import org.apache.beam.sdk.util.RowJson;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.Row;
import org.everit.json.schema.ArraySchema;
import org.everit.json.schema.NumberSchema;
import org.everit.json.schema.ObjectSchema;
import org.everit.json.schema.ReferenceSchema;
import org.json.JSONObject;

/** Utils to convert JSON records to Beam {@link Row}. */
@Experimental(Kind.SCHEMAS)
Expand Down Expand Up @@ -73,6 +78,77 @@ public String apply(Row input) {
};
}

public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr);
return beamSchemaFromJsonSchema(jsonSchema);
}

private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
Schema.Builder beamSchemaBuilder = Schema.builder();
for (String propertyName : jsonSchema.getPropertySchemas().keySet()) {
org.everit.json.schema.Schema propertySchema =
jsonSchema.getPropertySchemas().get(propertyName);
if (propertySchema == null) {
throw new IllegalArgumentException("Unable to parse schema " + jsonSchema.toString());
}
if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) {
beamSchemaBuilder =
beamSchemaBuilder.addField(
Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema)));
} else if (propertySchema.getClass().equals(org.everit.json.schema.ArraySchema.class)) {
beamSchemaBuilder =
beamSchemaBuilder.addField(
Schema.Field.of(
propertyName,
Schema.FieldType.array(
beamTypeFromJsonSchemaType(
((ArraySchema) propertySchema).getAllItemSchema()))));
} else {
try {
beamSchemaBuilder =
beamSchemaBuilder.addField(
Schema.Field.of(propertyName, beamTypeFromJsonSchemaType(propertySchema)));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unsupported field type in field " + propertyName, e);
}
}
}
return beamSchemaBuilder.build();
}

private static Schema.FieldType beamTypeFromJsonSchemaType(
org.everit.json.schema.Schema propertySchema) {
if (propertySchema.getClass().equals(org.everit.json.schema.ObjectSchema.class)) {
return Schema.FieldType.row(beamSchemaFromJsonSchema((ObjectSchema) propertySchema));
} else if (propertySchema.getClass().equals(org.everit.json.schema.BooleanSchema.class)) {
return Schema.FieldType.BOOLEAN;
} else if (propertySchema.getClass().equals(org.everit.json.schema.NumberSchema.class)) {
return ((NumberSchema) propertySchema).requiresInteger()
? Schema.FieldType.INT64
: Schema.FieldType.DOUBLE;
}
if (propertySchema.getClass().equals(org.everit.json.schema.StringSchema.class)) {
return Schema.FieldType.STRING;
} else if (propertySchema.getClass().equals(org.everit.json.schema.ReferenceSchema.class)) {
org.everit.json.schema.Schema sch = ((ReferenceSchema) propertySchema).getReferredSchema();
return beamTypeFromJsonSchemaType(sch);
} else {
throw new IllegalArgumentException(
"Unsupported schema type: " + propertySchema.getClass().toString());
}
}

private static org.everit.json.schema.ObjectSchema jsonSchemaFromString(String jsonSchema) {
JSONObject parsedSchema = new JSONObject(jsonSchema);
org.everit.json.schema.Schema schemaValidator =
org.everit.json.schema.loader.SchemaLoader.load(parsedSchema);
if (!schemaValidator.getClass().equals(ObjectSchema.class)) {
throw new IllegalArgumentException(
String.format("The schema is not a valid object schema:\n%s", jsonSchema));
}
return (org.everit.json.schema.ObjectSchema) schemaValidator;
}

private abstract static class JsonToRowFn<T> extends SimpleFunction<T, Row> {
final RowJson.RowJsonDeserializer deserializer;
final ObjectMapper objectMapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class JsonSchemaConversionTest {

@Test
public void testBasicJsonSchemaToBeamSchema() throws IOException {
try (InputStream inputStream =
getClass().getResourceAsStream("/schemas/json/basic_json_schema.json")) {
String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8");
Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);

assertThat(
parsedSchema.getFieldNames(),
containsInAnyOrder("booleanProp", "integerProp", "numberProp", "stringProp"));
assertThat(
parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()),
containsInAnyOrder(
Schema.FieldType.BOOLEAN,
Schema.FieldType.INT64,
Schema.FieldType.DOUBLE,
Schema.FieldType.STRING));
}
}

@Test
public void testNestedStructsJsonSchemaToBeamSchema() throws IOException {
try (InputStream inputStream =
getClass().getResourceAsStream("/schemas/json/nested_arrays_objects_json_schema.json")) {
String stringJsonSchema = new String(ByteStreams.toByteArray(inputStream), "UTF-8");
Schema parsedSchema = JsonUtils.beamSchemaFromJsonSchema(stringJsonSchema);

assertThat(parsedSchema.getFieldNames(), containsInAnyOrder("fruits", "vegetables"));
assertThat(
parsedSchema.getFields().stream().map(Schema.Field::getType).collect(Collectors.toList()),
containsInAnyOrder(
Schema.FieldType.array(Schema.FieldType.STRING),
Schema.FieldType.array(
Schema.FieldType.row(
Schema.of(
Schema.Field.of("veggieName", Schema.FieldType.STRING),
Schema.Field.of("veggieLike", Schema.FieldType.BOOLEAN))))));
}
}
}

0 comments on commit 6add8d0

Please sign in to comment.