Add install_poetry_project (#216)
EnricoMi committed Dec 14, 2023
1 parent 57aec01 commit 4d5500b
Showing 6 changed files with 170 additions and 4 deletions.
11 changes: 11 additions & 0 deletions .github/actions/test-python/action.yml
@@ -76,9 +76,20 @@ runs:
echo "SPARK_HOME=$SPARK_HOME" | tee -a "$GITHUB_ENV"
shell: bash

- name: Prepare poetry tests
run: |
python -m venv .poetry-venv
.poetry-venv/bin/python -m pip install poetry
git clone https://github.com/Textualize/rich.git .rich
cd .rich
git reset --hard 20024635c06c22879fd2fd1e380ec4cccd9935dd
shell: bash

- name: Python Unit Tests
env:
PYTHONPATH: python:python/test
RICH_SOURCES: .rich
POETRY_PYTHON: .poetry-venv/bin/python
run: |
python -m pytest python/test --junit-xml test-results/pytest-$(date +%s.%N)-$RANDOM.xml
shell: bash
1 change: 1 addition & 0 deletions .gitignore
@@ -44,3 +44,4 @@ build.log
python/**/__pycache__
python/requirements.txt
spark-*
.cache
17 changes: 16 additions & 1 deletion PYSPARK-DEPS.md
@@ -6,6 +6,10 @@ Such a deployment can be cumbersome, especially when running in an interactive n
The `spark-extension` package allows installing Python packages programmatically by the PySpark application itself (PySpark ≥ 3.1.0).
These packages are only accessible by that PySpark application, and they are removed on calling `spark.stop()`.

## Installing packages with `pip`

Python packages can be installed with `pip` as follows:

```python
# noinspection PyUnresolvedReferences
from gresearch.spark import *
@@ -20,7 +24,7 @@ Above example installs PIP packages `pandas` and `pyarrow` via `pip`. Method `in
spark.install_pip_package("pandas==1.4.3", "pyarrow~=8.0.0")

# install packages from package sources (e.g. git clone https://github.com/pandas-dev/pandas.git)
spark.install_pip_package("./pandas/")
spark.install_pip_package("../pandas/")

# install packages from git repo
spark.install_pip_package("git+https://github.com/pandas-dev/pandas.git@main")
@@ -34,3 +38,14 @@ spark.install_pip_package("pandas", "pyarrow", "--index-url", "https://artifacts
# install pip packages quietly (only disables output of PIP)
spark.install_pip_package("pandas", "pyarrow", "--quiet")
```
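
Packages installed this way exist only for the lifetime of the Spark application. A minimal sketch of that lifecycle (assuming an active `SparkSession` called `spark`; the `emoji` package is just an example):

```python
# noinspection PyUnresolvedReferences
from gresearch.spark import *

# install the package into this PySpark application only
spark.install_pip_package("emoji")

# the package can now be imported
import emoji
print(emoji.emojize("installed :thumbs_up:"))

# stopping the session removes the installed packages again
spark.stop()
```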

## Installing Python projects with Poetry

Python projects can be installed from sources, including their dependencies, using [Poetry](https://python-poetry.org/):

```python
# noinspection PyUnresolvedReferences
from gresearch.spark import *

spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")
```
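
Additional arguments for the final `pip install` of the built project can be passed via `pip_args`. A minimal sketch, where the project path, virtual-env path, and chosen flag are only examples:

```python
# noinspection PyUnresolvedReferences
from gresearch.spark import *

# forward extra arguments to pip when installing the built wheel
spark.install_poetry_project(
    "../my-poetry-project/",
    poetry_python="../venv-poetry/bin/python",
    pip_args=["--quiet"],
)
```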
6 changes: 5 additions & 1 deletion README.md
@@ -22,14 +22,18 @@ efficiently laid out with a single operation.
or [parquet-cli](https://pypi.org/project/parquet-cli/) by reading from a simple Spark data source.
This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.

**[Install PIP packages into PySpark job](PYSPARK-DEPS.md):** Install Python dependencies via PIP directly into your running PySpark job (PySpark ≥ 3.1.0):
**[Install Python packages into PySpark job](PYSPARK-DEPS.md):** Install Python dependencies via PIP or Poetry programmatically into your running PySpark job (PySpark ≥ 3.1.0):

```python
# noinspection PyUnresolvedReferences
from gresearch.spark import *

# using PIP
spark.install_pip_package("pandas==1.4.3", "pyarrow")
spark.install_pip_package("-r", "requirements.txt")

# using Poetry
spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")
```

**[Fluent method call](CONDITIONAL.md):** `T.call(transformation: T => R): R`: Turns a transformation `T => R`,
78 changes: 77 additions & 1 deletion python/gresearch/spark/__init__.py
@@ -13,12 +13,14 @@
# limitations under the License.

import os
import re
import shutil
import subprocess
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Union, List, Optional, Mapping, TYPE_CHECKING
import subprocess

from py4j.java_gateway import JVMView, JavaObject
from pyspark import __version__
@@ -462,3 +464,77 @@ def install_pip_package(spark: Union[SparkSession, SparkContext], *package_or_pi

SparkSession.install_pip_package = install_pip_package
SparkContext.install_pip_package = install_pip_package


def install_poetry_project(spark: Union[SparkSession, SparkContext],
                           *project: str,
                           poetry_python: Optional[str] = None,
                           pip_args: Optional[List[str]] = None) -> None:
    import logging
    logger = logging.getLogger()

    # spark.install_pip_package has this limitation, it is used by this method,
    # and we want to fail quickly here
    if __version__.startswith('2.') or __version__.startswith('3.0.'):
        raise NotImplementedError(f'Not supported for PySpark {__version__}')

    if isinstance(spark, SparkSession):
        spark = spark.sparkContext
    if poetry_python is None:
        poetry_python = sys.executable
    if pip_args is None:
        pip_args = []

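    # forward Poetry's stdout and stderr to the logger and fail if the Poetry process exited with a non-zero code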
    def check_and_log_poetry(proc: subprocess.CompletedProcess) -> List[str]:
        stdout = proc.stdout.decode('utf-8').splitlines(keepends=False)
        for line in stdout:
            logger.info(f"poetry: {line}")

        stderr = proc.stderr.decode('utf-8').splitlines(keepends=False)
        for line in stderr:
            logger.error(f"poetry: {line}")

        if proc.returncode != 0:
            raise RuntimeError(f'Poetry process terminated with exit code {proc.returncode}')

        return stdout

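    # build a wheel from the given Poetry project sources and return the path to the built .whl file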
    def build_wheel(project: Path) -> Path:
        logger.info(f"Running poetry using {poetry_python}")

        # make sure the virtual env for this project exists, otherwise we won't see the built whl file in stdout
        proc = subprocess.run([
            poetry_python, '-m', 'poetry',
            'env', 'use',
            '--directory', str(project.absolute()),
            sys.executable
        ], capture_output=True)
        check_and_log_poetry(proc)

        # build the whl file
        proc = subprocess.run([
            poetry_python, '-m', 'poetry',
            'build',
            '--verbose',
            '--no-interaction',
            '--format', 'wheel',
            '--directory', str(project.absolute())
        ], capture_output=True)
        stdout = check_and_log_poetry(proc)

        # the first matching line is used to extract the whl file name
        whl_pattern = "^ - Built (.*.whl)$"
        for line in stdout:
            if match := re.match(whl_pattern, line):
                return project.joinpath('dist', match.group(1))

        raise RuntimeError(f'Could not find wheel file name in poetry output, was looking for "{whl_pattern}"')

    wheels = [build_wheel(Path(path)) for path in project]

    # install the built wheels via pip
    spark.install_pip_package(*[str(whl.absolute()) for whl in wheels] + pip_args)


SparkSession.install_poetry_project = install_poetry_project
SparkContext.install_poetry_project = install_poetry_project
61 changes: 60 additions & 1 deletion python/test/test_package.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os
from decimal import Decimal
from subprocess import CalledProcessError
from unittest import skipUnless, skipIf
@@ -24,6 +25,9 @@
timestamp_to_dotnet_ticks, unix_epoch_to_dotnet_ticks, unix_epoch_nanos_to_dotnet_ticks, count_null
from spark_common import SparkTest

POETRY_PYTHON_ENV = "POETRY_PYTHON"
RICH_SOURCES_ENV = "RICH_SOURCES"


class PackageTest(SparkTest):

@@ -169,7 +173,7 @@ def test_install_pip_package(self):
            import emoji
            emoji.emojize("this test is :thumbs_up:")

        self.spark.install_pip_package("emoji")
        self.spark.install_pip_package("emoji", '--cache', '.cache/pypi')

        # noinspection PyPackageRequirements
        import emoji
@@ -199,6 +203,61 @@ def test_install_pip_package_not_supported(self):
        with self.assertRaises(NotImplementedError):
            self.spark.install_pip_package("emoji")

    @skipIf(__version__.startswith('3.0.'), 'install_poetry_project not supported for Spark 3.0')
    # provide an environment variable with path to the python binary of a virtual env that has poetry installed
    @skipIf(POETRY_PYTHON_ENV not in os.environ, f'Environment variable {POETRY_PYTHON_ENV} pointing to '
                                                 f'virtual env python with poetry required')
    @skipIf(RICH_SOURCES_ENV not in os.environ, f'Environment variable {RICH_SOURCES_ENV} pointing to '
                                                f'rich project sources required')
    def test_install_poetry_project(self):
        self.spark.sparkContext.setLogLevel("INFO")
        with self.assertRaises(ImportError):
            # noinspection PyPackageRequirements
            from rich.emoji import Emoji
            thumbs_up = Emoji("thumbs_up")

        rich_path = os.environ[RICH_SOURCES_ENV]
        poetry_python = os.environ[POETRY_PYTHON_ENV]
        self.spark.install_poetry_project(
            rich_path,
            poetry_python=poetry_python,
            pip_args=['--cache', '.cache/pypi']
        )

        # noinspection PyPackageRequirements
        from rich.emoji import Emoji
        thumbs_up = Emoji("thumbs_up")
        actual = thumbs_up.replace("this test is :thumbs_up:")
        expected = "this test is 👍"
        self.assertEqual(expected, actual)

        import pandas as pd
        actual = self.spark.range(0, 10, 1, 10) \
            .mapInPandas(lambda it: [pd.DataFrame.from_dict({"val": [thumbs_up.replace(":thumbs_up:")]})], "val string") \
            .collect()
        expected = [Row("👍")] * 10
        self.assertEqual(expected, actual)

    @skipIf(__version__.startswith('3.0.'), 'install_poetry_project not supported for Spark 3.0')
    # provide an environment variable with path to the python binary of a virtual env that has poetry installed
    @skipIf(POETRY_PYTHON_ENV not in os.environ, f'Environment variable {POETRY_PYTHON_ENV} pointing to '
                                                 f'virtual env python with poetry required')
    @skipIf(RICH_SOURCES_ENV not in os.environ, f'Environment variable {RICH_SOURCES_ENV} pointing to '
                                                f'rich project sources required')
    def test_install_poetry_project_wrong_arguments(self):
        rich_path = os.environ[RICH_SOURCES_ENV]
        poetry_python = os.environ[POETRY_PYTHON_ENV]

        with self.assertRaises(RuntimeError):
            self.spark.install_poetry_project("non-existing-project", poetry_python=poetry_python)
        with self.assertRaises(FileNotFoundError):
            self.spark.install_poetry_project(rich_path, poetry_python="non-existing-python")

    @skipUnless(__version__.startswith('3.0.'), 'install_poetry_project not supported for Spark 3.0')
    def test_install_poetry_project_not_supported(self):
        with self.assertRaises(NotImplementedError):
            self.spark.install_poetry_project("./rich")


if __name__ == '__main__':
    SparkTest.main()
