-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Python] Managed Transforms API #31495
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #31495 +/- ##
=========================================
Coverage 71.14% 71.14%
Complexity 3008 3008
=========================================
Files 1055 1055
Lines 133439 133439
Branches 3248 3248
=========================================
Hits 94929 94929
Misses 35382 35382
Partials 3128 3128 ☔ View full report in Codecov by Sentry. |
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
…on_managed_transforms
Successful Dataflow run (2024-07-15_13_54_11-2004515317250011573) with the following code: import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)
options = PipelineOptions([
"--runner=DataflowRunner",
"--job_name=managed-iceberg-read-demo",
"--project=apache-beam-testing",
"--temp_location=gs://apache-beam-testing-ahmedabualsaud/tmp",
"--region=us-central1",
"--sdk_location=sdks/python/dist/apache_beam-2.59.0.dev0.tar.gz",
"--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest"
])
with beam.Pipeline(options=options) as p:
p | beam.managed.Read(
"iceberg",
config={
"table": "my_database.my_table",
"catalog_name": "ahmed_catalog",
"catalog_properties": {
"catalog-impl": "org.apache.iceberg.hadoop.HadoopCatalog",
"warehouse": "gs://ahmedabualsaud-apache-beam-testing"
}}) | "Log rows" >> beam.Map(_LOGGER.info) Note: I had to include the following dependencies (needed by Iceberg) in the expansion service jar:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments but Cham would have more context.
inline :class:`dict` like so:: | ||
|
||
results = p | beam.managed.Read( | ||
beam.managed.ICEBERG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious about the boundaries for the 'managed' scope.
- Is it always a source/sink, or potentially any transform?
- is the transform being managed is named 'Read'? if so, why I wonder if beam.managed.ICEBERG should be part of a config instead of a separate argument here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(1) It can be any transform.
(2) API wise, I think it's cleaner to separate these. Basically what the user is saying is, "I want to use the Managed Iceberg transform and use this config to configure it". The config can be available locally or remotely. Also, this make it easy to identify the primary function of a transform looking at a pipeline definition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments but Cham would be have more context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
inline :class:`dict` like so:: | ||
|
||
results = p | beam.managed.Read( | ||
beam.managed.ICEBERG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(1) It can be any transform.
(2) API wise, I think it's cleaner to separate these. Basically what the user is saying is, "I want to use the Managed Iceberg transform and use this config to configure it". The config can be available locally or remotely. Also, this make it easy to identify the primary function of a transform looking at a pipeline definition.
self._yaml_config = yaml.dump(config) | ||
self._config_url = config_url | ||
|
||
def expand(self, input): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some unit-test coverage using simple pipelines ?
LGTM. We can merge after existing comments are addressed. |
Adding Python API for Managed transforms, similar to the Java API.