Skip to content

Commit

Permalink
♻️ Refactor IndyVdrLedgerPool to manage singleton behaviour
Browse files Browse the repository at this point in the history
Signed-off-by: ff137 <ff137@proton.me>
  • Loading branch information
ff137 committed Feb 12, 2025
1 parent 15e6958 commit dc5019a
Showing 1 changed file with 118 additions and 125 deletions.
243 changes: 118 additions & 125 deletions acapy_agent/ledger/indy_vdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,7 @@ def __init__(
read_only: bool = False,
socks_proxy: Optional[str] = None,
):
"""Private constructor. Use 'create_instance' to instantiate."""
LOGGER.debug(
"Initializing IndyVdrLedgerPool with name: %s, keepalive: %s, "
"cache_duration: %s, read_only: %s",
name,
keepalive,
cache_duration,
read_only,
)

"""Private constructor. Use 'get_or_create' to instantiate."""
# Instance attributes
self.name = name
self.keepalive = keepalive
Expand All @@ -108,17 +99,16 @@ def __init__(
self.read_only = read_only
self.socks_proxy = socks_proxy

self.ref_count = 0
self.ref_lock = asyncio.Lock()
self.close_task: Optional[asyncio.Task] = None
self.handle: Optional[Pool] = None
self.cfg_path_cache: Optional[Path] = None
self.genesis_hash_cache: Optional[str] = None
self.genesis_txns_cache = genesis_transactions
self.init_config = bool(genesis_transactions)
self.taa_cache: Optional[str] = None

LOGGER.debug("Pool %s initialization staged", name)
self._ref_count = 0
self._ref_lock = asyncio.Lock()
self._close_task: Optional[asyncio.Task] = None
self._cfg_path_cache: Optional[Path] = None
self._genesis_hash_cache: Optional[str] = None
self._genesis_txns_cache = genesis_transactions
self._init_config = bool(genesis_transactions)

@classmethod
async def get_or_create(
Expand All @@ -132,7 +122,7 @@ async def get_or_create(
read_only: bool = False,
socks_proxy: Optional[str] = None,
) -> "IndyVdrLedgerPool":
"""Asynchronously get or create the singleton instance based on configuration.
"""Asynchronously get or create a singleton instance based on configuration.
Args:
name: The pool ledger configuration name.
Expand All @@ -148,23 +138,27 @@ async def get_or_create(
"""
LOGGER.debug(
"Creating or retrieving IndyVdrLedgerPool instance with params: name=%s, "
"keepalive=%s, cache_duration=%s, read_only=%s, socks_proxy=%s",
"keepalive=%s, cache_duration=%s, read_only=%s",
name,
keepalive,
cache_duration,
read_only,
socks_proxy,
)

config_key = (name, keepalive, cache_duration, read_only, socks_proxy)
LOGGER.debug("Generated config key: %s", config_key)

config_key = (
name,
keepalive,
cache_duration,
genesis_transactions,
read_only,
socks_proxy,
)
async with cls._lock:
if config_key not in cls._instances:
LOGGER.debug(
"No existing instance found for config key, creating new instance"
)
instance = cls(
ledger_pool_instance = cls(
name=name,
keepalive=keepalive,
cache=cache,
Expand All @@ -175,58 +169,48 @@ async def get_or_create(
)
try:
LOGGER.debug("Initializing new IndyVdrLedgerPool instance")
await instance.initialize()
await ledger_pool_instance._initialize()
except Exception as e:
LOGGER.exception(
"Initialization failed for IndyVdrLedgerPool with config: %s",
LOGGER.error(
"Initialization failed for IndyVdrLedgerPool with config: %s\n%s",
config_key,
exc_info=e,
e,
)
raise
cls._instances[config_key] = instance
cls._instances[config_key] = ledger_pool_instance
LOGGER.debug(
"Successfully created and stored new IndyVdrLedgerPool instance: %s",
config_key,
"Successfully created new IndyVdrLedgerPool instance with name %s",
name,
)
else:
LOGGER.debug(
"Found existing IndyVdrLedgerPool instance for config: %s", config_key
)
instance = cls._instances[config_key]

async with instance.ref_lock:
instance.ref_count += 1
LOGGER.debug(
"Incremented reference count to %s for instance %s",
instance.ref_count,
config_key,
"Found existing IndyVdrLedgerPool instance with name %s", name
)
ledger_pool_instance = cls._instances[config_key]
await ledger_pool_instance._cancel_close_task()

LOGGER.debug(
"Returning IndyVdrLedgerPool instance with ref_count: %s",
instance.ref_count,
)
return instance
return ledger_pool_instance

async def initialize(self):
async def _initialize(self) -> None:
"""Initialize the ledger pool."""
LOGGER.debug("Beginning pool initialization")
if self.init_config:
if self._init_config:
LOGGER.debug("Creating pool config with genesis transactions")
await self.create_pool_config(self.genesis_txns_cache, recreate=True)
self.init_config = False
await self._create_pool_config(self._genesis_txns_cache, recreate=True)
self._init_config = False
LOGGER.debug("Opening pool connection")
await self.open()
await self._open()
LOGGER.debug("Pool initialization complete")

@classmethod
async def release_instance(cls, instance: "IndyVdrLedgerPool"):
async def release_instance(cls, instance: "IndyVdrLedgerPool") -> None:
"""Release a reference to the instance and possibly remove it from the registry.
Args:
instance: The IndyVdrLedgerPool instance to release.
"""
LOGGER.debug("Beginning instance release process for pool: %s", instance.name)

config_key = (
instance.name,
instance.keepalive,
Expand All @@ -235,64 +219,58 @@ async def release_instance(cls, instance: "IndyVdrLedgerPool"):
instance.read_only,
instance.socks_proxy,
)
LOGGER.debug("Generated config key for release: %s", config_key)

async with cls._lock:
async with instance.ref_lock:
instance.ref_count -= 1
if instance._ref_count <= 0:
LOGGER.debug("Reference count is empty, cleaning up instance")
await instance._close()
del cls._instances[config_key]
LOGGER.debug(
"Decremented reference count to %s for instance %s",
instance.ref_count,
config_key,
"Successfully removed IndyVdrLedgerPool instance with name %s",
instance.name,
)
else:
LOGGER.debug(
"Instance %s still has active references: %s",
instance.name,
instance._ref_count,
)
if instance.ref_count <= 0:
LOGGER.debug(
"Reference count is zero or negative, cleaning up instance"
)
await instance.close()
del cls._instances[config_key]
LOGGER.debug(
"Successfully removed IndyVdrLedgerPool instance: %s", config_key
)
else:
LOGGER.debug(
"Instance still has active references: %s", instance.ref_count
)

@property
def cfg_path(self) -> Path:
"""Get the path to the configuration file, ensuring it's created."""
if not self.cfg_path_cache:
if not self._cfg_path_cache:
LOGGER.debug("Creating configuration path cache")
self.cfg_path_cache = storage_path("vdr", create=True)
LOGGER.debug("Configuration path set to: %s", self.cfg_path_cache)
return self.cfg_path_cache
self._cfg_path_cache = storage_path("vdr", create=True)
LOGGER.debug("Configuration path set to: %s", self._cfg_path_cache)
return self._cfg_path_cache

@property
def genesis_hash(self) -> str:
"""Get the hash of the configured genesis transactions."""
if not self.genesis_hash_cache:
if not self._genesis_hash_cache:
LOGGER.debug("Calculating genesis transactions hash")
self.genesis_hash_cache = _hash_txns(self.genesis_txns)
LOGGER.debug("Genesis hash calculated: %s", self.genesis_hash_cache)
return self.genesis_hash_cache
self._genesis_hash_cache = _hash_txns(self.genesis_txns)
LOGGER.debug("Genesis hash calculated: %s", self._genesis_hash_cache)
return self._genesis_hash_cache

@property
def genesis_txns(self) -> str:
"""Get the configured genesis transactions."""
if not self.genesis_txns_cache:
if not self._genesis_txns_cache:
LOGGER.debug("Loading genesis transactions from file")
try:
path = self.cfg_path.joinpath(self.name, "genesis")
LOGGER.debug("Reading genesis file from: %s", path)
self.genesis_txns_cache = _normalize_txns(open(path).read())
self._genesis_txns_cache = _normalize_txns(open(path).read())
LOGGER.debug("Successfully loaded genesis transactions")
except FileNotFoundError:
LOGGER.error("Pool config '%s' not found", self.name)
raise LedgerConfigError("Pool config '%s' not found", self.name) from None
return self.genesis_txns_cache
return self._genesis_txns_cache

async def create_pool_config(self, genesis_transactions: str, recreate: bool = False):
async def _create_pool_config(
self, genesis_transactions: str, recreate: bool = False
) -> None:
"""Create the pool ledger configuration."""
LOGGER.debug("Creating pool config for '%s', recreate=%s", self.name, recreate)

Expand Down Expand Up @@ -336,16 +314,16 @@ async def create_pool_config(self, genesis_transactions: str, recreate: bool = F
raise LedgerConfigError("Error writing genesis transactions") from err
LOGGER.debug("Successfully wrote pool ledger config '%s'", self.name)

self.genesis_txns_cache = genesis
self._genesis_txns_cache = genesis

async def open(self):
async def _open(self) -> None:
"""Open the pool ledger, creating it if necessary."""
LOGGER.debug("Opening pool ledger: %s", self.name)

if self.init_config:
if self._init_config:
LOGGER.debug("Initializing pool config with genesis transactions")
await self.create_pool_config(self.genesis_txns_cache, recreate=True)
self.init_config = False
await self._create_pool_config(self._genesis_txns_cache, recreate=True)
self._init_config = False

genesis_hash = self.genesis_hash
LOGGER.debug("Using genesis hash: %s", genesis_hash)
Expand Down Expand Up @@ -376,7 +354,22 @@ async def open(self):
except OSError:
LOGGER.exception("Error writing cached genesis transactions")

async def close(self):
async def context_open(self) -> None:
"""Open the ledger if necessary and increase the number of active references."""
await self._cancel_close_task()
async with self._ref_lock:
if not self.handle:
LOGGER.debug("Opening the pool ledger")
await self._open()
self._ref_count += 1
self.pending_use_count = max(0, self.pending_use_count - 1) # Clear pending
LOGGER.debug(
"Incremented reference count to %s for instance %s",
self._ref_count,
self.name,
)

async def _close(self) -> None:
"""Close the pool ledger."""
if self.handle:
LOGGER.debug("Attempting to close pool ledger")
Expand Down Expand Up @@ -404,63 +397,63 @@ async def close(self):
LOGGER.exception(
"Failed to close pool ledger after 3 attempts", exc_info=exc
)
self.ref_count += 1 # if we are here, we should have self.ref_lock
self.close_task = None
self._ref_count += 1 # if we are here, we should have self.ref_lock
LOGGER.debug(
"Re-incremented reference count to %s for instance %s",
self._ref_count,
self.name,
)
self._close_task = None
raise LedgerError("Exception when closing pool ledger") from exc

async def context_open(self):
"""Open the ledger if necessary and increase the number of active references."""
async with self.ref_lock:
if self.close_task:
self.close_task.cancel()
if not self.handle:
LOGGER.debug("Opening the pool ledger")
await self.open()
self.ref_count += 1

async def context_close(self):
async def context_close(self) -> None:
"""Release the reference and schedule closing of the pool ledger."""
LOGGER.debug("Context close called for pool %s", self.name)

async def closer(timeout: int):
async def _keepalive_closer(timeout: int) -> None:
"""Close the pool ledger after a timeout."""
try:
LOGGER.debug(
"Coroutine will sleep for %d seconds before closing the pool.",
timeout,
)
await asyncio.sleep(timeout)
async with self.ref_lock:
if not self.ref_count:
LOGGER.debug(
"No more references. Proceeding to close the pool ledger."
)
await self.close()
else:
LOGGER.debug(
"Reference count is %d. Not closing the pool yet.",
self.ref_count,
)

await IndyVdrLedgerPool.release_instance(self)
except Exception as e:
LOGGER.exception(
"Exception occurred in closer coroutine during pool closure.",
exc_info=e,
LOGGER.error(
"Exception occurred in closer coroutine during pool closure: %s",
e,
)

async with self.ref_lock:
self.ref_count -= 1
LOGGER.debug("Decremented ref_count to %d.", self.ref_count)
if not self.ref_count:
async with self._ref_lock:
self._ref_count -= 1
LOGGER.debug(
"Decremented ref_count to %d for instance %s",
self._ref_count,
self.name,
)
if self._ref_count <= 0:
if self.keepalive:
LOGGER.debug(
"Scheduling closer coroutine with keepalive=%s",
self.keepalive,
)
self.close_task = asyncio.create_task(closer(self.keepalive))
self._close_task = asyncio.create_task(
_keepalive_closer(self.keepalive)
)
else:
LOGGER.debug(
"No keepalive set. Proceeding to close the pool immediately."
)
await self.close()
await IndyVdrLedgerPool.release_instance(self)

async def _cancel_close_task(self) -> None:
"""Cancel any pending close task."""
async with self._ref_lock:
if self._close_task:
self._close_task.cancel()
self._close_task = None


class IndyVdrLedger(BaseLedger):
Expand Down

0 comments on commit dc5019a

Please sign in to comment.