From 7a2da9850c1ea1bcf3ad6ec6998f8b364a8e1187 Mon Sep 17 00:00:00 2001 From: Udi Meiri Date: Mon, 18 Nov 2019 09:37:41 -0800 Subject: [PATCH 01/17] [BEAM-4132] Set multi-output PCollections types to Any This is a fix to #9810, to support undeclared tags as well. --- sdks/python/apache_beam/pvalue.py | 2 +- .../typehints/typed_pipeline_test.py | 33 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 7c9d869289a6d..e246f3286db64 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -260,7 +260,7 @@ def __getitem__(self, tag): if tag is not None: self._transform.output_tags.add(tag) - pcoll = PCollection(self._pipeline, tag=tag) + pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any) # Transfer the producer from the DoOutputsTuple to the resulting # PCollection. pcoll.producer = self.producer.parts[0] diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index c27bedef612e7..97260645f1d74 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -161,12 +161,45 @@ def process(self, element): res = (p | beam.Create([1, 2, 3]) | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')) + self.assertIsNotNone(res[None].element_type) self.assertIsNotNone(res['even'].element_type) self.assertIsNotNone(res['odd'].element_type) + res_main = (res[None] + | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int)) res_even = (res['even'] | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int)) res_odd = (res['odd'] | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int)) + assert_that(res_main, equal_to([]), label='none_check') + assert_that(res_even, equal_to([2]), label='even_check') + assert_that(res_odd, equal_to([1, 3]), label='odd_check') + p.run() + + with self.assertRaises(ValueError): + _ = res['undeclared tag'] + + def test_typed_dofn_multi_output_no_tags(self): + class MyDoFn(beam.DoFn): + def process(self, element): + if element % 2: + yield beam.pvalue.TaggedOutput('odd', element) + else: + yield beam.pvalue.TaggedOutput('even', element) + + p = TestPipeline() + res = (p + | beam.Create([1, 2, 3]) + | beam.ParDo(MyDoFn()).with_outputs()) + self.assertIsNotNone(res[None].element_type) + self.assertIsNotNone(res['even'].element_type) + self.assertIsNotNone(res['odd'].element_type) + res_main = (res[None] + | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int)) + res_even = (res['even'] + | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int)) + res_odd = (res['odd'] + | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int)) + assert_that(res_main, equal_to([]), label='none_check') assert_that(res_even, equal_to([2]), label='even_check') assert_that(res_odd, equal_to([1, 3]), label='odd_check') p.run() From dedcf30669e14777faa753240387d11b821188fa Mon Sep 17 00:00:00 2001 From: Udi Meiri Date: Thu, 21 Nov 2019 17:27:23 -0800 Subject: [PATCH 02/17] [BEAM-7594] Fix flaky filename generation --- sdks/python/apache_beam/io/textio_test.py | 24 +++++++++++------------ 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index fc6da454e9ac0..ecfa6fbe3d510 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -20,7 
+20,6 @@ from __future__ import division import bz2 -import datetime import glob import gzip import logging @@ -101,17 +100,19 @@ def write_data( return f.name, [line.decode('utf-8') for line in all_data] -def write_pattern(lines_per_file, no_data=False): +def write_pattern(lines_per_file, no_data=False, return_filenames=False): """Writes a pattern of temporary files. Args: lines_per_file (List[int]): The number of lines to write per file. no_data (bool): If :data:`True`, empty lines will be written, otherwise each line will contain a concatenation of b'line' and the line number. + return_filenames (bool): If True, returned list will contain + (filename, data) pairs. Returns: - Tuple[str, List[str]]: A tuple of the filename pattern and a list of the - utf-8 decoded written data. + Tuple[str, List[Union[str, (str, str)]]]: A tuple of the filename pattern + and a list of the utf-8 decoded written data or (filename, data) pairs. """ temp_dir = tempfile.mkdtemp() @@ -121,7 +122,10 @@ def write_pattern(lines_per_file, no_data=False): for i in range(len(lines_per_file)): file_name, data = write_data(lines_per_file[i], no_data=no_data, directory=temp_dir, prefix='mytemp') - all_data.extend(data) + if return_filenames: + all_data.extend(zip([file_name] * len(data), data)) + else: + all_data.extend(data) start_index += lines_per_file[i] assert file_name @@ -502,14 +506,8 @@ def test_read_from_text_file_pattern(self): pipeline.run() def test_read_from_text_with_file_name_file_pattern(self): - prefix = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - file_name_1, data_1 = write_data(5, prefix=prefix) - file_name_2, data_2 = write_data(5, prefix=prefix) - expected_data = [] - expected_data.extend([(file_name_1, el) for el in data_1]) - expected_data.extend([(file_name_2, el) for el in data_2]) - folder = file_name_1[:file_name_1.rfind(os.path.sep)] - pattern = folder + os.path.sep + prefix + '*' + pattern, expected_data = write_pattern( + lines_per_file=[5, 5], return_filenames=True) assert len(expected_data) == 10 pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromTextWithFilename(pattern) From 3f40d96bb24c11b2586b9d2eba8fd0d90dfd7dc2 Mon Sep 17 00:00:00 2001 From: Mikhail Gryzykhin Date: Fri, 22 Nov 2019 08:26:23 -0800 Subject: [PATCH 03/17] Bump Release Build Timeout --- .test-infra/jenkins/job_Release_Gradle_Build.groovy | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.test-infra/jenkins/job_Release_Gradle_Build.groovy b/.test-infra/jenkins/job_Release_Gradle_Build.groovy index dc825857ffd48..5e02357c8d642 100644 --- a/.test-infra/jenkins/job_Release_Gradle_Build.groovy +++ b/.test-infra/jenkins/job_Release_Gradle_Build.groovy @@ -25,7 +25,8 @@ job('beam_Release_Gradle_Build') { description('Verify Gradle build against the official release version.') // Set common parameters. - commonJobProperties.setTopLevelMainJobProperties(delegate) + commonJobProperties.setTopLevelMainJobProperties(delegate, + defaultTimeout=150) // Allows triggering this build against pull requests. 
commonJobProperties.enablePhraseTriggeringFromPullRequest( From ee5a23741f66978c0e4416e5cd6662fe229ff48e Mon Sep 17 00:00:00 2001 From: Mikhail Gryzykhin Date: Fri, 22 Nov 2019 08:46:19 -0800 Subject: [PATCH 04/17] fix syntax --- .test-infra/jenkins/job_Release_Gradle_Build.groovy | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.test-infra/jenkins/job_Release_Gradle_Build.groovy b/.test-infra/jenkins/job_Release_Gradle_Build.groovy index 5e02357c8d642..955d37878e707 100644 --- a/.test-infra/jenkins/job_Release_Gradle_Build.groovy +++ b/.test-infra/jenkins/job_Release_Gradle_Build.groovy @@ -25,7 +25,10 @@ job('beam_Release_Gradle_Build') { description('Verify Gradle build against the official release version.') // Set common parameters. - commonJobProperties.setTopLevelMainJobProperties(delegate, + commonJobProperties + .setTopLevelMainJobProperties( + delegate, + defaultBranch='master', defaultTimeout=150) // Allows triggering this build against pull requests. From 6c2f18a345ba6b22f349af4b8e942c8799b451f1 Mon Sep 17 00:00:00 2001 From: Mikhail Gryzykhin <12602502+Ardagan@users.noreply.github.com> Date: Fri, 22 Nov 2019 12:50:22 -0800 Subject: [PATCH 05/17] Bump time to 5 hours. --- .test-infra/jenkins/job_Release_Gradle_Build.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.test-infra/jenkins/job_Release_Gradle_Build.groovy b/.test-infra/jenkins/job_Release_Gradle_Build.groovy index 955d37878e707..cf947488e37bc 100644 --- a/.test-infra/jenkins/job_Release_Gradle_Build.groovy +++ b/.test-infra/jenkins/job_Release_Gradle_Build.groovy @@ -29,7 +29,7 @@ job('beam_Release_Gradle_Build') { .setTopLevelMainJobProperties( delegate, defaultBranch='master', - defaultTimeout=150) + defaultTimeout=300) // Allows triggering this build against pull requests. commonJobProperties.enablePhraseTriggeringFromPullRequest( From 26596c81c2f82ecf1d602e242edfeaef25f7fa56 Mon Sep 17 00:00:00 2001 From: sunjincheng121 Date: Thu, 28 Nov 2019 21:44:04 +0800 Subject: [PATCH 06/17] [BEAM-8733] Handle the registration request synchronously in the Python SDK harness. --- .../apache_beam/runners/worker/sdk_worker.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 3ab62e126f7b8..e0534ffb22b40 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -145,16 +145,12 @@ def _execute(self, task, request): self._responses.put(response) def _request_register(self, request): - self._request_execute(request) + # registration request is handled synchronously + self._execute( + lambda: self.create_worker().do_instruction(request), request) def _request_process_bundle(self, request): - - def task(): - self._execute( - lambda: self.create_worker().do_instruction(request), request) - self._worker_thread_pool.submit(task) - _LOGGER.debug( - "Currently using %s threads." % len(self._worker_thread_pool._workers)) + self._request_execute(request) def _request_process_bundle_split(self, request): self._request_process_bundle_action(request) @@ -190,6 +186,8 @@ def task(): lambda: self.create_worker().do_instruction(request), request) self._worker_thread_pool.submit(task) + _LOGGER.debug( + "Currently using %s threads." 
% len(self._worker_thread_pool._workers)) def create_worker(self): return SdkWorker( From e706f6a7625f5bb0b3bee0a584da0b379a2d03ce Mon Sep 17 00:00:00 2001 From: Udi Meiri Date: Mon, 2 Dec 2019 11:51:32 -0800 Subject: [PATCH 07/17] [BEAM-8842] Disable the correct test --- sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py | 1 - sdks/python/apache_beam/io/gcp/bigquery_test.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 5238abcf401ad..bbf8d3a9c7a33 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -615,7 +615,6 @@ def setUp(self): self.dataset_id, self.project) @attr('IT') - @unittest.skip('BEAM-8842: Disabled due to reliance on old retry behavior.') def test_multiple_destinations_transform(self): output_table_1 = '%s%s' % (self.output_table, 1) output_table_2 = '%s%s' % (self.output_table, 2) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 6cf4529f588fc..505a68361d0e7 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -647,6 +647,7 @@ def test_value_provider_transform(self): method='FILE_LOADS')) @attr('IT') + @unittest.skip('BEAM-8842: Disabled due to reliance on old retry behavior.') def test_multiple_destinations_transform(self): streaming = self.test_pipeline.options.view_as(StandardOptions).streaming if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner): From f148aba6274a18d163cc85908716ee4dfa84cded Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Mon, 2 Dec 2019 15:17:56 -0500 Subject: [PATCH 08/17] [BEAM-8863] experiment=beam_fn_api in runtime/environments page Make it more obvious that experiment=beam_fn_api is needed for batch pipelines. --- website/src/documentation/runtime/environments.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/website/src/documentation/runtime/environments.md b/website/src/documentation/runtime/environments.md index 98e8af932d270..4b29ac00e25f1 100644 --- a/website/src/documentation/runtime/environments.md +++ b/website/src/documentation/runtime/environments.md @@ -99,7 +99,9 @@ python -m apache_beam.examples.wordcount \ --environment_config=path/to/container/image ``` -To test a customized image on the Google Cloud Dataflow runner, use the `DataflowRunner` option and the `worker_harness_container_image` flag: +To test a customized image on the Google Cloud Dataflow runner, use +`DataflowRunner` with the `beam_fn_api` experiment and set +`worker_harness_container_image` to the custom container: ``` python -m apache_beam.examples.wordcount \ From eb49b69abcedec98dfc94fdfddc2c14a94e9b794 Mon Sep 17 00:00:00 2001 From: Ruoyun Huang Date: Mon, 2 Dec 2019 14:17:44 -0800 Subject: [PATCH 09/17] [BEAM-8645] A test case for TimestampCombiner. 
(#10081) --- .../apache_beam/transforms/combiners_test.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 03d36e61b86fa..7a20fb3b56d09 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -26,12 +26,16 @@ import hamcrest as hc from future.builtins import range +from nose.plugins.attrib import attr import apache_beam as beam import apache_beam.transforms.combiners as combine +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.test_stream import TestStream from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.testing.util import equal_to_per_window from apache_beam.transforms import window from apache_beam.transforms.core import CombineGlobally from apache_beam.transforms.core import Create @@ -39,7 +43,9 @@ from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.ptransform import PTransform +from apache_beam.transforms.window import TimestampCombiner from apache_beam.typehints import TypeCheckError +from apache_beam.utils.timestamp import Timestamp class CombineTest(unittest.TestCase): @@ -480,6 +486,73 @@ def test_with_input_types_decorator_violation(self): pc = p | Create(l_3_tuple) _ = pc | beam.CombineGlobally(self.fn) +# +# Test cases for streaming. +# +@attr('ValidatesRunner') +class TimestampCombinerTest(unittest.TestCase): + + def test_combiner_earliest(self): + """Test TimestampCombiner with EARLIEST.""" + options = PipelineOptions(streaming=True) + with TestPipeline(options=options) as p: + result = (p + | TestStream() + .add_elements([window.TimestampedValue(('k', 100), 2)]) + .add_elements([window.TimestampedValue(('k', 400), 7)]) + .advance_watermark_to_infinity() + | beam.WindowInto( + window.FixedWindows(10), + timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST) + | beam.CombinePerKey(sum)) + + records = (result + | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts))) + + # All the KV pairs are applied GBK using EARLIEST timestamp for the same + # key. + expected_window_to_elements = { + window.IntervalWindow(0, 10): [ + (('k', 500), Timestamp(2)), + ], + } + + assert_that( + records, + equal_to_per_window(expected_window_to_elements), + use_global_window=False, + label='assert per window') + + def test_combiner_latest(self): + """Test TimestampCombiner with LATEST.""" + options = PipelineOptions(streaming=True) + with TestPipeline(options=options) as p: + result = (p + | TestStream() + .add_elements([window.TimestampedValue(('k', 100), 2)]) + .add_elements([window.TimestampedValue(('k', 400), 7)]) + .advance_watermark_to_infinity() + | beam.WindowInto( + window.FixedWindows(10), + timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST) + | beam.CombinePerKey(sum)) + + records = (result + | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts))) + + # All the KV pairs are applied GBK using LATEST timestamp for + # the same key. 
+ expected_window_to_elements = { + window.IntervalWindow(0, 10): [ + (('k', 500), Timestamp(7)), + ], + } + + assert_that( + records, + equal_to_per_window(expected_window_to_elements), + use_global_window=False, + label='assert per window') if __name__ == '__main__': unittest.main() From 8e71e59eae495ab74c5c20f50456a5c9c5416905 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 27 Nov 2019 10:39:12 -0800 Subject: [PATCH 10/17] [BEAM-2929] Ensure that the Beam Java SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization. --- .../dataflow/DataflowPipelineTranslator.java | 8 ++- .../DataflowPipelineTranslatorTest.java | 62 ++++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 41e5cbbff50e2..45e0c33d12e35 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -83,6 +83,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -710,8 +711,11 @@ private void addOutput(String name, PValue value, Coder valueCoder) { String generatedName = String.format("%s.out%d", stepName, outputInfoList.size()); addString(outputInfo, PropertyNames.USER_NAME, generatedName); - if (value instanceof PCollection - && translator.runner.doesPCollectionRequireIndexedFormat((PCollection) value)) { + if ((value instanceof PCollection + && translator.runner.doesPCollectionRequireIndexedFormat((PCollection) value)) + || ((value instanceof PCollectionView) + && (Materializations.MULTIMAP_MATERIALIZATION_URN.equals( + ((PCollectionView) value).getViewFn().getMaterialization().getUrn())))) { addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true); } if (valueCoder != null) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 85b1e22cbe775..c5c3ddca2d7e9 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -736,8 +736,8 @@ public void testStreamingSplittableParDoTranslation() throws Exception { @Test public void testSplittableParDoTranslationFnApi() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); - DataflowRunner runner = DataflowRunner.fromOptions(options); options.setExperiments(Arrays.asList("beam_fn_api")); + DataflowRunner runner = DataflowRunner.fromOptions(options); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); Pipeline pipeline = Pipeline.create(options); @@ -851,6 +851,66 @@ public void testToIterableTranslationWithIsmSideInput() 
throws Exception { assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } + @Test + public void testToSingletonTranslationWithFnApiSideInput() throws Exception { + // A "change detector" test that makes sure the translation + // of getting a PCollectionView does not change + // in bad ways during refactor + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setExperiments(Arrays.asList("beam_fn_api")); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + pipeline.apply(Create.of(1)).apply(View.asSingleton()); + DataflowRunner runner = DataflowRunner.fromOptions(options); + runner.replaceTransforms(pipeline); + Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob(); + assertAllStepOutputsHaveUniqueIds(job); + + List steps = job.getSteps(); + assertEquals(14, steps.size()); + + Step collectionToSingletonStep = steps.get(steps.size() - 1); + assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); + + @SuppressWarnings("unchecked") + List> ctsOutputs = + (List>) + steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO); + assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format")); + } + + @Test + public void testToIterableTranslationWithFnApiSideInput() throws Exception { + // A "change detector" test that makes sure the translation + // of getting a PCollectionView> does not change + // in bad ways during refactor + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setExperiments(Arrays.asList("beam_fn_api")); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + pipeline.apply(Create.of(1, 2, 3)).apply(View.asIterable()); + + DataflowRunner runner = DataflowRunner.fromOptions(options); + runner.replaceTransforms(pipeline); + Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob(); + assertAllStepOutputsHaveUniqueIds(job); + + List steps = job.getSteps(); + assertEquals(10, steps.size()); + + @SuppressWarnings("unchecked") + List> ctsOutputs = + (List>) + steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO); + assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format")); + Step collectionToSingletonStep = steps.get(steps.size() - 1); + assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); + } + @Test public void testStepDisplayData() throws Exception { DataflowPipelineOptions options = buildPipelineOptions(); From 31acee91c6fbbd8d99491d7df3fd51e68c887945 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 27 Nov 2019 11:11:31 -0800 Subject: [PATCH 11/17] [BEAM-2929] Ensure that the Beam Go SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization. 
--- .../runners/dataflow/dataflowlib/messages.go | 7 ++++--- .../runners/dataflow/dataflowlib/translate.go | 20 +++++++++++++------ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go index dc0c36b9c54d2..6cf39fdab50c6 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go @@ -97,9 +97,10 @@ type propertiesWithPubSubMessage struct { } type output struct { - UserName string `json:"user_name,omitempty"` - OutputName string `json:"output_name,omitempty"` - Encoding *graphx.CoderRef `json:"encoding,omitempty"` + UserName string `json:"user_name,omitempty"` + OutputName string `json:"output_name,omitempty"` + Encoding *graphx.CoderRef `json:"encoding,omitempty"` + UseIndexedFormat bool `json:"use_indexed_format,omitempty"` } type integer struct { diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go index 047d81853fca1..3a6701558db4f 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go @@ -137,7 +137,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er rem := reflectx.ShallowClone(t.Inputs).(map[string]string) prop.NonParallelInputs = make(map[string]*outputReference) - for key := range payload.SideInputs { + for key, side_input := range payload.SideInputs { // Side input require an additional conversion step, which must // be before the present one. delete(rem, key) @@ -146,16 +146,24 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er ref := x.pcollections[t.Inputs[key]] c := x.translateCoder(pcol, pcol.CoderId) + var output_info output + output_info = output{ + UserName: "i0", + OutputName: "i0", + Encoding: graphx.WrapIterable(c), + } + if graphx.URNMultimapSideInput == side_input.GetAccessPattern().GetUrn() { + output_info.UseIndexedFormat = true + } + side := &df.Step{ Name: fmt.Sprintf("view%v_%v", id, key), Kind: sideInputKind, Properties: newMsg(properties{ ParallelInput: ref, - OutputInfo: []output{{ - UserName: "i0", - OutputName: "i0", - Encoding: graphx.WrapIterable(c), - }}, + OutputInfo: []output{ + output_info, + }, UserName: userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)), }), } From 9fc641ac8f7c1385c82e71e35d55c1d4d77a1147 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 27 Nov 2019 12:56:59 -0800 Subject: [PATCH 12/17] [BEAM-2929] Ensure that the Beam Python SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization. 
--- .../runners/dataflow/dataflow_runner.py | 22 ++++++++++++------- .../runners/dataflow/internal/names.py | 1 + 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index d07abc4b3d878..718ab61bdbe9c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -607,7 +607,8 @@ def _add_step(self, step_kind, step_label, transform_node, side_tags=()): return step def _add_singleton_step( - self, label, full_label, tag, input_step, windowing_strategy): + self, label, full_label, tag, input_step, windowing_strategy, + access_pattern): """Creates a CollectionToSingleton step used to handle ParDo side inputs.""" # Import here to avoid adding the dependency for local running scenarios. from apache_beam.runners.dataflow.internal import apiclient @@ -620,12 +621,16 @@ def _add_singleton_step( PropertyNames.STEP_NAME: input_step.proto.name, PropertyNames.OUTPUT_NAME: input_step.get_output(tag)}) step.encoding = self._get_side_input_encoding(input_step.encoding) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{PropertyNames.USER_NAME: ( - '%s.%s' % (full_label, PropertyNames.OUTPUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) + + output_info = { + PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT), + PropertyNames.ENCODING: step.encoding, + PropertyNames.OUTPUT_NAME: PropertyNames.OUT + } + if common_urns.side_inputs.MULTIMAP.urn == access_pattern: + output_info[PropertyNames.USE_INDEXED_FORMAT] = True + step.add_property(PropertyNames.OUTPUT_INFO, [output_info]) + step.add_property( PropertyNames.WINDOWING_STRATEGY, self.serialize_windowing_strategy(windowing_strategy)) @@ -820,7 +825,8 @@ def run_ParDo(self, transform_node, options): self._add_singleton_step( step_name, si_full_label, side_pval.pvalue.tag, self._cache.get_pvalue(side_pval.pvalue), - side_pval.pvalue.windowing) + side_pval.pvalue.windowing, + side_pval._side_input_data().access_pattern) si_dict[si_label] = { '@type': 'OutputReference', PropertyNames.STEP_NAME: step_name, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index 5b2dd89c9f15e..fdce49b4a6d99 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -117,6 +117,7 @@ class PropertyNames(object): SOURCE_STEP_INPUT = 'custom_source_step_input' SERIALIZED_TEST_STREAM = 'serialized_test_stream' STEP_NAME = 'step_name' + USE_INDEXED_FORMAT = 'use_indexed_format' USER_FN = 'user_fn' USER_NAME = 'user_name' VALIDATE_SINK = 'validate_sink' From dc74f7d041bd06df52e0d552645cc94b501cc8a5 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 2 Dec 2019 15:56:29 -0800 Subject: [PATCH 13/17] [BEAM-2929] Fix go code format for 31acee91c6fbbd8d99491d7df3fd51e68c887945 --- .../beam/runners/dataflow/dataflowlib/translate.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go index 3a6701558db4f..2d3e1d6e0cb1c 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go @@ -137,7 +137,7 @@ func (x *translator) 
translateTransform(trunk string, id string) ([]*df.Step, er rem := reflectx.ShallowClone(t.Inputs).(map[string]string) prop.NonParallelInputs = make(map[string]*outputReference) - for key, side_input := range payload.SideInputs { + for key, sideInput := range payload.SideInputs { // Side input require an additional conversion step, which must // be before the present one. delete(rem, key) @@ -146,14 +146,14 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er ref := x.pcollections[t.Inputs[key]] c := x.translateCoder(pcol, pcol.CoderId) - var output_info output - output_info = output{ + var outputInfo output + outputInfo = output{ UserName: "i0", OutputName: "i0", Encoding: graphx.WrapIterable(c), } - if graphx.URNMultimapSideInput == side_input.GetAccessPattern().GetUrn() { - output_info.UseIndexedFormat = true + if graphx.URNMultimapSideInput == sideInput.GetAccessPattern().GetUrn() { + outputInfo.UseIndexedFormat = true } side := &df.Step{ @@ -162,7 +162,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er Properties: newMsg(properties{ ParallelInput: ref, OutputInfo: []output{ - output_info, + outputInfo, }, UserName: userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)), }), From d1451c02289dcddd09ce2d38d146393bb3709361 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Mon, 2 Dec 2019 18:00:47 -0800 Subject: [PATCH 14/17] Increase overhaed budget for test_sampler_transition_overhead --- sdks/python/apache_beam/runners/worker/statesampler_test.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/statesampler_test.py b/sdks/python/apache_beam/runners/worker/statesampler_test.py index 855d48d811521..ed51ae14bdb1f 100644 --- a/sdks/python/apache_beam/runners/worker/statesampler_test.py +++ b/sdks/python/apache_beam/runners/worker/statesampler_test.py @@ -123,15 +123,11 @@ def test_sampler_transition_overhead(self): state_transition_count = sampler.get_info().transition_count overhead_us = 1000000.0 * elapsed_time / state_transition_count - # TODO: This test is flaky when it is run under load. A better solution - # would be to change the test structure to not depend on specific timings. - overhead_us = 2 * overhead_us - _LOGGER.info('Overhead per transition: %fus', overhead_us) # Conservative upper bound on overhead in microseconds (we expect this to # take 0.17us when compiled in opt mode or 0.48 us when compiled with in # debug mode). - self.assertLess(overhead_us, 10.0) + self.assertLess(overhead_us, 20.0) if __name__ == '__main__': From 577a629b034b6e55d0cf2133c0e9a22f4d953f7d Mon Sep 17 00:00:00 2001 From: David Song Date: Mon, 2 Dec 2019 19:01:43 -0800 Subject: [PATCH 15/17] [BEAM-8814] Changed no_auth option from bool to store_true (#10202) [BEAM-8814] Changed no_auth option from bool to store_true (#10202) Changed no_auth option in pipeline_options to be store_true, so that behavior is as expected when passing in --no_auth=false. Previously this would still evaluate to true. 
* Updated no_auth help message --- sdks/python/apache_beam/options/pipeline_options.py | 6 +++++- .../apache_beam/runners/dataflow/dataflow_runner_test.py | 4 ++-- .../apache_beam/runners/dataflow/template_runner_test.py | 4 ++-- sdks/python/apache_beam/runners/runner_test.py | 2 +- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 8ab71374d76bd..abe9b84f06241 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -465,7 +465,11 @@ def _add_argparse_args(cls, parser): parser.add_argument('--service_account_email', default=None, help='Identity to run virtual machines as.') - parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False) + parser.add_argument('--no_auth', + dest='no_auth', + action='store_true', + default=False, + help='Skips authorizing credentials with Google Cloud.') # Option to run templated pipelines parser.add_argument('--template_location', default=None, diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 67cb04519e6b2..c47ab8833b3a0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -96,7 +96,7 @@ def setUp(self): '--project=test-project', '--staging_location=ignored', '--temp_location=/dev/null', - '--no_auth=True', + '--no_auth', '--dry_run=True'] @mock.patch('time.sleep', return_value=None) @@ -295,7 +295,7 @@ def test_no_group_by_key_directly_after_bigquery(self): '--project=test-project', '--staging_location=ignored', '--temp_location=/dev/null', - '--no_auth=True' + '--no_auth' ])) rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable')) with self.assertRaises(ValueError, diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py index 82eb76b453c4f..a3988bcae6d8e 100644 --- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py @@ -61,7 +61,7 @@ def test_full_completion(self): '--staging_location=' + dummy_dir, '--temp_location=/dev/null', '--template_location=' + dummy_file_name, - '--no_auth=True'])) + '--no_auth'])) pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned pipeline.run().wait_until_finish() @@ -86,7 +86,7 @@ def test_bad_path(self): '--staging_location=ignored', '--temp_location=/dev/null', '--template_location=/bad/path', - '--no_auth=True'])) + '--no_auth'])) remote_runner.job = apiclient.Job(pipeline._options, pipeline.to_runner_api()) diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 6527e81a1c85a..914fa12a8ceec 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -39,7 +39,7 @@ class RunnerTest(unittest.TestCase): '--project=test-project', '--staging_location=ignored', '--temp_location=/dev/null', - '--no_auth=True'] + '--no_auth'] def test_create_runner(self): self.assertTrue( From 9ae566773ac1025241f353ae1bbb1a95af8cb65c Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Thu, 21 Nov 2019 15:24:48 +0100 Subject: [PATCH 16/17] [BEAM-8470] Update capability matrix: add Spark Structured 
Streaming runner --- website/src/_data/capability-matrix.yml | 206 +++++++++++++++++++----- 1 file changed, 164 insertions(+), 42 deletions(-) diff --git a/website/src/_data/capability-matrix.yml b/website/src/_data/capability-matrix.yml index fb3eaa0b694d1..307824bed7ba3 100644 --- a/website/src/_data/capability-matrix.yml +++ b/website/src/_data/capability-matrix.yml @@ -18,8 +18,10 @@ columns: name: Google Cloud Dataflow - class: flink name: Apache Flink - - class: spark - name: Apache Spark + - class: spark-rdd + name: Apache Spark (RDD/DStream based) + - class: spark-dataset + name: Apache Spark Structured Streaming (Dataset based) - class: apex name: Apache Apex - class: gearpump @@ -59,10 +61,14 @@ categories: l1: 'Yes' l2: fully supported l3: ParDo itself, as per-element transformation with UDFs, is fully supported by Flink for both batch and streaming. - - class: spark + - class: spark-rdd l1: 'Yes' l2: fully supported l3: ParDo applies per-element transformations as Spark FlatMapFunction. + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: ParDo applies per-element transformations as Spark FlatMapFunction. - class: apex l1: 'Yes' l2: fully supported @@ -109,10 +115,14 @@ categories: l1: 'Yes' l2: fully supported l3: "Uses Flink's keyBy for key grouping. When grouping by window in streaming (creating the panes) the Flink runner uses the Beam code. This guarantees support for all windowing and triggering mechanisms." - - class: spark + - class: spark-rdd l1: 'Partially' l2: fully supported in batch mode l3: "Using Spark's groupByKey. GroupByKey with multiple trigger firings in streaming mode is a work in progress." + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: "Using Spark's groupByKey." - class: apex l1: 'Yes' l2: fully supported @@ -159,10 +169,14 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: fully supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: Some corner cases like flatten on empty collections are not yet supported. - class: apex l1: 'Yes' l2: fully supported @@ -209,10 +223,14 @@ categories: l1: 'Yes' l2: 'fully supported' l3: Uses a combiner for pre-aggregation for batch and streaming. - - class: spark + - class: spark-rdd l1: 'Yes' l2: fully supported l3: "Using Spark's combineByKey and aggregate functions." + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: "Using Spark's Aggregator and agg function" - class: apex l1: 'Yes' l2: 'fully supported' @@ -259,10 +277,14 @@ categories: l1: 'Partially' l2: supported via inlining l3: '' - - class: spark + - class: spark-rdd l1: 'Partially' l2: supported via inlining l3: '' + - class: spark-dataset + l1: 'Partially' + l2: supported via inlining only in batch mode + l3: '' - class: apex l1: 'Partially' l2: supported via inlining @@ -309,10 +331,14 @@ categories: l1: 'Yes' l2: some size restrictions in streaming l3: Batch mode supports a distributed implementation, but streaming mode may force some size restrictions. Neither mode is able to push lookups directly up into key-based sources. - - class: spark + - class: spark-rdd l1: 'Yes' l2: fully supported l3: "Using Spark's broadcast variables. In streaming mode, side inputs may update but only between micro-batches." + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: "Using Spark's broadcast variables." 
- class: apex l1: 'Yes' l2: size restrictions @@ -359,10 +385,14 @@ categories: l1: 'Yes' l2: fully supported l3: - - class: spark + - class: spark-rdd l1: 'Yes' l2: fully supported - l3: + l3: + - class: spark-dataset + l1: 'Partially' + l2: bounded source only + l3: "Using Spark's DatasourceV2 API in microbatch mode (Continuous streaming mode is tagged experimental in spark and does not support aggregation)." - class: apex l1: 'Yes' l2: fully supported @@ -409,10 +439,14 @@ categories: l1: 'Yes' l2: l3: - - class: spark + - class: spark-rdd l1: 'Partially' l2: supports bounded-per-element SDFs l3: + - class: spark-dataset + l1: 'No' + l2: not implemented + l3: - class: apex l1: 'Partially' l2: supports bounded-per-element SDFs @@ -459,10 +493,14 @@ categories: l1: 'Partially' l2: All metrics types are supported. l3: Only attempted values are supported. No committed values for metrics. - - class: spark + - class: spark-rdd l1: 'Partially' l2: All metric types are supported. l3: Only attempted values are supported. No committed values for metrics. + - class: spark-dataset + l1: 'Partially' + l2: All metric types are supported in batch mode. + l3: Only attempted values are supported. No committed values for metrics. - class: apex l1: 'No' l2: Not implemented in runner. @@ -509,10 +547,14 @@ categories: l1: 'Partially' l2: non-merging windows l3: State is supported for non-merging windows. SetState and MapState are not yet supported. - - class: spark + - class: spark-rdd + l1: 'Partially' + l2: full support in batch mode + l3: + - class: spark-dataset l1: 'No' l2: not implemented - l3: Spark supports per-key state with mapWithState() so support should be straightforward. + l3: - class: apex l1: 'Partially' l2: non-merging windows @@ -566,10 +608,14 @@ categories: l1: 'Yes' l2: supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex l1: 'Yes' l2: supported @@ -616,10 +662,14 @@ categories: l1: 'Yes' l2: supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex l1: 'Yes' l2: supported @@ -666,10 +716,14 @@ categories: l1: 'Yes' l2: supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex l1: 'Yes' l2: supported @@ -716,10 +770,14 @@ categories: l1: 'Yes' l2: supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex l1: 'Yes' l2: supported @@ -766,10 +824,14 @@ categories: l1: 'Yes' l2: supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex l1: 'Yes' l2: supported @@ -816,10 +878,14 @@ categories: l1: 'Yes' l2: supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex l1: 'Yes' l2: supported @@ -866,10 +932,14 @@ categories: l1: 'Yes' l2: supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: supported l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex 
l1: 'Yes' l2: supported @@ -925,9 +995,13 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark - l1: 'No' - l2: '' + - class: spark-rdd + l1: 'Yes' + l2: fully supported + l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode l3: '' - class: apex l1: 'Yes' @@ -976,9 +1050,13 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark - l1: 'No' - l2: '' + - class: spark-rdd + l1: 'Yes' + l2: fully supported + l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode l3: '' - class: apex l1: 'Yes' @@ -1027,10 +1105,14 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: "This is Spark streaming's native model" l3: "Spark processes streams in micro-batches. The micro-batch size is actually a pre-set, fixed, time interval. Currently, the runner takes the first window size in the pipeline and sets it's size as the batch interval. Any following window operations will be considered processing time windows and will affect triggering." + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: - class: apex l1: 'Yes' l2: fully supported @@ -1078,9 +1160,13 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark - l1: 'No' - l2: '' + - class: spark-rdd + l1: 'Yes' + l2: fully supported + l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode l3: '' - class: apex l1: 'Yes' @@ -1130,10 +1216,14 @@ categories: l1: 'No' l2: pending model support l3: - - class: spark + - class: spark-rdd l1: 'No' l2: pending model support l3: + - class: spark-dataset + l1: 'No' + l2: pending model support + l3: - class: apex l1: 'No' l2: pending model support @@ -1181,9 +1271,13 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark - l1: 'No' - l2: '' + - class: spark-rdd + l1: 'Yes' + l2: fully supported + l3: '' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode l3: '' - class: apex l1: 'Yes' @@ -1232,10 +1326,14 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark + - class: spark-rdd l1: 'No' l2: '' l3: '' + - class: spark-dataset + l1: 'No' + l2: no streaming support in the runner + l3: '' - class: apex l1: 'Yes' l2: fully supported @@ -1283,7 +1381,11 @@ categories: l1: 'Partially' l2: non-merging windows l3: The Flink Runner supports timers in non-merging windows. - - class: spark + - class: spark-rdd + l1: 'Partially' + l2: fully supported in batch mode + l3: '' + - class: spark-dataset l1: 'No' l2: not implemented l3: '' @@ -1342,10 +1444,14 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark + - class: spark-rdd l1: 'Yes' l2: fully supported l3: 'Spark streaming natively discards elements after firing.' + - class: spark-dataset + l1: 'Partially' + l2: fully supported in batch mode + l3: '' - class: apex l1: 'Yes' l2: fully supported @@ -1393,7 +1499,11 @@ categories: l1: 'Yes' l2: fully supported l3: '' - - class: spark + - class: spark-rdd + l1: 'No' + l2: '' + l3: '' + - class: spark-dataset l1: 'No' l2: '' l3: '' @@ -1445,7 +1555,11 @@ categories: l1: 'No' l2: pending model support l3: '' - - class: spark + - class: spark-rdd + l1: 'No' + l2: pending model support + l3: '' + - class: spark-dataset l1: 'No' l2: pending model support l3: '' @@ -1502,10 +1616,14 @@ categories: l1: 'Partially' l2: l3: Flink supports taking a "savepoint" of the pipeline and shutting the pipeline down after its completion. 
- - class: spark + - class: spark-rdd l1: l2: l3: + - class: spark-dataset + l1: + l2: + l3: - class: apex l1: l2: @@ -1548,10 +1666,14 @@ categories: l1: 'Partially' l2: l3: Flink has a native savepoint capability. - - class: spark + - class: spark-rdd l1: 'Partially' l2: l3: Spark has a native savepoint capability. + - class: spark-dataset + l1: 'No' + l2: + l3: not implemented - class: apex l1: l2: From 9b4d36730e4eb88e71b8aea721bb3904dc6c040e Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Mon, 25 Nov 2019 11:17:11 +0100 Subject: [PATCH 17/17] [BEAM-8470] Update Spark runner page: add Spark Structured Streaming runner --- website/src/documentation/runners/spark.md | 155 ++++++++++++++++----- 1 file changed, 120 insertions(+), 35 deletions(-) diff --git a/website/src/documentation/runners/spark.md b/website/src/documentation/runners/spark.md index fa48df67099c5..cd287107c7ad4 100644 --- a/website/src/documentation/runners/spark.md +++ b/website/src/documentation/runners/spark.md @@ -33,42 +33,47 @@ The Spark Runner executes Beam pipelines on top of Apache Spark, providing: The [Beam Capability Matrix]({{ site.baseurl }}/documentation/runners/capability-matrix/) documents the currently supported capabilities of the Spark Runner. -_**Note:**_ _support for the Beam Model in streaming is currently experimental, follow-up in the [mailing list]({{ site.baseurl }}/get-started/support/) for status updates._ +## Three flavors of the Spark runner +The Spark runner comes in three flavors: -## Portability +1. A *legacy Runner* which supports only Java (and other JVM-based languages) and that is based on Spark RDD/DStream +2. An *Structured Streaming Spark Runner* which supports only Java (and other JVM-based languages) and that is based on Spark Datasets and the [Apache Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) framework. +> **Note:** It is still experimental, its coverage of the Beam model is partial. As for now it only supports batch mode. +2. A *portable Runner* which supports Java, Python, and Go -The Spark runner comes in two flavors: +This guide is split into two parts to document the non-portable and +the portable functionality of the Spark Runner. Please use the switcher below to +select the appropriate Runner: -1. A *legacy Runner* which supports only Java (and other JVM-based languages) -2. A *portable Runner* which supports Java, Python, and Go +## Which runner to use: portable or non portable runner? Beam and its Runners originally only supported JVM-based languages (e.g. Java/Scala/Kotlin). Python and Go SDKs were added later on. The architecture of the Runners had to be changed significantly to support executing pipelines written in other languages. -If your applications only use Java, then you should currently go with the legacy -Runner. If you want to run Python or Go pipelines with Beam on Spark, you need to use +If your applications only use Java, then you should currently go with one of the java based runners. +If you want to run Python or Go pipelines with Beam on Spark, you need to use the portable Runner. For more information on portability, please visit the [Portability page]({{site.baseurl }}/roadmap/portability/). -This guide is split into two parts to document the legacy and -the portable functionality of the Spark Runner. 
Please use the switcher below to -select the appropriate Runner: + ## Spark Runner prerequisites and setup -The Spark runner currently supports Spark's 2.x branch, and more specifically any version greater than 2.2.0. +The Spark runner currently supports Spark's 2.x branch, and more specifically any version greater than 2.4.0. -You can add a dependency on the latest version of the Spark runner by adding to your pom.xml the following: + +You can add a dependency on the latest version of the Spark runner by adding to your pom.xml the following: + ```java @@ -80,23 +85,27 @@ The Spark runner currently supports Spark's 2.x branch, and more specifically an ### Deploying Spark with your application -In some cases, such as running in local mode/Standalone, your (self-contained) application would be required to pack Spark by explicitly adding the following dependencies in your pom.xml: + +In some cases, such as running in local mode/Standalone, your (self-contained) application would be required to pack Spark by explicitly adding the following dependencies in your pom.xml: + ```java org.apache.spark - spark-core_2.10 + spark-core_2.11 ${spark.version} org.apache.spark - spark-streaming_2.10 + spark-streaming_2.11 ${spark.version} ``` -And shading the application jar using the maven shade plugin: + +And shading the application jar using the maven shade plugin: + ```java @@ -134,17 +143,33 @@ The Spark runner currently supports Spark's 2.x branch, and more specifically an ``` -After running mvn package, run ls target and you should see (assuming your artifactId is `beam-examples` and the version is `1.0.0`): + +After running mvn package, run ls target and you should see (assuming your artifactId is `beam-examples` and the version is `1.0.0`): + - +```java beam-examples-1.0.0-shaded.jar - +``` + + +To run against a Standalone cluster simply run: + -To run against a Standalone cluster simply run: + +
For RDD/DStream based runner:

```java
spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner
```

For Structured Streaming based runner:
+ +```java +spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner +``` You will need Docker to be installed in your execution environment. To develop @@ -164,8 +189,7 @@ download it on the [Downloads page]({{ site.baseurl available. -1. Start the JobService endpoint: `./gradlew :runners:spark:job-server:runShadow` - +1. Start the JobService endpoint: `./gradlew :runners:spark:job-server:runShadow` The JobService is the central instance where you submit your Beam pipeline. @@ -174,8 +198,7 @@ job. To execute the job on a Spark cluster, the Beam JobService needs to be provided with the Spark master address. -2. Submit the Python pipeline to the above endpoint by using the `PortableRunner`, `job_endpoint` set to `localhost:8099` (this is the default address of the JobService), and `environment_type` set to `LOOPBACK`. For example: - +2. Submit the Python pipeline to the above endpoint by using the `PortableRunner`, `job_endpoint` set to `localhost:8099` (this is the default address of the JobService), and `environment_type` set to `LOOPBACK`. For example: ```py import apache_beam as beam @@ -195,16 +218,13 @@ with beam.Pipeline(options) as p: Deploying your Beam pipeline on a cluster that already has a Spark deployment (Spark classes are available in container classpath) does not require any additional dependencies. For more details on the different deployment modes see: [Standalone](http://spark.apache.org/docs/latest/spark-standalone.html), [YARN](http://spark.apache.org/docs/latest/running-on-yarn.html), or [Mesos](http://spark.apache.org/docs/latest/running-on-mesos.html). -1. Start a Spark cluster which exposes the master on port 7077 by default. - +1. Start a Spark cluster which exposes the master on port 7077 by default. -2. Start JobService that will connect with the Spark master: `./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077`. - +2. Start JobService that will connect with the Spark master: `./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077`. 3. Submit the pipeline as above. Note however that `environment_type=LOOPBACK` is only intended for local testing. -See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details. - +See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details. (Note that, depending on your cluster setup, you may need to change the `environment_type` option. @@ -215,6 +235,10 @@ See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for detai When executing your pipeline with the Spark Runner, you should consider the following pipeline options. + +
For RDD/DStream based runner, the existing options table (Field / Description / Default Value) applies.

For Structured Streaming based runner:

| Field | Description | Default Value |
| --- | --- | --- |
| `runner` | The pipeline runner to use. This option allows you to determine the pipeline runner at runtime. | Set to `SparkStructuredStreamingRunner` to run using Spark Structured Streaming. |
| `sparkMaster` | The url of the Spark Master. This is the equivalent of setting `SparkConf#setMaster(String)` and can either be `local[x]` to run local with x cores, `spark://host:port` to connect to a Spark Standalone cluster, `mesos://host:port` to connect to a Mesos cluster, or `yarn` to connect to a yarn cluster. | `local[4]` |
| `testMode` | Enable test mode that gives useful debugging information: catalyst execution plans and Beam DAG printing. | `false` |
| `enableSparkMetricSinks` | Enable reporting metrics to Spark's metrics Sinks. | `true` |
| `checkpointDir` | A checkpoint directory for streaming resilience, ignored in batch. For durability, a reliable filesystem such as HDFS/S3/GS is necessary. | local dir in `/tmp` |
| `filesToStage` | Jar-Files to send to all workers and put on the classpath. | all files from the classpath |
| `EnableSparkMetricSinks` | Enable/disable sending aggregator values to Spark's metric sinks. | `true` |
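The sketch below shows one way the options in the table above could be supplied as ordinary pipeline options via `PipelineOptionsFactory`; the class name and the trivial `Create` pipeline are placeholders invented for illustration and only option names listed above (`runner`, `sparkMaster`, `testMode`) are assumed.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class StructuredStreamingOptionsExample {
  public static void main(String[] args) {
    // Expects flags named after the table above, e.g.:
    //   --runner=SparkStructuredStreamingRunner --sparkMaster=local[4] --testMode=true
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

    Pipeline p = Pipeline.create(options);
    // A deliberately trivial pipeline; replace with your own transforms.
    p.apply(Create.of(1, 2, 3));
    p.run().waitUntilFinish();
  }
}
```

Such a class would be packaged into the shaded jar and submitted with spark-submit as shown earlier, with the flags passed after the jar name.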
+ @@ -293,14 +364,28 @@ Spark provides a [metrics system](http://spark.apache.org/docs/latest/monitoring ### Streaming Execution +
For RDD/DStream based runner:
If your pipeline uses an UnboundedSource the Spark Runner will automatically set streaming mode. Forcing streaming mode is mostly used for testing and is not recommended.

For Structured Streaming based runner:
Streaming mode is not implemented yet in the Spark Structured Streaming runner.

Streaming is not yet supported on the Spark portable runner.

### Using a provided SparkContext and StreamingListeners
For RDD/DStream based runner:
If you would like to execute your Spark job with a provided SparkContext, such as when using the [spark-jobserver](https://github.com/spark-jobserver/spark-jobserver), or use StreamingListeners, you can't use SparkPipelineOptions (the context or a listener cannot be passed as a command-line argument anyway). Instead, you should use SparkContextOptions, which can only be used programmatically and is not a common PipelineOptions implementation.

For Structured Streaming based runner:
Provided SparkSession and StreamingListeners are not supported on the Spark Structured Streaming runner.

Provided SparkContext and StreamingListeners are not supported on the Spark portable runner.
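A minimal sketch of the programmatic path described above for the RDD/DStream based runner: it assumes that `SparkContextOptions` exposes `setProvidedSparkContext` and `setUsesProvidedSparkContext` setters (check your Beam version), and the class name and trivial pipeline are placeholders for illustration.

```java
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.spark.api.java.JavaSparkContext;

public class ProvidedSparkContextExample {
  public static void main(String[] args) {
    // A SparkContext owned by the embedding application (e.g. handed over by a job server).
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "beam-provided-context");

    // SparkContextOptions can only be configured programmatically, as noted above.
    SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
    options.setRunner(SparkRunner.class);
    options.setUsesProvidedSparkContext(true); // assumed setter; verify against your Beam version
    options.setProvidedSparkContext(jsc);      // assumed setter; verify against your Beam version

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("a", "b", "c"));
    p.run().waitUntilFinish();

    jsc.stop();
  }
}
```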