Skip to content

Commit

Permalink
Revert "Updates Python ExternalTransform to use the transform service…
Browse files Browse the repository at this point in the history
… when needed (#27228)"

This reverts commit d545243.
  • Loading branch information
bvolpato committed Jul 3, 2023
1 parent c36f0f1 commit 4b758fc
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 341 deletions.
7 changes: 0 additions & 7 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,13 +533,6 @@ def _add_argparse_args(cls, parser):
'Should be a json mapping of gradle build targets to pre-built '
'artifacts (e.g. jar files) expansion endpoints (e.g. host:port).'))

parser.add_argument(
'--use_transform_service',
default=False,
action='store_true',
help='Use the Docker-composed-based transform service when expanding '
'cross-language transforms.')


def additional_option_ptransform_fn():
beam.transforms.ptransform.ptransform_fn_typehints_enabled = True
Expand Down
74 changes: 1 addition & 73 deletions sdks/python/apache_beam/transforms/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import functools
import glob
import logging
import subprocess
import threading
import uuid
from collections import OrderedDict
from collections import namedtuple
from typing import Dict
Expand All @@ -34,7 +32,6 @@

from apache_beam import pvalue
from apache_beam.coders import RowCoder
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_expansion_api_pb2
Expand All @@ -54,7 +51,6 @@
from apache_beam.typehints.typehints import Union
from apache_beam.typehints.typehints import UnionConstraint
from apache_beam.utils import subprocess_server
from apache_beam.utils import transform_service_launcher

DEFAULT_EXPANSION_SERVICE = 'localhost:8097'

Expand Down Expand Up @@ -672,10 +668,7 @@ def expand(self, pvalueish):
transform=transform_proto,
output_coder_requests=output_coders)

expansion_service = _maybe_use_transform_service(
self._expansion_service, pipeline.options)

with ExternalTransform.service(expansion_service) as service:
with ExternalTransform.service(self._expansion_service) as service:
response = service.Expand(request)
if response.error:
raise RuntimeError(response.error)
Expand Down Expand Up @@ -980,71 +973,6 @@ def __init__(
path_to_jar, extra_args, classpath=classpath, append_args=append_args)


def _maybe_use_transform_service(provided_service=None, options=None):
# For anything other than 'JavaJarExpansionService' we just use the
# provided service. For example, string address of an already available
# service.
if not isinstance(provided_service, JavaJarExpansionService):
return provided_service

def is_java_available():
cmd = ['java', '--version']

try:
subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except: # pylint: disable=bare-except
return False

return True

def is_docker_available():
cmd = ['docker', '--version']

try:
subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except: # pylint: disable=bare-except
return False

return True

# We try java and docker based expansion services in that order.

java_available = is_java_available()
docker_available = is_docker_available()

use_transform_service = options.view_as(
CrossLanguageOptions).use_transform_service

if (java_available and provided_service and not use_transform_service):
return provided_service
elif docker_available:
if use_transform_service:
error_append = 'it was explicitly requested'
elif not java_available:
error_append = 'the Java executable is not available in the system'
else:
error_append = 'a Java expansion service was not provided.'

project_name = str(uuid.uuid4())
port = subprocess_server.pick_port(None)[0]

logging.info(
'Trying to expand the external transform using the Docker Compose '
'based transform service since %s. Transform service will be under '
'Docker Compose project name %s and will be made available at port %r.'
% (error_append, project_name, str(port)))

from apache_beam import version as beam_version
beam_version = beam_version.__version__

return transform_service_launcher.TransformServiceLauncher(
project_name, port, beam_version)
else:
raise ValueError(
'Cannot start an expansion service since neither Java nor '
'Docker executables are available in the system.')


def memoize(func):
cache = {}

Expand Down
261 changes: 0 additions & 261 deletions sdks/python/apache_beam/utils/transform_service_launcher.py

This file was deleted.

0 comments on commit 4b758fc

Please sign in to comment.