[BEAM-4032] Support staging binary distributions of dependency packages
AnandInguva authored Feb 11, 2022
1 parent 3ad0552 commit 7975127
Showing 7 changed files with 299 additions and 28 deletions.
4 changes: 2 additions & 2 deletions CHANGES.md
@@ -62,8 +62,8 @@

## New Features / Improvements

* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

* Pipeline dependencies supplied through `--requirements_file` will now be staged to the runner as binary distributions (wheels) of the PyPI packages, built for the linux_x86_64 platform ([BEAM-4032](https://issues.apache.org/jira/browse/BEAM-4032)).
  To restore the previous behavior of staging source distributions, set the pipeline option `--requirements_cache_only_sources`. To skip staging the packages at submission time, set the pipeline option `--requirements_cache=skip` (Python).
## Breaking Changes

* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
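To make the new options concrete, here is a minimal usage sketch (illustrative, not part of this commit; it assumes a requirements.txt in the working directory and the default DirectRunner):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--requirements_file=requirements.txt',
    # '--requirements_cache_only_sources',  # restore pre-2.37.0 sdist-only staging
    # '--requirements_cache=skip',          # skip staging dependencies at submission
])

with beam.Pipeline(options=options) as p:
  _ = (p | beam.Create(['hello']) | beam.Map(print))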
15 changes: 14 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
@@ -1053,7 +1053,20 @@ def _add_argparse_args(cls, parser):
        default=None,
        help=(
            'Path to a folder to cache the packages specified in '
-            'the requirements file using the --requirements_file option.'))
+            'the requirements file using the --requirements_file option. '
+            'If you want to skip populating the requirements cache, '
+            'specify --requirements_cache="skip".'))
    parser.add_argument(
        '--requirements_cache_only_sources',
        action='store_true',
        help=(
            'Enable this flag to populate the requirements cache only '
            'with source distributions (sdists) of the dependencies '
            'mentioned in the --requirements_file. '
            'Note (BEAM-4032): This flag may significantly slow down '
            'the pipeline submission. It is added to preserve the '
            'requirements cache behavior prior to 2.37.0 and will likely '
            'be removed in future releases.'))
    parser.add_argument(
        '--setup_file',
        default=None,
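For reference, a small sketch (not part of this commit) of how the parsed flags surface on SetupOptions; the flag values here are arbitrary examples:

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

opts = PipelineOptions(
    ['--requirements_cache=skip', '--requirements_cache_only_sources'])
setup_options = opts.view_as(SetupOptions)
print(setup_options.requirements_cache)               # 'skip'
print(setup_options.requirements_cache_only_sources)  # True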
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A pipeline to verify the installation of packages specified in the
requirements.txt. A requirements text is created during runtime with
package specified in _PACKAGE_IN_REQUIREMENTS_FILE.
"""

import argparse
import logging
import os
import shutil
import tempfile

import pkg_resources as pkg

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

_PACKAGE_IN_REQUIREMENTS_FILE = ['matplotlib', 'seaborn']


def verify_packages_from_requirements_file_are_installed(unused_element):
  _PACKAGE_NOT_IN_REQUIREMENTS_FILE = ['torch']
  packages_to_test = _PACKAGE_IN_REQUIREMENTS_FILE + (
      _PACKAGE_NOT_IN_REQUIREMENTS_FILE)
  for package_name in packages_to_test:
    try:
      output = pkg.get_distribution(package_name)
    except pkg.DistributionNotFound as e:  # pylint: disable=unused-variable
      output = None
    if package_name in _PACKAGE_IN_REQUIREMENTS_FILE:
      assert output is not None, ('Please check if package %s is specified'
                                  ' in requirements file' % package_name)
    if package_name in _PACKAGE_NOT_IN_REQUIREMENTS_FILE:
      assert output is None


def run(argv=None, save_main_session=True):
  parser = argparse.ArgumentParser()
  _, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  temp_dir = tempfile.mkdtemp()
  requirements_text_path = os.path.join(temp_dir, 'requirements.txt')
  with open(requirements_text_path, 'w') as f:
    f.write('\n'.join(_PACKAGE_IN_REQUIREMENTS_FILE))
  pipeline_options.view_as(
      SetupOptions).requirements_file = requirements_text_path

  with beam.Pipeline(options=pipeline_options) as p:
    (  # pylint: disable=expression-not-assigned
        p
        | beam.Create([None])
        | beam.Map(verify_packages_from_requirements_file_are_installed))
  shutil.rmtree(temp_dir)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
97 changes: 77 additions & 20 deletions sdks/python/apache_beam/runners/portability/stager.py
@@ -54,6 +54,8 @@
import shutil
import sys
import tempfile
from distutils.version import StrictVersion
from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
@@ -82,6 +84,8 @@
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
REQUIREMENTS_FILE = 'requirements.txt'
EXTRA_PACKAGES_FILE = 'extra_packages.txt'
# A special value for --requirements_cache that skips populating the
# requirements cache during staging.
SKIP_REQUIREMENTS_CACHE = 'skip'

_LOGGER = logging.getLogger(__name__)

@@ -162,7 +166,7 @@ def create_job_resources(options,  # type: PipelineOptions
                           temp_dir,  # type: str
                           build_setup_args=None,  # type: Optional[List[str]]
                           pypi_requirements=None,  # type: Optional[List[str]]
-                          populate_requirements_cache=None,  # type: Optional[str]
+                          populate_requirements_cache=None,  # type: Optional[Callable[[str, str, bool], None]]
                           skip_prestaged_dependencies=False,  # type: Optional[bool]
  ):
"""For internal use only; no backwards-compatibility guarantees.
@@ -197,6 +201,8 @@ def create_job_resources(options,  # type: PipelineOptions
    resources = []  # type: List[beam_runner_api_pb2.ArtifactInformation]

    setup_options = options.view_as(SetupOptions)
    use_beam_default_container = options.view_as(
        WorkerOptions).sdk_container_image is None

    pickler.set_library(setup_options.pickle_library)

@@ -205,8 +211,8 @@ def create_job_resources(options,  # type: PipelineOptions
    # if we know we are using a dependency pre-installed sdk container image.
    if not skip_prestaged_dependencies:
      requirements_cache_path = (
-          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
-          if setup_options.requirements_cache is None else
+          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') if
+          (setup_options.requirements_cache is None) else
          setup_options.requirements_cache)
      if not os.path.exists(requirements_cache_path):
        os.makedirs(requirements_cache_path)
@@ -223,10 +229,20 @@ def create_job_resources(options,  # type: PipelineOptions
                setup_options.requirements_file, REQUIREMENTS_FILE))
        # Populate cache with packages from the requirement file option and
        # stage the files in the cache.
-        (
-            populate_requirements_cache if populate_requirements_cache else
-            Stager._populate_requirements_cache)(
-            setup_options.requirements_file, requirements_cache_path)
+        if not use_beam_default_container:
+          _LOGGER.warning(
+              'When using a custom container image, prefer installing '
+              'additional PyPI dependencies directly into the image '
+              'instead of specifying them via runtime options '
+              'such as --requirements_file.')
+
+        if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
+          (
+              populate_requirements_cache if populate_requirements_cache else
+              Stager._populate_requirements_cache)(
+              setup_options.requirements_file,
+              requirements_cache_path,
+              setup_options.requirements_cache_only_sources)

      if pypi_requirements:
        tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -235,12 +251,16 @@ def create_job_resources(options,  # type: PipelineOptions
        resources.append(Stager._create_file_pip_requirements_artifact(tf.name))
        # Populate cache with packages from PyPI requirements and stage
        # the files in the cache.
-        (
-            populate_requirements_cache if populate_requirements_cache else
-            Stager._populate_requirements_cache)(
-            tf.name, requirements_cache_path)
+        if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
+          (
+              populate_requirements_cache if populate_requirements_cache else
+              Stager._populate_requirements_cache)(
+              tf.name,
+              requirements_cache_path,
+              setup_options.requirements_cache_only_sources)

-      if setup_options.requirements_file is not None or pypi_requirements:
+      if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
+          setup_options.requirements_file is not None or pypi_requirements):
        for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
          resources.append(
              Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
@@ -398,7 +418,7 @@ def create_and_stage_job_resources(
      build_setup_args=None,  # type: Optional[List[str]]
      temp_dir=None,  # type: Optional[str]
      pypi_requirements=None,  # type: Optional[List[str]]
-      populate_requirements_cache=None,  # type: Optional[str]
+      populate_requirements_cache=None,  # type: Optional[Callable[[str, str, bool], None]]
      staging_location=None  # type: Optional[str]
  ):
    """For internal use only; no backwards-compatibility guarantees.
@@ -644,7 +664,7 @@ def _get_python_executable():
    return python_bin

  @staticmethod
-  def remove_dependency_from_requirements(
+  def _remove_dependency_from_requirements(
      requirements_file,  # type: str
      dependency_to_remove,  # type: str
      temp_directory_path):
@@ -663,10 +683,31 @@ def remove_dependency_from_requirements(

    return tmp_requirements_filename

  @staticmethod
  def _get_platform_for_default_sdk_container():
    """Get the platform tag for the Apache Beam SDK container, based on the
    pip version.

    Note: pip is still expected to download a compatible wheel of a package
    with the manylinux1 platform tag if the package on PyPI has no
    manylinux2014 or manylinux2010 wheels.
    Reference: https://www.python.org/dev/peps/pep-0599/#id21
    """

    # TODO(anandinguva): When https://github.com/pypa/pip/issues/10760 is
    # addressed, download the wheel based on the glibc version in Beam's
    # Python base image.
    pip_version = pkg_resources.get_distribution('pip').version
    if StrictVersion(pip_version) >= StrictVersion('19.3'):
      return 'manylinux2014_x86_64'
    else:
      return 'manylinux2010_x86_64'
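  # For illustration only (not part of this change): with the gate above,
  # StrictVersion('21.2.4') >= StrictVersion('19.3') is True and selects
  # 'manylinux2014_x86_64', while pip 19.2.3 would fall back to
  # 'manylinux2010_x86_64'. The pip versions here are arbitrary examples.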

  @staticmethod
  @retry.with_exponential_backoff(
      num_retries=4, retry_filter=retry_on_non_zero_exit)
-  def _populate_requirements_cache(requirements_file, cache_dir):
+  def _populate_requirements_cache(
+      requirements_file, cache_dir, populate_cache_with_sdists=False):
    # The 'pip download' command will not download again if it finds the
    # tarball with the proper version already present.
    # It will get the packages downloaded in the order they are presented in
@@ -675,10 +716,11 @@ def _populate_requirements_cache(requirements_file, cache_dir):
    # The apache-beam dependency is excluded from requirements cache population
    # because we stage the SDK separately.
    with tempfile.TemporaryDirectory() as temp_directory:
-      tmp_requirements_filepath = Stager.remove_dependency_from_requirements(
+      tmp_requirements_filepath = Stager._remove_dependency_from_requirements(
          requirements_file=requirements_file,
          dependency_to_remove='apache-beam',
          temp_directory_path=temp_directory)

      cmd_args = [
          Stager._get_python_executable(),
          '-m',
@@ -690,10 +732,25 @@ def _populate_requirements_cache(requirements_file, cache_dir):
          tmp_requirements_filepath,
          '--exists-action',
          'i',
-          # Download from PyPI source distributions.
-          '--no-binary',
-          ':all:'
+          '--no-deps'
      ]

      if populate_cache_with_sdists:
        cmd_args.extend(['--no-binary', ':all:'])
      else:
        language_implementation_tag = 'cp'
        abi_suffix = 'm' if sys.version_info < (3, 8) else ''
        abi_tag = 'cp%d%d%s' % (
            sys.version_info[0], sys.version_info[1], abi_suffix)
        platform_tag = Stager._get_platform_for_default_sdk_container()
        cmd_args.extend([
            '--implementation',
            language_implementation_tag,
            '--abi',
            abi_tag,
            '--platform',
            platform_tag
        ])
      _LOGGER.info('Executing command: %s', cmd_args)
      processes.check_output(cmd_args, stderr=processes.STDOUT)

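Putting the pieces together: a sketch (not part of this commit) of the argument list `_populate_requirements_cache` assembles on the default wheel path, assuming Python 3.8, pip >= 19.3, the default cache directory, and a hypothetical requirements file path:

cmd_args = [
    'python', '-m', 'pip', 'download',
    '--dest', '/tmp/dataflow-requirements-cache',
    '-r', 'requirements.txt',  # hypothetical; apache-beam already filtered out
    '--exists-action', 'i',
    '--no-deps',
    # Wheel selection for the default Beam container; when
    # --requirements_cache_only_sources is set, these six values are
    # replaced by: '--no-binary', ':all:'
    '--implementation', 'cp',
    '--abi', 'cp38',
    '--platform', 'manylinux2014_x86_64',
]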
