diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index dd162fcc098c..d56b464e71c6 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -533,6 +533,13 @@ def _add_argparse_args(cls, parser): 'Should be a json mapping of gradle build targets to pre-built ' 'artifacts (e.g. jar files) expansion endpoints (e.g. host:port).')) + parser.add_argument( + '--use_transform_service', + default=False, + action='store_true', + help='Use the Docker-composed-based transform service when expanding ' + 'cross-language transforms.') + def additional_option_ptransform_fn(): beam.transforms.ptransform.ptransform_fn_typehints_enabled = True diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 104fc4c70142..5182293ed591 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -23,7 +23,9 @@ import functools import glob import logging +import subprocess import threading +import uuid from collections import OrderedDict from collections import namedtuple from typing import Dict @@ -32,6 +34,7 @@ from apache_beam import pvalue from apache_beam.coders import RowCoder +from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_artifact_api_pb2_grpc from apache_beam.portability.api import beam_expansion_api_pb2 @@ -51,6 +54,7 @@ from apache_beam.typehints.typehints import Union from apache_beam.typehints.typehints import UnionConstraint from apache_beam.utils import subprocess_server +from apache_beam.utils import transform_service_launcher DEFAULT_EXPANSION_SERVICE = 'localhost:8097' @@ -668,7 +672,10 @@ def expand(self, pvalueish): transform=transform_proto, output_coder_requests=output_coders) - with 
def _maybe_use_transform_service(provided_service=None, options=None):
  """Decides which expansion service should expand an external transform.

  Anything other than a ``JavaJarExpansionService`` (for example the string
  address of an already available service) is returned unchanged. For a
  ``JavaJarExpansionService`` the Java based service is tried first; the
  Docker Compose based transform service is used instead when it was
  explicitly requested via ``use_transform_service`` or when the ``java``
  executable cannot be started.

  Args:
    provided_service: the expansion service given to the external transform.
    options: pipeline options, viewed as ``CrossLanguageOptions`` to read
      ``use_transform_service``. ``None`` means the transform service was not
      requested.

  Returns:
    ``provided_service`` itself, or a new
    ``transform_service_launcher.TransformServiceLauncher`` registered under
    a random Docker Compose project name and a freshly picked port.

  Raises:
    ValueError: if the transform service has to be used but the ``docker``
      executable is not available.
  """
  # For anything other than 'JavaJarExpansionService' we just use the
  # provided service. For example, string address of an already available
  # service.
  if not isinstance(provided_service, JavaJarExpansionService):
    return provided_service

  def is_executable_available(executable):
    # Only checks that the executable can be launched; the exit status is
    # deliberately ignored. subprocess.run() waits for the child and its
    # output is discarded, so no process or pipe is left behind.
    try:
      subprocess.run(
          [executable, '--version'],
          stdin=subprocess.DEVNULL,
          stdout=subprocess.DEVNULL,
          stderr=subprocess.DEVNULL,
          check=False)
    except OSError:
      return False
    return True

  use_transform_service = (
      options is not None and
      options.view_as(CrossLanguageOptions).use_transform_service)

  # We try java and docker based expansion services in that order. Docker is
  # only probed when the Java based service cannot or must not be used.
  java_available = is_executable_available('java')
  if java_available and not use_transform_service:
    return provided_service

  if not is_executable_available('docker'):
    if java_available:
      # Only reachable when the transform service was explicitly requested.
      raise ValueError(
          'Cannot start the explicitly requested transform service since '
          'the Docker executable is not available in the system.')
    raise ValueError(
        'Cannot start an expansion service since neither Java nor '
        'Docker executables are available in the system.')

  if use_transform_service:
    reason = 'it was explicitly requested'
  else:
    reason = 'the Java executable is not available in the system'

  project_name = str(uuid.uuid4())
  port = subprocess_server.pick_port(None)[0]

  logging.info(
      'Trying to expand the external transform using the Docker Compose '
      'based transform service since %s. Transform service will be under '
      'Docker Compose project name %s and will be made available at port %s.',
      reason,
      project_name,
      port)

  # Imported locally, as in the original code; bound to a distinct name so
  # the module is not shadowed by its own version string.
  from apache_beam import version as beam_version_module

  return transform_service_launcher.TransformServiceLauncher(
      project_name, port, beam_version_module.__version__)
#

"""Launcher for the Docker Compose based Beam transform service.

Can be used as a library (``TransformServiceLauncher``) or as a script that
runs the ``up``, ``down`` or ``ps`` Docker Compose command for a project.
"""

import argparse
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from pathlib import Path

import grpc

from apache_beam.utils import subprocess_server

_LOGGER = logging.getLogger(__name__)

_COMMAND_POSSIBLE_VALUES = ['up', 'down', 'ps']

_EXPANSION_SERVICE_LAUNCHER_JAR = ':sdks:java:transform-service:launcher:build'

_CREDENTIALS_FILE_NAME = 'application_default_credentials.json'


