-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathendorsement.py
153 lines (121 loc) · 4.94 KB
/
endorsement.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import asyncio
from typing import Any, Dict, Optional
from aries_cloudcontroller import AcaPyClient
from fastapi import HTTPException
from endorser.util.transaction_record import (
get_did_and_schema_id_from_cred_def_attachment,
get_endorsement_request_attachment,
is_attrib_type,
is_credential_definition_transaction,
is_revocation_def_or_entry,
)
from endorser.util.trust_registry import is_valid_issuer
from shared.log_config import get_logger
from shared.models.endorsement import Endorsement, applicable_transaction_state
logger = get_logger(__name__)
async def should_accept_endorsement(
client: AcaPyClient, endorsement: Endorsement
) -> bool:
"""Check whether a transaction endorsement request should be endorsed.
Whether the request should be accepted is based on the follow criteria:
1. The transaction is for a credential definition
2. The did is registered as an issuer in the trust registry.
3. The schema_id is registered in the trust registry.
Args:
endorsement (Endorsement): The endorsement event model
Returns:
bool: Whether the endorsement request should be accepted
"""
bound_logger = logger.bind(body=endorsement)
bound_logger.debug("Validating if endorsement transaction should be endorsed")
transaction_id = endorsement.transaction_id
bound_logger.debug("Fetching transaction with id: `{}`", transaction_id)
transaction = await client.endorse_transaction.get_transaction(
tran_id=transaction_id
)
if transaction.state != applicable_transaction_state:
bound_logger.warning(
"Endorsement event for transaction with id `{}` "
"not in state '{}' (is `{}`).",
transaction_id,
applicable_transaction_state,
transaction.state,
)
return False
attachment = get_endorsement_request_attachment(transaction)
if not attachment:
bound_logger.warning("Could not extract attachment from transaction.")
return False
operation_type = await extract_operation_type(attachment)
if not operation_type:
return False
return await check_applicable_operation_type(
client, endorsement, operation_type, attachment
)
async def extract_operation_type(attachment) -> Optional[str]:
operation = attachment.get("operation")
if not operation:
logger.debug("Key `operation` not in attachment: `{}`.", attachment)
return None
operation_type = operation.get("type")
if not operation_type:
logger.debug("Key `type` not in operation attachment.")
return None
return operation_type
async def check_applicable_operation_type(
client: AcaPyClient,
endorsement: Endorsement,
operation_type: str,
attachment: Dict[str, Any],
) -> bool:
bound_logger = logger.bind(body=endorsement)
if is_revocation_def_or_entry(operation_type):
bound_logger.debug("Endorsement request is for revocation definition or entry.")
return True
if is_attrib_type(operation_type):
bound_logger.debug("Endorsement request is for ATTRIB type.")
return True
if not is_credential_definition_transaction(operation_type, attachment):
bound_logger.warning("Endorsement request is not for credential definition.")
return False
# Here, endorsement request is for credential definition
did, schema_id = await get_did_and_schema_id_from_cred_def_attachment(
client, attachment
)
return await retry_is_valid_issuer(did, schema_id, endorsement)
async def retry_is_valid_issuer(
did: str,
schema_id: str,
endorsement: Endorsement,
max_retries: int = 5,
retry_delay: float = 1.0,
) -> bool:
bound_logger = logger.bind(body=endorsement)
for attempt in range(max_retries):
try:
valid_issuer = await is_valid_issuer(did, schema_id)
if not valid_issuer:
bound_logger.warning(
"Endorsement request with transaction id `{}` is not for did "
"and schema registered in the trust registry.",
endorsement.transaction_id,
)
return False
return True
except HTTPException as e:
bound_logger.error(
"Attempt {}: Exception caught when asserting valid issuer: {}",
attempt + 1,
e,
)
if attempt < max_retries - 1:
bound_logger.warning("Retrying in {}s ...", retry_delay)
await asyncio.sleep(retry_delay)
else:
bound_logger.error("Max retries reached. Giving up.")
return False
async def accept_endorsement(client: AcaPyClient, endorsement: Endorsement) -> None:
logger.debug("Endorsing transaction with id: `{}`", endorsement.transaction_id)
await client.endorse_transaction.endorse_transaction(
tran_id=endorsement.transaction_id
)