From 4d5500bd47c95c1973b08599a08986afef327305 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 14 Dec 2023 11:01:43 +0100 Subject: [PATCH] Add install_poetry_project (#216) --- .github/actions/test-python/action.yml | 11 ++++ .gitignore | 1 + PYSPARK-DEPS.md | 17 +++++- README.md | 6 +- python/gresearch/spark/__init__.py | 78 +++++++++++++++++++++++++- python/test/test_package.py | 61 +++++++++++++++++++- 6 files changed, 170 insertions(+), 4 deletions(-) diff --git a/.github/actions/test-python/action.yml b/.github/actions/test-python/action.yml index 7ae95569..b54aa40d 100644 --- a/.github/actions/test-python/action.yml +++ b/.github/actions/test-python/action.yml @@ -76,9 +76,20 @@ runs: echo "SPARK_HOME=$SPARK_HOME" | tee -a "$GITHUB_ENV" shell: bash + - name: Prepare poetry tests + run: | + python -m venv .poetry-venv + .poetry-venv/bin/python -m pip install poetry + git clone https://github.com/Textualize/rich.git .rich + cd .rich + git reset --hard 20024635c06c22879fd2fd1e380ec4cccd9935dd + shell: bash + - name: Python Unit Tests env: PYTHONPATH: python:python/test + RICH_SOURCES: .rich + POETRY_PYTHON: .poetry-venv/bin/python run: | python -m pytest python/test --junit-xml test-results/pytest-$(date +%s.%N)-$RANDOM.xml shell: bash diff --git a/.gitignore b/.gitignore index b868af55..9954b28e 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,4 @@ build.log python/**/__pycache__ python/requirements.txt spark-* +.cache \ No newline at end of file diff --git a/PYSPARK-DEPS.md b/PYSPARK-DEPS.md index ce2427e3..0b364c5e 100644 --- a/PYSPARK-DEPS.md +++ b/PYSPARK-DEPS.md @@ -6,6 +6,10 @@ Such a deployment can be cumbersome, especially when running in an interactive n The `spark-extension` package allows installing Python packages programmatically by the PySpark application itself (PySpark ≥ 3.1.0). These packages are only accessible by that PySpark application, and they are removed on calling `spark.stop()`. +## Installing packages with `pip` + +Python packages can be installed with `pip` as follows: + ```python # noinspection PyUnresolvedReferences from gresearch.spark import * @@ -20,7 +24,7 @@ Above example installs PIP packages `pandas` and `pyarrow` via `pip`. Method `in spark.install_pip_package("pandas==1.4.3", "pyarrow~=8.0.0") # install packages from package sources (e.g. git clone https://github.com/pandas-dev/pandas.git) -spark.install_pip_package("./pandas/") +spark.install_pip_package("../pandas/") # install packages from git repo spark.install_pip_package("git+https://github.com/pandas-dev/pandas.git@main") @@ -34,3 +38,14 @@ spark.install_pip_package("pandas", "pyarrow", "--index-url", "https://artifacts # install pip packages quietly (only disables output of PIP) spark.install_pip_package("pandas", "pyarrow", "--quiet") ``` + +## Installing Python projects with Poetry + +Python projects can be installed from sources, including their dependencies, using [Poetry](https://python-poetry.org/): + +```python +# noinspection PyUnresolvedReferences +from gresearch.spark import * + +spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python") +``` diff --git a/README.md b/README.md index 466f4a39..3b09069c 100644 --- a/README.md +++ b/README.md @@ -22,14 +22,18 @@ efficiently laid out with a single operation. or [parquet-cli](https://pypi.org/project/parquet-cli/) by reading from a simple Spark data source. This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions. 
-**[Install PIP packages into PySpark job](PYSPARK-DEPS.md):** Install Python dependencies via PIP directly into your running PySpark job (PySpark ≥ 3.1.0):
+**[Install Python packages into PySpark job](PYSPARK-DEPS.md):** Install Python dependencies via PIP or Poetry programmatically into your running PySpark job (PySpark ≥ 3.1.0):
 
 ```python
 # noinspection PyUnresolvedReferences
 from gresearch.spark import *
 
+# using PIP
 spark.install_pip_package("pandas==1.4.3", "pyarrow")
 spark.install_pip_package("-r", "requirements.txt")
+
+# using Poetry
+spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")
 ```
 
 **[Fluent method call](CONDITIONAL.md):** `T.call(transformation: T => R): R`: Turns a transformation `T => R`,
diff --git a/python/gresearch/spark/__init__.py b/python/gresearch/spark/__init__.py
index 0c36fabc..aaa37f75 100644
--- a/python/gresearch/spark/__init__.py
+++ b/python/gresearch/spark/__init__.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 
 import os
+import re
 import shutil
+import subprocess
 import sys
 import time
 from contextlib import contextmanager
+from pathlib import Path
 from typing import Any, Union, List, Optional, Mapping, TYPE_CHECKING
-import subprocess
 
 from py4j.java_gateway import JVMView, JavaObject
 from pyspark import __version__
@@ -462,3 +464,77 @@ def install_pip_package(spark: Union[SparkSession, SparkContext], *package_or_pi
 
 SparkSession.install_pip_package = install_pip_package
 SparkContext.install_pip_package = install_pip_package
+
+
+def install_poetry_project(spark: Union[SparkSession, SparkContext],
+                           *project: str,
+                           poetry_python: Optional[str] = None,
+                           pip_args: Optional[List[str]] = None) -> None:
+    import logging
+    logger = logging.getLogger()
+
+    # spark.install_pip_package has this limitation, it is used by this method,
+    # and we want to fail quickly here
+    if __version__.startswith('2.') or __version__.startswith('3.0.'):
+        raise NotImplementedError(f'Not supported for PySpark {__version__}')
+
+    if isinstance(spark, SparkSession):
+        spark = spark.sparkContext
+    if poetry_python is None:
+        poetry_python = sys.executable
+    if pip_args is None:
+        pip_args = []
+
+    def check_and_log_poetry(proc: subprocess.CompletedProcess) -> List[str]:
+        stdout = proc.stdout.decode('utf-8').splitlines(keepends=False)
+        for line in stdout:
+            logger.info(f"poetry: {line}")
+
+        stderr = proc.stderr.decode('utf-8').splitlines(keepends=False)
+        for line in stderr:
+            logger.error(f"poetry: {line}")
+
+        if proc.returncode != 0:
+            raise RuntimeError(f'Poetry process terminated with exit code {proc.returncode}')
+
+        return stdout
+
+    def build_wheel(project: Path) -> Path:
+        logger.info(f"Running poetry using {poetry_python}")
+
+        # make sure the virtual env for this project exists, otherwise we won't get to see the built whl file in stdout
+        proc = subprocess.run([
+            poetry_python, '-m', 'poetry',
+            'env', 'use',
+            '--directory', str(project.absolute()),
+            sys.executable
+        ], capture_output=True)
+        check_and_log_poetry(proc)
+
+        # build the whl file
+        proc = subprocess.run([
+            poetry_python, '-m', 'poetry',
+            'build',
+            '--verbose',
+            '--no-interaction',
+            '--format', 'wheel',
+            '--directory', str(project.absolute())
+        ], capture_output=True)
+        stdout = check_and_log_poetry(proc)
+
+        # the first matching line is taken to extract the whl file name
+        whl_pattern = r"^  - Built (.*\.whl)$"
+        for line in stdout:
+            if match := re.match(whl_pattern, line):
+                return project.joinpath('dist', match.group(1))
+
+        raise RuntimeError(f'Could not find wheel file name in poetry output, was looking for "{whl_pattern}"')
+
+    wheels = [build_wheel(Path(path)) for path in project]
+
+    # install wheels via pip
+    spark.install_pip_package(*[str(whl.absolute()) for whl in wheels] + pip_args)
+
+
+SparkSession.install_poetry_project = install_poetry_project
+SparkContext.install_poetry_project = install_poetry_project
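For illustration, here is a minimal, self-contained sketch of the wheel-name extraction that `build_wheel` above performs, run against hypothetical `poetry build --verbose` output (the sample lines and the project path are made up; in the patch the real lines come from `check_and_log_poetry`):

```python
import re
from pathlib import Path

# hypothetical stdout of `poetry build --verbose --format wheel` (illustrative only)
stdout = [
    "Building rich (13.0.0)",
    "  - Building wheel",
    "  - Built rich-13.0.0-py3-none-any.whl",
]

project = Path("/tmp/rich")  # hypothetical project directory
whl_pattern = r"^  - Built (.*\.whl)$"

# the first matching line yields the wheel file name, located under the project's dist/ directory
for line in stdout:
    if match := re.match(whl_pattern, line):
        print(project.joinpath('dist', match.group(1)))  # /tmp/rich/dist/rich-13.0.0-py3-none-any.whl
        break
```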
diff --git a/python/test/test_package.py b/python/test/test_package.py
index d19ca23b..7cbdbf34 100644
--- a/python/test/test_package.py
+++ b/python/test/test_package.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import datetime
+import os
 from decimal import Decimal
 from subprocess import CalledProcessError
 from unittest import skipUnless, skipIf
@@ -24,6 +25,9 @@
     timestamp_to_dotnet_ticks, unix_epoch_to_dotnet_ticks, unix_epoch_nanos_to_dotnet_ticks, count_null
 from spark_common import SparkTest
 
+POETRY_PYTHON_ENV = "POETRY_PYTHON"
+RICH_SOURCES_ENV = "RICH_SOURCES"
+
 
 class PackageTest(SparkTest):
 
@@ -169,7 +173,7 @@ def test_install_pip_package(self):
         import emoji
         emoji.emojize("this test is :thumbs_up:")
 
-        self.spark.install_pip_package("emoji")
+        self.spark.install_pip_package("emoji", '--cache-dir', '.cache/pypi')
 
         # noinspection PyPackageRequirements
         import emoji
@@ -199,6 +203,61 @@ def test_install_pip_package_not_supported(self):
         with self.assertRaises(NotImplementedError):
             self.spark.install_pip_package("emoji")
 
+    @skipIf(__version__.startswith('3.0.'), 'install_poetry_project not supported for Spark 3.0')
+    # provide an environment variable with path to the python binary of a virtual env that has poetry installed
+    @skipIf(POETRY_PYTHON_ENV not in os.environ, f'Environment variable {POETRY_PYTHON_ENV} pointing to '
+                                                 f'virtual env python with poetry required')
+    @skipIf(RICH_SOURCES_ENV not in os.environ, f'Environment variable {RICH_SOURCES_ENV} pointing to '
+                                                f'rich project sources required')
+    def test_install_poetry_project(self):
+        self.spark.sparkContext.setLogLevel("INFO")
+        with self.assertRaises(ImportError):
+            # noinspection PyPackageRequirements
+            from rich.emoji import Emoji
+            thumbs_up = Emoji("thumbs_up")
+
+        rich_path = os.environ[RICH_SOURCES_ENV]
+        poetry_python = os.environ[POETRY_PYTHON_ENV]
+        self.spark.install_poetry_project(
+            rich_path,
+            poetry_python=poetry_python,
+            pip_args=['--cache-dir', '.cache/pypi']
+        )
+
+        # noinspection PyPackageRequirements
+        from rich.emoji import Emoji
+        thumbs_up = Emoji("thumbs_up")
+        actual = thumbs_up.replace("this test is :thumbs_up:")
+        expected = "this test is 👍"
+        self.assertEqual(expected, actual)
+
+        import pandas as pd
+        actual = self.spark.range(0, 10, 1, 10) \
+            .mapInPandas(lambda it: [pd.DataFrame.from_dict({"val": [thumbs_up.replace(":thumbs_up:")]})], "val string") \
+            .collect()
+        expected = [Row("👍")] * 10
+        self.assertEqual(expected, actual)
+
+    @skipIf(__version__.startswith('3.0.'), 'install_poetry_project not supported for Spark 3.0')
+    # provide an environment variable with path to the python binary of a virtual env that has poetry installed
+    @skipIf(POETRY_PYTHON_ENV not in os.environ, f'Environment variable {POETRY_PYTHON_ENV} pointing to '
+                                                 f'virtual env python with poetry required')
+    @skipIf(RICH_SOURCES_ENV not in os.environ, f'Environment variable {RICH_SOURCES_ENV} pointing to '
+                                                f'rich project sources required')
+    def test_install_poetry_project_wrong_arguments(self):
+        rich_path = os.environ[RICH_SOURCES_ENV]
+        poetry_python = 
os.environ[POETRY_PYTHON_ENV] + + with self.assertRaises(RuntimeError): + self.spark.install_poetry_project("non-existing-project", poetry_python=poetry_python) + with self.assertRaises(FileNotFoundError): + self.spark.install_poetry_project(rich_path, poetry_python="non-existing-python") + + @skipUnless(__version__.startswith('3.0.'), 'install_poetry_project not supported for Spark 3.0') + def test_install_poetry_project_not_supported(self): + with self.assertRaises(NotImplementedError): + self.spark.install_poetry_project("./rich") + if __name__ == '__main__': SparkTest.main()
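
Taken together, a minimal end-to-end usage sketch of the feature this patch adds (the paths, master, and app name are illustrative; `install_poetry_project` builds the project's wheel with Poetry and then installs it, plus its dependencies, via `install_pip_package`):

```python
from pyspark.sql import SparkSession

# noinspection PyUnresolvedReferences
from gresearch.spark import *  # attaches install_pip_package / install_poetry_project

spark = SparkSession.builder.master("local[2]").appName("poetry-demo").getOrCreate()

# hypothetical paths: a local Poetry project checkout and a virtual env that has poetry installed
spark.install_poetry_project(
    "../my-poetry-project/",
    poetry_python="../venv-poetry/bin/python",
    pip_args=["--quiet"],  # extra arguments are passed on to pip
)

# the project and its dependencies are now importable on the driver and executors,
# and they are removed again on spark.stop()
spark.stop()
```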