Commit
[KED-2717] Update docstrings to mention the meaning of the _SINGLE_PROCESS flag in datasets (#1196)
jiriklein authored Aug 3, 2021
1 parent c6f45e6 commit 885fc3e
Showing 6 changed files with 16 additions and 3 deletions.
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -135,6 +135,8 @@ from kedro.framework.project import settings

print(settings.CONF_ROOT)
```
* Added a check to `kedro.runner.parallel_runner.ParallelRunner`: its `_validate_catalog` method now inspects every dataset in the catalog for the `_SINGLE_PROCESS` attribute. If this attribute is set to `True` on a dataset instance (e.g. `SparkDataSet`), the `ParallelRunner` raises an `AttributeError`.
* Any user-defined dataset that should not be used with `ParallelRunner` may now have the `_SINGLE_PROCESS` attribute set to `True`.
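A minimal sketch of opting a user-defined dataset out of `ParallelRunner` (the class name is hypothetical, and a real implementation would subclass `kedro.io.AbstractDataSet` and also implement `_save`; the base class is omitted here for brevity):

```python
class InMemoryHandleDataSet:
    """Hypothetical dataset wrapping a resource that must stay in one process."""

    # ParallelRunner's catalog validation rejects any dataset whose
    # _SINGLE_PROCESS attribute is truthy, so setting it here makes the
    # incompatibility explicit instead of failing inside a worker process.
    _SINGLE_PROCESS = True

    def __init__(self, handle):
        self._handle = handle

    def _load(self):
        # Return the wrapped resource unchanged.
        return self._handle

    def _describe(self):
        # Report what this dataset holds, mirroring AbstractDataSet._describe.
        return {"handle": type(self._handle).__name__}
```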

## Bug fixes and other changes
* The version of a packaged modular pipeline now defaults to the version of the project package.
1 change: 1 addition & 0 deletions docs/conf.py
@@ -201,6 +201,7 @@
"https://www.oracle.com/java/technologies/javase-downloads.html", # "forbidden" url
"https://towardsdatascience.com/the-importance-of-layered-thinking-in-data-engineering-a09f685edc71",
"https://medium.com/quantumblack/beyond-the-notebook-and-into-the-data-science-framework-revolution-a7fd364ab9c4",
"https://www.java.com/en/download/help/download_options.html", # "403 Client Error: Forbidden for url"
]

# retry before flagging a link as broken (fix for "too many requests")
4 changes: 3 additions & 1 deletion docs/source/07_extend_kedro/03_custom_datasets.md
@@ -440,7 +440,9 @@ You may also want to consult the [in-depth documentation about the Versioning AP

## Thread-safety

- Every Kedro dataset should work with the [SequentialRunner](/kedro.runner.SequentialRunner) and the [ParallelRunner](/kedro.runner.ParallelRunner), so must be fully serialisable by the [Python multiprocessing package](https://docs.python.org/3/library/multiprocessing.html). This means that your datasets should not make use of lambda functions, nested functions, closures etc. If you are using custom decorators, you need to ensure that they are using [`functools.wraps()`](https://docs.python.org/3/library/functools.html#functools.wraps).
+ Kedro datasets should work with the [SequentialRunner](/kedro.runner.SequentialRunner) and the [ParallelRunner](/kedro.runner.ParallelRunner), so they must be fully serialisable by the [Python multiprocessing package](https://docs.python.org/3/library/multiprocessing.html). This means that your datasets should not make use of lambda functions, nested functions, closures etc. If you are using custom decorators, you need to ensure that they are using [`functools.wraps()`](https://docs.python.org/3/library/functools.html#functools.wraps).

There is one dataset that is an exception: [SparkDataSet](/kedro.extras.datasets.spark.SparkDataSet). [Apache Spark](https://spark.apache.org/) manages its own parallelism, so `SparkDataSet` does not work with Kedro's [ParallelRunner](/kedro.runner.ParallelRunner). For parallelism within a Kedro project that leverages Spark, please consider the alternative [ThreadRunner](/kedro.runner.ThreadRunner).
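Assuming a standard Kedro project, the runner can be swapped from the command line via the `--runner` option of `kedro run`:

```shell
# Use thread-based parallelism, which is compatible with SparkDataSet
kedro run --runner=ThreadRunner
```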

To verify whether your dataset is serialisable by `multiprocessing`, use the console or an IPython session to try dumping it using `multiprocessing.reduction.ForkingPickler`:
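The check can be sketched with toy classes (names hypothetical) standing in for datasets; `ForkingPickler.dumps` is the same entry point `multiprocessing` uses when shipping objects to worker processes:

```python
import multiprocessing.reduction
import pickle


class PicklableDataSet:
    """Toy stand-in for a dataset built only from picklable attributes."""

    def __init__(self, filepath):
        self._filepath = filepath


class UnpicklableDataSet:
    """Toy stand-in for a dataset holding a lambda, which cannot be pickled."""

    def __init__(self):
        self._transform = lambda x: x


def is_serialisable(dataset) -> bool:
    """Return True if multiprocessing's pickler can dump the dataset."""
    try:
        multiprocessing.reduction.ForkingPickler.dumps(dataset)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # Lambdas, nested functions and closures all fail here.
        return False
```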

5 changes: 4 additions & 1 deletion kedro/extras/datasets/spark/spark_dataset.py
@@ -178,7 +178,6 @@ def hdfs_glob(self, pattern: str) -> List[str]:

class SparkDataSet(AbstractVersionedDataSet):
"""``SparkDataSet`` loads and saves Spark dataframes.
Example:
::
@@ -203,6 +202,10 @@ class SparkDataSet(AbstractVersionedDataSet):
>>> reloaded.take(4)
"""

# This dataset cannot be used with ``ParallelRunner``,
# therefore it has the attribute ``_SINGLE_PROCESS = True``.
# For parallelism within a Spark pipeline, please consider
# ``ThreadRunner`` instead.
_SINGLE_PROCESS = True
DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]
4 changes: 3 additions & 1 deletion kedro/io/core.py
@@ -98,7 +98,9 @@ class AbstractDataSet(abc.ABC):
"""``AbstractDataSet`` is the base class for all data set implementations.
All data set implementations should extend this abstract class
and implement the methods marked as abstract.
If a specific dataset implementation cannot be used in conjunction with
the ``ParallelRunner``, the dataset class should set the attribute
``_SINGLE_PROCESS = True``.
Example:
::
3 changes: 3 additions & 0 deletions kedro/runner/parallel_runner.py
@@ -145,6 +145,9 @@ def _run_node_synchronization( # pylint: disable=too-many-arguments
class ParallelRunner(AbstractRunner):
"""``ParallelRunner`` is an ``AbstractRunner`` implementation. It can
be used to run the ``Pipeline`` in parallel groups formed by toposort.
Please note that this runner implementation validates the datasets in the
catalog via its ``_validate_catalog`` method, which checks whether any of
them are single-process only, as indicated by the ``_SINGLE_PROCESS``
dataset attribute.
"""

def __init__(self, max_workers: int = None, is_async: bool = False):
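The validation flow can be sketched without Kedro itself, using stand-in classes (all names below are hypothetical; in Kedro the check lives in `ParallelRunner._validate_catalog`):

```python
class SparkLikeDataSet:
    # Mirrors SparkDataSet: opts out of multiprocessing.
    _SINGLE_PROCESS = True


class CsvLikeDataSet:
    # A dataset without the flag is assumed safe to use across processes.
    pass


def validate_catalog(catalog: dict) -> None:
    """Raise AttributeError if any dataset is single-process only."""
    single_process = [
        name for name, dataset in catalog.items()
        if getattr(dataset, "_SINGLE_PROCESS", False)
    ]
    if single_process:
        raise AttributeError(
            "The following data sets cannot be used with multiprocessing: "
            f"{sorted(single_process)}"
        )
```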
