diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java index cd6ab7dd414a..de1717f0a45f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PipelineOptionsTranslation.java @@ -43,6 +43,9 @@ public class PipelineOptionsTranslation { new ObjectMapper() .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + public static final String PIPELINE_OPTIONS_URN_PREFIX = "beam:option:"; + public static final String PIPELINE_OPTIONS_URN_SUFFIX = ":v1"; + /** Converts the provided {@link PipelineOptions} to a {@link Struct}. */ public static Struct toProto(PipelineOptions options) { Struct.Builder builder = Struct.newBuilder(); @@ -65,9 +68,9 @@ public static Struct toProto(PipelineOptions options) { while (optionsEntries.hasNext()) { Map.Entry entry = optionsEntries.next(); optionsUsingUrns.put( - "beam:option:" + PIPELINE_OPTIONS_URN_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey()) - + ":v1", + + PIPELINE_OPTIONS_URN_SUFFIX, entry.getValue()); } @@ -92,7 +95,9 @@ public static PipelineOptions fromProto(Struct protoOptions) { mapWithoutUrns.put( CaseFormat.LOWER_UNDERSCORE.to( CaseFormat.LOWER_CAMEL, - optionKey.substring("beam:option:".length(), optionKey.length() - ":v1".length())), + optionKey.substring( + PIPELINE_OPTIONS_URN_PREFIX.length(), + optionKey.length() - PIPELINE_OPTIONS_URN_SUFFIX.length())), optionValue); } return MAPPER.readValue( diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 150fe9729573..9c5b5a0ad136 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -60,7 +60,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PortablePipelineOptions; -import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; @@ -535,7 +534,7 @@ private static void invokeSetter(ConfigT config, @Nullable Object valu } private @MonotonicNonNull Map registeredTransforms; - private final PipelineOptions pipelineOptions; + private final PipelineOptions commandLineOptions; private final @Nullable String loopbackAddress; public ExpansionService() { @@ -551,7 +550,7 @@ public ExpansionService(PipelineOptions opts) { } public ExpansionService(PipelineOptions opts, @Nullable String loopbackAddress) { - this.pipelineOptions = opts; + this.commandLineOptions = opts; this.loopbackAddress = loopbackAddress; } @@ -587,12 +586,15 @@ private Map loadRegisteredTransforms() { request.getTransform().getSpec().getUrn()); LOG.debug("Full transform: {}", request.getTransform()); Set existingTransformIds = request.getComponents().getTransformsMap().keySet(); - Pipeline pipeline = - createPipeline(PipelineOptionsTranslation.fromProto(request.getPipelineOptions())); + + PipelineOptions pipelineOptionsFromRequest = + PipelineOptionsTranslation.fromProto(request.getPipelineOptions()); + Pipeline pipeline = createPipeline(pipelineOptionsFromRequest); + boolean isUseDeprecatedRead = - ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read") + ExperimentalOptions.hasExperiment(commandLineOptions, "use_deprecated_read") || ExperimentalOptions.hasExperiment( - pipelineOptions, "beam_fn_api_use_deprecated_read"); + commandLineOptions, "beam_fn_api_use_deprecated_read"); if (!isUseDeprecatedRead) { ExperimentalOptions.addExperiment( pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api"); @@ -629,7 +631,7 @@ private Map loadRegisteredTransforms() { if (transformProvider == null) { if (getUrn(ExpansionMethods.Enum.JAVA_CLASS_LOOKUP).equals(urn)) { AllowList allowList = - pipelineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist(); + commandLineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist(); assert allowList != null; transformProvider = new JavaClassLookupTransformProvider(allowList); } else if (getUrn(SCHEMA_TRANSFORM).equals(urn)) { @@ -671,7 +673,7 @@ private Map loadRegisteredTransforms() { RunnerApi.Environment defaultEnvironment = Environments.createOrGetDefaultEnvironment( pipeline.getOptions().as(PortablePipelineOptions.class)); - if (pipelineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) { + if (commandLineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) { PortablePipelineOptions externalOptions = PipelineOptionsFactory.create().as(PortablePipelineOptions.class); externalOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_EXTERNAL); @@ -723,35 +725,34 @@ private Map loadRegisteredTransforms() { } protected Pipeline createPipeline(PipelineOptions requestOptions) { - // TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation - PipelineOptions effectiveOpts = PipelineOptionsFactory.create(); - PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class); - PortablePipelineOptions specifiedOptions = pipelineOptions.as(PortablePipelineOptions.class); - Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType()) - .ifPresent(portableOptions::setDefaultEnvironmentType); - Optional.ofNullable(specifiedOptions.getDefaultEnvironmentConfig()) - .ifPresent(portableOptions::setDefaultEnvironmentConfig); - List filesToStage = specifiedOptions.getFilesToStage(); + // We expect the ExpansionRequest to contain a valid set of options to be used for this + // expansion. + // Additionally, we override selected options using options values set via command line or + // ExpansionService wide overrides. + + PortablePipelineOptions requestPortablePipelineOptions = + requestOptions.as(PortablePipelineOptions.class); + PortablePipelineOptions commandLinePortablePipelineOptions = + commandLineOptions.as(PortablePipelineOptions.class); + Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentType()) + .ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentType); + Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentConfig()) + .ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentConfig); + List filesToStage = commandLinePortablePipelineOptions.getFilesToStage(); if (filesToStage != null) { - effectiveOpts.as(PortablePipelineOptions.class).setFilesToStage(filesToStage); + requestPortablePipelineOptions + .as(PortablePipelineOptions.class) + .setFilesToStage(filesToStage); } - effectiveOpts + requestPortablePipelineOptions .as(ExperimentalOptions.class) - .setExperiments(pipelineOptions.as(ExperimentalOptions.class).getExperiments()); - effectiveOpts.setRunner(NotRunnableRunner.class); - effectiveOpts + .setExperiments(commandLineOptions.as(ExperimentalOptions.class).getExperiments()); + requestPortablePipelineOptions.setRunner(NotRunnableRunner.class); + requestPortablePipelineOptions .as(ExpansionServiceOptions.class) .setExpansionServiceConfig( - pipelineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig()); - // TODO(https://github.com/apache/beam/issues/20090): Figure out the correct subset of options - // to propagate. - if (requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion() != null) { - effectiveOpts - .as(StreamingOptions.class) - .setUpdateCompatibilityVersion( - requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion()); - } - return Pipeline.create(effectiveOpts); + commandLineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig()); + return Pipeline.create(requestOptions); } @Override diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java index 1c8d515d5c85..9ee0c2c1797b 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.expansion.service; +import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_PREFIX; +import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_SUFFIX; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; @@ -49,6 +51,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; @@ -58,15 +62,20 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Value; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Resources; import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matchers; +import org.junit.Assert; import org.junit.Test; /** Tests for {@link ExpansionService}. */ @@ -76,6 +85,7 @@ public class ExpansionServiceTest { private static final String TEST_URN = "test:beam:transforms:count"; + private static final String TEST_OPTIONS_URN = "test:beam:transforms:test_options"; private static final String TEST_NAME = "TestName"; @@ -98,9 +108,59 @@ public class ExpansionServiceTest { @AutoService(ExpansionService.ExpansionServiceRegistrar.class) public static class TestTransformRegistrar implements ExpansionService.ExpansionServiceRegistrar { + static final String EXPECTED_STRING_VALUE = "abcde"; + static final Boolean EXPECTED_BOOLEAN_VALUE = true; + static final Integer EXPECTED_INTEGER_VALUE = 12345; + @Override public Map knownTransforms() { - return ImmutableMap.of(TEST_URN, (spec, options) -> Count.perElement()); + return ImmutableMap.of( + TEST_URN, (spec, options) -> Count.perElement(), + TEST_OPTIONS_URN, + (spec, options) -> + new TestOptionsTransform( + EXPECTED_STRING_VALUE, EXPECTED_BOOLEAN_VALUE, EXPECTED_INTEGER_VALUE)); + } + } + + public interface TestOptions extends PipelineOptions { + String getStringOption(); + + void setStringOption(String value); + + Boolean getBooleanOption(); + + void setBooleanOption(Boolean value); + + Integer getIntegerOption(); + + void setIntegerOption(Integer value); + } + + public static class TestOptionsTransform + extends PTransform, PCollection> { + String expectedStringValue; + + Boolean expectedBooleanValue; + + Integer expectedIntegerValue; + + public TestOptionsTransform( + String expectedStringValue, Boolean expectedBooleanValue, Integer expectedIntegerValue) { + this.expectedStringValue = expectedStringValue; + this.expectedBooleanValue = expectedBooleanValue; + this.expectedIntegerValue = expectedIntegerValue; + } + + @Override + public PCollection expand(PCollection input) { + TestOptions testOption = input.getPipeline().getOptions().as(TestOptions.class); + + Assert.assertEquals(expectedStringValue, testOption.getStringOption()); + Assert.assertEquals(expectedBooleanValue, testOption.getBooleanOption()); + Assert.assertEquals(expectedIntegerValue, testOption.getIntegerOption()); + + return input; } } @@ -146,6 +206,58 @@ public void testConstruct() { } } + @Test + public void testConstructWithPipelineOptions() { + PipelineOptionsFactory.register(TestOptions.class); + Pipeline p = Pipeline.create(); + p.apply(Impulse.create()); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + String inputPcollId = + Iterables.getOnlyElement( + Iterables.getOnlyElement(pipelineProto.getComponents().getTransformsMap().values()) + .getOutputsMap() + .values()); + + Struct optionsStruct = + Struct.newBuilder() + .putFields( + PIPELINE_OPTIONS_URN_PREFIX + "string_option" + PIPELINE_OPTIONS_URN_SUFFIX, + Value.newBuilder() + .setStringValue(TestTransformRegistrar.EXPECTED_STRING_VALUE) + .build()) + .putFields( + PIPELINE_OPTIONS_URN_PREFIX + "boolean_option" + PIPELINE_OPTIONS_URN_SUFFIX, + Value.newBuilder() + .setBoolValue(TestTransformRegistrar.EXPECTED_BOOLEAN_VALUE) + .build()) + .putFields( + PIPELINE_OPTIONS_URN_PREFIX + "integer_option" + PIPELINE_OPTIONS_URN_SUFFIX, + Value.newBuilder() + .setNumberValue(TestTransformRegistrar.EXPECTED_INTEGER_VALUE) + .build()) + .build(); + ExpansionApi.ExpansionRequest request = + ExpansionApi.ExpansionRequest.newBuilder() + .setComponents(pipelineProto.getComponents()) + .setPipelineOptions(optionsStruct) + .setTransform( + RunnerApi.PTransform.newBuilder() + .setUniqueName(TEST_NAME) + .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(TEST_OPTIONS_URN)) + .putInputs("input", inputPcollId)) + .setNamespace(TEST_NAMESPACE) + .build(); + ExpansionApi.ExpansionResponse response = expansionService.expand(request); + RunnerApi.PTransform expandedTransform = response.getTransform(); + assertEquals(TEST_NAMESPACE + TEST_NAME, expandedTransform.getUniqueName()); + + // Verify it has the right input. + assertThat(expandedTransform.getInputsMap().values(), contains(inputPcollId)); + + // Verify it has the right output. + assertThat(expandedTransform.getOutputsMap().keySet(), contains("output")); + } + @Test public void testConstructGenerateSequenceWithRegistration() { ExternalTransforms.ExternalConfigurationPayload payload =