-
Notifications
You must be signed in to change notification settings - Fork 284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dask options for Iris processing #2511
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,11 +23,37 @@ | |
from __future__ import (absolute_import, division, print_function) | ||
from six.moves import (filter, input, map, range, zip) # noqa | ||
|
||
import dask | ||
import dask.array as da | ||
import dask.context | ||
import numpy as np | ||
import numpy.ma as ma | ||
|
||
|
||
def _iris_dask_defaults(): | ||
""" | ||
Set dask defaults for Iris. The current default dask operation mode for | ||
Iris is running single-threaded using `dask.async.get_sync`. This default | ||
ensures that running Iris under "normal" conditions will not use up all | ||
available computational resource. | ||
|
||
Otherwise, by default, `dask` will use a multi-threaded scheduler that uses | ||
all available CPUs. | ||
|
||
.. note:: | ||
We only want Iris to set dask options in the case where doing so will | ||
not change user-specified options that have already been set. | ||
|
||
""" | ||
if 'pool' not in dask.context._globals and \ | ||
'get' not in dask.context._globals: | ||
dask.set_options(get=dask.async.get_sync) | ||
|
||
|
||
# Run this at import time to set dask options for Iris. | ||
_iris_dask_defaults() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dkillick and @pp-mo I'm assuming that the user can simply override this default behaviour by simply using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup! That's the idea. You could do something like the following... from multiprocessing.pool import ThreadPool
import dask
import iris # This will set default options for dask processing in Iris.
dask.set_options(pool=ThreadPool(5)) # Now the user can apply case-specific options. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool. As long as the user can override the default. I love the Also, and most importantly for me, if the user does override the default, then they are taking explicit accountability for their actions ... 💥 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice job @dkillick and @pp-mo 😄 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By the way, I'll make sure examples like the above are included in the user documentation when we come around to writing it... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dkillick If it helps, add a note in the Dask Documentation project, so that we don't forget 👍 |
||
|
||
|
||
def is_lazy_data(data): | ||
""" | ||
Return whether the argument is an Iris 'lazy' data array. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# (C) British Crown Copyright 2017, Met Office | ||
# | ||
# This file is part of Iris. | ||
# | ||
# Iris is free software: you can redistribute it and/or modify it under | ||
# the terms of the GNU Lesser General Public License as published by the | ||
# Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# Iris is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Lesser General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Lesser General Public License | ||
# along with Iris. If not, see <http://www.gnu.org/licenses/>. | ||
""" | ||
Test :func:`iris._lazy data._iris_dask_defaults` function. | ||
|
||
""" | ||
|
||
from __future__ import (absolute_import, division, print_function) | ||
from six.moves import (filter, input, map, range, zip) # noqa | ||
|
||
# Import iris.tests first so that some things can be initialised before | ||
# importing anything else. | ||
import iris.tests as tests | ||
|
||
import dask.context | ||
from iris._lazy_data import _iris_dask_defaults | ||
|
||
|
||
class Test__iris_dask_defaults(tests.IrisTest): | ||
def setUp(self): | ||
set_options = 'dask.set_options' | ||
self.patch_set_options = self.patch(set_options) | ||
get_sync = 'dask.async.get_sync' | ||
self.patch_get_sync = self.patch(get_sync) | ||
|
||
def test_no_user_options(self): | ||
self.patch('dask.context._globals', {}) | ||
_iris_dask_defaults() | ||
self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) | ||
|
||
def test_user_options__pool(self): | ||
self.patch('dask.context._globals', {'pool': 5}) | ||
_iris_dask_defaults() | ||
self.assertEqual(self.patch_set_options.call_count, 0) | ||
|
||
def test_user_options__get(self): | ||
self.patch('dask.context._globals', {'get': 'threaded'}) | ||
_iris_dask_defaults() | ||
self.assertEqual(self.patch_set_options.call_count, 0) | ||
|
||
def test_user_options__wibble(self): | ||
# Test a user-specified dask option that does not affect Iris. | ||
self.patch('dask.context._globals', {'wibble': 'foo'}) | ||
_iris_dask_defaults() | ||
self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) | ||
|
||
|
||
if __name__ == '__main__': | ||
tests.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could usefully explain what dask does by default, that we don't want.
E.G. "Otherwise, by default, dask.array will use a threaded scheduler and grab all available CPUs."