Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Managed Transforms API #31495

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ message ExpansionMethods {
}
}

// Defines the URNs for managed transforms.
message ManagedTransforms {
enum Urns {
ICEBERG_READ = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:iceberg_read:v1"];
ICEBERG_WRITE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:iceberg_write:v1"];
KAFKA_READ = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:kafka_read:v1"];
KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:kafka_write:v1"];
BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:bigquery_storage_write:v2"];
}
}

// A configuration payload for an external transform.
// Used to define a Java transform that can be directly instantiated by a Java
// expansion service.
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ ext.summary = "Expansion service serving several Java IOs"
dependencies {
implementation project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
implementation project(":sdks:java:managed")
permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
implementation project(":sdks:java:io:iceberg")
permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761
implementation project(":sdks:java:io:kafka")
Expand Down
1 change: 0 additions & 1 deletion sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def orc_version = "1.9.2"
dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:managed")
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
Expand Down Expand Up @@ -59,7 +61,7 @@ public List<String> outputCollectionNames() {

@Override
public String identifier() {
return ManagedTransformConstants.ICEBERG_READ;
return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ);
}

@DefaultSchema(AutoValueSchema.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -80,7 +82,7 @@ public List<String> outputCollectionNames() {

@Override
public String identifier() {
return ManagedTransformConstants.ICEBERG_WRITE;
return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE);
}

@DefaultSchema(AutoValueSchema.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import java.io.FileOutputStream;
Expand All @@ -34,6 +35,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
Expand Down Expand Up @@ -103,7 +105,7 @@ public Row apply(byte[] input) {

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:kafka_read:v1";
return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
Expand All @@ -26,6 +28,7 @@
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
import org.apache.beam.sdk.metrics.Counter;
Expand Down Expand Up @@ -249,7 +252,7 @@ public byte[] apply(Row input) {

@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
return "beam:schematransform:org.apache.beam:kafka_write:v1";
return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.managed;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
Expand Down Expand Up @@ -87,13 +90,13 @@ public class Managed {
// Supported SchemaTransforms
public static final Map<String, String> READ_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, ManagedTransformConstants.ICEBERG_READ)
.put(KAFKA, ManagedTransformConstants.KAFKA_READ)
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
.build();
public static final Map<String, String> WRITE_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE)
.put(KAFKA, ManagedTransformConstants.KAFKA_WRITE)
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.managed;

import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;

import java.util.Map;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

/**
Expand All @@ -41,12 +44,6 @@ public class ManagedTransformConstants {
// Standard input PCollection tag
public static final String INPUT = "input";

public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1";
public static final String ICEBERG_WRITE =
"beam:schematransform:org.apache.beam:iceberg_write:v1";
public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1";
public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1";

private static final Map<String, String> KAFKA_READ_MAPPINGS =
ImmutableMap.<String, String>builder().put("data_format", "format").build();

Expand All @@ -55,7 +52,7 @@ public class ManagedTransformConstants {

public static final Map<String, Map<String, String>> MAPPINGS =
ImmutableMap.<String, Map<String, String>>builder()
.put(KAFKA_READ, KAFKA_READ_MAPPINGS)
.put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS)
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
.build();
}
1 change: 1 addition & 0 deletions sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
StandardSideInputTypes = beam_runner_api_pb2_urns.StandardSideInputTypes
StandardUserStateTypes = beam_runner_api_pb2_urns.StandardUserStateTypes
ExpansionMethods = external_transforms_pb2_urns.ExpansionMethods
ManagedTransforms = external_transforms_pb2_urns.ManagedTransforms
MonitoringInfo = metrics_pb2_urns.MonitoringInfo
MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs
MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
from apache_beam.transforms.external import *
from apache_beam.transforms.managed import *
from apache_beam.transforms.ptransform import *
from apache_beam.transforms.stats import *
from apache_beam.transforms.timeutil import TimeDomain
Expand Down
Loading
Loading