Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Downloader utility class to use static dask cluster #1161

Merged
merged 23 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
08e2996
update downloader to be static variable
efajardo-nv Aug 29, 2023
e859592
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Aug 29, 2023
5bd447e
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Aug 29, 2023
9e56ddd
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Aug 30, 2023
1ecd371
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Aug 30, 2023
2b79fd6
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 1, 2023
ff36997
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Sep 11, 2023
46bbe5b
make dask cluster static instead per feedback
efajardo-nv Sep 11, 2023
a98b0ea
Merge branch 'file-to-df-loader-fix' of https://github.com/efajardo-n…
efajardo-nv Sep 11, 2023
e40af22
get_dask_cluster update
efajardo-nv Sep 11, 2023
45d9d8e
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Sep 12, 2023
53b967b
update downloader unit tests
efajardo-nv Sep 12, 2023
54fc756
add comment
efajardo-nv Sep 12, 2023
18add2e
update test_dfp_file_to_df
efajardo-nv Sep 12, 2023
52f3703
fix module dfp cmd example
efajardo-nv Sep 13, 2023
c485ded
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 13, 2023
44c1f4c
create second downloader in test_get_dask_cluster
efajardo-nv Sep 13, 2023
ed8cfc4
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Sep 18, 2023
e874961
use mutex when getting dask cluster
efajardo-nv Sep 18, 2023
4fa8d12
mutex update
efajardo-nv Sep 18, 2023
9cb2eb8
style fixes
efajardo-nv Sep 18, 2023
f08b27a
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 19, 2023
356d79a
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ To run the DFP pipelines with the example datasets within the container, run:
--start_time "2022-08-01" \
--duration "60d" \
--train_users generic \
--input_file "./control_messages/duo_payload_load_training_inference.json"
--input_file "./control_messages/duo_payload_load_train_inference.json"
```

* Azure Training Pipeline
Expand Down
32 changes: 19 additions & 13 deletions morpheus/utils/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import multiprocessing as mp
import os
import threading
import typing
from enum import Enum

Expand Down Expand Up @@ -62,12 +63,16 @@ class Downloader:
The heartbeat interval to use when using dask or dask_thread.
"""

# This cluster is shared by all Downloader instances that use dask download method.
_dask_cluster = None
mdemoret-nv marked this conversation as resolved.
Show resolved Hide resolved

_mutex = threading.RLock()

def __init__(self,
download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD,
dask_heartbeat_interval: str = "30s"):

self._merlin_distributed = None
self._dask_cluster = None
self._dask_heartbeat_interval = dask_heartbeat_interval

download_method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", download_method)
Expand Down Expand Up @@ -96,23 +101,21 @@ def get_dask_cluster(self):
dask_cuda.LocalCUDACluster
"""

if self._dask_cluster is None:
import dask
import dask.distributed
import dask_cuda.utils
with Downloader._mutex:
if Downloader._dask_cluster is None:
import dask_cuda.utils

logger.debug("Creating dask cluster...")
logger.debug("Creating dask cluster...")

# Up the heartbeat interval which can get violated with long download times
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})
n_workers = dask_cuda.utils.get_n_gpus()
threads_per_worker = mp.cpu_count() // n_workers
n_workers = dask_cuda.utils.get_n_gpus()
threads_per_worker = mp.cpu_count() // n_workers

self._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
Downloader._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers,
threads_per_worker=threads_per_worker)

logger.debug("Creating dask cluster... Done. Dashboard: %s", self._dask_cluster.dashboard_link)
logger.debug("Creating dask cluster... Done. Dashboard: %s", Downloader._dask_cluster.dashboard_link)

return self._dask_cluster
return Downloader._dask_cluster

def get_dask_client(self):
"""
Expand All @@ -124,6 +127,9 @@ def get_dask_client(self):
"""
import dask.distributed

# Up the heartbeat interval which can get violated with long download times
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})

mdemoret-nv marked this conversation as resolved.
Show resolved Hide resolved
if (self._merlin_distributed is None):
self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pandas as pd
import pytest

import morpheus.utils.downloader
from _utils import TEST_DIRS
from _utils.dataset_manager import DatasetManager
from morpheus.common import FileTypes
Expand Down Expand Up @@ -99,9 +100,11 @@ def test_constructor(config: Config):


# pylint: disable=redefined-outer-name
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@pytest.mark.parametrize('use_convert_to_dataframe', [True, False])
@pytest.mark.usefixtures("reload_modules")
@mock.patch('multiprocessing.get_context')
@mock.patch('dask.distributed.Client')
@mock.patch('dask_cuda.LocalCUDACluster')
Expand Down
29 changes: 17 additions & 12 deletions tests/test_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import fsspec
import pytest

import morpheus.utils.downloader
from _utils import TEST_DIRS
from _utils import import_or_skip
from morpheus.utils.downloader import DOWNLOAD_METHODS_MAP
Expand Down Expand Up @@ -87,29 +88,32 @@ def test_constructor_invalid_dltype(use_env: bool):
Downloader(**kwargs)


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.parametrize("dl_method", ["dask", "dask_thread"])
@mock.patch('dask.config')
@pytest.mark.usefixtures("reload_modules")
mdemoret-nv marked this conversation as resolved.
Show resolved Hide resolved
@mock.patch('dask_cuda.LocalCUDACluster')
def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, mock_dask_config: mock.MagicMock, dl_method: str):
def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, dl_method: str):
mock_dask_cluster.return_value = mock_dask_cluster
downloader = Downloader(download_method=dl_method)
assert downloader.get_dask_cluster() is mock_dask_cluster
downloader1 = Downloader(download_method=dl_method)
assert downloader1.get_dask_cluster() is mock_dask_cluster

# create another downloader then assert that cluster was only created once
downloader2 = Downloader(download_method=dl_method)
downloader2.get_dask_cluster()
assert downloader2.get_dask_cluster() is mock_dask_cluster

mock_dask_config.set.assert_called_once()
mock_dask_cluster.assert_called_once()


@mock.patch('dask.config')
@mock.patch('dask_cuda.LocalCUDACluster')
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.parametrize('dl_method', ["dask", "dask_thread"])
def test_close(mock_dask_cluster: mock.MagicMock, mock_dask_config: mock.MagicMock, dl_method: str):
@pytest.mark.usefixtures("reload_modules")
@mock.patch('dask_cuda.LocalCUDACluster')
def test_close(mock_dask_cluster: mock.MagicMock, dl_method: str):
mock_dask_cluster.return_value = mock_dask_cluster
downloader = Downloader(download_method=dl_method)
assert downloader.get_dask_cluster() is mock_dask_cluster

mock_dask_config.set.assert_called_once()

mock_dask_cluster.close.assert_not_called()
downloader.close()

Expand All @@ -127,7 +131,8 @@ def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str):
mock_dask_cluster.close.assert_not_called()


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.usefixtures("reload_modules", "restore_environ")
@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@mock.patch('multiprocessing.get_context')
@mock.patch('dask.config')
Expand Down