Skip to content

Commit

Permalink
Updates Python ExternalTransform to use the transform service when ne…
Browse files Browse the repository at this point in the history
…eded (#27228)

* Updates Python ExternalTransform to use the transform service when needed

* Addressing reviewer comments

* Fix yapf

* Fix lint

* Fix yapf
  • Loading branch information
chamikaramj committed Jun 25, 2023
1 parent e07c461 commit d545243
Show file tree
Hide file tree
Showing 3 changed files with 341 additions and 1 deletion.
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,13 @@ def _add_argparse_args(cls, parser):
'Should be a json mapping of gradle build targets to pre-built '
'artifacts (e.g. jar files) expansion endpoints (e.g. host:port).'))

parser.add_argument(
'--use_transform_service',
default=False,
action='store_true',
help='Use the Docker-composed-based transform service when expanding '
'cross-language transforms.')


def additional_option_ptransform_fn():
beam.transforms.ptransform.ptransform_fn_typehints_enabled = True
Expand Down
74 changes: 73 additions & 1 deletion sdks/python/apache_beam/transforms/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import functools
import glob
import logging
import subprocess
import threading
import uuid
from collections import OrderedDict
from collections import namedtuple
from typing import Dict
Expand All @@ -32,6 +34,7 @@

from apache_beam import pvalue
from apache_beam.coders import RowCoder
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_expansion_api_pb2
Expand All @@ -51,6 +54,7 @@
from apache_beam.typehints.typehints import Union
from apache_beam.typehints.typehints import UnionConstraint
from apache_beam.utils import subprocess_server
from apache_beam.utils import transform_service_launcher

DEFAULT_EXPANSION_SERVICE = 'localhost:8097'

Expand Down Expand Up @@ -668,7 +672,10 @@ def expand(self, pvalueish):
transform=transform_proto,
output_coder_requests=output_coders)

with ExternalTransform.service(self._expansion_service) as service:
expansion_service = _maybe_use_transform_service(
self._expansion_service, pipeline.options)

with ExternalTransform.service(expansion_service) as service:
response = service.Expand(request)
if response.error:
raise RuntimeError(response.error)
Expand Down Expand Up @@ -973,6 +980,71 @@ def __init__(
path_to_jar, extra_args, classpath=classpath, append_args=append_args)


def _maybe_use_transform_service(provided_service=None, options=None):
  """Chooses the expansion service used to expand an external transform.

  Anything that is not a ``JavaJarExpansionService`` (for example the string
  address of an already running service) is returned untouched. For a
  ``JavaJarExpansionService`` the Java based service is preferred; the Docker
  Compose based transform service is used instead when it was explicitly
  requested through the ``use_transform_service`` pipeline option or when no
  ``java`` executable can be launched.

  Args:
    provided_service: the expansion service given to the external transform.
    options: pipeline options, read through ``CrossLanguageOptions``. When
      ``None`` the transform service is treated as not requested.

  Returns:
    ``provided_service`` itself, or a
    ``transform_service_launcher.TransformServiceLauncher`` bound to a fresh
    Docker Compose project name and a newly picked port.

  Raises:
    ValueError: if the Docker Compose based transform service is needed but
      the ``docker`` executable cannot be launched.
  """
  # For anything other than 'JavaJarExpansionService' we just use the
  # provided service. For example, string address of an already available
  # service.
  if not isinstance(provided_service, JavaJarExpansionService):
    return provided_service

  def is_executable_available(executable):
    # Only checks that the executable can be launched; the exit status is
    # deliberately ignored. subprocess.run() waits for the child so neither
    # the process nor any pipes are left dangling.
    try:
      subprocess.run([executable, '--version'],
                     stdout=subprocess.DEVNULL,
                     stderr=subprocess.DEVNULL,
                     check=False)
    except (OSError, subprocess.SubprocessError):
      return False
    return True

  # We try java and docker based expansion services in that order.

  java_available = is_executable_available('java')

  use_transform_service = (
      options is not None and
      options.view_as(CrossLanguageOptions).use_transform_service)

  if java_available and provided_service and not use_transform_service:
    return provided_service

  # Docker is only probed once we know the Java path is not taken.
  if not is_executable_available('docker'):
    if java_available and use_transform_service:
      raise ValueError(
          'Cannot start the Docker Compose based transform service that was '
          'explicitly requested since the Docker executable is not available '
          'in the system.')
    raise ValueError(
        'Cannot start an expansion service since neither Java nor '
        'Docker executables are available in the system.')

  if use_transform_service:
    error_append = 'it was explicitly requested'
  elif not java_available:
    error_append = 'the Java executable is not available in the system'
  else:
    error_append = 'a Java expansion service was not provided.'

  project_name = str(uuid.uuid4())
  port = subprocess_server.pick_port(None)[0]

  logging.info(
      'Trying to expand the external transform using the Docker Compose '
      'based transform service since %s. Transform service will be under '
      'Docker Compose project name %s and will be made available at port %r.',
      error_append,
      project_name,
      str(port))

  # Imported here, as in the original code, rather than at module level.
  from apache_beam import version as beam_version_module

  return transform_service_launcher.TransformServiceLauncher(
      project_name, port, beam_version_module.__version__)


