Skip to content

Commit

Permalink
[BEAM-14071] Enabling Flink on Dataproc for Interactive Beam (#17044)
Browse files Browse the repository at this point in the history
  • Loading branch information
victorplusc authored Mar 21, 2022
1 parent 57c8647 commit d352d60
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 23 deletions.
9 changes: 8 additions & 1 deletion sdks/python/apache_beam/runners/interactive/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,14 @@ def __init__(self, cache_dir=None, cache_format='text'):

def size(self, *labels):
if self.exists(*labels):
return sum(os.path.getsize(path) for path in self._match(*labels))
matched_path = self._match(*labels)
# if any matched path has a gs:// prefix, it must be cached on GCS
if 'gs://' in matched_path[0]:
from apache_beam.io.gcp import gcsio
return sum(
sum(gcsio.GcsIO().list_prefix(path).values())
for path in matched_path)
return sum(os.path.getsize(path) for path in matched_path)
return 0

def exists(self, *labels):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.utils import progress_indicated
from apache_beam.version import __version__ as beam_version

try:
from google.cloud import dataproc_v1
Expand Down Expand Up @@ -67,9 +66,6 @@ class DataprocClusterManager:
required for creating and deleting Dataproc clusters for use
under Interactive Beam.
"""
IMAGE_VERSION = '2.0.31-debian10'
STAGING_LOG_NAME = 'dataproc-startup-script_output'

def __init__(self, cluster_metadata: MasterURLIdentifier) -> None:
"""Initializes the DataprocClusterManager with properties required
to interface with the Dataproc ClusterControllerClient.
Expand Down Expand Up @@ -162,13 +158,13 @@ def create_flink_cluster(self) -> None:
'cluster_name': self.cluster_metadata.cluster_name,
'config': {
'software_config': {
'image_version': self.IMAGE_VERSION,
'image_version': ie.current_env().clusters.
DATAPROC_IMAGE_VERSION,
'optional_components': ['DOCKER', 'FLINK']
},
'gce_cluster_config': {
'metadata': {
'flink-start-yarn-session': 'true',
'PIP_PACKAGES': 'apache-beam[gcp]=={}'.format(beam_version)
'flink-start-yarn-session': 'true'
},
'service_account_scopes': [
'https://www.googleapis.com/auth/cloud-platform'
Expand Down Expand Up @@ -310,7 +306,7 @@ def get_master_url_and_dashboard(
"""Returns the master_url of the current cluster."""
startup_logs = []
for file in self._fs._list(staging_bucket):
if self.STAGING_LOG_NAME in file.path:
if ie.current_env().clusters.DATAPROC_STAGING_LOG_NAME in file.path:
startup_logs.append(file.path)

for log in startup_logs:
Expand Down
19 changes: 12 additions & 7 deletions sdks/python/apache_beam/runners/interactive/interactive_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,13 @@
from typing import DefaultDict
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional

import pandas as pd

import apache_beam as beam
from apache_beam.dataframe.frame_base import DeferredBase
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.display.pcoll_visualization import visualize
from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll
Expand Down Expand Up @@ -349,17 +346,26 @@ class Clusters:
Example of calling the Interactive Beam clusters describe method::
ib.clusters.describe()
"""
# Explicitly set the Flink version here to ensure compatibility with 2.0
# Dataproc images:
# https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
DATAPROC_FLINK_VERSION = '1.12'
DATAPROC_IMAGE_VERSION = '2.0.31-debian10'
DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'

def __init__(self) -> None:
"""Instantiates default values for Dataproc cluster interactions.
"""
# Set the default_cluster_name that will be used when creating Dataproc
# clusters.
self.default_cluster_name = 'interactive-beam-cluster'
# Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
# (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
# MasterURLIdentifier -> str.
self.master_urls: Mapping[str, MasterURLIdentifier] = bidict()
self.master_urls = bidict()
# self.dataproc_cluster_managers map string pipeline ids to instances of
# DataprocClusterManager.
self.dataproc_cluster_managers: Dict[str, DataprocClusterManager] = {}
self.dataproc_cluster_managers = {}
# self.master_urls_to_pipelines map string master_urls to lists of
# pipelines that use the corresponding master_url.
self.master_urls_to_pipelines: DefaultDict[
Expand Down Expand Up @@ -457,8 +463,7 @@ def cleanup(
# Examples:
# ib.clusters.describe(p)
# Check the docstrings for detailed usages.
# TODO(victorhc): Resolve connection issue and add a working example
# clusters = Clusters()
clusters = Clusters()


def watch(watchable):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,8 @@ def __init__(self):
self._test_stream_service_controllers = {}
self._cached_source_signature = {}
self._tracked_user_pipelines = UserPipelineTracker()
# TODO(victorhc): remove the cluster instantiation after the
# interactive_beam.clusters class has been enabled.
from apache_beam.runners.interactive.interactive_beam import Clusters
self.clusters = Clusters()
from apache_beam.runners.interactive.interactive_beam import clusters
self.clusters = clusters

# Tracks the computation completeness of PCollections. PCollections tracked
# here don't need to be re-computed when data introspection is needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ def run_pipeline(self, pipeline, options):
master_url = self._get_dataproc_cluster_master_url_if_applicable(
user_pipeline)
if master_url:
options.view_as(FlinkRunnerOptions).flink_master = master_url
flink_options = options.view_as(FlinkRunnerOptions)
flink_options.flink_master = master_url
flink_options.flink_version = ie.current_env(
).clusters.DATAPROC_FLINK_VERSION
pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)

# The user_pipeline analyzed might be None if the pipeline given has nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.direct import direct_runner
Expand Down Expand Up @@ -543,12 +544,39 @@ def test_get_master_url_flink_master_provided(self):
from apache_beam.runners.portability.flink_runner import FlinkRunner
p = beam.Pipeline(
interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()),
options=PipelineOptions(
flink_master='--flink_master=example.internal:1'))
options=PipelineOptions(flink_master='--flink_master=test.internal:1'))
runner._get_dataproc_cluster_master_url_if_applicable(p)
self.assertEqual(ie.current_env().clusters.describe(), {})
ie.current_env().clusters = ib.Clusters()

@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
@patch(
'apache_beam.runners.interactive.interactive_runner.'
'InteractiveRunner._get_dataproc_cluster_master_url_if_applicable',
return_value='test.internal:1')
def test_set_flink_dataproc_version(self, mock_get_master_url):
runner = interactive_runner.InteractiveRunner()
options = PipelineOptions()
p = beam.Pipeline(interactive_runner.InteractiveRunner())

# Watch the local scope for Interactive Beam so that values will be cached.
ib.watch(locals())

# This is normally done in the interactive_utils when a transform is
# applied but needs an IPython environment. So we manually run this here.
ie.current_env().track_user_pipelines()

# Run the pipeline
runner.run_pipeline(p, options)

# Check that the Flink version is set to the Dataproc image Flink version
# inside ib.clusters.
self.assertEqual(
options.view_as(FlinkRunnerOptions).flink_version,
ib.clusters.DATAPROC_FLINK_VERSION)


if __name__ == '__main__':
unittest.main()

0 comments on commit d352d60

Please sign in to comment.