-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create a service for definitions endpoints (#842)
* move to service * move to service and call service based on tenant * move to service , call create from service * move to service and call service here * imports * moved create schema logic here * service for tenants to get schemas * service for governance to get schemas * from schema_ids get full schemas * moved bulk of create cred_def logic here * moved get cred_def logic here * import dataclass for service dependencies * add service dependencies dataclass * add schema publisher class * add class to publish schema to trust registry * refactor create schema service * add helper class cred def publisher * refactor create cred def service * move to helper class * formatting * rework logging * redo logging not passing logger down * update names * move to definitions folder * move to cred_def publisher class to its own module * move schema publisher class to own module * make assert public did a util function * add check endorser connection function * add wait for transaction acked util function * assert governance is calling function * formatting * add state to get connection connection must be completed * formatting * formatting * update doc strings * fix indentation * add return types * reorder logic * add return types * add return types * handle exception if not governance * call register schema directly * raise exception if not governance role * formatting * update imports * formatting * formatting * removed function calling it directly * update tests * Refactor trust registry client code for better error handling and consistency * formatting * fix status code on exception * formatting * raise from error * formatting * add default values * add logs * move to service * update imports * not used removed * update function names * remove unused imports * move to schema pub class add check for governance agent * removed arg build in func * update arg and type use new field names * update imports * move error handling here * remove unused imports * 🎨 init file * 🚚 move schema specific methods to own module * 🎨 rename module for clarity * 🎨 update imports and method references * 🎨 rearrange if condition * 🎨 fit in max lines --------- Co-authored-by: ff137 <ff137@proton.me> Co-authored-by: cl0ete <cloete.dupreez@gmail.com>
- Loading branch information
Showing
14 changed files
with
711 additions
and
428 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
Empty file.
74 changes: 74 additions & 0 deletions
74
app/services/definitions/credential_definition_publisher.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
import asyncio | ||
from logging import Logger | ||
|
||
from aries_cloudcontroller import AcaPyClient | ||
|
||
from app.exceptions import CloudApiException, handle_acapy_call | ||
from app.services.revocation_registry import wait_for_active_registry | ||
from app.util.check_endorser_connection import check_endorser_connection | ||
from shared import REGISTRY_CREATION_TIMEOUT | ||
|
||
|
||
class CredentialDefinitionPublisher: | ||
def __init__(self, controller: AcaPyClient, logger: Logger): | ||
self._logger = logger | ||
self._controller = controller | ||
|
||
async def check_endorser_connection(self): | ||
has_connections = await check_endorser_connection( | ||
aries_controller=self._controller | ||
) | ||
|
||
if not has_connections: | ||
self._logger.error( | ||
"Failed to create credential definition supporting revocation: " | ||
"no endorser connection found. Issuer attempted to create a credential " | ||
"definition with support for revocation but does not have an active " | ||
"connection with an endorser, which is required for this operation." | ||
) | ||
raise CloudApiException( | ||
"Credential definition creation failed: An active endorser connection " | ||
"is required to support revocation. Please establish a connection with " | ||
"an endorser and try again." | ||
) | ||
|
||
async def publish_credential_definition(self, request_body): | ||
try: | ||
result = await handle_acapy_call( | ||
logger=self._logger, | ||
acapy_call=self._controller.credential_definition.publish_cred_def, | ||
body=request_body, | ||
) | ||
except CloudApiException as e: | ||
self._logger.warning( | ||
"An Exception was caught while publishing cred def: `{}` `{}`", | ||
e.detail, | ||
e.status_code, | ||
) | ||
if "already exists" in e.detail: | ||
self._logger.info("Credential definition already exists") | ||
raise CloudApiException(status_code=409, detail=e.detail) from e | ||
else: | ||
self._logger.error( | ||
"Error while creating credential definition: `{}`", e.detail | ||
) | ||
raise CloudApiException( | ||
detail=f"Error while creating credential definition: {e.detail}", | ||
status_code=e.status_code, | ||
) from e | ||
|
||
return result | ||
|
||
async def wait_for_revocation_registry(self, credential_definition_id): | ||
try: | ||
self._logger.debug("Waiting for revocation registry creation") | ||
await asyncio.wait_for( | ||
wait_for_active_registry(self._controller, credential_definition_id), | ||
timeout=REGISTRY_CREATION_TIMEOUT, | ||
) | ||
except asyncio.TimeoutError as e: | ||
self._logger.error("Timeout waiting for revocation registry creation.") | ||
raise CloudApiException( | ||
"Timeout waiting for revocation registry creation.", | ||
504, | ||
) from e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
import asyncio | ||
from typing import List, Optional | ||
|
||
from aries_cloudcontroller import AcaPyClient, CredentialDefinitionSendRequest | ||
|
||
from app.exceptions import handle_acapy_call, handle_model_with_validation | ||
from app.models.definitions import CreateCredentialDefinition, CredentialDefinition | ||
from app.services.definitions.credential_definition_publisher import ( | ||
CredentialDefinitionPublisher, | ||
) | ||
from app.services.trust_registry.util.issuer import assert_valid_issuer | ||
from app.util.assert_public_did import assert_public_did | ||
from app.util.definitions import credential_definition_from_acapy | ||
from app.util.transaction_acked import wait_for_transaction_ack | ||
from shared.log_config import get_logger | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
async def create_credential_definition( | ||
aries_controller: AcaPyClient, | ||
credential_definition: CreateCredentialDefinition, | ||
support_revocation: bool, | ||
) -> str: | ||
""" | ||
Create a credential definition | ||
""" | ||
bound_logger = logger.bind( | ||
body={ | ||
"schema_id": credential_definition.schema_id, | ||
"tag": credential_definition.tag, | ||
"support_revocation": credential_definition.support_revocation, | ||
} | ||
) | ||
publisher = CredentialDefinitionPublisher( | ||
controller=aries_controller, logger=bound_logger | ||
) | ||
|
||
public_did = await assert_public_did(aries_controller) | ||
|
||
await assert_valid_issuer(public_did, credential_definition.schema_id) | ||
|
||
if support_revocation: | ||
await publisher.check_endorser_connection() | ||
|
||
request_body = handle_model_with_validation( | ||
logger=logger, | ||
model_class=CredentialDefinitionSendRequest, | ||
schema_id=credential_definition.schema_id, | ||
support_revocation=support_revocation, | ||
tag=credential_definition.tag, | ||
revocation_registry_size=32767, | ||
) | ||
|
||
result = await publisher.publish_credential_definition(request_body) | ||
credential_definition_id = result.sent.credential_definition_id | ||
|
||
if result.txn and result.txn.transaction_id: | ||
await wait_for_transaction_ack( | ||
aries_controller=aries_controller, transaction_id=result.txn.transaction_id | ||
) | ||
|
||
if support_revocation: | ||
await publisher.wait_for_revocation_registry(credential_definition_id) | ||
|
||
return credential_definition_id | ||
|
||
|
||
async def get_credential_definitions( | ||
aries_controller: AcaPyClient, | ||
issuer_did: Optional[str] = None, | ||
credential_definition_id: Optional[str] = None, | ||
schema_id: Optional[str] = None, | ||
schema_issuer_did: Optional[str] = None, | ||
schema_name: Optional[str] = None, | ||
schema_version: Optional[str] = None, | ||
) -> List[CredentialDefinition]: | ||
""" | ||
Get credential definitions | ||
""" | ||
bound_logger = logger.bind( | ||
body={ | ||
"issuer_did": issuer_did, | ||
"credential_definition_id": credential_definition_id, | ||
"schema_id": schema_id, | ||
"schema_issuer_did": schema_issuer_did, | ||
"schema_name": schema_name, | ||
"schema_version": schema_version, | ||
} | ||
) | ||
bound_logger.debug("Getting created credential definitions") | ||
|
||
response = await handle_acapy_call( | ||
logger=bound_logger, | ||
acapy_call=aries_controller.credential_definition.get_created_cred_defs, | ||
issuer_did=issuer_did, | ||
cred_def_id=credential_definition_id, | ||
schema_id=schema_id, | ||
schema_issuer_did=schema_issuer_did, | ||
schema_name=schema_name, | ||
schema_version=schema_version, | ||
) | ||
|
||
# Initiate retrieving all credential definitions | ||
credential_definition_ids = response.credential_definition_ids or [] | ||
get_credential_definition_futures = [ | ||
handle_acapy_call( | ||
logger=bound_logger, | ||
acapy_call=aries_controller.credential_definition.get_cred_def, | ||
cred_def_id=credential_definition_id, | ||
) | ||
for credential_definition_id in credential_definition_ids | ||
] | ||
|
||
# Wait for completion of retrieval and transform all credential definitions | ||
# into response model (if a credential definition was returned) | ||
if get_credential_definition_futures: | ||
bound_logger.debug("Getting definitions from fetched credential ids") | ||
credential_definition_results = await asyncio.gather( | ||
*get_credential_definition_futures | ||
) | ||
else: | ||
bound_logger.debug("No definition ids returned") | ||
credential_definition_results = [] | ||
|
||
credential_definitions = [ | ||
credential_definition_from_acapy(credential_definition.credential_definition) | ||
for credential_definition in credential_definition_results | ||
if credential_definition.credential_definition | ||
] | ||
|
||
return credential_definitions |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
from logging import Logger | ||
from typing import List | ||
|
||
from aries_cloudcontroller import ( | ||
AcaPyClient, | ||
SchemaGetResult, | ||
SchemaSendRequest, | ||
TxnOrSchemaSendResult, | ||
) | ||
|
||
from app.exceptions import CloudApiException, handle_acapy_call | ||
from app.models.definitions import CredentialSchema | ||
from app.services.trust_registry.schemas import register_schema | ||
from app.util.definitions import credential_schema_from_acapy | ||
|
||
|
||
class SchemaPublisher: | ||
def __init__(self, controller: AcaPyClient, logger: Logger): | ||
self._logger = logger | ||
self._controller = controller | ||
|
||
async def publish_schema( | ||
self, schema_request: SchemaSendRequest | ||
) -> TxnOrSchemaSendResult: | ||
try: | ||
result = await handle_acapy_call( | ||
logger=self._logger, | ||
acapy_call=self._controller.schema.publish_schema, | ||
body=schema_request, | ||
create_transaction_for_endorser=False, | ||
) | ||
except CloudApiException as e: | ||
if "already exist" in e.detail and e.status_code == 400: | ||
result = await self._handle_existing_schema(schema_request) | ||
else: | ||
self._logger.warning( | ||
"An unhandled Exception was caught while publishing schema: {}", | ||
e.detail, | ||
) | ||
raise CloudApiException("Error while creating schema.") from e | ||
|
||
if result.sent and result.sent.schema_id: | ||
await register_schema(schema_id=result.sent.schema_id) | ||
else: | ||
self._logger.error("No SchemaSendResult in `publish_schema` response.") | ||
raise CloudApiException( | ||
"An unexpected error occurred: could not publish schema." | ||
) | ||
return result | ||
|
||
async def _handle_existing_schema( | ||
self, schema: SchemaSendRequest | ||
) -> CredentialSchema: | ||
self._logger.info("Handling case of schema already existing on ledger") | ||
self._logger.debug("Fetching public DID for governance controller") | ||
pub_did = await handle_acapy_call( | ||
logger=self._logger, | ||
acapy_call=self._controller.wallet.get_public_did, | ||
) | ||
|
||
_schema_id = ( | ||
f"{pub_did.result.did}:2:{schema.schema_name}:{schema.schema_version}" | ||
) | ||
self._logger.debug( | ||
"Fetching schema id `{}` which is associated with request", | ||
_schema_id, | ||
) | ||
|
||
_schema: SchemaGetResult = await handle_acapy_call( | ||
logger=self._logger, | ||
acapy_call=self._controller.schema.get_schema, | ||
schema_id=_schema_id, | ||
) | ||
|
||
# Edge case where the governance agent has changed its public did | ||
# Then we need to retrieve the schema in a different way as constructing | ||
# the schema ID the way above will not be correct due to different public did. | ||
if _schema.var_schema is None: | ||
self._logger.debug( | ||
"Schema not found. Governance agent may have changed public DID. " | ||
"Fetching schemas created by governance with requested name and version" | ||
) | ||
schemas_created_ids = await handle_acapy_call( | ||
logger=self._logger, | ||
acapy_call=self._controller.schema.get_created_schemas, | ||
schema_name=schema.schema_name, | ||
schema_version=schema.schema_version, | ||
) | ||
self._logger.debug("Getting schemas associated with fetched ids") | ||
schemas: List[SchemaGetResult] = [ | ||
await handle_acapy_call( | ||
logger=self._logger, | ||
acapy_call=self._controller.schema.get_schema, | ||
schema_id=schema_id, | ||
) | ||
for schema_id in schemas_created_ids.schema_ids | ||
if schema_id | ||
] | ||
|
||
if not schemas: | ||
raise CloudApiException("Could not publish schema.", 500) | ||
if len(schemas) > 1: | ||
error_message = ( | ||
f"Multiple schemas with name {schema.schema_name} " | ||
f"and version {schema.schema_version} exist." | ||
f"These are: `{str(schemas_created_ids.schema_ids)}`." | ||
) | ||
raise CloudApiException(error_message, 409) | ||
self._logger.debug("Using updated schema id with new DID") | ||
_schema: SchemaGetResult = schemas[0] | ||
|
||
# Schema exists with different attributes | ||
if set(_schema.var_schema.attr_names) != set(schema.attributes): | ||
error_message = ( | ||
"Error creating schema: Schema already exists with different attribute " | ||
f"names. Given: `{str(set(schema.attributes))}`. " | ||
f"Found: `{str(set(_schema.var_schema.attr_names))}`." | ||
) | ||
raise CloudApiException(error_message, 409) | ||
|
||
result = credential_schema_from_acapy(_schema.var_schema) | ||
self._logger.info( | ||
"Schema already exists on ledger. Returning schema definition: `{}`.", | ||
result, | ||
) | ||
return result |
Oops, something went wrong.