
CMIP3 processing does not work in parallel mode #430

Open · mattiarighi opened this issue Jan 15, 2020 · 36 comments
Labels: preprocessor (Related to the preprocessor)

@mattiarighi (Contributor)

As discussed in #269, the processing of CMIP3 datasets works only if max_parallel_tasks = 1 is set in config-user.yml.

This could be a serious limitation, e.g. for the IPCC recipe developed here.

mattiarighi added the preprocessor label on Jan 15, 2020
@bouweandela (Member)

Could you please attach an example recipe that reproduces the problem?

@mattiarighi (Contributor, Author)

Here it goes...

recipe_cmip3.yml.txt

@valeriupredoi (Contributor)

So here's a dumb question: I did take a look at the CMIP3 code changes and nothing struck me as an obvious cause for limiting parallel running. Can you run any other type of data in parallel?

@mattiarighi (Contributor, Author)

Yes, I always run the tool in parallel.
That's the only case which gives problems.

@bouweandela (Member)

I also noticed it only hangs on the first dataset in the recipe; if I use the third one instead, everything works fine.

@valeriupredoi (Contributor)

It looks to me like a data loading issue with iris. Mine hangs with the BCM2 model, and Ctrl-C-ing it I see this:

...
  File "/home/users/valeriu/esmvalcore/esmvalcore/preprocessor/__init__.py", line 197, in _run_preproc_function
    msg = "{}({}, {})".format(function.__name__, items, kwargs)
  File "/home/users/valeriu/iris/lib/iris/cube.py", line 2550, in __str__
    return self.summary()
  File "/home/users/valeriu/iris/lib/iris/cube.py", line 2461, in summary
    coord_cell = coord.cell(0)
  File "/home/users/valeriu/iris/lib/iris/coords.py", line 1927, in cell
    point = tuple(np.array(self.points[index], ndmin=1).flatten())
  File "/home/users/valeriu/iris/lib/iris/coords.py", line 1453, in points
    return self._values
  File "/home/users/valeriu/iris/lib/iris/coords.py", line 218, in _values
    return self._values_dm.data.view()
  File "/home/users/valeriu/iris/lib/iris/_data_manager.py", line 204, in data
    result = as_concrete_data(self._lazy_array)
  File "/home/users/valeriu/iris/lib/iris/_lazy_data.py", line 251, in as_concrete_data
    (data,) = _co_realise_lazy_arrays([data])
  File "/home/users/valeriu/iris/lib/iris/_lazy_data.py", line 214, in _co_realise_lazy_arrays
    computed_arrays = da.compute(*arrays)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/threaded.py", line 81, in get
    **kwargs
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 475, in get_async
    key, res_info, failed = queue_get(queue)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 133, in queue_get
    return q.get()
...

I get the same behaviour for MPIM/ECHAM5, so there is a trend here, but with IPSL-CM4 all goes fine.

@valeriupredoi (Contributor)

OK, I figured out where the roadblock is: at the save point. It hangs right here (in debug mode one can see):

2020-01-17 11:38:31,482 UTC [55275] DEBUG   Running fix_metadata([<iris 'Cube' of air_temperature / (K) (time: 2892; latitude: 96; longitude: 192)>], {'project': 'CMIP3', 'dataset': 'ECHAM5', 'short_name': 'tas', 'cmor_table': 'CMIP3', 'mip': 'A1', 'frequency': 'mon'})
2020-01-17 11:38:38,883 UTC [55275] WARNING There were warnings in variable tas:
Long name for tas changed from Surface Air Temperature to Near-Surface Air Temperature

2020-01-17 11:38:38,883 UTC [55275] DEBUG   Result [<iris 'Cube' of air_temperature / (K) (time: 2892; latitude: 96; longitude: 192)>]
2020-01-17 11:38:38,890 UTC [55275] DEBUG   Saving cubes [<iris 'Cube' of air_temperature / (K) (time: 2892; latitude: 96; longitude: 192)>] to /home/users/valeriu/esmvaltool_var_test/LEE3/recipe_cmip3_20200117_113811/preproc/diag1/tas/CMIP3_MPIM_ECHAM5_A1_20c3m_r1_tas_1996-1999_1996-1999/00_fix_metadata.nc

Interestingly enough, there is no issue saving the originally loaded cube in multiprocessing mode, see:

import iris
from multiprocessing import Pool

def my_func(x1):
    print("Loading cubes...")
    a = iris.load_cube("/badc/cmip3_drs/data/cmip3/output/MPIM/ECHAM5/20c3m/mon/atmos/tas/r1/v1/tas_A1_ECHAM5_20c3m_r1.nc")
    b = iris.load_cube("/badc/cmip3_drs/data/cmip3/output/IPSL/CM4/20c3m/mon/atmos/tas/r1/v1/tas_A1_CM4_20c3m_r1_1860-2000.nc")
    print("Saving cubes...")
    iris.save(a, "./a.nc")
    iris.save(b, "./b.nc")
    return "I iz done"

with Pool(processes=8) as pool:
    x = pool.map(my_func, [2])
    print(x)

BTW @jvegasbsc you can test on Jasmin since CMIP3 data is in /badc/cmip3_drs/data/cmip3/output/

@bouweandela (Member)

OK I figured out where the roadblock is - at save point: it hangs right here (

Based on the stack trace I think the problem is actually with loading the data. This is only done when saving because all preprocessing steps are lazy.
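A minimal sketch of that laziness (hypothetical file names, not esmvalcore code): the data of a loaded cube stays lazy through the preprocessing steps, and only the save (or anything else touching the real data) triggers the dask computation.

import iris

cube = iris.load_cube("tas_example.nc")  # hypothetical file; the data stays lazy
print(cube.has_lazy_data())              # True: nothing has been read yet
iris.save(cube, "out.nc")                # the data is realized (da.compute) here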

@valeriupredoi (Contributor)

niet, comrade, it is save:

Traceback (most recent call last):
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/home/users/valeriu/esmvalcore/esmvalcore/_task.py", line 687, in _run_task
    output_files = task.run()
  File "/home/users/valeriu/esmvalcore/esmvalcore/_task.py", line 238, in run
    self.output_files = self._run(input_files)
  File "/home/users/valeriu/esmvalcore/esmvalcore/preprocessor/__init__.py", line 404, in _run
    product.apply(step, self.debug)
  File "/home/users/valeriu/esmvalcore/esmvalcore/preprocessor/__init__.py", line 276, in apply
    save(self.cubes, filename)
  File "/home/users/valeriu/esmvalcore/esmvalcore/preprocessor/_io.py", line 171, in save
    iris.save(cubes, **kwargs)
  File "/home/users/valeriu/iris/lib/iris/io/__init__.py", line 446, in save
    saver(source, target, **kwargs)
  File "/home/users/valeriu/iris/lib/iris/fileformats/netcdf.py", line 2610, in save
    fill_value=fill_value,
  File "/home/users/valeriu/iris/lib/iris/fileformats/netcdf.py", line 1068, in write
    self.check_attribute_compliance(coord, coord.points)
  File "/home/users/valeriu/iris/lib/iris/coords.py", line 1453, in points
    return self._values
  File "/home/users/valeriu/iris/lib/iris/coords.py", line 218, in _values
    return self._values_dm.data.view()
  File "/home/users/valeriu/iris/lib/iris/_data_manager.py", line 204, in data
    result = as_concrete_data(self._lazy_array)
  File "/home/users/valeriu/iris/lib/iris/_lazy_data.py", line 251, in as_concrete_data
    (data,) = _co_realise_lazy_arrays([data])
  File "/home/users/valeriu/iris/lib/iris/_lazy_data.py", line 214, in _co_realise_lazy_arrays
    computed_arrays = da.compute(*arrays)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/threaded.py", line 81, in get
    **kwargs
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 475, in get_async
    key, res_info, failed = queue_get(queue)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 133, in queue_get
    return q.get()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/queue.py", line 170, in get
    self.not_empty.wait()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
2020-01-17 13:46:56,512 UTC [124886] ERROR   Program terminated abnormally, see stack trace below for more information
Traceback (most recent call last):
  File "/home/users/valeriu/esmvalcore/esmvalcore/_main.py", line 229, in run
    conf = main(args)
  File "/home/users/valeriu/esmvalcore/esmvalcore/_main.py", line 157, in main
    process_recipe(recipe_file=recipe, config_user=cfg)
  File "/home/users/valeriu/esmvalcore/esmvalcore/_main.py", line 207, in process_recipe
    recipe.run()
  File "/home/users/valeriu/esmvalcore/esmvalcore/_recipe.py", line 1314, in run
    max_parallel_tasks=self._cfg['max_parallel_tasks'])
  File "/home/users/valeriu/esmvalcore/esmvalcore/_task.py", line 608, in run_tasks
    _run_tasks_parallel(tasks, max_parallel_tasks)
  File "/home/users/valeriu/esmvalcore/esmvalcore/_task.py", line 659, in _run_tasks_parallel
    time.sleep(0.1)
KeyboardInterrupt

@valeriupredoi (Contributor)

OK I figured out where the roadblock is - at save point: it hangs right here (

Based on the stack trace I think the problem is actually with loading the data. This is only done when saving because all preprocessing steps are lazy.

I turned on intermediary file saving

@valeriupredoi (Contributor)

I confirm that the stack trace doesn't happen (the run goes through) when reducing max_parallel_tasks: 8 to 1,

@mattiarighi (Contributor, Author)

Which model(s) did you try?

@valeriupredoi (Contributor)

or when running with IPSL-CM4

@valeriupredoi (Contributor)

I ran MPIM-ECHAM5 (problem with the multiprocessing save), IPSL-CM4 (goes fine) and BCM2 (same problem as ECHAM5).

@valeriupredoi (Contributor)

what puzzles me is why this works:

import iris
from multiprocessing import Pool

def my_func(x1):
    print("Loading cubes...")
    a = iris.load_cube("/badc/cmip3_drs/data/cmip3/output/MPIM/ECHAM5/20c3m/mon/atmos/tas/r1/v1/tas_A1_ECHAM5_20c3m_r1.nc")

    print("Saving cubes...")
    iris.save(a, "./a.nc")
    return "I iz done"

with Pool(processes=2) as pool:
    x = pool.map(my_func, [2])
    print(x)

and the saver inside the tool doesn't. @bouweandela any ideas?

@valeriupredoi (Contributor)

Afraid upgrading to iris=2.3.0 and dask=2.9.2 still doesn't resolve this -> most probably something to ping @bjlittle for 🍺

@valeriupredoi (Contributor)

Huzzah!! I have managed to replicate the very same hang and trace (via KeyboardInterrupt) with a toy model rather than the bulky esmvalcore framework:

import iris
from dask import compute
from multiprocessing import Pool

def my_func2(x1):
    a = iris.load_cube("/badc/cmip3_drs/data/cmip3/output/MPIM/ECHAM5/20c3m/mon/atmos/tas/r1/v1/tas_A1_ECHAM5_20c3m_r1.nc")
    m = a.core_data() ** 2.0
    s = compute(*m)  # realize the lazy data
    print(s)

my_func2(2)
pool = Pool(processes=2)
x = pool.map(my_func2, [2])

@valeriupredoi (Contributor)

OK, even simpler, and with any sort of netCDF file (not necessarily CMIP3):

import iris
from dask import compute
from multiprocessing import Pool

def my_func2(x1):
    a = iris.load_cube("/home/users/valeriu/t2.nc")
    print(a)
    m = a.core_data()
    s = compute(m)  # realize the lazy data

my_func2(2)
pool = Pool(processes=2)
x = pool.map(my_func2, [2])

The trace comes from the lazy data computation, in the same manner as for that CMIP3 file:

^CTraceback (most recent call last):
  File "load_cmip3.py", line 27, in <module>
Process ForkPoolWorker-1:
Process ForkPoolWorker-2:
    x = pool.map(my_func2, [2])
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 651, in get
    self.wait(timeout)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 648, in wait
    self._event.wait(timeout)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
Traceback (most recent call last):
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "load_cmip3.py", line 20, in my_func2
    a = iris.load_cube("/home/users/valeriu/t2.nc")
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/__init__.py", line 378, in load_cube
    cube = cubes.merge_cube()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/cube.py", line 394, in merge_cube
    proto_cube = iris._merge.ProtoCube(self[0])
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/_merge.py", line 1098, in __init__
    coord_payload = self._extract_coord_payload(cube)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/_merge.py", line 1675, in _extract_coord_payload
    points = coord.points
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/coords.py", line 640, in points
    return self._points_dm.data.view()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/_data_manager.py", line 216, in data
    result = as_concrete_data(self._lazy_array)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/_lazy_data.py", line 267, in as_concrete_data
    data, = _co_realise_lazy_arrays([data])
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/iris/_lazy_data.py", line 230, in _co_realise_lazy_arrays
    computed_arrays = da.compute(*arrays)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/threaded.py", line 81, in get
    **kwargs
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 475, in get_async
    key, res_info, failed = queue_get(queue)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 133, in queue_get
    return q.get()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/queue.py", line 170, in get
    self.not_empty.wait()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt

It looks like one worker is tripping over the other worker, which thinks it hasn't finished yet. It is interesting that this happens only if I call my_func2 before calling the mapping function with pool. I don't understand why, since the plain call to my_func2 should not affect the pool.

@valeriupredoi (Contributor)

And yes, @bouweandela was (partially) correct: it happens at the save stage in the case of esmvalcore, since that's the only spot where data gets realized (da.compute(*arrays)). Inherently this is an issue with da.compute() (so dask) and pool; in fact it has nothing to do with iris either, since it all starts from the da.compute() call, e.g.

import dask.array as da
from dask import compute
from multiprocessing import Pool

def my_func2(x1):
    a = da.arange(4)
    print(a)
    s = compute(a)  # realize the lazy array

my_func2(2)
pool = Pool(processes=2)
x = pool.map(my_func2, [2])

will hang in the same way; Ctrl-C-ing it gives:

dask.array<arange, shape=(4,), dtype=int64, chunksize=(4,), chunktype=numpy.ndarray>
dask.array<arange, shape=(4,), dtype=int64, chunksize=(4,), chunktype=numpy.ndarray>
^CProcess ForkPoolWorker-2:
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "load_cmip3.py", line 27, in <module>
    x = pool.map(my_func2, [2])
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 651, in get
    self.wait(timeout)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 648, in wait
    self._event.wait(timeout)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/threading.py", line 552, in wait
Traceback (most recent call last):
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
    signaled = self._cond.wait(timeout)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
Traceback (most recent call last):
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "load_cmip3.py", line 23, in my_func2
    s = compute(a)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/threaded.py", line 81, in get
    **kwargs
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 475, in get_async
    key, res_info, failed = queue_get(queue)
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/site-packages/dask/local.py", line 133, in queue_get
    return q.get()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/queue.py", line 170, in get
    self.not_empty.wait()
  File "/home/users/valeriu/anaconda3R/envs/esmvaltool/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt

The specific setup for the problem is that the same function that needs to realize lazy data is called once and then called again via pool, and the workers trip over each other. Here's an interesting hint: if you move the plain call to my_func2 to after you've built the Pool, all goes fine:

pool = Pool(processes=2)
my_func2(2)
x = pool.map(my_func2, [2])
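For what it's worth, one possible mitigation in this toy model (an illustration only, not something esmvalcore does) is to force dask's single-threaded scheduler inside the pooled function, so the forked workers never rely on threaded-scheduler state inherited from the parent; the hypothetical my_func3 below sketches that.

import dask.array as da
from dask import compute
from multiprocessing import Pool

def my_func3(x1):
    a = da.arange(4)
    # The synchronous scheduler avoids dask's thread pool entirely, so the
    # forked workers should not block on inherited scheduler state.
    return compute(a, scheduler="synchronous")

my_func3(2)
pool = Pool(processes=2)
x = pool.map(my_func3, [2])
print(x)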

@valeriupredoi (Contributor)

So, for some odd reason, in the case of those CMIP3 models the workflow is calling a function that needs to realize the data before calling the same function in a Pool, and things hang -> any ideas how this might happen, @bouweandela? 🍺

@jvegreg (Contributor) commented Jan 20, 2020

In the case of those CMIP3 models, the workflow is somehow calling a function that needs to realize the data

There is one thing: CMIP3 models do not have dates in their filenames by default, so we read the file at the very beginning to find out whether the requested data is available. The thing that puzzles me is that this is done on a completely different cube.
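Roughly, that early read amounts to something like the sketch below (hypothetical helper name; the actual fallback is the esmvalcore code quoted by Bouwe further down): the time coordinate has to be realized, i.e. a dask compute runs in the parent process before the preprocessing tasks are forked off to the pool.

import iris

def _years_from_file(filename):
    # Hypothetical helper: realizing the time coordinate of the first cube
    # triggers a dask compute in the parent process.
    cube = iris.load(filename)[0]
    time = cube.coord("time")
    return time.cell(0).point.year, time.cell(-1).point.year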

@valeriupredoi (Contributor)

In the case of those CMIP3 models, the workflow is somehow calling a function that needs to realize the data

There is one thing: CMIP3 models do not have dates in their name by default, so we read the file at the very beginning to know if the requested data is available or not. The thing that puzzles me is that this is done in a completely different cube

And this particular scenario, in which a function is run on the cube while Pool is running the same function (hence the hang), happens only for a few select models; the others work fine.

@bouweandela (Member)

Great detective work @valeriupredoi! Did you report the issue #430 (comment) with dask?

@valeriupredoi (Contributor)

No, should I report it to the dask folks, what do you reckon?

@bouweandela (Member)

Yes, I guess so? If they tell us that's not how we're supposed to use dask, we'll have learned something too ;-)

I'm planning to spend some time, probably in May, to see if we can parallelize ESMValTool runs better with dask.

@bouweandela (Member)

Actually, I'm surprised no-one ran into this issue when reading the vertical levels to regrid to from another dataset, because the code that does that looks quite similar.
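A rough sketch of that pattern (hypothetical helper, not the actual esmvalcore regrid code): the reference dataset is loaded and its vertical coordinate points are realized in the parent process, which is the same load-then-compute-before-fork situation.

import iris

def target_levels_from(reference_file):
    # Hypothetical helper: reading .points realizes the (possibly lazy)
    # vertical coordinate of the reference dataset.
    cube = iris.load_cube(reference_file)
    return cube.coord(axis="Z").points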

@Peter9192 (Contributor)

There is one thing: CMIP3 models do not have dates in their name by default, so we read the file at the very beginning to know if the requested data is available or not. The thing that puzzles me is that this is done in a completely different cube

I just found this issue as I encountered exactly the same problem on a copy of some CMIP6 data where the dates were removed from the file names (please don't do that!!!).

Is there already some sort of solution for this problem? Otherwise, maybe we should add a warning to the function that loads the cube just to read the dates from it?

@bouweandela (Member) commented Feb 10, 2021

No, I think the way forward is to report the problem mentioned by @valeriupredoi in #430 (comment) with dask (or check if someone else already reported it) and see if they're willing to fix it. If not, we need to

  1. re-think the way we run multiple tasks (possibly not by running multiple processes, each running its own dask threads) or
  2. figure out a way to read the required information from the netCDF files without using dask, e.g. by using the netCDF4 library (see the sketch just below).
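A minimal sketch of option 2, assuming a CF-style 'time' variable with units and calendar attributes: only the first and last time values are read with the netCDF4 library, so no dask graph is created at all.

import netCDF4

def get_time_range(filename):
    # Read just the first and last time values, without iris or dask.
    with netCDF4.Dataset(filename) as ds:
        time = ds.variables["time"]
        calendar = getattr(time, "calendar", "standard")
        first, last = netCDF4.num2date([time[0], time[-1]], time.units, calendar)
        return first.year, last.year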

@valeriupredoi (Contributor)

OK, I came back to this pre-pandemic issue; things have not changed. I have polished the toy example a bit:

import dask.array as da
from dask import compute
from dask.distributed import Client
from multiprocessing import Pool


def my_func(x1):
    a = da.arange(4)
    print(a)
    s = compute(a)


def main():
    # call function like a peasant
    my_func(2)

    # call function like an mpi pro
    pool = Pool(processes=2)
    x = pool.map(my_func, [2])

    # call function like a dask distributed pro
    client = Client()
    x = client.map(my_func, [2])
    x = client.gather(x)


if __name__ == '__main__':
    main()

and it still shows the same behavior. Note that if you use just the functional call and the dask distributed client (comment out the mpi bit), all goes fine (albeit slowly, which is expected). I am going to post this to the Dask GitHub now 👍

@valeriupredoi (Contributor)

OK guys, we have us an issue: dask/dask#8218

@zklaus commented Oct 7, 2021

It's good to have the issue and gain some experience with basic dask concepts. Please note that nothing here has anything to do with MPI. Some comments on the example program:

import dask.array as da
from dask import compute
from dask.distributed import Client
from multiprocessing import Pool


def my_func(x1):
    a = da.arange(4)
    print(a)
    s = compute(a)

This compute call seems rather misplaced: it always turns a into a numpy array, which rather defeats the purpose of mapping the function later on. The whole function is also a bit strange: it doesn't use its argument and it doesn't return anything, yet its return value is used later on in the program. It's probably also not a great idea to have print in here, since that will not reliably produce output with dask.distributed because things are executed in different processes that don't share the terminal.

def main():
    # call function like a peasant
    my_func(2)

    # call function like an ~mpi~ pro
    pool = Pool(processes=2)
    x = pool.map(my_func, [2])

What is x supposed to be?

    # call function like a dask distributed pro
    client = Client()
    x = client.map(my_func, [2])
    x = client.gather(x)

What is x supposed to be here? This client.map and client.gather stuff seems needlessly complicated, but in any case, it is not something we are doing, so it's also unclear how this can help us.

if __name__ == '__main__':
    main()

It's unclear what the expected behavior of the program is. That makes it very hard to debug.

@valeriupredoi (Contributor)

@zklaus I reckon we should comment on the Dask issue to get those guys to understand the problem at hand. Cheers for your detailed view of the code, but the problem here is (as I told the Dask guys too) that we shouldn't debug the toy model: that toy code is what I came up with to describe, in minimal code, what I think the chain of events is in our stack (see my analysis above) and the reason why the analysis of CMIP3 data hangs. Please have a look at the whole thread here and see if you can come up with a better toy model of the stack, so we can decide whether it's indeed an issue with the way we use dask or an intrinsic issue with Dask itself 🍺

@bouweandela (Member) commented Oct 11, 2021

@valeriupredoi I agree with @zklaus that you may need to clarify a few things before the dask developers can do anything with your issue. From the comments it attracted, I can see that it is not quite clear enough now.

Note that the trivially simple solution to most of the problems described in this issue would be to not fall back to reading the start and end year from the file if they cannot be obtained from the filename, i.e. remove this code:

# As final resort, try to get the dates from the file contents
if (start_year is None or end_year is None) and Path(filename).exists():
    logger.debug("Must load file %s for daterange ", filename)
    cubes = iris.load(filename)
    for cube in cubes:
        logger.debug(cube)
        try:
            time = cube.coord('time')
        except iris.exceptions.CoordinateNotFoundError:
            continue
        start_year = time.cell(0).point.year
        end_year = time.cell(-1).point.year
        break

and return the complete list of input files, without filtering on time if it is not possible to get the start and end time from the filename, here:
def select_files(filenames, start_year, end_year):
    """Select files containing data between start_year and end_year.

    This works for filenames matching *_YYYY*-YYYY*.* or *_YYYY*.*
    """
    selection = []
    for filename in filenames:
        start, end = get_start_end_year(filename)
        if start <= end_year and end >= start_year:
            selection.append(filename)
    return selection
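A sketch of how that could look (an assumption about the shape of the change, not a merged implementation; it presumes get_start_end_year is modified to return None values when the filename carries no dates instead of loading the file):

def select_files(filenames, start_year, end_year):
    """Select files containing data between start_year and end_year."""
    selection = []
    for filename in filenames:
        start, end = get_start_end_year(filename)
        if start is None or end is None:
            # No dates in the filename: keep the file rather than loading it,
            # and let the (lazy) time extraction handle the selection later.
            selection.append(filename)
        elif start <= end_year and end >= start_year:
            selection.append(filename)
    return selection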

@zklaus commented Oct 11, 2021

I like the idea of just returning everything. Things being lazy, the impact should be rather small, particularly considering that this happens when the time span was not deemed important enough to be included in the filename.

@valeriupredoi (Contributor)

I like that solution too! Let's plug it in and test it, then I can close the issue at Dask. The problem is very simple in my view, but they haven't actually understood what's going on; in fairness, the setup of the use case really doesn't make sense to anyone with a sane engineering mind anyway, but maybe it's just a red flag for us not to use dask compute when we're running the same bit of code in two places, one of them sent to mpi. Anyway, I think we've learned enough from this issue, cheers guys! 🍺

@zklaus commented Oct 11, 2021

Yes, let's try the solution suggested by Bouwe.

Let's also note that MPI is nowhere near any of this, so let's not keep bringing it up.

If there is a problem in dask, it would be good to get to the bottom of it. But we are not using distributed at all and have a grand total of one explicit call to the dask compute function. So we probably really need to better understand what the problem is before we can ask for help in a meaningful way.
