Skip to content

Commit

Permalink
Merge pull request #246 from neutrinoceros/enh/deposit_locking_strategy
Browse files Browse the repository at this point in the history
ENH: add `lock` parameter to `Dataset.deposit`
  • Loading branch information
neutrinoceros authored Sep 1, 2024
2 parents 6683a93 + 04e096e commit d5843d2
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 14 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,20 @@ Note `1` is used a placeholder "weight" for `W`, for symmetry reasons: all bound
```
V(x) = (U/W)(x)
```


## Thread safety

Starting in gpgi 2.0.0, thread safety is guaranteed in `Dataset.host_cell_index`
computation and `Dataset.deposit`, and both operations release the
GIL (Global Interpreter Lock) around their respective hotloops. Thread safety is
also tested against the experimental free-threaded build of Python 3.13.

Note that, by default, `Dataset.deposit` still uses a lock per `Dataset`
instance, which in the most general case is preferable since concurrently
depositing many fields can cause catastrophic performance degradation, as
it encourages cache misses. Optimal performance is however application-specific,
so this strategy can be overridden using the `lock` parameter:
- using `lock=None` will not use any lock, which in restricted conditions
leads to better walltime performance
- alternatively, an externally managed `threading.Lock` instance may be supplied
66 changes: 53 additions & 13 deletions src/gpgi/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import enum
import math
import threading
import sys
import warnings
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager, nullcontext
from copy import deepcopy
from functools import cached_property, partial, reduce
from itertools import chain
from textwrap import indent
from threading import Lock
from time import monotonic_ns
from typing import TYPE_CHECKING, Any, Literal, Protocol, Self, assert_never, cast

Expand All @@ -31,6 +33,11 @@
)
from ._typing import FieldMap, Name

# LockType is the concrete lock class, usable for isinstance checks and
# class patterns. Before Python 3.13, threading.Lock is a factory function
# rather than a class, so the class has to be taken from _thread instead.
if sys.version_info < (3, 13):
    from _thread import LockType
else:
    LockType = Lock

if TYPE_CHECKING:
from ._typing import HCIArray, RealArray

Expand Down Expand Up @@ -461,7 +468,8 @@ def __init__(
self.metadata = deepcopy(metadata) if metadata is not None else {}

self._hci: HCIArray | None = None
self._hci_lock = threading.Lock()
self._hci_lock = Lock()
self._deposit_lock = Lock()

super().__init__()

Expand Down Expand Up @@ -666,6 +674,7 @@ def deposit(
return_ghost_padded_array: bool = False,
weight_field: Name | None = None,
weight_field_boundaries: dict[Name, tuple[Name, Name]] | None = None,
lock: Literal["per-instance"] | None | LockType = "per-instance",
) -> np.ndarray:
r"""
Perform particle deposition and return the result as a grid field.
Expand Down Expand Up @@ -710,6 +719,21 @@ def deposit(
combinations with boundaries.
Boundary recipes are applied to the weight field (if any) first.
lock (keyword only): 'per-instance' (default), None, or threading.Lock
Fine tune performance for multi-threaded applications: define a
locking strategy around the deposition hotloop.
- 'per-instance': allow multiple Dataset instances to run deposition
concurrently, but forbid concurrent accesses to any specific
instance
- None: no locking is applied. Within some restricted conditions
(e.g. depositing a couple fields concurrently in a sorted dataset),
this may improve walltime performance, but it is also expected to
degrade it in a more general case as it encourages cache-misses
- an arbitrary threading.Lock instance may be supplied to implement
a custom strategy
.. versionadded:: 2.0.0
"""
if callable(method):
from inspect import signature
Expand Down Expand Up @@ -760,6 +784,20 @@ def deposit(
self._sanitize_boundaries(boundaries)
self._sanitize_boundaries(weight_field_boundaries)

lock_ctx: AbstractContextManager
match lock:
case "per-instance":
lock_ctx = self._deposit_lock
case None:
lock_ctx = nullcontext()
case LockType():
lock_ctx = lock
case _:
raise ValueError(
f"Received {lock=!r}. Expected either 'per-instance', "
"None, or an instance of threading.Lock"
)

field = self.particles.fields[particle_field_key]
padded_ret_array = np.zeros(self.grid._padded_shape, dtype=field.dtype)
if weight_field is not None:
Expand All @@ -773,24 +811,26 @@ def deposit(
self._hci = self._setup_host_cell_index(verbose)

tstart = monotonic_ns()
if weight_field is not None:
with lock_ctx:
if weight_field is not None:
func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
wfield,
np.array((), dtype=field.dtype),
self._hci,
wfield_dep,
)

func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
field,
wfield,
np.array((), dtype=field.dtype),
self._hci,
wfield_dep,
padded_ret_array,
)

func(
*self._get_padded_cell_edges(),
*self._get_3D_particle_coordinates(),
field,
wfield,
self._hci,
padded_ret_array,
)
tstop = monotonic_ns()
if verbose:
print(
Expand Down
64 changes: 63 additions & 1 deletion tests/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import numpy.testing as npt
import pytest

import gpgi
from gpgi._boundaries import BoundaryRegistry
Expand All @@ -29,7 +31,10 @@ def random_dataset():
"coordinates": {
"x": prng.random(10_000),
"y": prng.random(10_000),
}
},
"fields": {
"mass": prng.random(10_000),
},
},
)