class TransformServiceLauncher(object):
  """Starts, stops and inspects a Docker Compose based transform service.

  One launcher object is kept per Docker Compose project name. Used as a
  context manager it starts the service, waits for it to accept connections
  and yields an expansion/artifact-retrieval stub; on exit the service is
  shut down and the channel is closed.
  """
  _DEFAULT_PROJECT_NAME = 'apache.beam.transform.service'
  # Milliseconds to wait for the service when no positive timeout is given.
  _DEFAULT_START_WAIT_TIMEOUT = 25000

  _launchers = {}  # type: ignore

  # Maintaining a static list of launchers to prevent temporary resources
  # from being created unnecessarily.
  # NOTE(review): __init__ still runs on every construction, so a cached
  # launcher is re-configured (and the files re-extracted) each time.
  def __new__(cls, project_name, port, beam_version=None):
    if project_name not in TransformServiceLauncher._launchers:
      TransformServiceLauncher._launchers[project_name] = super(
          TransformServiceLauncher, cls).__new__(cls)
    return TransformServiceLauncher._launchers[project_name]

  def __init__(self, project_name, port, beam_version=None):
    """Prepares the Docker Compose configuration for the given project.

    Args:
      project_name: Docker Compose project name; also used as the name of the
        temporary directory that holds the extracted configuration.
      port: port of the transform service, exported to Docker Compose as
        ``TRANSFORM_SERVICE_PORT`` and used for the local gRPC address.
      beam_version: exported as ``BEAM_VERSION`` when not ``None``.
    """
    _LOGGER.info('Initializing the Beam Transform Service %s.', project_name)

    self._project_name = project_name
    self._port = port
    self._address = 'localhost:' + str(self._port)

    self._launcher_lock = threading.RLock()

    # Setting up Docker Compose configuration.

    # We use Docker Compose project name as the name of the temporary
    # directory to isolate different transform service instances that may be
    # running in the same machine.
    temp_dir = os.path.join(tempfile.gettempdir(), project_name)
    os.makedirs(temp_dir, exist_ok=True)

    # Get the jar with configs
    path_to_local_jar = subprocess_server.JavaJarServer.local_jar(
        subprocess_server.JavaJarServer.path_to_beam_jar(
            _EXPANSION_SERVICE_LAUNCHER_JAR))

    with zipfile.ZipFile(path_to_local_jar) as launcher_jar:
      launcher_jar.extract('docker-compose.yml', path=temp_dir)
      launcher_jar.extract('.env', path=temp_dir)

    compose_file = os.path.join(temp_dir, 'docker-compose.yml')

    credentials_dir = os.path.join(temp_dir, 'credentials_dir')
    os.makedirs(credentials_dir, exist_ok=True)

    self._copy_application_default_credentials(credentials_dir)

    self._environmental_variables = {
        'CREDENTIALS_VOLUME': credentials_dir,
        'TRANSFORM_SERVICE_PORT': str(port),
    }
    # Environment values handed to subprocess must be strings, so an absent
    # version is left unset instead of being exported as None.
    if beam_version is not None:
      self._environmental_variables['BEAM_VERSION'] = str(beam_version)

    self._docker_compose_start_command_prefix = [
        'docker-compose', '-p', project_name, '-f', compose_file
    ]
    # Public attribute kept for backward compatibility; it now carries the
    # real compose file path instead of a placeholder.
    self.docker_compose_command_prefix = list(
        self._docker_compose_start_command_prefix)

  @staticmethod
  def _application_default_credentials_path():
    """Returns where gcloud is expected to keep the default credentials."""
    if os.name == 'nt':
      # os.name is 'nt' on Windows, where gcloud keeps its configuration
      # under %APPDATA%. The fallback is a best guess for when it is unset.
      config_root = os.environ.get('APPDATA') or os.path.join(
          str(Path.home()), 'AppData', 'Roaming')
      return os.path.join(config_root, 'gcloud', _CREDENTIALS_FILE_NAME)
    return os.path.join(
        str(Path.home()), '.config', 'gcloud', _CREDENTIALS_FILE_NAME)

  def _copy_application_default_credentials(self, credentials_dir):
    """Copies the gcloud default credentials file into credentials_dir."""
    _LOGGER.info('Copying the Google Application Default Credentials file.')

    source_path = self._application_default_credentials_path()
    if os.path.exists(source_path):
      shutil.copyfile(
          source_path, os.path.join(credentials_dir, _CREDENTIALS_FILE_NAME))
    else:
      _LOGGER.info(
          'GCP credentials will not be available for the transform service '
          'since could not find the Google Cloud application default '
          'credentials file at the expected location %s.',
          source_path)

  def _get_channel(self):
    """Creates a gRPC channel to the service with unlimited message sizes."""
    channel_options = [("grpc.max_receive_message_length", -1),
                       ("grpc.max_send_message_length", -1)]
    if hasattr(grpc, 'local_channel_credentials'):
      # TODO: update this to support secure non-local channels.
      return grpc.secure_channel(
          self._address,
          grpc.local_channel_credentials(),
          options=channel_options)
    else:
      return grpc.insecure_channel(self._address, options=channel_options)

  def __enter__(self):
    self.start()
    try:
      self.wait_till_up(-1)
    except Exception:
      # __exit__ is not called when __enter__ fails, so stop the containers
      # here instead of leaving them running.
      self.shutdown()
      raise

    self._channel = self._get_channel()

    from apache_beam.transforms import external
    return external.ExpansionAndArtifactRetrievalStub(self._channel.__enter__())

  def __exit__(self, *args):
    try:
      self.shutdown()
    finally:
      # Close the channel even if stopping the service failed.
      self._channel.__exit__(*args)

  def _run_docker_compose_command(self, command, output_override=None):
    """Runs a Docker Compose command for this project.

    Args:
      command: list of arguments appended to the project's command prefix.
      output_override: optional binary file object that receives the combined
        stdout/stderr; when omitted the output is printed.
    """
    cmd = list(self._docker_compose_start_command_prefix)
    cmd.extend(command)

    env = os.environ.copy()
    env.update(self._environmental_variables)

    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env)
    std_out, _ = process.communicate()

    if process.returncode != 0:
      # Best effort: report the failure but do not raise, so that callers
      # behave as before.
      _LOGGER.warning(
          'Docker Compose command %s exited with status %s.',
          ' '.join(cmd),
          process.returncode)

    if output_override:
      output_override.write(std_out)
    else:
      print(std_out.decode(errors='backslashreplace'))

  def start(self):
    """Runs ``up -d`` for the project."""
    with self._launcher_lock:
      self._run_docker_compose_command(['up', '-d'])

  def shutdown(self):
    """Runs ``down`` for the project."""
    with self._launcher_lock:
      self._run_docker_compose_command(['down'])

  def status(self):
    """Runs ``ps`` for the project and prints the result."""
    with self._launcher_lock:
      self._run_docker_compose_command(['ps'])

  def wait_till_up(self, timeout_ms):
    """Blocks until the service accepts gRPC connections.

    Args:
      timeout_ms: maximum time to wait in milliseconds; a value <= 0 selects
        the default timeout.

    Raises:
      ValueError: if the service is not ready within the timeout.
    """
    if timeout_ms <= 0:
      timeout_ms = TransformServiceLauncher._DEFAULT_START_WAIT_TIMEOUT
    deadline = time.monotonic() + timeout_ms / 1000

    # The probe channel is closed once the wait ends, whatever the outcome.
    with self._get_channel() as channel:
      # Waiting till the service is up.
      channel_ready = grpc.channel_ready_future(channel)
      wait_secs = .1
      while True:
        remaining_secs = deadline - time.monotonic()
        if remaining_secs <= 0:
          raise ValueError(
              'Transform service did not start in %s seconds.' %
              (timeout_ms / 1000))
        try:
          # Never wait past the deadline, even as the step grows.
          channel_ready.result(timeout=min(wait_secs, remaining_secs))
          break
        except (grpc.FutureTimeoutError, grpc.RpcError):
          wait_secs *= 1.2
          _LOGGER.log(
              logging.WARNING if wait_secs > 1 else logging.DEBUG,
              'Waiting for the transform service to be ready at %s.',
              self._address)

    _LOGGER.info('Transform service %s started.', self._project_name)

  def _get_status(self):
    """Writes the ``ps`` output to a temporary file and returns its path.

    The file is not deleted here; the caller owns it.
    """
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
      self._run_docker_compose_command(['ps'], tmp)
    return tmp.name


