From 4b758fcd9caeaf2220794cdd2cf98efc5e01166e Mon Sep 17 00:00:00 2001 From: Bruno Volpato Date: Mon, 3 Jul 2023 14:31:16 -0400 Subject: [PATCH] Revert "Updates Python ExternalTransform to use the transform service when needed (#27228)" This reverts commit d545243b30d1192b76379659b2345fd349ec456d. --- .../apache_beam/options/pipeline_options.py | 7 - .../python/apache_beam/transforms/external.py | 74 +---- .../utils/transform_service_launcher.py | 261 ------------------ 3 files changed, 1 insertion(+), 341 deletions(-) delete mode 100644 sdks/python/apache_beam/utils/transform_service_launcher.py diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index d56b464e71c6..dd162fcc098c 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -533,13 +533,6 @@ def _add_argparse_args(cls, parser): 'Should be a json mapping of gradle build targets to pre-built ' 'artifacts (e.g. jar files) expansion endpoints (e.g. host:port).')) - parser.add_argument( - '--use_transform_service', - default=False, - action='store_true', - help='Use the Docker-composed-based transform service when expanding ' - 'cross-language transforms.') - def additional_option_ptransform_fn(): beam.transforms.ptransform.ptransform_fn_typehints_enabled = True diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 5182293ed591..104fc4c70142 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -23,9 +23,7 @@ import functools import glob import logging -import subprocess import threading -import uuid from collections import OrderedDict from collections import namedtuple from typing import Dict @@ -34,7 +32,6 @@ from apache_beam import pvalue from apache_beam.coders import RowCoder -from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_artifact_api_pb2_grpc from apache_beam.portability.api import beam_expansion_api_pb2 @@ -54,7 +51,6 @@ from apache_beam.typehints.typehints import Union from apache_beam.typehints.typehints import UnionConstraint from apache_beam.utils import subprocess_server -from apache_beam.utils import transform_service_launcher DEFAULT_EXPANSION_SERVICE = 'localhost:8097' @@ -672,10 +668,7 @@ def expand(self, pvalueish): transform=transform_proto, output_coder_requests=output_coders) - expansion_service = _maybe_use_transform_service( - self._expansion_service, pipeline.options) - - with ExternalTransform.service(expansion_service) as service: + with ExternalTransform.service(self._expansion_service) as service: response = service.Expand(request) if response.error: raise RuntimeError(response.error) @@ -980,71 +973,6 @@ def __init__( path_to_jar, extra_args, classpath=classpath, append_args=append_args) -def _maybe_use_transform_service(provided_service=None, options=None): - # For anything other than 'JavaJarExpansionService' we just use the - # provided service. For example, string address of an already available - # service. - if not isinstance(provided_service, JavaJarExpansionService): - return provided_service - - def is_java_available(): - cmd = ['java', '--version'] - - try: - subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - except: # pylint: disable=bare-except - return False - - return True - - def is_docker_available(): - cmd = ['docker', '--version'] - - try: - subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - except: # pylint: disable=bare-except - return False - - return True - - # We try java and docker based expansion services in that order. - - java_available = is_java_available() - docker_available = is_docker_available() - - use_transform_service = options.view_as( - CrossLanguageOptions).use_transform_service - - if (java_available and provided_service and not use_transform_service): - return provided_service - elif docker_available: - if use_transform_service: - error_append = 'it was explicitly requested' - elif not java_available: - error_append = 'the Java executable is not available in the system' - else: - error_append = 'a Java expansion service was not provided.' - - project_name = str(uuid.uuid4()) - port = subprocess_server.pick_port(None)[0] - - logging.info( - 'Trying to expand the external transform using the Docker Compose ' - 'based transform service since %s. Transform service will be under ' - 'Docker Compose project name %s and will be made available at port %r.' - % (error_append, project_name, str(port))) - - from apache_beam import version as beam_version - beam_version = beam_version.__version__ - - return transform_service_launcher.TransformServiceLauncher( - project_name, port, beam_version) - else: - raise ValueError( - 'Cannot start an expansion service since neither Java nor ' - 'Docker executables are available in the system.') - - def memoize(func): cache = {} diff --git a/sdks/python/apache_beam/utils/transform_service_launcher.py b/sdks/python/apache_beam/utils/transform_service_launcher.py deleted file mode 100644 index 84f081e64ad9..000000000000 --- a/sdks/python/apache_beam/utils/transform_service_launcher.py +++ /dev/null @@ -1,261 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import argparse -import logging -import os -import shutil -import subprocess -import sys -import tempfile -import threading -import time -import zipfile -from pathlib import Path - -import grpc - -from apache_beam.utils import subprocess_server - -_LOGGER = logging.getLogger(__name__) - -_COMMAND_POSSIBLE_VALUES = ['up', 'down', 'ps'] - -_EXPANSION_SERVICE_LAUNCHER_JAR = ':sdks:java:transform-service:launcher:build' - - -class TransformServiceLauncher(object): - _DEFAULT_PROJECT_NAME = 'apache.beam.transform.service' - _DEFAULT_START_WAIT_TIMEOUT = 25000 - - _launchers = {} # type: ignore - - # Maintaining a static list of launchers to prevent temporary resources - # from being created unnecessarily. - def __new__(cls, project_name, port, beam_version=None): - if project_name not in TransformServiceLauncher._launchers: - TransformServiceLauncher._launchers[project_name] = super( - TransformServiceLauncher, cls).__new__(cls) - return TransformServiceLauncher._launchers[project_name] - - def __init__(self, project_name, port, beam_version=None): - logging.info('Initializing the Beam Transform Service %s.' % project_name) - - self._project_name = project_name - self._port = port - self._address = 'localhost:' + str(self._port) - - self._launcher_lock = threading.RLock() - - self.docker_compose_command_prefix = [ - 'docker-compose', '-p', project_name, '-f', 'TODO path' - ] - - # Setting up Docker Compose configuration. - - # We use Docker Compose project name as the name of the temporary directory - # to isolate different transform service instances that may be running in - # the same machine. - - temp_dir = os.path.join(tempfile.gettempdir(), project_name) - if not os.path.exists(temp_dir): - os.mkdir(temp_dir) - - # Get the jar with configs - path_to_local_jar = subprocess_server.JavaJarServer.local_jar( - subprocess_server.JavaJarServer.path_to_beam_jar( - _EXPANSION_SERVICE_LAUNCHER_JAR)) - - with zipfile.ZipFile(path_to_local_jar) as launcher_jar: - launcher_jar.extract('docker-compose.yml', path=temp_dir) - launcher_jar.extract('.env', path=temp_dir) - - compose_file = os.path.join(temp_dir, 'docker-compose.yml') - - credentials_dir = os.path.join(temp_dir, 'credentials_dir') - if not os.path.exists(credentials_dir): - os.mkdir(credentials_dir) - - logging.info('Copying the Google Application Default Credentials file.') - - is_windows = 'windows' in os.name.lower() - application_default_path_suffix = ( - '\\gcloud\\application_default_credentials.json' if is_windows else - '.config/gcloud/application_default_credentials.json') - application_default_path_file = os.path.join( - str(Path.home()), application_default_path_suffix) - application_default_path_copied = os.path.join( - credentials_dir, 'application_default_credentials.json') - - if os.path.exists(application_default_path_file): - shutil.copyfile( - application_default_path_file, application_default_path_copied) - else: - logging.info( - 'GCP credentials will not be available for the transform service ' - 'since could not find the Google Cloud application default ' - 'credentials file at the expected location %s.' % - application_default_path_file) - - self._environmental_variables = {} - self._environmental_variables['CREDENTIALS_VOLUME'] = credentials_dir - self._environmental_variables['TRANSFORM_SERVICE_PORT'] = str(port) - self._environmental_variables['BEAM_VERSION'] = beam_version - - self._docker_compose_start_command_prefix = [] - self._docker_compose_start_command_prefix.append('docker-compose') - self._docker_compose_start_command_prefix.append('-p') - self._docker_compose_start_command_prefix.append(project_name) - self._docker_compose_start_command_prefix.append('-f') - self._docker_compose_start_command_prefix.append(compose_file) - - def _get_channel(self): - channel_options = [("grpc.max_receive_message_length", -1), - ("grpc.max_send_message_length", -1)] - if hasattr(grpc, 'local_channel_credentials'): - # TODO: update this to support secure non-local channels. - return grpc.secure_channel( - self._address, - grpc.local_channel_credentials(), - options=channel_options) - else: - return grpc.insecure_channel(self._address, options=channel_options) - - def __enter__(self): - self.start() - self.wait_till_up(-1) - - self._channel = self._get_channel() - - from apache_beam import external - return external.ExpansionAndArtifactRetrievalStub(self._channel.__enter__()) - - def __exit__(self, *args): - self.shutdown() - self._channel.__exit__(*args) - - def _run_docker_compose_command(self, command, output_override=None): - cmd = [] - cmd.extend(self._docker_compose_start_command_prefix) - cmd.extend(command) - - myenv = os.environ.copy() - myenv.update(self._environmental_variables) - - process = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=myenv) - std_out, _ = process.communicate() - - if output_override: - output_override.write(std_out) - else: - print(std_out.decode(errors='backslashreplace')) - - def start(self): - with self._launcher_lock: - self._run_docker_compose_command(['up', '-d']) - - def shutdown(self): - with self._launcher_lock: - self._run_docker_compose_command(['down']) - - def status(self): - with self._launcher_lock: - self._run_docker_compose_command(['ps']) - - def wait_till_up(self, timeout_ms): - channel = self._get_channel() - - timeout_ms = ( - TransformServiceLauncher._DEFAULT_START_WAIT_TIMEOUT - if timeout_ms <= 0 else timeout_ms) - - # Waiting till the service is up. - channel_ready = grpc.channel_ready_future(channel) - wait_secs = .1 - start_time = time.time() - while True: - if (time.time() - start_time) * 1000 > timeout_ms > 0: - raise ValueError( - 'Transform service did not start in %s seconds.' % - (timeout_ms / 1000)) - try: - channel_ready.result(timeout=wait_secs) - break - except (grpc.FutureTimeoutError, grpc.RpcError): - wait_secs *= 1.2 - logging.log( - logging.WARNING if wait_secs > 1 else logging.DEBUG, - 'Waiting for the transform service to be ready at %s.', - self._address) - - logging.info('Transform service ' + self._project_name + ' started.') - - def _get_status(self): - tmp = tempfile.NamedTemporaryFile(delete=False) - self._run_docker_compose_command(['ps'], tmp) - tmp.close() - return tmp.name - - -def main(argv): - parser = argparse.ArgumentParser() - parser.add_argument('--project_name', help='Docker Compose project name.') - parser.add_argument( - '--command', - required=True, - choices=_COMMAND_POSSIBLE_VALUES, - help='Command to run. Possible values are ' + - ', '.join(_COMMAND_POSSIBLE_VALUES)) - parser.add_argument( - '--port', - type=int, - default=-1, - help='External visible port of the transform service.') - parser.add_argument( - '--beam_version', - required=True, - help='Beam version of the expansion service containers to be used.') - - known_args, _ = parser.parse_known_args(argv) - - project_name = ( - TransformServiceLauncher._DEFAULT_PROJECT_NAME - if known_args.project_name is None else known_args.project_name) - logging.info( - 'Starting the Beam Transform Service at %s.' % ( - 'the default port' if known_args.port < 0 else - (' port ' + str(known_args.port)))) - launcher = TransformServiceLauncher( - project_name, known_args.port, known_args.beam_version) - - if known_args.command == 'up': - launcher.start() - launcher.wait_till_up(-1) - elif known_args.command == 'down': - launcher.shutdown() - elif known_args.command == 'ps': - launcher.status() - else: - raise ValueError( - 'Unknown command %s possible values are %s' % - (known_args.command, ', '.join(_COMMAND_POSSIBLE_VALUES))) - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - main(sys.argv)