From e5a27f485f7c6deabecb8de6e65047b6debc17c5 Mon Sep 17 00:00:00 2001 From: Victor Chen Date: Fri, 25 Feb 2022 17:50:00 -0500 Subject: [PATCH] Introducing dashboard link to DataprocClusterManager and master_urls_to_dashboards mapping --- .../dataproc/dataproc_cluster_manager.py | 40 ++++++++++++++---- .../dataproc/dataproc_cluster_manager_test.py | 41 +++++-------------- .../runners/interactive/interactive_beam.py | 11 +++-- .../interactive/interactive_beam_test.py | 17 ++++---- .../runners/interactive/interactive_runner.py | 24 ++++++++--- .../interactive/interactive_runner_test.py | 8 ++-- .../runners/interactive/utils_test.py | 3 -- 7 files changed, 83 insertions(+), 61 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py index 9b88d701c7fb6..e4bbb055ef24d 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py @@ -22,6 +22,7 @@ from dataclasses import dataclass from threading import Thread from typing import Optional +from typing import Tuple from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.interactive import interactive_environment as ie @@ -89,8 +90,11 @@ def __init__(self, cluster_metadata: MasterURLIdentifier) -> None: if self.cluster_metadata in ie.current_env().clusters.master_urls.inverse: self.master_url = ie.current_env().clusters.master_urls.inverse[ self.cluster_metadata] + self.dashboard = ie.current_env().clusters.master_urls_to_dashboards[ + self.master_url] else: self.master_url = None + self.dashboard = None self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions()) self._staging_directory = None @@ -117,7 +121,9 @@ def create_cluster(self, cluster: dict) -> None: 'Cluster created successfully: %s', self.cluster_metadata.cluster_name) self._staging_directory = self.get_staging_location(self.cluster_metadata) - self.master_url = self.get_master_url(self._staging_directory) + self.master_url, self.dashboard = self.get_master_url_and_dashboard( + self.cluster_metadata, + self._staging_directory) except Exception as e: if e.code == 409: _LOGGER.info( @@ -202,10 +208,12 @@ def describe(self) -> None: """Returns a dictionary describing the cluster.""" return { 'cluster_metadata': self.cluster_metadata, - 'master_url': self.master_url + 'master_url': self.master_url, + 'dashboard': self.dashboard } def get_cluster_details(self, cluster_metadata): + """Gets the Dataproc_v1 Cluster object for the current cluster manager.""" return self._cluster_client.get_cluster( request={ 'project_id': cluster_metadata.project_id, @@ -240,21 +248,39 @@ def get_staging_location(self, cluster_metadata): cluster_metadata.cluster_name) raise e - def get_master_url(self, staging_bucket) -> None: + def parse_master_url_and_dashboard(self, cluster_metadata, + line: str) -> Tuple[str, str]: + """Parses the master_url and YARN application_id of the Flink process from + an input line. + + Returns the flink_master_url and dashboard link as a tuple.""" + cluster_details = self.get_cluster_details(cluster_metadata) + yarn_endpoint = cluster_details.config.endpoint_config.http_ports[ + 'YARN ResourceManager'] + segment = line.split('Found Web Interface ')[1].split(' of application ') + master_url = segment[0] + application_id = segment[1].rstrip('.\n').strip('\'') + dashboard = yarn_endpoint.rstrip( + 'yarn/') + '/gateway/default/yarn/proxy/' + application_id + '/' + return master_url, dashboard + + def get_master_url_and_dashboard( + self, cluster_metadata, + staging_bucket) -> Tuple[Optional[str], Optional[str]]: """Returns the master_url of the current cluster.""" startup_logs = [] for file in self._fs._list(staging_bucket): if self.STAGING_LOG_NAME in file.path: startup_logs.append(file.path) - master_url = None for log in startup_logs: content = self._fs.open(log) for line in content.readlines(): decoded_line = line.decode() - if 'FLINK_MASTER_URL=' in decoded_line: - return decoded_line.split('FLINK_MASTER_URL=')[1].strip('\n') - return master_url + if 'Found Web Interface' in decoded_line: + return self.parse_master_url_and_dashboard( + cluster_metadata, decoded_line) + return None, None def cleanup_staging_files(self, staging_directory): staging_files = [file.path for file in self._fs._list(staging_directory)] diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py index 172f2c8130027..6d6026239e068 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py @@ -33,6 +33,11 @@ _dataproc_imported = True +class MockProperty: + def __init__(self, property, value): + object.__setattr__(self, property, value) + + class MockException(Exception): def __init__(self, code=-1): self.code = code @@ -40,33 +45,8 @@ def __init__(self, code=-1): class MockCluster: def __init__(self, config_bucket=None): - self.config = MockConfig(config_bucket) - self.status = MockStatus() - - -class MockStatus: - def __init__(self): - self.state = MockState('RUNNING') - - -class MockState: - def __init__(self, name=None): - self.name = name - - -class MockThread: - def __init__(self, target=None): - self.target = None - - -class MockConfig: - def __init__(self, config_bucket=None): - self.config_bucket = config_bucket - - -class MockFileMetadata: - def __init__(self, path): - self.path = path + self.config = MockProperty('config_bucket', config_bucket) + self.status = MockProperty('state', MockProperty('name', None)) @unittest.skipIf(not _dataproc_imported, 'dataproc package was not imported.') @@ -200,11 +180,12 @@ def test_cleanup_other_exception(self, mock_cluster_client, mock_cleanup): self.assertRaises(MockException, cluster_manager.cleanup) self.assertTrue('Failed to delete cluster' in context_manager.output[0]) - @patch('threading.Thread', return_value=MockThread) + @patch('threading.Thread', return_value=MockProperty('target', None)) @patch( 'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._list', return_value=[ - MockFileMetadata( + MockProperty( + 'path', 'gs://test-bucket/google-cloud-dataproc-metainfo' '/test-cluster/item') ]) @@ -226,7 +207,7 @@ def test_get_staging_location( cluster_manager.get_staging_location(cluster_metadata), 'gs://test-bucket/google-cloud-dataproc-metainfo/') - @patch('threading.Thread', return_value=MockThread) + @patch('threading.Thread', return_value=MockProperty('target', None)) @patch( 'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster', side_effect=MockException()) diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 4874d3c83e76c..16308911f8bb5 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -364,20 +364,23 @@ def __init__(self) -> None: # pipelines that use the corresponding master_url. self.master_urls_to_pipelines: DefaultDict[ str, List[beam.Pipeline]] = defaultdict(list) + # self.master_urls_to_dashboards map string master_urls to the + # corresponding Apache Flink dashboards. + self.master_urls_to_dashboards: Dict[str, str] = {} def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict: """Returns a description of the cluster associated to the given pipeline. If no pipeline is given then this returns a dictionary of descriptions for - all pipelines. + all pipelines, mapped to by id. """ description = { - ie.current_env().pipeline_id_to_pipeline(pid): dcm.describe() + pid: dcm.describe() for pid, dcm in self.dataproc_cluster_managers.items() } if pipeline: - return description.get(pipeline, None) + return description.get(str(id(pipeline)), None) return description def cleanup( @@ -419,6 +422,7 @@ def cleanup( cluster_manager.cleanup() self.master_urls.pop(master_url, None) self.master_urls_to_pipelines.pop(master_url, None) + self.master_urls_to_dashboards.pop(master_url, None) self.dataproc_cluster_managers.pop(str(id(pipeline)), None) else: cluster_manager_identifiers = set() @@ -429,6 +433,7 @@ def cleanup( self.dataproc_cluster_managers.clear() self.master_urls.clear() self.master_urls_to_pipelines.clear() + self.master_urls_to_dashboards.clear() # Users can set options to guide how Interactive Beam works. diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py index b14848c0afc62..4541463c1d9fc 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py @@ -306,15 +306,16 @@ def test_clusters_describe(self): region=region, )) cluster_metadata = MasterURLIdentifier(project_id=project, region=region) - clusters.dataproc_cluster_managers[p] = DataprocClusterManager( - cluster_metadata) - self.assertEqual('test-project', clusters.describe()[None] \ - ['cluster_metadata'].project_id) + clusters.dataproc_cluster_managers[str( + id(p))] = DataprocClusterManager(cluster_metadata) + self.assertEqual( + 'test-project', + clusters.describe()[str(id(p))]['cluster_metadata'].project_id) @patch( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' - 'DataprocClusterManager.get_master_url', - return_value='test-master-url') + 'DataprocClusterManager.get_master_url_and_dashboard', + return_value=('test-master-url', None)) @patch( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' 'DataprocClusterManager.cleanup', @@ -350,8 +351,8 @@ def test_clusters_cleanup_forcefully(self, mock_cleanup, mock_master_url): @patch( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' - 'DataprocClusterManager.get_master_url', - return_value='test-master-url') + 'DataprocClusterManager.get_master_url_and_dashboard', + return_value=('test-master-url', None)) def test_clusters_cleanup_skip_on_duplicate(self, mock_master_url): clusters = ib.Clusters() project = 'test-project' diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index d76d68961af0c..53a70e29162cd 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -137,6 +137,20 @@ def run_pipeline(self, pipeline, options): watch_sources(pipeline) user_pipeline = ie.current_env().user_pipeline(pipeline) + if user_pipeline: + # When the underlying_runner is a FlinkRunner instance, create a + # corresponding DataprocClusterManager for it if no flink_master_url + # is provided. + master_url = self._get_dataproc_cluster_master_url_if_applicable( + user_pipeline) + if master_url: + flink_master_option = '--flink_master={}'.format(master_url) + for i, option in enumerate(options._flags): + if 'flink_master' in option: + options._flags[i] = flink_master_option + break + else: + options._flags.append(flink_master_option) pipeline_instrument = inst.build_pipeline_instrument(pipeline, options) # The user_pipeline analyzed might be None if the pipeline given has nothing @@ -169,11 +183,6 @@ def exception_handler(e): ie.current_env().set_test_stream_service_controller( user_pipeline, test_stream_service) - # When the underlying_runner is a FlinkRunner instance, create a - # corresponding DataprocClusterManager for it if no flink_master_url - # is provided. - self._create_dataproc_cluster_if_applicable(user_pipeline) - pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api( pipeline_instrument.instrumented_pipeline_proto(), self._underlying_runner, @@ -220,7 +229,7 @@ def visit_transform(self, transform_node): # TODO(victorhc): Move this method somewhere else if performance is impacted # by generating a cluster during runtime. - def _create_dataproc_cluster_if_applicable(self, user_pipeline): + def _get_dataproc_cluster_master_url_if_applicable(self, user_pipeline): """ Creates a Dataproc cluster if the provided user_pipeline is running FlinkRunner and no flink_master_url was provided as an option. A cluster is not created when a flink_master_url is detected. @@ -278,6 +287,9 @@ def _create_dataproc_cluster_if_applicable(self, user_pipeline): id(user_pipeline))] = cluster_manager clusters.master_urls_to_pipelines[cluster_manager.master_url].append( str(id(user_pipeline))) + clusters.master_urls_to_dashboards[ + cluster_manager.master_url] = cluster_manager.dashboard + return cluster_manager.master_url class PipelineResult(beam.runners.runner.PipelineResult): diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index de5f1a5464d90..da9e1f340a15e 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -491,7 +491,7 @@ def enter_composite_transform( 'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.' 'DataprocClusterManager.create_flink_cluster', return_value=None) - def test_create_dataproc_cluster_no_flink_master_or_master_url( + def test_get_dataproc_cluster_master_url_no_flink_master_or_master_url( self, mock_create_cluster): from apache_beam.runners.portability.flink_runner import FlinkRunner runner = interactive_runner.InteractiveRunner( @@ -501,7 +501,7 @@ def test_create_dataproc_cluster_no_flink_master_or_master_url( project='test-project', region='test-region', )) - runner._create_dataproc_cluster_if_applicable(p) + runner._get_dataproc_cluster_master_url_if_applicable(p) ie.current_env()._tracked_user_pipelines.add_user_pipeline(p) self.assertEqual( ie.current_env().clusters.describe(p)['cluster_metadata'].project_id, @@ -511,14 +511,14 @@ def test_create_dataproc_cluster_no_flink_master_or_master_url( @unittest.skipIf( not ie.current_env().is_interactive_ready, '[interactive] dependency is not installed.') - def test_create_dataproc_cluster_flink_master_provided(self): + def test_get_dataproc_cluster_master_url_flink_master_provided(self): runner = interactive_runner.InteractiveRunner() from apache_beam.runners.portability.flink_runner import FlinkRunner p = beam.Pipeline( interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()), options=PipelineOptions( flink_master='--flink_master=example.internal:1')) - runner._create_dataproc_cluster_if_applicable(p) + runner._get_dataproc_cluster_master_url_if_applicable(p) self.assertEqual(ie.current_env().clusters.describe(), {}) ie.current_env().clusters = ib.Clusters() diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py index 26734d4098198..99847914ad93f 100644 --- a/sdks/python/apache_beam/runners/interactive/utils_test.py +++ b/sdks/python/apache_beam/runners/interactive/utils_test.py @@ -55,9 +55,6 @@ class MockBuckets(): - def __init__(self): - pass - def Get(self, path): if path == 'test-bucket-not-found': raise HttpNotFoundError({'status': 404}, {}, '')