From e126d9f057a7209b58f3c4a31cf100bcac797baf Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 3 Jun 2024 15:27:17 -0400 Subject: [PATCH 01/17] managed module --- sdks/java/io/expansion-service/build.gradle | 2 + .../python/apache_beam/transforms/__init__.py | 1 + sdks/python/apache_beam/transforms/managed.py | 67 +++++++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 sdks/python/apache_beam/transforms/managed.py diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 95a843e51fd18..2309e94f2e15e 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -37,6 +37,8 @@ ext.summary = "Expansion service serving several Java IOs" dependencies { implementation project(":sdks:java:expansion-service") permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 + implementation project(":sdks:java:managed") + permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761 implementation project(":sdks:java:io:iceberg") permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761 implementation project(":sdks:java:io:kafka") diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 4e66a290842c3..b8b6839019e84 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -22,6 +22,7 @@ from apache_beam.transforms import combiners from apache_beam.transforms.core import * from apache_beam.transforms.external import * +from apache_beam.transforms.managed import * from apache_beam.transforms.ptransform import * from apache_beam.transforms.stats import * from apache_beam.transforms.timeutil import TimeDomain diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py new file mode 100644 index 0000000000000..51487723ea507 --- /dev/null +++ b/sdks/python/apache_beam/transforms/managed.py @@ -0,0 +1,67 @@ +from apache_beam.transforms.ptransform import PTransform +from apache_beam.transforms.external import SchemaAwareExternalTransform +from apache_beam.transforms.external import BeamJarExpansionService +from typing import Any, Dict +import yaml + +MANAGED_IDENTIFIER = "beam:transform:managed:v1" + +class _ManagedTransform(PTransform): + def __init__(self, underlying_identifier: str, config: Dict[str, Any] = None, config_url: str = None, + expansion_service=None): + super().__init__() + self._underlying_identifier = underlying_identifier + self._yaml_config = yaml.dump(config) + self._config_url = config_url + self._expansion_service = expansion_service + + def expand(self, input): + return input | SchemaAwareExternalTransform( + identifier=MANAGED_IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + transform_identifier=self._underlying_identifier, + config=self._yaml_config, + config_url=self._config_url) + + +class Read(_ManagedTransform): + READ_TRANSFORMS = { + "iceberg": { + "identifier": "beam:schematransform:org.apache.beam:iceberg_read:v1", + "gradle_target": "sdks:java:io:expansion-service:shadowJar" + }, + "kafka": { + "identifier": "beam:schematransform:org.apache.beam:kafka_read:v1", + "gradle_target": "sdks:java:io:expansion-service:shadowJar" + } + } + + def __init__(self, source: str, config: Dict[str, Any] = None, config_url: str = None, expansion_service=None): + transform = self.READ_TRANSFORMS.get(source.lower()) + if not transform: + raise ValueError( + f"An unsupported source was specified: '{source}'. Please specify one of the following sources: {self.READ_TRANSFORMS.keys()}") + expansion_service = expansion_service or BeamJarExpansionService(transform["gradle_target"]) + super().__init__(transform["identifier"], config, config_url, expansion_service) + + +class Write(_ManagedTransform): + WRITE_TRANSFORMS = { + "iceberg": { + "identifier": "beam:schematransform:org.apache.beam:iceberg_write:v1", + "gradle_target": "sdks:java:io:expansion-service:shadowJar" + }, + "kafka": { + "identifier": "beam:schematransform:org.apache.beam:kafka_write:v1", + "gradle_target": "sdks:java:io:expansion-service:shadowJar" + } + } + + def __init__(self, sink: str, config: Dict[str, Any] = None, config_url: str = None, expansion_service=None): + transform = self.WRITE_TRANSFORMS.get(sink.lower()) + if not transform: + raise ValueError( + f"An unsupported source was specified: '{sink}'. Please specify one of the following sources: {self.WRITE_TRANSFORMS.keys()}") + expansion_service = expansion_service or BeamJarExpansionService(transform["gradle_target"]) + super().__init__(transform["identifier"], config, config_url, expansion_service) From 07e12cbbf2bdd95a3f48cc859871a184cfe22dec Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 4 Jun 2024 11:16:10 -0400 Subject: [PATCH 02/17] clean up --- sdks/python/apache_beam/transforms/managed.py | 117 ++++++++++++------ 1 file changed, 76 insertions(+), 41 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 51487723ea507..b96b4b8113ee0 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -1,14 +1,27 @@ -from apache_beam.transforms.ptransform import PTransform -from apache_beam.transforms.external import SchemaAwareExternalTransform -from apache_beam.transforms.external import BeamJarExpansionService -from typing import Any, Dict +from typing import Any +from typing import Dict + import yaml -MANAGED_IDENTIFIER = "beam:transform:managed:v1" +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import SchemaAwareExternalTransform +from apache_beam.transforms.ptransform import PTransform + +ICEBERG = "iceberg" +KAFKA = "kafka" +_MANAGED_IDENTIFIER = "beam:transform:managed:v1" +_GRADLE_TARGETS = {"sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG]} + +__all__ = ["ICEBERG", "KAFKA", "Read", "Write"] + class _ManagedTransform(PTransform): - def __init__(self, underlying_identifier: str, config: Dict[str, Any] = None, config_url: str = None, - expansion_service=None): + def __init__( + self, + underlying_identifier: str, + config: Dict[str, Any] = None, + config_url: str = None, + expansion_service=None): super().__init__() self._underlying_identifier = underlying_identifier self._yaml_config = yaml.dump(config) @@ -17,51 +30,73 @@ def __init__(self, underlying_identifier: str, config: Dict[str, Any] = None, co def expand(self, input): return input | SchemaAwareExternalTransform( - identifier=MANAGED_IDENTIFIER, - expansion_service=self._expansion_service, - rearrange_based_on_discovery=True, - transform_identifier=self._underlying_identifier, - config=self._yaml_config, - config_url=self._config_url) + identifier=_MANAGED_IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + transform_identifier=self._underlying_identifier, + config=self._yaml_config, + config_url=self._config_url) class Read(_ManagedTransform): READ_TRANSFORMS = { - "iceberg": { - "identifier": "beam:schematransform:org.apache.beam:iceberg_read:v1", - "gradle_target": "sdks:java:io:expansion-service:shadowJar" - }, - "kafka": { - "identifier": "beam:schematransform:org.apache.beam:kafka_read:v1", - "gradle_target": "sdks:java:io:expansion-service:shadowJar" - } + ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1", + KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1" } - def __init__(self, source: str, config: Dict[str, Any] = None, config_url: str = None, expansion_service=None): - transform = self.READ_TRANSFORMS.get(source.lower()) - if not transform: + def __init__( + self, + source: str, + config: Dict[str, Any] = None, + config_url: str = None, + expansion_service=None): + identifier = self.READ_TRANSFORMS.get(source.lower()) + if not identifier: raise ValueError( - f"An unsupported source was specified: '{source}'. Please specify one of the following sources: {self.READ_TRANSFORMS.keys()}") - expansion_service = expansion_service or BeamJarExpansionService(transform["gradle_target"]) - super().__init__(transform["identifier"], config, config_url, expansion_service) + f"An unsupported source was specified: '{source}'. Please specify " + f"one of the following sources: {self.READ_TRANSFORMS.keys()}" + ) + + expansion_service = _resolve_expansion_service( + source, identifier, expansion_service) + super().__init__(identifier, config, config_url, expansion_service) class Write(_ManagedTransform): WRITE_TRANSFORMS = { - "iceberg": { - "identifier": "beam:schematransform:org.apache.beam:iceberg_write:v1", - "gradle_target": "sdks:java:io:expansion-service:shadowJar" - }, - "kafka": { - "identifier": "beam:schematransform:org.apache.beam:kafka_write:v1", - "gradle_target": "sdks:java:io:expansion-service:shadowJar" - } + ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1", + KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1" } - def __init__(self, sink: str, config: Dict[str, Any] = None, config_url: str = None, expansion_service=None): - transform = self.WRITE_TRANSFORMS.get(sink.lower()) - if not transform: + def __init__( + self, + sink: str, + config: Dict[str, Any] = None, + config_url: str = None, + expansion_service=None): + identifier = self.WRITE_TRANSFORMS.get(sink.lower()) + if not identifier: raise ValueError( - f"An unsupported source was specified: '{sink}'. Please specify one of the following sources: {self.WRITE_TRANSFORMS.keys()}") - expansion_service = expansion_service or BeamJarExpansionService(transform["gradle_target"]) - super().__init__(transform["identifier"], config, config_url, expansion_service) + f"An unsupported source was specified: '{sink}'. Please specify " + f"one of the following sources: {self.WRITE_TRANSFORMS.keys()}") + + expansion_service = _resolve_expansion_service( + sink, identifier, expansion_service) + super().__init__(identifier, config, config_url, expansion_service) + + +def _resolve_expansion_service( + transform_name: str, identifier: str, expansion_service): + if expansion_service: + return expansion_service + + default_target = None + for gradle_target in _GRADLE_TARGETS: + if transform_name in gradle_target: + default_target = gradle_target + break + if not default_target: + raise ValueError( + "No expansion service was specified and could not find a " + f"default expansion service for {transform_name}: '{identifier}'.") + return BeamJarExpansionService(default_target) From 7c2ba9829fecc9141590df3950284886bafd28a5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 4 Jun 2024 12:13:59 -0400 Subject: [PATCH 03/17] lint --- sdks/python/apache_beam/transforms/managed.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index b96b4b8113ee0..b2f56c4a31dcb 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -15,6 +15,9 @@ __all__ = ["ICEBERG", "KAFKA", "Read", "Write"] +# type: ignore[assignment] + + class _ManagedTransform(PTransform): def __init__( self, @@ -54,8 +57,7 @@ def __init__( if not identifier: raise ValueError( f"An unsupported source was specified: '{source}'. Please specify " - f"one of the following sources: {self.READ_TRANSFORMS.keys()}" - ) + f"one of the following sources: {self.READ_TRANSFORMS.keys()}") expansion_service = _resolve_expansion_service( source, identifier, expansion_service) From a0393d490e2222aaadc8527b52bd91c8f3ffd5f6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 12 Jun 2024 15:02:15 -0400 Subject: [PATCH 04/17] try with real example --- sdks/java/io/expansion-service/build.gradle | 4 --- sdks/java/io/iceberg/build.gradle | 9 ++--- sdks/python/apache_beam/transforms/managed.py | 33 ++++++++++++++----- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 2309e94f2e15e..5163d2ac1a9c6 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -46,10 +46,6 @@ dependencies { implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 - // Needed by Iceberg I/O users that use GCS for the warehouse location. - implementation library.java.bigdataoss_gcs_connector - permitUnusedDeclared library.java.bigdataoss_gcs_connector - runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 7965cde86e7d9..9775e6fe45933 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -54,11 +54,12 @@ dependencies { implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation library.java.hadoop_common + // Needed when using a hadoop catalog type + runtimeOnly library.java.hadoop_client + // Needed when using GCS as a warehouse location + runtimeOnly library.java.bigdataoss_gcsio + runtimeOnly library.java.bigdataoss_gcs_connector - testImplementation library.java.hadoop_client - testImplementation library.java.bigdataoss_gcsio - testImplementation library.java.bigdataoss_gcs_connector - testImplementation library.java.bigdataoss_util_hadoop testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index b2f56c4a31dcb..d0cefc6348066 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -1,5 +1,6 @@ from typing import Any from typing import Dict +from typing import Optional import yaml @@ -9,8 +10,14 @@ ICEBERG = "iceberg" KAFKA = "kafka" +BIGQUERY = "bigquery" _MANAGED_IDENTIFIER = "beam:transform:managed:v1" -_GRADLE_TARGETS = {"sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG]} +_GRADLE_TARGETS = { + "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG], + "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [ + BIGQUERY + ] +} __all__ = ["ICEBERG", "KAFKA", "Read", "Write"] @@ -22,8 +29,8 @@ class _ManagedTransform(PTransform): def __init__( self, underlying_identifier: str, - config: Dict[str, Any] = None, - config_url: str = None, + config: Optional[Dict[str, Any]] = None, + config_url: Optional[str] = None, expansion_service=None): super().__init__() self._underlying_identifier = underlying_identifier @@ -50,9 +57,10 @@ class Read(_ManagedTransform): def __init__( self, source: str, - config: Dict[str, Any] = None, - config_url: str = None, + config: Optional[Dict[str, Any]] = None, + config_url: Optional[str] = None, expansion_service=None): + self._source = source identifier = self.READ_TRANSFORMS.get(source.lower()) if not identifier: raise ValueError( @@ -63,6 +71,9 @@ def __init__( source, identifier, expansion_service) super().__init__(identifier, config, config_url, expansion_service) + def default_label(self) -> str: + return "Managed Read(%s)" % self._source.upper() + class Write(_ManagedTransform): WRITE_TRANSFORMS = { @@ -73,9 +84,10 @@ class Write(_ManagedTransform): def __init__( self, sink: str, - config: Dict[str, Any] = None, - config_url: str = None, + config: Optional[Dict[str, Any]] = None, + config_url: Optional[str] = None, expansion_service=None): + self._sink = sink identifier = self.WRITE_TRANSFORMS.get(sink.lower()) if not identifier: raise ValueError( @@ -86,6 +98,9 @@ def __init__( sink, identifier, expansion_service) super().__init__(identifier, config, config_url, expansion_service) + def default_label(self) -> str: + return "Managed Write(%s)" % self._sink.upper() + def _resolve_expansion_service( transform_name: str, identifier: str, expansion_service): @@ -93,8 +108,8 @@ def _resolve_expansion_service( return expansion_service default_target = None - for gradle_target in _GRADLE_TARGETS: - if transform_name in gradle_target: + for gradle_target, transforms in _GRADLE_TARGETS.items(): + if transform_name.lower() in transforms: default_target = gradle_target break if not default_target: From 4f8160b6b1a43488a75a5f6ebb914d212c0e3210 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 12 Jun 2024 15:38:03 -0400 Subject: [PATCH 05/17] cleanup --- sdks/python/apache_beam/transforms/managed.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index d0cefc6348066..f4c0323996d59 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -19,10 +19,7 @@ ] } -__all__ = ["ICEBERG", "KAFKA", "Read", "Write"] - - -# type: ignore[assignment] +__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] class _ManagedTransform(PTransform): @@ -51,7 +48,8 @@ def expand(self, input): class Read(_ManagedTransform): READ_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1", - KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1" + KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1", + BIGQUERY: "beam:schematransform:org.apache.beam:bigquery_storage_read:v1" } def __init__( @@ -78,7 +76,8 @@ def default_label(self) -> str: class Write(_ManagedTransform): WRITE_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1", - KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1" + KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1", + BIGQUERY: "beam:schematransform:org.apache.beam:bigquery_storage_write:v2" } def __init__( From a3067dc02c1294331ed27ec25eab87435434e235 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 17 Jun 2024 16:14:17 -0400 Subject: [PATCH 06/17] add documentation --- sdks/python/apache_beam/transforms/managed.py | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index f4c0323996d59..d17ddb41c8a8a 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -1,3 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Managed Transforms. + +This module builds and instantiates turnkey transforms that can be managed by +the underlying runner. + +Using Managed Transforms +================= +Managed transforms have a defined configuration and can be built using an +inline :class:`dict` like so:: + + results = p | beam.managed.Read( + beam.managed.ICEBERG, + config={"param_1": "foo", + "param_2": "bar"}) + +A YAML configuration file can also be used to build a Managed transform. Say we +have the following `config.yaml` file:: + + param_1: "foo" + param_2: "bar" + +Simply provide the location to the file like so:: + + input_rows = p | beam.Create(...) + input_rows | beam.managed.Write( + beam.managed.KAFKA, + config_url="path/to/config.yaml") + +Available transforms +============= +Available transforms are: + +- **Kafka** +- **Iceberg** +- **BigQuery** + +**Note:** inputs and outputs need to be PCollections of Beam +:py:class:`apache_beam.pvalue.Row` elements. + +**Note:** This Managed API uses Java's ManagedSchemaTransform under the hood. +""" + from typing import Any from typing import Dict from typing import Optional From 9aaa02d049419bb990e6265b11bf2433a884f0c3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 17 Jun 2024 16:46:17 -0400 Subject: [PATCH 07/17] fix doc --- sdks/python/apache_beam/transforms/managed.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index d17ddb41c8a8a..efa7b4e2a7192 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -21,7 +21,7 @@ the underlying runner. Using Managed Transforms -================= +======================== Managed transforms have a defined configuration and can be built using an inline :class:`dict` like so:: @@ -44,7 +44,7 @@ config_url="path/to/config.yaml") Available transforms -============= +==================== Available transforms are: - **Kafka** @@ -105,6 +105,7 @@ def expand(self, input): class Read(_ManagedTransform): + """Read using Managed Transforms""" READ_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1", KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1", @@ -133,6 +134,7 @@ def default_label(self) -> str: class Write(_ManagedTransform): + """Write using Managed Transforms""" WRITE_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1", KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1", From 0296f60cf204975a53d8052b1aa3048fb320f99d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 17 Jun 2024 20:20:23 -0400 Subject: [PATCH 08/17] add pyyaml dependency --- sdks/python/setup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 3a8856583153e..23d5f22b05f68 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -388,6 +388,7 @@ def get_portability_package_data(): 'requests>=2.24.0,<3.0.0,!=2.32.*', 'typing-extensions>=3.7.0', 'zstandard>=0.18.0,<1', + 'pyyaml>=3.12,<7.0.0', # Dynamic dependencies must be specified in a separate list, otherwise # Dependabot won't be able to parse the main list. Any dynamic # dependencies will not receive updates from Dependabot. @@ -414,7 +415,6 @@ def get_portability_package_data(): 'pandas<2.2.0', 'parameterized>=0.7.1,<0.10.0', 'pyhamcrest>=1.9,!=1.10.0,<3.0.0', - 'pyyaml>=3.12,<7.0.0', 'requests_mock>=1.7,<2.0', 'tenacity>=8.0.0,<9', 'pytest>=7.1.2,<8.0', @@ -513,7 +513,6 @@ def get_portability_package_data(): 'yaml': [ 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.1', - 'pyyaml>=3.12,<7.0.0', 'virtualenv-clone>=0.5,<1.0', ] + dataframe_dependency }, From f705c3e2e4679c962fbe710e46c6d5b5206ab8b3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 15 Jul 2024 14:02:09 -0400 Subject: [PATCH 09/17] cleanup --- sdks/java/io/iceberg/build.gradle | 5 ----- sdks/python/apache_beam/transforms/managed.py | 14 ++++---------- 2 files changed, 4 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 9775e6fe45933..fda8931f663a8 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -54,11 +54,6 @@ dependencies { implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation library.java.hadoop_common - // Needed when using a hadoop catalog type - runtimeOnly library.java.hadoop_client - // Needed when using GCS as a warehouse location - runtimeOnly library.java.bigdataoss_gcsio - runtimeOnly library.java.bigdataoss_gcs_connector testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index efa7b4e2a7192..b45ae60738d7e 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -49,7 +49,6 @@ - **Kafka** - **Iceberg** -- **BigQuery** **Note:** inputs and outputs need to be PCollections of Beam :py:class:`apache_beam.pvalue.Row` elements. @@ -69,19 +68,16 @@ ICEBERG = "iceberg" KAFKA = "kafka" -BIGQUERY = "bigquery" _MANAGED_IDENTIFIER = "beam:transform:managed:v1" _GRADLE_TARGETS = { "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG], - "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [ - BIGQUERY - ] } -__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] +__all__ = ["ICEBERG", "KAFKA", "Read", "Write"] class _ManagedTransform(PTransform): + """Base class for Managed Transforms.""" def __init__( self, underlying_identifier: str, @@ -109,7 +105,6 @@ class Read(_ManagedTransform): READ_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1", KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1", - BIGQUERY: "beam:schematransform:org.apache.beam:bigquery_storage_read:v1" } def __init__( @@ -138,7 +133,6 @@ class Write(_ManagedTransform): WRITE_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1", KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1", - BIGQUERY: "beam:schematransform:org.apache.beam:bigquery_storage_write:v2" } def __init__( @@ -151,8 +145,8 @@ def __init__( identifier = self.WRITE_TRANSFORMS.get(sink.lower()) if not identifier: raise ValueError( - f"An unsupported source was specified: '{sink}'. Please specify " - f"one of the following sources: {self.WRITE_TRANSFORMS.keys()}") + f"An unsupported sink was specified: '{sink}'. Please specify " + f"one of the following sinks: {self.WRITE_TRANSFORMS.keys()}") expansion_service = _resolve_expansion_service( sink, identifier, expansion_service) From f819f0a867e28f24e45bee3bad686a3b8fc785d7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 15 Jul 2024 14:04:13 -0400 Subject: [PATCH 10/17] return deps --- sdks/java/io/iceberg/build.gradle | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index fda8931f663a8..7965cde86e7d9 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -55,6 +55,10 @@ dependencies { implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation library.java.hadoop_common + testImplementation library.java.hadoop_client + testImplementation library.java.bigdataoss_gcsio + testImplementation library.java.bigdataoss_gcs_connector + testImplementation library.java.bigdataoss_util_hadoop testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") From 5f675b85cc10f528ad6bf34a0170452db2010705 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 15 Jul 2024 14:04:50 -0400 Subject: [PATCH 11/17] return deps --- sdks/java/io/expansion-service/build.gradle | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 5163d2ac1a9c6..2309e94f2e15e 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -46,6 +46,10 @@ dependencies { implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 + // Needed by Iceberg I/O users that use GCS for the warehouse location. + implementation library.java.bigdataoss_gcs_connector + permitUnusedDeclared library.java.bigdataoss_gcs_connector + runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 } From fdccb41b33943e8253bddfd76f844795031f5602 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 15 Jul 2024 16:34:20 -0400 Subject: [PATCH 12/17] fix doc --- sdks/python/apache_beam/transforms/managed.py | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index b45ae60738d7e..c17ee75c4d0ea 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -76,31 +76,7 @@ __all__ = ["ICEBERG", "KAFKA", "Read", "Write"] -class _ManagedTransform(PTransform): - """Base class for Managed Transforms.""" - def __init__( - self, - underlying_identifier: str, - config: Optional[Dict[str, Any]] = None, - config_url: Optional[str] = None, - expansion_service=None): - super().__init__() - self._underlying_identifier = underlying_identifier - self._yaml_config = yaml.dump(config) - self._config_url = config_url - self._expansion_service = expansion_service - - def expand(self, input): - return input | SchemaAwareExternalTransform( - identifier=_MANAGED_IDENTIFIER, - expansion_service=self._expansion_service, - rearrange_based_on_discovery=True, - transform_identifier=self._underlying_identifier, - config=self._yaml_config, - config_url=self._config_url) - - -class Read(_ManagedTransform): +class Read(PTransform): """Read using Managed Transforms""" READ_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1", @@ -113,6 +89,7 @@ def __init__( config: Optional[Dict[str, Any]] = None, config_url: Optional[str] = None, expansion_service=None): + super().__init__() self._source = source identifier = self.READ_TRANSFORMS.get(source.lower()) if not identifier: @@ -120,15 +97,26 @@ def __init__( f"An unsupported source was specified: '{source}'. Please specify " f"one of the following sources: {self.READ_TRANSFORMS.keys()}") - expansion_service = _resolve_expansion_service( + self._expansion_service = _resolve_expansion_service( source, identifier, expansion_service) - super().__init__(identifier, config, config_url, expansion_service) + self._underlying_identifier = identifier + self._yaml_config = yaml.dump(config) + self._config_url = config_url + + def expand(self, input): + return input | SchemaAwareExternalTransform( + identifier=_MANAGED_IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + transform_identifier=self._underlying_identifier, + config=self._yaml_config, + config_url=self._config_url) def default_label(self) -> str: return "Managed Read(%s)" % self._source.upper() -class Write(_ManagedTransform): +class Write(PTransform): """Write using Managed Transforms""" WRITE_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1", @@ -141,6 +129,7 @@ def __init__( config: Optional[Dict[str, Any]] = None, config_url: Optional[str] = None, expansion_service=None): + super().__init__() self._sink = sink identifier = self.WRITE_TRANSFORMS.get(sink.lower()) if not identifier: @@ -148,9 +137,20 @@ def __init__( f"An unsupported sink was specified: '{sink}'. Please specify " f"one of the following sinks: {self.WRITE_TRANSFORMS.keys()}") - expansion_service = _resolve_expansion_service( + self._expansion_service = _resolve_expansion_service( sink, identifier, expansion_service) - super().__init__(identifier, config, config_url, expansion_service) + self._underlying_identifier = identifier + self._yaml_config = yaml.dump(config) + self._config_url = config_url + + def expand(self, input): + return input | SchemaAwareExternalTransform( + identifier=_MANAGED_IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + transform_identifier=self._underlying_identifier, + config=self._yaml_config, + config_url=self._config_url) def default_label(self) -> str: return "Managed Write(%s)" % self._sink.upper() From a6d961a36cc0887967cc6aedb7ebba8337b4e6d2 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 18 Jul 2024 10:45:45 -0400 Subject: [PATCH 13/17] address some comments --- sdks/python/apache_beam/transforms/managed.py | 42 +++++++++++-------- .../apache_beam/transforms/managed_test.py | 0 2 files changed, 24 insertions(+), 18 deletions(-) create mode 100644 sdks/python/apache_beam/transforms/managed_test.py diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index c17ee75c4d0ea..bc8f4ff680f10 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -18,23 +18,28 @@ """Managed Transforms. This module builds and instantiates turnkey transforms that can be managed by -the underlying runner. +the underlying runner. This means the runner may upgrade the transform to a more +optimal/updated version without requiring the user to do anything. Using Managed Transforms ======================== -Managed transforms have a defined configuration and can be built using an +Managed turnkey transforms have a defined configuration and can be built using an inline :class:`dict` like so:: results = p | beam.managed.Read( beam.managed.ICEBERG, - config={"param_1": "foo", - "param_2": "bar"}) + config={"table": "foo", + "catalog_name": "bar", + "catalog_properties": { + "prop1": "value1", + "prop2": "value2"}}) A YAML configuration file can also be used to build a Managed transform. Say we have the following `config.yaml` file:: - param_1: "foo" - param_2: "bar" + topic: "foo" + bootstrap_servers: "localhost:1234" + format: "AVRO" Simply provide the location to the file like so:: @@ -47,13 +52,14 @@ ==================== Available transforms are: -- **Kafka** -- **Iceberg** +- **Kafka Read and Write** +- **Iceberg Read and Write** -**Note:** inputs and outputs need to be PCollections of Beam +**Note:** inputs and outputs need to be PCollection(s) of Beam :py:class:`apache_beam.pvalue.Row` elements. -**Note:** This Managed API uses Java's ManagedSchemaTransform under the hood. +**Note:** Today, all managed transforms are essentially cross-language +transforms, and Java's ManagedSchemaTransform is used under the hood. """ from typing import Any @@ -69,7 +75,7 @@ ICEBERG = "iceberg" KAFKA = "kafka" _MANAGED_IDENTIFIER = "beam:transform:managed:v1" -_GRADLE_TARGETS = { +_EXPANSION_SERVICE_JAR_TARGETS = { "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG], } @@ -78,7 +84,7 @@ class Read(PTransform): """Read using Managed Transforms""" - READ_TRANSFORMS = { + _READ_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1", KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1", } @@ -91,11 +97,11 @@ def __init__( expansion_service=None): super().__init__() self._source = source - identifier = self.READ_TRANSFORMS.get(source.lower()) + identifier = self._READ_TRANSFORMS.get(source.lower()) if not identifier: raise ValueError( f"An unsupported source was specified: '{source}'. Please specify " - f"one of the following sources: {self.READ_TRANSFORMS.keys()}") + f"one of the following sources: {list(self._READ_TRANSFORMS.keys())}") self._expansion_service = _resolve_expansion_service( source, identifier, expansion_service) @@ -118,7 +124,7 @@ def default_label(self) -> str: class Write(PTransform): """Write using Managed Transforms""" - WRITE_TRANSFORMS = { + _WRITE_TRANSFORMS = { ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1", KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1", } @@ -131,11 +137,11 @@ def __init__( expansion_service=None): super().__init__() self._sink = sink - identifier = self.WRITE_TRANSFORMS.get(sink.lower()) + identifier = self._WRITE_TRANSFORMS.get(sink.lower()) if not identifier: raise ValueError( f"An unsupported sink was specified: '{sink}'. Please specify " - f"one of the following sinks: {self.WRITE_TRANSFORMS.keys()}") + f"one of the following sinks: {list(self._WRITE_TRANSFORMS.keys())}") self._expansion_service = _resolve_expansion_service( sink, identifier, expansion_service) @@ -162,7 +168,7 @@ def _resolve_expansion_service( return expansion_service default_target = None - for gradle_target, transforms in _GRADLE_TARGETS.items(): + for gradle_target, transforms in _EXPANSION_SERVICE_JAR_TARGETS.items(): if transform_name.lower() in transforms: default_target = gradle_target break diff --git a/sdks/python/apache_beam/transforms/managed_test.py b/sdks/python/apache_beam/transforms/managed_test.py new file mode 100644 index 0000000000000..e69de29bb2d1d From 2794c4aa5d87b9f03c105b9c225b59a98b3a4559 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 6 Aug 2024 14:07:20 -0400 Subject: [PATCH 14/17] doc updates --- sdks/python/apache_beam/transforms/managed.py | 10 ++++++---- sdks/standard_expansion_services.yaml | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index bc8f4ff680f10..201abe7e3f0e9 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -18,13 +18,15 @@ """Managed Transforms. This module builds and instantiates turnkey transforms that can be managed by -the underlying runner. This means the runner may upgrade the transform to a more -optimal/updated version without requiring the user to do anything. +the underlying runner. This means the runner can upgrade the transform to a +more optimal/updated version without requiring the user to do anything. It may +also replace the transform with something entirely different if it chooses to. +By default, however, the specified transform will remain unchanged. Using Managed Transforms ======================== -Managed turnkey transforms have a defined configuration and can be built using an -inline :class:`dict` like so:: +Managed turnkey transforms have a defined configuration and can be built using +an inline :class:`dict` like so:: results = p | beam.managed.Read( beam.managed.ICEBERG, diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml index 31a1a6343aed5..bf0eb6156f47f 100644 --- a/sdks/standard_expansion_services.yaml +++ b/sdks/standard_expansion_services.yaml @@ -44,7 +44,7 @@ # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py - 'beam:schematransform:org.apache.beam:kafka_write:v1' - 'beam:schematransform:org.apache.beam:kafka_read:v1' - # Not ready to generate + # Available through apache_beam.transforms.managed.[Read/Write] - 'beam:schematransform:org.apache.beam:iceberg_write:v1' - 'beam:schematransform:org.apache.beam:iceberg_read:v1' From 540705e20d290e9de2198d31ae1e7667aeedc644 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 6 Aug 2024 17:04:21 -0400 Subject: [PATCH 15/17] define managed transform URNs in proto --- .../pipeline/v1/external_transforms.proto | 18 ++++++++++++++++++ .../IcebergReadSchemaTransformProvider.java | 6 ++++-- .../IcebergWriteSchemaTransformProvider.java | 6 ++++-- .../KafkaReadSchemaTransformProvider.java | 4 +++- .../KafkaWriteSchemaTransformProvider.java | 5 ++++- .../org/apache/beam/sdk/managed/Managed.java | 11 +++++++---- .../sdk/managed/ManagedTransformConstants.java | 13 +++++-------- .../apache_beam/portability/common_urns.py | 1 + sdks/python/apache_beam/transforms/managed.py | 15 +++++++++++---- 9 files changed, 57 insertions(+), 22 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 429371e110556..e7990e2f10853 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -59,6 +59,24 @@ message ExpansionMethods { } } +// Defines the URNs for managed transforms. +message ManagedTransforms { + enum Urns { + ICEBERG_READ = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:iceberg_write:v1"]; + KAFKA_READ = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:kafka_read:v1"]; + KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:kafka_write:v1"]; + BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"]; + BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"]; + } +} + // A configuration payload for an external transform. // Used to define a Java transform that can be directly instantiated by a Java // expansion service. diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index ef535353efd01..05db81e84bcf3 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -17,14 +17,16 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config; -import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.SchemaRegistry; @@ -59,7 +61,7 @@ public List outputCollectionNames() { @Override public String identifier() { - return ManagedTransformConstants.ICEBERG_READ; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ); } @DefaultSchema(AutoValueSchema.class) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index b3de7a88c541d..4b9393955d5f4 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -17,14 +17,16 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config; -import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; @@ -80,7 +82,7 @@ public List outputCollectionNames() { @Override public String identifier() { - return ManagedTransformConstants.ICEBERG_WRITE; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE); } @DefaultSchema(AutoValueSchema.class) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index b2eeb1a54d1da..912ab8fc9b631 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; import java.io.FileOutputStream; @@ -34,6 +35,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; @@ -103,7 +105,7 @@ public Row apply(byte[] input) { @Override public String identifier() { - return "beam:schematransform:org.apache.beam:kafka_read:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE); } @Override diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 09b338492b47f..d6f46b11cb7de 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.kafka; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.io.Serializable; @@ -26,6 +28,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.metrics.Counter; @@ -249,7 +252,7 @@ public byte[] apply(Row input) { @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:kafka_write:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE); } @Override diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 911e25cdda143..8477726686ee1 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -17,11 +17,14 @@ */ package org.apache.beam.sdk.managed; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import com.google.auto.value.AutoValue; import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; @@ -87,13 +90,13 @@ public class Managed { // Supported SchemaTransforms public static final Map READ_TRANSFORMS = ImmutableMap.builder() - .put(ICEBERG, ManagedTransformConstants.ICEBERG_READ) - .put(KAFKA, ManagedTransformConstants.KAFKA_READ) + .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ)) + .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) .build(); public static final Map WRITE_TRANSFORMS = ImmutableMap.builder() - .put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE) - .put(KAFKA, ManagedTransformConstants.KAFKA_WRITE) + .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE)) + .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE)) .build(); /** diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 51d0b67b4b89b..4cf752747be5a 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -17,7 +17,10 @@ */ package org.apache.beam.sdk.managed; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import java.util.Map; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; /** @@ -41,12 +44,6 @@ public class ManagedTransformConstants { // Standard input PCollection tag public static final String INPUT = "input"; - public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1"; - public static final String ICEBERG_WRITE = - "beam:schematransform:org.apache.beam:iceberg_write:v1"; - public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1"; - public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1"; - private static final Map KAFKA_READ_MAPPINGS = ImmutableMap.builder().put("data_format", "format").build(); @@ -55,7 +52,7 @@ public class ManagedTransformConstants { public static final Map> MAPPINGS = ImmutableMap.>builder() - .put(KAFKA_READ, KAFKA_READ_MAPPINGS) - .put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS) + .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS) + .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS) .build(); } diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 4effc91c3d402..74d9a39bb052c 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -38,6 +38,7 @@ StandardSideInputTypes = beam_runner_api_pb2_urns.StandardSideInputTypes StandardUserStateTypes = beam_runner_api_pb2_urns.StandardUserStateTypes ExpansionMethods = external_transforms_pb2_urns.ExpansionMethods +ManagedTransforms = external_transforms_pb2_urns.ManagedTransforms MonitoringInfo = metrics_pb2_urns.MonitoringInfo MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 201abe7e3f0e9..e7fec00f6b765 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -73,12 +73,17 @@ from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external import SchemaAwareExternalTransform from apache_beam.transforms.ptransform import PTransform +from apache_beam.portability.common_urns import ManagedTransforms ICEBERG = "iceberg" KAFKA = "kafka" +BIGQUERY = "bigquery" _MANAGED_IDENTIFIER = "beam:transform:managed:v1" _EXPANSION_SERVICE_JAR_TARGETS = { "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG], + "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [ + BIGQUERY + ], } __all__ = ["ICEBERG", "KAFKA", "Read", "Write"] @@ -87,8 +92,9 @@ class Read(PTransform): """Read using Managed Transforms""" _READ_TRANSFORMS = { - ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1", - KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1", + ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn, + KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn, + BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn, } def __init__( @@ -127,8 +133,9 @@ def default_label(self) -> str: class Write(PTransform): """Write using Managed Transforms""" _WRITE_TRANSFORMS = { - ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1", - KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1", + ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn, + KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn, + BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn, } def __init__( From efd4c88192df0a2f02fffe126a2c51fd58f2d07f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 6 Aug 2024 18:08:21 -0400 Subject: [PATCH 16/17] fix URN --- .../beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 912ab8fc9b631..c22eeef71ee85 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -105,7 +105,7 @@ public Row apply(byte[] input) { @Override public String identifier() { - return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE); + return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ); } @Override From 68dce2632d6613ff54b2254be53f2dc96b4fc2b3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 6 Aug 2024 18:56:35 -0400 Subject: [PATCH 17/17] remove managed dependency --- sdks/java/io/iceberg/build.gradle | 1 - sdks/python/apache_beam/transforms/managed.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 7965cde86e7d9..ba42ce5291dab 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -44,7 +44,6 @@ def orc_version = "1.9.2" dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(":sdks:java:managed") implementation library.java.slf4j_api implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index e7fec00f6b765..71691f05027f5 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -70,10 +70,10 @@ import yaml +from apache_beam.portability.common_urns import ManagedTransforms from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external import SchemaAwareExternalTransform from apache_beam.transforms.ptransform import PTransform -from apache_beam.portability.common_urns import ManagedTransforms ICEBERG = "iceberg" KAFKA = "kafka"