[BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904)
victorplusc authored Mar 2, 2022
1 parent 75c25f0 commit 1e4106b
Showing 7 changed files with 412 additions and 66 deletions.
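For orientation, here is a minimal sketch of how the manager changed below might be driven end to end. The module path and all identifier values are assumptions for illustration; the class, field, and method names mirror the diff:

from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import (
    DataprocClusterManager, MasterURLIdentifier)

# All identifier values are hypothetical.
metadata = MasterURLIdentifier(
    project_id='my-project',
    region='us-central1',
    cluster_name='interactive-beam-cluster')
manager = DataprocClusterManager(metadata)
manager.create_flink_cluster()  # provisions a Dataproc cluster with DOCKER and FLINK
print(manager.describe())       # after this commit, reports 'master_url' and 'dashboard'
manager.cleanup()               # deletes the cluster and its staging files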
@@ -18,10 +18,26 @@
# pytype: skip-file

import logging
import re
import time
from dataclasses import dataclass
from typing import Optional
from typing import Tuple

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.utils import progress_indicated
from apache_beam.version import __version__ as beam_version

try:
  from google.cloud import dataproc_v1
  from apache_beam.io.gcp import gcsfilesystem  # pylint: disable=ungrouped-imports
except ImportError:

  class UnimportedDataproc:
    Cluster = None

  dataproc_v1 = UnimportedDataproc()
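  # Note (illustrative, not part of this commit): 'dataproc_v1.Cluster' is
  # referenced in a type annotation below (get_cluster_details), so this stub
  # keeps the module importable when the google-cloud-dataproc dependency is
  # absent; any actual client call would still fail at runtime.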

_LOGGER = logging.getLogger(__name__)

@@ -51,6 +67,9 @@ class DataprocClusterManager:
  required for creating and deleting Dataproc clusters for use
  under Interactive Beam.
  """
  IMAGE_VERSION = '2.0.31-debian10'
  STAGING_LOG_NAME = 'dataproc-startup-script_output'

  def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
    """Initializes the DataprocClusterManager with properties required
    to interface with the Dataproc ClusterControllerClient.
@@ -69,7 +88,6 @@ def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
      self.cluster_metadata.cluster_name = ie.current_env(
      ).clusters.default_cluster_name

-    from google.cloud import dataproc_v1
    self._cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={
            'api_endpoint': \
@@ -79,9 +97,16 @@ def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
    if self.cluster_metadata in ie.current_env().clusters.master_urls.inverse:
      self.master_url = ie.current_env().clusters.master_urls.inverse[
          self.cluster_metadata]
+      self.dashboard = ie.current_env().clusters.master_urls_to_dashboards[
+          self.master_url]
    else:
      self.master_url = None
+      self.dashboard = None

+    self._fs = gcsfilesystem.GCSFileSystem(PipelineOptions())
+    self._staging_directory = None

+  @progress_indicated
  def create_cluster(self, cluster: dict) -> None:
    """Attempts to create a cluster using attributes that were
    initialized with the DataprocClusterManager instance.
@@ -103,7 +128,10 @@ def create_cluster(self, cluster: dict) -> None:
      _LOGGER.info(
          'Cluster created successfully: %s',
          self.cluster_metadata.cluster_name)
-      self.master_url = self.get_master_url(self.cluster_metadata)
+      self._staging_directory = self.get_staging_location(self.cluster_metadata)
+      self.master_url, self.dashboard = self.get_master_url_and_dashboard(
+          self.cluster_metadata,
+          self._staging_directory)
    except Exception as e:
      if e.code == 409:
        _LOGGER.info(
@@ -127,19 +155,20 @@ def create_cluster(self, cluster: dict) -> None:
            'Unable to create cluster: %s', self.cluster_metadata.cluster_name)
        raise e

  # TODO(victorhc): Add support for user-specified pip packages
  def create_flink_cluster(self) -> None:
    """Calls _create_cluster with a configuration that enables FlinkRunner."""
    cluster = {
        'project_id': self.cluster_metadata.project_id,
        'cluster_name': self.cluster_metadata.cluster_name,
        'config': {
            'software_config': {
                'image_version': self.IMAGE_VERSION,
                'optional_components': ['DOCKER', 'FLINK']
            },
            'gce_cluster_config': {
                'metadata': {
-                    'flink-start-yarn-session': 'true'
+                    'flink-start-yarn-session': 'true',
+                    'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version)
                },
                'service_account_scopes': [
                    'https://www.googleapis.com/auth/cloud-platform'
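                    # Note (illustrative, not part of this commit): with e.g.
                    # Beam 2.37.0 installed in the notebook, the 'PIP_PACKAGES'
                    # entry above renders as 'apache-beam[gcp]==2.37.0'; the
                    # Dataproc image's startup scripting is expected to
                    # pip-install it on each node so the cluster's SDK matches
                    # the notebook's.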
@@ -156,6 +185,8 @@ def cleanup(self) -> None:
    """Deletes the cluster that uses the attributes initialized
    with the DataprocClusterManager instance."""
    try:
+      if self._staging_directory:
+        self.cleanup_staging_files(self._staging_directory)
      self._cluster_client.delete_cluster(
          request={
              'project_id': self.cluster_metadata.project_id,
@@ -186,15 +217,111 @@ def describe(self) -> None:
    """Returns a dictionary describing the cluster."""
    return {
        'cluster_metadata': self.cluster_metadata,
-        'master_url': self.master_url
+        'master_url': self.master_url,
+        'dashboard': self.dashboard
    }
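  # Illustrative describe() output after these changes (values hypothetical):
  # {'cluster_metadata': MasterURLIdentifier(project_id='my-project',
  #                                          region='us-central1',
  #                                          cluster_name='interactive-beam-cluster'),
  #  'master_url': 'example-master-url:50000',
  #  'dashboard': 'https://<gateway-host>/gateway/default/yarn/proxy/application_123456789000_0001/'}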

-  def get_master_url(self, identifier) -> None:
  def get_cluster_details(
      self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster:
    """Gets the Dataproc_v1 Cluster object for the current cluster manager."""
    try:
      return self._cluster_client.get_cluster(
          request={
              'project_id': cluster_metadata.project_id,
              'region': cluster_metadata.region,
              'cluster_name': cluster_metadata.cluster_name
          })
    except Exception as e:
      if e.code == 403:
        _LOGGER.error(
            'Due to insufficient project permissions, '
            'unable to retrieve information for cluster: %s',
            cluster_metadata.cluster_name)
        raise ValueError(
            'You cannot view clusters in project: {}'.format(
                cluster_metadata.project_id))
      elif e.code == 404:
        _LOGGER.error(
            'Cluster does not exist: %s', cluster_metadata.cluster_name)
        raise ValueError(
            'Cluster was not found: {}'.format(cluster_metadata.cluster_name))
      else:
        _LOGGER.error(
            'Failed to get information for cluster: %s',
            cluster_metadata.cluster_name)
        raise e

  def wait_for_cluster_to_provision(
      self, cluster_metadata: MasterURLIdentifier) -> None:
    while self.get_cluster_details(
        cluster_metadata).status.state.name == 'CREATING':
      time.sleep(15)

  def get_staging_location(self, cluster_metadata: MasterURLIdentifier) -> str:
    """Gets the staging bucket of an existing Dataproc cluster."""
    try:
      self.wait_for_cluster_to_provision(cluster_metadata)
      cluster_details = self.get_cluster_details(cluster_metadata)
      bucket_name = cluster_details.config.config_bucket
      gcs_path = 'gs://' + bucket_name + '/google-cloud-dataproc-metainfo/'
      for file in self._fs._list(gcs_path):
        if cluster_metadata.cluster_name in file.path:
          # this file path split will look something like:
          # ['gs://.../google-cloud-dataproc-metainfo/{staging_dir}/',
          # '-{node-type}/dataproc-startup-script_output']
          return file.path.split(cluster_metadata.cluster_name)[0]
    except Exception as e:
      _LOGGER.error(
          'Failed to get %s cluster staging bucket.',
          cluster_metadata.cluster_name)
      raise e
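  # Illustrative only (not part of this commit): how the split above recovers
  # the staging directory; bucket, hash, and cluster name are hypothetical.
  #
  #   path = ('gs://my-config-bucket/google-cloud-dataproc-metainfo/'
  #           'abcd-1234/my-cluster-m/dataproc-startup-script_output')
  #   path.split('my-cluster')[0]
  #   # -> 'gs://my-config-bucket/google-cloud-dataproc-metainfo/abcd-1234/'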

  def parse_master_url_and_dashboard(
      self, cluster_metadata: MasterURLIdentifier,
      line: str) -> Tuple[str, str]:
    """Parses the master_url and YARN application_id of the Flink process from
    an input line. The line containing both the master_url and application id
    is always formatted as such:
    {text} Found Web Interface {master_url} of application
    '{application_id}'.\\n
    Truncated example where '...' represents additional text between segments:
    ... google-dataproc-startup[000]: ... activate-component-flink[0000]:
    ...org.apache.flink.yarn.YarnClusterDescriptor... [] -
    Found Web Interface example-master-url:50000 of application
    'application_123456789000_0001'.
    Returns the flink_master_url and dashboard link as a tuple."""
    cluster_details = self.get_cluster_details(cluster_metadata)
    yarn_endpoint = cluster_details.config.endpoint_config.http_ports[
        'YARN ResourceManager']
    segment = line.split('Found Web Interface ')[1].split(' of application ')
    master_url = segment[0]
    application_id = re.sub('\'|.\n', '', segment[1])
    dashboard = re.sub(
        '/yarn/',
        '/gateway/default/yarn/proxy/' + application_id + '/',
        yarn_endpoint)
    return master_url, dashboard
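  # Illustrative only (not part of this commit): the parsing above applied to
  # the docstring's example line; the yarn_endpoint value is hypothetical.
  #
  #   line = ("... Found Web Interface example-master-url:50000 of application "
  #           "'application_123456789000_0001'.\n")
  #   segment = line.split('Found Web Interface ')[1].split(' of application ')
  #   segment[0]                        # 'example-master-url:50000'
  #   re.sub('\'|.\n', '', segment[1])  # 'application_123456789000_0001'
  #   re.sub('/yarn/',
  #          '/gateway/default/yarn/proxy/application_123456789000_0001/',
  #          'https://example-gw.dataproc.example/yarn/')
  #   # -> 'https://example-gw.dataproc.example/gateway/default/yarn/proxy/
  #   #     application_123456789000_0001/'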

  def get_master_url_and_dashboard(
      self, cluster_metadata: MasterURLIdentifier,
      staging_bucket) -> Tuple[Optional[str], Optional[str]]:
    """Returns the master_url and dashboard of the current cluster."""
-    # TODO(victorhc): Implement the following method to fetch the cluster
-    # master_url from Dataproc.
-    return '.'.join([
-        self.cluster_metadata.project_id,
-        self.cluster_metadata.region,
-        self.cluster_metadata.cluster_name
-    ])
    startup_logs = []
    for file in self._fs._list(staging_bucket):
      if self.STAGING_LOG_NAME in file.path:
        startup_logs.append(file.path)

    for log in startup_logs:
      content = self._fs.open(log)
      for line in content.readlines():
        decoded_line = line.decode()
        if 'Found Web Interface' in decoded_line:
          return self.parse_master_url_and_dashboard(
              cluster_metadata, decoded_line)
    return None, None

  def cleanup_staging_files(self, staging_directory: str) -> None:
    staging_files = [file.path for file in self._fs._list(staging_directory)]
    self._fs.delete(staging_files)
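Once a master_url and dashboard have been registered, other Interactive Beam components can look them up through the environment's cluster mappings. A hedged sketch follows; the dict-like '.get' access and the bidirectional behavior of 'master_urls' are inferred from the '.inverse' usage in __init__ above, and the URL is hypothetical:

from apache_beam.runners.interactive import interactive_environment as ie

clusters = ie.current_env().clusters
# 'master_urls' maps master_url -> MasterURLIdentifier; its '.inverse' maps
# MasterURLIdentifier -> master_url, as used in __init__ above.
metadata = clusters.master_urls.get('example-master-url:50000')
dashboard = clusters.master_urls_to_dashboards.get('example-master-url:50000')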