def memoize(func):
cache = {}

Expand Down
261 changes: 261 additions & 0 deletions sdks/python/apache_beam/utils/transform_service_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from pathlib import Path

import grpc

from apache_beam.utils import subprocess_server

# Module level logger.
# NOTE(review): the code visible in this file logs through the root
# ``logging`` functions rather than through this logger - confirm intent.
_LOGGER = logging.getLogger(__name__)

# Values accepted by the ``--command`` flag of ``main``.
_COMMAND_POSSIBLE_VALUES = ['up', 'down', 'ps']

# Identifier passed to ``JavaJarServer.path_to_beam_jar`` to locate the
# launcher jar from which 'docker-compose.yml' and '.env' are extracted.
_EXPANSION_SERVICE_LAUNCHER_JAR = ':sdks:java:transform-service:launcher:build'


class TransformServiceLauncher(object):
  """Starts, stops and queries a Docker Compose based Beam transform service.

  One instance is kept per Docker Compose project name. The launcher can be
  used as a context manager: entering starts the service, waits until it
  accepts connections and returns an expansion/artifact-retrieval stub;
  exiting brings the service down and closes the channel.
  """
  _DEFAULT_PROJECT_NAME = 'apache.beam.transform.service'
  # Default start-up wait, in milliseconds (see wait_till_up).
  _DEFAULT_START_WAIT_TIMEOUT = 25000

  # Launchers created so far, keyed by Docker Compose project name.
  _launchers = {}  # type: ignore

  # Maintaining a static list of launchers to prevent temporary resources
  # from being created unnecessarily.
  # NOTE: Python still invokes __init__ on the instance returned from here,
  # so constructing a launcher for an existing project re-runs __init__.
  def __new__(cls, project_name, port, beam_version=None):
    if project_name not in TransformServiceLauncher._launchers:
      TransformServiceLauncher._launchers[project_name] = super(
          TransformServiceLauncher, cls).__new__(cls)
    return TransformServiceLauncher._launchers[project_name]

  def __init__(self, project_name, port, beam_version=None):
    """Prepares the Docker Compose configuration for the given project.

    Args:
      project_name: Docker Compose project name; also used as the name of the
        temporary directory that holds the configuration.
      port: port at which the transform service is made available.
      beam_version: value exported as BEAM_VERSION to docker-compose. When
        ``None`` the variable is not exported.
    """
    logging.info('Initializing the Beam Transform Service %s.', project_name)

    self._project_name = project_name
    self._port = port
    self._address = 'localhost:' + str(self._port)

    self._launcher_lock = threading.RLock()

    # NOTE(review): placeholder that is not read anywhere in this class; kept
    # because it is a public attribute. The prefix actually used is
    # self._docker_compose_start_command_prefix below.
    self.docker_compose_command_prefix = [
        'docker-compose', '-p', project_name, '-f', 'TODO path'
    ]

    # Setting up Docker Compose configuration.

    # We use Docker Compose project name as the name of the temporary directory
    # to isolate different transform service instances that may be running in
    # the same machine.
    temp_dir = os.path.join(tempfile.gettempdir(), project_name)
    os.makedirs(temp_dir, exist_ok=True)

    # Get the jar with configs
    path_to_local_jar = subprocess_server.JavaJarServer.local_jar(
        subprocess_server.JavaJarServer.path_to_beam_jar(
            _EXPANSION_SERVICE_LAUNCHER_JAR))

    with zipfile.ZipFile(path_to_local_jar) as launcher_jar:
      launcher_jar.extract('docker-compose.yml', path=temp_dir)
      launcher_jar.extract('.env', path=temp_dir)

    compose_file = os.path.join(temp_dir, 'docker-compose.yml')

    credentials_dir = os.path.join(temp_dir, 'credentials_dir')
    os.makedirs(credentials_dir, exist_ok=True)

    logging.info('Copying the Google Application Default Credentials file.')

    application_default_path_file = (
        TransformServiceLauncher._application_default_credentials_path())
    application_default_path_copied = os.path.join(
        credentials_dir, 'application_default_credentials.json')

    if os.path.exists(application_default_path_file):
      shutil.copyfile(
          application_default_path_file, application_default_path_copied)
    else:
      logging.info(
          'GCP credentials will not be available for the transform service '
          'since could not find the Google Cloud application default '
          'credentials file at the expected location %s.',
          application_default_path_file)

    # Variables exported to docker-compose on top of the current environment.
    # Entries whose value is None are skipped when a command is run.
    self._environmental_variables = {
        'CREDENTIALS_VOLUME': credentials_dir,
        'TRANSFORM_SERVICE_PORT': str(port),
        'BEAM_VERSION': beam_version,
    }

    self._docker_compose_start_command_prefix = [
        'docker-compose', '-p', project_name, '-f', compose_file
    ]

  @staticmethod
  def _application_default_credentials_path():
    """Returns where the gcloud application default credentials are expected.

    os.name is 'nt' on Windows (it never contains 'windows'), where gcloud
    keeps the file under %APPDATA%\\gcloud. Elsewhere the file is looked up
    under ~/.config/gcloud.
    """
    if os.name == 'nt':
      # NOTE(review): falling back to the home directory when APPDATA is not
      # set is a best-effort guess - confirm.
      config_root = os.environ.get('APPDATA', str(Path.home()))
      return os.path.join(
          config_root, 'gcloud', 'application_default_credentials.json')
    return os.path.join(
        str(Path.home()), '.config/gcloud/application_default_credentials.json')

  def _get_channel(self):
    """Creates a new gRPC channel to the transform service address."""
    channel_options = [("grpc.max_receive_message_length", -1),
                       ("grpc.max_send_message_length", -1)]
    if hasattr(grpc, 'local_channel_credentials'):
      # TODO: update this to support secure non-local channels.
      return grpc.secure_channel(
          self._address,
          grpc.local_channel_credentials(),
          options=channel_options)
    else:
      return grpc.insecure_channel(self._address, options=channel_options)

  def __enter__(self):
    """Starts the service, waits for it and returns an expansion stub."""
    self.start()
    try:
      self.wait_till_up(-1)
    except Exception:
      # __exit__ is not called when __enter__ raises, so bring the containers
      # down here instead of leaving them running.
      self.shutdown()
      raise

    self._channel = self._get_channel()

    # ExpansionAndArtifactRetrievalStub is defined in
    # apache_beam/transforms/external.py. That module imports this one, so
    # the import stays local to avoid a circular import at load time.
    from apache_beam.transforms import external
    return external.ExpansionAndArtifactRetrievalStub(self._channel.__enter__())

  def __exit__(self, *args):
    """Shuts the service down and closes the channel opened by __enter__."""
    try:
      self.shutdown()
    finally:
      # Close the channel even if bringing the service down failed.
      self._channel.__exit__(*args)

  def _run_docker_compose_command(self, command, output_override=None):
    """Runs docker-compose with the given arguments and reports its output.

    Args:
      command: list of arguments appended to the docker-compose prefix.
      output_override: optional binary file-like object. When given, the
        combined stdout/stderr bytes are written to it; otherwise the output
        is decoded and printed.
    """
    cmd = list(self._docker_compose_start_command_prefix) + list(command)

    myenv = os.environ.copy()
    # Popen rejects non-string environment values, so unset (None) entries
    # such as a missing BEAM_VERSION are left out.
    myenv.update({
        key: value
        for key,
        value in self._environmental_variables.items() if value is not None
    })

    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=myenv)
    std_out, _ = process.communicate()

    if process.returncode != 0:
      # Best effort: surface the failure without changing the control flow.
      logging.warning(
          'Command %s exited with status %s.', cmd, process.returncode)

    if output_override is not None:
      output_override.write(std_out)
    else:
      print(std_out.decode(errors='backslashreplace'))

  def start(self):
    """Runs 'docker-compose up -d' for this project."""
    with self._launcher_lock:
      self._run_docker_compose_command(['up', '-d'])

  def shutdown(self):
    """Runs 'docker-compose down' for this project."""
    with self._launcher_lock:
      self._run_docker_compose_command(['down'])

  def status(self):
    """Runs 'docker-compose ps' for this project and prints the output."""
    with self._launcher_lock:
      self._run_docker_compose_command(['ps'])

  def wait_till_up(self, timeout_ms):
    """Blocks until a channel to the transform service becomes ready.

    Args:
      timeout_ms: maximum time to wait, in milliseconds. Values <= 0 select
        _DEFAULT_START_WAIT_TIMEOUT.

    Raises:
      ValueError: if the service is not ready within the timeout.
    """
    timeout_ms = (
        TransformServiceLauncher._DEFAULT_START_WAIT_TIMEOUT
        if timeout_ms <= 0 else timeout_ms)

    channel = self._get_channel()
    try:
      # Waiting till the service is up.
      channel_ready = grpc.channel_ready_future(channel)
      wait_secs = .1
      start_time = time.time()
      while True:
        if (time.time() - start_time) * 1000 > timeout_ms > 0:
          raise ValueError(
              'Transform service did not start in %s seconds.' %
              (timeout_ms / 1000))
        try:
          channel_ready.result(timeout=wait_secs)
          break
        except (grpc.FutureTimeoutError, grpc.RpcError):
          # Back off; only become noisy once the individual waits get long.
          wait_secs *= 1.2
          logging.log(
              logging.WARNING if wait_secs > 1 else logging.DEBUG,
              'Waiting for the transform service to be ready at %s.',
              self._address)
    finally:
      # This channel is only used for probing; do not leak it.
      channel.close()

    logging.info('Transform service %s started.', self._project_name)

  def _get_status(self):
    """Writes the 'docker-compose ps' output to a temporary file.

    Returns:
      The path of the temporary file; the file is not deleted here.
    """
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
      self._run_docker_compose_command(['ps'], tmp)
    return tmp.name