def main(argv):
  """Runs a transform service command from command line arguments.

  Args:
    argv: argument list; unknown arguments (such as the program name in
      ``sys.argv``) are ignored.

  Raises:
    ValueError: if the command is not one of the supported values.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--project_name', help='Docker Compose project name.')
  parser.add_argument(
      '--command',
      required=True,
      choices=_COMMAND_POSSIBLE_VALUES,
      help='Command to run. Possible values are ' +
      ', '.join(_COMMAND_POSSIBLE_VALUES))
  # NOTE(review): the default of -1 is passed to the launcher unchanged and
  # ends up in TRANSFORM_SERVICE_PORT and the gRPC address - confirm that a
  # "default port" is actually resolved somewhere.
  parser.add_argument(
      '--port',
      type=int,
      default=-1,
      help='External visible port of the transform service.')
  parser.add_argument(
      '--beam_version',
      required=True,
      help='Beam version of the expansion service containers to be used.')

  known_args, _ = parser.parse_known_args(argv)

  project_name = (
      TransformServiceLauncher._DEFAULT_PROJECT_NAME
      if known_args.project_name is None else known_args.project_name)
  _LOGGER.info(
      'Starting the Beam Transform Service at %s.',
      'the default port' if known_args.port < 0 else
      ('port ' + str(known_args.port)))
  launcher = TransformServiceLauncher(
      project_name, known_args.port, known_args.beam_version)

  if known_args.command == 'up':
    launcher.start()
    launcher.wait_till_up(-1)
  elif known_args.command == 'down':
    launcher.shutdown()
  elif known_args.command == 'ps':
    launcher.status()
  else:
    raise ValueError(
        'Unknown command %s possible values are %s' %
        (known_args.command, ', '.join(_COMMAND_POSSIBLE_VALUES)))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main(sys.argv)