Skip to content

Commit

Permalink
ENH: add lock_strategy parameter to Dataset.deposit
Browse files Browse the repository at this point in the history
  • Loading branch information
neutrinoceros committed Aug 30, 2024
1 parent ac5151c commit fd82f42
Showing 1 changed file with 46 additions and 13 deletions.
59 changes: 46 additions & 13 deletions src/gpgi/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

import enum
import math
import threading
import warnings
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager, nullcontext
from copy import deepcopy
from functools import cached_property, partial, reduce
from itertools import chain
from textwrap import indent
from threading import Lock
from time import monotonic_ns
from typing import TYPE_CHECKING, Any, Literal, Protocol, Self, assert_never, cast

Expand All @@ -34,6 +35,8 @@
if TYPE_CHECKING:
from ._typing import HCIArray, RealArray


_GLOBAL_DEPOSIT_LOCK = Lock()
BoundarySpec = tuple[tuple[str, str, str], ...]


Expand Down Expand Up @@ -461,7 +464,8 @@ def __init__(
self.metadata = deepcopy(metadata) if metadata is not None else {}

self._hci: HCIArray | None = None
self._hci_lock = threading.Lock()
self._hci_lock = Lock()
self._deposit_lock = Lock()

super().__init__()

Expand Down Expand Up @@ -666,6 +670,7 @@ def deposit(
return_ghost_padded_array: bool = False,
weight_field: Name | None = None,
weight_field_boundaries: dict[Name, tuple[Name, Name]] | None = None,
locking_strategy: Literal["per-instance", "global"] | None = None,
) -> np.ndarray:
r"""
Perform particle deposition and return the result as a grid field.
Expand Down Expand Up @@ -710,6 +715,18 @@ def deposit(
combinations with boundaries.
Boundary recipes are applied the weight field (if any) first.
locking_strategy (keyword only): 'per-instance' (default), 'global', or None
Fine tune performance for multi-threaded applications: select a
locking strategy around the deposition hotloop. The GIL is released
in all cases.
'per-instance': allow multiple Dataset instances to run deposition
concurrently, but forbid concurrent accesses to any specific instance
'global': only allow one thread to run deposition hotloops at once.
None: no locking is applied. Within some restricted conditions (e.g.
depositing 2 fields concurrently in a sorted dataset), this may improve
walltime performance, but it is also expected to degrade it in a more
general case as it encourages cache-misses.
"""
if callable(method):
from inspect import signature
Expand Down Expand Up @@ -760,6 +777,20 @@ def deposit(
self._sanitize_boundaries(boundaries)
self._sanitize_boundaries(weight_field_boundaries)

lock: AbstractContextManager
match locking_strategy:
case "per-instance":
lock = self._deposit_lock
case "global":
lock = _GLOBAL_DEPOSIT_LOCK
case None:
lock = nullcontext()
case _:
raise ValueError(
f"Got {locking_strategy=!r}, "
"expected either 'per-instance', 'global' or None"
)

field = self.particles.fields[particle_field_key]
padded_ret_array = np.zeros(self.grid._padded_shape, dtype=field.dtype)
if weight_field is not None:
Expand All @@ -773,24 +804,26 @@ def deposit(
self._hci = self._setup_host_cell_index(verbose)

tstart = monotonic_ns()
if weight_field is not None:
with lock:
if weight_field is not None:
func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
wfield,
np.ones(0, dtype=field.dtype),
self._hci,
wfield_dep,
)

func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
field,
wfield,
np.ones(0, dtype=field.dtype),
self._hci,
wfield_dep,
padded_ret_array,
)

func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
field,
wfield,
self._hci,
padded_ret_array,
)
tstop = monotonic_ns()
if verbose:
print(
Expand Down

0 comments on commit fd82f42

Please sign in to comment.