Skip to content

Commit

Permalink
Improved chunk locking
Browse files Browse the repository at this point in the history
Switched to ShareableRLock to allow parallel reads.
Added locking in more places to ensure internal synchronisation.
  • Loading branch information
gentlegiantJGC committed Oct 8, 2024
1 parent 0db24ad commit ec4be49
Showing 1 changed file with 70 additions and 72 deletions.
142 changes: 70 additions & 72 deletions src/amulet/level/abc/_chunk_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from amulet.data_types import DimensionId
from amulet.errors import ChunkDoesNotExist, ChunkLoadError
from amulet.utils.signal import Signal
from amulet.utils.shareable_lock import ShareableRLock

from ._level import LevelFriend, LevelT
from ._history import HistoryManagerLayer
Expand Down Expand Up @@ -58,7 +59,7 @@ class ChunkHandle(
Some internal synchronisation is done to catch some threading issues.
"""

_lock: RLock
_lock: ShareableRLock
_dimension: DimensionId
_key: ChunkKey
_chunk_history: HistoryManagerLayer[ChunkKey]
Expand All @@ -84,7 +85,7 @@ def __init__(
cz: int,
) -> None:
super().__init__(level_ref)
self._lock = RLock()
self._lock = ShareableRLock()
self._dimension_id = dimension_id
self._key = ChunkKey(cx, cz)
self._chunk_history = chunk_history
Expand Down Expand Up @@ -135,13 +136,7 @@ def lock(
:raises:
LockNotAcquired: If the lock could not be acquired.
"""
if not self._lock.acquire(blocking, timeout):
# Thread was not acquired
raise LockNotAcquired("Lock was not acquired.")
try:
yield
finally:
self._lock.release()
return self._lock.unique(blocking, timeout)

@contextmanager
def edit(
Expand Down Expand Up @@ -171,7 +166,7 @@ def edit(
:raises:
LockNotAcquired: If the lock could not be acquired.
"""
with self.lock(blocking=blocking, timeout=timeout):
with self._lock.unique(blocking=blocking, timeout=timeout):
chunk = self.get(components)
yield chunk
# If an exception occurs in user code, this line won't be run.
Expand All @@ -187,41 +182,43 @@ def exists(self) -> bool:
:return: True if the chunk exists. Calling get on this chunk handle may still throw ChunkLoadError
"""
if self._chunk_history.has_resource(self._key):
return self._chunk_history.resource_exists(self._key)
else:
# The history system is not aware of the chunk. Look in the level data
return self._get_raw_dimension().has_chunk(self.cx, self.cz)
with self._lock.shared():
if self._chunk_history.has_resource(self._key):
return self._chunk_history.resource_exists(self._key)
else:
# The history system is not aware of the chunk. Look in the level data
return self._get_raw_dimension().has_chunk(self.cx, self.cz)

def _preload(self) -> None:
"""Load the chunk data if it has not already been loaded."""
if not self._chunk_history.has_resource(self._key):
# The history system is not aware of the chunk. Load from the level data
chunk: Chunk
try:
raw_chunk = self._get_raw_dimension().get_raw_chunk(self.cx, self.cz)
chunk = self._get_raw_dimension().raw_chunk_to_native_chunk(
raw_chunk,
self.cx,
self.cz,
)
except ChunkDoesNotExist:
self._chunk_history.set_initial_resource(self._key, b"")
except ChunkLoadError as e:
self._chunk_history.set_initial_resource(self._key, pickle.dumps(e))
else:
self._chunk_history.set_initial_resource(
self._key, pickle.dumps(chunk.chunk_id)
)
for component_id, component_data in chunk.serialise_chunk().items():
if component_data is None:
raise RuntimeError(
"Component must not be None when initialising chunk"
)
self._chunk_data_history.set_initial_resource(
b"/".join((bytes(self._key), component_id.encode())),
component_data,
with self._lock.unique():
if not self._chunk_history.has_resource(self._key):
# The history system is not aware of the chunk. Load from the level data
chunk: Chunk
try:
raw_chunk = self._get_raw_dimension().get_raw_chunk(self.cx, self.cz)
chunk = self._get_raw_dimension().raw_chunk_to_native_chunk(
raw_chunk,
self.cx,
self.cz,
)
except ChunkDoesNotExist:
self._chunk_history.set_initial_resource(self._key, b"")
except ChunkLoadError as e:
self._chunk_history.set_initial_resource(self._key, pickle.dumps(e))
else:
self._chunk_history.set_initial_resource(
self._key, pickle.dumps(chunk.chunk_id)
)
for component_id, component_data in chunk.serialise_chunk().items():
if component_data is None:
raise RuntimeError(
"Component must not be None when initialising chunk"
)
self._chunk_data_history.set_initial_resource(
b"/".join((bytes(self._key), component_id.encode())),
component_data,
)

def _get_null_chunk(self) -> ChunkT:
"""Get a null chunk instance used for this chunk.
Expand Down Expand Up @@ -260,7 +257,7 @@ def get(self, components: Iterable[str] | None = None) -> ChunkT:
:param components: None to load all components or an iterable of component strings to load.
:return: A unique copy of the chunk data.
"""
with self.lock(blocking=False):
with self._lock.shared():
self._preload()
chunk = self._get_null_chunk()
if components is None:
Expand All @@ -277,35 +274,36 @@ def get(self, components: Iterable[str] | None = None) -> ChunkT:
return chunk

def _set(self, chunk: ChunkT | None) -> None:
"""Lock must be acquired before calling this"""
history = self._chunk_history
if not history.has_resource(self._key):
if self._l.history_enabled:
self._preload()
"""Public lock must be acquired before calling this"""
with self._lock.unique():
history = self._chunk_history
if not history.has_resource(self._key):
if self._l.history_enabled:
self._preload()
else:
history.set_initial_resource(self._key, b"")
if chunk is None:
history.set_resource(self._key, b"")
else:
history.set_initial_resource(self._key, b"")
if chunk is None:
history.set_resource(self._key, b"")
else:
self._validate_chunk(chunk)
try:
old_chunk_class = self.get_class()
except ChunkLoadError:
old_chunk_class = None
new_chunk_class = type(chunk)
component_data = chunk.serialise_chunk()
if old_chunk_class != new_chunk_class and None in component_data.values():
raise RuntimeError(
"When changing chunk class all the data must be present."
)
history.set_resource(self._key, pickle.dumps(new_chunk_class))
for component_id, data in component_data.items():
if data is None:
continue
self._chunk_data_history.set_resource(
b"/".join((bytes(self._key), component_id.encode())),
data,
)
self._validate_chunk(chunk)
try:
old_chunk_class = self.get_class()
except ChunkLoadError:
old_chunk_class = None
new_chunk_class = type(chunk)
component_data = chunk.serialise_chunk()
if old_chunk_class != new_chunk_class and None in component_data.values():
raise RuntimeError(
"When changing chunk class all the data must be present."
)
history.set_resource(self._key, pickle.dumps(new_chunk_class))
for component_id, data in component_data.items():
if data is None:
continue
self._chunk_data_history.set_resource(
b"/".join((bytes(self._key), component_id.encode())),
data,
)

@staticmethod
@abstractmethod
Expand All @@ -322,14 +320,14 @@ def set(self, chunk: ChunkT) -> None:
:raises:
LockNotAcquired: If the chunk is already locked by another thread.
"""
with self.lock(blocking=False):
with self._lock.unique(blocking=False):
self._set(chunk)
self.changed.emit()
self._l.changed.emit()

def delete(self) -> None:
"""Delete the chunk from the level."""
with self.lock(blocking=False):
with self._lock.unique(blocking=False):
self._set(None)
self.changed.emit()
self._l.changed.emit()

0 comments on commit ec4be49

Please sign in to comment.