Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add acknowledgeable rpc errors #18631

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions chia/rpc/util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import contextlib
import dataclasses
import logging
import traceback
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple, get_type_hints
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple, Type, get_type_hints

import aiohttp
from chia_rs import AugSchemeMPL
Expand Down Expand Up @@ -38,6 +39,20 @@
ALL_TRANSLATION_LAYERS: Dict[str, TranslationLayer] = {"CHIP-0028": BLIND_SIGNER_TRANSLATION}


class AcknowledgedError(Exception):
def __init__(self, original: Exception) -> None:
super().__init__(f"Acknowledged error: {original}")
self.original = original


@contextlib.contextmanager
def acknowledge(*exceptions: Type[Exception]):
try:
yield
except exceptions as e:
raise AcknowledgedError(e) from e


def marshal(func: MarshallableRpcEndpoint) -> RpcEndpoint:
hints = get_type_hints(func)
request_hint = hints["request"]
Expand Down Expand Up @@ -87,8 +102,17 @@ async def inner(request) -> aiohttp.web.Response:
if "success" not in res_object:
res_object["success"] = True
except Exception as e:
if isinstance(e, AcknowledgedError):
acknowledged = True
e = e.original
else:
acknowledged = False

tb = traceback.format_exc()
log.warning(f"Error while handling message: {tb}")

if not acknowledged:
log.warning(f"Error while handling message: {tb}")

