From accec2840f0d566c796fa2e9954fdfa9cd909b53 Mon Sep 17 00:00:00 2001 From: Peter Killick Date: Tue, 25 Apr 2017 17:03:10 +0100 Subject: [PATCH] Dask options for Iris processing (#2511) Dask opts controlled by simple function --- lib/iris/_lazy_data.py | 26 ++++++++ .../unit/lazy_data/test_iris_dask_defaults.py | 63 +++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 7d525477d6..da13ae1b9b 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -23,11 +23,37 @@ from __future__ import (absolute_import, division, print_function) from six.moves import (filter, input, map, range, zip) # noqa +import dask import dask.array as da +import dask.context import numpy as np import numpy.ma as ma +def _iris_dask_defaults(): + """ + Set dask defaults for Iris. The current default dask operation mode for + Iris is running single-threaded using `dask.async.get_sync`. This default + ensures that running Iris under "normal" conditions will not use up all + available computational resource. + + Otherwise, by default, `dask` will use a multi-threaded scheduler that uses + all available CPUs. + + .. note:: + We only want Iris to set dask options in the case where doing so will + not change user-specified options that have already been set. + + """ + if 'pool' not in dask.context._globals and \ + 'get' not in dask.context._globals: + dask.set_options(get=dask.async.get_sync) + + +# Run this at import time to set dask options for Iris. +_iris_dask_defaults() + + def is_lazy_data(data): """ Return whether the argument is an Iris 'lazy' data array. diff --git a/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py b/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py new file mode 100644 index 0000000000..4f25a40c82 --- /dev/null +++ b/lib/iris/tests/unit/lazy_data/test_iris_dask_defaults.py @@ -0,0 +1,63 @@ +# (C) British Crown Copyright 2017, Met Office +# +# This file is part of Iris. +# +# Iris is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Iris is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Iris. If not, see . +""" +Test :func:`iris._lazy data._iris_dask_defaults` function. + +""" + +from __future__ import (absolute_import, division, print_function) +from six.moves import (filter, input, map, range, zip) # noqa + +# Import iris.tests first so that some things can be initialised before +# importing anything else. +import iris.tests as tests + +import dask.context +from iris._lazy_data import _iris_dask_defaults + + +class Test__iris_dask_defaults(tests.IrisTest): + def setUp(self): + set_options = 'dask.set_options' + self.patch_set_options = self.patch(set_options) + get_sync = 'dask.async.get_sync' + self.patch_get_sync = self.patch(get_sync) + + def test_no_user_options(self): + self.patch('dask.context._globals', {}) + _iris_dask_defaults() + self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) + + def test_user_options__pool(self): + self.patch('dask.context._globals', {'pool': 5}) + _iris_dask_defaults() + self.assertEqual(self.patch_set_options.call_count, 0) + + def test_user_options__get(self): + self.patch('dask.context._globals', {'get': 'threaded'}) + _iris_dask_defaults() + self.assertEqual(self.patch_set_options.call_count, 0) + + def test_user_options__wibble(self): + # Test a user-specified dask option that does not affect Iris. + self.patch('dask.context._globals', {'wibble': 'foo'}) + _iris_dask_defaults() + self.patch_set_options.assert_called_once_with(get=self.patch_get_sync) + + +if __name__ == '__main__': + tests.main()