Skip to content

Commit

Permalink
ENH: add lock parameter to Dataset.deposit
Browse files Browse the repository at this point in the history
  • Loading branch information
neutrinoceros committed Aug 31, 2024
1 parent 34b37a0 commit b23c441
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 13 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ jobs:
python-version:
- '3.11'
- '3.12'
- 3.13-dev
marker: [''] # needed to avoid collision with PY_LIB job

include:
Expand Down
64 changes: 51 additions & 13 deletions src/gpgi/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import enum
import math
import threading
import sys
import warnings
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager, nullcontext
from copy import deepcopy
from functools import cached_property, partial, reduce
from itertools import chain
from textwrap import indent
from threading import Lock
from time import monotonic_ns
from typing import TYPE_CHECKING, Any, Literal, Protocol, Self, assert_never, cast

Expand All @@ -31,6 +33,11 @@
)
from ._typing import FieldMap, Name

# `LockType` is the class used to recognise lock instances (e.g. in class
# patterns). Starting with Python 3.13, `threading.Lock` is itself a class and
# can play that role; on older interpreters it is only a factory function, so
# the actual type has to be taken from the private `_thread` module.
if sys.version_info < (3, 13):
    from _thread import LockType
else:
    LockType = Lock

if TYPE_CHECKING:
from ._typing import HCIArray, RealArray

Expand Down Expand Up @@ -461,7 +468,8 @@ def __init__(
self.metadata = deepcopy(metadata) if metadata is not None else {}

self._hci: HCIArray | None = None
self._hci_lock = threading.Lock()
self._hci_lock = Lock()
self._deposit_lock = Lock()

super().__init__()

Expand Down Expand Up @@ -666,6 +674,7 @@ def deposit(
return_ghost_padded_array: bool = False,
weight_field: Name | None = None,
weight_field_boundaries: dict[Name, tuple[Name, Name]] | None = None,
lock: Literal["per-instance"] | None | LockType = "per-instance",
) -> np.ndarray:
r"""
Perform particle deposition and return the result as a grid field.
Expand Down Expand Up @@ -710,6 +719,19 @@ def deposit(
combinations with boundaries.
Boundary recipes are applied to the weight field (if any) first.
lock (keyword only): 'per-instance' (default), None, or threading.Lock
Fine tune performance for multi-threaded applications: define a
locking strategy around the deposition hotloop.
- 'per-instance': allow multiple Dataset instances to run deposition
concurrently, but forbid concurrent accesses to any specific
instance
- None: no locking is applied. Within some restricted conditions
(e.g. depositing a couple fields concurrently in a sorted dataset),
this may improve walltime performance, but it is also expected to
degrade it in a more general case as it encourages cache-misses
- an arbitrary threading.Lock instance may be supplied to implement
a custom strategy
"""
if callable(method):
from inspect import signature
Expand Down Expand Up @@ -760,6 +782,20 @@ def deposit(
self._sanitize_boundaries(boundaries)
self._sanitize_boundaries(weight_field_boundaries)

lock_ctx: AbstractContextManager
match lock:
case "per-instance":
lock_ctx = self._deposit_lock
case None:
lock_ctx = nullcontext()
case LockType():
lock_ctx = lock
case _:
raise ValueError(
f"Received {lock=!r}. Expected either 'per-instance', "
"None, or an instance of threading.Lock"
)

field = self.particles.fields[particle_field_key]
padded_ret_array = np.zeros(self.grid._padded_shape, dtype=field.dtype)
if weight_field is not None:
Expand All @@ -773,24 +809,26 @@ def deposit(
self._hci = self._setup_host_cell_index(verbose)

tstart = monotonic_ns()
if weight_field is not None:
with lock_ctx:
if weight_field is not None:
func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
wfield,
np.array((), dtype=field.dtype),
self._hci,
wfield_dep,
)

func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
field,
wfield,
np.array((), dtype=field.dtype),
self._hci,
wfield_dep,
padded_ret_array,
)

func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
field,
wfield,
self._hci,
padded_ret_array,
)
tstop = monotonic_ns()
if verbose:
print(
Expand Down
17 changes: 17 additions & 0 deletions tests/test_deposit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import re
from contextlib import nullcontext
from copy import deepcopy
from functools import partial
from threading import Lock

import numpy as np
import numpy.testing as npt
Expand Down Expand Up @@ -590,3 +592,18 @@ def test_particles_on_domain_corners(method, dtype):
},
)
ds.deposit("mass", method=method)


@pytest.mark.parametrize("lock", ["per-instance", None, Lock()])
def test_explicit_lock(lock, sample_2D_dataset):
    """Check that every accepted `lock` value yields the default deposition result."""
    # Same field and method for both calls; only the `lock` argument differs.
    deposit_mass = partial(
        sample_2D_dataset.deposit, "mass", method="nearest_grid_point"
    )
    reference = deposit_mass()
    with_explicit_lock = deposit_mass(lock=lock)
    npt.assert_array_equal(with_explicit_lock, reference)


@pytest.mark.parametrize("lock", ["perinstance", 0, nullcontext()])
def test_invalick_lock(lock, sample_2D_dataset):
    """Check that unsupported `lock` values are rejected with a ValueError."""
    # NOTE(review): "invalick" looks like a typo for "invalid"; the name is
    # kept as-is here so the test identifier does not change.
    expected_failure = pytest.raises(ValueError, match="Received lock=")
    with expected_failure:
        sample_2D_dataset.deposit(
            "mass",
            method="nearest_grid_point",
            lock=lock,
        )

0 comments on commit b23c441

Please sign in to comment.