diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 429371e110556..e7990e2f10853 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -59,6 +59,24 @@ message ExpansionMethods {
   }
 }
 
+// Defines the URNs for managed transforms.
+message ManagedTransforms {
+  enum Urns {
+    ICEBERG_READ = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:iceberg_read:v1"];
+    ICEBERG_WRITE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:iceberg_write:v1"];
+    KAFKA_READ = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:kafka_read:v1"];
+    KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:kafka_write:v1"];
+    BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
+    BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+        "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"];
+  }
+}
+
 // A configuration payload for an external transform.
 // Used to define a Java transform that can be directly instantiated by a Java
 // expansion service.
diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle
index 95a843e51fd18..2309e94f2e15e 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -37,6 +37,8 @@ ext.summary = "Expansion service serving several Java IOs"
 dependencies {
   implementation project(":sdks:java:expansion-service")
   permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
+  implementation project(":sdks:java:managed")
+  permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
   implementation project(":sdks:java:io:iceberg")
   permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761
   implementation project(":sdks:java:io:kafka")
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 7965cde86e7d9..ba42ce5291dab 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -44,7 +44,6 @@ def orc_version = "1.9.2"
 dependencies {
   implementation library.java.vendored_guava_32_1_2_jre
   implementation project(path: ":sdks:java:core", configuration: "shadow")
-  implementation project(":sdks:java:managed")
   implementation library.java.slf4j_api
   implementation library.java.joda_time
   implementation "org.apache.parquet:parquet-column:$parquet_version"
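The `ManagedTransforms` enum above becomes the single source of truth for these URNs: each value carries a `beam_urn` annotation that the generated proto bindings expose to every SDK, and the Gradle changes move the `managed` dependency out of the Iceberg IO and into the expansion service that actually serves these transforms. A minimal sketch of the Python side, using the `common_urns` entry added later in this diff (`.urn` is the same accessor the new `managed.py` uses)::

    from apache_beam.portability.common_urns import ManagedTransforms

    # Each enum value exposes its beam_urn annotation via `.urn`.
    assert (
        ManagedTransforms.Urns.KAFKA_READ.urn ==
        "beam:schematransform:org.apache.beam:kafka_read:v1")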
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index ef535353efd01..05db81e84bcf3 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -17,14 +17,16 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
-import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
@@ -59,7 +61,7 @@ public List<String> outputCollectionNames() {
 
   @Override
   public String identifier() {
-    return ManagedTransformConstants.ICEBERG_READ;
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ);
   }
 
   @DefaultSchema(AutoValueSchema.class)
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b3de7a88c541d..4b9393955d5f4 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -17,14 +17,16 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
-import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
@@ -80,7 +82,7 @@ public List<String> outputCollectionNames() {
 
   @Override
   public String identifier() {
-    return ManagedTransformConstants.ICEBERG_WRITE;
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE);
   }
 
   @DefaultSchema(AutoValueSchema.class)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index b2eeb1a54d1da..c22eeef71ee85 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
 import java.io.FileOutputStream;
@@ -34,6 +35,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
@@ -103,7 +105,7 @@ public Row apply(byte[] input) {
 
   @Override
   public String identifier() {
-    return "beam:schematransform:org.apache.beam:kafka_read:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ);
   }
 
   @Override
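With `identifier()` resolving through `getUrn(...)`, the advertised URN strings are unchanged; they are now simply read from the proto annotation instead of being duplicated as string literals. One way to see what a running expansion service actually advertises, as a hedged sketch (the address is illustrative, and `SchemaAwareExternalTransform.discover` is assumed from the existing external-transform API)::

    from apache_beam.transforms.external import SchemaAwareExternalTransform

    # Lists the SchemaTransforms an expansion service advertises; the
    # Kafka and Iceberg URNs above should appear among them.
    for config in SchemaAwareExternalTransform.discover("localhost:8097"):
      print(config.identifier)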
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
index 09b338492b47f..d6f46b11cb7de 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
@@ -26,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import org.apache.beam.sdk.metrics.Counter;
@@ -249,7 +252,7 @@ public byte[] apply(Row input) {
 
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:kafka_write:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE);
   }
 
   @Override
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 911e25cdda143..8477726686ee1 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.managed;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import com.google.auto.value.AutoValue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
@@ -87,13 +90,13 @@ public class Managed {
   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
       ImmutableMap.<String, String>builder()
-          .put(ICEBERG, ManagedTransformConstants.ICEBERG_READ)
-          .put(KAFKA, ManagedTransformConstants.KAFKA_READ)
+          .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
+          .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
           .build();
   public static final Map<String, String> WRITE_TRANSFORMS =
       ImmutableMap.<String, String>builder()
-          .put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE)
-          .put(KAFKA, ManagedTransformConstants.KAFKA_WRITE)
+          .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
+          .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
           .build();
 
   /**
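These maps are what let a Java user write `Managed.read(Managed.ICEBERG)` without ever seeing a URN. The Python module added later in this diff keeps an equivalent name-to-URN mapping; condensed here as a sketch, not the literal code::

    from apache_beam.portability.common_urns import ManagedTransforms

    # User-facing source names -> proto-derived URNs, mirroring
    # Managed.java's READ_TRANSFORMS.
    _READ_TRANSFORMS = {
        "iceberg": ManagedTransforms.Urns.ICEBERG_READ.urn,
        "kafka": ManagedTransforms.Urns.KAFKA_READ.urn,
    }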
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 51d0b67b4b89b..4cf752747be5a 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.managed;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import java.util.Map;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 
 /**
@@ -41,12 +44,6 @@ public class ManagedTransformConstants {
   // Standard input PCollection tag
   public static final String INPUT = "input";
 
-  public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1";
-  public static final String ICEBERG_WRITE =
-      "beam:schematransform:org.apache.beam:iceberg_write:v1";
-  public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1";
-  public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1";
-
   private static final Map<String, String> KAFKA_READ_MAPPINGS =
       ImmutableMap.<String, String>builder().put("data_format", "format").build();
 
@@ -55,7 +52,7 @@ public class ManagedTransformConstants {
 
   public static final Map<String, Map<String, String>> MAPPINGS =
       ImmutableMap.<String, Map<String, String>>builder()
-          .put(KAFKA_READ, KAFKA_READ_MAPPINGS)
-          .put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS)
+          .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
+          .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
           .build();
 }
diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py
index 4effc91c3d402..74d9a39bb052c 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -38,6 +38,7 @@
 StandardSideInputTypes = beam_runner_api_pb2_urns.StandardSideInputTypes
 StandardUserStateTypes = beam_runner_api_pb2_urns.StandardUserStateTypes
 ExpansionMethods = external_transforms_pb2_urns.ExpansionMethods
+ManagedTransforms = external_transforms_pb2_urns.ManagedTransforms
 MonitoringInfo = metrics_pb2_urns.MonitoringInfo
 MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs
 MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 4e66a290842c3..b8b6839019e84 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -22,6 +22,7 @@
 from apache_beam.transforms import combiners
 from apache_beam.transforms.core import *
 from apache_beam.transforms.external import *
+from apache_beam.transforms.managed import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.stats import *
 from apache_beam.transforms.timeutil import TimeDomain
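The star-import above wires the new module into `apache_beam.transforms`. Inside it, a `dict` config is serialized with `yaml.dump` and handed to the Java `ManagedSchemaTransform` (URN `beam:transform:managed:v1`), which parses it back into the underlying transform's configuration; hence the `pyyaml` dependency change in `setup.py` below. A quick round trip, with illustrative values::

    import yaml

    config = {"table": "foo", "catalog_name": "bar"}
    # This YAML string is what managed.py passes to the Java side.
    print(yaml.dump(config))
    # catalog_name: bar
    # table: foo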
diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py
new file mode 100644
index 0000000000000..71691f05027f5
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -0,0 +1,188 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Managed Transforms.
+
+This module builds and instantiates turnkey transforms that can be managed by
+the underlying runner. This means the runner can upgrade the transform to a
+more optimal or more recent version without requiring any action from the
+user. It may also replace the transform with something entirely different if
+it chooses to. By default, however, the specified transform remains unchanged.
+
+Using Managed Transforms
+========================
+Managed turnkey transforms have a defined configuration and can be built using
+an inline :class:`dict` like so::
+
+  results = p | beam.managed.Read(
+      beam.managed.ICEBERG,
+      config={"table": "foo",
+              "catalog_name": "bar",
+              "catalog_properties": {
+                  "prop1": "value1",
+                  "prop2": "value2"}})
+
+A YAML configuration file can also be used to build a Managed transform. Say we
+have the following `config.yaml` file::
+
+  topic: "foo"
+  bootstrap_servers: "localhost:1234"
+  format: "AVRO"
+
+Simply provide the location of the file like so::
+
+  input_rows = p | beam.Create(...)
+  input_rows | beam.managed.Write(
+      beam.managed.KAFKA,
+      config_url="path/to/config.yaml")
+
+Available transforms
+====================
+The following transforms are currently available:
+
+- **Kafka Read and Write**
+- **Iceberg Read and Write**
+
+**Note:** inputs and outputs need to be PCollection(s) of Beam
+:py:class:`apache_beam.pvalue.Row` elements.
+
+**Note:** Today, all managed transforms are essentially cross-language
+transforms, and Java's ManagedSchemaTransform is used under the hood.
+"""
+
+from typing import Any
+from typing import Dict
+from typing import Optional
+
+import yaml
+
+from apache_beam.portability.common_urns import ManagedTransforms
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import SchemaAwareExternalTransform
+from apache_beam.transforms.ptransform import PTransform
+
+ICEBERG = "iceberg"
+KAFKA = "kafka"
+BIGQUERY = "bigquery"
+_MANAGED_IDENTIFIER = "beam:transform:managed:v1"
+_EXPANSION_SERVICE_JAR_TARGETS = {
+    "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG],
+    "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [
+        BIGQUERY
+    ],
+}
+
+__all__ = ["ICEBERG", "KAFKA", "Read", "Write"]
+
+
+class Read(PTransform):
+  """Read using Managed Transforms"""
+  _READ_TRANSFORMS = {
+      ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
+      KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
+  }
+
+  def __init__(
+      self,
+      source: str,
+      config: Optional[Dict[str, Any]] = None,
+      config_url: Optional[str] = None,
+      expansion_service=None):
+    super().__init__()
+    self._source = source
+    identifier = self._READ_TRANSFORMS.get(source.lower())
+    if not identifier:
+      raise ValueError(
+          f"An unsupported source was specified: '{source}'. Please specify "
+          f"one of the following sources: {list(self._READ_TRANSFORMS.keys())}")
+
+    self._expansion_service = _resolve_expansion_service(
+        source, identifier, expansion_service)
+    self._underlying_identifier = identifier
+    self._yaml_config = yaml.dump(config)
+    self._config_url = config_url
+
+  def expand(self, input):
+    return input | SchemaAwareExternalTransform(
+        identifier=_MANAGED_IDENTIFIER,
+        expansion_service=self._expansion_service,
+        rearrange_based_on_discovery=True,
+        transform_identifier=self._underlying_identifier,
+        config=self._yaml_config,
+        config_url=self._config_url)
+
+  def default_label(self) -> str:
+    return "Managed Read(%s)" % self._source.upper()
+
+
+class Write(PTransform):
+  """Write using Managed Transforms"""
+  _WRITE_TRANSFORMS = {
+      ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
+      KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
+      BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
+  }
+
+  def __init__(
+      self,
+      sink: str,
+      config: Optional[Dict[str, Any]] = None,
+      config_url: Optional[str] = None,
+      expansion_service=None):
+    super().__init__()
+    self._sink = sink
+    identifier = self._WRITE_TRANSFORMS.get(sink.lower())
+    if not identifier:
+      raise ValueError(
+          f"An unsupported sink was specified: '{sink}'. Please specify "
+          f"one of the following sinks: {list(self._WRITE_TRANSFORMS.keys())}")
+
+    self._expansion_service = _resolve_expansion_service(
+        sink, identifier, expansion_service)
+    self._underlying_identifier = identifier
+    self._yaml_config = yaml.dump(config)
+    self._config_url = config_url
+
+  def expand(self, input):
+    return input | SchemaAwareExternalTransform(
+        identifier=_MANAGED_IDENTIFIER,
+        expansion_service=self._expansion_service,
+        rearrange_based_on_discovery=True,
+        transform_identifier=self._underlying_identifier,
+        config=self._yaml_config,
+        config_url=self._config_url)
+
+  def default_label(self) -> str:
+    return "Managed Write(%s)" % self._sink.upper()
+
+
+def _resolve_expansion_service(
+    transform_name: str, identifier: str, expansion_service):
+  if expansion_service:
+    return expansion_service
+
+  default_target = None
+  for gradle_target, transforms in _EXPANSION_SERVICE_JAR_TARGETS.items():
+    if transform_name.lower() in transforms:
+      default_target = gradle_target
+      break
+  if not default_target:
+    raise ValueError(
+        "No expansion service was specified and could not find a "
+        f"default expansion service for {transform_name}: '{identifier}'.")
+  return BeamJarExpansionService(default_target)
diff --git a/sdks/python/apache_beam/transforms/managed_test.py b/sdks/python/apache_beam/transforms/managed_test.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
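`_resolve_expansion_service` falls back to a per-transform Gradle jar target (`sdks:java:io:expansion-service:shadowJar` for Kafka and Iceberg), but an explicitly supplied service always wins. A hedged sketch of overriding the default, assuming an expansion service is already listening on an illustrative local port::

    import apache_beam as beam

    with beam.Pipeline() as p:
      rows = p | beam.Create([beam.Row(name="a")])
      rows | beam.managed.Write(
          beam.managed.ICEBERG,
          config={"table": "foo", "catalog_name": "bar"},  # illustrative
          expansion_service="localhost:8097")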
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index c9b2d087d04ca..a8b57d904bbc3 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -388,6 +388,7 @@ def get_portability_package_data():
         'requests>=2.24.0,<3.0.0,!=2.32.*',
         'typing-extensions>=3.7.0',
         'zstandard>=0.18.0,<1',
+        'pyyaml>=3.12,<7.0.0',
         # Dynamic dependencies must be specified in a separate list, otherwise
         # Dependabot won't be able to parse the main list. Any dynamic
         # dependencies will not receive updates from Dependabot.
@@ -414,7 +415,6 @@ def get_portability_package_data():
         'pandas<2.2.0',
         'parameterized>=0.7.1,<0.10.0',
         'pyhamcrest>=1.9,!=1.10.0,<3.0.0',
-        'pyyaml>=3.12,<7.0.0',
         'requests_mock>=1.7,<2.0',
         'tenacity>=8.0.0,<9',
         'pytest>=7.1.2,<8.0',
@@ -514,7 +514,6 @@ def get_portability_package_data():
         'yaml': [
             'docstring-parser>=0.15,<1.0',
             'jinja2>=3.0,<3.1',
-            'pyyaml>=3.12,<7.0.0',
             'virtualenv-clone>=0.5,<1.0',
         ] + dataframe_dependency
     },
diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml
index 31a1a6343aed5..bf0eb6156f47f 100644
--- a/sdks/standard_expansion_services.yaml
+++ b/sdks/standard_expansion_services.yaml
@@ -44,7 +44,7 @@
     # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py
    - 'beam:schematransform:org.apache.beam:kafka_write:v1'
    - 'beam:schematransform:org.apache.beam:kafka_read:v1'
-    # Not ready to generate
+    # Available through apache_beam.transforms.managed.[Read/Write]
    - 'beam:schematransform:org.apache.beam:iceberg_write:v1'
    - 'beam:schematransform:org.apache.beam:iceberg_read:v1'
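The reworded comment reflects that the Iceberg SchemaTransforms stay excluded from wrapper generation but are now reachable from Python through the managed API, as in this sketch adapted from the module docstring (config values are illustrative, and a real catalog is required to run)::

    import apache_beam as beam

    with beam.Pipeline() as p:
      rows = p | beam.managed.Read(
          beam.managed.ICEBERG,
          config={"table": "foo", "catalog_name": "bar"})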