Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Managed Transforms API #31495

Merged
merged 31 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e126d9f
managed module
ahmedabu98 Jun 3, 2024
07e12cb
clean up
ahmedabu98 Jun 4, 2024
7c2ba98
lint
ahmedabu98 Jun 4, 2024
a0393d4
try with real example
ahmedabu98 Jun 12, 2024
4f8160b
cleanup
ahmedabu98 Jun 12, 2024
a3067dc
add documentation
ahmedabu98 Jun 17, 2024
9aaa02d
fix doc
ahmedabu98 Jun 17, 2024
0296f60
add pyyaml dependency
ahmedabu98 Jun 18, 2024
f705c3e
cleanup
ahmedabu98 Jul 15, 2024
f819f0a
return deps
ahmedabu98 Jul 15, 2024
5f675b8
return deps
ahmedabu98 Jul 15, 2024
fdccb41
fix doc
ahmedabu98 Jul 15, 2024
66b9b24
Merge branch 'master' of https://github.com/ahmedabu98/beam into pyth…
ahmedabu98 Jul 15, 2024
a6d961a
address some comments
ahmedabu98 Jul 18, 2024
2794c4a
doc updates
ahmedabu98 Aug 6, 2024
540705e
define managed transform URNs in proto
ahmedabu98 Aug 6, 2024
efd4c88
fix URN
ahmedabu98 Aug 6, 2024
68dce26
remove managed dependency
ahmedabu98 Aug 6, 2024
f643728
Merge branch 'master' of https://github.com/ahmedabu98/beam into pyth…
ahmedabu98 Oct 10, 2024
acc38d5
add managed iceberg integration test
ahmedabu98 Oct 10, 2024
98a1700
lint
ahmedabu98 Oct 10, 2024
ffda2d1
lint
ahmedabu98 Oct 10, 2024
42af6e9
dependency fix
ahmedabu98 Oct 10, 2024
d19921f
lint
ahmedabu98 Oct 10, 2024
ae2ce9e
dependency fix
ahmedabu98 Oct 10, 2024
c1d4eed
Merge branch 'master' of https://github.com/ahmedabu98/beam into pyth…
ahmedabu98 Oct 10, 2024
ec21b35
dependency fix
ahmedabu98 Oct 10, 2024
213db41
lint
ahmedabu98 Oct 10, 2024
fe3ddec
lint
ahmedabu98 Oct 11, 2024
288afc9
dependency fix
ahmedabu98 Oct 11, 2024
913a1d2
rename test file
ahmedabu98 Oct 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,15 @@ ext.summary = "Expansion service serving several Java IOs"
dependencies {
implementation project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
implementation project(":sdks:java:managed")
permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
implementation project(":sdks:java:io:iceberg")
permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761
implementation project(":sdks:java:io:kafka")
permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
implementation project(":sdks:java:io:kafka:upgrade")
permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761

// Needed by Iceberg I/O users that use GCS for the warehouse location.
implementation library.java.bigdataoss_gcs_connector
permitUnusedDeclared library.java.bigdataoss_gcs_connector

runtimeOnly library.java.kafka_clients
runtimeOnly library.java.slf4j_jdk14
}
Expand Down
9 changes: 5 additions & 4 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ dependencies {
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common
// Needed when using a hadoop catalog type
runtimeOnly library.java.hadoop_client
// Needed when using GCS as a warehouse location
runtimeOnly library.java.bigdataoss_gcsio
runtimeOnly library.java.bigdataoss_gcs_connector

testImplementation library.java.hadoop_client
testImplementation library.java.bigdataoss_gcsio
testImplementation library.java.bigdataoss_gcs_connector
testImplementation library.java.bigdataoss_util_hadoop
testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
from apache_beam.transforms.external import *
from apache_beam.transforms.managed import *
from apache_beam.transforms.ptransform import *
from apache_beam.transforms.stats import *
from apache_beam.transforms.timeutil import TimeDomain
Expand Down
179 changes: 179 additions & 0 deletions sdks/python/apache_beam/transforms/managed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Managed Transforms.

This module builds and instantiates turnkey transforms that can be managed by
the underlying runner.

Using Managed Transforms
========================
Managed transforms have a defined configuration and can be built using an
inline :class:`dict` like so::

  results = p | beam.managed.Read(
      beam.managed.ICEBERG,
      config={"param_1": "foo",
              "param_2": "bar"})

A YAML configuration file can also be used to build a Managed transform. Say we
have the following `config.yaml` file::

  param_1: "foo"
  param_2: "bar"

Simply provide the location to the file like so::

  input_rows = p | beam.Create(...)
  input_rows | beam.managed.Write(
      beam.managed.KAFKA,
      config_url="path/to/config.yaml")

Available transforms
====================
Available transforms are:

- **Kafka**
- **Iceberg**
- **BigQuery**

**Note:** inputs and outputs need to be PCollections of Beam
:py:class:`apache_beam.pvalue.Row` elements.

**Note:** This Managed API uses Java's ManagedSchemaTransform under the hood.
"""

from typing import Any
from typing import Dict
from typing import Optional

import yaml

from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import SchemaAwareExternalTransform
from apache_beam.transforms.ptransform import PTransform

# Names accepted (case-insensitively) as the `source` of `Read` and the
# `sink` of `Write`.
ICEBERG = "iceberg"
KAFKA = "kafka"
BIGQUERY = "bigquery"

# Identifier passed to SchemaAwareExternalTransform for every managed
# transform; the concrete read/write transform is supplied separately as the
# `transform_identifier` configuration field.
_MANAGED_IDENTIFIER = "beam:transform:managed:v1"

# Maps the Gradle target of an expansion-service jar to the transform names it
# serves. Used to pick a default expansion service when the caller gives none.
_GRADLE_TARGETS = {
    "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG],
    "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [
        BIGQUERY
    ]
}

# Names exported by `from apache_beam.transforms.managed import *`.
__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]


class _ManagedTransform(PTransform):
  """Shared implementation behind the managed `Read` and `Write` transforms.

  Stores the identifier of the underlying transform together with its
  configuration, and at expansion time applies a
  `SchemaAwareExternalTransform` addressed to the managed identifier.

  Args:
    underlying_identifier: identifier of the concrete transform to run.
    config: optional configuration dict; stored as a YAML string.
    config_url: optional location of a YAML configuration file.
    expansion_service: expansion service handed to the external transform.
  """
  def __init__(
      self,
      underlying_identifier: str,
      config: Optional[Dict[str, Any]] = None,
      config_url: Optional[str] = None,
      expansion_service=None):
    super().__init__()
    self._expansion_service = expansion_service
    self._underlying_identifier = underlying_identifier
    self._config_url = config_url
    # The dict is serialized up front so that the external transform receives
    # the configuration as a single YAML string.
    # NOTE(review): when `config` is None this stores yaml.dump(None) (a YAML
    # "null" document) rather than None -- confirm the expansion side treats
    # that as "no inline config" when only `config_url` is given.
    self._yaml_config = yaml.dump(config)

  def expand(self, input):
    """Applies the managed external transform to `input`."""
    managed = SchemaAwareExternalTransform(
        identifier=_MANAGED_IDENTIFIER,
        expansion_service=self._expansion_service,
        rearrange_based_on_discovery=True,
        transform_identifier=self._underlying_identifier,
        config=self._yaml_config,
        config_url=self._config_url)
    return input | managed


class Read(_ManagedTransform):
  """Read using Managed Transforms.

  Args:
    source: name of the source to read from. It is lower-cased and looked up
      in `READ_TRANSFORMS`, so matching is case-insensitive.
    config: optional configuration dict for the underlying read transform.
    config_url: optional location of a YAML file holding the configuration.
    expansion_service: optional expansion service. When omitted, a default
      one is resolved from the source name.

  Raises:
    ValueError: if `source` is not a key of `READ_TRANSFORMS`, or if no
      expansion service was given and no default could be found.
  """
  # Maps each supported source name to the identifier of its read transform.
  READ_TRANSFORMS = {
      ICEBERG: "beam:schematransform:org.apache.beam:iceberg_read:v1",
      KAFKA: "beam:schematransform:org.apache.beam:kafka_read:v1",
      BIGQUERY: "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"
  }

  def __init__(
      self,
      source: str,
      config: Optional[Dict[str, Any]] = None,
      config_url: Optional[str] = None,
      expansion_service=None):
    # Kept as given (not lower-cased) so the label and error message reflect
    # what the caller wrote.
    self._source = source
    identifier = self.READ_TRANSFORMS.get(source.lower())
    if not identifier:
      # list(...) so the message shows ['iceberg', ...] instead of the
      # dict_keys([...]) wrapper.
      raise ValueError(
          f"An unsupported source was specified: '{source}'. Please specify "
          f"one of the following sources: {list(self.READ_TRANSFORMS)}")

    expansion_service = _resolve_expansion_service(
        source, identifier, expansion_service)
    super().__init__(identifier, config, config_url, expansion_service)

  def default_label(self) -> str:
    """Returns the label for this transform, e.g. `Managed Read(KAFKA)`."""
    return "Managed Read(%s)" % self._source.upper()


class Write(_ManagedTransform):
  """Write using Managed Transforms.

  Args:
    sink: name of the sink to write to. It is lower-cased and looked up in
      `WRITE_TRANSFORMS`, so matching is case-insensitive.
    config: optional configuration dict for the underlying write transform.
    config_url: optional location of a YAML file holding the configuration.
    expansion_service: optional expansion service. When omitted, a default
      one is resolved from the sink name.

  Raises:
    ValueError: if `sink` is not a key of `WRITE_TRANSFORMS`, or if no
      expansion service was given and no default could be found.
  """
  # Maps each supported sink name to the identifier of its write transform.
  WRITE_TRANSFORMS = {
      ICEBERG: "beam:schematransform:org.apache.beam:iceberg_write:v1",
      KAFKA: "beam:schematransform:org.apache.beam:kafka_write:v1",
      BIGQUERY: "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"
  }

  def __init__(
      self,
      sink: str,
      config: Optional[Dict[str, Any]] = None,
      config_url: Optional[str] = None,
      expansion_service=None):
    # Kept as given (not lower-cased) so the label and error message reflect
    # what the caller wrote.
    self._sink = sink
    identifier = self.WRITE_TRANSFORMS.get(sink.lower())
    if not identifier:
      # The message talks about sinks (not sources), and list(...) shows
      # ['iceberg', ...] instead of the dict_keys([...]) wrapper.
      raise ValueError(
          f"An unsupported sink was specified: '{sink}'. Please specify "
          f"one of the following sinks: {list(self.WRITE_TRANSFORMS)}")

    expansion_service = _resolve_expansion_service(
        sink, identifier, expansion_service)
    super().__init__(identifier, config, config_url, expansion_service)

  def default_label(self) -> str:
    """Returns the label for this transform, e.g. `Managed Write(KAFKA)`."""
    return "Managed Write(%s)" % self._sink.upper()


def _resolve_expansion_service(
transform_name: str, identifier: str, expansion_service):
if expansion_service:
return expansion_service

default_target = None
for gradle_target, transforms in _GRADLE_TARGETS.items():
if transform_name.lower() in transforms:
default_target = gradle_target
break
if not default_target:
raise ValueError(
"No expansion service was specified and could not find a "
f"default expansion service for {transform_name}: '{identifier}'.")
return BeamJarExpansionService(default_target)
3 changes: 1 addition & 2 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def get_portability_package_data():
'requests>=2.24.0,<3.0.0,!=2.32.*',
'typing-extensions>=3.7.0',
'zstandard>=0.18.0,<1',
'pyyaml>=3.12,<7.0.0',
# Dynamic dependencies must be specified in a separate list, otherwise
# Dependabot won't be able to parse the main list. Any dynamic
# dependencies will not receive updates from Dependabot.
Expand All @@ -414,7 +415,6 @@ def get_portability_package_data():
'pandas<2.2.0',
'parameterized>=0.7.1,<0.10.0',
'pyhamcrest>=1.9,!=1.10.0,<3.0.0',
'pyyaml>=3.12,<7.0.0',
'requests_mock>=1.7,<2.0',
'tenacity>=8.0.0,<9',
'pytest>=7.1.2,<8.0',
Expand Down Expand Up @@ -513,7 +513,6 @@ def get_portability_package_data():
'yaml': [
'docstring-parser>=0.15,<1.0',
'jinja2>=3.0,<3.1',
'pyyaml>=3.12,<7.0.0',
'virtualenv-clone>=0.5,<1.0',
] + dataframe_dependency
},
Expand Down
Loading