from typing import Optional
from aries_cloudcontroller import (
AcaPyClient,
ApiException,
CredRevRecordResult,
IssuerCredRevRecord,
IssuerRevRegRecord,
RevokeRequest,
RevRegCreateRequest,
RevRegResult,
TransactionRecord,
TxnOrRevRegResult,
)
from app.dependencies.acapy_clients import get_governance_controller
from app.event_handling.sse_listener import SseListener
from app.exceptions import CloudApiException
from shared.log_config import get_logger
# Module logger; most functions below attach request context to it with `logger.bind(...)`.
logger = get_logger(__name__)
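async def create_revocation_registry(
    controller: AcaPyClient, credential_definition_id: str, max_cred_num: int = 32767
) -> IssuerRevRegRecord:
    """
    Create a new revocation registry.

    This should be called whenever a new credential definition is created.

    Args:
        controller (AcaPyClient): aca-py client
        credential_definition_id (str): The credential definition ID.
        max_cred_num (int): The maximum number of credentials the registry can hold.
            Default = 32767 (the signature default; the docstring previously said 32768).

    Raises:
        CloudApiException: When aca-py returns an empty result, i.e. the revocation
            registry could not be created.

    Returns:
        result (IssuerRevRegRecord): The revocation registry record.
    """
    bound_logger = logger.bind(
        body={
            "credential_definition_id": credential_definition_id,
            "max_cred_num": max_cred_num,
        }
    )
    bound_logger.info("Creating a new revocation registry for a credential definition")

    create_request = RevRegCreateRequest(
        credential_definition_id=credential_definition_id, max_cred_num=max_cred_num
    )
    result = await controller.revocation.create_registry(body=create_request)

    # An empty/None response is treated as a failure here rather than handing the
    # caller a None that would only surface later as an AttributeError.
    if not result:
        bound_logger.error("Error creating revocation registry.")
        raise CloudApiException(
            f"Error creating revocation registry for credential with ID `{credential_definition_id}`."
        )

    bound_logger.info("Successfully created revocation registry.")
    return result.result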
async def get_active_revocation_registry_for_credential(
    controller: AcaPyClient, credential_definition_id: str
) -> IssuerRevRegRecord:
    """
    Get the active revocation registry for a credential definition.

    Args:
        controller (AcaPyClient): aca-py client
        credential_definition_id (str): The credential definition ID.

    Raises:
        CloudApiException: When aca-py does not return a `RevRegResult`, i.e. the
            active revocation registry cannot be retrieved.

    Returns:
        result (IssuerRevRegRecord): The active revocation registry record.
    """
    bound_logger = logger.bind(
        body={"credential_definition_id": credential_definition_id}
    )
    bound_logger.info("Fetching activate revocation registry for a credential")

    registry_response = await controller.revocation.get_active_registry_for_cred_def(
        cred_def_id=credential_definition_id
    )

    if isinstance(registry_response, RevRegResult):
        bound_logger.info(
            "Successfully retrieved revocation registry for credential definition."
        )
        return registry_response.result

    # Anything other than a RevRegResult is treated as a failed lookup.
    bound_logger.error(
        "Unexpected type returned from get_active_registry_for_cred_def: `{}`.",
        registry_response,
    )
    raise CloudApiException(
        f"Error retrieving revocation registry for credential with ID `{credential_definition_id}`."
    )
async def get_credential_revocation_status(
    controller: AcaPyClient, credential_exchange_id: str
) -> IssuerCredRevRecord:
    """
    Get the revocation status for a credential exchange.

    Args:
        controller (AcaPyClient): aca-py client
        credential_exchange_id (str): The credential exchange ID.

    Raises:
        CloudApiException: When aca-py does not return a `CredRevRecordResult`, i.e.
            the revocation status cannot be retrieved.

    Returns:
        IssuerCredRevRecord: The issuer's revocation record for the credential.
    """
    bound_logger = logger.bind(body={"credential_exchange_id": credential_exchange_id})
    bound_logger.info("Fetching the revocation status for a credential exchange")

    status_response = await controller.revocation.get_revocation_status(
        cred_ex_id=credential_exchange_id
    )

    if isinstance(status_response, CredRevRecordResult):
        bound_logger.info("Successfully retrieved revocation status.")
        return status_response.result

    bound_logger.error(
        "Unexpected type returned from get_revocation_status: `{}`.", status_response
    )
    raise CloudApiException(
        f"Error retrieving revocation status for credential exchange ID `{credential_exchange_id}`."
    )
async def publish_revocation_registry_on_ledger(
    controller: AcaPyClient,
    revocation_registry_id: str,
    connection_id: Optional[str] = None,
    create_transaction_for_endorser: bool = False,
) -> TransactionRecord:
    """
    Publish a created revocation registry definition to the ledger.

    Args:
        controller (AcaPyClient): aca-py client
        revocation_registry_id (str): The revocation registry ID.
        connection_id (Optional[str]): The connection ID of author to endorser. Only
            forwarded to aca-py when `create_transaction_for_endorser` is True.
        create_transaction_for_endorser (bool): Whether to create a transaction
            record for the endorser to endorse.

    Raises:
        CloudApiException: When aca-py returns neither a `RevRegResult` nor a
            `TxnOrRevRegResult` that carries a transaction.

    Returns:
        result (TransactionRecord | IssuerRevRegRecord): The transaction record when
            one was created for the endorser, otherwise the revocation registry record.
    """
    bound_logger = logger.bind(
        body={
            "revocation_registry_id": revocation_registry_id,
            "connection_id": connection_id,
            "create_transaction_for_endorser": create_transaction_for_endorser,
        }
    )
    bound_logger.info("Publishing revocation registry to the ledger")

    # The author/endorser connection is only meaningful when a transaction is requested.
    endorser_connection_id = connection_id if create_transaction_for_endorser else None
    response = await controller.revocation.publish_rev_reg_def(
        rev_reg_id=revocation_registry_id,
        conn_id=endorser_connection_id,
        create_transaction_for_endorser=create_transaction_for_endorser,
    )

    if isinstance(response, RevRegResult):
        published = response.result
    elif isinstance(response, TxnOrRevRegResult) and response.txn:
        published = response.txn
    else:
        bound_logger.error(
            "Unexpected type returned from publish_rev_reg_def: `{}`.",
            response,
        )
        raise CloudApiException("Failed to publish revocation registry to ledger.")

    bound_logger.info("Successfully published revocation registry to ledger.")
    return published
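async def publish_revocation_entry_to_ledger(
    controller: AcaPyClient,
    revocation_registry_id: Optional[str] = None,
    credential_definition_id: Optional[str] = None,
    connection_id: Optional[str] = None,
    create_transaction_for_endorser: Optional[bool] = False,
) -> IssuerRevRegRecord:
    """
    Publish a created revocation entry to the ledger.

    Either `revocation_registry_id` or `credential_definition_id` must be given. When
    only the credential definition ID is supplied, the active revocation registry for
    that definition is looked up and its `revoc_reg_id` is used.

    Args:
        controller (AcaPyClient): aca-py client
        revocation_registry_id (Optional[str]): The revocation registry ID.
            Default is None
        credential_definition_id (Optional[str]): The credential definition ID, used to
            resolve the registry when `revocation_registry_id` is not given.
            Default is None
        connection_id (Optional[str]): The connection ID of author to endorser. Only
            forwarded to aca-py when `create_transaction_for_endorser` is True.
            Default is None
        create_transaction_for_endorser (bool): Whether to create a transaction
            record for the endorser to endorse.
            Default is False

    Raises:
        CloudApiException: With status 400 when neither ID is given; with aca-py's
            status when the publish request is rejected; otherwise when the request
            fails or its response is not a `RevRegResult`.

    Returns:
        result (IssuerRevRegRecord): The revocation registry record.
    """
    bound_logger = logger.bind(
        body={
            "revocation_registry_id": revocation_registry_id,
            "credential_definition_id": credential_definition_id,
            "connection_id": connection_id,
            "create_transaction_for_endorser": create_transaction_for_endorser,
        }
    )
    bound_logger.info("Publishing revocation entry to the ledger")

    if not revocation_registry_id and not credential_definition_id:
        bound_logger.info(
            "Bad request: one of `revocation_registry_id` or `credential_definition_id` must be given"
        )
        raise CloudApiException(
            "Invalid request. Please provide either a 'revocation registry id' or a 'credential definition id'.",
            400,
        )

    if not revocation_registry_id:
        bound_logger.debug("Fetching active revocation registry for credential")
        rev_reg_record = await get_active_revocation_registry_for_credential(
            controller=controller, credential_definition_id=credential_definition_id
        )
        # The lookup returns the whole registry record; aca-py expects the ID string
        # (previously the record object itself was passed as `rev_reg_id`).
        revocation_registry_id = rev_reg_record.revoc_reg_id

    try:
        bound_logger.debug("Publishing revocation entry")
        result = await controller.revocation.publish_rev_reg_entry(
            rev_reg_id=revocation_registry_id,
            conn_id=connection_id if create_transaction_for_endorser else None,
            create_transaction_for_endorser=create_transaction_for_endorser,
        )
    except ApiException as e:
        bound_logger.info(
            "An ApiException was caught while publishing revocation entry. The error message is: '{}'.",
            e.reason,
        )
        raise CloudApiException(
            f"Failed to publish revocation entry to ledger: {e.reason}.", e.status
        ) from e
    except Exception as e:
        # A failed publish must surface as an error. Returning the exception object
        # (as before) let callers treat an unpublished revocation as success.
        bound_logger.exception("An unexpected exception occurred.")
        raise CloudApiException("Failed to publish revocation entry to ledger.") from e

    if not isinstance(result, RevRegResult):
        bound_logger.error(
            "Unexpected type returned from publish_rev_reg_entry: `{}`.", result
        )
        raise CloudApiException("Failed to publish revocation entry to ledger.")

    bound_logger.info("Successfully published revocation entry to ledger.")
    return result.result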
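async def revoke_credential(
    controller: AcaPyClient,
    credential_exchange_id: str,
    credential_definition_id: Optional[str] = None,
    auto_publish_to_ledger: bool = False,
) -> None:
    """
    Revoke an issued credential.

    When `auto_publish_to_ledger` is False, the revocation is recorded by aca-py, a
    revocation entry is published as a transaction for the endorser, and
    `endorser_revoke` then waits for and endorses that transaction.

    Args:
        controller (AcaPyClient): aca-py client
        credential_exchange_id (str): The credential exchange ID.
        credential_definition_id (Optional[str]): The credential definition ID. Required
            when `auto_publish_to_ledger` is False, because it is used to locate the
            active revocation registry.
        auto_publish_to_ledger (bool): Whether to directly publish the revocation to the ledger.
            This should only be true when invoked by an endorser.
            Default is False

    Raises:
        CloudApiException: With status 400 when `credential_definition_id` is missing
            but needed; with aca-py's status when the revocation itself is rejected;
            otherwise when publishing the revocation entry fails.

    Returns:
        result (None): Successful execution returns None.
    """
    bound_logger = logger.bind(
        body={
            "credential_exchange_id": credential_exchange_id,
            "credential_definition_id": credential_definition_id,
            "auto_publish_to_ledger": auto_publish_to_ledger,
        }
    )
    bound_logger.info("Revoking an issued credential")

    # Check the precondition before asking aca-py to revoke, so that a revocation is
    # not recorded locally that can then never be published to the ledger.
    if not auto_publish_to_ledger and not credential_definition_id:
        bound_logger.info(
            "Bad request: `credential_definition_id` is required to publish the revocation entry"
        )
        raise CloudApiException(
            "Invalid request. Please provide a 'credential definition id'.", 400
        )

    try:
        await controller.revocation.revoke_credential(
            body=RevokeRequest(
                cred_ex_id=credential_exchange_id,
                publish=auto_publish_to_ledger,
            )
        )
    except ApiException as e:
        bound_logger.info(
            "An ApiException was caught while revoking credential. The error message is: '{}'.",
            e.reason,
        )
        raise CloudApiException(
            f"Failed to revoke credential: {e.reason}.", e.status
        ) from e

    if not auto_publish_to_ledger:
        try:
            rev_reg_record = await get_active_revocation_registry_for_credential(
                controller=controller,
                credential_definition_id=credential_definition_id,
            )
            await publish_revocation_entry_to_ledger(
                controller=controller,
                revocation_registry_id=rev_reg_record.revoc_reg_id,
                create_transaction_for_endorser=True,
            )
        except CloudApiException as e:
            if e.status_code == 400:
                # NOTE(review): a 400 is logged but not re-raised, so execution still
                # reaches endorser_revoke() although no entry was published; that wait
                # can only end in a 504 timeout. Confirm this is the intended behaviour.
                bound_logger.info(
                    "Bad request: Cannot publish revocation entry to ledger: {}",
                    e.detail,
                )
            else:
                bound_logger.error(e.detail)
                raise
        except Exception:
            bound_logger.exception("Exception caught when revoking credential.")
            raise

        await endorser_revoke()

    bound_logger.info("Successfully revoked credential.")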
async def endorser_revoke():
    """
    Endorse the pending revocation transaction with the governance (endorser) agent.

    Waits for the next `endorsements` event in the `request-received` state on the
    `admin` wallet and endorses the transaction it refers to. Nothing ties that event
    to the revocation this module just published: whichever endorsement request
    arrives first is endorsed (the log message below says as much). Confirm this is
    acceptable when several revocations or other endorsement requests can be in
    flight at the same time.

    Raises:
        CloudApiException: With status 504 when no such event arrives before the
            listener times out.
    """
    sse_listener = SseListener(topic="endorsements", wallet_id="admin")
    logger.debug("Waiting for endorsements event in `request-received` state")
    try:
        txn_record = await sse_listener.wait_for_state(desired_state="request-received")
    except TimeoutError as timeout_error:
        logger.error("Waiting for an endorsement event has timed out.")
        raise CloudApiException(
            "Timeout occurred while waiting to retrieve transaction record for endorser.",
            504,
        ) from timeout_error

    async with get_governance_controller() as endorser_controller:
        logger.info("Endorsing what is presumed to be a revocation transaction")
        transaction_id = txn_record["transaction_id"]
        await endorser_controller.endorse_transaction.endorse_transaction(
            tran_id=transaction_id
        )
        logger.info("Successfully endorsed transaction of revocation.")
async def get_credential_definition_id_from_exchange_id(
    controller: AcaPyClient, credential_exchange_id: str
) -> Optional[str]:
    """
    Get the credential definition id from the credential exchange id.

    The v1 issue-credential record is tried first; if aca-py rejects that request,
    the v2 record is fetched and the credential definition id is reconstructed from
    the record's indy revocation registry id.

    Args:
        controller (AcaPyClient): aca-py client
        credential_exchange_id (str): The credential exchange ID.

    Returns:
        credential_definition_id (Optional[str]): The credential definition ID, or None
            when neither the v1 nor the v2 record could be used.
    """
    bound_logger = logger.bind(body={"credential_exchange_id": credential_exchange_id})
    bound_logger.info("Fetching credential definition id from exchange id")

    try:
        v1_record = await controller.issue_credential_v1_0.get_record(
            cred_ex_id=credential_exchange_id
        )
        credential_definition_id = v1_record.credential_definition_id
    except ApiException as v1_error:
        bound_logger.info(
            "An ApiException was caught while getting v1 record. The error message is: '{}'",
            v1_error.reason,
        )
        try:
            bound_logger.info("Trying to get v2 records")
            v2_record = await controller.issue_credential_v2_0.get_record(
                cred_ex_id=credential_exchange_id
            )
            parts = v2_record.indy.rev_reg_id.split(":")
            # NOTE: "CL" is the only signature type handled here; other types may
            # need to be supported in future.
            # NOTE(review): the final component is the last part of the revocation
            # registry id; confirm it is the credential definition's tag rather than
            # the registry's own tag before relying on this reconstructed id.
            credential_definition_id = f"{parts[2]}:3:CL:{parts[-4]}:{parts[-1]}"
        except ApiException as v2_error:
            bound_logger.info(
                "An ApiException was caught while getting v2 record. The error message is: '{}'",
                v2_error.reason,
            )
            return None
        except Exception:
            bound_logger.exception(
                "Exception caught when getting v2 records for cred ex id."
            )
            return None

    bound_logger.info(
        "Successfully obtained cred definition id from the cred exchange id."
    )
    return credential_definition_id