Skip to content

Commit

Permalink
Add a pipeline update compatibility version option. (apache#29140)
Browse files Browse the repository at this point in the history
This can be used to migrate to best practices and good default with new versions of Beam
while still allowing users of older SDKs to update their SDK version without breaking
update compatibility.

Also add the mechanisms to propagate this option for cross-language transforms.
  • Loading branch information
robertwb authored and Naireen committed Jan 3, 2024
1 parent 9864b59 commit d185433
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ option go_package = "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagem
option java_package = "org.apache.beam.model.expansion.v1";
option java_outer_classname = "ExpansionApi";

import "google/protobuf/struct.proto";
import "org/apache/beam/model/pipeline/v1/beam_runner_api.proto";
import "org/apache/beam/model/pipeline/v1/schema.proto";

Expand Down Expand Up @@ -57,6 +58,10 @@ message ExpansionRequest {
// A set of requirements that must be used by the expansion service to
// interpret the components provided with this request.
repeated string requirements = 5;

// (Optional) A set of Pipeline Options that should be used
// when expanding this transform.
google.protobuf.Struct pipeline_options = 6;
}

message ExpansionResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public OutputT expand(InputT input) {
.setTransform(ptransformBuilder.build())
.setNamespace(getNamespace())
.build();
requestBuilder.setPipelineOptions(PipelineOptionsTranslation.toProto(p.getOptions()));

ExpansionApi.ExpansionResponse response =
clientFactory.getExpansionServiceClient(endpoint).expand(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.options;

import org.checkerframework.checker.nullness.qual.Nullable;

/** Options used to configure streaming. */
public interface StreamingOptions extends ApplicationNameOptions, PipelineOptions {
/**
Expand All @@ -30,4 +32,13 @@ public interface StreamingOptions extends ApplicationNameOptions, PipelineOption
boolean isStreaming();

void setStreaming(boolean value);

@Description(
"If set, attempts to produce a pipeline compatible with this prior version of the Beam SDK."
+ " This string should be interpreted and compared per https://semver.org/."
+ " See, for example, https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline.")
@Nullable
String getUpdateCompatibilityVersion();

void setUpdateCompatibilityVersion(@Nullable String updateCompatibilityVersion);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
Expand All @@ -67,6 +68,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
Expand Down Expand Up @@ -584,7 +586,8 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
request.getTransform().getSpec().getUrn());
LOG.debug("Full transform: {}", request.getTransform());
Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
Pipeline pipeline = createPipeline();
Pipeline pipeline =
createPipeline(PipelineOptionsTranslation.fromProto(request.getPipelineOptions()));
boolean isUseDeprecatedRead =
ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read")
|| ExperimentalOptions.hasExperiment(
Expand Down Expand Up @@ -707,7 +710,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
.build();
}

protected Pipeline createPipeline() {
protected Pipeline createPipeline(PipelineOptions requestOptions) {
// TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation
PipelineOptions effectiveOpts = PipelineOptionsFactory.create();
PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class);
Expand All @@ -728,6 +731,14 @@ protected Pipeline createPipeline() {
.as(ExpansionServiceOptions.class)
.setExpansionServiceConfig(
pipelineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig());
// TODO(https://github.com/apache/beam/issues/20090): Figure out the correct subset of options
// to propagate.
if (requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion() != null) {
effectiveOpts
.as(StreamingOptions.class)
.setUpdateCompatibilityVersion(
requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion());
}
return Pipeline.create(effectiveOpts);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.hamcrest.core.Is.is;

import java.util.Arrays;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.junit.Test;

Expand Down Expand Up @@ -58,7 +59,7 @@ public void testPassingPipelineArguments() {
ExpansionService service = new ExpansionService(args);
assertThat(
service
.createPipeline()
.createPipeline(PipelineOptionsFactory.create())
.getOptions()
.as(PortablePipelineOptions.class)
.getDefaultEnvironmentType(),
Expand All @@ -70,7 +71,11 @@ public void testNonEmptyFilesToStage() {
String[] args = {"--filesToStage=nonExistent1.jar,nonExistent2.jar"};
ExpansionService service = new ExpansionService(args);
assertThat(
service.createPipeline().getOptions().as(PortablePipelineOptions.class).getFilesToStage(),
service
.createPipeline(PipelineOptionsFactory.create())
.getOptions()
.as(PortablePipelineOptions.class)
.getFilesToStage(),
equalTo(Arrays.asList("nonExistent1.jar", "nonExistent2.jar")));
}

Expand All @@ -79,7 +84,11 @@ public void testEmptyFilesToStageIsOK() {
String[] args = {"--filesToStage="};
ExpansionService service = new ExpansionService(args);
assertThat(
service.createPipeline().getOptions().as(PortablePipelineOptions.class).getFilesToStage(),
service
.createPipeline(PipelineOptionsFactory.create())
.getOptions()
.as(PortablePipelineOptions.class)
.getFilesToStage(),
equalTo(Arrays.asList("")));
}
}
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms.display import HasDisplayData
from apache_beam.utils import proto_utils

__all__ = [
'PipelineOptions',
Expand Down Expand Up @@ -390,6 +391,36 @@ def get_all_options(

return result

def to_runner_api(self):
def to_struct_value(o):
if isinstance(o, (bool, int, str)):
return o
elif isinstance(o, (tuple, list)):
return [to_struct_value(e) for e in o]
elif isinstance(o, dict):
return {str(k): to_struct_value(v) for k, v in o.items()}
else:
return str(o) # Best effort.

return proto_utils.pack_Struct(
**{
f'beam:option:{k}:v1': to_struct_value(v)
for (k, v) in self.get_all_options(
drop_default=True, retain_unknown_options=True).items()
if v is not None
})

@classmethod
def from_runner_api(cls, proto_options):
def from_urn(key):
assert key.startswith('beam:option:')
assert key.endswith(':v1')
return key[12:-3]

return cls(
**{from_urn(key): value
for (key, value) in proto_options.items()})

def display_data(self):
return self.get_all_options(drop_default=True, retain_unknown_options=True)

Expand Down Expand Up @@ -528,6 +559,18 @@ def _add_argparse_args(cls, parser):
'exception if a transform is created with a non-unique label.')


class StreamingOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--update_compatibility_version',
default=None,
help='Attempt to produce a pipeline compatible with the given prior '
'version of the Beam SDK. '
'See for example, https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')


class CrossLanguageOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
Expand Down
8 changes: 6 additions & 2 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,9 @@ def test_dir(self):
'slices',
'style',
'view_as',
'display_data'
'display_data',
'from_runner_api',
'to_runner_api',
},
{
attr
Expand All @@ -884,7 +886,9 @@ def test_dir(self):
'get_all_options',
'style',
'view_as',
'display_data'
'display_data',
'from_runner_api',
'to_runner_api',
},
{
attr
Expand Down
15 changes: 14 additions & 1 deletion sdks/python/apache_beam/runners/portability/expansion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
"""
# pytype: skip-file

import copy
import traceback

from apache_beam import pipeline as beam_pipeline
from apache_beam.options import pipeline_options
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_expansion_api_pb2
Expand Down Expand Up @@ -49,7 +51,18 @@ def __init__(self, options=None, loopback_address=None):

def Expand(self, request, context=None):
try:
pipeline = beam_pipeline.Pipeline(options=self._options)
options = copy.deepcopy(self._options)
request_options = pipeline_options.PipelineOptions.from_runner_api(
request.pipeline_options)
# TODO(https://github.com/apache/beam/issues/20090): Figure out the
# correct subset of options to apply to expansion.
if request_options.view_as(
pipeline_options.StreamingOptions).update_compatibility_version:
options.view_as(
pipeline_options.StreamingOptions
).update_compatibility_version = request_options.view_as(
pipeline_options.StreamingOptions).update_compatibility_version
pipeline = beam_pipeline.Pipeline(options=options)

def with_pipeline(component, pcoll_id=None):
component.pipeline = pipeline
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/transforms/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,8 @@ def expand(self, pvalueish):
components=components,
namespace=self._external_namespace,
transform=transform_proto,
output_coder_requests=output_coders)
output_coder_requests=output_coders,
pipeline_options=pipeline._options.to_runner_api())

expansion_service = _maybe_use_transform_service(
self._expansion_service, pipeline.options)
Expand Down

0 comments on commit d185433

Please sign in to comment.