if len(e.args) > 0:
res_object = {"success": False, "error": f"{e.args[0]}", "traceback": f"{tb}"}
else:
Expand Down
37 changes: 24 additions & 13 deletions chia/rpc/wallet_rpc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from chia.pools.pool_wallet_info import FARMING_TO_POOL, PoolState, PoolWalletInfo, create_pool_state
from chia.protocols.wallet_protocol import CoinState
from chia.rpc.rpc_server import Endpoint, EndpointResult, default_get_connections
from chia.rpc.util import marshal, tx_endpoint
from chia.rpc.util import acknowledge, marshal, tx_endpoint
from chia.rpc.wallet_request_types import (
ApplySignatures,
ApplySignaturesResponse,
Expand Down Expand Up @@ -133,6 +133,7 @@
from chia.wallet.wallet_node import WalletNode
from chia.wallet.wallet_protocol import WalletProtocol
from chia.wallet.wallet_spend_bundle import WalletSpendBundle
from chia.wallet.wallet_state_manager import WalletOfTypeNotFoundError

# Timeout for response from wallet/full node for sending a transaction
TIMEOUT = 30
Expand Down Expand Up @@ -4255,7 +4256,7 @@ async def create_new_dl(

try:
dl_wallet = self.service.wallet_state_manager.get_dl_wallet()
except ValueError:
except WalletOfTypeNotFoundError:
async with self.service.wallet_state_manager.lock:
dl_wallet = await DataLayerWallet.create_new_dl_wallet(self.service.wallet_state_manager)

Expand All @@ -4282,7 +4283,7 @@ async def dl_track_new(self, request: Dict[str, Any]) -> EndpointResult:
raise ValueError("The wallet service is not currently initialized")
try:
dl_wallet = self.service.wallet_state_manager.get_dl_wallet()
except ValueError:
except WalletOfTypeNotFoundError:
async with self.service.wallet_state_manager.lock:
dl_wallet = await DataLayerWallet.create_new_dl_wallet(
self.service.wallet_state_manager,
Expand All @@ -4306,7 +4307,8 @@ async def dl_stop_tracking(self, request: Dict[str, Any]) -> EndpointResult:
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

dl_wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
dl_wallet = self.service.wallet_state_manager.get_dl_wallet()
await dl_wallet.stop_tracking_singleton(bytes32.from_hexstr(request["launcher_id"]))
return {}

Expand All @@ -4318,7 +4320,8 @@ async def dl_latest_singleton(self, request: Dict[str, Any]) -> EndpointResult:
only_confirmed = request.get("only_confirmed")
if only_confirmed is None:
only_confirmed = False
wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
wallet = self.service.wallet_state_manager.get_dl_wallet()
record = await wallet.get_latest_singleton(bytes32.from_hexstr(request["launcher_id"]), only_confirmed)
return {"singleton": None if record is None else record.to_json_dict()}

Expand All @@ -4327,7 +4330,8 @@ async def dl_singletons_by_root(self, request: Dict[str, Any]) -> EndpointResult
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
wallet = self.service.wallet_state_manager.get_dl_wallet()
records = await wallet.get_singletons_by_root(
bytes32.from_hexstr(request["launcher_id"]), bytes32.from_hexstr(request["root"])
)
Expand All @@ -4345,7 +4349,8 @@ async def dl_update_root(
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
wallet = self.service.wallet_state_manager.get_dl_wallet()
async with self.service.wallet_state_manager.lock:
await wallet.create_update_state_spend(
bytes32.from_hexstr(request["launcher_id"]),
Expand All @@ -4371,7 +4376,8 @@ async def dl_update_multiple(
if self.service.wallet_state_manager is None:
return {"success": False, "error": "not_initialized"}

wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
wallet = self.service.wallet_state_manager.get_dl_wallet()
async with self.service.wallet_state_manager.lock:
# TODO: This method should optionally link the singletons with announcements.
# Otherwise spends are vulnerable to signature subtraction.
Expand All @@ -4394,7 +4400,8 @@ async def dl_history(self, request: Dict[str, Any]) -> EndpointResult:
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
wallet = self.service.wallet_state_manager.get_dl_wallet()
additional_kwargs = {}

if "min_generation" in request:
Expand All @@ -4413,7 +4420,8 @@ async def dl_owned_singletons(self, request: Dict[str, Any]) -> EndpointResult:
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
wallet = self.service.wallet_state_manager.get_dl_wallet()
singletons = await wallet.get_owned_singletons()
singletons_json = [singleton.to_json_dict() for singleton in singletons]

Expand All @@ -4424,7 +4432,8 @@ async def dl_get_mirrors(self, request: Dict[str, Any]) -> EndpointResult:
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
wallet = self.service.wallet_state_manager.get_dl_wallet()
mirrors_json = []
for mirror in await wallet.get_mirrors_for_launcher(bytes32.from_hexstr(request["launcher_id"])):
mirrors_json.append(mirror.to_json_dict())
Expand All @@ -4442,7 +4451,8 @@ async def dl_new_mirror(
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

dl_wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
dl_wallet = self.service.wallet_state_manager.get_dl_wallet()
async with self.service.wallet_state_manager.lock:
await dl_wallet.create_new_mirror(
bytes32.from_hexstr(request["launcher_id"]),
Expand All @@ -4468,7 +4478,8 @@ async def dl_delete_mirror(
if self.service.wallet_state_manager is None:
raise ValueError("The wallet service is not currently initialized")

dl_wallet = self.service.wallet_state_manager.get_dl_wallet()
with acknowledge(WalletOfTypeNotFoundError):
dl_wallet = self.service.wallet_state_manager.get_dl_wallet()

async with self.service.wallet_state_manager.lock:
await dl_wallet.delete_mirror(
Expand Down
14 changes: 12 additions & 2 deletions chia/wallet/wallet_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,23 @@

TWalletType = TypeVar("TWalletType", bound=WalletProtocol[Any])


class WalletError(Exception):
pass


if TYPE_CHECKING:
from chia.wallet.wallet_node import WalletNode


PendingTxCallback = Callable[[], None]


class WalletOfTypeNotFoundError(WalletError):
def __init__(self, wallet_type: WalletType):
super().__init__(f"Wallet type {wallet_type.name} not found")


class WalletStateManager:
interested_ph_cache: Dict[bytes32, List[int]] = {}
interested_coin_cache: Dict[bytes32, List[int]] = {}
Expand Down Expand Up @@ -1757,7 +1767,7 @@ async def _add_coin_states(
wallet_identifier, coin_data = await self.determine_coin_type(peer, coin_state, fork_height)
try:
dl_wallet = self.get_dl_wallet()
except ValueError:
except WalletOfTypeNotFoundError:
pass
else:
if (
Expand Down Expand Up @@ -2634,7 +2644,7 @@ def get_dl_wallet(self) -> DataLayerWallet:
wallet, DataLayerWallet
), f"WalletType.DATA_LAYER should be a DataLayerWallet instance got: {type(wallet).__name__}"
return wallet
raise ValueError("DataLayerWallet not available")
raise WalletOfTypeNotFoundError(WalletType.DATA_LAYER)

async def get_or_create_vc_wallet(self) -> VCWallet:
for _, wallet in self.wallets.items():
Expand Down
Loading