From 9187075124f2d4781391dc52518c00be07d7d4c7 Mon Sep 17 00:00:00 2001 From: Lara Schmidt Date: Thu, 24 Feb 2022 12:00:06 -0800 Subject: [PATCH] Adding a logical type for Schemas using proto serialization. --- .../beam/sdk/schemas/SchemaTranslation.java | 10 +++- .../logicaltypes/SchemaLogicalType.java | 60 +++++++++++++++++++ .../sdk/schemas/SchemaTranslationTest.java | 2 + .../logicaltypes/LogicalTypesTest.java | 17 ++++++ 4 files changed, 86 insertions(+), 3 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SchemaLogicalType.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index 4e6921b5226b8..7347d995fe36e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.Row; @@ -60,10 +61,13 @@ public class SchemaTranslation { private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1"; // TODO(BEAM-7855): Populate this with a LogicalTypeRegistrar, which includes a way to construct - // the LogicalType - // given an argument. + // the LogicalType given an argument. 
private static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>>
-      STANDARD_LOGICAL_TYPES = ImmutableMap.of(MicrosInstant.IDENTIFIER, MicrosInstant.class);
+      STANDARD_LOGICAL_TYPES =
+          ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
+              .put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
+              .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
+              .build();
 
   public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {
     String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SchemaLogicalType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SchemaLogicalType.java
new file mode 100644
index 0000000000000..a7fcbdc50f937
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SchemaLogicalType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.InvalidProtocolBufferException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * A logical type whose input values are {@link Schema} objects and whose base representation is
+ * the byte array of the schema's serialized proto.
+ */
+public class SchemaLogicalType implements Schema.LogicalType<Schema, byte[]> {
+  /** The identifier reported by {@link #getIdentifier()}. */
+  public static final String IDENTIFIER = "beam:logical_type:schema:v1";
+
+  @Override
+  public String getIdentifier() {
+    return IDENTIFIER;
+  }
+
+  /** Returns {@code null}; no argument type is declared for this logical type. */
+  @Override
+  public @Nullable FieldType getArgumentType() {
+    return null;
+  }
+
+  /** Returns {@link FieldType#BYTES}; base values are serialized proto bytes. */
+  @Override
+  public FieldType getBaseType() {
+    return FieldType.BYTES;
+  }
+
+  /**
+   * Serializes {@code input} to the bytes of its proto form.
+   *
+   * @param input the schema to serialize
+   * @return the byte array of the proto built from {@code input}
+   */
+  @Override
+  public byte @NonNull [] toBaseType(Schema input) {
+    // true is passed as schemaToProto's serializeLogicalType argument.
+    return SchemaTranslation.schemaToProto(input, true).toByteArray();
+  }
+
+  /**
+   * Parses serialized proto bytes back into a {@link Schema}.
+   *
+   * @param base bytes of a serialized schema proto
+   * @return the schema decoded from {@code base}
+   * @throws IllegalArgumentException if {@code base} cannot be parsed as a schema proto
+   */
+  @Override
+  public Schema toInputType(byte @NonNull [] base) {
+    try {
+      return SchemaTranslation.schemaFromProto(SchemaApi.Schema.parseFrom(base));
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException("Unable to parse bytes as a serialized Schema proto", e);
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
index 9f1f7d47efa7c..604be4fdc2003 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
@@ -38,6 +38,7 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
 import
org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
@@ -173,6 +174,7 @@ public static Iterable data() {
         .add(
             Schema.of(
                 Field.of("null_argument", FieldType.logicalType(new NullArgumentLogicalType()))))
+        .add(Schema.of(Field.of("schema", FieldType.logicalType(new SchemaLogicalType()))))
         .build();
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
index dd07f8eef8faf..8ee20f1ad167c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java
@@ -113,4 +113,22 @@ public void testUuid() {
     assertEquals(uuid, row.getLogicalTypeValue(0, UUID.class));
     assertEquals(uuidAsRow, row.getBaseValue(0, Row.class));
   }
+
+  /** Checks that a Schema value round-trips through a Row field and through its base type. */
+  @Test
+  public void testSchema() {
+    SchemaLogicalType logicalType = new SchemaLogicalType();
+    Field booleanField = Field.of("fieldOne", FieldType.BOOLEAN);
+    Field nestedField = Field.of("nested", FieldType.logicalType(new SchemaLogicalType()));
+    Schema expected = Schema.of(booleanField, nestedField);
+
+    // Carry the schema value in a row whose only field uses the schema logical type.
+    Schema rowSchema = Schema.builder().addLogicalTypeField("schema", logicalType).build();
+    Row row = Row.withSchema(rowSchema).addValues(expected).build();
+    assertEquals(expected, row.getLogicalTypeValue(0, Schema.class));
+
+    // Check base type conversion.
+    byte[] encoded = logicalType.toBaseType(expected);
+    assertEquals(expected, logicalType.toInputType(encoded));
+  }
 }