def main(argv):
  """Command line entry point for managing a transform service.

  Parses --project_name, --command, --port and --beam_version from ``argv``
  (unrecognized arguments are ignored), builds a TransformServiceLauncher and
  runs the requested command: 'up' starts the service and waits for it,
  'down' shuts it down and 'ps' prints its status.

  Args:
    argv: list of command line arguments.

  Raises:
    ValueError: if the command is not one of the supported values.
  """
  arg_parser = argparse.ArgumentParser()
  arg_parser.add_argument(
      '--project_name', help='Docker Compose project name.')
  arg_parser.add_argument(
      '--command',
      required=True,
      choices=_COMMAND_POSSIBLE_VALUES,
      help='Command to run. Possible values are ' +
      ', '.join(_COMMAND_POSSIBLE_VALUES))
  arg_parser.add_argument(
      '--port',
      type=int,
      default=-1,
      help='External visible port of the transform service.')
  arg_parser.add_argument(
      '--beam_version',
      required=True,
      help='Beam version of the expansion service containers to be used.')

  parsed, _ = arg_parser.parse_known_args(argv)

  # Fall back to the default project name when none was given.
  if parsed.project_name is not None:
    project_name = parsed.project_name
  else:
    project_name = TransformServiceLauncher._DEFAULT_PROJECT_NAME

  # A negative port means no port was requested on the command line.
  if parsed.port < 0:
    port_description = 'the default port'
  else:
    port_description = ' port ' + str(parsed.port)
  logging.info('Starting the Beam Transform Service at %s.' % port_description)

  launcher = TransformServiceLauncher(
      project_name, parsed.port, parsed.beam_version)

  requested_command = parsed.command
  if requested_command == 'up':
    launcher.start()
    launcher.wait_till_up(-1)
    return
  if requested_command == 'down':
    launcher.shutdown()
    return
  if requested_command == 'ps':
    launcher.status()
    return

  raise ValueError(
      'Unknown command %s possible values are %s' %
      (requested_command, ', '.join(_COMMAND_POSSIBLE_VALUES)))


if __name__ == '__main__':
  # Make INFO level messages visible when run as a script, then hand the
  # full argument vector to main().
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  main(sys.argv)

0 comments on commit d545243

Please sign in to comment.