From ec4be4930270c9c641445cd249e81d6acd39319b Mon Sep 17 00:00:00 2001 From: gentlegiantJGC Date: Tue, 8 Oct 2024 11:25:15 +0100 Subject: [PATCH] Improved chunk locking Switched to ShareableRLock to allow parallel reads. Added locking in more places to ensure internal synchronisation. --- src/amulet/level/abc/_chunk_handle.py | 142 +++++++++++++------------- 1 file changed, 70 insertions(+), 72 deletions(-) diff --git a/src/amulet/level/abc/_chunk_handle.py b/src/amulet/level/abc/_chunk_handle.py index 18c3c25b3..c49e83752 100644 --- a/src/amulet/level/abc/_chunk_handle.py +++ b/src/amulet/level/abc/_chunk_handle.py @@ -12,6 +12,7 @@ from amulet.data_types import DimensionId from amulet.errors import ChunkDoesNotExist, ChunkLoadError from amulet.utils.signal import Signal +from amulet.utils.shareable_lock import ShareableRLock from ._level import LevelFriend, LevelT from ._history import HistoryManagerLayer @@ -58,7 +59,7 @@ class ChunkHandle( Some internal synchronisation is done to catch some threading issues. """ - _lock: RLock + _lock: ShareableRLock _dimension: DimensionId _key: ChunkKey _chunk_history: HistoryManagerLayer[ChunkKey] @@ -84,7 +85,7 @@ def __init__( cz: int, ) -> None: super().__init__(level_ref) - self._lock = RLock() + self._lock = ShareableRLock() self._dimension_id = dimension_id self._key = ChunkKey(cx, cz) self._chunk_history = chunk_history @@ -135,13 +136,7 @@ def lock( :raises: LockNotAcquired: If the lock could not be acquired. """ - if not self._lock.acquire(blocking, timeout): - # Thread was not acquired - raise LockNotAcquired("Lock was not acquired.") - try: - yield - finally: - self._lock.release() + return self._lock.unique(blocking, timeout) @contextmanager def edit( @@ -171,7 +166,7 @@ def edit( :raises: LockNotAcquired: If the lock could not be acquired. """ - with self.lock(blocking=blocking, timeout=timeout): + with self._lock.unique(blocking=blocking, timeout=timeout): chunk = self.get(components) yield chunk # If an exception occurs in user code, this line won't be run. @@ -187,41 +182,43 @@ def exists(self) -> bool: :return: True if the chunk exists. Calling get on this chunk handle may still throw ChunkLoadError """ - if self._chunk_history.has_resource(self._key): - return self._chunk_history.resource_exists(self._key) - else: - # The history system is not aware of the chunk. Look in the level data - return self._get_raw_dimension().has_chunk(self.cx, self.cz) + with self._lock.shared(): + if self._chunk_history.has_resource(self._key): + return self._chunk_history.resource_exists(self._key) + else: + # The history system is not aware of the chunk. Look in the level data + return self._get_raw_dimension().has_chunk(self.cx, self.cz) def _preload(self) -> None: """Load the chunk data if it has not already been loaded.""" - if not self._chunk_history.has_resource(self._key): - # The history system is not aware of the chunk. Load from the level data - chunk: Chunk - try: - raw_chunk = self._get_raw_dimension().get_raw_chunk(self.cx, self.cz) - chunk = self._get_raw_dimension().raw_chunk_to_native_chunk( - raw_chunk, - self.cx, - self.cz, - ) - except ChunkDoesNotExist: - self._chunk_history.set_initial_resource(self._key, b"") - except ChunkLoadError as e: - self._chunk_history.set_initial_resource(self._key, pickle.dumps(e)) - else: - self._chunk_history.set_initial_resource( - self._key, pickle.dumps(chunk.chunk_id) - ) - for component_id, component_data in chunk.serialise_chunk().items(): - if component_data is None: - raise RuntimeError( - "Component must not be None when initialising chunk" - ) - self._chunk_data_history.set_initial_resource( - b"/".join((bytes(self._key), component_id.encode())), - component_data, + with self._lock.unique(): + if not self._chunk_history.has_resource(self._key): + # The history system is not aware of the chunk. Load from the level data + chunk: Chunk + try: + raw_chunk = self._get_raw_dimension().get_raw_chunk(self.cx, self.cz) + chunk = self._get_raw_dimension().raw_chunk_to_native_chunk( + raw_chunk, + self.cx, + self.cz, + ) + except ChunkDoesNotExist: + self._chunk_history.set_initial_resource(self._key, b"") + except ChunkLoadError as e: + self._chunk_history.set_initial_resource(self._key, pickle.dumps(e)) + else: + self._chunk_history.set_initial_resource( + self._key, pickle.dumps(chunk.chunk_id) ) + for component_id, component_data in chunk.serialise_chunk().items(): + if component_data is None: + raise RuntimeError( + "Component must not be None when initialising chunk" + ) + self._chunk_data_history.set_initial_resource( + b"/".join((bytes(self._key), component_id.encode())), + component_data, + ) def _get_null_chunk(self) -> ChunkT: """Get a null chunk instance used for this chunk. @@ -260,7 +257,7 @@ def get(self, components: Iterable[str] | None = None) -> ChunkT: :param components: None to load all components or an iterable of component strings to load. :return: A unique copy of the chunk data. """ - with self.lock(blocking=False): + with self._lock.shared(): self._preload() chunk = self._get_null_chunk() if components is None: @@ -277,35 +274,36 @@ def get(self, components: Iterable[str] | None = None) -> ChunkT: return chunk def _set(self, chunk: ChunkT | None) -> None: - """Lock must be acquired before calling this""" - history = self._chunk_history - if not history.has_resource(self._key): - if self._l.history_enabled: - self._preload() + """Public lock must be acquired before calling this""" + with self._lock.unique(): + history = self._chunk_history + if not history.has_resource(self._key): + if self._l.history_enabled: + self._preload() + else: + history.set_initial_resource(self._key, b"") + if chunk is None: + history.set_resource(self._key, b"") else: - history.set_initial_resource(self._key, b"") - if chunk is None: - history.set_resource(self._key, b"") - else: - self._validate_chunk(chunk) - try: - old_chunk_class = self.get_class() - except ChunkLoadError: - old_chunk_class = None - new_chunk_class = type(chunk) - component_data = chunk.serialise_chunk() - if old_chunk_class != new_chunk_class and None in component_data.values(): - raise RuntimeError( - "When changing chunk class all the data must be present." - ) - history.set_resource(self._key, pickle.dumps(new_chunk_class)) - for component_id, data in component_data.items(): - if data is None: - continue - self._chunk_data_history.set_resource( - b"/".join((bytes(self._key), component_id.encode())), - data, - ) + self._validate_chunk(chunk) + try: + old_chunk_class = self.get_class() + except ChunkLoadError: + old_chunk_class = None + new_chunk_class = type(chunk) + component_data = chunk.serialise_chunk() + if old_chunk_class != new_chunk_class and None in component_data.values(): + raise RuntimeError( + "When changing chunk class all the data must be present." + ) + history.set_resource(self._key, pickle.dumps(new_chunk_class)) + for component_id, data in component_data.items(): + if data is None: + continue + self._chunk_data_history.set_resource( + b"/".join((bytes(self._key), component_id.encode())), + data, + ) @staticmethod @abstractmethod @@ -322,14 +320,14 @@ def set(self, chunk: ChunkT) -> None: :raises: LockNotAcquired: If the chunk is already locked by another thread. """ - with self.lock(blocking=False): + with self._lock.unique(blocking=False): self._set(chunk) self.changed.emit() self._l.changed.emit() def delete(self) -> None: """Delete the chunk from the level.""" - with self.lock(blocking=False): + with self._lock.unique(blocking=False): self._set(None) self.changed.emit() self._l.changed.emit()