From f68ba336074041d8fe7cbbc3ff35770255ac4623 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Mon, 25 Sep 2023 11:17:29 -0400 Subject: [PATCH] Remove multiprocess download option (#1189) + Remove multiprocess download option + Update unit tests and docs Closes #1145 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - Eli Fajardo (https://github.com/efajardo-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1189 --- .../6_digital_fingerprinting_reference.md | 2 +- docs/source/loaders/core/file_to_df_loader.md | 2 +- .../production/morpheus/benchmarks/README.md | 2 +- morpheus/loaders/file_to_df_loader.py | 4 +- morpheus/utils/downloader.py | 16 +++---- .../test_dfp_file_to_df.py | 46 ++---------------- tests/test_downloader.py | 47 +++++++++---------- 7 files changed, 37 insertions(+), 82 deletions(-) diff --git a/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md b/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md index 81789e2956..38518c17e0 100644 --- a/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md +++ b/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md @@ -233,7 +233,7 @@ The `DFPFileToDataFrameStage` (examples/digital_fingerprinting/production/morphe | `parser_kwargs` | `dict` or `None` | Optional: additional keyword arguments to be passed into the `DataFrame` parser, currently this is going to be either [`pandas.read_csv`](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html), [`pandas.read_json`](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) or 
[`pandas.read_parquet`](https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html) | | `cache_dir` | `str` | Optional: path to cache location, defaults to `./.cache/dfp` | -This stage is able to download and load data files concurrently by multiple methods. Currently supported methods are: `single_thread`, `multiprocess`, `dask`, and `dask_thread`. The method used is chosen by setting the {envvar}`MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable, and `dask_thread` is used by default, and `single_thread` effectively disables concurrent loading. +This stage is able to download and load data files concurrently by multiple methods. Currently supported methods are: `single_thread`, `dask`, and `dask_thread`. The method used is chosen by setting the {envvar}`MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable, and `dask_thread` is used by default, and `single_thread` effectively disables concurrent loading. This stage will cache the resulting `DataFrame` in `cache_dir`, since we are caching the `DataFrame`s and not the source files, a cache hit avoids the cost of parsing the incoming data. In the case of remote storage systems, such as S3, this avoids both parsing and a download on a cache hit. One consequence of this is that any change to the `schema` will require purging cached files in the `cache_dir` before those changes are visible. diff --git a/docs/source/loaders/core/file_to_df_loader.md b/docs/source/loaders/core/file_to_df_loader.md index 921154aafd..12c24f631c 100644 --- a/docs/source/loaders/core/file_to_df_loader.md +++ b/docs/source/loaders/core/file_to_df_loader.md @@ -17,7 +17,7 @@ limitations under the License. ## File to DataFrame Loader -[DataLoader](../../modules/core/data_loader.md) module is used to load data files content into a dataframe using custom loader function. 
This loader function can be configured to use different processing methods, such as single-threaded, multiprocess, dask, or dask_thread, as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When download_method starts with "dask," a dask client is created to process the files, otherwise, a single thread or multiprocess is used. +The [DataLoader](../../modules/core/data_loader.md) module is used to load data file content into a dataframe using a custom loader function. This loader function can be configured to use different processing methods, such as single-threaded, dask, or dask_thread, as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When download_method starts with "dask," a dask client is created to process the files; otherwise, a single thread is used. After processing, the resulting dataframe is cached using a hash of the file paths. This loader also has the ability to load file content from S3 buckets, in addition to loading data from the disk. diff --git a/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md b/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md index 7e80c973c8..39ad984193 100644 --- a/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md +++ b/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md @@ -93,7 +93,7 @@ Morpheus pipeline configurations for each workflow are managed using [pipelines_ When using the MRC SegmentModule in a pipeline, it will also require a module configuration which gets generated within the test. 
Additional information is included in the [Morpheus Pipeline with Modules](../../../../../docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md#morpheus-pipeline-with-modules) -To ensure the [file_to_df_loader.py](../../../../../morpheus/loaders/file_to_df_loader.py) utilizes the same type of downloading mechanism, set `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable with any one of given choices (`multiprocess`, `dask`, `dask thread`, `single thread`). +To ensure the [file_to_df_loader.py](../../../../../morpheus/loaders/file_to_df_loader.py) utilizes the same type of downloading mechanism, set the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable to one of the supported choices (`dask`, `dask_thread`, `single_thread`). ``` export MORPHEUS_FILE_DOWNLOAD_TYPE=dask diff --git a/morpheus/loaders/file_to_df_loader.py b/morpheus/loaders/file_to_df_loader.py index ff69d89366..3915ebbd0f 100644 --- a/morpheus/loaders/file_to_df_loader.py +++ b/morpheus/loaders/file_to_df_loader.py @@ -34,9 +34,9 @@ def file_to_df_loader(control_message: ControlMessage, task: dict): """ This function is used to load files containing data into a dataframe. Dataframe is created by - processing files either using a single thread, multiprocess, dask, or dask_thread. This function determines + processing files either using a single thread, dask, or dask_thread. This function determines the download method to use, and if it starts with "dask," it creates a dask client and uses it to process the files. - Otherwise, it uses a single thread or multiprocess to process the files. This function then caches the resulting + Otherwise, it uses a single thread to process the files. This function then caches the resulting dataframe using a hash of the file paths. The dataframe is wrapped in a MessageMeta and then attached as a payload to a ControlMessage object and passed on to further stages. 
diff --git a/morpheus/utils/downloader.py b/morpheus/utils/downloader.py index 722c2387b4..d2882afa93 100644 --- a/morpheus/utils/downloader.py +++ b/morpheus/utils/downloader.py @@ -33,8 +33,6 @@ class DownloadMethods(str, Enum): """Valid download methods for the `Downloader` class.""" SINGLE_THREAD = "single_thread" - MULTIPROCESS = "multiprocess" - MULTIPROCESSING = "multiprocessing" DASK = "dask" DASK_THREAD = "dask_thread" @@ -45,14 +43,12 @@ class DownloadMethods(str, Enum): class Downloader: """ Downloads a list of `fsspec.core.OpenFiles` files using one of the following methods: - single_thread, multiprocess, dask or dask_thread + single_thread, dask or dask_thread The download method can be passed in via the `download_method` parameter or via the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. If both are set, the environment variable takes precedence, by default `dask_thread` is used. - When using single_thread, or multiprocess is used `dask` and `dask.distributed` is not reuiqrred to be installed. - - For compatibility reasons "multiprocessing" is an alias for "multiprocess". + When using single_thread, `dask` and `dask.distributed` are not required to be installed. Parameters ---------- @@ -78,6 +74,10 @@ def __init__(self, download_method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", download_method) if isinstance(download_method, str): + if (download_method in ("multiprocess", "multiprocessing")): + raise ValueError( + f"The '{download_method}' download method is no longer supported. 
Please use 'dask' or " + "'single_thread' instead.") try: download_method = DOWNLOAD_METHODS_MAP[download_method.lower()] except KeyError as exc: @@ -165,10 +165,6 @@ def download(self, dfs = dist.client.map(download_fn, download_buckets) dfs = dist.client.gather(dfs) - elif (self._download_method in ("multiprocess", "multiprocessing")): - # Use multiprocessing here since parallel downloads are a pain - with mp.get_context("spawn").Pool(mp.cpu_count()) as pool: - dfs = pool.map(download_fn, download_buckets) else: # Simply loop for open_file in download_buckets: diff --git a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py index e675ce2de7..716707ecfd 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py +++ b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py @@ -101,11 +101,9 @@ def test_constructor(config: Config): # pylint: disable=redefined-outer-name @pytest.mark.reload_modules(morpheus.utils.downloader) -@pytest.mark.usefixtures("restore_environ") -@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"]) +@pytest.mark.usefixtures("reload_modules", "restore_environ") +@pytest.mark.parametrize('dl_type', ["single_thread", "dask", "dask_thread"]) @pytest.mark.parametrize('use_convert_to_dataframe', [True, False]) -@pytest.mark.usefixtures("reload_modules") -@mock.patch('multiprocessing.get_context') @mock.patch('dask.distributed.Client') @mock.patch('dask_cuda.LocalCUDACluster') @mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe') @@ -116,7 +114,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, - mock_mp_gc: mock.MagicMock, config: Config, dl_type: str, use_convert_to_dataframe: bool, @@ -136,13 +133,6 @@ def 
test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM mock_distributed.__enter__.return_value = mock_distributed mock_distributed.__exit__.return_value = False - mock_mp_gc.return_value = mock_mp_gc - mock_mp_pool = mock.MagicMock() - mock_mp_gc.Pool.return_value = mock_mp_pool - mock_mp_pool.return_value = mock_mp_pool - mock_mp_pool.__enter__.return_value = mock_mp_pool - mock_mp_pool.__exit__.return_value = False - expected_hash = hashlib.md5(json.dumps([{ 'ukey': single_file_obj.fs.ukey(single_file_obj.path) }]).encode()).hexdigest() @@ -161,8 +151,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM if dl_type.startswith('dask'): mock_dist_client.map.return_value = [returned_df] mock_dist_client.gather.return_value = [returned_df] - elif dl_type in ("multiprocess", "multiprocessing"): - mock_mp_pool.map.return_value = [returned_df] else: mock_obf_to_df.return_value = returned_df @@ -179,13 +167,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM (output_df, cache_hit) = stage._controller._get_or_create_dataframe_from_batch((batch, 1)) assert not cache_hit - if dl_type in ("multiprocess", "multiprocessing"): - mock_mp_gc.assert_called_once() - mock_mp_pool.map.assert_called_once() - else: - mock_mp_gc.assert_not_called() - mock_mp_pool.map.assert_not_called() - if dl_type == "single_thread": mock_obf_to_df.assert_called_once() else: @@ -209,9 +190,8 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM @pytest.mark.usefixtures("restore_environ") -@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"]) +@pytest.mark.parametrize('dl_type', ["single_thread", "dask", "dask_thread"]) @pytest.mark.parametrize('use_convert_to_dataframe', [True, False]) -@mock.patch('multiprocessing.get_context') @mock.patch('dask.config') @mock.patch('dask.distributed.Client') 
@mock.patch('dask_cuda.LocalCUDACluster') @@ -220,7 +200,6 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, mock_dask_config: mock.MagicMock, - mock_mp_gc: mock.MagicMock, config: Config, dl_type: str, use_convert_to_dataframe: bool, @@ -233,13 +212,6 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic mock_dask_client.__enter__.return_value = mock_dask_client mock_dask_client.__exit__.return_value = False - mock_mp_gc.return_value = mock_mp_gc - mock_mp_pool = mock.MagicMock() - mock_mp_gc.Pool.return_value = mock_mp_pool - mock_mp_pool.return_value = mock_mp_pool - mock_mp_pool.__enter__.return_value = mock_mp_pool - mock_mp_pool.__exit__.return_value = False - file_specs = fsspec.open_files(os.path.abspath(os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.csv'))) # pylint: disable=no-member @@ -268,8 +240,6 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic assert cache_hit # When we get a cache hit, none of the download methods should be executed - mock_mp_gc.assert_not_called() - mock_mp_pool.map.assert_not_called() mock_obf_to_df.assert_not_called() mock_dask_cluster.assert_not_called() mock_dask_client.assert_not_called() @@ -279,9 +249,8 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic @pytest.mark.usefixtures("restore_environ") -@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"]) +@pytest.mark.parametrize('dl_type', ["single_thread", "dask", "dask_thread"]) @pytest.mark.parametrize('use_convert_to_dataframe', [True, False]) -@mock.patch('multiprocessing.get_context') @mock.patch('dask.config') @mock.patch('dask.distributed.Client') @mock.patch('dask_cuda.LocalCUDACluster') @@ -290,7 +259,6 @@ def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.Magic mock_dask_cluster: 
mock.MagicMock, mock_dask_client: mock.MagicMock, mock_dask_config: mock.MagicMock, - mock_mp_gc: mock.MagicMock, config: Config, dl_type: str, use_convert_to_dataframe: bool, @@ -299,10 +267,6 @@ def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.Magic mock_dask_cluster.return_value = mock_dask_cluster mock_dask_client.return_value = mock_dask_client - mock_mp_gc.return_value = mock_mp_gc - mock_mp_pool = mock.MagicMock() - mock_mp_gc.Pool.return_value = mock_mp_pool - os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = dl_type stage = DFPFileToDataFrameStage(config, DataFrameInputSchema(), cache_dir=tmp_path) if use_convert_to_dataframe: @@ -315,7 +279,5 @@ def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.Magic mock_dask_cluster.assert_not_called() mock_dask_client.assert_not_called() mock_dask_config.assert_not_called() - mock_mp_gc.assert_not_called() - mock_mp_pool.map.assert_not_called() assert os.listdir(tmp_path) == [] diff --git a/tests/test_downloader.py b/tests/test_downloader.py index 015af44457..f5c67a7afd 100644 --- a/tests/test_downloader.py +++ b/tests/test_downloader.py @@ -48,7 +48,7 @@ def dask_cuda(fail_missing: bool): @pytest.mark.usefixtures("restore_environ") @pytest.mark.parametrize('use_env', [True, False]) -@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"]) +@pytest.mark.parametrize('dl_method', ["single_thread", "dask", "dask_thread"]) def test_constructor_download_type(use_env: bool, dl_method: str): kwargs = {} if use_env: @@ -67,12 +67,11 @@ def test_constructor_enum_vals(dl_method: DownloadMethods): @pytest.mark.usefixtures("restore_environ") -@pytest.mark.parametrize('dl_method', - [DownloadMethods.SINGLE_THREAD, DownloadMethods.DASK, DownloadMethods.DASK_THREAD]) +@pytest.mark.parametrize('dl_method', [DownloadMethods.DASK, DownloadMethods.DASK_THREAD]) def test_constructor_env_wins(dl_method: DownloadMethods): - 
os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = "multiprocessing" + os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = "single_thread" downloader = Downloader(download_method=dl_method) - assert downloader.download_method == DownloadMethods.MULTIPROCESSING + assert downloader.download_method == DownloadMethods.SINGLE_THREAD @pytest.mark.usefixtures("restore_environ") @@ -119,7 +118,7 @@ def test_close(mock_dask_cluster: mock.MagicMock, dl_method: str): @mock.patch('dask_cuda.LocalCUDACluster') -@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing"]) +@pytest.mark.parametrize('dl_method', ["single_thread"]) def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str): mock_dask_cluster.return_value = mock_dask_cluster downloader = Downloader(download_method=dl_method) @@ -133,15 +132,13 @@ def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str): @pytest.mark.reload_modules(morpheus.utils.downloader) @pytest.mark.usefixtures("reload_modules", "restore_environ") -@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"]) -@mock.patch('multiprocessing.get_context') +@pytest.mark.parametrize('dl_method', ["single_thread", "dask", "dask_thread"]) @mock.patch('dask.config') @mock.patch('dask.distributed.Client') @mock.patch('dask_cuda.LocalCUDACluster') def test_download(mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, mock_dask_config: mock.MagicMock, - mock_mp_gc: mock.MagicMock, dl_method: str): mock_dask_config.get = lambda key: 1.0 if (key == "distributed.comm.timesouts.connect") else None mock_dask_cluster.return_value = mock_dask_cluster @@ -149,13 +146,6 @@ def test_download(mock_dask_cluster: mock.MagicMock, mock_dask_client.__enter__.return_value = mock_dask_client mock_dask_client.__exit__.return_value = False - mock_mp_gc.return_value = mock_mp_gc - mock_mp_pool = mock.MagicMock() - mock_mp_gc.Pool.return_value = mock_mp_pool - 
mock_mp_pool.return_value = mock_mp_pool - mock_mp_pool.__enter__.return_value = mock_mp_pool - mock_mp_pool.__exit__.return_value = False - input_glob = os.path.join(TEST_DIRS.tests_data_dir, 'appshield/snapshot-1/*.json') download_buckets = fsspec.open_files(input_glob) num_buckets = len(download_buckets) @@ -165,8 +155,6 @@ def test_download(mock_dask_cluster: mock.MagicMock, returnd_df = mock.MagicMock() if dl_method.startswith('dask'): mock_dask_client.gather.return_value = [returnd_df for _ in range(num_buckets)] - elif dl_method in ("multiprocess", "multiprocessing"): - mock_mp_pool.map.return_value = [returnd_df for _ in range(num_buckets)] else: download_fn.return_value = returnd_df @@ -175,13 +163,6 @@ def test_download(mock_dask_cluster: mock.MagicMock, results = downloader.download(download_buckets, download_fn) assert results == [returnd_df for _ in range(num_buckets)] - if dl_method in ("multiprocess", "multiprocessing"): - mock_mp_gc.assert_called_once() - mock_mp_pool.map.assert_called_once() - else: - mock_mp_gc.assert_not_called() - mock_mp_pool.map.assert_not_called() - if dl_method == "single_thread": download_fn.assert_has_calls([mock.call(bucket) for bucket in download_buckets]) else: @@ -195,3 +176,19 @@ def test_download(mock_dask_cluster: mock.MagicMock, mock_dask_cluster.assert_not_called() mock_dask_client.assert_not_called() mock_dask_config.assert_not_called() + + +@pytest.mark.usefixtures("restore_environ") +@pytest.mark.parametrize('use_env', [True, False]) +@pytest.mark.parametrize('dl_method', ["multiprocess", "multiprocessing"]) +def test_constructor_multiproc_dltype_not_supported(use_env: bool, dl_method: str): + kwargs = {} + if use_env: + os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = dl_method + else: + kwargs['download_method'] = dl_method + + with pytest.raises(ValueError) as excinfo: + Downloader(**kwargs) + + assert "no longer supported" in str(excinfo.value)