Introducing dashboard link to DataprocClusterManager and master_urls_to_dashboards mapping
victorplusc committed Feb 25, 2022
1 parent 140ea00 commit e5a27f4
Showing 7 changed files with 83 additions and 61 deletions.
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
@@ -22,6 +22,7 @@
from dataclasses import dataclass
from threading import Thread
from typing import Optional
from typing import Tuple

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_environment as ie
@@ -89,8 +90,11 @@ def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
if self.cluster_metadata in ie.current_env().clusters.master_urls.inverse:
self.master_url = ie.current_env().clusters.master_urls.inverse[
self.cluster_metadata]
self.dashboard = ie.current_env().clusters.master_urls_to_dashboards[
self.master_url]
else:
self.master_url = None
self.dashboard = None

self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
self._staging_directory = None
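For context, the assignment above depends on a bidirectional mapping: clusters.master_urls maps master_url strings to cluster metadata, and its inverse view maps metadata back to a master_url, which in turn keys master_urls_to_dashboards. A minimal sketch of that lookup, assuming a bidict-style mapping and purely illustrative values:

```python
from bidict import bidict

# Illustrative stand-ins for ie.current_env().clusters state.
master_urls = bidict({'test-master-url': ('project', 'region', 'cluster')})
master_urls_to_dashboards = {'test-master-url': 'http://example/dashboard/'}

cluster_metadata = ('project', 'region', 'cluster')
if cluster_metadata in master_urls.inverse:
  # Recover the master_url for this metadata, then its dashboard.
  master_url = master_urls.inverse[cluster_metadata]
  dashboard = master_urls_to_dashboards[master_url]
else:
  master_url = None
  dashboard = None
```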
@@ -117,7 +121,9 @@ def create_cluster(self, cluster: dict) -> None:
'Cluster created successfully: %s',
self.cluster_metadata.cluster_name)
self._staging_directory = self.get_staging_location(self.cluster_metadata)
self.master_url = self.get_master_url(self._staging_directory)
self.master_url, self.dashboard = self.get_master_url_and_dashboard(
self.cluster_metadata,
self._staging_directory)
except Exception as e:
if e.code == 409:
_LOGGER.info(
@@ -202,10 +208,12 @@ def describe(self) -> None:
"""Returns a dictionary describing the cluster."""
return {
'cluster_metadata': self.cluster_metadata,
'master_url': self.master_url
'master_url': self.master_url,
'dashboard': self.dashboard
}

def get_cluster_details(self, cluster_metadata):
"""Gets the Dataproc_v1 Cluster object for the current cluster manager."""
return self._cluster_client.get_cluster(
request={
'project_id': cluster_metadata.project_id,
@@ -240,21 +248,39 @@ def get_staging_location(self, cluster_metadata):
cluster_metadata.cluster_name)
raise e

def get_master_url(self, staging_bucket) -> None:
def parse_master_url_and_dashboard(self, cluster_metadata,
line: str) -> Tuple[str, str]:
"""Parses the master_url and YARN application_id of the Flink process from
an input line.
Returns the flink_master_url and dashboard link as a tuple."""
cluster_details = self.get_cluster_details(cluster_metadata)
yarn_endpoint = cluster_details.config.endpoint_config.http_ports[
'YARN ResourceManager']
segment = line.split('Found Web Interface ')[1].split(' of application ')
master_url = segment[0]
application_id = segment[1].rstrip('.\n').strip('\'')
dashboard = yarn_endpoint.rstrip(
'yarn/') + '/gateway/default/yarn/proxy/' + application_id + '/'
return master_url, dashboard
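A worked example of the parsing above, using a hypothetical startup-log line and YARN endpoint (both values are assumptions, not taken from a real cluster):

```python
# Hypothetical Dataproc startup-log line and YARN ResourceManager endpoint
# (the latter would come from endpoint_config.http_ports).
line = ("INFO ... Found Web Interface cluster-w-0:44861 "
        "of application 'application_1645000000000_0001'.\n")
yarn_endpoint = 'http://cluster-m:8088/yarn/'

segment = line.split('Found Web Interface ')[1].split(' of application ')
master_url = segment[0]  # 'cluster-w-0:44861'
application_id = segment[1].rstrip('.\n').strip('\'')
# application_id == 'application_1645000000000_0001'

# Note: str.rstrip removes a trailing *character set*, not the literal
# suffix 'yarn/'; it yields the endpoint origin here because the character
# before '/yarn/' is a port digit, which stops the strip.
dashboard = (yarn_endpoint.rstrip('yarn/') +
             '/gateway/default/yarn/proxy/' + application_id + '/')
# 'http://cluster-m:8088/gateway/default/yarn/proxy/application_1645000000000_0001/'
```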

def get_master_url_and_dashboard(
self, cluster_metadata,
staging_bucket) -> Tuple[Optional[str], Optional[str]]:
"""Returns the master_url of the current cluster."""
startup_logs = []
for file in self._fs._list(staging_bucket):
if self.STAGING_LOG_NAME in file.path:
startup_logs.append(file.path)

master_url = None
for log in startup_logs:
content = self._fs.open(log)
for line in content.readlines():
decoded_line = line.decode()
if 'FLINK_MASTER_URL=' in decoded_line:
return decoded_line.split('FLINK_MASTER_URL=')[1].strip('\n')
return master_url
if 'Found Web Interface' in decoded_line:
return self.parse_master_url_and_dashboard(
cluster_metadata, decoded_line)
return None, None
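The surrounding scan can be exercised without GCS; a self-contained sketch with an in-memory stand-in for the filesystem (the log name, path, and contents are illustrative assumptions):

```python
import io

STAGING_LOG_NAME = 'startup-script-output'  # illustrative, not the real value
staged_files = {
    'gs://bucket/cluster/startup-script-output.log': io.BytesIO(
        b"INFO ... Found Web Interface cluster-w-0:44861 "
        b"of application 'application_1'.\n"),
}

def find_web_interface_line():
  # Mirror the two-phase scan: collect candidate logs, then read line by line.
  startup_logs = [path for path in staged_files if STAGING_LOG_NAME in path]
  for log in startup_logs:
    for line in staged_files[log].readlines():
      decoded_line = line.decode()
      if 'Found Web Interface' in decoded_line:
        return decoded_line
  return None

assert 'Found Web Interface' in find_web_interface_line()
```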

def cleanup_staging_files(self, staging_directory):
staging_files = [file.path for file in self._fs._list(staging_directory)]
sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
@@ -33,40 +33,20 @@
_dataproc_imported = True


class MockProperty:
def __init__(self, property, value):
object.__setattr__(self, property, value)


class MockException(Exception):
def __init__(self, code=-1):
self.code = code


class MockCluster:
def __init__(self, config_bucket=None):
self.config = MockConfig(config_bucket)
self.status = MockStatus()


class MockStatus:
def __init__(self):
self.state = MockState('RUNNING')


class MockState:
def __init__(self, name=None):
self.name = name


class MockThread:
def __init__(self, target=None):
self.target = None


class MockConfig:
def __init__(self, config_bucket=None):
self.config_bucket = config_bucket


class MockFileMetadata:
def __init__(self, path):
self.path = path
self.config = MockProperty('config_bucket', config_bucket)
self.status = MockProperty('state', MockProperty('name', None))
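The rewritten fixtures above collapse the one-off mock classes into a single generic MockProperty; a small illustration of the pattern:

```python
class MockProperty:
  def __init__(self, property, value):
    object.__setattr__(self, property, value)

# One nested chain replaces the old MockStatus/MockState pair;
# 'RUNNING' is an illustrative value.
status = MockProperty('state', MockProperty('name', 'RUNNING'))
assert status.state.name == 'RUNNING'
```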


@unittest.skipIf(not _dataproc_imported, 'dataproc package was not imported.')
@@ -200,11 +180,12 @@ def test_cleanup_other_exception(self, mock_cluster_client, mock_cleanup):
self.assertRaises(MockException, cluster_manager.cleanup)
self.assertTrue('Failed to delete cluster' in context_manager.output[0])

@patch('threading.Thread', return_value=MockThread)
@patch('threading.Thread', return_value=MockProperty('target', None))
@patch(
'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem._list',
return_value=[
MockFileMetadata(
MockProperty(
'path',
'gs://test-bucket/google-cloud-dataproc-metainfo'
'/test-cluster/item')
])
@@ -226,7 +207,7 @@ def test_get_staging_location(
cluster_manager.get_staging_location(cluster_metadata),
'gs://test-bucket/google-cloud-dataproc-metainfo/')

@patch('threading.Thread', return_value=MockThread)
@patch('threading.Thread', return_value=MockProperty('target', None))
@patch(
'google.cloud.dataproc_v1.ClusterControllerClient.get_cluster',
side_effect=MockException())
11 changes: 8 additions & 3 deletions sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -364,20 +364,23 @@ def __init__(self) -> None:
# pipelines that use the corresponding master_url.
self.master_urls_to_pipelines: DefaultDict[
str, List[beam.Pipeline]] = defaultdict(list)
# self.master_urls_to_dashboards maps string master_urls to the
# corresponding Apache Flink dashboards.
self.master_urls_to_dashboards: Dict[str, str] = {}

def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
"""Returns a description of the cluster associated to the given pipeline.
If no pipeline is given then this returns a dictionary of descriptions for
all pipelines.
all pipelines, keyed by str(id(pipeline)).
"""
description = {
ie.current_env().pipeline_id_to_pipeline(pid): dcm.describe()
pid: dcm.describe()
for pid,
dcm in self.dataproc_cluster_managers.items()
}
if pipeline:
return description.get(pipeline, None)
return description.get(str(id(pipeline)), None)
return description
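describe() now keys its results by str(id(pipeline)), the same key under which managers are registered; a condensed sketch with placeholder objects:

```python
class FakePipeline:  # stands in for beam.Pipeline
  pass

dataproc_cluster_managers = {}
p = FakePipeline()
dataproc_cluster_managers[str(id(p))] = 'dcm'  # placeholder cluster manager

# describe(p) resolves the pipeline to the identical key.
description = {pid: dcm for pid, dcm in dataproc_cluster_managers.items()}
assert description.get(str(id(p))) == 'dcm'
```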

def cleanup(
@@ -419,6 +422,7 @@ def cleanup(
cluster_manager.cleanup()
self.master_urls.pop(master_url, None)
self.master_urls_to_pipelines.pop(master_url, None)
self.master_urls_to_dashboards.pop(master_url, None)
self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
else:
cluster_manager_identifiers = set()
@@ -429,6 +433,7 @@
self.dataproc_cluster_managers.clear()
self.master_urls.clear()
self.master_urls_to_pipelines.clear()
self.master_urls_to_dashboards.clear()


# Users can set options to guide how Interactive Beam works.
sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -306,15 +306,16 @@ def test_clusters_describe(self):
region=region,
))
cluster_metadata = MasterURLIdentifier(project_id=project, region=region)
clusters.dataproc_cluster_managers[p] = DataprocClusterManager(
cluster_metadata)
self.assertEqual('test-project', clusters.describe()[None] \
['cluster_metadata'].project_id)
clusters.dataproc_cluster_managers[str(
id(p))] = DataprocClusterManager(cluster_metadata)
self.assertEqual(
'test-project',
clusters.describe()[str(id(p))]['cluster_metadata'].project_id)

@patch(
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
'DataprocClusterManager.get_master_url',
return_value='test-master-url')
'DataprocClusterManager.get_master_url_and_dashboard',
return_value=('test-master-url', None))
@patch(
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
'DataprocClusterManager.cleanup',
@@ -350,8 +351,8 @@ def test_clusters_cleanup_forcefully(self, mock_cleanup, mock_master_url):

@patch(
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
'DataprocClusterManager.get_master_url',
return_value='test-master-url')
'DataprocClusterManager.get_master_url_and_dashboard',
return_value=('test-master-url', None))
def test_clusters_cleanup_skip_on_duplicate(self, mock_master_url):
clusters = ib.Clusters()
project = 'test-project'
24 changes: 18 additions & 6 deletions sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -137,6 +137,20 @@ def run_pipeline(self, pipeline, options):
watch_sources(pipeline)

user_pipeline = ie.current_env().user_pipeline(pipeline)
if user_pipeline:
# When the underlying_runner is a FlinkRunner instance, create a
# corresponding DataprocClusterManager for it if no flink_master_url
# is provided.
master_url = self._get_dataproc_cluster_master_url_if_applicable(
user_pipeline)
if master_url:
flink_master_option = '--flink_master={}'.format(master_url)
for i, option in enumerate(options._flags):
if 'flink_master' in option:
options._flags[i] = flink_master_option
break
else:
options._flags.append(flink_master_option)
pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
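The new block rewrites the raw flag list held by PipelineOptions in place; a standalone sketch of the same replace-or-append logic, with a plain list standing in for options._flags:

```python
def set_flink_master(flags, master_url):
  flink_master_option = '--flink_master={}'.format(master_url)
  for i, option in enumerate(flags):
    if 'flink_master' in option:
      flags[i] = flink_master_option  # replace the existing flag
      break
  else:
    flags.append(flink_master_option)  # no flink_master flag was present
  return flags

assert set_flink_master(['--flink_master=old:1'], 'new:2') == [
    '--flink_master=new:2']
assert set_flink_master([], 'new:2') == ['--flink_master=new:2']
```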

# The user_pipeline analyzed might be None if the pipeline given has nothing
@@ -169,11 +183,6 @@ def exception_handler(e):
ie.current_env().set_test_stream_service_controller(
user_pipeline, test_stream_service)

# When the underlying_runner is a FlinkRunner instance, create a
# corresponding DataprocClusterManager for it if no flink_master_url
# is provided.
self._create_dataproc_cluster_if_applicable(user_pipeline)

pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
pipeline_instrument.instrumented_pipeline_proto(),
self._underlying_runner,
@@ -220,7 +229,7 @@ def visit_transform(self, transform_node):

# TODO(victorhc): Move this method somewhere else if performance is impacted
# by generating a cluster during runtime.
def _create_dataproc_cluster_if_applicable(self, user_pipeline):
def _get_dataproc_cluster_master_url_if_applicable(self, user_pipeline):
""" Creates a Dataproc cluster if the provided user_pipeline is running
FlinkRunner and no flink_master_url was provided as an option. A cluster
is not created when a flink_master_url is detected.
@@ -278,6 +287,9 @@ def _create_dataproc_cluster_if_applicable(self, user_pipeline):
id(user_pipeline))] = cluster_manager
clusters.master_urls_to_pipelines[cluster_manager.master_url].append(
str(id(user_pipeline)))
clusters.master_urls_to_dashboards[
cluster_manager.master_url] = cluster_manager.dashboard
return cluster_manager.master_url
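Once a manager exists, three Clusters structures are updated together so later lookups agree; a compact sketch with placeholder values:

```python
from collections import defaultdict

class FakeClusterManager:  # stands in for DataprocClusterManager
  master_url = 'test-master-url'
  dashboard = 'http://example/dashboard/'

dataproc_cluster_managers = {}
master_urls_to_pipelines = defaultdict(list)
master_urls_to_dashboards = {}

pipeline_id = '140245'  # stands in for str(id(user_pipeline))
cm = FakeClusterManager()
dataproc_cluster_managers[pipeline_id] = cm
master_urls_to_pipelines[cm.master_url].append(pipeline_id)
master_urls_to_dashboards[cm.master_url] = cm.dashboard
```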


class PipelineResult(beam.runners.runner.PipelineResult):
Expand Down
sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -491,7 +491,7 @@ def enter_composite_transform(
'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
'DataprocClusterManager.create_flink_cluster',
return_value=None)
def test_create_dataproc_cluster_no_flink_master_or_master_url(
def test_get_dataproc_cluster_master_url_no_flink_master_or_master_url(
self, mock_create_cluster):
from apache_beam.runners.portability.flink_runner import FlinkRunner
runner = interactive_runner.InteractiveRunner(
Expand All @@ -501,7 +501,7 @@ def test_create_dataproc_cluster_no_flink_master_or_master_url(
project='test-project',
region='test-region',
))
runner._create_dataproc_cluster_if_applicable(p)
runner._get_dataproc_cluster_master_url_if_applicable(p)
ie.current_env()._tracked_user_pipelines.add_user_pipeline(p)
self.assertEqual(
ie.current_env().clusters.describe(p)['cluster_metadata'].project_id,
@@ -511,14 +511,14 @@
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
def test_create_dataproc_cluster_flink_master_provided(self):
def test_get_dataproc_cluster_master_url_flink_master_provided(self):
runner = interactive_runner.InteractiveRunner()
from apache_beam.runners.portability.flink_runner import FlinkRunner
p = beam.Pipeline(
interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()),
options=PipelineOptions(
flink_master='--flink_master=example.internal:1'))
runner._create_dataproc_cluster_if_applicable(p)
runner._get_dataproc_cluster_master_url_if_applicable(p)
self.assertEqual(ie.current_env().clusters.describe(), {})
ie.current_env().clusters = ib.Clusters()

3 changes: 0 additions & 3 deletions sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -55,9 +55,6 @@


class MockBuckets():
def __init__(self):
pass

def Get(self, path):
if path == 'test-bucket-not-found':
raise HttpNotFoundError({'status': 404}, {}, '')
