-
Notifications
You must be signed in to change notification settings - Fork 8
/
credential_definitions.py
132 lines (112 loc) · 4.6 KB
/
credential_definitions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import asyncio
from typing import List, Optional
from aries_cloudcontroller import AcaPyClient, CredentialDefinitionSendRequest
from app.exceptions import handle_acapy_call, handle_model_with_validation
from app.models.definitions import CreateCredentialDefinition, CredentialDefinition
from app.services.definitions.credential_definition_publisher import (
CredentialDefinitionPublisher,
)
from app.services.trust_registry.util.issuer import assert_valid_issuer
from app.util.assert_public_did import assert_public_did
from app.util.definitions import credential_definition_from_acapy
from app.util.transaction_acked import wait_for_transaction_ack
from shared.log_config import get_logger
# Module-level logger; the functions below bind their call-specific context to it.
logger = get_logger(__name__)
async def create_credential_definition(
    aries_controller: AcaPyClient,
    credential_definition: CreateCredentialDefinition,
    support_revocation: bool,
) -> str:
    """
    Create a credential definition and return its id.

    Steps, in order:
      1. Assert that the controller has a public DID and that this DID is a
         valid issuer for the requested schema.
      2. When revocation is requested, check the endorser connection first.
      3. Build and validate the ACA-Py send request, then publish it.
      4. If the publish result carries a transaction id, wait for that
         transaction to be acknowledged.
      5. When revocation is requested, wait for the revocation registry of
         the new credential definition.

    Args:
        aries_controller: Client used for every ACA-Py call made here.
        credential_definition: Supplies the ``schema_id`` and ``tag`` of the
            definition to create.
        support_revocation: Whether the definition is created with revocation
            support. This argument (not the attribute of the same name on
            ``credential_definition``) drives every decision in this function.

    Returns:
        The id of the published credential definition.

    Raises:
        Nothing is caught here; any exception raised by the assertion,
        validation, publishing or waiting helpers propagates to the caller.
    """
    # Log the value that is actually acted upon, so the log context cannot
    # disagree with the behaviour of this call.
    bound_logger = logger.bind(
        body={
            "schema_id": credential_definition.schema_id,
            "tag": credential_definition.tag,
            "support_revocation": support_revocation,
        }
    )
    publisher = CredentialDefinitionPublisher(
        controller=aries_controller, logger=bound_logger
    )

    public_did = await assert_public_did(aries_controller)
    await assert_valid_issuer(public_did, credential_definition.schema_id)

    if support_revocation:
        await publisher.check_endorser_connection()

    # Use the bound logger so validation failures carry the same request
    # context (schema_id / tag / support_revocation) as the rest of the flow.
    request_body = handle_model_with_validation(
        logger=bound_logger,
        model_class=CredentialDefinitionSendRequest,
        schema_id=credential_definition.schema_id,
        support_revocation=support_revocation,
        tag=credential_definition.tag,
        revocation_registry_size=32767,  # hard-coded registry size
    )

    result = await publisher.publish_credential_definition(request_body)
    credential_definition_id = result.sent.credential_definition_id

    # A transaction id is only present on some results; wait for its
    # acknowledgement only when there is one.
    if result.txn and result.txn.transaction_id:
        await wait_for_transaction_ack(
            aries_controller=aries_controller, transaction_id=result.txn.transaction_id
        )

    if support_revocation:
        await publisher.wait_for_revocation_registry(credential_definition_id)

    return credential_definition_id
async def get_credential_definitions(
    aries_controller: AcaPyClient,
    issuer_did: Optional[str] = None,
    credential_definition_id: Optional[str] = None,
    schema_id: Optional[str] = None,
    schema_issuer_did: Optional[str] = None,
    schema_name: Optional[str] = None,
    schema_version: Optional[str] = None,
) -> List[CredentialDefinition]:
    """
    Get the created credential definitions matching the given filters.

    The ids of the created credential definitions are requested first, with
    every filter argument forwarded as-is. Each returned id is then fetched
    individually, all fetches being awaited together, and every fetched result
    that carries a credential definition is converted with
    ``credential_definition_from_acapy``.

    Args:
        aries_controller: Client used for the ACA-Py calls.
        issuer_did: Optional filter forwarded to the id lookup.
        credential_definition_id: Optional filter, forwarded as ``cred_def_id``.
        schema_id: Optional filter forwarded to the id lookup.
        schema_issuer_did: Optional filter forwarded to the id lookup.
        schema_name: Optional filter forwarded to the id lookup.
        schema_version: Optional filter forwarded to the id lookup.

    Returns:
        The converted credential definitions; an empty list when the lookup
        yields no ids. Results without a credential definition are skipped.
    """
    log_context = {
        "issuer_did": issuer_did,
        "credential_definition_id": credential_definition_id,
        "schema_id": schema_id,
        "schema_issuer_did": schema_issuer_did,
        "schema_name": schema_name,
        "schema_version": schema_version,
    }
    bound_logger = logger.bind(body=log_context)
    bound_logger.debug("Getting created credential definitions")

    created = await handle_acapy_call(
        logger=bound_logger,
        acapy_call=aries_controller.credential_definition.get_created_cred_defs,
        issuer_did=issuer_did,
        cred_def_id=credential_definition_id,
        schema_id=schema_id,
        schema_issuer_did=schema_issuer_did,
        schema_name=schema_name,
        schema_version=schema_version,
    )

    def _fetch_one(cred_def_id):
        # Only builds the awaitable; nothing is awaited until the gather below.
        return handle_acapy_call(
            logger=bound_logger,
            acapy_call=aries_controller.credential_definition.get_cred_def,
            cred_def_id=cred_def_id,
        )

    # The id list may be missing on the response; treat that as "no ids".
    found_ids = created.credential_definition_ids or []
    pending = [_fetch_one(found_id) for found_id in found_ids]

    if not pending:
        bound_logger.debug("No definition ids returned")
        return []

    bound_logger.debug("Getting definitions from fetched credential ids")
    fetched = await asyncio.gather(*pending)

    # Convert each fetched result into the response model, skipping results
    # for which no credential definition was returned.
    definitions = []
    for item in fetched:
        acapy_definition = item.credential_definition
        if acapy_definition:
            definitions.append(credential_definition_from_acapy(acapy_definition))
    return definitions