From 84fc383da252d0a114ae7d230ebdbf62898628e7 Mon Sep 17 00:00:00 2001 From: jamshale <31809382+jamshale@users.noreply.github.com> Date: Wed, 8 May 2024 08:30:58 -0700 Subject: [PATCH] Upgrade to anoncreds via api endpoint (#2922) * Add upgrade to anoncreds via api Signed-off-by: jamshale --------- Signed-off-by: jamshale --- aries_cloudagent/admin/server.py | 53 +- .../admin/tests/test_admin_server.py | 55 +- .../anoncreds/default/legacy_indy/registry.py | 6 +- .../legacy_indy/tests/test_registry.py | 32 +- aries_cloudagent/core/conductor.py | 67 +- aries_cloudagent/core/tests/test_conductor.py | 67 +- aries_cloudagent/ledger/base.py | 2 +- aries_cloudagent/ledger/indy.py | 4 +- aries_cloudagent/ledger/indy_vdr.py | 4 +- aries_cloudagent/multitenant/manager.py | 8 + .../endorse_transaction/v1_0/manager.py | 9 +- aries_cloudagent/storage/type.py | 4 + aries_cloudagent/utils/profiles.py | 28 + aries_cloudagent/wallet/anoncreds_upgrade.py | 719 ++++++++++++++++++ aries_cloudagent/wallet/routes.py | 87 +++ aries_cloudagent/wallet/singletons.py | 43 ++ .../wallet/tests/test_anoncreds_upgrade.py | 406 ++++++++++ aries_cloudagent/wallet/tests/test_routes.py | 22 + demo/features/steps/0586-sign-transaction.py | 1 - demo/features/steps/upgrade.py | 24 + demo/features/upgrade.feature | 29 + demo/runners/faber.py | 25 +- docs/design/UpgradeViaApi.md | 103 +++ 23 files changed, 1730 insertions(+), 68 deletions(-) create mode 100644 aries_cloudagent/wallet/anoncreds_upgrade.py create mode 100644 aries_cloudagent/wallet/singletons.py create mode 100644 aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py create mode 100644 demo/features/steps/upgrade.py create mode 100644 demo/features/upgrade.feature create mode 100644 docs/design/UpgradeViaApi.md diff --git a/aries_cloudagent/admin/server.py b/aries_cloudagent/admin/server.py index c5fb4dc516..2fe7dd1b61 100644 --- a/aries_cloudagent/admin/server.py +++ b/aries_cloudagent/admin/server.py @@ -18,9 +18,10 @@ setup_aiohttp_apispec, validation_middleware, ) - from marshmallow import fields +from aries_cloudagent.wallet import singletons + from ..config.injection_context import InjectionContext from ..config.logging import context_wallet_id from ..core.event_bus import Event, EventBus @@ -31,13 +32,16 @@ from ..messaging.responder import BaseResponder from ..messaging.valid import UUIDFour from ..multitenant.base import BaseMultitenantManager, MultitenantManagerError +from ..storage.base import BaseStorage from ..storage.error import StorageNotFoundError +from ..storage.type import RECORD_TYPE_ACAPY_UPGRADING from ..transport.outbound.message import OutboundMessage from ..transport.outbound.status import OutboundSendStatus from ..transport.queue.basic import BasicMessageQueue from ..utils.stats import Collector from ..utils.task_queue import TaskQueue from ..version import __version__ +from ..wallet.anoncreds_upgrade import check_upgrade_completion_loop from .base_server import BaseAdminServer from .error import AdminSetupError from .request_context import AdminRequestContext @@ -58,6 +62,9 @@ "acapy::keylist::updated": "keylist", } +anoncreds_wallets = singletons.IsAnoncredsSingleton().wallets +in_progress_upgrades = singletons.UpgradeInProgressSingleton() + class AdminModulesSchema(OpenAPISchema): """Schema for the modules endpoint.""" @@ -205,6 +212,40 @@ async def ready_middleware(request: web.BaseRequest, handler: Coroutine): raise web.HTTPServiceUnavailable(reason="Shutdown in progress") +@web.middleware +async def upgrade_middleware(request: web.BaseRequest, handler: Coroutine): + """Blocking middleware for upgrades.""" + context: AdminRequestContext = request["context"] + + # Already upgraded + if context.profile.name in anoncreds_wallets: + return await handler(request) + + # Upgrade in progress + if context.profile.name in in_progress_upgrades.wallets: + raise web.HTTPServiceUnavailable(reason="Upgrade in progress") + + # Avoid try/except in middleware with find_all_records + upgrade_initiated = [] + async with context.profile.session() as session: + storage = session.inject(BaseStorage) + upgrade_initiated = await storage.find_all_records(RECORD_TYPE_ACAPY_UPGRADING) + if upgrade_initiated: + # If we get here, than another instance started an upgrade + # We need to check for completion (or fail) in another process + in_progress_upgrades.set_wallet(context.profile.name) + is_subwallet = context.metadata and "wallet_id" in context.metadata + asyncio.create_task( + check_upgrade_completion_loop( + context.profile, + is_subwallet, + ) + ) + raise web.HTTPServiceUnavailable(reason="Upgrade in progress") + + return await handler(request) + + @web.middleware async def debug_middleware(request: web.BaseRequest, handler: Coroutine): """Show request detail in debug log.""" @@ -351,6 +392,8 @@ async def check_multitenant_authorization(request: web.Request, handler): is_multitenancy_path = path.startswith("/multitenancy") is_server_path = path in self.server_paths or path == "/features" + # allow base wallets to trigger update through api + is_upgrade_path = path.startswith("/anoncreds/wallet/upgrade") # subwallets are not allowed to access multitenancy routes if authorization_header and is_multitenancy_path: @@ -380,6 +423,7 @@ async def check_multitenant_authorization(request: web.Request, handler): and not is_unprotected_path(path) and not base_limited_access_path and not (request.method == "OPTIONS") # CORS fix + and not is_upgrade_path ): raise web.HTTPUnauthorized() @@ -453,6 +497,9 @@ async def setup_context(request: web.Request, handler): middlewares.append(setup_context) + # Upgrade middleware needs the context setup + middlewares.append(upgrade_middleware) + # Register validation_middleware last avoiding unauthorized validations middlewares.append(validation_middleware) @@ -583,6 +630,10 @@ def sort_dict(raw: dict) -> dict: async def stop(self) -> None: """Stop the webserver.""" + # Stopped before admin server is created + if not self.app: + return + self.app._state["ready"] = False # in case call does not come through OpenAPI for queue in self.websocket_queues.values(): queue.stop() diff --git a/aries_cloudagent/admin/tests/test_admin_server.py b/aries_cloudagent/admin/tests/test_admin_server.py index 300e82f758..d74b9d1139 100644 --- a/aries_cloudagent/admin/tests/test_admin_server.py +++ b/aries_cloudagent/admin/tests/test_admin_server.py @@ -1,22 +1,28 @@ import gc import json +from unittest import IsolatedAsyncioTestCase import pytest -from aries_cloudagent.tests import mock -from unittest import IsolatedAsyncioTestCase from aiohttp import ClientSession, DummyCookieJar, TCPConnector, web from aiohttp.test_utils import unused_port +from aries_cloudagent.tests import mock +from aries_cloudagent.wallet import singletons + from ...config.default_context import DefaultContextBuilder from ...config.injection_context import InjectionContext from ...core.event_bus import Event +from ...core.goal_code_registry import GoalCodeRegistry from ...core.in_memory import InMemoryProfile from ...core.protocol_registry import ProtocolRegistry -from ...core.goal_code_registry import GoalCodeRegistry +from ...storage.base import BaseStorage +from ...storage.record import StorageRecord +from ...storage.type import RECORD_TYPE_ACAPY_UPGRADING from ...utils.stats import Collector from ...utils.task_queue import TaskQueue - +from ...wallet.anoncreds_upgrade import UPGRADING_RECORD_IN_PROGRESS from .. import server as test_module +from ..request_context import AdminRequestContext from ..server import AdminServer, AdminSetupError @@ -477,6 +483,47 @@ async def test_server_health_state(self): assert response.status == 503 await server.stop() + async def test_upgrade_middleware(self): + profile = InMemoryProfile.test_profile() + self.context = AdminRequestContext.test_context({}, profile) + self.request_dict = { + "context": self.context, + } + request = mock.MagicMock( + method="GET", + path_qs="/schemas/created", + match_info={}, + __getitem__=lambda _, k: self.request_dict[k], + ) + handler = mock.CoroutineMock() + + await test_module.upgrade_middleware(request, handler) + + async with profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + UPGRADING_RECORD_IN_PROGRESS, + ) + # No upgrade in progress + await storage.add_record(upgrading_record) + + # Upgrade in progress without cache + with self.assertRaises(test_module.web.HTTPServiceUnavailable): + await test_module.upgrade_middleware(request, handler) + + # Upgrade in progress with cache + singletons.UpgradeInProgressSingleton().set_wallet("test-profile") + with self.assertRaises(test_module.web.HTTPServiceUnavailable): + await test_module.upgrade_middleware(request, handler) + + singletons.UpgradeInProgressSingleton().remove_wallet("test-profile") + await storage.delete_record(upgrading_record) + + # Upgrade in progress with cache + singletons.IsAnoncredsSingleton().set_wallet("test-profile") + await test_module.upgrade_middleware(request, handler) + @pytest.fixture async def server(): diff --git a/aries_cloudagent/anoncreds/default/legacy_indy/registry.py b/aries_cloudagent/anoncreds/default/legacy_indy/registry.py index 5bed179f96..31eca4f977 100644 --- a/aries_cloudagent/anoncreds/default/legacy_indy/registry.py +++ b/aries_cloudagent/anoncreds/default/legacy_indy/registry.py @@ -1123,7 +1123,7 @@ async def fix_ledger_entry( async def txn_submit( self, - profile: Profile, + ledger: BaseLedger, ledger_transaction: str, sign: bool = None, taa_accept: bool = None, @@ -1131,10 +1131,6 @@ async def txn_submit( write_ledger: bool = True, ) -> str: """Submit a transaction to the ledger.""" - ledger = profile.inject(BaseLedger) - - if not ledger: - raise LedgerError("No ledger available") try: async with ledger: diff --git a/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py b/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py index 58631cfadb..830a0bb722 100644 --- a/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py +++ b/aries_cloudagent/anoncreds/default/legacy_indy/tests/test_registry.py @@ -9,7 +9,6 @@ from base58 import alphabet from .....anoncreds.base import ( - AnonCredsRegistrationError, AnonCredsSchemaAlreadyExists, ) from .....anoncreds.models.anoncreds_schema import ( @@ -21,7 +20,7 @@ from .....connections.models.conn_record import ConnRecord from .....core.in_memory.profile import InMemoryProfile from .....ledger.base import BaseLedger -from .....ledger.error import LedgerError, LedgerObjectAlreadyExistsError +from .....ledger.error import LedgerObjectAlreadyExistsError from .....messaging.responder import BaseResponder from .....protocols.endorse_transaction.v1_0.manager import ( TransactionManager, @@ -728,27 +727,16 @@ async def test_register_revocation_registry_definition_with_create_transaction_a assert mock_create_record.called async def test_txn_submit(self): - self.profile.inject = mock.MagicMock( - side_effect=[ - None, - mock.CoroutineMock( - txn_submit=mock.CoroutineMock(side_effect=LedgerError("test error")) - ), - mock.CoroutineMock( - txn_submit=mock.CoroutineMock(return_value="transaction response") - ), - ] + self.profile.context.injector.bind_instance( + BaseLedger, + mock.MagicMock( + txn_submit=mock.CoroutineMock(return_value="transaction_id") + ), ) - - # No ledger - with self.assertRaises(LedgerError): - await self.registry.txn_submit(self.profile, "test_txn") - # Write error - with self.assertRaises(AnonCredsRegistrationError): - await self.registry.txn_submit(self.profile, "test_txn") - - result = await self.registry.txn_submit(self.profile, "test_txn") - assert result == "transaction response" + async with self.profile.session() as session: + ledger = session.inject(BaseLedger) + result = await self.registry.txn_submit(ledger, "test_txn") + assert result == "transaction_id" async def test_register_revocation_list_no_endorsement(self): self.profile.context.injector.bind_instance( diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index d1f44e68ab..ffa3f4964e 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -7,6 +7,7 @@ """ +import asyncio import hashlib import json import logging @@ -17,12 +18,8 @@ from ..admin.base_server import BaseAdminServer from ..admin.server import AdminResponder, AdminServer -from ..commands.upgrade import ( - add_version_record, - get_upgrade_version_list, - upgrade, -) -from ..config.default_context import ContextBuilder +from ..commands.upgrade import add_version_record, get_upgrade_version_list, upgrade +from ..config.default_context import ContextBuilder, DefaultContextBuilder from ..config.injection_context import InjectionContext from ..config.ledger import ( get_genesis_transactions, @@ -63,7 +60,11 @@ from ..storage.base import BaseStorage from ..storage.error import StorageNotFoundError from ..storage.record import StorageRecord -from ..storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE +from ..storage.type import ( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + STORAGE_TYPE_VALUE_ANONCREDS, + STORAGE_TYPE_VALUE_ASKAR, +) from ..transport.inbound.manager import InboundTransportManager from ..transport.inbound.message import InboundMessage from ..transport.outbound.base import OutboundDeliveryError @@ -71,10 +72,12 @@ from ..transport.outbound.message import OutboundMessage from ..transport.outbound.status import OutboundSendStatus from ..transport.wire_format import BaseWireFormat +from ..utils.profiles import get_subwallet_profiles_from_storage from ..utils.stats import Collector from ..utils.task_queue import CompletedTask, TaskQueue from ..vc.ld_proofs.document_loader import DocumentLoader from ..version import RECORD_TYPE_ACAPY_VERSION, __version__ +from ..wallet.anoncreds_upgrade import upgrade_wallet_to_anoncreds_if_requested from ..wallet.did_info import DIDInfo from .dispatcher import Dispatcher from .error import StartupError @@ -111,6 +114,8 @@ def __init__(self, context_builder: ContextBuilder) -> None: self.root_profile: Profile = None self.setup_public_did: DIDInfo = None + force_agent_anoncreds = False + @property def context(self) -> InjectionContext: """Accessor for the injection context.""" @@ -121,6 +126,9 @@ async def setup(self): context = await self.context_builder.build_context() + if self.force_agent_anoncreds: + context.settings.set_value("wallet.type", "askar-anoncreds") + # Fetch genesis transactions if necessary if context.settings.get("ledger.ledger_config_list"): await load_multiple_genesis_transactions_from_config(context.settings) @@ -522,7 +530,9 @@ async def start(self) -> None: except Exception: LOGGER.exception("Error accepting mediation invitation") - # notify protocols of startup status + await self.check_for_wallet_upgrades_in_progress() + + # notify protcols of startup status await self.root_profile.notify(STARTUP_EVENT_TOPIC, {}) async def stop(self, timeout=1.0): @@ -796,8 +806,9 @@ async def check_for_valid_wallet_type(self, profile): ) except StorageNotFoundError: acapy_version = None + # Any existing agent will have acapy_version record if acapy_version: - storage_type_from_storage = "askar" + storage_type_from_storage = STORAGE_TYPE_VALUE_ASKAR LOGGER.info( f"Existing agent found. Setting wallet type to {storage_type_from_storage}." # noqa: E501 ) @@ -820,6 +831,38 @@ async def check_for_valid_wallet_type(self, profile): ) if storage_type_from_storage != storage_type_from_config: - raise StartupError( - f"Wallet type config [{storage_type_from_config}] doesn't match with the wallet type in storage [{storage_type_record.value}]" # noqa: E501 - ) + if ( + storage_type_from_config == STORAGE_TYPE_VALUE_ASKAR + and storage_type_from_storage == STORAGE_TYPE_VALUE_ANONCREDS + ): + LOGGER.warning( + "The agent has been upgrade to use anoncreds wallet. Please update the wallet.type in the config file to 'askar-anoncreds'" # noqa: E501 + ) + # Allow agent to create anoncreds profile with askar + # wallet type config by stopping conductor and reloading context + await self.stop() + self.force_agent_anoncreds = True + self.context.settings.set_value("wallet.type", "askar-anoncreds") + self.context_builder = DefaultContextBuilder(self.context.settings) + await self.setup() + else: + raise StartupError( + f"Wallet type config [{storage_type_from_config}] doesn't match with the wallet type in storage [{storage_type_record.value}]" # noqa: E501 + ) + + async def check_for_wallet_upgrades_in_progress(self): + """Check for upgrade and upgrade if needed.""" + multitenant_mgr = self.context.inject_or(BaseMultitenantManager) + if multitenant_mgr: + subwallet_profiles = await get_subwallet_profiles_from_storage( + self.root_profile + ) + await asyncio.gather( + *[ + upgrade_wallet_to_anoncreds_if_requested(profile, is_subwallet=True) + for profile in subwallet_profiles + ] + ) + + else: + await upgrade_wallet_to_anoncreds_if_requested(self.root_profile) diff --git a/aries_cloudagent/core/tests/test_conductor.py b/aries_cloudagent/core/tests/test_conductor.py index 55a8ab0c4b..685e420005 100644 --- a/aries_cloudagent/core/tests/test_conductor.py +++ b/aries_cloudagent/core/tests/test_conductor.py @@ -117,6 +117,8 @@ async def test_startup_version_record_exists(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -166,6 +168,7 @@ async def test_startup_version_record_exists(self): mock_inbound_mgr.return_value.stop.assert_awaited_once_with() mock_outbound_mgr.return_value.stop.assert_awaited_once_with() + assert mock_upgrade.called async def test_startup_version_no_upgrade_add_record(self): builder: ContextBuilder = StubContextBuilder(self.test_settings) @@ -176,6 +179,8 @@ async def test_startup_version_no_upgrade_add_record(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -213,6 +218,8 @@ async def test_startup_version_no_upgrade_add_record(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -257,6 +264,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -296,6 +305,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -335,6 +346,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -373,6 +386,8 @@ async def test_startup_version_record_not_exists(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -449,6 +464,8 @@ async def test_startup_no_public_did(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -492,6 +509,8 @@ async def test_stats(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger: mock_inbound_mgr.return_value.sessions = ["dummy"] @@ -884,6 +903,8 @@ async def test_admin(self): ) as admin_start, mock.patch.object( admin, "stop", autospec=True ) as admin_stop, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -936,6 +957,8 @@ async def test_admin_startx(self): ) as oob_mgr, mock.patch.object( test_module, "ConnectionManager" ) as conn_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -992,7 +1015,9 @@ async def test_start_static(self): ), ), mock.patch.object( test_module, "OutboundTransportManager", autospec=True - ) as mock_outbound_mgr: + ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade: mock_outbound_mgr.return_value.registered_transports = { "test": mock.MagicMock(schemes=["http"]) } @@ -1166,7 +1191,9 @@ async def test_print_invite_connection(self): ), ), mock.patch.object( test_module, "OutboundTransportManager", autospec=True - ) as mock_outbound_mgr: + ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade: mock_outbound_mgr.return_value.registered_transports = { "test": mock.MagicMock(schemes=["http"]) } @@ -1203,6 +1230,8 @@ async def test_clear_default_mediator(self): "MediationManager", return_value=mock.MagicMock(clear_default_mediator=mock.CoroutineMock()), ) as mock_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1254,7 +1283,9 @@ async def test_set_default_mediator(self): mock.MagicMock(value=f"v{__version__}"), ] ), - ): + ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade: await conductor.start() await conductor.stop() mock_mgr.return_value.set_default_mediator_by_id.assert_called_once() @@ -1277,6 +1308,8 @@ async def test_set_default_mediator_x(self): "retrieve_by_id", mock.CoroutineMock(side_effect=Exception()), ), mock.patch.object(test_module, "LOGGER") as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1425,6 +1458,8 @@ async def test_mediator_invitation_0160(self, mock_from_url, _): ) as mock_mgr, mock.patch.object( mock_conn_record, "metadata_set", mock.CoroutineMock() ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1484,6 +1519,8 @@ async def test_mediator_invitation_0434(self, mock_from_url, _): ) ), ) as mock_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1542,6 +1579,8 @@ async def test_mediation_invitation_should_use_stored_invitation( ), mock.patch.object( test_module, "MediationManager", return_value=mock_mediation_manager ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1596,7 +1635,9 @@ async def test_mediation_invitation_should_not_create_connection_for_old_invitat mock.MagicMock(value=f"v{__version__}"), ] ), - ): + ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade: # when await conductor.start() await conductor.stop() @@ -1631,6 +1672,8 @@ async def test_mediator_invitation_x(self, _): ) as mock_from_url, mock.patch.object( test_module, "LOGGER" ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1694,6 +1737,8 @@ async def test_startup_x_no_storage_version(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LOGGER" ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1735,6 +1780,8 @@ async def test_startup_storage_type_exists_and_matches(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1774,7 +1821,7 @@ async def test_startup_storage_type_exists_and_matches(self): await conductor.stop() - async def test_startup_storage_type_exists_and_does_not_match(self): + async def test_startup_storage_type_anoncreds_and_config_askar_re_calls_setup(self): builder: ContextBuilder = StubContextBuilder(self.test_settings) conductor = test_module.Conductor(builder) @@ -1785,6 +1832,8 @@ async def test_startup_storage_type_exists_and_does_not_match(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1819,9 +1868,9 @@ async def test_startup_storage_type_exists_and_does_not_match(self): mock_inbound_mgr.return_value.registered_transports = {} mock_outbound_mgr.return_value.registered_transports = {} - - with self.assertRaises(test_module.StartupError): + with mock.patch.object(test_module.Conductor, "setup") as mock_setup: await conductor.start() + assert mock_setup.called await conductor.stop() @@ -1838,6 +1887,8 @@ async def test_startup_storage_type_does_not_exist_and_existing_agent_then_set_t ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1902,6 +1953,8 @@ async def test_startup_storage_type_does_not_exist_and_new_anoncreds_agent( ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds_if_requested", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( diff --git a/aries_cloudagent/ledger/base.py b/aries_cloudagent/ledger/base.py index 1ebc8926c5..509dc48d4b 100644 --- a/aries_cloudagent/ledger/base.py +++ b/aries_cloudagent/ledger/base.py @@ -643,7 +643,7 @@ async def send_schema_anoncreds( try: legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, schema_req, sign=True, sign_did=public_info, diff --git a/aries_cloudagent/ledger/indy.py b/aries_cloudagent/ledger/indy.py index 9dc18cbc7a..9967500546 100644 --- a/aries_cloudagent/ledger/indy.py +++ b/aries_cloudagent/ledger/indy.py @@ -1186,7 +1186,7 @@ async def send_revoc_reg_def( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, rev_reg_def_req, sign=True, sign_did=did_info, @@ -1255,7 +1255,7 @@ async def send_revoc_reg_entry( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, rev_reg_def_entry_req, sign=True, sign_did=did_info, diff --git a/aries_cloudagent/ledger/indy_vdr.py b/aries_cloudagent/ledger/indy_vdr.py index 7444311343..1c04677e2d 100644 --- a/aries_cloudagent/ledger/indy_vdr.py +++ b/aries_cloudagent/ledger/indy_vdr.py @@ -1147,7 +1147,7 @@ async def send_revoc_reg_def( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, rev_reg_def_req, sign=True, sign_did=did_info, @@ -1222,7 +1222,7 @@ async def send_revoc_reg_entry( legacy_indy_registry = LegacyIndyRegistry() resp = await legacy_indy_registry.txn_submit( - self.profile, + self, revoc_reg_entry_req, sign=True, sign_did=did_info, diff --git a/aries_cloudagent/multitenant/manager.py b/aries_cloudagent/multitenant/manager.py index 550389f0db..4f1cf89134 100644 --- a/aries_cloudagent/multitenant/manager.py +++ b/aries_cloudagent/multitenant/manager.py @@ -3,6 +3,7 @@ import logging from typing import Iterable, Optional +from ..askar.profile_anon import AskarAnoncredsProfile from ..config.injection_context import InjectionContext from ..config.wallet import wallet_config from ..core.profile import Profile @@ -84,6 +85,13 @@ async def get_wallet_profile( profile, _ = await wallet_config(context, provision=provision) self._profiles.put(wallet_id, profile) + # return anoncreds profile if explicitly set as wallet type + if profile.context.settings.get("wallet.type") == "askar-anoncreds": + return AskarAnoncredsProfile( + profile.opened, + profile.context, + ) + return profile async def update_wallet(self, wallet_id: str, new_settings: dict) -> WalletRecord: diff --git a/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py b/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py index a96a2e8fe9..293a8448ef 100644 --- a/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py +++ b/aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py @@ -415,6 +415,9 @@ async def complete_transaction( if (not endorser) and ( txn_goal_code != TransactionRecord.WRITE_DID_TRANSACTION ): + ledger = self.profile.inject(BaseLedger) + if not ledger: + raise TransactionManagerError("No ledger available") if ( self._profile.context.settings.get_value("wallet.type") == "askar-anoncreds" @@ -425,13 +428,9 @@ async def complete_transaction( legacy_indy_registry = LegacyIndyRegistry() ledger_response_json = await legacy_indy_registry.txn_submit( - self._profile, ledger_transaction, sign=False, taa_accept=False + ledger, ledger_transaction, sign=False, taa_accept=False ) else: - ledger = self.profile.inject(BaseLedger) - if not ledger: - raise TransactionManagerError("No ledger available") - async with ledger: try: ledger_response_json = await shield( diff --git a/aries_cloudagent/storage/type.py b/aries_cloudagent/storage/type.py index 7a0cc9aab7..ea4279377f 100644 --- a/aries_cloudagent/storage/type.py +++ b/aries_cloudagent/storage/type.py @@ -1,3 +1,7 @@ """Library version information.""" RECORD_TYPE_ACAPY_STORAGE_TYPE = "acapy_storage_type" +RECORD_TYPE_ACAPY_UPGRADING = "acapy_upgrading" + +STORAGE_TYPE_VALUE_ANONCREDS = "askar-anoncreds" +STORAGE_TYPE_VALUE_ASKAR = "askar" diff --git a/aries_cloudagent/utils/profiles.py b/aries_cloudagent/utils/profiles.py index 45a440ed79..d5433f3afd 100644 --- a/aries_cloudagent/utils/profiles.py +++ b/aries_cloudagent/utils/profiles.py @@ -1,10 +1,15 @@ """Profile utilities.""" +import json + from aiohttp import web from ..anoncreds.error_messages import ANONCREDS_PROFILE_REQUIRED_MSG from ..askar.profile_anon import AskarAnoncredsProfile from ..core.profile import Profile +from ..multitenant.manager import MultitenantManager +from ..storage.base import BaseStorageSearch +from ..wallet.models.wallet_record import WalletRecord def is_anoncreds_profile_raise_web_exception(profile: Profile) -> None: @@ -29,3 +34,26 @@ def subwallet_type_not_same_as_base_wallet_raise_web_exception( raise web.HTTPForbidden( reason="Subwallet type must be the same as the base wallet type" ) + + +async def get_subwallet_profiles_from_storage(root_profile: Profile) -> list[Profile]: + """Get subwallet profiles from storage.""" + subwallet_profiles = [] + base_storage_search = root_profile.inject(BaseStorageSearch) + search_session = base_storage_search.search_records( + type_filter=WalletRecord.RECORD_TYPE, page_size=10 + ) + while search_session._done is False: + wallet_storage_records = await search_session.fetch() + for wallet_storage_record in wallet_storage_records: + wallet_record = WalletRecord.from_storage( + wallet_storage_record.id, + json.loads(wallet_storage_record.value), + ) + subwallet_profiles.append( + await MultitenantManager(root_profile).get_wallet_profile( + base_context=root_profile.context, + wallet_record=wallet_record, + ) + ) + return subwallet_profiles diff --git a/aries_cloudagent/wallet/anoncreds_upgrade.py b/aries_cloudagent/wallet/anoncreds_upgrade.py new file mode 100644 index 0000000000..4e9f16e8bc --- /dev/null +++ b/aries_cloudagent/wallet/anoncreds_upgrade.py @@ -0,0 +1,719 @@ +"""Functions for upgrading records to anoncreds.""" + +import asyncio +import json +import logging +from typing import Optional + +from anoncreds import ( + CredentialDefinition, + CredentialDefinitionPrivate, + KeyCorrectnessProof, + RevocationRegistryDefinitionPrivate, + Schema, +) +from aries_askar import AskarError +from indy_credx import LinkSecret + +from ..anoncreds.issuer import ( + CATEGORY_CRED_DEF, + CATEGORY_CRED_DEF_KEY_PROOF, + CATEGORY_CRED_DEF_PRIVATE, + CATEGORY_SCHEMA, +) +from ..anoncreds.models.anoncreds_cred_def import CredDef, CredDefState +from ..anoncreds.models.anoncreds_revocation import ( + RevList, + RevListState, + RevRegDef, + RevRegDefState, + RevRegDefValue, +) +from ..anoncreds.models.anoncreds_schema import SchemaState +from ..anoncreds.revocation import ( + CATEGORY_REV_LIST, + CATEGORY_REV_REG_DEF, + CATEGORY_REV_REG_DEF_PRIVATE, +) +from ..cache.base import BaseCache +from ..core.profile import Profile +from ..indy.credx.holder import CATEGORY_LINK_SECRET, IndyCredxHolder +from ..ledger.multiple_ledger.ledger_requests_executor import ( + GET_CRED_DEF, + GET_SCHEMA, + IndyLedgerRequestsExecutor, +) +from ..messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE +from ..messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE +from ..multitenant.base import BaseMultitenantManager +from ..revocation.models.issuer_cred_rev_record import IssuerCredRevRecord +from ..revocation.models.issuer_rev_reg_record import IssuerRevRegRecord +from ..storage.base import BaseStorage +from ..storage.error import StorageNotFoundError +from ..storage.record import StorageRecord +from ..storage.type import ( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + RECORD_TYPE_ACAPY_UPGRADING, + STORAGE_TYPE_VALUE_ANONCREDS, +) +from .singletons import IsAnoncredsSingleton, UpgradeInProgressSingleton + +LOGGER = logging.getLogger(__name__) + +UPGRADING_RECORD_IN_PROGRESS = "anoncreds_in_progress" +UPGRADING_RECORD_FINISHED = "anoncreds_finished" + +# Number of times to retry upgrading records +max_retries = 5 + + +class SchemaUpgradeObj: + """Schema upgrade object.""" + + def __init__( + self, + schema_id: str, + schema: Schema, + name: str, + version: str, + issuer_id: str, + old_record_id: str, + ): + """Initialize schema upgrade object.""" + self.schema_id = schema_id + self.schema = schema + self.name = name + self.version = version + self.issuer_id = issuer_id + self.old_record_id = old_record_id + + +class CredDefUpgradeObj: + """Cred def upgrade object.""" + + def __init__( + self, + cred_def_id: str, + cred_def: CredentialDefinition, + cred_def_private: CredentialDefinitionPrivate, + key_proof: KeyCorrectnessProof, + revocation: Optional[bool] = None, + askar_cred_def: Optional[any] = None, + max_cred_num: Optional[int] = None, + ): + """Initialize cred def upgrade object.""" + self.cred_def_id = cred_def_id + self.cred_def = cred_def + self.cred_def_private = cred_def_private + self.key_proof = key_proof + self.revocation = revocation + self.askar_cred_def = askar_cred_def + self.max_cred_num = max_cred_num + + +class RevRegDefUpgradeObj: + """Rev reg def upgrade object.""" + + def __init__( + self, + rev_reg_def_id: str, + rev_reg_def: RevRegDef, + rev_reg_def_private: RevocationRegistryDefinitionPrivate, + active: bool = False, + ): + """Initialize rev reg def upgrade object.""" + self.rev_reg_def_id = rev_reg_def_id + self.rev_reg_def = rev_reg_def + self.rev_reg_def_private = rev_reg_def_private + self.active = active + + +class RevListUpgradeObj: + """Rev entry upgrade object.""" + + def __init__( + self, + rev_list: RevList, + pending: list, + rev_reg_def_id: str, + cred_rev_records: list, + ): + """Initialize rev entry upgrade object.""" + self.rev_list = rev_list + self.pending = pending + self.rev_reg_def_id = rev_reg_def_id + self.cred_rev_records = cred_rev_records + + +async def get_schema_upgrade_object( + profile: Profile, schema_id: str, askar_schema +) -> SchemaUpgradeObj: + """Get schema upgrade object.""" + + async with profile.session() as session: + schema_id = askar_schema.tags.get("schema_id") + issuer_did = askar_schema.tags.get("schema_issuer_did") + # Need to get schema from the ledger because the attribute names + # are not stored in the wallet + multitenant_mgr = session.inject_or(BaseMultitenantManager) + if multitenant_mgr: + ledger_exec_inst = IndyLedgerRequestsExecutor(profile) + else: + ledger_exec_inst = session.inject(IndyLedgerRequestsExecutor) + + _, ledger = await ledger_exec_inst.get_ledger_for_identifier( + schema_id, + txn_record_type=GET_SCHEMA, + ) + async with ledger: + schema_from_ledger = await ledger.get_schema(schema_id) + + return SchemaUpgradeObj( + schema_id, + Schema.create( + schema_id, + askar_schema.tags.get("schema_name"), + issuer_did, + schema_from_ledger["attrNames"], + ), + askar_schema.tags.get("schema_name"), + askar_schema.tags.get("schema_version"), + issuer_did, + askar_schema.id, + ) + + +async def get_cred_def_upgrade_object( + profile: Profile, askar_cred_def +) -> CredDefUpgradeObj: + """Get cred def upgrade object.""" + cred_def_id = askar_cred_def.tags.get("cred_def_id") + async with profile.session() as session: + # Need to get cred_def from the ledger because the tag + # is not stored in the wallet and don't know wether it supports revocation + multitenant_mgr = session.inject_or(BaseMultitenantManager) + if multitenant_mgr: + ledger_exec_inst = IndyLedgerRequestsExecutor(profile) + else: + ledger_exec_inst = session.inject(IndyLedgerRequestsExecutor) + _, ledger = await ledger_exec_inst.get_ledger_for_identifier( + cred_def_id, + txn_record_type=GET_CRED_DEF, + ) + async with ledger: + cred_def_from_ledger = await ledger.get_credential_definition(cred_def_id) + + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_cred_def_private = await storage.get_record( + CATEGORY_CRED_DEF_PRIVATE, cred_def_id + ) + askar_cred_def_key_proof = await storage.get_record( + CATEGORY_CRED_DEF_KEY_PROOF, cred_def_id + ) + + cred_def = CredDef( + issuer_id=askar_cred_def.tags.get("issuer_did"), + schema_id=askar_cred_def.tags.get("schema_id"), + tag=cred_def_from_ledger["tag"], + type=cred_def_from_ledger["type"], + value=cred_def_from_ledger["value"], + ) + + return CredDefUpgradeObj( + cred_def_id, + cred_def, + askar_cred_def_private.value, + askar_cred_def_key_proof.value, + cred_def_from_ledger["value"].get("revocation", None), + askar_cred_def=askar_cred_def, + ) + + +async def get_rev_reg_def_upgrade_object( + profile: Profile, + cred_def_upgrade_obj: CredDefUpgradeObj, + askar_issuer_rev_reg_def, + is_active: bool, +) -> RevRegDefUpgradeObj: + """Get rev reg def upgrade object.""" + rev_reg_def_id = askar_issuer_rev_reg_def.tags.get("revoc_reg_id") + + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_reg_rev_def_private = await storage.get_record( + CATEGORY_REV_REG_DEF_PRIVATE, rev_reg_def_id + ) + + revoc_reg_def_values = json.loads(askar_issuer_rev_reg_def.value) + + reg_def_value = RevRegDefValue( + revoc_reg_def_values["revoc_reg_def"]["value"]["publicKeys"], + revoc_reg_def_values["revoc_reg_def"]["value"]["maxCredNum"], + revoc_reg_def_values["revoc_reg_def"]["value"]["tailsLocation"], + revoc_reg_def_values["revoc_reg_def"]["value"]["tailsHash"], + ) + + rev_reg_def = RevRegDef( + issuer_id=askar_issuer_rev_reg_def.tags.get("issuer_did"), + cred_def_id=cred_def_upgrade_obj.cred_def_id, + tag=revoc_reg_def_values["tag"], + type=revoc_reg_def_values["revoc_def_type"], + value=reg_def_value, + ) + + return RevRegDefUpgradeObj( + rev_reg_def_id, rev_reg_def, askar_reg_rev_def_private.value, is_active + ) + + +async def get_rev_list_upgrade_object( + profile: Profile, rev_reg_def_upgrade_obj: RevRegDefUpgradeObj +) -> RevListUpgradeObj: + """Get revocation entry upgrade object.""" + rev_reg = rev_reg_def_upgrade_obj.rev_reg_def + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_cred_rev_records = await storage.find_all_records( + IssuerCredRevRecord.RECORD_TYPE, + {"rev_reg_id": rev_reg_def_upgrade_obj.rev_reg_def_id}, + ) + + revocation_list = [0] * rev_reg.value.max_cred_num + for askar_cred_rev_record in askar_cred_rev_records: + if askar_cred_rev_record.tags.get("state") == "revoked": + revocation_list[int(askar_cred_rev_record.tags.get("cred_rev_id")) - 1] = 1 + + rev_list = RevList( + issuer_id=rev_reg.issuer_id, + rev_reg_def_id=rev_reg_def_upgrade_obj.rev_reg_def_id, + revocation_list=revocation_list, + current_accumulator=json.loads( + rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def.value + )["revoc_reg_entry"]["value"]["accum"], + ) + + return RevListUpgradeObj( + rev_list, + json.loads(rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def.value)[ + "pending_pub" + ], + rev_reg_def_upgrade_obj.rev_reg_def_id, + askar_cred_rev_records, + ) + + +async def upgrade_and_delete_schema_records( + txn, schema_upgrade_obj: SchemaUpgradeObj +) -> None: + """Upgrade and delete schema records.""" + schema_anoncreds = schema_upgrade_obj.schema + await txn.handle.remove("schema_sent", schema_upgrade_obj.old_record_id) + await txn.handle.replace( + CATEGORY_SCHEMA, + schema_upgrade_obj.schema_id, + schema_anoncreds.to_json(), + { + "name": schema_upgrade_obj.name, + "version": schema_upgrade_obj.version, + "issuer_id": schema_upgrade_obj.issuer_id, + "state": SchemaState.STATE_FINISHED, + }, + ) + + +async def upgrade_and_delete_cred_def_records( + txn, anoncreds_schema, cred_def_upgrade_obj: CredDefUpgradeObj +) -> None: + """Upgrade and delete cred def records.""" + cred_def_id = cred_def_upgrade_obj.cred_def_id + anoncreds_schema = anoncreds_schema.to_dict() + askar_cred_def = cred_def_upgrade_obj.askar_cred_def + await txn.handle.remove("cred_def_sent", askar_cred_def.id) + await txn.handle.replace( + CATEGORY_CRED_DEF, + cred_def_id, + cred_def_upgrade_obj.cred_def.to_json(), + tags={ + "schema_id": askar_cred_def.tags.get("schema_id"), + "schema_issuer_id": anoncreds_schema["issuerId"], + "issuer_id": askar_cred_def.tags.get("issuer_did"), + "schema_name": anoncreds_schema["name"], + "schema_version": anoncreds_schema["version"], + "state": CredDefState.STATE_FINISHED, + "epoch": askar_cred_def.tags.get("epoch"), + # TODO We need to keep track of these but tags probably + # isn't ideal. This suggests that a full record object + # is necessary for non-private values + "support_revocation": json.dumps(cred_def_upgrade_obj.revocation), + "max_cred_num": str(cred_def_upgrade_obj.max_cred_num or 0), + }, + ) + await txn.handle.replace( + CATEGORY_CRED_DEF_PRIVATE, + cred_def_id, + CredentialDefinitionPrivate.load( + cred_def_upgrade_obj.cred_def_private + ).to_json_buffer(), + ) + await txn.handle.replace( + CATEGORY_CRED_DEF_KEY_PROOF, + cred_def_id, + KeyCorrectnessProof.load(cred_def_upgrade_obj.key_proof).to_json_buffer(), + ) + + +rev_reg_states_mapping = { + "init": RevRegDefState.STATE_WAIT, + "generated": RevRegDefState.STATE_ACTION, + "posted": RevRegDefState.STATE_FINISHED, + "active": RevRegDefState.STATE_FINISHED, + "full": RevRegDefState.STATE_FULL, + "decommissioned": RevRegDefState.STATE_DECOMMISSIONED, +} + + +async def upgrade_and_delete_rev_reg_def_records( + txn, rev_reg_def_upgrade_obj: RevRegDefUpgradeObj +) -> None: + """Upgrade and delete rev reg def records.""" + rev_reg_def_id = rev_reg_def_upgrade_obj.rev_reg_def_id + askar_issuer_rev_reg_def = rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def + await txn.handle.remove(IssuerRevRegRecord.RECORD_TYPE, askar_issuer_rev_reg_def.id) + await txn.handle.replace( + CATEGORY_REV_REG_DEF, + rev_reg_def_id, + rev_reg_def_upgrade_obj.rev_reg_def.to_json(), + tags={ + "cred_def_id": rev_reg_def_upgrade_obj.rev_reg_def.cred_def_id, + "issuer_id": askar_issuer_rev_reg_def.tags.get("issuer_did"), + "state": rev_reg_states_mapping[askar_issuer_rev_reg_def.tags.get("state")], + "active": json.dumps(rev_reg_def_upgrade_obj.active), + }, + ) + await txn.handle.replace( + CATEGORY_REV_REG_DEF_PRIVATE, + rev_reg_def_id, + RevocationRegistryDefinitionPrivate.load( + rev_reg_def_upgrade_obj.rev_reg_def_private + ).to_json_buffer(), + ) + + +async def upgrade_and_delete_rev_entry_records( + txn, rev_list_upgrade_obj: RevListUpgradeObj +) -> None: + """Upgrade and delete revocation entry records.""" + next_index = 0 + for cred_rev_record in rev_list_upgrade_obj.cred_rev_records: + if int(cred_rev_record.tags.get("cred_rev_id")) > next_index: + next_index = int(cred_rev_record.tags.get("cred_rev_id")) + await txn.handle.remove(IssuerCredRevRecord.RECORD_TYPE, cred_rev_record.id) + + await txn.handle.insert( + CATEGORY_REV_LIST, + rev_list_upgrade_obj.rev_reg_def_id, + value_json={ + "rev_list": rev_list_upgrade_obj.rev_list.serialize(), + "pending": rev_list_upgrade_obj.pending, + "next_index": next_index + 1, + }, + tags={ + "state": RevListState.STATE_FINISHED, + "pending": json.dumps(rev_list_upgrade_obj.pending is not None), + }, + ) + + +async def upgrade_all_records_with_transaction( + txn: any, + schema_upgrade_objs: list[SchemaUpgradeObj], + cred_def_upgrade_objs: list[CredDefUpgradeObj], + rev_reg_def_upgrade_objs: list[RevRegDefUpgradeObj], + rev_list_upgrade_objs: list[RevListUpgradeObj], + link_secret: Optional[LinkSecret] = None, +) -> None: + """Upgrade all objects with transaction.""" + for schema_upgrade_obj in schema_upgrade_objs: + await upgrade_and_delete_schema_records(txn, schema_upgrade_obj) + for cred_def_upgrade_obj in cred_def_upgrade_objs: + await upgrade_and_delete_cred_def_records( + txn, schema_upgrade_obj.schema, cred_def_upgrade_obj + ) + for rev_reg_def_upgrade_obj in rev_reg_def_upgrade_objs: + await upgrade_and_delete_rev_reg_def_records(txn, rev_reg_def_upgrade_obj) + for rev_list_upgrade_obj in rev_list_upgrade_objs: + await upgrade_and_delete_rev_entry_records(txn, rev_list_upgrade_obj) + + if link_secret: + await txn.handle.replace( + CATEGORY_LINK_SECRET, + IndyCredxHolder.LINK_SECRET_ID, + link_secret.to_dict()["value"]["ms"].encode("ascii"), + ) + + await txn.commit() + + +async def get_rev_reg_def_upgrade_objs( + profile: Profile, + cred_def_upgrade_obj: CredDefUpgradeObj, + rev_list_upgrade_objs: list[RevListUpgradeObj], +) -> list[RevRegDefUpgradeObj]: + """Get rev reg def upgrade objects.""" + + rev_reg_def_upgrade_objs = [] + async with profile.session() as session: + storage = session.inject(BaseStorage) + # Must be sorted to find the active rev reg def + askar_issuer_rev_reg_def_records = sorted( + await storage.find_all_records( + IssuerRevRegRecord.RECORD_TYPE, + {"cred_def_id": cred_def_upgrade_obj.cred_def_id}, + ), + key=lambda x: json.loads(x.value)["created_at"], + ) + found_active = False + for askar_issuer_rev_reg_def in askar_issuer_rev_reg_def_records: + # active rev reg def is the oldest non-full and active rev reg def + if ( + not found_active + and askar_issuer_rev_reg_def.tags.get("state") != "full" + and askar_issuer_rev_reg_def.tags.get("state") == "active" + ): + found_active = True + is_active = True + + rev_reg_def_upgrade_obj = await get_rev_reg_def_upgrade_object( + profile, + cred_def_upgrade_obj, + askar_issuer_rev_reg_def, + is_active, + ) + is_active = False + rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def = askar_issuer_rev_reg_def + + rev_reg_def_upgrade_objs.append(rev_reg_def_upgrade_obj) + + # add the revocation list upgrade object from reg def upgrade object + rev_list_upgrade_objs.append( + await get_rev_list_upgrade_object(profile, rev_reg_def_upgrade_obj) + ) + return rev_reg_def_upgrade_objs + + +async def convert_records_to_anoncreds(profile) -> None: + """Convert and delete old askar records.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_schema_records = await storage.find_all_records(SCHEMA_SENT_RECORD_TYPE) + + schema_upgrade_objs = [] + cred_def_upgrade_objs = [] + rev_reg_def_upgrade_objs = [] + rev_list_upgrade_objs = [] + + # Schemas + for askar_schema in askar_schema_records: + schema_upgrade_objs.append( + await get_schema_upgrade_object(profile, askar_schema.id, askar_schema) + ) + + # CredDefs and Revocation Objects + askar_cred_def_records = await storage.find_all_records( + CRED_DEF_SENT_RECORD_TYPE, {} + ) + for askar_cred_def in askar_cred_def_records: + cred_def_upgrade_obj = await get_cred_def_upgrade_object( + profile, askar_cred_def + ) + rev_reg_def_upgrade_objs = await get_rev_reg_def_upgrade_objs( + profile, cred_def_upgrade_obj, rev_list_upgrade_objs + ) + # update the cred_def with the max_cred_num from first rev_reg_def + if rev_reg_def_upgrade_objs: + cred_def_upgrade_obj.max_cred_num = rev_reg_def_upgrade_objs[ + 0 + ].rev_reg_def.value.max_cred_num + cred_def_upgrade_objs.append(cred_def_upgrade_obj) + + # Link secret + link_secret_record = None + try: + link_secret_record = await session.handle.fetch( + CATEGORY_LINK_SECRET, IndyCredxHolder.LINK_SECRET_ID + ) + except AskarError: + pass + + link_secret = None + if link_secret_record: + link_secret = LinkSecret.load(link_secret_record.raw_value) + + async with profile.transaction() as txn: + try: + await upgrade_all_records_with_transaction( + txn, + schema_upgrade_objs, + cred_def_upgrade_objs, + rev_reg_def_upgrade_objs, + rev_list_upgrade_objs, + link_secret, + ) + except Exception as e: + await txn.rollback() + raise e + + +async def retry_converting_records( + profile: Profile, upgrading_record: StorageRecord, retry: int, is_subwallet=False +) -> None: + """Retry converting records to anoncreds.""" + + async def fail_upgrade(): + async with profile.session() as session: + storage = session.inject(BaseStorage) + await storage.delete_record(upgrading_record) + + try: + await convert_records_to_anoncreds(profile) + await finish_upgrade_by_updating_profile_or_shutting_down(profile, is_subwallet) + LOGGER.info(f"Upgrade complete via retry for wallet: {profile.name}") + except Exception as e: + LOGGER.error(f"Error when upgrading records for wallet {profile.name} : {e} ") + if retry < max_retries: + LOGGER.info(f"Retry attempt {retry + 1} to upgrade wallet {profile.name}") + await asyncio.sleep(1) + await retry_converting_records( + profile, upgrading_record, retry + 1, is_subwallet + ) + else: + LOGGER.error( + f"""Failed to upgrade wallet: {profile.name} after 5 retries. + Try fixing any connection issues and re-running the update""" + ) + await fail_upgrade() + + +async def upgrade_wallet_to_anoncreds_if_requested( + profile: Profile, is_subwallet=False +) -> None: + """Get upgrading record and attempt to upgrade wallet to anoncreds.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, {} + ) + if upgrading_record.value == UPGRADING_RECORD_FINISHED: + IsAnoncredsSingleton().set_wallet(profile.name) + return + except StorageNotFoundError: + return + + try: + LOGGER.info("Upgrade in process for wallet: %s", profile.name) + await convert_records_to_anoncreds(profile) + await finish_upgrade_by_updating_profile_or_shutting_down( + profile, is_subwallet + ) + except Exception as e: + LOGGER.error(f"Error when upgrading wallet {profile.name} : {e} ") + await retry_converting_records(profile, upgrading_record, 0, is_subwallet) + + +async def finish_upgrade(profile: Profile): + """Finish record by setting records and caches.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + storage_type_record = await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + await storage.update_record( + storage_type_record, STORAGE_TYPE_VALUE_ANONCREDS, {} + ) + # This should only happen for subwallets + except StorageNotFoundError: + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + STORAGE_TYPE_VALUE_ANONCREDS, + ) + ) + await finish_upgrading_record(profile) + IsAnoncredsSingleton().set_wallet(profile.name) + UpgradeInProgressSingleton().remove_wallet(profile.name) + + +async def finish_upgrading_record(profile: Profile): + """Update upgrading record to finished.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + await storage.update_record(upgrading_record, UPGRADING_RECORD_FINISHED, {}) + except StorageNotFoundError: + return + + +async def upgrade_subwallet(profile: Profile) -> None: + """Upgrade subwallet to anoncreds.""" + async with profile.session() as session: + multitenant_mgr = session.inject_or(BaseMultitenantManager) + wallet_id = profile.settings.get("wallet.id") + cache = profile.inject_or(BaseCache) + await cache.flush() + settings = {"wallet.type": STORAGE_TYPE_VALUE_ANONCREDS} + await multitenant_mgr.update_wallet(wallet_id, settings) + + +async def finish_upgrade_by_updating_profile_or_shutting_down( + profile: Profile, is_subwallet=False +): + """Upgrade wallet to anoncreds and set storage type.""" + if is_subwallet: + await upgrade_subwallet(profile) + await finish_upgrade(profile) + LOGGER.info( + f"""Upgrade of subwallet {profile.settings.get('wallet.name')} has completed. Profile is now askar-anoncreds""" # noqa: E501 + ) + else: + await finish_upgrade(profile) + LOGGER.info( + f"Upgrade of base wallet {profile.settings.get('wallet.name')} to anoncreds has completed. Shutting down agent." # noqa: E501 + ) + asyncio.get_event_loop().stop() + + +async def check_upgrade_completion_loop(profile: Profile, is_subwallet=False): + """Check if upgrading is complete.""" + async with profile.session() as session: + while True: + storage = session.inject(BaseStorage) + LOGGER.debug(f"Checking upgrade completion for wallet: {profile.name}") + try: + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + if upgrading_record.value == UPGRADING_RECORD_FINISHED: + IsAnoncredsSingleton().set_wallet(profile.name) + UpgradeInProgressSingleton().remove_wallet(profile.name) + if is_subwallet: + await upgrade_subwallet(profile) + LOGGER.info( + f"""Upgrade of subwallet {profile.settings.get('wallet.name')} has completed. Profile is now askar-anoncreds""" # noqa: E501 + ) + return + LOGGER.info( + f"Upgrade complete for wallet: {profile.name}, shutting down agent." # noqa: E501 + ) + # Shut down agent if base wallet + asyncio.get_event_loop().stop() + except StorageNotFoundError: + # If the record is not found, the upgrade failed + return + + await asyncio.sleep(1) diff --git a/aries_cloudagent/wallet/routes.py b/aries_cloudagent/wallet/routes.py index 5dc222c0c7..23afe6fb73 100644 --- a/aries_cloudagent/wallet/routes.py +++ b/aries_cloudagent/wallet/routes.py @@ -1,5 +1,6 @@ """Wallet admin routes.""" +import asyncio import json import logging from typing import List, Optional, Tuple, Union @@ -55,15 +56,23 @@ is_author_role, ) from ..resolver.base import ResolverError +from ..storage.base import BaseStorage from ..storage.error import StorageError, StorageNotFoundError +from ..storage.record import StorageRecord +from ..storage.type import RECORD_TYPE_ACAPY_UPGRADING from ..wallet.jwt import jwt_sign, jwt_verify from ..wallet.sd_jwt import sd_jwt_sign, sd_jwt_verify +from .anoncreds_upgrade import ( + UPGRADING_RECORD_IN_PROGRESS, + upgrade_wallet_to_anoncreds_if_requested, +) from .base import BaseWallet from .did_info import DIDInfo from .did_method import KEY, PEER2, PEER4, SOV, DIDMethod, DIDMethods, HolderDefinedDid from .did_posture import DIDPosture from .error import WalletError, WalletNotFoundError from .key_type import BLS12381G2, ED25519, KeyTypes +from .singletons import UpgradeInProgressSingleton from .util import EVENT_LISTENER_PATTERN LOGGER = logging.getLogger(__name__) @@ -1241,6 +1250,73 @@ async def wallet_rotate_did_keypair(request: web.BaseRequest): return web.json_response({}) +class UpgradeVerificationSchema(OpenAPISchema): + """Parameters and validators for triggering an upgrade to anoncreds.""" + + wallet_name = fields.Str( + required=True, + metadata={ + "description": "Name of wallet to upgrade to anoncreds", + "example": "base-wallet", + }, + ) + + +class UpgradeResultSchema(OpenAPISchema): + """Result schema for upgrade.""" + + +@docs( + tags=["anoncreds - wallet upgrade"], + summary=""" + Upgrade the wallet from askar to anoncreds - Be very careful with this! You + cannot go back! See migration guide for more information. + """, +) +@querystring_schema(UpgradeVerificationSchema()) +@response_schema(UpgradeResultSchema(), description="") +async def upgrade_anoncreds(request: web.BaseRequest): + """Request handler for triggering an upgrade to anoncreds. + + Args: + request: aiohttp request object + + Returns: + An empty JSON response + + """ + context: AdminRequestContext = request["context"] + profile = context.profile + + if profile.settings.get("wallet.name") != request.query.get("wallet_name"): + raise web.HTTPBadRequest( + reason="Wallet name parameter does not match the agent which triggered the upgrade" # noqa: E501 + ) + + if profile.settings.get("wallet.type") == "askar-anoncreds": + raise web.HTTPBadRequest(reason="Wallet type is already anoncreds") + + async with profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + UPGRADING_RECORD_IN_PROGRESS, + ) + await storage.add_record(upgrading_record) + is_subwallet = context.metadata and "wallet_id" in context.metadata + asyncio.create_task( + upgrade_wallet_to_anoncreds_if_requested(profile, is_subwallet) + ) + UpgradeInProgressSingleton().set_wallet(profile.name) + + return web.json_response( + { + "success": True, + "message": f"Upgrade to anoncreds has been triggered for wallet {profile.name}", # noqa: E501 + } + ) + + def register_events(event_bus: EventBus): """Subscribe to any events we need to support.""" event_bus.subscribe(EVENT_LISTENER_PATTERN, on_register_nym_event) @@ -1333,6 +1409,7 @@ async def register(app: web.Application): "/wallet/get-did-endpoint", wallet_get_did_endpoint, allow_head=False ), web.patch("/wallet/did/local/rotate-keypair", wallet_rotate_did_keypair), + web.post("/anoncreds/wallet/upgrade", upgrade_anoncreds), ] ) @@ -1356,3 +1433,13 @@ def post_process_routes(app: web.Application): }, } ) + app._state["swagger_dict"]["tags"].append( + { + "name": "anoncreds - wallet upgrade", + "description": "Anoncreds wallet upgrade", + "externalDocs": { + "description": "Specification", + "url": "https://hyperledger.github.io/anoncreds-spec", + }, + } + ) diff --git a/aries_cloudagent/wallet/singletons.py b/aries_cloudagent/wallet/singletons.py new file mode 100644 index 0000000000..9a7a91d057 --- /dev/null +++ b/aries_cloudagent/wallet/singletons.py @@ -0,0 +1,43 @@ +"""Module that contains singleton classes for wallet operations.""" + + +class IsAnoncredsSingleton: + """Singleton class used as cache for anoncreds wallet-type queries.""" + + instance = None + wallets = set() + + def __new__(cls, *args, **kwargs): + """Create a new instance of the class.""" + if cls.instance is None: + cls.instance = super().__new__(cls) + return cls.instance + + def set_wallet(self, wallet: str): + """Set a wallet name.""" + self.wallets.add(wallet) + + def remove_wallet(self, wallet: str): + """Remove a wallet name.""" + self.wallets.discard(wallet) + + +class UpgradeInProgressSingleton: + """Singleton class used as cache for upgrade in progress.""" + + instance = None + wallets = set() + + def __new__(cls, *args, **kwargs): + """Create a new instance of the class.""" + if cls.instance is None: + cls.instance = super().__new__(cls) + return cls.instance + + def set_wallet(self, wallet: str): + """Set a wallet name.""" + self.wallets.add(wallet) + + def remove_wallet(self, wallet: str): + """Remove a wallet name.""" + self.wallets.discard(wallet) diff --git a/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py new file mode 100644 index 0000000000..00c52bc623 --- /dev/null +++ b/aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py @@ -0,0 +1,406 @@ +import asyncio +from time import time +from unittest import IsolatedAsyncioTestCase + +from aries_cloudagent.tests import mock +from aries_cloudagent.wallet import singletons + +from ...anoncreds.issuer import CATEGORY_CRED_DEF_PRIVATE +from ...cache.base import BaseCache +from ...core.in_memory.profile import InMemoryProfile, InMemoryProfileSession +from ...indy.credx.issuer import CATEGORY_CRED_DEF_KEY_PROOF +from ...messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE +from ...messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE +from ...multitenant.base import BaseMultitenantManager +from ...multitenant.manager import MultitenantManager +from ...storage.base import BaseStorage +from ...storage.record import StorageRecord +from ...storage.type import ( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + RECORD_TYPE_ACAPY_UPGRADING, + STORAGE_TYPE_VALUE_ANONCREDS, +) +from .. import anoncreds_upgrade + + +class TestAnoncredsUpgrade(IsolatedAsyncioTestCase): + def setUp(self) -> None: + self.profile = InMemoryProfile.test_profile( + settings={"wallet.type": "askar", "wallet.id": "test-wallet-id"} + ) + self.context = self.profile.context + self.context.injector.bind_instance( + BaseMultitenantManager, mock.MagicMock(MultitenantManager, autospec=True) + ) + self.context.injector.bind_instance( + BaseCache, mock.MagicMock(BaseCache, autospec=True) + ) + + @mock.patch.object(InMemoryProfileSession, "handle") + async def test_convert_records_to_anoncreds(self, mock_handle): + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + mock_handle.fetch = mock.CoroutineMock(return_value=None) + + schema_id = "GHjSbphAcdsrZrLjSvsjMp:2:faber-simple:1.1" + schema_id_parts = schema_id.split(":") + schema_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord(SCHEMA_SENT_RECORD_TYPE, schema_id, schema_tags) + ) + + credential_definition_id = "GHjSbphAcdsrZrLjSvsjMp:3:CL:8:default" + cred_def_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "issuer_did": "GHjSbphAcdsrZrLjSvsjMp", + "cred_def_id": credential_definition_id, + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord( + CRED_DEF_SENT_RECORD_TYPE, credential_definition_id, cred_def_tags + ) + ) + storage.get_record = mock.CoroutineMock( + side_effect=[ + StorageRecord( + CATEGORY_CRED_DEF_PRIVATE, + {"p_key": {"p": "123...782", "q": "234...456"}, "r_key": None}, + {}, + ), + StorageRecord( + CATEGORY_CRED_DEF_KEY_PROOF, + {"c": "103...961", "xz_cap": "563...205", "xr_cap": []}, + {}, + ), + ] + ) + anoncreds_upgrade.IndyLedgerRequestsExecutor = mock.MagicMock() + anoncreds_upgrade.IndyLedgerRequestsExecutor.return_value.get_ledger_for_identifier = mock.CoroutineMock( + return_value=( + None, + mock.MagicMock( + get_schema=mock.CoroutineMock( + return_value={ + "attrNames": [ + "name", + "age", + ], + }, + ), + get_credential_definition=mock.CoroutineMock( + return_value={ + "type": "CL", + "tag": "default", + "value": { + "primary": { + "n": "123", + }, + }, + }, + ), + ), + ) + ) + + with mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_schema_records" + ), mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_cred_def_records" + ): + await anoncreds_upgrade.convert_records_to_anoncreds(self.profile) + + @mock.patch.object(InMemoryProfileSession, "handle") + async def test_retry_converting_records(self, mock_handle): + mock_handle.fetch = mock.CoroutineMock(return_value=None) + with mock.patch.object( + anoncreds_upgrade, "convert_records_to_anoncreds", mock.CoroutineMock() + ) as mock_convert_records_to_anoncreds: + mock_convert_records_to_anoncreds.side_effect = [ + Exception("Error"), + Exception("Error"), + None, + ] + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + anoncreds_upgrade.UPGRADING_RECORD_IN_PROGRESS, + ) + await storage.add_record(upgrading_record) + await anoncreds_upgrade.retry_converting_records( + self.profile, upgrading_record, 0 + ) + + assert mock_convert_records_to_anoncreds.call_count == 3 + storage_type_record = await storage.find_record( + RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + assert storage_type_record.value == STORAGE_TYPE_VALUE_ANONCREDS + assert ( + upgrading_record.value + == anoncreds_upgrade.UPGRADING_RECORD_FINISHED + ) + assert "test-profile" in singletons.IsAnoncredsSingleton().wallets + + @mock.patch.object(InMemoryProfileSession, "handle") + async def test_upgrade_wallet_to_anoncreds(self, mock_handle): + mock_handle.fetch = mock.CoroutineMock(return_value=None) + + # upgrading record not present + await anoncreds_upgrade.upgrade_wallet_to_anoncreds_if_requested(self.profile) + + # upgrading record present + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + anoncreds_upgrade.UPGRADING_RECORD_IN_PROGRESS, + ) + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds_if_requested( + self.profile + ) + storage_type_record = await storage.find_record( + RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + assert storage_type_record.value == STORAGE_TYPE_VALUE_ANONCREDS + assert upgrading_record.value == anoncreds_upgrade.UPGRADING_RECORD_FINISHED + assert "test-profile" in singletons.IsAnoncredsSingleton().wallets + + # retry called on exception + with mock.patch.object( + anoncreds_upgrade, + "convert_records_to_anoncreds", + mock.CoroutineMock(side_effect=[Exception("Error")]), + ), mock.patch.object( + anoncreds_upgrade, "retry_converting_records", mock.CoroutineMock() + ) as mock_retry_converting_records: + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + await storage.update_record( + upgrading_record, anoncreds_upgrade.UPGRADING_RECORD_IN_PROGRESS, {} + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds_if_requested( + self.profile + ) + assert mock_retry_converting_records.called + + async def test_set_storage_type_to_anoncreds_no_existing_record(self): + await anoncreds_upgrade.finish_upgrade(self.profile) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == STORAGE_TYPE_VALUE_ANONCREDS + + async def test_set_storage_type_to_anoncreds_has_existing_record(self): + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + "askar", + ) + ) + await anoncreds_upgrade.finish_upgrade(self.profile) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == STORAGE_TYPE_VALUE_ANONCREDS + + async def test_update_if_subwallet_and_set_storage_type_with_subwallet(self): + + await anoncreds_upgrade.finish_upgrade_by_updating_profile_or_shutting_down( + self.profile, True + ) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == STORAGE_TYPE_VALUE_ANONCREDS + assert self.profile.context.injector.get_provider( + BaseCache + )._instance.flush.called + + async def test_update_if_subwallet_and_set_storage_type_with_base_wallet(self): + + await anoncreds_upgrade.finish_upgrade_by_updating_profile_or_shutting_down( + self.profile, False + ) + _, storage_type_record = next(iter(self.profile.records.items())) + assert storage_type_record.value == STORAGE_TYPE_VALUE_ANONCREDS + + @mock.patch.object(InMemoryProfileSession, "handle") + async def test_failed_upgrade(self, mock_handle): + mock_handle.fetch = mock.CoroutineMock(return_value=None) + async with self.profile.session() as session: + storage = session.inject(BaseStorage) + + schema_id = "GHjSbphAcdsrZrLjSvsjMp:2:faber-simple:1.1" + schema_id_parts = schema_id.split(":") + schema_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord(SCHEMA_SENT_RECORD_TYPE, schema_id, schema_tags) + ) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + "askar", + ) + ) + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + ) + + credential_definition_id = "GHjSbphAcdsrZrLjSvsjMp:3:CL:8:default" + cred_def_tags = { + "schema_id": schema_id, + "schema_issuer_did": schema_id_parts[0], + "schema_name": schema_id_parts[-2], + "schema_version": schema_id_parts[-1], + "issuer_did": "GHjSbphAcdsrZrLjSvsjMp", + "cred_def_id": credential_definition_id, + "epoch": str(int(time())), + } + await storage.add_record( + StorageRecord( + CRED_DEF_SENT_RECORD_TYPE, credential_definition_id, cred_def_tags + ) + ) + storage.get_record = mock.CoroutineMock( + side_effect=[ + StorageRecord( + CATEGORY_CRED_DEF_PRIVATE, + {"p_key": {"p": "123...782", "q": "234...456"}, "r_key": None}, + {}, + ), + StorageRecord( + CATEGORY_CRED_DEF_KEY_PROOF, + {"c": "103...961", "xz_cap": "563...205", "xr_cap": []}, + {}, + ), + ] + ) + anoncreds_upgrade.IndyLedgerRequestsExecutor = mock.MagicMock() + anoncreds_upgrade.IndyLedgerRequestsExecutor.return_value.get_ledger_for_identifier = mock.CoroutineMock( + return_value=( + None, + mock.MagicMock( + get_schema=mock.CoroutineMock( + return_value={ + "attrNames": [ + "name", + "age", + ], + }, + ), + get_credential_definition=mock.CoroutineMock( + return_value={ + "type": "CL", + "tag": "default", + "value": { + "primary": { + "n": "123", + }, + }, + }, + ), + ), + ) + ) + + with mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_schema_records" + ), mock.patch.object( + anoncreds_upgrade, "upgrade_and_delete_cred_def_records" + ), mock.patch.object( + InMemoryProfileSession, "rollback" + ) as mock_rollback, mock.patch.object( + InMemoryProfileSession, + "commit", + # Don't wait for sleep in retry to speed up test + ) as mock_commit, mock.patch.object( + asyncio, "sleep" + ): + """ + Only tests schemas and cred_defs failing to upgrade because the other objects are + hard to mock. These tests should be enough to cover them as the logic is the same. + """ + + # Schemas fails to upgrade + anoncreds_upgrade.upgrade_and_delete_schema_records = mock.CoroutineMock( + # Needs to fail 5 times because of the retry logic + side_effect=[ + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + ] + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds_if_requested( + self.profile + ) + assert mock_rollback.called + assert not mock_commit.called + # Upgrading record should not be deleted + with self.assertRaises(Exception): + await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + + storage_type_record = await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + # Storage type should not be updated + assert storage_type_record.value == "askar" + + # Cred_defs fails to upgrade + anoncreds_upgrade.upgrade_and_delete_cred_def_records = ( + mock.CoroutineMock( + side_effect=[ + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + Exception("Error"), + ] + ) + ) + await anoncreds_upgrade.upgrade_wallet_to_anoncreds_if_requested( + self.profile + ) + assert mock_rollback.called + assert not mock_commit.called + # Upgrading record should not be deleted + with self.assertRaises(Exception): + await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + + storage_type_record = await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + # Storage type should not be updated + assert storage_type_record.value == "askar" diff --git a/aries_cloudagent/wallet/tests/test_routes.py b/aries_cloudagent/wallet/tests/test_routes.py index f2b756de23..f99fbc1679 100644 --- a/aries_cloudagent/wallet/tests/test_routes.py +++ b/aries_cloudagent/wallet/tests/test_routes.py @@ -3,6 +3,7 @@ from aiohttp.web import HTTPForbidden from aries_cloudagent.tests import mock +from aries_cloudagent.wallet import singletons from ...admin.request_context import AdminRequestContext from ...core.in_memory import InMemoryProfile @@ -11,6 +12,7 @@ from ...wallet.did_method import SOV, DIDMethod, DIDMethods, HolderDefinedDid from ...wallet.key_type import ED25519, KeyTypes from .. import routes as test_module +from ..anoncreds_upgrade import UPGRADING_RECORD_IN_PROGRESS from ..base import BaseWallet from ..did_info import DIDInfo from ..did_posture import DIDPosture @@ -1006,6 +1008,26 @@ async def test_rotate_did_keypair_x(self): with self.assertRaises(test_module.web.HTTPBadRequest): await test_module.wallet_rotate_did_keypair(self.request) + async def test_upgrade_anoncreds(self): + self.profile.settings["wallet.name"] = "test_wallet" + self.request.query = {"wallet_name": "not_test_wallet"} + with self.assertRaises(test_module.web.HTTPBadRequest): + await test_module.upgrade_anoncreds(self.request) + + self.request.query = {"wallet_name": "not_test_wallet"} + self.profile.settings["wallet.type"] = "askar-anoncreds" + with self.assertRaises(test_module.web.HTTPBadRequest): + await test_module.upgrade_anoncreds(self.request) + + self.request.query = {"wallet_name": "test_wallet"} + self.profile.settings["wallet.type"] = "askar" + result = await test_module.upgrade_anoncreds(self.request) + print(result) + _, upgrade_record = next(iter(self.profile.records.items())) + assert upgrade_record.type == "acapy_upgrading" + assert upgrade_record.value == UPGRADING_RECORD_IN_PROGRESS + assert "test-profile" in singletons.UpgradeInProgressSingleton().wallets + async def test_register(self): mock_app = mock.MagicMock() mock_app.add_routes = mock.MagicMock() diff --git a/demo/features/steps/0586-sign-transaction.py b/demo/features/steps/0586-sign-transaction.py index da112e14d5..406db972a0 100644 --- a/demo/features/steps/0586-sign-transaction.py +++ b/demo/features/steps/0586-sign-transaction.py @@ -761,7 +761,6 @@ def step_impl(context, holder_name, issuer_name): "/credentials", params={}, ) - assert len(cred_list["results"]) == 1 cred_id = cred_list["results"][0]["referent"] revoc_status_bool = False diff --git a/demo/features/steps/upgrade.py b/demo/features/steps/upgrade.py new file mode 100644 index 0000000000..fe23f2570e --- /dev/null +++ b/demo/features/steps/upgrade.py @@ -0,0 +1,24 @@ +"""Steps for upgrading the wallet to support anoncreds.""" + +from bdd_support.agent_backchannel_client import ( + agent_container_POST, + async_sleep, +) +from behave import given, then + + +@given('"{issuer}" upgrades the wallet to anoncreds') +@then('"{issuer}" upgrades the wallet to anoncreds') +def step_impl(context, issuer): + """Upgrade the wallet to support anoncreds.""" + agent = context.active_agents[issuer] + agent_container_POST( + agent["agent"], + "/anoncreds/wallet/upgrade", + data={}, + params={ + "wallet_name": agent["agent"].agent.wallet_name, + }, + ) + + async_sleep(2.0) diff --git a/demo/features/upgrade.feature b/demo/features/upgrade.feature new file mode 100644 index 0000000000..d837efbab9 --- /dev/null +++ b/demo/features/upgrade.feature @@ -0,0 +1,29 @@ +Feature: ACA-Py Anoncreds Upgrade + + @GHA + Scenario Outline: Using revocation api, issue, revoke credentials and publish + Given we have "3" agents + | name | role | capabilities | + | Acme | issuer | | + | Faber | verifier | | + | Bob | prover | | + And "" and "Bob" have an existing connection + And "Bob" has an issued credential from "" + And "" has written the credential definition for to the ledger + And "" has written the revocation registry definition to the ledger + And "" has written the revocation registry entry transaction to the ledger + And "" revokes the credential without publishing the entry + And "" authors a revocation registry entry publishing transaction + And "Faber" and "Bob" have an existing connection + When "Faber" sends a request for proof presentation to "Bob" + Then "Faber" has the proof verification fail + Then "Bob" can verify the credential from "" was revoked + And "" upgrades the wallet to anoncreds + And "Bob" has an issued credential from "" + And "Bob" upgrades the wallet to anoncreds + And "Bob" has an issued credential from "" + When "Faber" sends a request for proof presentation to "Bob" + + Examples: + | issuer | Acme_capabilities | Bob_capabilities | Schema_name | Credential_data | Proof_request | + | Acme | --revocation --public-did --multitenant | --multitenant | driverslicense_v2 | Data_DL_MaxValues | DL_age_over_19_v2 | \ No newline at end of file diff --git a/demo/runners/faber.py b/demo/runners/faber.py index d497a318cd..6de8018c80 100644 --- a/demo/runners/faber.py +++ b/demo/runners/faber.py @@ -29,7 +29,6 @@ prompt_loop, ) - CRED_PREVIEW_TYPE = "https://didcomm.org/issue-credential/2.0/credential-preview" SELF_ATTESTED = os.getenv("SELF_ATTESTED") TAILS_FILE_COUNT = int(os.getenv("TAILS_FILE_COUNT", 100)) @@ -582,17 +581,28 @@ async def main(args): options += " (D) Set Endorser's DID\n" if faber_agent.multitenant: options += " (W) Create and/or Enable Wallet\n" + options += " (U) Upgrade wallet to anoncreds \n" options += " (T) Toggle tracing on credential/proof exchange\n" options += " (X) Exit?\n[1/2/3/4/{}{}T/X] ".format( "5/6/7/8/" if faber_agent.revocation else "", "W/" if faber_agent.multitenant else "", ) + + upgraded_to_anoncreds = False async for option in prompt_loop( options.replace("%CRED_TYPE%", faber_agent.cred_type) ): if option is not None: option = option.strip() + # Anoncreds has different endpoints for revocation + is_anoncreds = False + if ( + faber_agent.agent.__dict__["wallet_type"] == "askar-anoncreds" + or upgraded_to_anoncreds + ): + is_anoncreds = True + if option is None or option in "xX": break @@ -886,11 +896,6 @@ async def main(args): await prompt("Publish now? [Y/N]: ", default="N") ).strip() in "yY" - # Anoncreds has different endpoints for revocation - is_anoncreds = False - if faber_agent.agent.__dict__["wallet_type"] == "askar-anoncreds": - is_anoncreds = True - try: endpoint = ( "/anoncreds/revocation/revoke" @@ -992,6 +997,14 @@ async def main(args): ) except ClientError: pass + elif option in "uU" and faber_agent.multitenant: + log_status("Upgrading wallet to anoncreds. Wait a couple seconds...") + await faber_agent.agent.admin_POST( + "/anoncreds/wallet/upgrade", + params={"wallet_name": faber_agent.agent.wallet_name}, + ) + upgraded_to_anoncreds = True + await asyncio.sleep(2.0) if faber_agent.show_timing: timing = await faber_agent.agent.fetch_timing() diff --git a/docs/design/UpgradeViaApi.md b/docs/design/UpgradeViaApi.md new file mode 100644 index 0000000000..0ddb890449 --- /dev/null +++ b/docs/design/UpgradeViaApi.md @@ -0,0 +1,103 @@ +# Upgrade via API Design + +#### To isolate an upgrade process and trigger it via API the following pattern was designed to handle multitenant scenarios. It includes an is_upgrading record in the wallet(DB) and a middleware to prevent requests during the upgrade process. + +#### The diagam below descripes the sequence of events for the anoncreds upgrade process which it was designed for, but the architecture can be used for any upgrade process. + +```mermaid +sequenceDiagram + participant A1 as Agent 1 + participant M1 as Middleware + participant IAS1 as IsAnoncredsSingleton Set + participant UIPS1 as UpgradeInProgressSingleton Set + participant W as Wallet (DB) + participant UIPS2 as UpgradeInProgressSingleton Set + participant IAS2 as IsAnoncredsSingleton Set + participant M2 as Middleware + participant A2 as Agent 2 + + Note over A1,A2: Start upgrade for non-anoncreds wallet + A1->>M1: POST /anoncreds/wallet/upgrade + M1-->>IAS1: check if wallet is in set + IAS1-->>M1: wallet is not in set + M1-->>UIPS1: check if wallet is in set + UIPS1-->>M1: wallet is not in set + M1->>A1: OK + A1-->>W: Add is_upgrading = anoncreds_in_progress record + A1->>A1: Upgrade wallet + A1-->>UIPS1: Add wallet to set + + Note over A1,A2: Attempted Requests During Upgrade + + Note over A1: Attempted Request + A1->>M1: GET /any-endpoint + M1-->>IAS1: check if wallet is in set + IAS1-->>M1: wallet is not in set + M1-->>UIPS1: check if wallet is in set + UIPS1-->>M1: wallet is in set + M1->>A1: 503 Service Unavailable + + Note over A2: Attempted Request + A2->>M2: GET /any-endpoint + M2-->>IAS2: check if wallet is in set + IAS2->>M2: wallet is not in set + M2-->>UIPS2: check if wallet is in set + UIPS2-->>M2: wallet is not in set + A2-->>W: Query is_upgrading = anoncreds_in_progress record + W-->>A2: record = anoncreds_in_progress + A2->>A2: Loop until upgrade is finished in seperate process + A2-->>UIPS2: Add wallet to set + M2->>A2: 503 Service Unavailable + + Note over A1,A2: Agent Restart During Upgrade + A1-->>W: Get is_upgrading record for wallet or all subwallets + W-->>A1: + A1->>A1: Resume upgrade if in progress + A1-->>UIPS1: Add wallet to set + + Note over A2: Same as Agent 1 + + Note over A1,A2: Upgrade Completes + + Note over A1: Finish Upgrade + A1-->>W: set is_upgrading = anoncreds_finished + A1-->>UIPS1: Remove wallet from set + A1-->>IAS1: Add wallet to set + A1->>A1: update subwallet or restart + + Note over A2: Detect Upgrade Complete + A2-->>W: Check is_upgrading = anoncreds_finished + W-->>A2: record = anoncreds_in_progress + A2->>A2: Wait 1 second + A2-->>W: Check is_upgrading = anoncreds_finished + W-->>A2: record = anoncreds_finished + A2-->>UIPS2: Remove wallet from set + A2-->>IAS2: Add wallet to set + A2->>A2: update subwallet or restart + + Note over A1,A2: Restarted Agents After Upgrade + + A1-->W: Get is_upgrading record for wallet or all subwallets + W-->>A1: + A1->>IAS1: Add wallet to set if record = anoncreds_finished + + Note over A2: Same as Agent 1 + + Note over A1,A2: Attempted Requests After Upgrade + + Note over A1: Attempted Request + A1->>M1: GET /any-endpoint + M1-->>IAS1: check if wallet is in set + IAS1-->>M1: wallet is in set + M1-->>A1: OK + + Note over A2: Same as Agent 1 +``` + + +##### An example of the implementation can be found via the anoncreds upgrade components. + - `aries_cloudagent/wallet/routes.py` in the `upgrade_anoncreds` controller + - the upgrade code in `wallet/anoncreds_upgrade.py` + - the middleware in `admin/server.py` in the `upgrade_middleware` function + - the singleton sets in `wallet/singletons.py` + - the startup process in `core/conductor.py` in the `check_for_wallet_upgrades_in_progress` function \ No newline at end of file