Netcdf thread safety #5061

Closed · wants to merge 1 commit
lib/iris/fileformats/cf.py (79 changes: 61 additions & 18 deletions)
@@ -15,9 +15,10 @@
 """

 from abc import ABCMeta, abstractmethod
-from collections.abc import Iterable, MutableMapping
+from collections.abc import Iterable, Mapping, MutableMapping
 import os
 import re
+import threading
 import warnings

 import netCDF4
@@ -1021,6 +1022,21 @@ def __repr__(self):


 ################################################################################
+_file_locks: Mapping[str, threading.Lock] = {}
+
+
+def get_filepath_lock(path, already_exists=None):
+    if already_exists is not None:
+        assert already_exists == (path in _file_locks)
+    if path not in _file_locks:
+        _file_locks[path] = threading.RLock()
+    result = _file_locks[path]
+    return result
+
+
+GLOBAL_NETCDF_ACCESS_LOCK = threading.Lock()
+
+
 class CFReader:
     """
     This class allows the contents of a netCDF file to be interpreted according
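Note: the following is a minimal sketch (not part of the diff) of how the lock registry above behaves. Repeated requests for the same path hand back the same re-entrant lock, and the optional already_exists argument asserts whether the path is expected to be registered yet; the path used here is hypothetical.

    from iris.fileformats.cf import get_filepath_lock

    path = "/data/example.nc"  # hypothetical file path

    lock_a = get_filepath_lock(path)  # first call creates the RLock
    lock_b = get_filepath_lock(path, already_exists=True)  # returns the same object
    assert lock_a is lock_b

    # Because it is an RLock, one thread can safely nest acquisitions, e.g. a
    # CFReader holding the lock while a read of the same file re-acquires it.
    with lock_a:
        with lock_a:
            pass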
@@ -1045,28 +1061,49 @@ class CFReader:

     def __init__(self, filename, warn=False, monotonic=False):
         self._dataset = None
-        self._filename = os.path.expanduser(filename)
+        filename = os.path.expanduser(filename)
+        filename = os.path.abspath(filename)
+        self._filename = filename
+        self._lock = get_filepath_lock(self._filename)
+        # NOTE: we'd really like to defer this to the start of the related context, but
+        # prior usage requires us to do most of the work within the init call.
+        self._lock.acquire()

         #: Collection of CF-netCDF variables associated with this netCDF file
         self.cf_group = self.CFGroup()

-        self._dataset = netCDF4.Dataset(self._filename, mode="r")
+        with GLOBAL_NETCDF_ACCESS_LOCK:
+            self._dataset = netCDF4.Dataset(self._filename, mode="r")

         # Issue load optimisation warning.
         if warn and self._dataset.file_format in [
             "NETCDF3_CLASSIC",
             "NETCDF3_64BIT",
         ]:
             warnings.warn(
                 "Optimise CF-netCDF loading by converting data from NetCDF3 "
                 'to NetCDF4 file format using the "nccopy" command.'
             )

         self._check_monotonic = monotonic

         self._translate()
         self._build_cf_groups()
         self._reset()

+    def __enter__(self):
+        # Enable use as a context manager.
+        # N.B. this **guarantees** closure of the file when the context is exited.
+        # Note: ideally, the class would not do so much work in the __init__ call, and
+        # would do all that here, after acquiring necessary permissions/locks.
+        # But for legacy reasons, we can't do that. So **effectively**, the context
+        # (in terms of access control) already started when we created the object.
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        # When used as a context-manager, **always** close the file on exit.
+        self._close()
+        self._lock.release()
+
     @property
     def filename(self):
Expand Down Expand Up @@ -1294,10 +1331,16 @@ def _reset(self):
for nc_var_name in self._dataset.variables.keys():
self.cf_group[nc_var_name].cf_attrs_reset()

def __del__(self):
def _close(self):
# Explicitly close dataset to prevent file remaining open.
if self._dataset is not None:
self._dataset.close()
with GLOBAL_NETCDF_ACCESS_LOCK:
self._dataset.close()
self._dataset = None

def __del__(self):
# Be sure to close dataset when CFReader is destroyed / garbage-collected.
self._close()


def _getncattr(dataset, attr, default=None):
Expand Down
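With __enter__, __exit__ and _close in place, a CFReader is usable as a context manager. A minimal usage sketch under this PR's protocol, with a hypothetical filename; note that the per-file lock is effectively taken in __init__ and only released by __exit__, so construction should be followed immediately by the with block.

    from iris.fileformats.cf import CFReader

    # Construction opens the dataset and acquires the per-file lock.
    with CFReader("/data/example.nc") as cf:  # hypothetical file
        # The dataset is guaranteed open while inside the context.
        for name in cf.cf_group.data_variables:
            print(name)
    # On exit the dataset is closed and the per-file lock released.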
lib/iris/fileformats/netcdf/loader.py (131 changes: 72 additions & 59 deletions)
@@ -34,6 +34,7 @@
 import iris.coords
 import iris.exceptions
 import iris.fileformats.cf
+from iris.fileformats.cf import GLOBAL_NETCDF_ACCESS_LOCK, get_filepath_lock
 from iris.fileformats.netcdf.saver import _CF_ATTRS
 import iris.io
 import iris.util
@@ -58,28 +59,39 @@ def _actions_engine():
 class NetCDFDataProxy:
     """A reference to the data payload of a single NetCDF file variable."""

-    __slots__ = ("shape", "dtype", "path", "variable_name", "fill_value")
+    __slots__ = (
+        "shape",
+        "dtype",
+        "path",
+        "variable_name",
+        "fill_value",
+        "_file_lock",
+    )

     def __init__(self, shape, dtype, path, variable_name, fill_value):
         self.shape = shape
         self.dtype = dtype
         self.path = path
         self.variable_name = variable_name
         self.fill_value = fill_value
+        self._file_lock = get_filepath_lock(self.path, already_exists=True)

     @property
     def ndim(self):
         return len(self.shape)

     def __getitem__(self, keys):
-        dataset = netCDF4.Dataset(self.path)
-        try:
-            variable = dataset.variables[self.variable_name]
-            # Get the NetCDF variable data and slice.
-            var = variable[keys]
-        finally:
-            dataset.close()
-        return np.asanyarray(var)
+        with self._file_lock:
+            with GLOBAL_NETCDF_ACCESS_LOCK:
+                dataset = netCDF4.Dataset(self.path)
+            try:
+                variable = dataset.variables[self.variable_name]
+                # Get the required section of the NetCDF variable data.
+                data = variable[keys]
+            finally:
+                dataset.close()
+            result = np.asanyarray(data)
+        return result

     def __repr__(self):
         fmt = (
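A sketch (not from the PR) of the locking discipline that the new __getitem__ implies. The per-file lock must already be registered before a proxy is built, which the already_exists=True assertion in __init__ enforces; normally a CFReader does that registration. The path, shape and variable name below are hypothetical.

    import threading

    import numpy as np

    from iris.fileformats.cf import get_filepath_lock
    from iris.fileformats.netcdf.loader import NetCDFDataProxy

    path = "/data/example.nc"  # hypothetical file
    get_filepath_lock(path)  # register the lock, as a CFReader would have done

    proxy = NetCDFDataProxy(
        shape=(10, 20),
        dtype=np.dtype("f8"),
        path=path,
        variable_name="air_temperature",  # hypothetical variable
        fill_value=None,
    )

    def read_section():
        # Each access re-opens the file; the per-file and global locks
        # serialise the netCDF4 calls across threads.
        section = proxy[0:5, :]

    threads = [threading.Thread(target=read_section) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()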
@@ -541,54 +553,55 @@ def load_cubes(filenames, callback=None, constraints=None):
         else:
             cf = iris.fileformats.cf.CFReader(filename)

-        # Process each CF data variable.
-        data_variables = list(cf.cf_group.data_variables.values()) + list(
-            cf.cf_group.promoted.values()
-        )
-        for cf_var in data_variables:
-            if var_callback and not var_callback(cf_var):
-                # Deliver only selected results.
-                continue
-
-            # cf_var-specific mesh handling, if a mesh is present.
-            # Build the mesh_coords *before* loading the cube - avoids
-            # mesh-related attributes being picked up by
-            # _add_unused_attributes().
-            mesh_name = None
-            mesh = None
-            mesh_coords, mesh_dim = [], None
-            if PARSE_UGRID_ON_LOAD:
-                mesh_name = getattr(cf_var, "mesh", None)
-                if mesh_name is not None:
-                    try:
-                        mesh = meshes[mesh_name]
-                    except KeyError:
-                        message = (
-                            f"File does not contain mesh: '{mesh_name}' - "
-                            f"referenced by variable: '{cf_var.cf_name}' ."
-                        )
-                        logger.debug(message)
-            if mesh is not None:
-                mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)
-
-            cube = _load_cube(engine, cf, cf_var, filename)
-
-            # Attach the mesh (if present) to the cube.
-            for mesh_coord in mesh_coords:
-                cube.add_aux_coord(mesh_coord, mesh_dim)
-
-            # Process any associated formula terms and attach
-            # the corresponding AuxCoordFactory.
-            try:
-                _load_aux_factory(engine, cube)
-            except ValueError as e:
-                warnings.warn("{}".format(e))
-
-            # Perform any user registered callback function.
-            cube = run_callback(callback, cube, cf_var, filename)
-
-            # Callback mechanism may return None, which must not be yielded
-            if cube is None:
-                continue
-
-            yield cube
+        with cf:
+            # Process each CF data variable.
+            data_variables = list(cf.cf_group.data_variables.values()) + list(
+                cf.cf_group.promoted.values()
+            )
+            for cf_var in data_variables:
+                if var_callback and not var_callback(cf_var):
+                    # Deliver only selected results.
+                    continue
+
+                # cf_var-specific mesh handling, if a mesh is present.
+                # Build the mesh_coords *before* loading the cube - avoids
+                # mesh-related attributes being picked up by
+                # _add_unused_attributes().
+                mesh_name = None
+                mesh = None
+                mesh_coords, mesh_dim = [], None
+                if PARSE_UGRID_ON_LOAD:
+                    mesh_name = getattr(cf_var, "mesh", None)
+                    if mesh_name is not None:
+                        try:
+                            mesh = meshes[mesh_name]
+                        except KeyError:
+                            message = (
+                                f"File does not contain mesh: '{mesh_name}' - "
+                                f"referenced by variable: '{cf_var.cf_name}' ."
+                            )
+                            logger.debug(message)
+                if mesh is not None:
+                    mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)
+
+                cube = _load_cube(engine, cf, cf_var, filename)
+
+                # Attach the mesh (if present) to the cube.
+                for mesh_coord in mesh_coords:
+                    cube.add_aux_coord(mesh_coord, mesh_dim)
+
+                # Process any associated formula terms and attach
+                # the corresponding AuxCoordFactory.
+                try:
+                    _load_aux_factory(engine, cube)
+                except ValueError as e:
+                    warnings.warn("{}".format(e))
+
+                # Perform any user registered callback function.
+                cube = run_callback(callback, cube, cf_var, filename)
+
+                # Callback mechanism may return None, which must not be yielded
+                if cube is None:
+                    continue
+
+                yield cube
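Taken together, the changes are intended to make a pattern like the following safe: lazy cube data realised from several threads at once, with the netCDF4 library only ever entered by one thread at a time. A rough sketch, with hypothetical file paths.

    import threading

    import iris

    cubes = iris.load(["/data/a.nc", "/data/b.nc"])  # hypothetical paths

    def realise(cube):
        # Touching .data triggers NetCDFDataProxy.__getitem__, which runs
        # under the per-file and global netCDF access locks.
        _ = cube.data

    workers = [threading.Thread(target=realise, args=(c,)) for c in cubes]
    for w in workers:
        w.start()
    for w in workers:
        w.join()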