From 85c0e8364376f19f2e0eb5b5c7bea6639702725b Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Mon, 7 Oct 2019 21:45:48 -0700 Subject: [PATCH] Add ToJson transform (#9744) --- .../apache/beam/sdk/transforms/JsonToRow.java | 4 +- .../apache/beam/sdk/transforms/ToJson.java | 74 +++++++++++++++ ...{JsonToRowUtils.java => RowJsonUtils.java} | 23 ++++- .../beam/sdk/transforms/ToJsonTest.java | 95 +++++++++++++++++++ .../provider/pubsub/PubsubMessageToRow.java | 6 +- 5 files changed, 195 insertions(+), 7 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java rename sdks/java/core/src/main/java/org/apache/beam/sdk/util/{JsonToRowUtils.java => RowJsonUtils.java} (72%) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToJsonTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java index b8546b4e4aaf7..33fb2a6710a47 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.transforms; -import static org.apache.beam.sdk.util.JsonToRowUtils.jsonToRow; -import static org.apache.beam.sdk.util.JsonToRowUtils.newObjectMapperWith; +import static org.apache.beam.sdk.util.RowJsonUtils.jsonToRow; +import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; import com.fasterxml.jackson.databind.ObjectMapper; import javax.annotation.Nullable; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java new file mode 100644 index 0000000000000..28d6c464994ea --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToJson.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith; +import static org.apache.beam.sdk.util.RowJsonUtils.rowToJson; + +import com.fasterxml.jackson.databind.ObjectMapper; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.util.RowJsonSerializer; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** + * Experimental + * + *

Creates a {@link PTransform} that serializes UTF-8 JSON objects from a {@link Schema}-aware + * PCollection (i.e. {@link PCollection#hasSchema()} returns true). JSON format is compatible with + * {@link JsonToRow}. + * + *

For specifics of JSON serialization see {@link RowJsonSerializer}.
+ */
+@Experimental
+public class ToJson<T> extends PTransform<PCollection<T>, PCollection<String>> {
+  private transient volatile @Nullable ObjectMapper objectMapper;
+
+  public static <T> ToJson<T> of() {
+    return new ToJson<>();
+  }
+
+  @Override
+  public PCollection<String> expand(PCollection<T> rows) {
+    Schema inputSchema = rows.getSchema();
+    SerializableFunction<T, Row> toRow = rows.getToRowFunction();
+    return rows.apply(
+        ParDo.of(
+            new DoFn<T, String>() {
+              @ProcessElement
+              public void processElement(ProcessContext context) {
+                context.output(
+                    rowToJson(objectMapper(inputSchema), toRow.apply(context.element())));
+              }
+            }));
+  }
+
+  private ObjectMapper objectMapper(Schema schema) {
+    if (this.objectMapper == null) {
+      synchronized (this) {
+        if (this.objectMapper == null) {
+          this.objectMapper = newObjectMapperWith(RowJsonSerializer.forSchema(schema));
+        }
+      }
+    }
+
+    return this.objectMapper;
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
similarity index 72%
rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
index 8ac834c24bfbe..598dc744f48a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -26,9 +27,9 @@
 import org.apache.beam.sdk.util.RowJsonDeserializer.UnsupportedRowJsonException;
 import org.apache.beam.sdk.values.Row;
 
-/** JsonToRowUtils.
*/
+/** Utilities for working with {@link RowJsonSerializer} and {@link RowJsonDeserializer}. */
 @Internal
-public class JsonToRowUtils {
+public class RowJsonUtils {
 
   public static ObjectMapper newObjectMapperWith(RowJsonDeserializer deserializer) {
     SimpleModule module = new SimpleModule("rowDeserializationModule");
@@ -40,6 +41,17 @@ public static ObjectMapper newObjectMapperWith(RowJsonDeserializer deserializer)
     return objectMapper;
   }
 
+  /** Creates an {@link ObjectMapper} that serializes {@link Row} values with {@code serializer}. */
+  public static ObjectMapper newObjectMapperWith(RowJsonSerializer serializer) {
+    SimpleModule module = new SimpleModule("rowSerializationModule");
+    module.addSerializer(Row.class, serializer);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    objectMapper.registerModule(module);
+
+    return objectMapper;
+  }
+
   public static Row jsonToRow(ObjectMapper objectMapper, String jsonString) {
     try {
       return objectMapper.readValue(jsonString, Row.class);
@@ -49,4 +61,18 @@ public static Row jsonToRow(ObjectMapper objectMapper, String jsonString) {
       throw new IllegalArgumentException("Unable to parse json object: " + jsonString, e);
     }
   }
+
+  /**
+   * Serializes {@code row} to a JSON string with {@code objectMapper}.
+   *
+   * @throws IllegalArgumentException if serialization fails; the underlying {@link
+   *     JsonProcessingException} is preserved as the cause
+   */
+  public static String rowToJson(ObjectMapper objectMapper, Row row) {
+    try {
+      return objectMapper.writeValueAsString(row);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException("Unable to serialize row: " + row, e);
+    }
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToJsonTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToJsonTest.java
new file mode 100644
index 0000000000000..76a29a98a7f10
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToJsonTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSchema; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ToJson}.
*/
+@RunWith(JUnit4.class)
+@Category(UsesSchema.class)
+public class ToJsonTest implements Serializable {
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  abstract static class Person {
+    public static Person of(String name, Integer height, Boolean knowsJavascript) {
+      return new AutoValue_ToJsonTest_Person(name, height, knowsJavascript);
+    }
+
+    public abstract String getName();
+
+    public abstract Integer getHeight();
+
+    public abstract Boolean getKnowsJavascript();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testSerializesParseableJson() throws Exception {
+    PCollection<Person> persons =
+        pipeline.apply(
+            "jsonPersons",
+            Create.of(
+                Person.of("person1", 80, true),
+                Person.of("person2", 70, false),
+                Person.of("person3", 60, true),
+                Person.of("person4", 50, false),
+                Person.of("person5", 40, true)));
+
+    Schema personSchema =
+        Schema.builder()
+            .addStringField("name")
+            .addInt32Field("height")
+            .addBooleanField("knowsJavascript")
+            .build();
+
+    PCollection<Row> personRows =
+        persons.apply(ToJson.of()).apply(JsonToRow.withSchema(personSchema));
+
+    PAssert.that(personRows)
+        .containsInAnyOrder(
+            row(personSchema, "person1", 80, true),
+            row(personSchema, "person2", 70, false),
+            row(personSchema, "person3", 60, true),
+            row(personSchema, "person4", 50, false),
+            row(personSchema, "person5", 40, true));
+
+    pipeline.run();
+  }
+
+  private Row row(Schema schema, Object... values) {
+    return Row.withSchema(schema).addValues(values).build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
index d7703808fc6ac..654e7225eecbe 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.util.JsonToRowUtils.newObjectMapperWith;
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.value.AutoValue;
@@ -31,9 +31,9 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.JsonToRowUtils;
 import org.apache.beam.sdk.util.RowJsonDeserializer;
 import org.apache.beam.sdk.util.RowJsonDeserializer.UnsupportedRowJsonException;
+import org.apache.beam.sdk.util.RowJsonUtils;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
@@ -127,7 +127,7 @@ private Row parsePayloadJsonRow(PubsubMessage pubsubMessage) {
       objectMapper = newObjectMapperWith(RowJsonDeserializer.forSchema(payloadSchema()));
     }
 
-    return JsonToRowUtils.jsonToRow(objectMapper, payloadJson);
+    return RowJsonUtils.jsonToRow(objectMapper, payloadJson);
   }
 
   @AutoValue.Builder