Remove multiprocess download option (#1189)
+ Remove multiprocess download option
+ Update unit tests and docs

Closes #1145

## By submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Eli Fajardo (https://github.com/efajardo-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1189
efajardo-nv authored Sep 25, 2023
1 parent 969954c commit f68ba33
Showing 7 changed files with 37 additions and 82 deletions.
@@ -233,7 +233,7 @@ The `DFPFileToDataFrameStage` (examples/digital_fingerprinting/production/morphe
| `parser_kwargs` | `dict` or `None` | Optional: additional keyword arguments to be passed into the `DataFrame` parser, currently this is going to be either [`pandas.read_csv`](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html), [`pandas.read_json`](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) or [`pandas.read_parquet`](https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html) |
| `cache_dir` | `str` | Optional: path to cache location, defaults to `./.cache/dfp` |

This stage is able to download and load data files concurrently by multiple methods. Currently supported methods are: `single_thread`, `multiprocess`, `dask`, and `dask_thread`. The method used is chosen by setting the {envvar}`MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable, and `dask_thread` is used by default, and `single_thread` effectively disables concurrent loading.
This stage can download and load data files concurrently using multiple methods. Currently supported methods are: `single_thread`, `dask`, and `dask_thread`. The method is chosen by setting the {envvar}`MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable; `dask_thread` is used by default, and `single_thread` effectively disables concurrent loading.

This stage caches the resulting `DataFrame` in `cache_dir`. Since the `DataFrame`s are cached rather than the source files, a cache hit avoids the cost of parsing the incoming data. For remote storage systems, such as S3, a cache hit avoids both the parse and the download. One consequence of this is that any change to the `schema` requires purging cached files in the `cache_dir` before those changes become visible.
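The environment-variable–driven method selection described above can be sketched as follows. `MORPHEUS_FILE_DOWNLOAD_TYPE` is the real variable; the helper name and validation logic here are illustrative assumptions, not the actual implementation:

```python
import os

# Supported download methods after this change; "multiprocess" was removed.
SUPPORTED_METHODS = ("single_thread", "dask", "dask_thread")


def resolve_download_method(default: str = "dask_thread") -> str:
    """Pick the download method, letting the environment variable override the default."""
    method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", default)
    if method not in SUPPORTED_METHODS:
        raise ValueError(f"Unsupported download method: {method!r}")
    return method
```

With the variable unset this returns `dask_thread`; setting it to `single_thread` disables concurrent loading.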

2 changes: 1 addition & 1 deletion docs/source/loaders/core/file_to_df_loader.md
@@ -17,7 +17,7 @@ limitations under the License.

## File to DataFrame Loader

[DataLoader](../../modules/core/data_loader.md) module is used to load data files content into a dataframe using custom loader function. This loader function can be configured to use different processing methods, such as single-threaded, multiprocess, dask, or dask_thread, as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When download_method starts with "dask," a dask client is created to process the files, otherwise, a single thread or multiprocess is used.
The [DataLoader](../../modules/core/data_loader.md) module is used to load the contents of data files into a DataFrame using a custom loader function. This loader function can be configured to use different processing methods, such as `single_thread`, `dask`, or `dask_thread`, as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When the download method starts with "dask", a Dask client is created to process the files; otherwise, a single thread is used.

After processing, the resulting dataframe is cached using a hash of the file paths. This loader also has the ability to load file content from S3 buckets, in addition to loading data from the disk.
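The cache key is derived from the files themselves rather than their contents. A minimal sketch of the hashing scheme, matching the approach visible in the updated unit tests (the helper name is an assumption):

```python
import hashlib
import json


def cache_key_for(ukeys: list) -> str:
    """Hash the filesystem ukeys of a batch of files to name its cached DataFrame."""
    payload = json.dumps([{"ukey": ukey} for ukey in ukeys]).encode()
    return hashlib.md5(payload).hexdigest()
```

Because the key depends only on the file identities, re-running a pipeline over unchanged inputs reuses the parsed `DataFrame` from the cache.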

@@ -93,7 +93,7 @@ Morpheus pipeline configurations for each workflow are managed using [pipelines_

When using the MRC SegmentModule in a pipeline, it will also require a module configuration, which gets generated within the test. Additional information is included in the [Morpheus Pipeline with Modules](../../../../../docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md#morpheus-pipeline-with-modules).

To ensure the [file_to_df_loader.py](../../../../../morpheus/loaders/file_to_df_loader.py) utilizes the same type of downloading mechanism, set `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable with any one of given choices (`multiprocess`, `dask`, `dask thread`, `single thread`).
To ensure [file_to_df_loader.py](../../../../../morpheus/loaders/file_to_df_loader.py) uses the same download mechanism, set the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable to one of the supported choices (`dask`, `dask_thread`, `single_thread`).

```
export MORPHEUS_FILE_DOWNLOAD_TYPE=dask
```
4 changes: 2 additions & 2 deletions morpheus/loaders/file_to_df_loader.py
@@ -34,9 +34,9 @@
def file_to_df_loader(control_message: ControlMessage, task: dict):
"""
This function is used to load files containing data into a dataframe. Dataframe is created by
processing files either using a single thread, multiprocess, dask, or dask_thread. This function determines
processing files either using a single thread, dask, or dask_thread. This function determines
the download method to use, and if it starts with "dask," it creates a dask client and uses it to process the files.
Otherwise, it uses a single thread or multiprocess to process the files. This function then caches the resulting
Otherwise, it uses a single thread to process the files. This function then caches the resulting
dataframe using a hash of the file paths. The dataframe is wrapped in a MessageMeta and then attached as a payload
to a ControlMessage object and passed on to further stages.
16 changes: 6 additions & 10 deletions morpheus/utils/downloader.py
@@ -33,8 +33,6 @@
class DownloadMethods(str, Enum):
"""Valid download methods for the `Downloader` class."""
SINGLE_THREAD = "single_thread"
MULTIPROCESS = "multiprocess"
MULTIPROCESSING = "multiprocessing"
DASK = "dask"
DASK_THREAD = "dask_thread"

@@ -45,14 +43,12 @@ class DownloadMethods(str, Enum):
class Downloader:
"""
Downloads a list of `fsspec.core.OpenFiles` files using one of the following methods:
single_thread, multiprocess, dask or dask_thread
single_thread, dask or dask_thread
The download method can be passed in via the `download_method` parameter or via the `MORPHEUS_FILE_DOWNLOAD_TYPE`
environment variable. If both are set, the environment variable takes precedence; by default, `dask_thread` is used.
When using single_thread, or multiprocess is used `dask` and `dask.distributed` is not reuiqrred to be installed.
For compatibility reasons "multiprocessing" is an alias for "multiprocess".
When using `single_thread`, `dask` and `dask.distributed` are not required to be installed.
Parameters
----------
@@ -78,6 +74,10 @@ def __init__(self,
download_method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", download_method)

if isinstance(download_method, str):
if (download_method in ("multiprocess", "multiprocessing")):
raise ValueError(
f"The '{download_method}' download method is no longer supported. Please use 'dask' or "
"'single_thread' instead.")
try:
download_method = DOWNLOAD_METHODS_MAP[download_method.lower()]
except KeyError as exc:
@@ -165,10 +165,6 @@ def download(self,
dfs = dist.client.map(download_fn, download_buckets)
dfs = dist.client.gather(dfs)

elif (self._download_method in ("multiprocess", "multiprocessing")):
# Use multiprocessing here since parallel downloads are a pain
with mp.get_context("spawn").Pool(mp.cpu_count()) as pool:
dfs = pool.map(download_fn, download_buckets)
else:
# Simply loop
for open_file in download_buckets:
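With the multiprocess branch removed, the dispatch in `Downloader.download` reduces to two paths: map/gather on a Dask client, or a simple serial loop. A simplified sketch under stated assumptions (the Dask-client plumbing is elided and the function name is illustrative):

```python
from typing import Any, Callable, Iterable, List


def download_all(buckets: Iterable[Any],
                 download_fn: Callable[[Any], Any],
                 method: str,
                 dask_client=None) -> List[Any]:
    """Run download_fn over each bucket, in parallel via Dask or serially."""
    if method.startswith("dask"):
        # Dask path: map the work across the cluster, then gather the results.
        futures = dask_client.map(download_fn, list(buckets))
        return dask_client.gather(futures)
    # single_thread path: simply loop.
    return [download_fn(bucket) for bucket in buckets]
```

Note that the `single_thread` path has no cluster setup or teardown, which is why `close()` becomes a no-op for it in the tests below.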
46 changes: 4 additions & 42 deletions tests/examples/digital_fingerprinting/test_dfp_file_to_df.py
@@ -101,11 +101,9 @@ def test_constructor(config: Config):

# pylint: disable=redefined-outer-name
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@pytest.mark.usefixtures("reload_modules", "restore_environ")
@pytest.mark.parametrize('dl_type', ["single_thread", "dask", "dask_thread"])
@pytest.mark.parametrize('use_convert_to_dataframe', [True, False])
@pytest.mark.usefixtures("reload_modules")
@mock.patch('multiprocessing.get_context')
@mock.patch('dask.distributed.Client')
@mock.patch('dask_cuda.LocalCUDACluster')
@mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe')
@@ -116,7 +114,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM
mock_obf_to_df: mock.MagicMock,
mock_dask_cluster: mock.MagicMock,
mock_dask_client: mock.MagicMock,
mock_mp_gc: mock.MagicMock,
config: Config,
dl_type: str,
use_convert_to_dataframe: bool,
@@ -136,13 +133,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM
mock_distributed.__enter__.return_value = mock_distributed
mock_distributed.__exit__.return_value = False

mock_mp_gc.return_value = mock_mp_gc
mock_mp_pool = mock.MagicMock()
mock_mp_gc.Pool.return_value = mock_mp_pool
mock_mp_pool.return_value = mock_mp_pool
mock_mp_pool.__enter__.return_value = mock_mp_pool
mock_mp_pool.__exit__.return_value = False

expected_hash = hashlib.md5(json.dumps([{
'ukey': single_file_obj.fs.ukey(single_file_obj.path)
}]).encode()).hexdigest()
@@ -161,8 +151,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM
if dl_type.startswith('dask'):
mock_dist_client.map.return_value = [returned_df]
mock_dist_client.gather.return_value = [returned_df]
elif dl_type in ("multiprocess", "multiprocessing"):
mock_mp_pool.map.return_value = [returned_df]
else:
mock_obf_to_df.return_value = returned_df

@@ -179,13 +167,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM
(output_df, cache_hit) = stage._controller._get_or_create_dataframe_from_batch((batch, 1))
assert not cache_hit

if dl_type in ("multiprocess", "multiprocessing"):
mock_mp_gc.assert_called_once()
mock_mp_pool.map.assert_called_once()
else:
mock_mp_gc.assert_not_called()
mock_mp_pool.map.assert_not_called()

if dl_type == "single_thread":
mock_obf_to_df.assert_called_once()
else:
@@ -209,9 +190,8 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@pytest.mark.parametrize('dl_type', ["single_thread", "dask", "dask_thread"])
@pytest.mark.parametrize('use_convert_to_dataframe', [True, False])
@mock.patch('multiprocessing.get_context')
@mock.patch('dask.config')
@mock.patch('dask.distributed.Client')
@mock.patch('dask_cuda.LocalCUDACluster')
@@ -220,7 +200,6 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic
mock_dask_cluster: mock.MagicMock,
mock_dask_client: mock.MagicMock,
mock_dask_config: mock.MagicMock,
mock_mp_gc: mock.MagicMock,
config: Config,
dl_type: str,
use_convert_to_dataframe: bool,
@@ -233,13 +212,6 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic
mock_dask_client.__enter__.return_value = mock_dask_client
mock_dask_client.__exit__.return_value = False

mock_mp_gc.return_value = mock_mp_gc
mock_mp_pool = mock.MagicMock()
mock_mp_gc.Pool.return_value = mock_mp_pool
mock_mp_pool.return_value = mock_mp_pool
mock_mp_pool.__enter__.return_value = mock_mp_pool
mock_mp_pool.__exit__.return_value = False

file_specs = fsspec.open_files(os.path.abspath(os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.csv')))

# pylint: disable=no-member
@@ -268,8 +240,6 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic
assert cache_hit

# When we get a cache hit, none of the download methods should be executed
mock_mp_gc.assert_not_called()
mock_mp_pool.map.assert_not_called()
mock_obf_to_df.assert_not_called()
mock_dask_cluster.assert_not_called()
mock_dask_client.assert_not_called()
@@ -279,9 +249,8 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@pytest.mark.parametrize('dl_type', ["single_thread", "dask", "dask_thread"])
@pytest.mark.parametrize('use_convert_to_dataframe', [True, False])
@mock.patch('multiprocessing.get_context')
@mock.patch('dask.config')
@mock.patch('dask.distributed.Client')
@mock.patch('dask_cuda.LocalCUDACluster')
@@ -290,7 +259,6 @@ def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.Magic
mock_dask_cluster: mock.MagicMock,
mock_dask_client: mock.MagicMock,
mock_dask_config: mock.MagicMock,
mock_mp_gc: mock.MagicMock,
config: Config,
dl_type: str,
use_convert_to_dataframe: bool,
@@ -299,10 +267,6 @@ def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.Magic
mock_dask_cluster.return_value = mock_dask_cluster
mock_dask_client.return_value = mock_dask_client

mock_mp_gc.return_value = mock_mp_gc
mock_mp_pool = mock.MagicMock()
mock_mp_gc.Pool.return_value = mock_mp_pool

os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = dl_type
stage = DFPFileToDataFrameStage(config, DataFrameInputSchema(), cache_dir=tmp_path)
if use_convert_to_dataframe:
@@ -315,7 +279,5 @@ def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.Magic
mock_dask_cluster.assert_not_called()
mock_dask_client.assert_not_called()
mock_dask_config.assert_not_called()
mock_mp_gc.assert_not_called()
mock_mp_pool.map.assert_not_called()

assert os.listdir(tmp_path) == []
47 changes: 22 additions & 25 deletions tests/test_downloader.py
@@ -48,7 +48,7 @@ def dask_cuda(fail_missing: bool):

@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('use_env', [True, False])
@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@pytest.mark.parametrize('dl_method', ["single_thread", "dask", "dask_thread"])
def test_constructor_download_type(use_env: bool, dl_method: str):
kwargs = {}
if use_env:
@@ -67,12 +67,11 @@ def test_constructor_enum_vals(dl_method: DownloadMethods):


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('dl_method',
[DownloadMethods.SINGLE_THREAD, DownloadMethods.DASK, DownloadMethods.DASK_THREAD])
@pytest.mark.parametrize('dl_method', [DownloadMethods.DASK, DownloadMethods.DASK_THREAD])
def test_constructor_env_wins(dl_method: DownloadMethods):
os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = "multiprocessing"
os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = "single_thread"
downloader = Downloader(download_method=dl_method)
assert downloader.download_method == DownloadMethods.MULTIPROCESSING
assert downloader.download_method == DownloadMethods.SINGLE_THREAD


@pytest.mark.usefixtures("restore_environ")
@@ -119,7 +118,7 @@ def test_close(mock_dask_cluster: mock.MagicMock, dl_method: str):


@mock.patch('dask_cuda.LocalCUDACluster')
@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing"])
@pytest.mark.parametrize('dl_method', ["single_thread"])
def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str):
mock_dask_cluster.return_value = mock_dask_cluster
downloader = Downloader(download_method=dl_method)
@@ -133,29 +132,20 @@ def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str):

@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.usefixtures("reload_modules", "restore_environ")
@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@mock.patch('multiprocessing.get_context')
@pytest.mark.parametrize('dl_method', ["single_thread", "dask", "dask_thread"])
@mock.patch('dask.config')
@mock.patch('dask.distributed.Client')
@mock.patch('dask_cuda.LocalCUDACluster')
def test_download(mock_dask_cluster: mock.MagicMock,
mock_dask_client: mock.MagicMock,
mock_dask_config: mock.MagicMock,
mock_mp_gc: mock.MagicMock,
dl_method: str):
    mock_dask_config.get = lambda key: 1.0 if (key == "distributed.comm.timeouts.connect") else None
mock_dask_cluster.return_value = mock_dask_cluster
mock_dask_client.return_value = mock_dask_client
mock_dask_client.__enter__.return_value = mock_dask_client
mock_dask_client.__exit__.return_value = False

mock_mp_gc.return_value = mock_mp_gc
mock_mp_pool = mock.MagicMock()
mock_mp_gc.Pool.return_value = mock_mp_pool
mock_mp_pool.return_value = mock_mp_pool
mock_mp_pool.__enter__.return_value = mock_mp_pool
mock_mp_pool.__exit__.return_value = False

input_glob = os.path.join(TEST_DIRS.tests_data_dir, 'appshield/snapshot-1/*.json')
download_buckets = fsspec.open_files(input_glob)
num_buckets = len(download_buckets)
@@ -165,8 +155,6 @@ def test_download(mock_dask_cluster: mock.MagicMock,
returnd_df = mock.MagicMock()
if dl_method.startswith('dask'):
mock_dask_client.gather.return_value = [returnd_df for _ in range(num_buckets)]
elif dl_method in ("multiprocess", "multiprocessing"):
mock_mp_pool.map.return_value = [returnd_df for _ in range(num_buckets)]
else:
download_fn.return_value = returnd_df

@@ -175,13 +163,6 @@ def test_download(mock_dask_cluster: mock.MagicMock,
results = downloader.download(download_buckets, download_fn)
assert results == [returnd_df for _ in range(num_buckets)]

if dl_method in ("multiprocess", "multiprocessing"):
mock_mp_gc.assert_called_once()
mock_mp_pool.map.assert_called_once()
else:
mock_mp_gc.assert_not_called()
mock_mp_pool.map.assert_not_called()

if dl_method == "single_thread":
download_fn.assert_has_calls([mock.call(bucket) for bucket in download_buckets])
else:
@@ -195,3 +176,19 @@ def test_download(mock_dask_cluster: mock.MagicMock,
mock_dask_cluster.assert_not_called()
mock_dask_client.assert_not_called()
mock_dask_config.assert_not_called()


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('use_env', [True, False])
@pytest.mark.parametrize('dl_method', ["multiprocess", "multiprocessing"])
def test_constructor_multiproc_dltype_not_supported(use_env: bool, dl_method: str):
kwargs = {}
if use_env:
os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = dl_method
else:
kwargs['download_method'] = dl_method

with pytest.raises(ValueError) as excinfo:
Downloader(**kwargs)

assert "no longer supported" in str(excinfo.value)
