diff --git a/.test-infra/jenkins/dependency_check/dependency_check_report_generator.py b/.test-infra/jenkins/dependency_check/dependency_check_report_generator.py index 4ece37987d825..a1fe2ee0e2d67 100644 --- a/.test-infra/jenkins/dependency_check/dependency_check_report_generator.py +++ b/.test-infra/jenkins/dependency_check/dependency_check_report_generator.py @@ -16,13 +16,13 @@ # limitations under the License. # +import dependency_check.version_comparer as version_comparer import logging import os.path import re import requests import sys import traceback -import version_comparer from datetime import datetime from dependency_check.bigquery_client_utils import BigQueryClientUtils diff --git a/.test-infra/jenkins/job_PostCommit_Website_Publish.groovy b/.test-infra/jenkins/job_PostCommit_Website_Publish.groovy index 669a84abcfeee..0396be9f24899 100644 --- a/.test-infra/jenkins/job_PostCommit_Website_Publish.groovy +++ b/.test-infra/jenkins/job_PostCommit_Website_Publish.groovy @@ -21,7 +21,7 @@ import PostcommitJobBuilder // This job builds and publishes the website into the asf-site branch of the beam repo. -PostcommitJobBuilder.postCommitJob('beam_PostCommit_Website_Publish', '', +PostcommitJobBuilder.postCommitJob('beam_PostCommit_Website_Publish', 'Run Website Publish', 'Website Publish', this) { description('Publish generated website content into asf-site branch for hosting.') diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index 0f9b8fd904927..34e4247bcc06a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Filter; @@ -60,6 +61,7 @@ import org.apache.beam.sdk.transforms.Partition; import org.apache.beam.sdk.transforms.Partition.PartitionFn; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; @@ -431,6 +433,17 @@ public interface Options extends ExampleOptions, ExampleBigQueryTableOptions, St void setOutputToBigQuery(Boolean value); + @Description("Whether to send output to checksum Transform.") + @Default.Boolean(true) + Boolean getOutputToChecksum(); + + void setOutputToChecksum(Boolean value); + + @Description("Expected result of the checksum transform.") + Long getExpectedChecksum(); + + void setExpectedChecksum(Long value); + @Description("Whether output to Cloud Datastore") @Default.Boolean(false) Boolean getOutputToDatastore(); @@ -449,8 +462,7 @@ public interface Options extends ExampleOptions, ExampleBigQueryTableOptions, St void setOutputProject(String value); } - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + public static void runAutocompletePipeline(Options options) throws IOException { options.setBigQuerySchema(FormatForBigquery.getSchema()); 
     ExampleUtils exampleUtils = new ExampleUtils(options);
@@ -506,10 +518,35 @@ public static void main(String[] args) throws IOException {
                     : BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
     }
 
+    if (options.getOutputToChecksum()) {
+      PCollection<Long> checksum =
+          toWrite
+              .apply(
+                  ParDo.of(
+                      new DoFn<KV<String, List<CompletionCandidate>>, Long>() {
+                        @ProcessElement
+                        public void process(ProcessContext c) {
+                          KV<String, List<CompletionCandidate>> elm = c.element();
+                          Long listHash =
+                              c.element().getValue().stream().mapToLong(cc -> cc.hashCode()).sum();
+                          c.output(Long.valueOf(elm.getKey().hashCode()) + listHash);
+                        }
+                      }))
+              .apply(Sum.longsGlobally());
+
+      PAssert.that(checksum).containsInAnyOrder(options.getExpectedChecksum());
+    }
+
     // Run the pipeline.
     PipelineResult result = p.run();
 
     // ExampleUtils will try to cancel the pipeline and the injector before the program exists.
     exampleUtils.waitToFinish(result);
   }
+
+  public static void main(String[] args) throws IOException {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    runAutocompletePipeline(options);
+  }
 }
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteIT.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteIT.java
new file mode 100644
index 0000000000000..6b01d1acce48a
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteIT.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete;
+
+import org.apache.beam.examples.complete.AutoComplete.Options;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Run integration tests for a basic AutoComplete pipeline. */
+@RunWith(JUnit4.class)
+public class AutoCompleteIT {
+
+  public static final String DEFAULT_INPUT =
+      "gs://apache-beam-samples/shakespeare/kinglear-hashtag.txt";
+
+  public static final Long DEFAULT_INPUT_CHECKSUM = -25447108232L;
+
+  /** Options for the Autocomplete Integration test. */
+  public interface AutoCompleteITOptions extends TestPipelineOptions, Options {}
+
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+  }
+
+  @Test
+  public void testE2EAutoComplete() throws Exception {
+    AutoCompleteITOptions options =
+        TestPipeline.testingPipelineOptions().as(AutoCompleteITOptions.class);
+
+    options.setInputFile(DEFAULT_INPUT);
+    options.setOutputToBigQuery(false);
+    options.setOutputToDatastore(false);
+    options.setOutputToChecksum(true);
+    options.setExpectedChecksum(DEFAULT_INPUT_CHECKSUM);
+
+    AutoComplete.runAutocompletePipeline(options);
+  }
+}
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java
index 97cf08f0a70b2..e209eb6b80dd7 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java
@@ -18,50 +18,69 @@
 package org.apache.beam.examples.cookbook;
 
 import com.google.api.services.bigquery.model.TableRow;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
 import org.apache.beam.examples.cookbook.BigQueryTornadoes.ExtractTornadoesFn;
 import org.apache.beam.examples.cookbook.BigQueryTornadoes.FormatCountsFn;
-import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Test case for {@link BigQueryTornadoes}. */
 @RunWith(JUnit4.class)
 public class BigQueryTornadoesTest {
+  @Rule public TestPipeline p = TestPipeline.create();
 
   @Test
-  public void testExtractTornadoes() throws Exception {
+  @Category(ValidatesRunner.class)
+  public void testExtractTornadoes() {
     TableRow row = new TableRow().set("month", "6").set("tornado", true);
-    DoFnTester<TableRow, Integer> extractWordsFn = DoFnTester.of(new ExtractTornadoesFn());
-    Assert.assertThat(extractWordsFn.processBundle(row), CoreMatchers.hasItems(6));
+    PCollection<TableRow> input = p.apply(Create.of(ImmutableList.of(row)));
+    PCollection<Integer> result = input.apply(ParDo.of(new ExtractTornadoesFn()));
+    PAssert.that(result).containsInAnyOrder(6);
+    p.run().waitUntilFinish();
   }
 
   @Test
-  public void testNoTornadoes() throws Exception {
+  @Category(ValidatesRunner.class)
+  public void testNoTornadoes() {
     TableRow row = new TableRow().set("month", 6).set("tornado", false);
-    DoFnTester<TableRow, Integer> extractWordsFn = DoFnTester.of(new ExtractTornadoesFn());
-    Assert.assertTrue(extractWordsFn.processBundle(row).isEmpty());
+    PCollection<TableRow> inputs = p.apply(Create.of(ImmutableList.of(row)));
+    PCollection<Integer> result = inputs.apply(ParDo.of(new ExtractTornadoesFn()));
+    PAssert.that(result).empty();
+    p.run().waitUntilFinish();
   }
 
   @Test
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public void testFormatCounts() throws Exception {
-    DoFnTester<KV<Integer, Long>, TableRow> formatCountsFn = DoFnTester.of(new FormatCountsFn());
-    KV empty[] = {};
-    List<TableRow> results = formatCountsFn.processBundle(empty);
-    Assert.assertTrue(results.isEmpty());
-    KV input[] = {KV.of(3, 0L), KV.of(4, Long.MAX_VALUE), KV.of(5, Long.MIN_VALUE)};
-    results = formatCountsFn.processBundle(input);
-    Assert.assertEquals(3, results.size());
-    Assert.assertEquals(3, results.get(0).get("month"));
-    Assert.assertEquals(0L, results.get(0).get("tornado_count"));
-    Assert.assertEquals(4, results.get(1).get("month"));
-    Assert.assertEquals(Long.MAX_VALUE, results.get(1).get("tornado_count"));
-    Assert.assertEquals(5, results.get(2).get("month"));
-    Assert.assertEquals(Long.MIN_VALUE, results.get(2).get("tornado_count"));
+  @Category(ValidatesRunner.class)
+  public void testEmpty() {
+    PCollection<KV<Integer, Long>> inputs =
+        p.apply(Create.empty(new TypeDescriptor<KV<Integer, Long>>() {}));
+    PCollection<TableRow> result = inputs.apply(ParDo.of(new FormatCountsFn()));
+    PAssert.that(result).empty();
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testFormatCounts() {
+    PCollection<KV<Integer, Long>> inputs =
+        p.apply(Create.of(KV.of(3, 0L), KV.of(4, Long.MAX_VALUE), KV.of(5, Long.MIN_VALUE)));
+    PCollection<TableRow> result = inputs.apply(ParDo.of(new FormatCountsFn()));
+    PAssert.that(result)
+        .containsInAnyOrder(
+            new TableRow().set("month", 3).set("tornado_count", 0),
+            new TableRow().set("month", 4).set("tornado_count", Long.MAX_VALUE),
+            new TableRow().set("month", 5).set("tornado_count", Long.MIN_VALUE));
+    p.run().waitUntilFinish();
   }
 }
diff --git a/release/src/main/python-release/python_release_automation_utils.sh b/release/src/main/python-release/python_release_automation_utils.sh
index 91d5977839b2f..f45428cf5878f 100644
--- a/release/src/main/python-release/python_release_automation_utils.sh
+++ b/release/src/main/python-release/python_release_automation_utils.sh
@@ -295,3 +295,4 @@ PUBSUB_SUBSCRIPTION='wordstream-python-sub2'
 # Mobile Gaming Configurations
 DATASET='beam_postrelease_mobile_gaming'
 USERSCORE_OUTPUT_PREFIX='python-userscore_result'
+GAME_INPUT_DATA='gs://dataflow-samples/game/5000_gaming_data.csv'
diff --git
a/release/src/main/python-release/run_release_candidate_python_mobile_gaming.sh b/release/src/main/python-release/run_release_candidate_python_mobile_gaming.sh index e9a8566415d5d..2bfe974b947b7 100755 --- a/release/src/main/python-release/run_release_candidate_python_mobile_gaming.sh +++ b/release/src/main/python-release/run_release_candidate_python_mobile_gaming.sh @@ -81,7 +81,7 @@ function verify_userscore_direct() { --output=$output_file_name \ --project=$PROJECT_ID \ --dataset=$DATASET \ - --input=gs://$BUCKET_NAME/5000_gaming_data.csv + --input=$GAME_INPUT_DATA verify_user_score "direct" } @@ -103,7 +103,7 @@ function verify_userscore_dataflow() { --runner=DataflowRunner \ --temp_location=gs://$BUCKET_NAME/temp/ \ --sdk_location=$BEAM_PYTHON_SDK \ - --input=gs://$BUCKET_NAME/5000_gaming_data.csv \ + --input=$GAME_INPUT_DATA \ --output=gs://$BUCKET_NAME/$output_file_name verify_user_score "dataflow" @@ -122,7 +122,7 @@ function verify_hourlyteamscore_direct() { python -m apache_beam.examples.complete.game.hourly_team_score \ --project=$PROJECT_ID \ --dataset=$DATASET \ - --input=gs://$BUCKET_NAME/5000_gaming_data.csv \ + --input=$GAME_INPUT_DATA \ --table="hourly_team_score_python_direct" verify_hourly_team_score "direct" @@ -145,7 +145,7 @@ function verify_hourlyteamscore_dataflow() { --runner=DataflowRunner \ --temp_location=gs://$BUCKET_NAME/temp/ \ --sdk_location $BEAM_PYTHON_SDK \ - --input=gs://$BUCKET_NAME/5000_gaming_data.csv \ + --input=$GAME_INPUT_DATA \ --table="hourly_team_score_python_dataflow" verify_hourly_team_score "dataflow" diff --git a/runners/reference/java/build.gradle b/runners/reference/java/build.gradle index 7bbaed0912603..932a788b25f6e 100644 --- a/runners/reference/java/build.gradle +++ b/runners/reference/java/build.gradle @@ -29,6 +29,7 @@ configurations { } dependencies { + compile library.java.guava compile library.java.hamcrest_library shadow project(path: ":beam-model-pipeline", configuration: "shadow") shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow") diff --git a/runners/reference/job-server/build.gradle b/runners/reference/job-server/build.gradle new file mode 100644 index 0000000000000..1c81fba2dd595 --- /dev/null +++ b/runners/reference/job-server/build.gradle @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +import groovy.json.JsonOutput + +apply plugin: org.apache.beam.gradle.BeamModulePlugin +applyJavaNature( + validateShadowJar: false, + shadowClosure: { + } +) + +description = "Apache Beam :: Runners :: Reference :: Job Server" + +apply plugin: "application" + +mainClassName = "org.apache.beam.runners.direct.portable.job.ReferenceRunnerJobServer" + +dependencies { + compile project(path: ":beam-runners-direct-java", configuration: "shadow") + compile project(path: ":beam-runners-java-fn-execution", configuration: "shadow") + compile library.java.slf4j_simple +} + +run { + args = [] + if (project.hasProperty('port')) + args += ["--port=${project.property('port')}"] + + // Enable remote debugging. + jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"] + if (project.hasProperty("logLevel")) + jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"] +} diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go index 1f8828b26cd0f..b71d7b0f10ac2 100644 --- a/sdks/go/pkg/beam/options/jobopts/options.go +++ b/sdks/go/pkg/beam/options/jobopts/options.go @@ -41,7 +41,7 @@ var ( EnvironmentType = flag.String("environment_type", "DOCKER", "Environment Type. Possible options are DOCKER and PROCESS.") - // EnvironmentType is the environment type to run the user code. + // EnvironmentConfig is the environment configuration for running the user code. EnvironmentConfig = flag.String("environment_config", "", "Set environment configuration for running the user code.\n"+ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java index bee175ec261bb..0a553f7d93113 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java @@ -23,7 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import java.util.Arrays; +import com.google.common.primitives.Bytes; import javax.annotation.Nullable; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; @@ -131,28 +131,8 @@ public String toString() { */ @VisibleForTesting static ByteKey next(ByteKey key) { - return ByteKey.copyFrom(unsignedCopyAndIncrement(key.getBytes())); + return ByteKey.copyFrom(Bytes.concat(key.getBytes(), ZERO_BYTE_ARRAY)); } - /** - * @return The value of the input incremented by one using byte arithmetic. It treats the input - * byte[] as an unsigned series of bytes, most significant bits first. 
- */ - private static byte[] unsignedCopyAndIncrement(byte[] input) { - if (input.length == 0) { - return new byte[] {0}; - } - byte[] copy = Arrays.copyOf(input, input.length); - for (int i = copy.length - 1; i >= 0; --i) { - if (copy[i] != (byte) 0xff) { - ++copy[i]; - return copy; - } - copy[i] = 0; - } - byte[] out = new byte[copy.length + 1]; - out[0] = 1; - System.arraycopy(copy, 0, out, 1, copy.length); - return out; - } + private static final byte[] ZERO_BYTE_ARRAY = new byte[] {0}; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java index ef6eba41ff1b2..a285ec82ef62d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java @@ -71,8 +71,9 @@ public void testCheckpointJustStarted() throws Exception { ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); assertTrue(tracker.tryClaim(ByteKey.of(0x10))); ByteKeyRange checkpoint = tracker.checkpoint(); - assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x11)), tracker.currentRestriction()); - assertEquals(ByteKeyRange.of(ByteKey.of(0x11), ByteKey.of(0xc0)), checkpoint); + assertEquals( + ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10, 0x00)), tracker.currentRestriction()); + assertEquals(ByteKeyRange.of(ByteKey.of(0x10, 0x00), ByteKey.of(0xc0)), checkpoint); } @Test @@ -82,8 +83,9 @@ public void testCheckpointRegular() throws Exception { assertTrue(tracker.tryClaim(ByteKey.of(0x50))); assertTrue(tracker.tryClaim(ByteKey.of(0x90))); ByteKeyRange checkpoint = tracker.checkpoint(); - assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x91)), tracker.currentRestriction()); - assertEquals(ByteKeyRange.of(ByteKey.of(0x91), ByteKey.of(0xc0)), checkpoint); + assertEquals( + ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x90, 0x00)), tracker.currentRestriction()); + assertEquals(ByteKeyRange.of(ByteKey.of(0x90, 0x00), ByteKey.of(0xc0)), checkpoint); } @Test @@ -94,8 +96,9 @@ public void testCheckpointClaimedLast() throws Exception { assertTrue(tracker.tryClaim(ByteKey.of(0x90))); assertTrue(tracker.tryClaim(ByteKey.of(0xbf))); ByteKeyRange checkpoint = tracker.checkpoint(); - assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction()); - assertEquals(ByteKeyRange.of(ByteKey.of(0xc0), ByteKey.of(0xc0)), checkpoint); + assertEquals( + ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xbf, 0x00)), tracker.currentRestriction()); + assertEquals(ByteKeyRange.of(ByteKey.of(0xbf, 0x00), ByteKey.of(0xc0)), checkpoint); } @Test @@ -107,8 +110,9 @@ public void testCheckpointAfterFailedClaim() throws Exception { assertTrue(tracker.tryClaim(ByteKey.of(0xa0))); assertFalse(tracker.tryClaim(ByteKey.of(0xd0))); ByteKeyRange checkpoint = tracker.checkpoint(); - assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa1)), tracker.currentRestriction()); - assertEquals(ByteKeyRange.of(ByteKey.of(0xa1), ByteKey.of(0xc0)), checkpoint); + assertEquals( + ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa0, 0x00)), tracker.currentRestriction()); + assertEquals(ByteKeyRange.of(ByteKey.of(0xa0, 0x00), ByteKey.of(0xc0)), checkpoint); } @Test @@ -158,6 +162,9 @@ public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() { 
assertTrue(tracker.tryClaim(ByteKey.of(0x50))); assertTrue(tracker.tryClaim(ByteKey.of(0x90))); assertTrue(tracker.tryClaim(ByteKey.of(0xbf))); + expected.expectMessage( + "Last attempted key was [bf] in range ByteKeyRange{startKey=[10], endKey=[c0]}, " + + "claiming work in [[bf00], [c0]) was not attempted"); tracker.checkDone(); } @@ -169,7 +176,7 @@ public void testCheckDoneWhenNotDone() { assertTrue(tracker.tryClaim(ByteKey.of(0x90))); expected.expectMessage( "Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, " - + "claiming work in [[91], [c0]) was not attempted"); + + "claiming work in [[9000], [c0]) was not attempted"); tracker.checkDone(); } @@ -194,11 +201,11 @@ public void testCheckDoneUnstarted() { @Test public void testNextByteKey() { assertEquals(next(ByteKey.EMPTY), ByteKey.of(0x00)); - assertEquals(next(ByteKey.of(0x00)), ByteKey.of(0x01)); - assertEquals(next(ByteKey.of(0x9f)), ByteKey.of(0xa0)); - assertEquals(next(ByteKey.of(0xff)), ByteKey.of(0x01, 0x00)); - assertEquals(next(ByteKey.of(0x10, 0x10)), ByteKey.of(0x10, 0x11)); - assertEquals(next(ByteKey.of(0x00, 0xff)), ByteKey.of(0x01, 0x00)); - assertEquals(next(ByteKey.of(0xff, 0xff)), ByteKey.of(0x01, 0x00, 0x00)); + assertEquals(next(ByteKey.of(0x00)), ByteKey.of(0x00, 0x00)); + assertEquals(next(ByteKey.of(0x9f)), ByteKey.of(0x9f, 0x00)); + assertEquals(next(ByteKey.of(0xff)), ByteKey.of(0xff, 0x00)); + assertEquals(next(ByteKey.of(0x10, 0x10)), ByteKey.of(0x10, 0x10, 0x00)); + assertEquals(next(ByteKey.of(0x00, 0xff)), ByteKey.of(0x00, 0xff, 0x00)); + assertEquals(next(ByteKey.of(0xff, 0xff)), ByteKey.of(0xff, 0xff, 0x00)); } } diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl index 0f9b2d1db5ea0..1fd6efda54fe7 100644 --- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl +++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl @@ -197,8 +197,9 @@ SqlCreate SqlCreateTableNotSupportedMessage(Span s, boolean replace) : { { - throw new ParseException("'CREATE TABLE' is not supported in BeamSQL. You can use " - + "'CREATE EXTERNAL TABLE' to register an external data source to BeamSQL"); + throw new ParseException("'CREATE TABLE' is not supported in SQL. You can use " + + "'CREATE EXTERNAL TABLE' to register an external data source to SQL. 
For more details, "
+            + "please check: https://beam.apache.org/documentation/dsls/sql/create-external-table");
     }
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
index e2d21b71ac759..a306c0420136b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java
@@ -225,12 +225,7 @@ ThrowingFunction, KV> createExtractOutputsMapFun
             combinePayload.getCombineFn().getSpec().getPayload().toByteArray(), "CombineFn");
 
     return (KV<KeyT, Iterable<InputT>> input) -> {
-      AccumT accumulator = combineFn.createAccumulator();
-      Iterable<InputT> inputValues = input.getValue();
-      for (InputT inputValue : inputValues) {
-        accumulator = combineFn.addInput(accumulator, inputValue);
-      }
-      return KV.of(input.getKey(), combineFn.extractOutput(accumulator));
+      return KV.of(input.getKey(), combineFn.apply(input.getValue()));
     };
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index edad185323c0b..293071fed04e1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -848,18 +848,25 @@ public List<BigtableSource> split(long desiredBundleSizeBytes, PipelineOptions o
       // Delegate to testable helper.
       List<BigtableSource> splits =
           splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
-      return reduceSplits(splits, options, MAX_SPLIT_COUNT);
+
+      // Reduce the splits.
+      List<BigtableSource> reduced = reduceSplits(splits, options, MAX_SPLIT_COUNT);
+      // Randomize the result before returning an immutable copy of the splits; the default behavior
+      // may lead to multiple workers hitting the same tablet.
+      Collections.shuffle(reduced);
+      return ImmutableList.copyOf(reduced);
     }
 
+    /** Returns a mutable list of reduced splits. */
    @VisibleForTesting
    protected List<BigtableSource> reduceSplits(
        List<BigtableSource> splits, PipelineOptions options, long maxSplitCounts)
        throws IOException {
      int numberToCombine = (int) ((splits.size() + maxSplitCounts - 1) / maxSplitCounts);
      if (splits.size() < maxSplitCounts || numberToCombine < 2) {
-       return splits;
+       return new ArrayList<>(splits);
      }
-     ImmutableList.Builder<BigtableSource> reducedSplits = ImmutableList.builder();
+     List<BigtableSource> reducedSplits = new ArrayList<>();
      List<ByteKeyRange> previousSourceRanges = new ArrayList<ByteKeyRange>();
      int counter = 0;
      long size = 0;
@@ -879,7 +886,7 @@ protected List<BigtableSource> reduceSplits(
       if (size > 0) {
         reducedSplits.add(new BigtableSource(config, filter, previousSourceRanges, size));
       }
-      return reducedSplits.build();
+      return reducedSplits;
     }
 
     /**
@@ -1095,7 +1102,7 @@ public void validate() {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      builder.add(DisplayData.item("tableId", config.getTableId().get()).withLabel("Table ID"));
+      builder.add(DisplayData.item("tableId", config.getTableId()).withLabel("Table ID"));
 
       if (filter != null) {
         builder.add(DisplayData.item("rowFilter", filter.toString()).withLabel("Table Row Filter"));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cadb908be5a83..741a2bb047a36 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -102,7 +102,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -736,10 +736,10 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
             keyRanges,
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(keyRanges.size()));
+    List<BigtableSource> splits = new ArrayList<>();
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit);
 
@@ -753,7 +753,8 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
 
     assertThat(
         actualRangesAfterSplit,
-        IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray()));
+        IsIterableContainingInAnyOrder.containsInAnyOrder(
+            expectedKeyRangesAfterReducedSplits.toArray()));
   }
 
   /** Tests reduce split with all non adjacent ranges. */
@@ -786,10 +787,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
             keyRanges,
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(keyRanges.size()));
+    List<BigtableSource> splits = new ArrayList<>();
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit);
 
@@ -801,8 +802,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
 
     assertAllSourcesHaveSingleRanges(reducedSplits);
 
-    //The expected split source ranges are exactly same as original
-    assertThat(actualRangesAfterSplit, IsIterableContainingInOrder.contains(keyRanges.toArray()));
+    // The expected split source ranges are exactly same as original
+    assertThat(
+        actualRangesAfterSplit,
+        IsIterableContainingInAnyOrder.containsInAnyOrder(keyRanges.toArray()));
   }
 
   /** Tests reduce Splits with all adjacent ranges. */
@@ -826,10 +829,22 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception {
             Arrays.asList(ByteKeyRange.ALL_KEYS),
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(numSamples));
+    List<BigtableSource> splits = new ArrayList<>();
+    List<ByteKeyRange> keyRanges =
+        Arrays.asList(
+            ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
+            ByteKeyRange.of(createByteKey(1), createByteKey(2)),
+            ByteKeyRange.of(createByteKey(2), createByteKey(3)),
+            ByteKeyRange.of(createByteKey(3), createByteKey(4)),
+            ByteKeyRange.of(createByteKey(4), createByteKey(5)),
+            ByteKeyRange.of(createByteKey(5), createByteKey(6)),
+            ByteKeyRange.of(createByteKey(6), createByteKey(7)),
+            ByteKeyRange.of(createByteKey(7), createByteKey(8)),
+            ByteKeyRange.of(createByteKey(8), createByteKey(9)),
+            ByteKeyRange.of(createByteKey(9), ByteKey.EMPTY));
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     //Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
     //expected reduced Split source ranges are [..4][4..8][8..]
@@ -849,7 +864,8 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception { assertThat( actualRangesAfterSplit, - IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray())); + IsIterableContainingInAnyOrder.containsInAnyOrder( + expectedKeyRangesAfterReducedSplits.toArray())); assertAllSourcesHaveSingleAdjacentRanges(reducedSplits); assertSourcesEqualReferenceSource(source, reducedSplits, null /* options */); } @@ -1060,6 +1076,39 @@ public void testReadingPrimitiveDisplayData() throws IOException, InterruptedExc Matchers.hasItem(hasDisplayItem("rowFilter"))); } + @Test + public void testReadingDisplayDataFromRuntimeParameters() { + ReadOptions options = PipelineOptionsFactory.fromArgs().withValidation().as(ReadOptions.class); + BigtableIO.Read read = + BigtableIO.read() + .withBigtableOptions(BIGTABLE_OPTIONS) + .withProjectId(options.getBigtableProject()) + .withInstanceId(options.getBigtableInstanceId()) + .withTableId(options.getBigtableTableId()); + DisplayData displayData = DisplayData.from(read); + assertThat( + displayData, + hasDisplayItem( + allOf( + hasKey("projectId"), + hasLabel("Bigtable Project Id"), + hasValue("RuntimeValueProvider{propertyName=bigtableProject, default=null}")))); + assertThat( + displayData, + hasDisplayItem( + allOf( + hasKey("instanceId"), + hasLabel("Bigtable Instance Id"), + hasValue("RuntimeValueProvider{propertyName=bigtableInstanceId, default=null}")))); + assertThat( + displayData, + hasDisplayItem( + allOf( + hasKey("tableId"), + hasLabel("Bigtable Table Id"), + hasValue("RuntimeValueProvider{propertyName=bigtableTableId, default=null}")))); + } + @Test public void testReadWithoutValidate() { final String table = "fooTable"; diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 211430bd60ff1..f93c5341594fa 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -165,7 +165,9 @@ def new_save_module_dict(pickler, obj): if obj_id not in known_module_dicts: for m in sys.modules.values(): try: - if m and m.__name__ != '__main__': + if (m + and m.__name__ != '__main__' + and isinstance(m, dill.dill.ModuleType)): d = m.__dict__ known_module_dicts[id(d)] = m, d except AttributeError: diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 5a4c1dc9228d6..68e313909a3a7 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -60,6 +60,7 @@ from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions @@ -152,6 +153,14 @@ def __init__(self, runner=None, options=None, argv=None): raise ValueError( 'Pipeline has validations errors: \n' + '\n'.join(errors)) + # set default experiments for portable runner + # (needs to occur prior to pipeline construction) + if self._options.view_as(StandardOptions).runner == 'PortableRunner': + experiments = (self._options.view_as(DebugOptions).experiments or []) + if not 'beam_fn_api' in experiments: + experiments.append('beam_fn_api') + self._options.view_as(DebugOptions).experiments = experiments + # Default runner to be used. 
self.runner = runner # Stack of transforms generated by nested apply() calls. The stack will diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py index 02b424eb994d0..cbe88f99eaf49 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py @@ -75,33 +75,6 @@ class ProjectsJobsDebugService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsJobsDebugService, self).__init__(client) - self._method_configs = { - 'GetConfig': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.debug.getConfig', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig', - request_field=u'getDebugConfigRequest', - request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest', - response_type_name=u'GetDebugConfigResponse', - supports_download=False, - ), - 'SendCapture': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.debug.sendCapture', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture', - request_field=u'sendDebugCaptureRequest', - request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest', - response_type_name=u'SendDebugCaptureResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -118,6 +91,19 @@ def GetConfig(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + GetConfig.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.debug.getConfig', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/getConfig', + request_field=u'getDebugConfigRequest', + request_type_name=u'DataflowProjectsJobsDebugGetConfigRequest', + response_type_name=u'GetDebugConfigResponse', + supports_download=False, + ) + def SendCapture(self, request, global_params=None): """Send encoded debug capture data for component. 
@@ -131,6 +117,19 @@ def SendCapture(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + SendCapture.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.debug.sendCapture', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/debug/sendCapture', + request_field=u'sendDebugCaptureRequest', + request_type_name=u'DataflowProjectsJobsDebugSendCaptureRequest', + response_type_name=u'SendDebugCaptureResponse', + supports_download=False, + ) + class ProjectsJobsMessagesService(base_api.BaseApiService): """Service class for the projects_jobs_messages resource.""" @@ -138,21 +137,6 @@ class ProjectsJobsMessagesService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsJobsMessagesService, self).__init__(client) - self._method_configs = { - 'List': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.messages.list', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'endTime', u'location', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/messages', - request_field='', - request_type_name=u'DataflowProjectsJobsMessagesListRequest', - response_type_name=u'ListJobMessagesResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -169,6 +153,19 @@ def List(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.messages.list', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'endTime', u'location', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/messages', + request_field='', + request_type_name=u'DataflowProjectsJobsMessagesListRequest', + response_type_name=u'ListJobMessagesResponse', + supports_download=False, + ) + class ProjectsJobsWorkItemsService(base_api.BaseApiService): """Service class for the projects_jobs_workItems resource.""" @@ -176,33 +173,6 @@ class ProjectsJobsWorkItemsService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsJobsWorkItemsService, self).__init__(client) - self._method_configs = { - 'Lease': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.workItems.lease', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:lease', - request_field=u'leaseWorkItemRequest', - request_type_name=u'DataflowProjectsJobsWorkItemsLeaseRequest', - response_type_name=u'LeaseWorkItemResponse', - supports_download=False, - ), - 'ReportStatus': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.workItems.reportStatus', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:reportStatus', - request_field=u'reportWorkItemStatusRequest', - request_type_name=u'DataflowProjectsJobsWorkItemsReportStatusRequest', - response_type_name=u'ReportWorkItemStatusResponse', - 
supports_download=False, - ), - } - self._upload_configs = { } @@ -219,6 +189,19 @@ def Lease(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Lease.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.workItems.lease', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:lease', + request_field=u'leaseWorkItemRequest', + request_type_name=u'DataflowProjectsJobsWorkItemsLeaseRequest', + response_type_name=u'LeaseWorkItemResponse', + supports_download=False, + ) + def ReportStatus(self, request, global_params=None): """Reports the status of dataflow WorkItems leased by a worker. @@ -232,6 +215,19 @@ def ReportStatus(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + ReportStatus.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.workItems.reportStatus', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/workItems:reportStatus', + request_field=u'reportWorkItemStatusRequest', + request_type_name=u'DataflowProjectsJobsWorkItemsReportStatusRequest', + response_type_name=u'ReportWorkItemStatusResponse', + supports_download=False, + ) + class ProjectsJobsService(base_api.BaseApiService): """Service class for the projects_jobs resource.""" @@ -239,81 +235,6 @@ class ProjectsJobsService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsJobsService, self).__init__(client) - self._method_configs = { - 'Aggregated': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.aggregated', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[u'filter', u'location', u'pageSize', u'pageToken', u'view'], - relative_path=u'v1b3/projects/{projectId}/jobs:aggregated', - request_field='', - request_type_name=u'DataflowProjectsJobsAggregatedRequest', - response_type_name=u'ListJobsResponse', - supports_download=False, - ), - 'Create': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.jobs.create', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[u'location', u'replaceJobId', u'view'], - relative_path=u'v1b3/projects/{projectId}/jobs', - request_field=u'job', - request_type_name=u'DataflowProjectsJobsCreateRequest', - response_type_name=u'Job', - supports_download=False, - ), - 'Get': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.get', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'location', u'view'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', - request_field='', - request_type_name=u'DataflowProjectsJobsGetRequest', - response_type_name=u'Job', - supports_download=False, - ), - 'GetMetrics': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.getMetrics', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'location', u'startTime'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/metrics', - request_field='', - request_type_name=u'DataflowProjectsJobsGetMetricsRequest', - response_type_name=u'JobMetrics', - supports_download=False, - ), - 
'List': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.jobs.list', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[u'filter', u'location', u'pageSize', u'pageToken', u'view'], - relative_path=u'v1b3/projects/{projectId}/jobs', - request_field='', - request_type_name=u'DataflowProjectsJobsListRequest', - response_type_name=u'ListJobsResponse', - supports_download=False, - ), - 'Update': base_api.ApiMethodInfo( - http_method=u'PUT', - method_id=u'dataflow.projects.jobs.update', - ordered_params=[u'projectId', u'jobId'], - path_params=[u'jobId', u'projectId'], - query_params=[u'location'], - relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', - request_field=u'job', - request_type_name=u'DataflowProjectsJobsUpdateRequest', - response_type_name=u'Job', - supports_download=False, - ), - } - self._upload_configs = { } @@ -330,6 +251,19 @@ def Aggregated(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Aggregated.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.aggregated', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[u'filter', u'location', u'pageSize', u'pageToken', u'view'], + relative_path=u'v1b3/projects/{projectId}/jobs:aggregated', + request_field='', + request_type_name=u'DataflowProjectsJobsAggregatedRequest', + response_type_name=u'ListJobsResponse', + supports_download=False, + ) + def Create(self, request, global_params=None): """Creates a Cloud Dataflow job. @@ -343,6 +277,19 @@ def Create(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Create.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.jobs.create', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[u'location', u'replaceJobId', u'view'], + relative_path=u'v1b3/projects/{projectId}/jobs', + request_field=u'job', + request_type_name=u'DataflowProjectsJobsCreateRequest', + response_type_name=u'Job', + supports_download=False, + ) + def Get(self, request, global_params=None): """Gets the state of the specified Cloud Dataflow job. @@ -356,6 +303,19 @@ def Get(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Get.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.get', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'location', u'view'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', + request_field='', + request_type_name=u'DataflowProjectsJobsGetRequest', + response_type_name=u'Job', + supports_download=False, + ) + def GetMetrics(self, request, global_params=None): """Request the job status. 
@@ -369,6 +329,19 @@ def GetMetrics(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + GetMetrics.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.getMetrics', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'location', u'startTime'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}/metrics', + request_field='', + request_type_name=u'DataflowProjectsJobsGetMetricsRequest', + response_type_name=u'JobMetrics', + supports_download=False, + ) + def List(self, request, global_params=None): """List the jobs of a project in a given region. @@ -382,6 +355,19 @@ def List(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.jobs.list', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[u'filter', u'location', u'pageSize', u'pageToken', u'view'], + relative_path=u'v1b3/projects/{projectId}/jobs', + request_field='', + request_type_name=u'DataflowProjectsJobsListRequest', + response_type_name=u'ListJobsResponse', + supports_download=False, + ) + def Update(self, request, global_params=None): """Updates the state of an existing Cloud Dataflow job. @@ -395,6 +381,19 @@ def Update(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Update.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'PUT', + method_id=u'dataflow.projects.jobs.update', + ordered_params=[u'projectId', u'jobId'], + path_params=[u'jobId', u'projectId'], + query_params=[u'location'], + relative_path=u'v1b3/projects/{projectId}/jobs/{jobId}', + request_field=u'job', + request_type_name=u'DataflowProjectsJobsUpdateRequest', + response_type_name=u'Job', + supports_download=False, + ) + class ProjectsLocationsJobsDebugService(base_api.BaseApiService): """Service class for the projects_locations_jobs_debug resource.""" @@ -402,33 +401,6 @@ class ProjectsLocationsJobsDebugService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsLocationsJobsDebugService, self).__init__(client) - self._method_configs = { - 'GetConfig': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.debug.getConfig', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/debug/getConfig', - request_field=u'getDebugConfigRequest', - request_type_name=u'DataflowProjectsLocationsJobsDebugGetConfigRequest', - response_type_name=u'GetDebugConfigResponse', - supports_download=False, - ), - 'SendCapture': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.debug.sendCapture', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/debug/sendCapture', - request_field=u'sendDebugCaptureRequest', - request_type_name=u'DataflowProjectsLocationsJobsDebugSendCaptureRequest', - response_type_name=u'SendDebugCaptureResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -445,6 +417,19 @@ def GetConfig(self, request, 
global_params=None): return self._RunMethod( config, request, global_params=global_params) + GetConfig.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.debug.getConfig', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/debug/getConfig', + request_field=u'getDebugConfigRequest', + request_type_name=u'DataflowProjectsLocationsJobsDebugGetConfigRequest', + response_type_name=u'GetDebugConfigResponse', + supports_download=False, + ) + def SendCapture(self, request, global_params=None): """Send encoded debug capture data for component. @@ -458,6 +443,19 @@ def SendCapture(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + SendCapture.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.debug.sendCapture', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/debug/sendCapture', + request_field=u'sendDebugCaptureRequest', + request_type_name=u'DataflowProjectsLocationsJobsDebugSendCaptureRequest', + response_type_name=u'SendDebugCaptureResponse', + supports_download=False, + ) + class ProjectsLocationsJobsMessagesService(base_api.BaseApiService): """Service class for the projects_locations_jobs_messages resource.""" @@ -465,21 +463,6 @@ class ProjectsLocationsJobsMessagesService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsLocationsJobsMessagesService, self).__init__(client) - self._method_configs = { - 'List': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.messages.list', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[u'endTime', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/messages', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsMessagesListRequest', - response_type_name=u'ListJobMessagesResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -496,6 +479,19 @@ def List(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.messages.list', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[u'endTime', u'minimumImportance', u'pageSize', u'pageToken', u'startTime'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/messages', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsMessagesListRequest', + response_type_name=u'ListJobMessagesResponse', + supports_download=False, + ) + class ProjectsLocationsJobsWorkItemsService(base_api.BaseApiService): """Service class for the projects_locations_jobs_workItems resource.""" @@ -503,33 +499,6 @@ class ProjectsLocationsJobsWorkItemsService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsLocationsJobsWorkItemsService, self).__init__(client) - 
self._method_configs = { - 'Lease': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.workItems.lease', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:lease', - request_field=u'leaseWorkItemRequest', - request_type_name=u'DataflowProjectsLocationsJobsWorkItemsLeaseRequest', - response_type_name=u'LeaseWorkItemResponse', - supports_download=False, - ), - 'ReportStatus': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.workItems.reportStatus', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:reportStatus', - request_field=u'reportWorkItemStatusRequest', - request_type_name=u'DataflowProjectsLocationsJobsWorkItemsReportStatusRequest', - response_type_name=u'ReportWorkItemStatusResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -546,6 +515,19 @@ def Lease(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Lease.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.workItems.lease', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:lease', + request_field=u'leaseWorkItemRequest', + request_type_name=u'DataflowProjectsLocationsJobsWorkItemsLeaseRequest', + response_type_name=u'LeaseWorkItemResponse', + supports_download=False, + ) + def ReportStatus(self, request, global_params=None): """Reports the status of dataflow WorkItems leased by a worker. 
@@ -559,6 +541,19 @@ def ReportStatus(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + ReportStatus.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.workItems.reportStatus', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/workItems:reportStatus', + request_field=u'reportWorkItemStatusRequest', + request_type_name=u'DataflowProjectsLocationsJobsWorkItemsReportStatusRequest', + response_type_name=u'ReportWorkItemStatusResponse', + supports_download=False, + ) + class ProjectsLocationsJobsService(base_api.BaseApiService): """Service class for the projects_locations_jobs resource.""" @@ -566,69 +561,6 @@ class ProjectsLocationsJobsService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsLocationsJobsService, self).__init__(client) - self._method_configs = { - 'Create': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.jobs.create', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[u'replaceJobId', u'view'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', - request_field=u'job', - request_type_name=u'DataflowProjectsLocationsJobsCreateRequest', - response_type_name=u'Job', - supports_download=False, - ), - 'Get': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.get', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[u'view'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsGetRequest', - response_type_name=u'Job', - supports_download=False, - ), - 'GetMetrics': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.getMetrics', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[u'startTime'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/metrics', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsGetMetricsRequest', - response_type_name=u'JobMetrics', - supports_download=False, - ), - 'List': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.jobs.list', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[u'filter', u'pageSize', u'pageToken', u'view'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', - request_field='', - request_type_name=u'DataflowProjectsLocationsJobsListRequest', - response_type_name=u'ListJobsResponse', - supports_download=False, - ), - 'Update': base_api.ApiMethodInfo( - http_method=u'PUT', - method_id=u'dataflow.projects.locations.jobs.update', - ordered_params=[u'projectId', u'location', u'jobId'], - path_params=[u'jobId', u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', - request_field=u'job', - request_type_name=u'DataflowProjectsLocationsJobsUpdateRequest', - response_type_name=u'Job', - supports_download=False, - ), - } - self._upload_configs = { } @@ -645,6 +577,19 @@ 
def Create(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Create.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.jobs.create', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[u'replaceJobId', u'view'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', + request_field=u'job', + request_type_name=u'DataflowProjectsLocationsJobsCreateRequest', + response_type_name=u'Job', + supports_download=False, + ) + def Get(self, request, global_params=None): """Gets the state of the specified Cloud Dataflow job. @@ -658,6 +603,19 @@ def Get(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Get.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.get', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[u'view'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsGetRequest', + response_type_name=u'Job', + supports_download=False, + ) + def GetMetrics(self, request, global_params=None): """Request the job status. @@ -671,6 +629,19 @@ def GetMetrics(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + GetMetrics.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.getMetrics', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[u'startTime'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}/metrics', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsGetMetricsRequest', + response_type_name=u'JobMetrics', + supports_download=False, + ) + def List(self, request, global_params=None): """List the jobs of a project in a given region. @@ -684,6 +655,19 @@ def List(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + List.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.jobs.list', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[u'filter', u'pageSize', u'pageToken', u'view'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs', + request_field='', + request_type_name=u'DataflowProjectsLocationsJobsListRequest', + response_type_name=u'ListJobsResponse', + supports_download=False, + ) + def Update(self, request, global_params=None): """Updates the state of an existing Cloud Dataflow job. 
@@ -697,6 +681,19 @@ def Update(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Update.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'PUT', + method_id=u'dataflow.projects.locations.jobs.update', + ordered_params=[u'projectId', u'location', u'jobId'], + path_params=[u'jobId', u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}', + request_field=u'job', + request_type_name=u'DataflowProjectsLocationsJobsUpdateRequest', + response_type_name=u'Job', + supports_download=False, + ) + class ProjectsLocationsTemplatesService(base_api.BaseApiService): """Service class for the projects_locations_templates resource.""" @@ -704,45 +701,6 @@ class ProjectsLocationsTemplatesService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsLocationsTemplatesService, self).__init__(client) - self._method_configs = { - 'Create': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.templates.create', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/templates', - request_field=u'createJobFromTemplateRequest', - request_type_name=u'DataflowProjectsLocationsTemplatesCreateRequest', - response_type_name=u'Job', - supports_download=False, - ), - 'Get': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.locations.templates.get', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[u'gcsPath', u'view'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/templates:get', - request_field='', - request_type_name=u'DataflowProjectsLocationsTemplatesGetRequest', - response_type_name=u'GetTemplateResponse', - supports_download=False, - ), - 'Launch': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.templates.launch', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[u'gcsPath', u'validateOnly'], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/templates:launch', - request_field=u'launchTemplateParameters', - request_type_name=u'DataflowProjectsLocationsTemplatesLaunchRequest', - response_type_name=u'LaunchTemplateResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -759,6 +717,19 @@ def Create(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Create.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.templates.create', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/templates', + request_field=u'createJobFromTemplateRequest', + request_type_name=u'DataflowProjectsLocationsTemplatesCreateRequest', + response_type_name=u'Job', + supports_download=False, + ) + def Get(self, request, global_params=None): """Get the template associated with a template. 
@@ -772,6 +743,19 @@ def Get(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Get.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.locations.templates.get', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[u'gcsPath', u'view'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/templates:get', + request_field='', + request_type_name=u'DataflowProjectsLocationsTemplatesGetRequest', + response_type_name=u'GetTemplateResponse', + supports_download=False, + ) + def Launch(self, request, global_params=None): """Launch a template. @@ -785,6 +769,19 @@ def Launch(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Launch.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.templates.launch', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[u'gcsPath', u'validateOnly'], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/templates:launch', + request_field=u'launchTemplateParameters', + request_type_name=u'DataflowProjectsLocationsTemplatesLaunchRequest', + response_type_name=u'LaunchTemplateResponse', + supports_download=False, + ) + class ProjectsLocationsService(base_api.BaseApiService): """Service class for the projects_locations resource.""" @@ -792,21 +789,6 @@ class ProjectsLocationsService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsLocationsService, self).__init__(client) - self._method_configs = { - 'WorkerMessages': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.locations.workerMessages', - ordered_params=[u'projectId', u'location'], - path_params=[u'location', u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/locations/{location}/WorkerMessages', - request_field=u'sendWorkerMessagesRequest', - request_type_name=u'DataflowProjectsLocationsWorkerMessagesRequest', - response_type_name=u'SendWorkerMessagesResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -823,6 +805,19 @@ def WorkerMessages(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + WorkerMessages.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.locations.workerMessages', + ordered_params=[u'projectId', u'location'], + path_params=[u'location', u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/locations/{location}/WorkerMessages', + request_field=u'sendWorkerMessagesRequest', + request_type_name=u'DataflowProjectsLocationsWorkerMessagesRequest', + response_type_name=u'SendWorkerMessagesResponse', + supports_download=False, + ) + class ProjectsTemplatesService(base_api.BaseApiService): """Service class for the projects_templates resource.""" @@ -830,45 +825,6 @@ class ProjectsTemplatesService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsTemplatesService, self).__init__(client) - self._method_configs = { - 'Create': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.templates.create', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/templates', - 
request_field=u'createJobFromTemplateRequest', - request_type_name=u'DataflowProjectsTemplatesCreateRequest', - response_type_name=u'Job', - supports_download=False, - ), - 'Get': base_api.ApiMethodInfo( - http_method=u'GET', - method_id=u'dataflow.projects.templates.get', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[u'gcsPath', u'location', u'view'], - relative_path=u'v1b3/projects/{projectId}/templates:get', - request_field='', - request_type_name=u'DataflowProjectsTemplatesGetRequest', - response_type_name=u'GetTemplateResponse', - supports_download=False, - ), - 'Launch': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.templates.launch', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[u'gcsPath', u'location', u'validateOnly'], - relative_path=u'v1b3/projects/{projectId}/templates:launch', - request_field=u'launchTemplateParameters', - request_type_name=u'DataflowProjectsTemplatesLaunchRequest', - response_type_name=u'LaunchTemplateResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -885,6 +841,19 @@ def Create(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Create.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.templates.create', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/templates', + request_field=u'createJobFromTemplateRequest', + request_type_name=u'DataflowProjectsTemplatesCreateRequest', + response_type_name=u'Job', + supports_download=False, + ) + def Get(self, request, global_params=None): """Get the template associated with a template. @@ -898,6 +867,19 @@ def Get(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Get.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'GET', + method_id=u'dataflow.projects.templates.get', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[u'gcsPath', u'location', u'view'], + relative_path=u'v1b3/projects/{projectId}/templates:get', + request_field='', + request_type_name=u'DataflowProjectsTemplatesGetRequest', + response_type_name=u'GetTemplateResponse', + supports_download=False, + ) + def Launch(self, request, global_params=None): """Launch a template. 
@@ -911,6 +893,19 @@ def Launch(self, request, global_params=None): return self._RunMethod( config, request, global_params=global_params) + Launch.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.templates.launch', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[u'gcsPath', u'location', u'validateOnly'], + relative_path=u'v1b3/projects/{projectId}/templates:launch', + request_field=u'launchTemplateParameters', + request_type_name=u'DataflowProjectsTemplatesLaunchRequest', + response_type_name=u'LaunchTemplateResponse', + supports_download=False, + ) + class ProjectsService(base_api.BaseApiService): """Service class for the projects resource.""" @@ -918,21 +913,6 @@ class ProjectsService(base_api.BaseApiService): def __init__(self, client): super(DataflowV1b3.ProjectsService, self).__init__(client) - self._method_configs = { - 'WorkerMessages': base_api.ApiMethodInfo( - http_method=u'POST', - method_id=u'dataflow.projects.workerMessages', - ordered_params=[u'projectId'], - path_params=[u'projectId'], - query_params=[], - relative_path=u'v1b3/projects/{projectId}/WorkerMessages', - request_field=u'sendWorkerMessagesRequest', - request_type_name=u'DataflowProjectsWorkerMessagesRequest', - response_type_name=u'SendWorkerMessagesResponse', - supports_download=False, - ), - } - self._upload_configs = { } @@ -948,3 +928,16 @@ def WorkerMessages(self, request, global_params=None): config = self.GetMethodConfig('WorkerMessages') return self._RunMethod( config, request, global_params=global_params) + + WorkerMessages.method_config = lambda: base_api.ApiMethodInfo( + http_method=u'POST', + method_id=u'dataflow.projects.workerMessages', + ordered_params=[u'projectId'], + path_params=[u'projectId'], + query_params=[], + relative_path=u'v1b3/projects/{projectId}/WorkerMessages', + request_field=u'sendWorkerMessagesRequest', + request_type_name=u'DataflowProjectsWorkerMessagesRequest', + response_type_name=u'SendWorkerMessagesResponse', + supports_download=False, + ) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py index bdb5c6d266203..4eb6696baf3d2 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py @@ -120,6 +120,9 @@ class AutoscalingEvent(_messages.Message): resize to use. time: The time this event was emitted to indicate a new target or current num_workers value. + workerPool: A short and friendly name for the worker pool this event + refers to, populated from the value of + PoolStageRelation::user_pool_name. """ class EventTypeValueValuesEnum(_messages.Enum): @@ -155,6 +158,7 @@ class EventTypeValueValuesEnum(_messages.Enum): eventType = _messages.EnumField('EventTypeValueValuesEnum', 3) targetNumWorkers = _messages.IntegerField(4) time = _messages.StringField(5) + workerPool = _messages.StringField(6) class AutoscalingSettings(_messages.Message): @@ -185,6 +189,36 @@ class AlgorithmValueValuesEnum(_messages.Enum): maxNumWorkers = _messages.IntegerField(2, variant=_messages.Variant.INT32) +class BigQueryIODetails(_messages.Message): + """Metadata for a BigQuery connector used by the job. + + Fields: + dataset: Dataset accessed in the connection. + projectId: Project accessed in the connection. 
+ query: Query used to access data in the connection. + table: Table accessed in the connection. + """ + + dataset = _messages.StringField(1) + projectId = _messages.StringField(2) + query = _messages.StringField(3) + table = _messages.StringField(4) + + +class BigTableIODetails(_messages.Message): + """Metadata for a BigTable connector used by the job. + + Fields: + instanceId: InstanceId accessed in the connection. + projectId: ProjectId accessed in the connection. + tableId: TableId accessed in the connection. + """ + + instanceId = _messages.StringField(1) + projectId = _messages.StringField(2) + tableId = _messages.StringField(3) + + class CPUTime(_messages.Message): """Modeled after information exposed by /proc/stat. @@ -1308,6 +1342,18 @@ class DataflowProjectsWorkerMessagesRequest(_messages.Message): sendWorkerMessagesRequest = _messages.MessageField('SendWorkerMessagesRequest', 2) +class DatastoreIODetails(_messages.Message): + """Metadata for a Datastore connector used by the job. + + Fields: + namespace: Namespace used in the connection. + projectId: ProjectId accessed in the connection. + """ + + namespace = _messages.StringField(1) + projectId = _messages.StringField(2) + + class DerivedSource(_messages.Message): """Specification of one of the bundles produced as a result of splitting a Source (e.g. when executing a SourceSplitRequest, or when splitting an @@ -1664,13 +1710,16 @@ class ExecutionStageStateValueValuesEnum(_messages.Enum): was requested. This state is a terminal state, may only be set by the Cloud Dataflow service, and only as a transition from `JOB_STATE_DRAINING`. - JOB_STATE_PENDING: 'JOB_STATE_PENDING' indicates that the job has been + JOB_STATE_PENDING: `JOB_STATE_PENDING` indicates that the job has been created but is not yet running. Jobs that are pending may only transition to `JOB_STATE_RUNNING`, or `JOB_STATE_FAILED`. - JOB_STATE_CANCELLING: 'JOB_STATE_CANCELLING' indicates that the job has + JOB_STATE_CANCELLING: `JOB_STATE_CANCELLING` indicates that the job has been explicitly cancelled and is in the process of stopping. Jobs - that are cancelling may only transition to 'JOB_STATE_CANCELLED' or - 'JOB_STATE_FAILED'. + that are cancelling may only transition to `JOB_STATE_CANCELLED` or + `JOB_STATE_FAILED`. + JOB_STATE_QUEUED: `JOB_STATE_QUEUED` indicates that the job has been + created but is being delayed until launch. Jobs that are queued may + only transition to `JOB_STATE_PENDING` or `JOB_STATE_CANCELLED`. """ JOB_STATE_UNKNOWN = 0 JOB_STATE_STOPPED = 1 @@ -1683,6 +1732,7 @@ class ExecutionStageStateValueValuesEnum(_messages.Enum): JOB_STATE_DRAINED = 8 JOB_STATE_PENDING = 9 JOB_STATE_CANCELLING = 10 + JOB_STATE_QUEUED = 11 currentStateTime = _messages.StringField(1) executionStageName = _messages.StringField(2) @@ -1752,6 +1802,16 @@ class FailedLocation(_messages.Message): name = _messages.StringField(1) +class FileIODetails(_messages.Message): + """Metadata for a File connector used by the job. + + Fields: + filePattern: File Pattern used to access files by the connector. + """ + + filePattern = _messages.StringField(1) + + class FlattenInstruction(_messages.Message): """An instruction that copies its inputs (zero or more) to its (single) output. @@ -1998,6 +2058,9 @@ class Job(_messages.Message): id: The unique ID of this job. This field is set by the Cloud Dataflow service when the Job is created, and is immutable for the life of the job. 
+ jobMetadata: This field is populated by the Dataflow service to support + filtering jobs by the metadata values provided here. Populated for + ListJobs and all GetJob views SUMMARY and higher. labels: User-defined labels for this job. The labels map can contain no more than 64 entries. Entries of the labels map are UTF8 strings that comply with the following restrictions: * Keys must conform to regexp: @@ -2085,13 +2148,16 @@ class CurrentStateValueValuesEnum(_messages.Enum): was requested. This state is a terminal state, may only be set by the Cloud Dataflow service, and only as a transition from `JOB_STATE_DRAINING`. - JOB_STATE_PENDING: 'JOB_STATE_PENDING' indicates that the job has been + JOB_STATE_PENDING: `JOB_STATE_PENDING` indicates that the job has been created but is not yet running. Jobs that are pending may only transition to `JOB_STATE_RUNNING`, or `JOB_STATE_FAILED`. - JOB_STATE_CANCELLING: 'JOB_STATE_CANCELLING' indicates that the job has + JOB_STATE_CANCELLING: `JOB_STATE_CANCELLING` indicates that the job has been explicitly cancelled and is in the process of stopping. Jobs - that are cancelling may only transition to 'JOB_STATE_CANCELLED' or - 'JOB_STATE_FAILED'. + that are cancelling may only transition to `JOB_STATE_CANCELLED` or + `JOB_STATE_FAILED`. + JOB_STATE_QUEUED: `JOB_STATE_QUEUED` indicates that the job has been + created but is being delayed until launch. Jobs that are queued may + only transition to `JOB_STATE_PENDING` or `JOB_STATE_CANCELLED`. """ JOB_STATE_UNKNOWN = 0 JOB_STATE_STOPPED = 1 @@ -2104,6 +2170,7 @@ class CurrentStateValueValuesEnum(_messages.Enum): JOB_STATE_DRAINED = 8 JOB_STATE_PENDING = 9 JOB_STATE_CANCELLING = 10 + JOB_STATE_QUEUED = 11 class RequestedStateValueValuesEnum(_messages.Enum): """The job's requested state. `UpdateJob` may be used to switch between @@ -2148,13 +2215,16 @@ class RequestedStateValueValuesEnum(_messages.Enum): was requested. This state is a terminal state, may only be set by the Cloud Dataflow service, and only as a transition from `JOB_STATE_DRAINING`. - JOB_STATE_PENDING: 'JOB_STATE_PENDING' indicates that the job has been + JOB_STATE_PENDING: `JOB_STATE_PENDING` indicates that the job has been created but is not yet running. Jobs that are pending may only transition to `JOB_STATE_RUNNING`, or `JOB_STATE_FAILED`. - JOB_STATE_CANCELLING: 'JOB_STATE_CANCELLING' indicates that the job has + JOB_STATE_CANCELLING: `JOB_STATE_CANCELLING` indicates that the job has been explicitly cancelled and is in the process of stopping. Jobs - that are cancelling may only transition to 'JOB_STATE_CANCELLED' or - 'JOB_STATE_FAILED'. + that are cancelling may only transition to `JOB_STATE_CANCELLED` or + `JOB_STATE_FAILED`. + JOB_STATE_QUEUED: `JOB_STATE_QUEUED` indicates that the job has been + created but is being delayed until launch. Jobs that are queued may + only transition to `JOB_STATE_PENDING` or `JOB_STATE_CANCELLED`. """ JOB_STATE_UNKNOWN = 0 JOB_STATE_STOPPED = 1 @@ -2167,6 +2237,7 @@ class RequestedStateValueValuesEnum(_messages.Enum): JOB_STATE_DRAINED = 8 JOB_STATE_PENDING = 9 JOB_STATE_CANCELLING = 10 + JOB_STATE_QUEUED = 11 class TypeValueValuesEnum(_messages.Enum): """The type of Cloud Dataflow job. 
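With the new jobMetadata field on Job (and the field renumbering that follows from it), a caller that fetches a job through the client above can inspect the connector and SDK metadata directly. A hedged sketch, assuming job_response is an already-populated dataflow_v1b3_messages.Job instance:

from apache_beam.runners.dataflow.internal.clients.dataflow import (
    dataflow_v1b3_messages as dataflow)

def describe_job(job_response):
    # jobMetadata is only populated for ListJobs and for GetJob views
    # SUMMARY and higher; unset message fields come back as None.
    metadata = job_response.jobMetadata
    if metadata and metadata.sdkVersion:
        return '%s (%s)' % (metadata.sdkVersion.versionDisplayName,
                            metadata.sdkVersion.sdkSupportStatus)
    return 'no SDK metadata reported'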
@@ -2245,19 +2316,20 @@ class AdditionalProperty(_messages.Message): environment = _messages.MessageField('Environment', 5) executionInfo = _messages.MessageField('JobExecutionInfo', 6) id = _messages.StringField(7) - labels = _messages.MessageField('LabelsValue', 8) - location = _messages.StringField(9) - name = _messages.StringField(10) - pipelineDescription = _messages.MessageField('PipelineDescription', 11) - projectId = _messages.StringField(12) - replaceJobId = _messages.StringField(13) - replacedByJobId = _messages.StringField(14) - requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 15) - stageStates = _messages.MessageField('ExecutionStageState', 16, repeated=True) - steps = _messages.MessageField('Step', 17, repeated=True) - tempFiles = _messages.StringField(18, repeated=True) - transformNameMapping = _messages.MessageField('TransformNameMappingValue', 19) - type = _messages.EnumField('TypeValueValuesEnum', 20) + jobMetadata = _messages.MessageField('JobMetadata', 8) + labels = _messages.MessageField('LabelsValue', 9) + location = _messages.StringField(10) + name = _messages.StringField(11) + pipelineDescription = _messages.MessageField('PipelineDescription', 12) + projectId = _messages.StringField(13) + replaceJobId = _messages.StringField(14) + replacedByJobId = _messages.StringField(15) + requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 16) + stageStates = _messages.MessageField('ExecutionStageState', 17, repeated=True) + steps = _messages.MessageField('Step', 18, repeated=True) + tempFiles = _messages.StringField(19, repeated=True) + transformNameMapping = _messages.MessageField('TransformNameMappingValue', 20) + type = _messages.EnumField('TypeValueValuesEnum', 21) class JobExecutionInfo(_messages.Message): @@ -2367,6 +2439,33 @@ class MessageImportanceValueValuesEnum(_messages.Enum): time = _messages.StringField(4) +class JobMetadata(_messages.Message): + """Metadata available primarily for filtering jobs. Will be included in the + ListJob response and Job SUMMARY view+. + + Fields: + bigTableDetails: Identification of a BigTable source used in the Dataflow + job. + bigqueryDetails: Identification of a BigQuery source used in the Dataflow + job. + datastoreDetails: Identification of a Datastore source used in the + Dataflow job. + fileDetails: Identification of a File source used in the Dataflow job. + pubsubDetails: Identification of a PubSub source used in the Dataflow job. + sdkVersion: The SDK version used to run the job. + spannerDetails: Identification of a Spanner source used in the Dataflow + job. + """ + + bigTableDetails = _messages.MessageField('BigTableIODetails', 1, repeated=True) + bigqueryDetails = _messages.MessageField('BigQueryIODetails', 2, repeated=True) + datastoreDetails = _messages.MessageField('DatastoreIODetails', 3, repeated=True) + fileDetails = _messages.MessageField('FileIODetails', 4, repeated=True) + pubsubDetails = _messages.MessageField('PubSubIODetails', 5, repeated=True) + sdkVersion = _messages.MessageField('SdkVersion', 6) + spannerDetails = _messages.MessageField('SpannerIODetails', 7, repeated=True) + + class JobMetrics(_messages.Message): """JobMetrics contains a collection of metrics descibing the detailed progress of a Dataflow job. Metrics correspond to user-defined and system- @@ -2643,6 +2742,9 @@ class MetricUpdate(_messages.Message): reported as a delta that is not associated with any WorkItem. distribution: A struct value describing properties of a distribution of numeric values. 
+ gauge: A struct value describing properties of a Gauge. Metrics of gauge + type show the value of a metric across time, and is aggregated based on + the newest value. internal: Worker-computed aggregate value for internal use by the Dataflow service. kind: Metric aggregation kind. The possible metric aggregation kinds are @@ -2672,14 +2774,15 @@ class MetricUpdate(_messages.Message): cumulative = _messages.BooleanField(1) distribution = _messages.MessageField('extra_types.JsonValue', 2) - internal = _messages.MessageField('extra_types.JsonValue', 3) - kind = _messages.StringField(4) - meanCount = _messages.MessageField('extra_types.JsonValue', 5) - meanSum = _messages.MessageField('extra_types.JsonValue', 6) - name = _messages.MessageField('MetricStructuredName', 7) - scalar = _messages.MessageField('extra_types.JsonValue', 8) - set = _messages.MessageField('extra_types.JsonValue', 9) - updateTime = _messages.StringField(10) + gauge = _messages.MessageField('extra_types.JsonValue', 3) + internal = _messages.MessageField('extra_types.JsonValue', 4) + kind = _messages.StringField(5) + meanCount = _messages.MessageField('extra_types.JsonValue', 6) + meanSum = _messages.MessageField('extra_types.JsonValue', 7) + name = _messages.MessageField('MetricStructuredName', 8) + scalar = _messages.MessageField('extra_types.JsonValue', 9) + set = _messages.MessageField('extra_types.JsonValue', 10) + updateTime = _messages.StringField(11) class MountedDataDisk(_messages.Message): @@ -2998,6 +3101,18 @@ class Position(_messages.Message): shufflePosition = _messages.StringField(6) +class PubSubIODetails(_messages.Message): + """Metadata for a PubSub connector used by the job. + + Fields: + subscription: Subscription used in the connection. + topic: Topic accessed in the connection. + """ + + subscription = _messages.StringField(1) + topic = _messages.StringField(2) + + class PubsubLocation(_messages.Message): """Identifies a pubsub location to use for transferring data into or out of a streaming Dataflow job. @@ -3142,6 +3257,42 @@ class RuntimeEnvironment(_messages.Message): zone = _messages.StringField(9) +class SdkVersion(_messages.Message): + """The version of the SDK used to run the jobl + + Enums: + SdkSupportStatusValueValuesEnum: The support status for this SDK version. + + Fields: + sdkSupportStatus: The support status for this SDK version. + version: The version of the SDK used to run the job. + versionDisplayName: A readable string describing the version of the sdk. + """ + + class SdkSupportStatusValueValuesEnum(_messages.Enum): + """The support status for this SDK version. + + Values: + UNKNOWN: Cloud Dataflow is unaware of this version. + SUPPORTED: This is a known version of an SDK, and is supported. + STALE: A newer version of the SDK family exists, and an update is + recommended. + DEPRECATED: This version of the SDK is deprecated and will eventually be + no longer supported. + UNSUPPORTED: Support for this SDK version has ended and it should no + longer be used. + """ + UNKNOWN = 0 + SUPPORTED = 1 + STALE = 2 + DEPRECATED = 3 + UNSUPPORTED = 4 + + sdkSupportStatus = _messages.EnumField('SdkSupportStatusValueValuesEnum', 1) + version = _messages.StringField(2) + versionDisplayName = _messages.StringField(3) + + class SendDebugCaptureRequest(_messages.Message): """Request to send encoded debug information. 
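The SdkVersion message above also carries a support-status enum. A short sketch, assuming the generated classes are importable, of how a caller might flag jobs that report a stale or unsupported SDK:

from apache_beam.runners.dataflow.internal.clients.dataflow import (
    dataflow_v1b3_messages as dataflow)

_NEEDS_UPGRADE = (
    dataflow.SdkVersion.SdkSupportStatusValueValuesEnum.STALE,
    dataflow.SdkVersion.SdkSupportStatusValueValuesEnum.DEPRECATED,
    dataflow.SdkVersion.SdkSupportStatusValueValuesEnum.UNSUPPORTED,
)

def sdk_needs_upgrade(sdk_version):
    # sdk_version is a populated SdkVersion message from JobMetadata.
    return sdk_version.sdkSupportStatus in _NEEDS_UPGRADE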
@@ -3694,6 +3845,20 @@ class DerivationModeValueValuesEnum(_messages.Enum): source = _messages.MessageField('Source', 2) +class SpannerIODetails(_messages.Message): + """Metadata for a Spanner connector used by the job. + + Fields: + databaseId: DatabaseId accessed in the connection. + instanceId: InstanceId accessed in the connection. + projectId: ProjectId accessed in the connection. + """ + + databaseId = _messages.StringField(1) + instanceId = _messages.StringField(2) + projectId = _messages.StringField(3) + + class SplitInt64(_messages.Message): """A representation of an int64, n, that is immune to precision loss when encoded in JSON. @@ -3736,14 +3901,12 @@ class StandardQueryParameters(_messages.Message): f__xgafv: V1 error format. access_token: OAuth access token. alt: Data format for response. - bearer_token: OAuth bearer token. callback: JSONP fields: Selector specifying which fields to include in a partial response. key: API key. Your API key identifies your project and provides you with API access, quota, and reports. Required unless you provide an OAuth 2.0 token. oauth_token: OAuth 2.0 token for the current user. - pp: Pretty-print response. prettyPrint: Returns response with indentations and line breaks. quotaUser: Available to use for quota purposes for server-side applications. Can be any arbitrary string assigned to a user, but should @@ -3779,17 +3942,15 @@ class FXgafvValueValuesEnum(_messages.Enum): f__xgafv = _messages.EnumField('FXgafvValueValuesEnum', 1) access_token = _messages.StringField(2) alt = _messages.EnumField('AltValueValuesEnum', 3, default=u'json') - bearer_token = _messages.StringField(4) - callback = _messages.StringField(5) - fields = _messages.StringField(6) - key = _messages.StringField(7) - oauth_token = _messages.StringField(8) - pp = _messages.BooleanField(9, default=True) - prettyPrint = _messages.BooleanField(10, default=True) - quotaUser = _messages.StringField(11) - trace = _messages.StringField(12) - uploadType = _messages.StringField(13) - upload_protocol = _messages.StringField(14) + callback = _messages.StringField(4) + fields = _messages.StringField(5) + key = _messages.StringField(6) + oauth_token = _messages.StringField(7) + prettyPrint = _messages.BooleanField(8, default=True) + quotaUser = _messages.StringField(9) + trace = _messages.StringField(10) + uploadType = _messages.StringField(11) + upload_protocol = _messages.StringField(12) class StateFamilyConfig(_messages.Message): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 7066e7f7627e0..11c22af3514a3 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -42,7 +42,7 @@ # Update this version to the next version whenever there is a change that will # require changes to legacy Dataflow worker execution environment. -BEAM_CONTAINER_VERSION = 'beam-master-20180709' +BEAM_CONTAINER_VERSION = 'beam-master-20180927' # Update this version to the next version whenever there is a change that # requires changes to SDK harness container or SDK harness launcher. 
BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20180920' diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py index 5a247cd272900..3e347517972a9 100644 --- a/sdks/python/apache_beam/runners/job/utils.py +++ b/sdks/python/apache_beam/runners/job/utils.py @@ -27,7 +27,7 @@ def dict_to_struct(dict_obj): - return json_format.Parse(json.dumps(dict_obj), struct_pb2.Struct()) + return json_format.ParseDict(dict_obj, struct_pb2.Struct()) def struct_to_dict(struct_obj): diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 5aba3f1607672..09261c9030be9 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -25,7 +25,6 @@ import apache_beam as beam from apache_beam.options.pipeline_options import DebugOptions -from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.portability import portable_runner from apache_beam.runners.portability import portable_runner_test @@ -66,7 +65,6 @@ def get_runner(cls): def create_options(self): options = super(FlinkRunnerTest, self).create_options() options.view_as(DebugOptions).experiments = ['beam_fn_api'] - options.view_as(SetupOptions).sdk_location = 'container' if streaming: options.view_as(StandardOptions).streaming = True return options @@ -80,6 +78,9 @@ def test_read(self): def test_no_subtransform_composite(self): raise unittest.SkipTest("BEAM-4781") + def test_pardo_state_only(self): + raise unittest.SkipTest("BEAM-2918 - User state not yet supported.") + # Inherits all other tests. # Run the tests. 
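The runners/job/utils.py change above swaps json_format.Parse over a json.dumps() round trip for json_format.ParseDict, which builds the Struct directly from the dict. A small self-contained sketch showing that, for plain JSON-compatible dicts, the two produce the same message:

import json
from google.protobuf import json_format, struct_pb2

payload = {'runner': 'PortableRunner', 'streaming': True}

via_json = json_format.Parse(json.dumps(payload), struct_pb2.Struct())
via_dict = json_format.ParseDict(payload, struct_pb2.Struct())

# Protobuf messages support equality, so the simplification is behavior-preserving here.
assert via_json == via_dict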
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 98f073387b552..a1a4c655a4d21 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -243,11 +243,6 @@ def cross_product(elem, sides): equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')])) def test_pardo_state_only(self): - p = self.create_pipeline() - if not isinstance(p.runner, fn_api_runner.FnApiRunner): - # test is inherited by Flink PVR, which does not support the feature yet - self.skipTest('User state not supported.') - index_state_spec = userstate.CombiningValueStateSpec( 'index', beam.coders.VarIntCoder(), sum) @@ -265,7 +260,7 @@ def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)): ('B', 'b', 2), ('B', 'b', 3)] - with p: + with self.create_pipeline() as p: assert_that(p | beam.Create(inputs) | beam.ParDo(AddIndex()), equal_to(expected)) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index ec53726632498..485038fd781b0 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -28,6 +28,7 @@ from apache_beam import metrics from apache_beam.internal import pickler from apache_beam.options.pipeline_options import PortableOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_job_api_pb2_grpc @@ -104,6 +105,12 @@ def _create_environment(options): def run_pipeline(self, pipeline): portable_options = pipeline.options.view_as(PortableOptions) job_endpoint = portable_options.job_endpoint + + # TODO: https://issues.apache.org/jira/browse/BEAM-5525 + # portable runner specific default + if pipeline.options.view_as(SetupOptions).sdk_location == 'default': + pipeline.options.view_as(SetupOptions).sdk_location = 'container' + if not job_endpoint: docker = DockerizedJobServer() job_endpoint = docker.start() @@ -136,7 +143,8 @@ def run_pipeline(self, pipeline): del transform_proto.subtransforms[:] # TODO: Define URNs for options. 
- options = {'beam:option:' + k + ':v1': v + # convert int values: https://issues.apache.org/jira/browse/BEAM-5509 + options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v) for k, v in pipeline._options.get_all_options().items() if v is not None} diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index dd9c8f1fd2ae5..b99f153697f7d 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -245,9 +245,7 @@ task portableWordCount(dependsOn: 'installGcpTest') { def options = [ "--input=/etc/profile", "--output=/tmp/py-wordcount-direct", - "--experiments=beam_fn_api", "--runner=PortableRunner", - "--sdk_location=container", ] if (project.hasProperty("streaming")) options += ["--streaming"] diff --git a/settings.gradle b/settings.gradle index 2c806ed0a47bf..3600e41e862d6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -58,6 +58,8 @@ include "beam-runners-local-java-core" project(":beam-runners-local-java-core").dir = file("runners/local-java") include "beam-runners-reference-java" project(":beam-runners-reference-java").dir = file("runners/reference/java") +include "beam-runners-reference-job-server" +project(":beam-runners-reference-job-server").dir = file("runners/reference/job-server") include "beam-runners-spark" project(":beam-runners-spark").dir = file("runners/spark") include "beam-runners-samza" diff --git a/website/build.gradle b/website/build.gradle index 7d22492e988ba..a73628d0184d8 100644 --- a/website/build.gradle +++ b/website/build.gradle @@ -29,6 +29,14 @@ def buildContentDir = "${project.rootDir}/build/website/generated-content" def repoContentDir = "${project.rootDir}/website/generated-content" def commitedChanges = false +def shell = { cmd -> + println cmd + exec { + executable 'sh' + args '-c', cmd + } +} + task buildDockerImage(type: Exec) { inputs.files 'Gemfile', 'Gemfile.lock' commandLine 'docker', 'build', '-t', dockerImageTag, '.' @@ -116,7 +124,10 @@ task commitWebsite << { def git = grgit.open() // get the latest commit on master def latestCommit = grgit.log(maxCommits: 1)[0].abbreviatedId + + shell "git fetch origin asf-site" git.checkout(branch: 'asf-site') + shell "git reset --hard origin/asf-site" // Delete the previous content. git.remove(patterns: [ 'website/generated-content' ]) @@ -142,7 +153,6 @@ task commitWebsite << { } } - /* * Pushes the asf-site branch commits. * @@ -166,23 +176,9 @@ task publishWebsite << { println 'No changes to push' return } - if (!git.remote.list().find { it.name == 'website-publish' }) { - println "Adding website-publish remote" - // Cannot authenticate to the default github uri, so specify gitbox. - git.remote.add(name: 'website-publish', - url: 'https://gitbox.apache.org/repos/asf/beam.git', - pushRefSpecs: ['refs/heads/asf-site']) - } + // Because git.push() fails to authenticate, run git push directly. - exec { - executable 'sh' - args '-c', "git push website-publish asf-site" - } - // Remove the remote. grgit does not have a remote.remove method. - exec { - executable 'sh' - args '-c', "git remote remove website-publish" - } + shell "git push https://gitbox.apache.org/repos/asf/beam.git asf-site" } commitWebsite.dependsOn testWebsite
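For reference, the BEAM-5509 workaround in portable_runner.py above stringifies integer option values before they are packed into the pipeline-options dict, since the options travel to the job service inside a protobuf Struct whose numeric values are doubles. A tiny illustration with example option values (the keys here are illustrative, not an exhaustive list):

raw = {'parallelism': 2, 'streaming': True, 'sdk_location': None}
options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
           for k, v in raw.items()
           if v is not None}
print(options)
# {'beam:option:parallelism:v1': '2', 'beam:option:streaming:v1': True}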