From 10d999f4a30e7d539225d790a5b32dcacabe52ce Mon Sep 17 00:00:00 2001 From: Victor Chen Date: Wed, 16 Mar 2022 15:08:59 -0400 Subject: [PATCH] [BEAM-14071] Enabling Flink on Dataproc for Interactive Beam --- .../runners/interactive/cache_manager.py | 9 +++++- .../dataproc/dataproc_cluster_manager.py | 12 +++---- .../runners/interactive/interactive_beam.py | 10 ++++-- .../interactive/interactive_environment.py | 6 ++-- .../runners/interactive/interactive_runner.py | 5 ++- .../interactive/interactive_runner_test.py | 32 +++++++++++++++++-- 6 files changed, 56 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index 300f3a3b5efc0..1960733ba38d3 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -190,7 +190,14 @@ def __init__(self, cache_dir=None, cache_format='text'): def size(self, *labels): if self.exists(*labels): - return sum(os.path.getsize(path) for path in self._match(*labels)) + matched_path = self._match(*labels) + # if any matched path has a gs:// prefix, it must be cached on GCS + if 'gs://' in matched_path[0]: + from apache_beam.io.gcp import gcsio + return sum( + sum(gcsio.GcsIO().list_prefix(path).values()) + for path in matched_path) + return sum(os.path.getsize(path) for path in matched_path) return 0 def exists(self, *labels): diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py index 4a9c688f2f97d..a5dc28dd2e22c 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py @@ -25,9 +25,9 @@ from typing import Tuple from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.runners.interactive import interactive_beam as ib from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive.utils import progress_indicated -from apache_beam.version import __version__ as beam_version try: from google.cloud import dataproc_v1 @@ -67,9 +67,6 @@ class DataprocClusterManager: required for creating and deleting Dataproc clusters for use under Interactive Beam. """ - IMAGE_VERSION = '2.0.31-debian10' - STAGING_LOG_NAME = 'dataproc-startup-script_output' - def __init__(self, cluster_metadata: MasterURLIdentifier) -> None: """Initializes the DataprocClusterManager with properties required to interface with the Dataproc ClusterControllerClient. @@ -162,13 +159,12 @@ def create_flink_cluster(self) -> None: 'cluster_name': self.cluster_metadata.cluster_name, 'config': { 'software_config': { - 'image_version': self.IMAGE_VERSION, + 'image_version': ib.clusters.DATAPROC_IMAGE_VERSION, 'optional_components': ['DOCKER', 'FLINK'] }, 'gce_cluster_config': { 'metadata': { - 'flink-start-yarn-session': 'true', - 'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version) + 'flink-start-yarn-session': 'true' }, 'service_account_scopes': [ 'https://www.googleapis.com/auth/cloud-platform' @@ -310,7 +306,7 @@ def get_master_url_and_dashboard( """Returns the master_url of the current cluster.""" startup_logs = [] for file in self._fs._list(staging_bucket): - if self.STAGING_LOG_NAME in file.path: + if ib.clusters.DATAPROC_STAGING_LOG_NAME in file.path: startup_logs.append(file.path) for log in startup_logs: diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 6098b07ab09c5..e3a8457b4a4e9 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -349,6 +349,13 @@ class Clusters: Example of calling the Interactive Beam clusters describe method:: ib.clusters.describe() """ + # Explicitly set the Flink version here to ensure compatibility with 2.0 + # Dataproc images: + # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0 + DATAPROC_FLINK_VERSION = '1.12' + DATAPROC_IMAGE_VERSION = '2.0.31-debian10' + DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output' + def __init__(self) -> None: """Instantiates default values for Dataproc cluster interactions. """ @@ -457,8 +464,7 @@ def cleanup( # Examples: # ib.clusters.describe(p) # Check the docstrings for detailed usages. -# TODO(victorhc): Resolve connection issue and add a working example -# clusters = Clusters() +clusters = Clusters() def watch(watchable): diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py index befb564ee80e6..88297f9bb0d3c 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py @@ -170,10 +170,8 @@ def __init__(self): self._test_stream_service_controllers = {} self._cached_source_signature = {} self._tracked_user_pipelines = UserPipelineTracker() - # TODO(victorhc): remove the cluster instantiation after the - # interactive_beam.clusters class has been enabled. - from apache_beam.runners.interactive.interactive_beam import Clusters - self.clusters = Clusters() + from apache_beam.runners.interactive.interactive_beam import clusters + self.clusters = clusters # Tracks the computation completeness of PCollections. PCollections tracked # here don't need to be re-computed when data introspection is needed. diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index a384be9bdebe4..b2da72cadf298 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -145,7 +145,10 @@ def run_pipeline(self, pipeline, options): master_url = self._get_dataproc_cluster_master_url_if_applicable( user_pipeline) if master_url: - options.view_as(FlinkRunnerOptions).flink_master = master_url + flink_options = options.view_as(FlinkRunnerOptions) + flink_options.flink_master = master_url + flink_options.flink_version = ie.current_env( + ).clusters.DATAPROC_FLINK_VERSION pipeline_instrument = inst.build_pipeline_instrument(pipeline, options) # The user_pipeline analyzed might be None if the pipeline given has nothing diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index 47dedd6578c6e..7d6510b49535b 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -31,6 +31,7 @@ import apache_beam as beam from apache_beam.dataframe.convert import to_dataframe +from apache_beam.options.pipeline_options import FlinkRunnerOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.direct import direct_runner @@ -543,12 +544,39 @@ def test_get_master_url_flink_master_provided(self): from apache_beam.runners.portability.flink_runner import FlinkRunner p = beam.Pipeline( interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()), - options=PipelineOptions( - flink_master='--flink_master=example.internal:1')) + options=PipelineOptions(flink_master='--flink_master=test.internal:1')) runner._get_dataproc_cluster_master_url_if_applicable(p) self.assertEqual(ie.current_env().clusters.describe(), {}) ie.current_env().clusters = ib.Clusters() + @unittest.skipIf( + not ie.current_env().is_interactive_ready, + '[interactive] dependency is not installed.') + @patch( + 'apache_beam.runners.interactive.interactive_runner.' + 'InteractiveRunner._get_dataproc_cluster_master_url_if_applicable', + return_value='test.internal:1') + def test_set_flink_dataproc_version(self, mock_get_master_url): + runner = interactive_runner.InteractiveRunner() + options = options = PipelineOptions() + p = beam.Pipeline(interactive_runner.InteractiveRunner()) + + # Watch the local scope for Interactive Beam so that values will be cached. + ib.watch(locals()) + + # This is normally done in the interactive_utils when a transform is + # applied but needs an IPython environment. So we manually run this here. + ie.current_env().track_user_pipelines() + + # Run the pipeline + runner.run_pipeline(p, options) + + # Check that the Flink version is set to the Dataproc image Flink version + # inside ib.clusters. + self.assertEqual( + options.view_as(FlinkRunnerOptions).flink_version, + ib.clusters.DATAPROC_FLINK_VERSION) + if __name__ == '__main__': unittest.main()