Allow local packages in requirements.txt dependency list. (#23684)
We pull these out and install them with the extra local packages infrastructure.
robertwb authored Oct 21, 2022
1 parent 8dd8749 commit 8f300b0
Showing 4 changed files with 78 additions and 3 deletions.
9 changes: 9 additions & 0 deletions CHANGES.md
@@ -50,6 +50,15 @@
 * ([#X](https://github.com/apache/beam/issues/X)).
 -->

+# [2.44.0] - Unreleased
+
+## New Features / Improvements
+
+* Local packages can now be used as dependencies in the requirements.txt file, rather
+  than requiring them to be passed separately via the `--extra_package` option.
+  ([#23684](https://github.com/apache/beam/pull/23684))
+
+
 # [2.43.0] - Unreleased

 ## Highlights
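To make the new behavior concrete, here is an illustrative setup (the package name and path are hypothetical, not taken from the commit). A requirements file may now mix PyPI and local dependencies:

```
# requirements.txt -- illustrative
numpy
./dist/my_local_package-1.0.tar.gz
```

Launching the pipeline with `--requirements_file=requirements.txt` stages `./dist/my_local_package-1.0.tar.gz` through the same machinery as `--extra_package`, while the remaining entries are installed from PyPI on the workers.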
26 changes: 25 additions & 1 deletion sdks/python/apache_beam/runners/portability/stager.py
@@ -224,9 +224,14 @@ def create_job_resources(options, # type: PipelineOptions
          'The file %s cannot be found. It was specified in the '
          '--requirements_file command line option.' %
          setup_options.requirements_file)
+      extra_packages, thinned_requirements_file = (
+          Stager._extract_local_packages(setup_options.requirements_file))
+      if extra_packages:
+        setup_options.extra_packages = (
+            setup_options.extra_packages or []) + extra_packages
       resources.append(
           Stager._create_file_stage_to_artifact(
-              setup_options.requirements_file, REQUIREMENTS_FILE))
+              thinned_requirements_file, REQUIREMENTS_FILE))
       # Populate cache with packages from the requirement file option and
       # stage the files in the cache.
       if not use_beam_default_container:
@@ -683,6 +688,25 @@ def _remove_dependency_from_requirements(

     return tmp_requirements_filename

+  @staticmethod
+  def _extract_local_packages(requirements_file):
+    local_deps = []
+    pypi_deps = []
+    with open(requirements_file, 'r') as fin:
+      for line in fin:
+        dep = line.strip()
+        if os.path.exists(dep):
+          local_deps.append(dep)
+        else:
+          pypi_deps.append(dep)
+    if local_deps:
+      with tempfile.NamedTemporaryFile(suffix='-requirements.txt',
+                                       delete=False) as fout:
+        fout.write('\n'.join(pypi_deps).encode('utf-8'))
+      return local_deps, fout.name
+    else:
+      return [], requirements_file
+
   @staticmethod
   def _get_platform_for_default_sdk_container():
     """
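As a standalone sketch of what the new `_extract_local_packages` helper does (the file and package names here are hypothetical, and a private method is called purely for illustration):

```python
import os
import tempfile

from apache_beam.runners.portability.stager import Stager

with tempfile.TemporaryDirectory() as tmp:
  # Stand-in for a locally built sdist.
  pkg = os.path.join(tmp, 'my_pkg-1.0.tar.gz')
  open(pkg, 'w').close()

  req = os.path.join(tmp, 'requirements.txt')
  with open(req, 'w') as fout:
    fout.write('numpy\n' + pkg + '\n')

  # Entries that exist as local paths are split out; the rest land in
  # a "thinned" temporary requirements file.
  local_deps, thinned = Stager._extract_local_packages(req)
  assert local_deps == [pkg]
  with open(thinned) as fin:
    assert fin.read().strip() == 'numpy'
```

Note that `os.path.exists` is what classifies an entry as local, so relative paths are resolved against the directory from which the pipeline is launched.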
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager_test.py
@@ -832,6 +832,49 @@ def test_populate_requirements_cache_with_sdist(self):
       self.assertTrue('.tar.gz' in f)
       self.assertTrue('.whl' not in f)

+  def test_populate_requirements_cache_with_local_files(self):
+    staging_dir = self.make_temp_dir()
+    requirements_cache_dir = self.make_temp_dir()
+    source_dir = self.make_temp_dir()
+    pkg_dir = self.make_temp_dir()
+
+    options = PipelineOptions()
+    self.update_options(options)
+
+    options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
+    options.view_as(SetupOptions).requirements_file = os.path.join(
+        source_dir, stager.REQUIREMENTS_FILE)
+    local_package = os.path.join(pkg_dir, 'local_package.tar.gz')
+    self.create_temp_file(local_package, 'local-package-content')
+    self.create_temp_file(
+        os.path.join(source_dir, stager.REQUIREMENTS_FILE),
+        '\n'.join(['fake_pypi', local_package]))
+    with mock.patch('apache_beam.runners.portability.stager_test'
+                    '.stager.Stager._populate_requirements_cache',
+                    staticmethod(self._populate_requitements_cache_fake)):
+      options.view_as(SetupOptions).requirements_cache_only_sources = True
+      resources = self.stager.create_and_stage_job_resources(
+          options, staging_location=staging_dir)[1]
+
+    self.assertEqual(
+        sorted([
+            stager.REQUIREMENTS_FILE,
+            stager.EXTRA_PACKAGES_FILE,
+            'nothing.tar.gz',
+            'local_package.tar.gz'
+        ]),
+        sorted(resources))
+
+    with open(os.path.join(staging_dir, stager.REQUIREMENTS_FILE)) as fin:
+      requirements_contents = fin.read()
+    self.assertIn('fake_pypi', requirements_contents)
+    self.assertNotIn('local_package', requirements_contents)
+
+    with open(os.path.join(staging_dir, stager.EXTRA_PACKAGES_FILE)) as fin:
+      extra_packages_contents = fin.read()
+    self.assertNotIn('fake_pypi', extra_packages_contents)
+    self.assertIn('local_package', extra_packages_contents)


 class TestStager(stager.Stager):
   def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):
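To run just the new test locally, something like the following should work (assuming a Beam development checkout with test dependencies installed):

```
pytest sdks/python/apache_beam/runners/portability/stager_test.py \
    -k test_populate_requirements_cache_with_local_files
```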
website/www/site/content/en/documentation/sdks/python-pipeline-dependencies.md
@@ -36,15 +36,14 @@ If your pipeline uses public packages from the [Python Package Index](https://py

 This command creates a `requirements.txt` file that lists all packages that are installed on your machine, regardless of where they were installed from.

-2. Edit the `requirements.txt` file and leave only the packages that were installed from PyPI and are used in the workflow source. Delete all packages that are not relevant to your code.
+2. Edit the `requirements.txt` file and delete all packages that are not relevant to your code.

 3. Run your pipeline with the following command-line option:

         --requirements_file requirements.txt

 The runner will use the `requirements.txt` file to install your additional dependencies onto the remote workers.

-**Important:** Remote workers will install all packages listed in the `requirements.txt` file. Because of this, it's very important that you delete non-PyPI packages from the `requirements.txt` file, as stated in step 2. If you don't remove non-PyPI packages, the remote workers will fail when attempting to install packages from sources that are unknown to them.
+> **NOTE**: An alternative to `pip freeze` is to use a library like [pip-tools](https://github.com/jazzband/pip-tools) to compile all the dependencies required for the pipeline from a `--requirements_file`, where only top-level dependencies are mentioned.
 ## Custom Containers {#custom-containers}
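For the pip-tools alternative mentioned in the new note, a typical workflow looks like this (assuming `pip-tools` is installed; the file names are illustrative):

```
pip install pip-tools
echo "apache-beam" > requirements.in
pip-compile requirements.in  # resolves and pins dependencies into requirements.txt
```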
