From d18543350b1f0ad1f7bef740fed8e6b22a5c4304 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 7 Dec 2023 13:00:33 -0800 Subject: [PATCH] Add a pipeline update compatibility version option. (#29140) This can be used to migrate to best practices and good defaults with new versions of Beam while still allowing users of older SDKs to update their SDK version without breaking update compatibility. Also add the mechanisms to propagate this option for cross-language transforms. --- .../v1/beam_expansion_api.proto | 5 +++ .../runners/core/construction/External.java | 1 + .../beam/sdk/options/StreamingOptions.java | 11 +++++ .../expansion/service/ExpansionService.java | 15 ++++++- .../service/ExpansionServerTest.java | 15 +++++-- .../apache_beam/options/pipeline_options.py | 43 +++++++++++++++++++ sdks/python/apache_beam/pipeline_test.py | 8 +++- .../runners/portability/expansion_service.py | 15 ++++++- .../python/apache_beam/transforms/external.py | 3 +- 9 files changed, 107 insertions(+), 9 deletions(-) diff --git a/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto b/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto index e70d978c56aca..a4736f8b4938f 100644 --- a/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto +++ b/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto @@ -29,6 +29,7 @@ option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagem option java_package = "org.apache.beam.model.expansion.v1"; option java_outer_classname = "ExpansionApi"; +import "google/protobuf/struct.proto"; import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto"; import "org/apache/beam/model/pipeline/v1/schema.proto"; @@ -57,6 +58,10 @@ message ExpansionRequest { // A set of requirements that must be used by the expansion service to // interpret the 
components provided with this request. repeated string requirements = 5; + + // (Optional) A set of Pipeline Options that should be used + // when expanding this transform. + google.protobuf.Struct pipeline_options = 6; } message ExpansionResponse { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java index 93a1ade474a5c..3951148d1a582 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java @@ -275,6 +275,7 @@ public OutputT expand(InputT input) { .setTransform(ptransformBuilder.build()) .setNamespace(getNamespace()) + .setPipelineOptions(PipelineOptionsTranslation.toProto(p.getOptions())) .build(); ExpansionApi.ExpansionResponse response = clientFactory.getExpansionServiceClient(endpoint).expand(request); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java index 3fc3c7d4c471c..e389ab89cf9c8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.options; +import org.checkerframework.checker.nullness.qual.Nullable; + /** Options used to configure streaming. */ public interface StreamingOptions extends ApplicationNameOptions, PipelineOptions { /** @@ -30,4 +32,13 @@ public interface StreamingOptions extends ApplicationNameOptions, PipelineOption boolean isStreaming(); void setStreaming(boolean value); + + @Description( + "If set, attempts to produce a pipeline compatible with this prior version of the Beam SDK." 
+ + " This string should be interpreted and compared per https://semver.org/." + + " See, for example, https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline.") + @Nullable + String getUpdateCompatibilityVersion(); + + void setUpdateCompatibilityVersion(@Nullable String updateCompatibilityVersion); } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index ba60fb99d4d10..43690c6037010 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -50,6 +50,7 @@ import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.runners.core.construction.SdkComponents; @@ -67,6 +68,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PortablePipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; @@ -584,7 +586,8 @@ private Map loadRegisteredTransforms() { request.getTransform().getSpec().getUrn()); LOG.debug("Full transform: {}", request.getTransform()); Set existingTransformIds = request.getComponents().getTransformsMap().keySet(); - Pipeline pipeline = createPipeline(); + Pipeline pipeline 
= + createPipeline(PipelineOptionsTranslation.fromProto(request.getPipelineOptions())); boolean isUseDeprecatedRead = ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read") || ExperimentalOptions.hasExperiment( @@ -707,7 +710,7 @@ private Map loadRegisteredTransforms() { .build(); } - protected Pipeline createPipeline() { + protected Pipeline createPipeline(PipelineOptions requestOptions) { // TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation PipelineOptions effectiveOpts = PipelineOptionsFactory.create(); PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class); @@ -728,6 +731,14 @@ protected Pipeline createPipeline() { .as(ExpansionServiceOptions.class) .setExpansionServiceConfig( pipelineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig()); + // TODO(https://github.com/apache/beam/issues/20090): Figure out the correct subset of options + // to propagate. + if (requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion() != null) { + effectiveOpts + .as(StreamingOptions.class) + .setUpdateCompatibilityVersion( + requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion()); + } return Pipeline.create(effectiveOpts); } diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java index 03938a073c4f7..5d0273964db49 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServerTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.core.Is.is; import java.util.Arrays; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.junit.Test; @@ -58,7 +59,7 @@ 
public void testPassingPipelineArguments() { ExpansionService service = new ExpansionService(args); assertThat( service - .createPipeline() + .createPipeline(PipelineOptionsFactory.create()) .getOptions() .as(PortablePipelineOptions.class) .getDefaultEnvironmentType(), @@ -70,7 +71,11 @@ public void testNonEmptyFilesToStage() { String[] args = {"--filesToStage=nonExistent1.jar,nonExistent2.jar"}; ExpansionService service = new ExpansionService(args); assertThat( - service.createPipeline().getOptions().as(PortablePipelineOptions.class).getFilesToStage(), + service + .createPipeline(PipelineOptionsFactory.create()) + .getOptions() + .as(PortablePipelineOptions.class) + .getFilesToStage(), equalTo(Arrays.asList("nonExistent1.jar", "nonExistent2.jar"))); } @@ -79,7 +84,11 @@ public void testEmptyFilesToStageIsOK() { String[] args = {"--filesToStage="}; ExpansionService service = new ExpansionService(args); assertThat( - service.createPipeline().getOptions().as(PortablePipelineOptions.class).getFilesToStage(), + service + .createPipeline(PipelineOptionsFactory.create()) + .getOptions() + .as(PortablePipelineOptions.class) + .getFilesToStage(), equalTo(Arrays.asList(""))); } } diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 2dba789484441..b02aa7da7de1b 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -36,6 +36,7 @@ from apache_beam.options.value_provider import StaticValueProvider from apache_beam.options.value_provider import ValueProvider from apache_beam.transforms.display import HasDisplayData +from apache_beam.utils import proto_utils __all__ = [ 'PipelineOptions', @@ -390,6 +391,36 @@ def get_all_options( return result + def to_runner_api(self): + def to_struct_value(o): + if isinstance(o, (bool, int, str)): + return o + elif isinstance(o, (tuple, list)): + return [to_struct_value(e) for e in o] + elif 
isinstance(o, dict): + return {str(k): to_struct_value(v) for k, v in o.items()} + else: + return str(o) # Best effort. + + return proto_utils.pack_Struct( + **{ + f'beam:option:{k}:v1': to_struct_value(v) + for (k, v) in self.get_all_options( + drop_default=True, retain_unknown_options=True).items() + if v is not None + }) + + @classmethod + def from_runner_api(cls, proto_options): + def from_urn(key): + assert key.startswith('beam:option:') + assert key.endswith(':v1') + return key[12:-3] + + return cls( + **{from_urn(key): value + for (key, value) in proto_options.items()}) + def display_data(self): return self.get_all_options(drop_default=True, retain_unknown_options=True) @@ -528,6 +559,18 @@ def _add_argparse_args(cls, parser): 'exception if a transform is created with a non-unique label.') +class StreamingOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--update_compatibility_version', + default=None, + help='Attempt to produce a pipeline compatible with the given prior ' + 'version of the Beam SDK. 
' + 'See for example, https://cloud.google.com/dataflow/docs/guides/' + 'updating-a-pipeline') + + class CrossLanguageOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 113d1a99990c0..38d8b1f03a11d 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -872,7 +872,9 @@ def test_dir(self): 'slices', 'style', 'view_as', - 'display_data' + 'display_data', + 'from_runner_api', + 'to_runner_api', }, { attr @@ -884,7 +886,9 @@ def test_dir(self): 'get_all_options', 'style', 'view_as', - 'display_data' + 'display_data', + 'from_runner_api', + 'to_runner_api', }, { attr diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py index 50c793a0e8bd3..c7728098f30c5 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service.py @@ -19,9 +19,11 @@ """ # pytype: skip-file +import copy import traceback from apache_beam import pipeline as beam_pipeline +from apache_beam.options import pipeline_options from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_expansion_api_pb2 @@ -49,7 +51,18 @@ def __init__(self, options=None, loopback_address=None): def Expand(self, request, context=None): try: - pipeline = beam_pipeline.Pipeline(options=self._options) + options = copy.deepcopy(self._options) + request_options = pipeline_options.PipelineOptions.from_runner_api( + request.pipeline_options) + # TODO(https://github.com/apache/beam/issues/20090): Figure out the + # correct subset of options to apply to expansion. 
+ if request_options.view_as( + pipeline_options.StreamingOptions).update_compatibility_version: + options.view_as( + pipeline_options.StreamingOptions + ).update_compatibility_version = request_options.view_as( + pipeline_options.StreamingOptions).update_compatibility_version + pipeline = beam_pipeline.Pipeline(options=options) def with_pipeline(component, pcoll_id=None): component.pipeline = pipeline diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 93de059ecaaa6..a69ecbaee2208 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -723,7 +723,8 @@ def expand(self, pvalueish): components=components, namespace=self._external_namespace, transform=transform_proto, - output_coder_requests=output_coders) + output_coder_requests=output_coders, + pipeline_options=pipeline._options.to_runner_api()) expansion_service = _maybe_use_transform_service( self._expansion_service, pipeline.options)