Expand Down Expand Up @@ -93,6 +98,63 @@ def closure():
self.check(results)


@pytest.mark.parametrize("lock", ["per-instance", None, threading.Lock()])
class TestDeposit:
    """Check that concurrent ``Dataset.deposit`` calls on a shared dataset agree.

    Each test runs once per supported ``lock`` value: the default
    ``'per-instance'`` strategy, ``None``, and an externally supplied
    ``threading.Lock``.
    """

    def check(self, results):
        """Assert that all workers returned distinct, identical-valued arrays."""
        # A worker that raises inside a bare threading.Thread never appends
        # its result, so make sure no deposition went missing before
        # comparing the arrays to one another.
        assert len(results) == N_THREADS
        ref = results[0]
        for res in results[1:]:
            npt.assert_array_equal(res, ref)
        # Every call must have produced its own array object.
        assert len({id(res) for res in results}) == len(results)

    def test_concurrent_threading(self, lock):
        """Deposit from N_THREADS raw threads and compare the results."""
        # The barrier holds every thread until all of them are ready, which
        # increases the probability of concurrent access clashes.
        barrier = threading.Barrier(N_THREADS)

        # This object will be shared by all the threads.
        ds = random_dataset()

        results = []

        def closure():
            # Ensure that all threads reach this point before concurrent execution.
            barrier.wait()
            dep = ds.deposit("mass", method="nearest_grid_point", lock=lock)
            results.append(dep)

        # Spawn N_THREADS threads that call Dataset.deposit concurrently.
        workers = [threading.Thread(target=closure) for _ in range(N_THREADS)]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()

        self.check(results)

    def test_concurrent_pool(self, lock):
        """Deposit from a ThreadPoolExecutor and compare the results."""
        # The barrier holds every thread until all of them are ready, which
        # increases the probability of concurrent access clashes.
        barrier = threading.Barrier(N_THREADS)

        # This object will be shared by all the threads.
        ds = random_dataset()

        def closure():
            # Ensure that all threads reach this point before concurrent execution.
            barrier.wait()
            return ds.deposit("mass", method="nearest_grid_point", lock=lock)

        with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
            futures = [executor.submit(closure) for _ in range(N_THREADS)]

        # Future.result() re-raises any exception raised in the worker.
        results = [f.result() for f in futures]
        self.check(results)


class TestBoundaryRegistry:
def check(self, results):
# only one thread can succeed registration, all others should raise.
Expand Down
17 changes: 17 additions & 0 deletions tests/test_deposit.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import re
from contextlib import nullcontext
from copy import deepcopy
from functools import partial
from threading import Lock

import numpy as np
import numpy.testing as npt
Expand Down Expand Up @@ -590,3 +592,18 @@ def test_particles_on_domain_corners(method, dtype):
},
)
ds.deposit("mass", method=method)


@pytest.mark.parametrize("lock", ["per-instance", None, Lock()])
def test_explicit_lock(lock, sample_2D_dataset):
    """Every supported ``lock`` value must give the same deposit as the default."""
    dataset = sample_2D_dataset
    reference = dataset.deposit("mass", method="nearest_grid_point")
    result = dataset.deposit("mass", method="nearest_grid_point", lock=lock)
    npt.assert_array_equal(result, reference)


@pytest.mark.parametrize("lock", ["perinstance", 0, nullcontext()])
def test_invalick_lock(lock, sample_2D_dataset):
    """Unsupported ``lock`` values must make ``deposit`` raise a ValueError.

    The parameters cover a misspelled strategy name, an integer, and a
    context manager that is not a lock.
    """
    # NOTE(review): the function name is presumably a typo for
    # "test_invalid_lock"; it is kept as is to avoid changing test IDs.
    expected_error = pytest.raises(ValueError, match="Received lock=")
    with expected_error:
        sample_2D_dataset.deposit("mass", method="nearest_grid_point", lock=lock)

0 comments on commit d5843d2

Please sign in to comment.