diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 105bdbda521a7..7fdb5aaf5e861 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1055,6 +1055,10 @@ message StandardCoders { // - A timestamp without a timezone where seconds + micros represents the // amount of time since the epoch. // + // beam:logical_type:schema:v1 + // - Representation type: BYTES + // - A Beam Schema stored as a serialized proto. + // // The payload for RowCoder is an instance of Schema. // Components: None // Experimental. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index 25983caf2b15c..e1112b2472dee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.Row; @@ -67,10 +68,13 @@ public class SchemaTranslation { private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1"; // TODO(BEAM-7855): Populate this with a LogicalTypeRegistrar, which includes a way to construct - // the LogicalType - // given an argument. + // the LogicalType given an argument. private static final ImmutableMap>> - STANDARD_LOGICAL_TYPES = ImmutableMap.of(MicrosInstant.IDENTIFIER, MicrosInstant.class); + STANDARD_LOGICAL_TYPES = + ImmutableMap.>>builder() + .put(MicrosInstant.IDENTIFIER, MicrosInstant.class) + .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class) + .build(); public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) { String uuid = schema.getUUID() != null ? schema.getUUID().toString() : ""; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SchemaLogicalType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SchemaLogicalType.java new file mode 100644 index 0000000000000..a7fcbdc50f937 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SchemaLogicalType.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import org.apache.beam.model.pipeline.v1.SchemaApi; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.InvalidProtocolBufferException; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** A schema represented as a serialized proto bytes. */ +public class SchemaLogicalType implements Schema.LogicalType { + public static final String IDENTIFIER = "beam:logical_type:schema:v1"; + + @Override + public String getIdentifier() { + return IDENTIFIER; + } + + @Override + public @Nullable FieldType getArgumentType() { + return null; + } + + @Override + public FieldType getBaseType() { + return FieldType.BYTES; + } + + @Override + public byte @NonNull [] toBaseType(Schema input) { + return SchemaTranslation.schemaToProto(input, true).toByteArray(); + } + + @Override + public org.apache.beam.sdk.schemas.Schema toInputType(byte @NonNull [] base) { + try { + return SchemaTranslation.schemaFromProto(SchemaApi.Schema.parseFrom(base)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index 2c0cadb45eaac..f4274de02ea56 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.DateTime; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; @@ -180,6 +181,7 @@ public static Iterable data() { .add(Schema.of(Field.of("logical_argument", FieldType.logicalType(new DateTime())))) .add( Schema.of(Field.of("single_arg_argument", FieldType.logicalType(FixedBytes.of(100))))) + .add(Schema.of(Field.of("schema", FieldType.logicalType(new SchemaLogicalType())))) .build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java index dd07f8eef8faf..8ee20f1ad167c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/logicaltypes/LogicalTypesTest.java @@ -113,4 +113,21 @@ public void testUuid() { assertEquals(uuid, row.getLogicalTypeValue(0, UUID.class)); assertEquals(uuidAsRow, row.getBaseValue(0, Row.class)); } + + @Test + public void testSchema() { + Schema schemaValue = + Schema.of( + Field.of("fieldOne", FieldType.BOOLEAN), + Field.of("nested", FieldType.logicalType(new SchemaLogicalType()))); + + Schema schema = Schema.builder().addLogicalTypeField("schema", new SchemaLogicalType()).build(); + Row row = Row.withSchema(schema).addValues(schemaValue).build(); + assertEquals(schemaValue, row.getLogicalTypeValue(0, Schema.class)); + + // Check base type conversion. + assertEquals( + schemaValue, + new SchemaLogicalType().toInputType(new SchemaLogicalType().toBaseType(schemaValue))); + } }