From 861518ca390e7aa967961dbc0f329739642cad36 Mon Sep 17 00:00:00 2001 From: jamshale Date: Tue, 5 Mar 2024 00:09:37 +0000 Subject: [PATCH 1/7] Add upgrade to anoncreds via api Signed-off-by: jamshale --- aries_cloudagent/admin/server.py | 21 +- .../admin/tests/test_admin_server.py | 35 +- .../anoncreds/default/legacy_indy/registry.py | 6 +- .../legacy_indy/tests/test_registry.py | 32 +- aries_cloudagent/core/conductor.py | 75 +- aries_cloudagent/core/tests/test_conductor.py | 59 +- aries_cloudagent/ledger/base.py | 2 +- aries_cloudagent/ledger/indy.py | 4 +- aries_cloudagent/ledger/indy_vdr.py | 4 +- aries_cloudagent/multitenant/manager.py | 8 + .../endorse_transaction/v1_0/manager.py | 9 +- aries_cloudagent/storage/type.py | 1 + aries_cloudagent/utils/profiles.py | 28 + aries_cloudagent/wallet/anoncreds_upgrade.py | 646 ++++++++++++++++++ aries_cloudagent/wallet/routes.py | 85 +++ .../wallet/tests/test_anoncreds_upgrade.py | 210 ++++++ aries_cloudagent/wallet/tests/test_routes.py | 19 + aries_cloudagent/wallet/upgrade_singleton.py | 22 + demo/features/steps/0586-sign-transaction.py | 1 - demo/features/steps/upgrade.py | 24 + demo/features/upgrade.feature | 26 + demo/runners/faber.py | 23 +- 22 files changed, 1258 insertions(+), 82 deletions(-) create mode 100644 aries_cloudagent/wallet/anoncreds_upgrade.py create mode 100644 aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py create mode 100644 aries_cloudagent/wallet/upgrade_singleton.py create mode 100644 demo/features/steps/upgrade.py create mode 100644 demo/features/upgrade.feature diff --git a/aries_cloudagent/admin/server.py b/aries_cloudagent/admin/server.py index c5fb4dc516..4b7b7ffccc 100644 --- a/aries_cloudagent/admin/server.py +++ b/aries_cloudagent/admin/server.py @@ -18,7 +18,6 @@ setup_aiohttp_apispec, validation_middleware, ) - from marshmallow import fields from ..config.injection_context import InjectionContext @@ -38,6 +37,7 @@ from ..utils.stats import Collector from ..utils.task_queue import TaskQueue from ..version import __version__ +from ..wallet.upgrade_singleton import UpgradeSingleton from .base_server import BaseAdminServer from .error import AdminSetupError from .request_context import AdminRequestContext @@ -58,6 +58,8 @@ "acapy::keylist::updated": "keylist", } +upgrade_singleton = UpgradeSingleton() + class AdminModulesSchema(OpenAPISchema): """Schema for the modules endpoint.""" @@ -205,6 +207,17 @@ async def ready_middleware(request: web.BaseRequest, handler: Coroutine): raise web.HTTPServiceUnavailable(reason="Shutdown in progress") +@web.middleware +async def upgrade_middleware(request: web.BaseRequest, handler: Coroutine): + """Blocking middleware for upgrades.""" + context: AdminRequestContext = request["context"] + + if context._profile.name in upgrade_singleton.current_upgrades: + raise web.HTTPServiceUnavailable(reason="Upgrade in progress") + + return await handler(request) + + @web.middleware async def debug_middleware(request: web.BaseRequest, handler: Coroutine): """Show request detail in debug log.""" @@ -351,6 +364,8 @@ async def check_multitenant_authorization(request: web.Request, handler): is_multitenancy_path = path.startswith("/multitenancy") is_server_path = path in self.server_paths or path == "/features" + # allow base wallets to trigger update through api + is_upgrade_path = path.startswith("/anoncreds/wallet/upgrade") # subwallets are not allowed to access multitenancy routes if authorization_header and is_multitenancy_path: @@ -380,6 +395,7 @@ async def check_multitenant_authorization(request: web.Request, handler): and not is_unprotected_path(path) and not base_limited_access_path and not (request.method == "OPTIONS") # CORS fix + and not is_upgrade_path ): raise web.HTTPUnauthorized() @@ -453,6 +469,9 @@ async def setup_context(request: web.Request, handler): middlewares.append(setup_context) + # Upgrade middleware needs the context setup + middlewares.append(upgrade_middleware) + # Register validation_middleware last avoiding unauthorized validations middlewares.append(validation_middleware) diff --git a/aries_cloudagent/admin/tests/test_admin_server.py b/aries_cloudagent/admin/tests/test_admin_server.py index 300e82f758..22a82baa7c 100644 --- a/aries_cloudagent/admin/tests/test_admin_server.py +++ b/aries_cloudagent/admin/tests/test_admin_server.py @@ -1,22 +1,24 @@ import gc import json +from unittest import IsolatedAsyncioTestCase import pytest -from aries_cloudagent.tests import mock -from unittest import IsolatedAsyncioTestCase from aiohttp import ClientSession, DummyCookieJar, TCPConnector, web from aiohttp.test_utils import unused_port +from aries_cloudagent.tests import mock + from ...config.default_context import DefaultContextBuilder from ...config.injection_context import InjectionContext from ...core.event_bus import Event +from ...core.goal_code_registry import GoalCodeRegistry from ...core.in_memory import InMemoryProfile from ...core.protocol_registry import ProtocolRegistry -from ...core.goal_code_registry import GoalCodeRegistry from ...utils.stats import Collector from ...utils.task_queue import TaskQueue - +from ...wallet.upgrade_singleton import UpgradeSingleton from .. import server as test_module +from ..request_context import AdminRequestContext from ..server import AdminServer, AdminSetupError @@ -477,6 +479,31 @@ async def test_server_health_state(self): assert response.status == 503 await server.stop() + async def test_upgrade_middleware(self): + upgrade_singleton = UpgradeSingleton() + self.context = AdminRequestContext.test_context( + {}, InMemoryProfile.test_profile() + ) + self.request_dict = { + "context": self.context, + } + request = mock.MagicMock( + method="GET", + path_qs="/schemas/created", + match_info={}, + __getitem__=lambda _, k: self.request_dict[k], + ) + handler = mock.CoroutineMock() + + await test_module.upgrade_middleware(request, handler) + + upgrade_singleton.set_wallet("test-profile") + with self.assertRaises(test_module.web.HTTPServiceUnavailable): + await test_module.upgrade_middleware(request, handler) + + upgrade_singleton.remove_wallet("test-profile") + await test_module.upgrade_middleware(request, handler) + @pytest.fixture async def server(): diff --git a/aries_cloudagent/anoncreds/default/legacy_indy/registry.py b/aries_cloudagent/anoncreds/default/legacy_indy/registry.py index 5bed179f96..31eca4f977 100644 --- a/aries_cloudagent/anoncreds/default/legacy_indy/registry.py +++ b/aries_cloudagent/anoncreds/default/legacy_indy/registry.py @@ -1123,7 +1123,7 @@ async def fix_ledger_entry( async def txn_submit( self, - profile: Profile, + ledger: BaseLedger, ledger_transaction: str, sign: bool = None, taa_accept: bool = None, @@ -1131,10 +1131,6 @@ async def txn_submit( write_ledger: bool = True, ) -> str: """Submit a transaction to the ledger.""" - ledger = profile.inject(BaseLedger) - - if not ledger: - raise LedgerError("No ledger available") try: async with ledger: diff --git a/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py b/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py index 58631cfadb..830a0bb722 100644 --- a/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py +++ b/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py @@ -9,7 +9,6 @@ from base58 import alphabet from .....anoncreds.base import ( - AnonCredsRegistrationError, AnonCredsSchemaAlreadyExists, ) from .....anoncreds.models.anoncreds_schema import ( @@ -21,7 +20,7 @@ from .....connections.models.conn_record import ConnRecord from .....core.in_memory.profile import InMemoryProfile from .....ledger.base import BaseLedger -from .....ledger.error import LedgerError, LedgerObjectAlreadyExistsError +from .....ledger.error import LedgerObjectAlreadyExistsError from .....messaging.responder import BaseResponder from .....protocols.endorse_transaction.v1_0.manager import ( TransactionManager, @@ -728,27 +727,16 @@ async def test_register_revocation_registry_definition_with_create_transaction_a assert mock_create_record.called async def test_txn_submit(self): - self.profile.inject = mock.MagicMock( - side_effect=[ - None, - mock.CoroutineMock( - txn_submit=mock.CoroutineMock(side_effect=LedgerError("test error")) - ), - mock.CoroutineMock( - txn_submit=mock.CoroutineMock(return_value="transaction response") - ), - ] + self.profile.context.injector.bind_instance( + BaseLedger, + mock.MagicMock( + txn_submit=mock.CoroutineMock(return_value="transaction_id") + ), ) - - # No ledger - with self.assertRaises(LedgerError): - await self.registry.txn_submit(self.profile, "test_txn") - # Write error - with self.assertRaises(AnonCredsRegistrationError): - await self.registry.txn_submit(self.profile, "test_txn") - - result = await self.registry.txn_submit(self.profile, "test_txn") - assert result == "transaction response" + async with self.profile.session() as session: + ledger = session.inject(BaseLedger) + result = await self.registry.txn_submit(ledger, "test_txn") + assert result == "transaction_id" async def test_register_revocation_list_no_endorsement(self): self.profile.context.injector.bind_instance( diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index d1f44e68ab..7c44ce11a8 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -7,6 +7,7 @@ """ +import asyncio import hashlib import json import logging @@ -17,18 +18,12 @@ from ..admin.base_server import BaseAdminServer from ..admin.server import AdminResponder, AdminServer -from ..commands.upgrade import ( - add_version_record, - get_upgrade_version_list, - upgrade, -) +from ..commands.upgrade import (add_version_record, get_upgrade_version_list, + upgrade) from ..config.default_context import ContextBuilder from ..config.injection_context import InjectionContext -from ..config.ledger import ( - get_genesis_transactions, - ledger_config, - load_multiple_genesis_transactions_from_config, -) +from ..config.ledger import (get_genesis_transactions, ledger_config, + load_multiple_genesis_transactions_from_config) from ..config.logging import LoggingConfigurator from ..config.provider import ClassProvider from ..config.wallet import wallet_config @@ -36,30 +31,28 @@ from ..indy.verifier import IndyVerifier from ..ledger.base import BaseLedger from ..ledger.error import LedgerConfigError, LedgerTransactionError -from ..ledger.multiple_ledger.base_manager import ( - BaseMultipleLedgerManager, - MultipleLedgerManagerError, -) -from ..ledger.multiple_ledger.ledger_requests_executor import IndyLedgerRequestsExecutor -from ..ledger.multiple_ledger.manager_provider import MultiIndyLedgerManagerProvider +from ..ledger.multiple_ledger.base_manager import (BaseMultipleLedgerManager, + MultipleLedgerManagerError) +from ..ledger.multiple_ledger.ledger_requests_executor import \ + IndyLedgerRequestsExecutor +from ..ledger.multiple_ledger.manager_provider import \ + MultiIndyLedgerManagerProvider from ..messaging.responder import BaseResponder from ..multitenant.base import BaseMultitenantManager from ..multitenant.manager_provider import MultitenantManagerProvider -from ..protocols.connections.v1_0.manager import ( - ConnectionManager, - ConnectionManagerError, -) -from ..protocols.connections.v1_0.messages.connection_invitation import ( - ConnectionInvitation, -) -from ..protocols.coordinate_mediation.mediation_invite_store import MediationInviteStore +from ..protocols.connections.v1_0.manager import (ConnectionManager, + ConnectionManagerError) +from ..protocols.connections.v1_0.messages.connection_invitation import \ + ConnectionInvitation +from ..protocols.coordinate_mediation.mediation_invite_store import \ + MediationInviteStore from ..protocols.coordinate_mediation.v1_0.manager import MediationManager from ..protocols.coordinate_mediation.v1_0.route_manager import RouteManager -from ..protocols.coordinate_mediation.v1_0.route_manager_provider import ( - RouteManagerProvider, -) +from ..protocols.coordinate_mediation.v1_0.route_manager_provider import \ + RouteManagerProvider from ..protocols.out_of_band.v1_0.manager import OutOfBandManager -from ..protocols.out_of_band.v1_0.messages.invitation import HSProto, InvitationMessage +from ..protocols.out_of_band.v1_0.messages.invitation import ( + HSProto, InvitationMessage) from ..storage.base import BaseStorage from ..storage.error import StorageNotFoundError from ..storage.record import StorageRecord @@ -67,14 +60,17 @@ from ..transport.inbound.manager import InboundTransportManager from ..transport.inbound.message import InboundMessage from ..transport.outbound.base import OutboundDeliveryError -from ..transport.outbound.manager import OutboundTransportManager, QueuedOutboundMessage +from ..transport.outbound.manager import (OutboundTransportManager, + QueuedOutboundMessage) from ..transport.outbound.message import OutboundMessage from ..transport.outbound.status import OutboundSendStatus from ..transport.wire_format import BaseWireFormat +from ..utils.profiles import get_subwallet_profiles_from_storage from ..utils.stats import Collector from ..utils.task_queue import CompletedTask, TaskQueue from ..vc.ld_proofs.document_loader import DocumentLoader from ..version import RECORD_TYPE_ACAPY_VERSION, __version__ +from ..wallet.anoncreds_upgrade import upgrade_wallet_to_anoncreds from ..wallet.did_info import DIDInfo from .dispatcher import Dispatcher from .error import StartupError @@ -522,7 +518,9 @@ async def start(self) -> None: except Exception: LOGGER.exception("Error accepting mediation invitation") - # notify protocols of startup status + await self.check_for_wallet_upgrades_in_progress() + + # notify protcols of startup status await self.root_profile.notify(STARTUP_EVENT_TOPIC, {}) async def stop(self, timeout=1.0): @@ -823,3 +821,20 @@ async def check_for_valid_wallet_type(self, profile): raise StartupError( f"Wallet type config [{storage_type_from_config}] doesn't match with the wallet type in storage [{storage_type_record.value}]" # noqa: E501 ) + + async def check_for_wallet_upgrades_in_progress(self): + """Check for upgrade and upgrade if needed.""" + multitenant_mgr = self.context.inject_or(BaseMultitenantManager) + if multitenant_mgr: + subwallet_profiles = await get_subwallet_profiles_from_storage( + self.root_profile + ) + await asyncio.gather( + *[ + upgrade_wallet_to_anoncreds(profile, True) + for profile in subwallet_profiles + ] + ) + + else: + await upgrade_wallet_to_anoncreds(self.root_profile) diff --git a/aries_cloudagent/core/tests/test_conductor.py b/aries_cloudagent/core/tests/test_conductor.py index 55a8ab0c4b..e8e6a1e39f 100644 --- a/aries_cloudagent/core/tests/test_conductor.py +++ b/aries_cloudagent/core/tests/test_conductor.py @@ -117,6 +117,8 @@ async def test_startup_version_record_exists(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -166,6 +168,7 @@ async def test_startup_version_record_exists(self): mock_inbound_mgr.return_value.stop.assert_awaited_once_with() mock_outbound_mgr.return_value.stop.assert_awaited_once_with() + assert mock_upgrade.called async def test_startup_version_no_upgrade_add_record(self): builder: ContextBuilder = StubContextBuilder(self.test_settings) @@ -176,6 +179,8 @@ async def test_startup_version_no_upgrade_add_record(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -213,6 +218,8 @@ async def test_startup_version_no_upgrade_add_record(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -257,6 +264,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -296,6 +305,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -335,6 +346,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -373,6 +386,8 @@ async def test_startup_version_record_not_exists(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -449,6 +464,8 @@ async def test_startup_no_public_did(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -492,6 +509,8 @@ async def test_stats(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger: mock_inbound_mgr.return_value.sessions = ["dummy"] @@ -884,6 +903,8 @@ async def test_admin(self): ) as admin_start, mock.patch.object( admin, "stop", autospec=True ) as admin_stop, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -936,6 +957,8 @@ async def test_admin_startx(self): ) as oob_mgr, mock.patch.object( test_module, "ConnectionManager" ) as conn_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -992,7 +1015,9 @@ async def test_start_static(self): ), ), mock.patch.object( test_module, "OutboundTransportManager", autospec=True - ) as mock_outbound_mgr: + ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade: mock_outbound_mgr.return_value.registered_transports = { "test": mock.MagicMock(schemes=["http"]) } @@ -1166,7 +1191,9 @@ async def test_print_invite_connection(self): ), ), mock.patch.object( test_module, "OutboundTransportManager", autospec=True - ) as mock_outbound_mgr: + ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade: mock_outbound_mgr.return_value.registered_transports = { "test": mock.MagicMock(schemes=["http"]) } @@ -1203,6 +1230,8 @@ async def test_clear_default_mediator(self): "MediationManager", return_value=mock.MagicMock(clear_default_mediator=mock.CoroutineMock()), ) as mock_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1254,7 +1283,9 @@ async def test_set_default_mediator(self): mock.MagicMock(value=f"v{__version__}"), ] ), - ): + ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade: await conductor.start() await conductor.stop() mock_mgr.return_value.set_default_mediator_by_id.assert_called_once() @@ -1277,6 +1308,8 @@ async def test_set_default_mediator_x(self): "retrieve_by_id", mock.CoroutineMock(side_effect=Exception()), ), mock.patch.object(test_module, "LOGGER") as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1425,6 +1458,8 @@ async def test_mediator_invitation_0160(self, mock_from_url, _): ) as mock_mgr, mock.patch.object( mock_conn_record, "metadata_set", mock.CoroutineMock() ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1484,6 +1519,8 @@ async def test_mediator_invitation_0434(self, mock_from_url, _): ) ), ) as mock_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1542,6 +1579,8 @@ async def test_mediation_invitation_should_use_stored_invitation( ), mock.patch.object( test_module, "MediationManager", return_value=mock_mediation_manager ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1596,7 +1635,9 @@ async def test_mediation_invitation_should_not_create_connection_for_old_invitat mock.MagicMock(value=f"v{__version__}"), ] ), - ): + ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade: # when await conductor.start() await conductor.stop() @@ -1631,6 +1672,8 @@ async def test_mediator_invitation_x(self, _): ) as mock_from_url, mock.patch.object( test_module, "LOGGER" ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1694,6 +1737,8 @@ async def test_startup_x_no_storage_version(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LOGGER" ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1735,6 +1780,8 @@ async def test_startup_storage_type_exists_and_matches(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1838,6 +1885,8 @@ async def test_startup_storage_type_does_not_exist_and_existing_agent_then_set_t ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1902,6 +1951,8 @@ async def test_startup_storage_type_does_not_exist_and_new_anoncreds_agent( ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( diff --git a/aries_cloudagent/ledger/base.py b/aries_cloudagent/ledger/base.py index 1ebc8926c5..509dc48d4b 100644 --- a/aries_cloudagent/ledger/base.py +++ b/aries_cloudagent/ledger/base.py @@ -643,7 +643,7 @@ async def send_schema_anoncreds( try: legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, schema_req, sign=True, sign_did=public_info, diff --git a/aries_cloudagent/ledger/indy.py b/aries_cloudagent/ledger/indy.py index 9dc18cbc7a..9967500546 100644 --- a/aries_cloudagent/ledger/indy.py +++ b/aries_cloudagent/ledger/indy.py @@ -1186,7 +1186,7 @@ async def send_revoc_reg_def( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, rev_reg_def_req, sign=True, sign_did=did_info, @@ -1255,7 +1255,7 @@ async def send_revoc_reg_entry( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, rev_reg_def_entry_req, sign=True, sign_did=did_info, diff --git a/aries_cloudagent/ledger/indy_vdr.py b/aries_cloudagent/ledger/indy_vdr.py index 7444311343..1c04677e2d 100644 --- a/aries_cloudagent/ledger/indy_vdr.py +++ b/aries_cloudagent/ledger/indy_vdr.py @@ -1147,7 +1147,7 @@ async def send_revoc_reg_def( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, rev_reg_def_req, sign=True, sign_did=did_info, @@ -1222,7 +1222,7 @@ async def send_revoc_reg_entry( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, revoc_reg_entry_req, sign=True, sign_did=did_info, diff --git a/aries_cloudagent/multitenant/manager.py b/aries_cloudagent/multitenant/manager.py index 550389f0db..4f1cf89134 100644 --- a/aries_cloudagent/multitenant/manager.py +++ b/aries_cloudagent/multitenant/manager.py @@ -3,6 +3,7 @@ import logging from typing import Iterable, Optional +from ..askar.profile_anon import AskarAnoncredsProfile from ..config.injection_context import InjectionContext from ..config.wallet import wallet_config from ..core.profile import Profile @@ -84,6 +85,13 @@ async def get_wallet_profile( profile, _ = await wallet_config(context, provision=provision) self._profiles.put(wallet_id, profile) + # return anoncreds profile if explicitly set as wallet type + if profile.context.settings.get("wallet.type") == "askar-anoncreds": + return AskarAnoncredsProfile( + profile.opened, + profile.context, + ) + return profile async def update_wallet(self, wallet_id: str, new_settings: dict) -> WalletRecord: diff --git a/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py b/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py index a96a2e8fe9..293a8448ef 100644 --- a/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py +++ b/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py @@ -415,6 +415,9 @@ async def complete_transaction( if (not endorser) and ( txn_goal_code != TransactionRecord.WRITE_DID_TRANSACTION ): + ledger = self.profile.inject(BaseLedger) + if not ledger: + raise TransactionManagerError("No ledger available") if ( self._profile.context.settings.get_value("wallet.type") == "askar-anoncreds" @@ -425,13 +428,9 @@ async def complete_transaction( legacy_indy_registry = LegacyIndyRegistry() ledger_response_json = await legacy_indy_registry.txn_submit( - self._profile, ledger_transaction, sign=False, taa_accept=False + ledger, ledger_transaction, sign=False, taa_accept=False ) else: - ledger = self.profile.inject(BaseLedger) - if not ledger: - raise TransactionManagerError("No ledger available") - async with ledger: try: ledger_response_json = await shield( diff --git a/aries_cloudagent/storage/type.py b/aries_cloudagent/storage/type.py index 7a0cc9aab7..5dbcd9d12b 100644 --- a/aries_cloudagent/storage/type.py +++ b/aries_cloudagent/storage/type.py @@ -1,3 +1,4 @@ """Library version information.""" RECORD_TYPE_ACAPY_STORAGE_TYPE = "acapy_storage_type" +RECORD_TYPE_ACAPY_UPGRADING = "acapy_upgrading" diff --git a/aries_cloudagent/utils/profiles.py b/aries_cloudagent/utils/profiles.py index 45a440ed79..d5433f3afd 100644 --- a/aries_cloudagent/utils/profiles.py +++ b/aries_cloudagent/utils/profiles.py @@ -1,10 +1,15 @@ """Profile utilities.""" +import json + from aiohttp import web from ..anoncreds.error_messages import ANONCREDS_PROFILE_REQUIRED_MSG from ..askar.profile_anon import AskarAnoncredsProfile from ..core.profile import Profile +from ..multitenant.manager import MultitenantManager +from ..storage.base import BaseStorageSearch +from ..wallet.models.wallet_record import WalletRecord def is_anoncreds_profile_raise_web_exception(profile: Profile) -> None: @@ -29,3 +34,26 @@ def subwallet_type_not_same_as_base_wallet_raise_web_exception( raise web.HTTPForbidden( reason="Subwallet type must be the same as the base wallet type" ) + + +async def get_subwallet_profiles_from_storage(root_profile: Profile) -> list[Profile]: + """Get subwallet profiles from storage.""" + subwallet_profiles = [] + base_storage_search = root_profile.inject(BaseStorageSearch) + search_session = base_storage_search.search_records( + type_filter=WalletRecord.RECORD_TYPE, page_size=10 + ) + while search_session._done is False: + wallet_storage_records = await search_session.fetch() + for wallet_storage_record in wallet_storage_records: + wallet_record = WalletRecord.from_storage( + wallet_storage_record.id, + json.loads(wallet_storage_record.value), + ) + subwallet_profiles.append( + await MultitenantManager(root_profile).get_wallet_profile( + base_context=root_profile.context, + wallet_record=wallet_record, + ) + ) + return subwallet_profiles diff --git a/aries_cloudagent/wallet/anoncreds_upgrade.py b/aries_cloudagent/wallet/anoncreds_upgrade.py new file mode 100644 index 0000000000..dbe45639a8 --- /dev/null +++ b/aries_cloudagent/wallet/anoncreds_upgrade.py @@ -0,0 +1,646 @@ +"""Functions for upgrading records to anoncreds.""" + +import asyncio +import json +import logging +from typing import Optional + +from anoncreds import ( + CredentialDefinition, + CredentialDefinitionPrivate, + KeyCorrectnessProof, + RevocationRegistryDefinitionPrivate, + Schema, +) + +from ..anoncreds.issuer import ( + CATEGORY_CRED_DEF, + CATEGORY_CRED_DEF_KEY_PROOF, + CATEGORY_CRED_DEF_PRIVATE, + CATEGORY_SCHEMA, +) +from ..anoncreds.models.anoncreds_cred_def import CredDef, CredDefState +from ..anoncreds.models.anoncreds_revocation import ( + RevList, + RevListState, + RevRegDef, + RevRegDefState, + RevRegDefValue, +) +from ..anoncreds.models.anoncreds_schema import SchemaState +from ..anoncreds.revocation import ( + CATEGORY_REV_LIST, + CATEGORY_REV_REG_DEF, + CATEGORY_REV_REG_DEF_PRIVATE, +) +from ..core.profile import Profile +from ..ledger.multiple_ledger.ledger_requests_executor import ( + GET_CRED_DEF, + GET_SCHEMA, + IndyLedgerRequestsExecutor, +) +from ..messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE +from ..messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE +from ..multitenant.base import BaseMultitenantManager +from ..revocation.models.issuer_cred_rev_record import IssuerCredRevRecord +from ..revocation.models.issuer_rev_reg_record import IssuerRevRegRecord +from ..storage.base import BaseStorage +from ..storage.error import StorageNotFoundError +from ..storage.record import StorageRecord +from ..storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE, RECORD_TYPE_ACAPY_UPGRADING +from .upgrade_singleton import UpgradeSingleton + +LOGGER = logging.getLogger(__name__) + +# Number of times to retry upgrading records +max_retries = 5 + +upgrade_singleton = UpgradeSingleton() + + +class SchemaUpgradeObj: + """Schema upgrade object.""" + + def __init__( + self, + schema_id: str, + schema: Schema, + name: str, + version: str, + issuer_id: str, + old_record_id: str, + ): + """Initialize schema upgrade object.""" + self.schema_id = schema_id + self.schema = schema + self.name = name + self.version = version + self.issuer_id = issuer_id + self.old_record_id = old_record_id + + +class CredDefUpgradeObj: + """Cred def upgrade object.""" + + def __init__( + self, + cred_def_id: str, + cred_def: CredentialDefinition, + cred_def_private: CredentialDefinitionPrivate, + key_proof: KeyCorrectnessProof, + revocation: Optional[bool] = None, + askar_cred_def: Optional[any] = None, + max_cred_num: Optional[int] = None, + ): + """Initialize cred def upgrade object.""" + self.cred_def_id = cred_def_id + self.cred_def = cred_def + self.cred_def_private = cred_def_private + self.key_proof = key_proof + self.revocation = revocation + self.askar_cred_def = askar_cred_def + self.max_cred_num = max_cred_num + + +class RevRegDefUpgradeObj: + """Rev reg def upgrade object.""" + + def __init__( + self, + rev_reg_def_id: str, + rev_reg_def: RevRegDef, + rev_reg_def_private: RevocationRegistryDefinitionPrivate, + active: bool = False, + ): + """Initialize rev reg def upgrade object.""" + self.rev_reg_def_id = rev_reg_def_id + self.rev_reg_def = rev_reg_def + self.rev_reg_def_private = rev_reg_def_private + self.active = active + + +class RevListUpgradeObj: + """Rev entry upgrade object.""" + + def __init__( + self, + rev_list: RevList, + pending: list, + rev_reg_def_id: str, + cred_rev_records: list, + ): + """Initialize rev entry upgrade object.""" + self.rev_list = rev_list + self.pending = pending + self.rev_reg_def_id = rev_reg_def_id + self.cred_rev_records = cred_rev_records + + +async def get_schema_upgrade_object( + profile: Profile, schema_id: str, askar_schema +) -> SchemaUpgradeObj: + """Get schema upgrade object.""" + + async with profile.session() as session: + schema_id = askar_schema.tags.get("schema_id") + issuer_did = askar_schema.tags.get("schema_issuer_did") + # Need to get schema from the ledger because the attribute names + # are not stored in the wallet + multitenant_mgr = session.inject_or(BaseMultitenantManager) + if multitenant_mgr: + ledger_exec_inst = IndyLedgerRequestsExecutor(profile) + else: + ledger_exec_inst = session.inject(IndyLedgerRequestsExecutor) + + _, ledger = await ledger_exec_inst.get_ledger_for_identifier( + schema_id, + txn_record_type=GET_SCHEMA, + ) + async with ledger: + schema_from_ledger = await ledger.get_schema(schema_id) + + return SchemaUpgradeObj( + schema_id, + Schema.create( + schema_id, + askar_schema.tags.get("schema_name"), + issuer_did, + schema_from_ledger["attrNames"], + ), + askar_schema.tags.get("schema_name"), + askar_schema.tags.get("schema_version"), + issuer_did, + askar_schema.id, + ) + + +async def get_cred_def_upgrade_object( + profile: Profile, askar_cred_def +) -> CredDefUpgradeObj: + """Get cred def upgrade object.""" + cred_def_id = askar_cred_def.tags.get("cred_def_id") + async with profile.session() as session: + # Need to get cred_def from the ledger because the tag + # is not stored in the wallet and don't know wether it supports revocation + multitenant_mgr = session.inject_or(BaseMultitenantManager) + if multitenant_mgr: + ledger_exec_inst = IndyLedgerRequestsExecutor(profile) + else: + ledger_exec_inst = session.inject(IndyLedgerRequestsExecutor) + _, ledger = await ledger_exec_inst.get_ledger_for_identifier( + cred_def_id, + txn_record_type=GET_CRED_DEF, + ) + async with ledger: + cred_def_from_ledger = await ledger.get_credential_definition(cred_def_id) + + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_cred_def_private = await storage.get_record( + CATEGORY_CRED_DEF_PRIVATE, cred_def_id + ) + askar_cred_def_key_proof = await storage.get_record( + CATEGORY_CRED_DEF_KEY_PROOF, cred_def_id + ) + + cred_def = CredDef( + issuer_id=askar_cred_def.tags.get("issuer_did"), + schema_id=askar_cred_def.tags.get("schema_id"), + tag=cred_def_from_ledger["tag"], + type=cred_def_from_ledger["type"], + value=cred_def_from_ledger["value"], + ) + + return CredDefUpgradeObj( + cred_def_id, + cred_def, + askar_cred_def_private.value, + askar_cred_def_key_proof.value, + cred_def_from_ledger["value"].get("revocation", None), + askar_cred_def=askar_cred_def, + ) + + +async def get_rev_reg_def_upgrade_object( + profile: Profile, + cred_def_upgrade_obj: CredDefUpgradeObj, + askar_issuer_rev_reg_def, + is_active: bool, +) -> RevRegDefUpgradeObj: + """Get rev reg def upgrade object.""" + rev_reg_def_id = askar_issuer_rev_reg_def.tags.get("revoc_reg_id") + + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_reg_rev_def_private = await storage.get_record( + CATEGORY_REV_REG_DEF_PRIVATE, rev_reg_def_id + ) + + revoc_reg_def_values = json.loads(askar_issuer_rev_reg_def.value) + + reg_def_value = RevRegDefValue( + revoc_reg_def_values["revoc_reg_def"]["value"]["publicKeys"], + revoc_reg_def_values["revoc_reg_def"]["value"]["maxCredNum"], + revoc_reg_def_values["revoc_reg_def"]["value"]["tailsLocation"], + revoc_reg_def_values["revoc_reg_def"]["value"]["tailsHash"], + ) + + rev_reg_def = RevRegDef( + issuer_id=askar_issuer_rev_reg_def.tags.get("issuer_did"), + cred_def_id=cred_def_upgrade_obj.cred_def_id, + tag=revoc_reg_def_values["tag"], + type=revoc_reg_def_values["revoc_def_type"], + value=reg_def_value, + ) + + return RevRegDefUpgradeObj( + rev_reg_def_id, rev_reg_def, askar_reg_rev_def_private.value, is_active + ) + + +async def get_rev_list_upgrade_object( + profile: Profile, rev_reg_def_upgrade_obj: RevRegDefUpgradeObj +) -> RevListUpgradeObj: + """Get revocation entry upgrade object.""" + rev_reg = rev_reg_def_upgrade_obj.rev_reg_def + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_cred_rev_records = await storage.find_all_records( + IssuerCredRevRecord.RECORD_TYPE, + {"rev_reg_id": rev_reg_def_upgrade_obj.rev_reg_def_id}, + ) + + revocation_list = [0] * rev_reg.value.max_cred_num + for askar_cred_rev_record in askar_cred_rev_records: + if askar_cred_rev_record.tags.get("state") == "revoked": + revocation_list[int(askar_cred_rev_record.tags.get("cred_rev_id")) - 1] = 1 + + rev_list = RevList( + issuer_id=rev_reg.issuer_id, + rev_reg_def_id=rev_reg_def_upgrade_obj.rev_reg_def_id, + revocation_list=revocation_list, + current_accumulator=json.loads( + rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def.value + )["revoc_reg_entry"]["value"]["accum"], + ) + + return RevListUpgradeObj( + rev_list, + json.loads(rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def.value)[ + "pending_pub" + ], + rev_reg_def_upgrade_obj.rev_reg_def_id, + askar_cred_rev_records, + ) + + +async def upgrade_and_delete_schema_records( + txn, schema_upgrade_obj: SchemaUpgradeObj +) -> None: + """Upgrade and delete schema records.""" + schema_anoncreds = schema_upgrade_obj.schema + await txn.handle.remove("schema_sent", schema_upgrade_obj.old_record_id) + await txn.handle.replace( + CATEGORY_SCHEMA, + schema_upgrade_obj.schema_id, + schema_anoncreds.to_json(), + { + "name": schema_upgrade_obj.name, + "version": schema_upgrade_obj.version, + "issuer_id": schema_upgrade_obj.issuer_id, + "state": SchemaState.STATE_FINISHED, + }, + ) + + +async def upgrade_and_delete_cred_def_records( + txn, anoncreds_schema, cred_def_upgrade_obj: CredDefUpgradeObj +) -> None: + """Upgrade and delete cred def records.""" + cred_def_id = cred_def_upgrade_obj.cred_def_id + anoncreds_schema = anoncreds_schema.to_dict() + askar_cred_def = cred_def_upgrade_obj.askar_cred_def + await txn.handle.remove("cred_def_sent", askar_cred_def.id) + await txn.handle.replace( + CATEGORY_CRED_DEF, + cred_def_id, + cred_def_upgrade_obj.cred_def.to_json(), + tags={ + "schema_id": askar_cred_def.tags.get("schema_id"), + "schema_issuer_id": anoncreds_schema["issuerId"], + "issuer_id": askar_cred_def.tags.get("issuer_did"), + "schema_name": anoncreds_schema["name"], + "schema_version": anoncreds_schema["version"], + "state": CredDefState.STATE_FINISHED, + "epoch": askar_cred_def.tags.get("epoch"), + # TODO We need to keep track of these but tags probably + # isn't ideal. This suggests that a full record object + # is necessary for non-private values + "support_revocation": json.dumps(cred_def_upgrade_obj.revocation), + "max_cred_num": str(cred_def_upgrade_obj.max_cred_num or 0), + }, + ) + await txn.handle.replace( + CATEGORY_CRED_DEF_PRIVATE, + cred_def_id, + CredentialDefinitionPrivate.load( + cred_def_upgrade_obj.cred_def_private + ).to_json_buffer(), + ) + await txn.handle.replace( + CATEGORY_CRED_DEF_KEY_PROOF, + cred_def_id, + KeyCorrectnessProof.load(cred_def_upgrade_obj.key_proof).to_json_buffer(), + ) + + +rev_reg_states_mapping = { + "init": RevRegDefState.STATE_WAIT, + "generated": RevRegDefState.STATE_ACTION, + "posted": RevRegDefState.STATE_FINISHED, + "active": RevRegDefState.STATE_FINISHED, + "full": RevRegDefState.STATE_FULL, + "decommissioned": RevRegDefState.STATE_DECOMMISSIONED, +} + + +async def upgrade_and_delete_rev_reg_def_records( + txn, rev_reg_def_upgrade_obj: RevRegDefUpgradeObj +) -> None: + """Upgrade and delete rev reg def records.""" + rev_reg_def_id = rev_reg_def_upgrade_obj.rev_reg_def_id + askar_issuer_rev_reg_def = rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def + await txn.handle.remove(IssuerRevRegRecord.RECORD_TYPE, askar_issuer_rev_reg_def.id) + await txn.handle.replace( + CATEGORY_REV_REG_DEF, + rev_reg_def_id, + rev_reg_def_upgrade_obj.rev_reg_def.to_json(), + tags={ + "cred_def_id": rev_reg_def_upgrade_obj.rev_reg_def.cred_def_id, + "issuer_id": askar_issuer_rev_reg_def.tags.get("issuer_did"), + "state": rev_reg_states_mapping[askar_issuer_rev_reg_def.tags.get("state")], + "active": json.dumps(rev_reg_def_upgrade_obj.active), + }, + ) + await txn.handle.replace( + CATEGORY_REV_REG_DEF_PRIVATE, + rev_reg_def_id, + RevocationRegistryDefinitionPrivate.load( + rev_reg_def_upgrade_obj.rev_reg_def_private + ).to_json_buffer(), + ) + + +async def upgrade_and_delete_rev_entry_records( + txn, rev_list_upgrade_obj: RevListUpgradeObj +) -> None: + """Upgrade and delete revocation entry records.""" + next_index = 0 + for cred_rev_record in rev_list_upgrade_obj.cred_rev_records: + if int(cred_rev_record.tags.get("cred_rev_id")) > next_index: + next_index = int(cred_rev_record.tags.get("cred_rev_id")) + await txn.handle.remove(IssuerCredRevRecord.RECORD_TYPE, cred_rev_record.id) + + await txn.handle.insert( + CATEGORY_REV_LIST, + rev_list_upgrade_obj.rev_reg_def_id, + value_json={ + "rev_list": rev_list_upgrade_obj.rev_list.serialize(), + "pending": rev_list_upgrade_obj.pending, + "next_index": next_index + 1, + }, + tags={ + "state": RevListState.STATE_FINISHED, + "pending": json.dumps(rev_list_upgrade_obj.pending is not None), + }, + ) + + +async def upgrade_all_records_with_transaction( + txn: any, + schema_upgrade_objs: list[SchemaUpgradeObj], + cred_def_upgrade_objs: list[CredDefUpgradeObj], + rev_reg_def_upgrade_objs: list[RevRegDefUpgradeObj], + rev_list_upgrade_objs: list[RevListUpgradeObj], +) -> None: + """Upgrade all objects with transaction.""" + for schema_upgrade_obj in schema_upgrade_objs: + await upgrade_and_delete_schema_records(txn, schema_upgrade_obj) + for cred_def_upgrade_obj in cred_def_upgrade_objs: + await upgrade_and_delete_cred_def_records( + txn, schema_upgrade_obj.schema, cred_def_upgrade_obj + ) + for rev_reg_def_upgrade_obj in rev_reg_def_upgrade_objs: + await upgrade_and_delete_rev_reg_def_records(txn, rev_reg_def_upgrade_obj) + for rev_list_upgrade_obj in rev_list_upgrade_objs: + await upgrade_and_delete_rev_entry_records(txn, rev_list_upgrade_obj) + + await txn.commit() + + +async def get_rev_reg_def_upgrade_objs( + profile: Profile, + cred_def_upgrade_obj: CredDefUpgradeObj, + rev_list_upgrade_objs: list[RevListUpgradeObj], +) -> list[RevRegDefUpgradeObj]: + """Get rev reg def upgrade objects.""" + + rev_reg_def_upgrade_objs = [] + async with profile.session() as session: + storage = session.inject(BaseStorage) + # Must be sorted to find the active rev reg def + askar_issuer_rev_reg_def_records = sorted( + await storage.find_all_records( + IssuerRevRegRecord.RECORD_TYPE, + {"cred_def_id": cred_def_upgrade_obj.cred_def_id}, + ), + key=lambda x: json.loads(x.value)["created_at"], + ) + found_active = False + for askar_issuer_rev_reg_def in askar_issuer_rev_reg_def_records: + # active rev reg def is the oldest non-full and active rev reg def + if ( + not found_active + and askar_issuer_rev_reg_def.tags.get("state") != "full" + and askar_issuer_rev_reg_def.tags.get("state") == "active" + ): + found_active = True + is_active = True + + rev_reg_def_upgrade_obj = await get_rev_reg_def_upgrade_object( + profile, + cred_def_upgrade_obj, + askar_issuer_rev_reg_def, + is_active, + ) + is_active = False + rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def = askar_issuer_rev_reg_def + + rev_reg_def_upgrade_objs.append(rev_reg_def_upgrade_obj) + + # add the revocation list upgrade object from reg def upgrade object + rev_list_upgrade_objs.append( + await get_rev_list_upgrade_object(profile, rev_reg_def_upgrade_obj) + ) + return rev_reg_def_upgrade_objs + + +async def convert_records_to_anoncreds(profile) -> None: + """Convert and delete old askar records.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_schema_records = await storage.find_all_records(SCHEMA_SENT_RECORD_TYPE) + + schema_upgrade_objs = [] + cred_def_upgrade_objs = [] + rev_reg_def_upgrade_objs = [] + rev_list_upgrade_objs = [] + + for askar_schema in askar_schema_records: + schema_upgrade_objs.append( + await get_schema_upgrade_object(profile, askar_schema.id, askar_schema) + ) + + askar_cred_def_records = await storage.find_all_records( + CRED_DEF_SENT_RECORD_TYPE, {} + ) + for askar_cred_def in askar_cred_def_records: + cred_def_upgrade_obj = await get_cred_def_upgrade_object( + profile, askar_cred_def + ) + rev_reg_def_upgrade_objs = await get_rev_reg_def_upgrade_objs( + profile, cred_def_upgrade_obj, rev_list_upgrade_objs + ) + # update the cred_def with the max_cred_num from first rev_reg_def + if rev_reg_def_upgrade_objs: + cred_def_upgrade_obj.max_cred_num = rev_reg_def_upgrade_objs[ + 0 + ].rev_reg_def.value.max_cred_num + cred_def_upgrade_objs.append(cred_def_upgrade_obj) + + async with profile.transaction() as txn: + try: + await upgrade_all_records_with_transaction( + txn, + schema_upgrade_objs, + cred_def_upgrade_objs, + rev_reg_def_upgrade_objs, + rev_list_upgrade_objs, + ) + except Exception as e: + await txn.rollback() + raise e + + +async def retry_converting_records( + profile: Profile, upgrading_record: StorageRecord, retry: int, is_subwallet=False +) -> None: + """Retry converting records to anoncreds.""" + + async def clear_upgrade(): + async with profile.session() as session: + storage = session.inject(BaseStorage) + upgrade_singleton.remove_wallet(profile.name) + await storage.delete_record(upgrading_record) + + try: + await convert_records_to_anoncreds(profile) + await set_storage_type_and_update_profile_if_subwallet(profile, is_subwallet) + LOGGER.info(f"Upgrade complete via retry for wallet: {profile.name}") + if is_subwallet: + await clear_upgrade() + except Exception as e: + LOGGER.error(f"Error when upgrading records for wallet {profile.name} : {e} ") + if retry < max_retries: + LOGGER.info(f"Retry attempt {retry + 1} to upgrade wallet {profile.name}") + await asyncio.sleep(1) + await retry_converting_records( + profile, upgrading_record, retry + 1, is_subwallet + ) + else: + LOGGER.error( + f"""Failed to upgrade wallet: {profile.name} after 5 retries. + Your wallet may be in an inconsistent state. + Try fixing any connection issues and re-running the update""" + ) + await clear_upgrade() + + +async def upgrade_wallet_to_anoncreds(profile: Profile, is_subwallet=False) -> None: + """Get upgrading record and attempt to upgrade wallet to anoncreds.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + except StorageNotFoundError: + return + + try: + LOGGER.info("Upgrade in process for wallet: %s", profile.name) + upgrade_singleton.set_wallet(profile.name) + await convert_records_to_anoncreds(profile) + await set_storage_type_and_update_profile_if_subwallet( + profile, is_subwallet + ) + upgrade_singleton.remove_wallet(profile.name) + await storage.delete_record(upgrading_record) + except Exception as e: + LOGGER.error(f"Error when upgrading wallet {profile.name} : {e} ") + await retry_converting_records(profile, upgrading_record, 0, is_subwallet) + + +async def set_storage_type_to_anoncreds(profile: Profile): + """Set storage type to anoncreds.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + storage_type_record = await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + await storage.update_record(storage_type_record, "askar-anoncreds", {}) + # This should only happen for subwallets + except StorageNotFoundError: + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + "askar-anoncreds", + ) + ) + + +async def remove_upgrading_record(profile: Profile): + """Remove upgrading record.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + await storage.delete_record(upgrading_record) + except StorageNotFoundError: + return + + +async def set_storage_type_and_update_profile_if_subwallet( + profile: Profile, is_subwallet=False +): + """Upgrade wallet to anoncreds and set storage type.""" + async with profile.session() as session: + if is_subwallet: + multitenant_mgr = session.inject_or(BaseMultitenantManager) + wallet_id = profile.settings.get("wallet.id") + settings = {"wallet.type": "askar-anoncreds"} + await multitenant_mgr.update_wallet(wallet_id, settings) + await set_storage_type_to_anoncreds(profile) + LOGGER.info( + f"""Upgrade of subwallet {profile.settings.get('wallet.name')} has completed. Profile is now askar-anoncreds""" # noqa: E501 + ) + else: + await set_storage_type_to_anoncreds(profile) + LOGGER.info( + f"Upgrade of base wallet {profile.settings.get('wallet.name')} to anoncreds has completed. Shutting down agent." # noqa: E501 + ) + await remove_upgrading_record(profile) + asyncio.get_event_loop().stop() diff --git a/aries_cloudagent/wallet/routes.py b/aries_cloudagent/wallet/routes.py index 5dc222c0c7..3dc5d70cb4 100644 --- a/aries_cloudagent/wallet/routes.py +++ b/aries_cloudagent/wallet/routes.py @@ -1,5 +1,6 @@ """Wallet admin routes.""" +import asyncio import json import logging from typing import List, Optional, Tuple, Union @@ -55,9 +56,15 @@ is_author_role, ) from ..resolver.base import ResolverError +from ..storage.base import BaseStorage from ..storage.error import StorageError, StorageNotFoundError +from ..storage.record import StorageRecord +from ..storage.type import RECORD_TYPE_ACAPY_UPGRADING from ..wallet.jwt import jwt_sign, jwt_verify from ..wallet.sd_jwt import sd_jwt_sign, sd_jwt_verify +from .anoncreds_upgrade import ( + upgrade_wallet_to_anoncreds, +) from .base import BaseWallet from .did_info import DIDInfo from .did_method import KEY, PEER2, PEER4, SOV, DIDMethod, DIDMethods, HolderDefinedDid @@ -1241,6 +1248,73 @@ async def wallet_rotate_did_keypair(request: web.BaseRequest): return web.json_response({}) +class UpgradeVerificationSchema(OpenAPISchema): + """Parameters and validators for triggering an upgrade to anoncreds.""" + + wallet_name = fields.Str( + required=True, + metadata={ + "description": "Name of wallet to upgrade to anoncreds", + "example": "base-wallet", + }, + ) + + +class UpgradeResultSchema(OpenAPISchema): + """Result schema for upgrade.""" + + +@docs( + tags=["anoncreds - wallet upgrade"], + summary=""" + Upgrade the wallet from askar to anoncreds - Be very careful with this! You + cannot go back! Trigger by entering the wallet name as a parameter. When the + upgrade is in progress the api will return a 503. For a base wallet + (either a non-multitenant wallet or the admin wallet in multitenant mode) + the agent will shut down after the upgrade. It is up to you to restart it with a + wallet-type in the configuration file of askar-anoncreds. For a subwallet + in multitenant mode the agent will continue to run after the upgrade. + All agents that have upgraded will need to use the new anoncreds endpoints. + They will receive a 403 on the old enpoints. + """, +) +@querystring_schema(UpgradeVerificationSchema()) +@response_schema(UpgradeResultSchema(), description="") +async def upgrade_anoncreds(request: web.BaseRequest): + """Request handler for triggering an upgrade to anoncreds. + + Args: + request: aiohttp request object + + Returns: + An empty JSON response + + """ + context: AdminRequestContext = request["context"] + profile = context.profile + + if profile.settings.get("wallet.name") != request.query.get("wallet_name"): + raise web.HTTPBadRequest( + reason="Wallet name parameter does not match the agent which triggered the upgrade" # noqa: E501 + ) + + if profile.settings.get("wallet.type") == "askar-anoncreds": + raise web.HTTPBadRequest(reason="Wallet type is already anoncreds") + + async with profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + await storage.add_record(upgrading_record) + asyncio.create_task( + upgrade_wallet_to_anoncreds(profile, context.metadata is not None) + ) + + return web.json_response({}) + + def register_events(event_bus: EventBus): """Subscribe to any events we need to support.""" event_bus.subscribe(EVENT_LISTENER_PATTERN, on_register_nym_event) @@ -1333,6 +1407,7 @@ async def register(app: web.Application): "/wallet/get-did-endpoint", wallet_get_did_endpoint, allow_head=False ), web.patch("/wallet/did/local/rotate-keypair", wallet_rotate_did_keypair), + web.post("/anoncreds/wallet/upgrade", upgrade_anoncreds), ] ) @@ -1356,3 +1431,13 @@ def post_process_routes(app: web.Application): }, } ) + app._state["swagger_dict"]["tags"].append( + { + "name": "anoncreds - wallet upgrade", + "description": "Anoncreds wallet upgrade", + "externalDocs": { + "description": "Specification", + "url": "https://hyperledger.github.io/anoncreds-spec", + }, + } + ) diff --git a/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py new file mode 100644 index 0000000000..141a12c86f --- /dev/null +++ b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py @@ -0,0 +1,210 @@ +from time import time +from unittest import IsolatedAsyncioTestCase + +from aries_cloudagent.tests import mock + +from ...anoncreds.issuer import CATEGORY_CRED_DEF_PRIVATE +from ...core.in_memory.profile import InMemoryProfile +from ...indy.credx.issuer import CATEGORY_CRED_DEF_KEY_PROOF +from ...messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE +from ...messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE +from ...multitenant.base import BaseMultitenantManager +from ...multitenant.manager import MultitenantManager +from ...storage.base import BaseStorage +from ...storage.record import StorageRecord +from ...storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE, RECORD_TYPE_ACAPY_UPGRADING +from .. import anoncreds_upgrade +from ..upgrade_singleton import UpgradeSingleton + + +class TestAnoncredsUpgrade(IsolatedAsyncioTestCase): + def setUp(self) -> None: + self.profile = InMemoryProfile.test_profile( + settings={"wallet.type": "askar", "wallet.id": "test-wallet-id"} + ) + self.context = self.profile.context + self.context.injector.bind_instance( + BaseMultitenantManager, mock.MagicMock(MultitenantManager, autospec=True) + ) + + async def test_convert_records_to_anoncreds(self): + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + + schema_id = "GHjSbphAcdsrZrLjSvsjMp:2:faber-simple:1.1" + schema_id_parts = schema_id.split(":") + schema_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord(SCHEMA_SENT_RECORD_TYPE, schema_id, schema_tags) + ) + + credential_definition_id = "GHjSbphAcdsrZrLjSvsjMp:3:CL:8:default" + cred_def_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "issuer_did": "GHjSbphAcdsrZrLjSvsjMp", + "cred_def_id": credential_definition_id, + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord( + CRED_DEF_SENT_RECORD_TYPE, credential_definition_id, cred_def_tags + ) + ) + storage.get_record = mock.CoroutineMock( + side_effect=[ + StorageRecord( + CATEGORY_CRED_DEF_PRIVATE, + {"p_key": {"p": "123...782", "q": "234...456"}, "r_key": None}, + {}, + ), + StorageRecord( + CATEGORY_CRED_DEF_KEY_PROOF, + {"c": "103...961", "xz_cap": "563...205", "xr_cap": []}, + {}, + ), + ] + ) + anoncreds_upgrade.IndyLedgerRequestsExecutor = mock.MagicMock() + anoncreds_upgrade.IndyLedgerRequestsExecutor.return_value.get_ledger_for_identifier = mock.CoroutineMock( + return_value=( + None, + mock.MagicMock( + get_schema=mock.CoroutineMock( + return_value={ + "attrNames": [ + "name", + "age", + ], + }, + ), + get_credential_definition=mock.CoroutineMock( + return_value={ + "type": "CL", + "tag": "default", + "value": { + "primary": { + "n": "123", + }, + }, + }, + ), + ), + ) + ) + + with mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_schema_records" + ), mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_cred_def_records" + ): + await anoncreds_upgrade.convert_records_to_anoncreds(self.profile) + + async def test_retry_converting_records(self): + with mock.patch.object( + anoncreds_upgrade, "convert_records_to_anoncreds", mock.CoroutineMock() + ) as mock_convert_records_to_anoncreds: + mock_convert_records_to_anoncreds.side_effect = [ + Exception("Error"), + Exception("Error"), + None, + ] + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + await storage.add_record(upgrading_record) + await anoncreds_upgrade.retry_converting_records( + self.profile, upgrading_record, 0 + ) + upgrade_singleton = UpgradeSingleton() + assert mock_convert_records_to_anoncreds.called + assert mock_convert_records_to_anoncreds.call_count == 3 + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == "askar-anoncreds" + assert not upgrade_singleton.current_upgrades + + async def test_upgrade_wallet_to_anoncreds(self): + # upgrading record not present + await anoncreds_upgrade.upgrade_wallet_to_anoncreds(self.profile) + + # upgrading record present + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds(self.profile) + # Upgrading record should be deleted + assert len(storage.profile.records) == 1 + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == "askar-anoncreds" + + upgrade_singleton = UpgradeSingleton() + assert not upgrade_singleton.current_upgrades + + # retry called on exception + with mock.patch.object( + anoncreds_upgrade, + "convert_records_to_anoncreds", + mock.CoroutineMock(side_effect=[Exception("Error")]), + ) as mock_convert_records_to_anoncreds, mock.patch.object( + anoncreds_upgrade, "retry_converting_records", mock.CoroutineMock() + ) as mock_retry_converting_records: + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds(self.profile) + assert mock_retry_converting_records.called + + async def test_set_storage_type_to_anoncreds_no_existing_record(self): + await anoncreds_upgrade.set_storage_type_to_anoncreds(self.profile) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == "askar-anoncreds" + + async def test_set_storage_type_to_anoncreds_has_existing_record(self): + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + "askar", + ) + ) + await anoncreds_upgrade.set_storage_type_to_anoncreds(self.profile) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == "askar-anoncreds" + + async def test_update_if_subwallet_and_set_storage_type_with_subwallet(self): + + await anoncreds_upgrade.set_storage_type_and_update_profile_if_subwallet( + self.profile, True + ) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == "askar-anoncreds" + + async def test_update_if_subwallet_and_set_storage_type_with_base_wallet(self): + + await anoncreds_upgrade.set_storage_type_and_update_profile_if_subwallet( + self.profile, False + ) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == "askar-anoncreds" diff --git a/aries_cloudagent/wallet/tests/test_routes.py b/aries_cloudagent/wallet/tests/test_routes.py index f2b756de23..8fe3413096 100644 --- a/aries_cloudagent/wallet/tests/test_routes.py +++ b/aries_cloudagent/wallet/tests/test_routes.py @@ -1006,6 +1006,25 @@ async def test_rotate_did_keypair_x(self): with self.assertRaises(test_module.web.HTTPBadRequest): await test_module.wallet_rotate_did_keypair(self.request) + async def test_upgrade_anoncreds(self): + self.profile.settings["wallet.name"] = "test_wallet" + self.request.query = {"wallet_name": "not_test_wallet"} + with self.assertRaises(test_module.web.HTTPBadRequest): + await test_module.upgrade_anoncreds(self.request) + + self.request.query = {"wallet_name": "not_test_wallet"} + self.profile.settings["wallet.type"] = "askar-anoncreds" + with self.assertRaises(test_module.web.HTTPBadRequest): + await test_module.upgrade_anoncreds(self.request) + + self.request.query = {"wallet_name": "test_wallet"} + self.profile.settings["wallet.type"] = "askar" + result = await test_module.upgrade_anoncreds(self.request) + print(result) + _, upgrade_record = next(iter(self.profile.records.items())) + assert upgrade_record.type == "acapy_upgrading" + assert upgrade_record.value == "true" + async def test_register(self): mock_app = mock.MagicMock() mock_app.add_routes = mock.MagicMock() diff --git a/aries_cloudagent/wallet/upgrade_singleton.py b/aries_cloudagent/wallet/upgrade_singleton.py new file mode 100644 index 0000000000..a6913f05cc --- /dev/null +++ b/aries_cloudagent/wallet/upgrade_singleton.py @@ -0,0 +1,22 @@ +"""Singleton class to ensure that upgrade is isolated.""" + + +class UpgradeSingleton: + """Singleton class to ensure that upgrade is isolated.""" + + instance = None + current_upgrades = set() + + def __new__(cls, *args, **kwargs): + """Create a new instance of the class.""" + if cls.instance is None: + cls.instance = super().__new__(cls) + return cls.instance + + def set_wallet(self, wallet: str): + """Set a wallet name.""" + self.current_upgrades.add(wallet) + + def remove_wallet(self, wallet: str): + """Remove a wallet name.""" + self.current_upgrades.discard(wallet) diff --git a/demo/features/steps/0586-sign-transaction.py b/demo/features/steps/0586-sign-transaction.py index da112e14d5..406db972a0 100644 --- a/demo/features/steps/0586-sign-transaction.py +++ b/demo/features/steps/0586-sign-transaction.py @@ -761,7 +761,6 @@ def step_impl(context, holder_name, issuer_name): "/credentials", params={}, ) - assert len(cred_list["results"]) == 1 cred_id = cred_list["results"][0]["referent"] revoc_status_bool = False diff --git a/demo/features/steps/upgrade.py b/demo/features/steps/upgrade.py new file mode 100644 index 0000000000..fe23f2570e --- /dev/null +++ b/demo/features/steps/upgrade.py @@ -0,0 +1,24 @@ +"""Steps for upgrading the wallet to support anoncreds.""" + +from bdd_support.agent_backchannel_client import ( + agent_container_POST, + async_sleep, +) +from behave import given, then + + +@given('"{issuer}" upgrades the wallet to anoncreds') +@then('"{issuer}" upgrades the wallet to anoncreds') +def step_impl(context, issuer): + """Upgrade the wallet to support anoncreds.""" + agent = context.active_agents[issuer] + agent_container_POST( + agent["agent"], + "/anoncreds/wallet/upgrade", + data={}, + params={ + "wallet_name": agent["agent"].agent.wallet_name, + }, + ) + + async_sleep(2.0) diff --git a/demo/features/upgrade.feature b/demo/features/upgrade.feature new file mode 100644 index 0000000000..a1ee35801f --- /dev/null +++ b/demo/features/upgrade.feature @@ -0,0 +1,26 @@ +Feature: ACA-Py Anoncreds Upgrade + + @GHA + Scenario Outline: Using revocation api, issue, revoke credentials and publish + Given we have "3" agents + | name | role | capabilities | + | Acme | issuer | | + | Faber | verifier | | + | Bob | prover | | + And "" and "Bob" have an existing connection + And "Bob" has an issued credential from "" + And "" has written the credential definition for to the ledger + And "" has written the revocation registry definition to the ledger + And "" has written the revocation registry entry transaction to the ledger + And "" revokes the credential without publishing the entry + And "" authors a revocation registry entry publishing transaction + And "Faber" and "Bob" have an existing connection + When "Faber" sends a request for proof presentation to "Bob" + Then "Faber" has the proof verification fail + Then "Bob" can verify the credential from "" was revoked + And "" upgrades the wallet to anoncreds + And "Bob" has an issued credential from "" + + Examples: + | issuer | Acme_capabilities | Bob_capabilities | Schema_name | Credential_data | Proof_request | + | Acme | --revocation --public-did --multitenant | | driverslicense_v2 | Data_DL_MaxValues | DL_age_over_19_v2 | \ No newline at end of file diff --git a/demo/runners/faber.py b/demo/runners/faber.py index 8a8d83e687..e636c77f6f 100644 --- a/demo/runners/faber.py +++ b/demo/runners/faber.py @@ -481,15 +481,25 @@ async def main(args): options += " (D) Set Endorser's DID\n" if faber_agent.multitenant: options += " (W) Create and/or Enable Wallet\n" + options += " (U) Upgrade wallet to anoncreds \n" options += " (T) Toggle tracing on credential/proof exchange\n" options += " (X) Exit?\n[1/2/3/4/{}{}T/X] ".format( "5/6/7/8/" if faber_agent.revocation else "", "W/" if faber_agent.multitenant else "", ) + upgraded_to_anoncreds = False async for option in prompt_loop(options): if option is not None: option = option.strip() + # Anoncreds has different endpoints for revocation + is_anoncreds = False + if ( + faber_agent.agent.__dict__["wallet_type"] == "askar-anoncreds" + or upgraded_to_anoncreds + ): + is_anoncreds = True + if option is None or option in "xX": break @@ -739,11 +749,6 @@ async def main(args): await prompt("Publish now? [Y/N]: ", default="N") ).strip() in "yY" - # Anoncreds has different endpoints for revocation - is_anoncreds = False - if faber_agent.agent.__dict__["wallet_type"] == "askar-anoncreds": - is_anoncreds = True - try: endpoint = ( "/anoncreds/revocation/revoke" @@ -845,6 +850,14 @@ async def main(args): ) except ClientError: pass + elif option in "uU" and faber_agent.multitenant: + log_status("Upgrading wallet to anoncreds. Wait a couple seconds...") + await faber_agent.agent.admin_POST( + "/anoncreds/wallet/upgrade", + params={"wallet_name": faber_agent.agent.wallet_name}, + ) + upgraded_to_anoncreds = True + await asyncio.sleep(2.0) if faber_agent.show_timing: timing = await faber_agent.agent.fetch_timing() From 3c9c0a973cb7fdcf7a8c9b618b4e694b2307bb9d Mon Sep 17 00:00:00 2001 From: jamshale Date: Wed, 10 Apr 2024 15:40:01 +0000 Subject: [PATCH 2/7] Reduce swagger doc for anoncreds upgrade Signed-off-by: jamshale --- aries_cloudagent/wallet/routes.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/aries_cloudagent/wallet/routes.py b/aries_cloudagent/wallet/routes.py index 3dc5d70cb4..5a5fd8a6c0 100644 --- a/aries_cloudagent/wallet/routes.py +++ b/aries_cloudagent/wallet/routes.py @@ -1268,14 +1268,7 @@ class UpgradeResultSchema(OpenAPISchema): tags=["anoncreds - wallet upgrade"], summary=""" Upgrade the wallet from askar to anoncreds - Be very careful with this! You - cannot go back! Trigger by entering the wallet name as a parameter. When the - upgrade is in progress the api will return a 503. For a base wallet - (either a non-multitenant wallet or the admin wallet in multitenant mode) - the agent will shut down after the upgrade. It is up to you to restart it with a - wallet-type in the configuration file of askar-anoncreds. For a subwallet - in multitenant mode the agent will continue to run after the upgrade. - All agents that have upgraded will need to use the new anoncreds endpoints. - They will receive a 403 on the old enpoints. + cannot go back! See migration guide for more information. """, ) @querystring_schema(UpgradeVerificationSchema()) From 0fac453f1d0c362e5350b1bc37d529aea07805b6 Mon Sep 17 00:00:00 2001 From: jamshale Date: Wed, 10 Apr 2024 15:40:39 +0000 Subject: [PATCH 3/7] Add unit test for anoncreds upgrade fail Signed-off-by: jamshale --- .../wallet/tests/test_anoncreds_upgrade.py | 168 +++++++++++++++++- 1 file changed, 166 insertions(+), 2 deletions(-) diff --git a/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py index 141a12c86f..e1077c03ca 100644 --- a/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py +++ b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py @@ -1,10 +1,11 @@ +import asyncio from time import time from unittest import IsolatedAsyncioTestCase from aries_cloudagent.tests import mock from ...anoncreds.issuer import CATEGORY_CRED_DEF_PRIVATE -from ...core.in_memory.profile import InMemoryProfile +from ...core.in_memory.profile import InMemoryProfile, InMemoryProfileSession from ...indy.credx.issuer import CATEGORY_CRED_DEF_KEY_PROOF from ...messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE from ...messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE @@ -161,7 +162,7 @@ async def test_upgrade_wallet_to_anoncreds(self): anoncreds_upgrade, "convert_records_to_anoncreds", mock.CoroutineMock(side_effect=[Exception("Error")]), - ) as mock_convert_records_to_anoncreds, mock.patch.object( + ), mock.patch.object( anoncreds_upgrade, "retry_converting_records", mock.CoroutineMock() ) as mock_retry_converting_records: async with self.profile.session() as session: @@ -208,3 +209,166 @@ async def test_update_if_subwallet_and_set_storage_type_with_base_wallet(self): ) _, storage_type_record = next(iter(self.profile.records.items())) assert storage_type_record.value == "askar-anoncreds" + + async def test_failed_upgrade(self): + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + + schema_id = "GHjSbphAcdsrZrLjSvsjMp:2:faber-simple:1.1" + schema_id_parts = schema_id.split(":") + schema_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord(SCHEMA_SENT_RECORD_TYPE, schema_id, schema_tags) + ) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + "askar", + ) + ) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + ) + + credential_definition_id = "GHjSbphAcdsrZrLjSvsjMp:3:CL:8:default" + cred_def_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "issuer_did": "GHjSbphAcdsrZrLjSvsjMp", + "cred_def_id": credential_definition_id, + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord( + CRED_DEF_SENT_RECORD_TYPE, credential_definition_id, cred_def_tags + ) + ) + storage.get_record = mock.CoroutineMock( + side_effect=[ + StorageRecord( + CATEGORY_CRED_DEF_PRIVATE, + {"p_key": {"p": "123...782", "q": "234...456"}, "r_key": None}, + {}, + ), + StorageRecord( + CATEGORY_CRED_DEF_KEY_PROOF, + {"c": "103...961", "xz_cap": "563...205", "xr_cap": []}, + {}, + ), + ] + ) + anoncreds_upgrade.IndyLedgerRequestsExecutor = mock.MagicMock() + anoncreds_upgrade.IndyLedgerRequestsExecutor.return_value.get_ledger_for_identifier = mock.CoroutineMock( + return_value=( + None, + mock.MagicMock( + get_schema=mock.CoroutineMock( + return_value={ + "attrNames": [ + "name", + "age", + ], + }, + ), + get_credential_definition=mock.CoroutineMock( + return_value={ + "type": "CL", + "tag": "default", + "value": { + "primary": { + "n": "123", + }, + }, + }, + ), + ), + ) + ) + + with mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_schema_records" + ), mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_cred_def_records" + ), mock.patch.object( + InMemoryProfileSession, "rollback" + ) as mock_rollback, mock.patch.object( + InMemoryProfileSession, + "commit", + # Don't wait for sleep in retry to speed up test + ) as mock_commit, mock.patch.object( + asyncio, "sleep" + ): + """ + Only tests schemas and cred_defs failing to upgrade because the other objects are + hard to mock. These tests should be enough to cover them as the logic is the same. + """ + + # Schemas fails to upgrade + anoncreds_upgrade.upgrade_and_delete_schema_records = mock.CoroutineMock( + # Needs to fail 5 times because of the retry logic + side_effect=[ + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + ] + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds(self.profile) + assert mock_rollback.called + assert not mock_commit.called + # Upgrading record should not be deleted + with self.assertRaises(Exception): + await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + + storage_type_record = await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + # Storage type should not be updated + assert storage_type_record.value == "askar" + # Upgrade singleton should be empty + upgrade_singleton = UpgradeSingleton() + assert upgrade_singleton.current_upgrades.__len__() == 0 + + # Cred_defs fails to upgrade + anoncreds_upgrade.upgrade_and_delete_cred_def_records = ( + mock.CoroutineMock( + side_effect=[ + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + ] + ) + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds(self.profile) + assert mock_rollback.called + assert not mock_commit.called + # Upgrading record should not be deleted + with self.assertRaises(Exception): + await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + + storage_type_record = await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + # Storage type should not be updated + assert storage_type_record.value == "askar" + # Upgrade singleton should be empty + upgrade_singleton = UpgradeSingleton() + assert upgrade_singleton.current_upgrades.__len__() == 0 From eb5386c6aef8fd07f5685ae4792a099630a466ac Mon Sep 17 00:00:00 2001 From: jamshale Date: Wed, 10 Apr 2024 19:50:49 +0000 Subject: [PATCH 4/7] add update via api design doc Signed-off-by: jamshale --- docs/design/UpgradeViaApi.md | 50 ++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 docs/design/UpgradeViaApi.md diff --git a/docs/design/UpgradeViaApi.md b/docs/design/UpgradeViaApi.md new file mode 100644 index 0000000000..1bd0722161 --- /dev/null +++ b/docs/design/UpgradeViaApi.md @@ -0,0 +1,50 @@ +# Upgrade via API Design + +To isolate an upgrade process and trigger it via API the following pattern was designed to handle multitenant scenarios. It includes a per instance memory singleton and an is_upgrading record in the wallet(DB) and a middleware to prevent requests during the upgrade process. + +```mermaid +sequenceDiagram + participant A as Agent + participant M as Middleware + participant S as Singleton + participant W as Wallet (DB) + + Note over A: Start upgrade + A->>M: POST /any-upgrade-path + M-->>S: check wallet name + S-->>M: + M->>A: OK + A-->>S: add wallet name to set + A-->>W: update is_upgrading = true for wallet or subwallet + + Note over A: Attempted Request + A->>M: GET /any-endpoint + M-->>S: check wallet name + S-->>M: + M->>A: 503 Service Unavailable + + Note over A: Agent Restart + A-->>W: Get is_upgrading record for wallet or all subwallets + W-->>A: + A-->>S: Populate set with wallet names + + Note over A: Attempted Request + A->>M: GET /any-endpoint + M-->>S: check wallet name + S-->>M: + M->>A: 503 Service Unavailable + + Note over A: End upgrade + A-->>S: Remove wallet name from set + A-->>W: delete is_upgrading record for wallet + + Note over A: Attempted Request + A->>M: GET /any-endpoint + M-->>S: check wallet name + S-->>M: + M->>A: OK +``` + +#### To use this mehanism you simply need to set the upgrading record in the wallet (DB) and add the wallet name to the singleton set. The middleware will prevent requests from being processed until the upgrade process is finished. After the upgrade process is finished you must remove the wallet name from the set and delete the upgrading record in the wallet (DB). + +##### An example can be found via the anoncreds upgrade `aries_cloudagent/wallet/routes.py` in the `upgrade_anoncreds` controller. \ No newline at end of file From 6418dfd9f22ca57674a80c8046b42b3a9d306fc2 Mon Sep 17 00:00:00 2001 From: jamshale Date: Thu, 11 Apr 2024 15:04:51 +0000 Subject: [PATCH 5/7] Remove in_memory upgrade set for multi instance upgrading Signed-off-by: jamshale --- aries_cloudagent/admin/server.py | 15 ++++++----- .../admin/tests/test_admin_server.py | 27 ++++++++++++------- aries_cloudagent/wallet/anoncreds_upgrade.py | 6 ----- .../wallet/tests/test_anoncreds_upgrade.py | 12 --------- aries_cloudagent/wallet/upgrade_singleton.py | 22 --------------- docs/design/UpgradeViaApi.md | 27 +++++++++---------- 6 files changed, 38 insertions(+), 71 deletions(-) delete mode 100644 aries_cloudagent/wallet/upgrade_singleton.py diff --git a/aries_cloudagent/admin/server.py b/aries_cloudagent/admin/server.py index 4b7b7ffccc..e84b49d714 100644 --- a/aries_cloudagent/admin/server.py +++ b/aries_cloudagent/admin/server.py @@ -30,14 +30,15 @@ from ..messaging.responder import BaseResponder from ..messaging.valid import UUIDFour from ..multitenant.base import BaseMultitenantManager, MultitenantManagerError +from ..storage.base import BaseStorage from ..storage.error import StorageNotFoundError +from ..storage.type import RECORD_TYPE_ACAPY_UPGRADING from ..transport.outbound.message import OutboundMessage from ..transport.outbound.status import OutboundSendStatus from ..transport.queue.basic import BasicMessageQueue from ..utils.stats import Collector from ..utils.task_queue import TaskQueue from ..version import __version__ -from ..wallet.upgrade_singleton import UpgradeSingleton from .base_server import BaseAdminServer from .error import AdminSetupError from .request_context import AdminRequestContext @@ -58,8 +59,6 @@ "acapy::keylist::updated": "keylist", } -upgrade_singleton = UpgradeSingleton() - class AdminModulesSchema(OpenAPISchema): """Schema for the modules endpoint.""" @@ -212,10 +211,14 @@ async def upgrade_middleware(request: web.BaseRequest, handler: Coroutine): """Blocking middleware for upgrades.""" context: AdminRequestContext = request["context"] - if context._profile.name in upgrade_singleton.current_upgrades: - raise web.HTTPServiceUnavailable(reason="Upgrade in progress") + async with context.profile.session() as session: + storage = session.inject(BaseStorage) + try: + await storage.find_record(RECORD_TYPE_ACAPY_UPGRADING, tag_query={}) + except StorageNotFoundError: + return await handler(request) - return await handler(request) + raise web.HTTPServiceUnavailable(reason="Upgrade in progress") @web.middleware diff --git a/aries_cloudagent/admin/tests/test_admin_server.py b/aries_cloudagent/admin/tests/test_admin_server.py index 22a82baa7c..c4aa6b977f 100644 --- a/aries_cloudagent/admin/tests/test_admin_server.py +++ b/aries_cloudagent/admin/tests/test_admin_server.py @@ -14,9 +14,11 @@ from ...core.goal_code_registry import GoalCodeRegistry from ...core.in_memory import InMemoryProfile from ...core.protocol_registry import ProtocolRegistry +from ...storage.base import BaseStorage +from ...storage.record import StorageRecord +from ...storage.type import RECORD_TYPE_ACAPY_UPGRADING from ...utils.stats import Collector from ...utils.task_queue import TaskQueue -from ...wallet.upgrade_singleton import UpgradeSingleton from .. import server as test_module from ..request_context import AdminRequestContext from ..server import AdminServer, AdminSetupError @@ -480,10 +482,8 @@ async def test_server_health_state(self): await server.stop() async def test_upgrade_middleware(self): - upgrade_singleton = UpgradeSingleton() - self.context = AdminRequestContext.test_context( - {}, InMemoryProfile.test_profile() - ) + profile = InMemoryProfile.test_profile() + self.context = AdminRequestContext.test_context({}, profile) self.request_dict = { "context": self.context, } @@ -497,12 +497,19 @@ async def test_upgrade_middleware(self): await test_module.upgrade_middleware(request, handler) - upgrade_singleton.set_wallet("test-profile") - with self.assertRaises(test_module.web.HTTPServiceUnavailable): - await test_module.upgrade_middleware(request, handler) + async with profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + await storage.add_record(upgrading_record) - upgrade_singleton.remove_wallet("test-profile") - await test_module.upgrade_middleware(request, handler) + with self.assertRaises(test_module.web.HTTPServiceUnavailable): + await test_module.upgrade_middleware(request, handler) + + await storage.delete_record(upgrading_record) + await test_module.upgrade_middleware(request, handler) @pytest.fixture diff --git a/aries_cloudagent/wallet/anoncreds_upgrade.py b/aries_cloudagent/wallet/anoncreds_upgrade.py index dbe45639a8..0a8eaa923a 100644 --- a/aries_cloudagent/wallet/anoncreds_upgrade.py +++ b/aries_cloudagent/wallet/anoncreds_upgrade.py @@ -48,15 +48,12 @@ from ..storage.error import StorageNotFoundError from ..storage.record import StorageRecord from ..storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE, RECORD_TYPE_ACAPY_UPGRADING -from .upgrade_singleton import UpgradeSingleton LOGGER = logging.getLogger(__name__) # Number of times to retry upgrading records max_retries = 5 -upgrade_singleton = UpgradeSingleton() - class SchemaUpgradeObj: """Schema upgrade object.""" @@ -540,7 +537,6 @@ async def retry_converting_records( async def clear_upgrade(): async with profile.session() as session: storage = session.inject(BaseStorage) - upgrade_singleton.remove_wallet(profile.name) await storage.delete_record(upgrading_record) try: @@ -579,12 +575,10 @@ async def upgrade_wallet_to_anoncreds(profile: Profile, is_subwallet=False) -> N try: LOGGER.info("Upgrade in process for wallet: %s", profile.name) - upgrade_singleton.set_wallet(profile.name) await convert_records_to_anoncreds(profile) await set_storage_type_and_update_profile_if_subwallet( profile, is_subwallet ) - upgrade_singleton.remove_wallet(profile.name) await storage.delete_record(upgrading_record) except Exception as e: LOGGER.error(f"Error when upgrading wallet {profile.name} : {e} ") diff --git a/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py index e1077c03ca..52c9bd79ab 100644 --- a/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py +++ b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py @@ -15,7 +15,6 @@ from ...storage.record import StorageRecord from ...storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE, RECORD_TYPE_ACAPY_UPGRADING from .. import anoncreds_upgrade -from ..upgrade_singleton import UpgradeSingleton class TestAnoncredsUpgrade(IsolatedAsyncioTestCase): @@ -128,12 +127,10 @@ async def test_retry_converting_records(self): await anoncreds_upgrade.retry_converting_records( self.profile, upgrading_record, 0 ) - upgrade_singleton = UpgradeSingleton() assert mock_convert_records_to_anoncreds.called assert mock_convert_records_to_anoncreds.call_count == 3 _, storage_type_record = next(iter(self.profile.records.items())) assert storage_type_record.value == "askar-anoncreds" - assert not upgrade_singleton.current_upgrades async def test_upgrade_wallet_to_anoncreds(self): # upgrading record not present @@ -154,9 +151,6 @@ async def test_upgrade_wallet_to_anoncreds(self): _, storage_type_record = next(iter(self.profile.records.items())) assert storage_type_record.value == "askar-anoncreds" - upgrade_singleton = UpgradeSingleton() - assert not upgrade_singleton.current_upgrades - # retry called on exception with mock.patch.object( anoncreds_upgrade, @@ -339,9 +333,6 @@ async def test_failed_upgrade(self): ) # Storage type should not be updated assert storage_type_record.value == "askar" - # Upgrade singleton should be empty - upgrade_singleton = UpgradeSingleton() - assert upgrade_singleton.current_upgrades.__len__() == 0 # Cred_defs fails to upgrade anoncreds_upgrade.upgrade_and_delete_cred_def_records = ( @@ -369,6 +360,3 @@ async def test_failed_upgrade(self): ) # Storage type should not be updated assert storage_type_record.value == "askar" - # Upgrade singleton should be empty - upgrade_singleton = UpgradeSingleton() - assert upgrade_singleton.current_upgrades.__len__() == 0 diff --git a/aries_cloudagent/wallet/upgrade_singleton.py b/aries_cloudagent/wallet/upgrade_singleton.py deleted file mode 100644 index a6913f05cc..0000000000 --- a/aries_cloudagent/wallet/upgrade_singleton.py +++ /dev/null @@ -1,22 +0,0 @@ -"""Singleton class to ensure that upgrade is isolated.""" - - -class UpgradeSingleton: - """Singleton class to ensure that upgrade is isolated.""" - - instance = None - current_upgrades = set() - - def __new__(cls, *args, **kwargs): - """Create a new instance of the class.""" - if cls.instance is None: - cls.instance = super().__new__(cls) - return cls.instance - - def set_wallet(self, wallet: str): - """Set a wallet name.""" - self.current_upgrades.add(wallet) - - def remove_wallet(self, wallet: str): - """Remove a wallet name.""" - self.current_upgrades.discard(wallet) diff --git a/docs/design/UpgradeViaApi.md b/docs/design/UpgradeViaApi.md index 1bd0722161..24af7cc8c3 100644 --- a/docs/design/UpgradeViaApi.md +++ b/docs/design/UpgradeViaApi.md @@ -1,50 +1,47 @@ # Upgrade via API Design -To isolate an upgrade process and trigger it via API the following pattern was designed to handle multitenant scenarios. It includes a per instance memory singleton and an is_upgrading record in the wallet(DB) and a middleware to prevent requests during the upgrade process. +#### To isolate an upgrade process and trigger it via API the following pattern was designed to handle multitenant scenarios. It includes an is_upgrading record in the wallet(DB) and a middleware to prevent requests during the upgrade process. ```mermaid sequenceDiagram participant A as Agent participant M as Middleware - participant S as Singleton participant W as Wallet (DB) Note over A: Start upgrade A->>M: POST /any-upgrade-path - M-->>S: check wallet name - S-->>M: + M-->>W: check is_upgrading + W-->>M: M->>A: OK - A-->>S: add wallet name to set A-->>W: update is_upgrading = true for wallet or subwallet Note over A: Attempted Request A->>M: GET /any-endpoint - M-->>S: check wallet name - S-->>M: + M-->>W: check is_upgrading + W-->>M: M->>A: 503 Service Unavailable Note over A: Agent Restart A-->>W: Get is_upgrading record for wallet or all subwallets W-->>A: - A-->>S: Populate set with wallet names + A->>A: Resume upgrade if needed Note over A: Attempted Request A->>M: GET /any-endpoint - M-->>S: check wallet name - S-->>M: + M-->>W: check is_upgrading + W-->>M: M->>A: 503 Service Unavailable Note over A: End upgrade - A-->>S: Remove wallet name from set - A-->>W: delete is_upgrading record for wallet + A-->>W: delete is_upgrading record from wallet Note over A: Attempted Request A->>M: GET /any-endpoint - M-->>S: check wallet name - S-->>M: + M-->>W: check is_upgrading + W-->>M: M->>A: OK ``` -#### To use this mehanism you simply need to set the upgrading record in the wallet (DB) and add the wallet name to the singleton set. The middleware will prevent requests from being processed until the upgrade process is finished. After the upgrade process is finished you must remove the wallet name from the set and delete the upgrading record in the wallet (DB). +#### To use this mehanism you simply need to set the upgrading record in the wallet (DB). The middleware will prevent requests from being processed until the upgrade process is finished. After the upgrade process is finished you must remove the upgrading record in the wallet (DB). ##### An example can be found via the anoncreds upgrade `aries_cloudagent/wallet/routes.py` in the `upgrade_anoncreds` controller. \ No newline at end of file From 9eaf7861762fb225cd00eb8ca5193ec264f907dc Mon Sep 17 00:00:00 2001 From: jamshale Date: Thu, 11 Apr 2024 17:13:06 +0000 Subject: [PATCH 6/7] Remove try statement from middleware Signed-off-by: jamshale --- aries_cloudagent/admin/server.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/aries_cloudagent/admin/server.py b/aries_cloudagent/admin/server.py index e84b49d714..a5a6bff06a 100644 --- a/aries_cloudagent/admin/server.py +++ b/aries_cloudagent/admin/server.py @@ -211,12 +211,13 @@ async def upgrade_middleware(request: web.BaseRequest, handler: Coroutine): """Blocking middleware for upgrades.""" context: AdminRequestContext = request["context"] + is_upgrading = [] async with context.profile.session() as session: storage = session.inject(BaseStorage) - try: - await storage.find_record(RECORD_TYPE_ACAPY_UPGRADING, tag_query={}) - except StorageNotFoundError: - return await handler(request) + is_upgrading = await storage.find_all_records(RECORD_TYPE_ACAPY_UPGRADING) + + if not is_upgrading: + return await handler(request) raise web.HTTPServiceUnavailable(reason="Upgrade in progress") From 29a9ff0a075855bba86738fc792f168813d4b59f Mon Sep 17 00:00:00 2001 From: jamshale Date: Wed, 24 Apr 2024 16:19:10 +0000 Subject: [PATCH 7/7] Fix import formatiing in conductor.py Signed-off-by: jamshale --- aries_cloudagent/core/conductor.py | 47 ++++++++++++++++-------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index 7c44ce11a8..59601cd419 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -18,12 +18,14 @@ from ..admin.base_server import BaseAdminServer from ..admin.server import AdminResponder, AdminServer -from ..commands.upgrade import (add_version_record, get_upgrade_version_list, - upgrade) +from ..commands.upgrade import add_version_record, get_upgrade_version_list, upgrade from ..config.default_context import ContextBuilder from ..config.injection_context import InjectionContext -from ..config.ledger import (get_genesis_transactions, ledger_config, - load_multiple_genesis_transactions_from_config) +from ..config.ledger import ( + get_genesis_transactions, + ledger_config, + load_multiple_genesis_transactions_from_config, +) from ..config.logging import LoggingConfigurator from ..config.provider import ClassProvider from ..config.wallet import wallet_config @@ -31,28 +33,30 @@ from ..indy.verifier import IndyVerifier from ..ledger.base import BaseLedger from ..ledger.error import LedgerConfigError, LedgerTransactionError -from ..ledger.multiple_ledger.base_manager import (BaseMultipleLedgerManager, - MultipleLedgerManagerError) -from ..ledger.multiple_ledger.ledger_requests_executor import \ - IndyLedgerRequestsExecutor -from ..ledger.multiple_ledger.manager_provider import \ - MultiIndyLedgerManagerProvider +from ..ledger.multiple_ledger.base_manager import ( + BaseMultipleLedgerManager, + MultipleLedgerManagerError, +) +from ..ledger.multiple_ledger.ledger_requests_executor import IndyLedgerRequestsExecutor +from ..ledger.multiple_ledger.manager_provider import MultiIndyLedgerManagerProvider from ..messaging.responder import BaseResponder from ..multitenant.base import BaseMultitenantManager from ..multitenant.manager_provider import MultitenantManagerProvider -from ..protocols.connections.v1_0.manager import (ConnectionManager, - ConnectionManagerError) -from ..protocols.connections.v1_0.messages.connection_invitation import \ - ConnectionInvitation -from ..protocols.coordinate_mediation.mediation_invite_store import \ - MediationInviteStore +from ..protocols.connections.v1_0.manager import ( + ConnectionManager, + ConnectionManagerError, +) +from ..protocols.connections.v1_0.messages.connection_invitation import ( + ConnectionInvitation, +) +from ..protocols.coordinate_mediation.mediation_invite_store import MediationInviteStore from ..protocols.coordinate_mediation.v1_0.manager import MediationManager from ..protocols.coordinate_mediation.v1_0.route_manager import RouteManager -from ..protocols.coordinate_mediation.v1_0.route_manager_provider import \ - RouteManagerProvider +from ..protocols.coordinate_mediation.v1_0.route_manager_provider import ( + RouteManagerProvider, +) from ..protocols.out_of_band.v1_0.manager import OutOfBandManager -from ..protocols.out_of_band.v1_0.messages.invitation import ( - HSProto, InvitationMessage) +from ..protocols.out_of_band.v1_0.messages.invitation import HSProto, InvitationMessage from ..storage.base import BaseStorage from ..storage.error import StorageNotFoundError from ..storage.record import StorageRecord @@ -60,8 +64,7 @@ from ..transport.inbound.manager import InboundTransportManager from ..transport.inbound.message import InboundMessage from ..transport.outbound.base import OutboundDeliveryError -from ..transport.outbound.manager import (OutboundTransportManager, - QueuedOutboundMessage) +from ..transport.outbound.manager import OutboundTransportManager, QueuedOutboundMessage from ..transport.outbound.message import OutboundMessage from ..transport.outbound.status import OutboundSendStatus from ..transport.wire_format import BaseWireFormat