-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathacapy_verifier_v1.py
258 lines (230 loc) · 10.6 KB
/
acapy_verifier_v1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
from aries_cloudcontroller import (
AcaPyClient,
IndyPresSpec,
V10PresentationCreateRequestRequest,
V10PresentationProblemReportRequest,
V10PresentationSendRequest,
V10PresentationSendRequestRequest,
)
from app.exceptions import CloudApiException
from app.models.verifier import (
AcceptProofRequest,
CreateProofRequest,
ProofRequestType,
RejectProofRequest,
SendProofRequest,
)
from app.services.verifier.acapy_verifier import Verifier
from shared.log_config import get_logger
from shared.models.conversion import presentation_record_to_model as record_to_model
from shared.models.protocol import pres_id_no_version
from shared.models.webhook_topics import PresentationExchange
logger = get_logger(__name__)
class VerifierV1(Verifier):
@classmethod
async def create_proof_request(
cls,
controller: AcaPyClient,
create_proof_request: CreateProofRequest,
) -> PresentationExchange:
if create_proof_request.type != ProofRequestType.INDY:
raise CloudApiException(
f"Only Indy credential types are supported in v1. Requested type: {create_proof_request.type}",
status_code=400,
)
bound_logger = logger.bind(body=create_proof_request)
bound_logger.debug("Creating v1 proof request")
auto_remove = not create_proof_request.save_exchange_record
try:
presentation_exchange = (
await controller.present_proof_v1_0.create_proof_request(
body=V10PresentationCreateRequestRequest(
auto_remove=auto_remove,
proof_request=create_proof_request.indy_proof_request,
auto_verify=create_proof_request.auto_verify,
comment=create_proof_request.comment,
trace=create_proof_request.trace,
)
)
)
bound_logger.debug("Returning v1 PresentationExchange.")
return record_to_model(presentation_exchange)
except Exception as e:
bound_logger.exception(
"An exception occurred while creating presentation request."
)
raise CloudApiException(
f"Failed to create presentation request: {e}."
) from e
@classmethod
async def send_proof_request(
cls,
controller: AcaPyClient,
send_proof_request: SendProofRequest,
) -> PresentationExchange:
if send_proof_request.type != ProofRequestType.INDY:
raise CloudApiException(
f"Only Indy credential types are supported in v1. Requested type: {send_proof_request.type}",
status_code=400,
)
bound_logger = logger.bind(body=send_proof_request)
auto_remove = not send_proof_request.save_exchange_record
try:
bound_logger.debug("Send free v1 presentation request")
presentation_exchange = (
await controller.present_proof_v1_0.send_request_free(
body=V10PresentationSendRequestRequest(
auto_remove=auto_remove,
connection_id=send_proof_request.connection_id,
proof_request=send_proof_request.indy_proof_request,
auto_verify=send_proof_request.auto_verify,
comment=send_proof_request.comment,
trace=send_proof_request.trace,
)
)
)
result = record_to_model(presentation_exchange)
except Exception as e:
bound_logger.exception(
"An exception occurred while sending presentation request."
)
raise CloudApiException(f"Failed to send presentation request: {e}.") from e
if result:
bound_logger.debug("Successfully sent v1 presentation request.")
else:
bound_logger.warning("No result from sending v1 presentation request.")
return result
@classmethod
async def accept_proof_request(
cls, controller: AcaPyClient, accept_proof_request: AcceptProofRequest
) -> PresentationExchange:
if accept_proof_request.type != ProofRequestType.INDY:
raise CloudApiException(
f"Only Indy credential types are supported in v1. Requested type: {accept_proof_request.type}",
status_code=400,
)
bound_logger = logger.bind(body=accept_proof_request)
proof_id = pres_id_no_version(proof_id=accept_proof_request.proof_id)
auto_remove = not accept_proof_request.save_exchange_record
try:
bound_logger.debug("Send v1 proof presentation")
indy_pres_spec = accept_proof_request.indy_presentation_spec
v10_pres_send_req = V10PresentationSendRequest(
auto_remove=auto_remove,
requested_attributes=indy_pres_spec.requested_attributes,
requested_predicates=indy_pres_spec.requested_predicates,
self_attested_attributes=indy_pres_spec.self_attested_attributes,
trace=indy_pres_spec.trace,
)
presentation_record = await controller.present_proof_v1_0.send_presentation(
pres_ex_id=proof_id,
body=v10_pres_send_req,
)
result = record_to_model(presentation_record)
except Exception as e:
bound_logger.exception(
"An exception occurred while sending a proof presentation."
)
raise CloudApiException(f"Failed to send proof presentation: {e}.") from e
if result:
bound_logger.debug("Successfully sent v1 proof presentation.")
else:
bound_logger.warning("No result from sending v1 proof presentation.")
return result
@classmethod
async def reject_proof_request(
cls, controller: AcaPyClient, reject_proof_request: RejectProofRequest
) -> None:
bound_logger = logger.bind(body=reject_proof_request)
bound_logger.info("Request to reject v1 presentation exchange record")
proof_id = pres_id_no_version(proof_id=reject_proof_request.proof_id)
# Report problem if desired
if reject_proof_request.problem_report:
try:
bound_logger.debug("Submitting v1 problem report")
await controller.present_proof_v1_0.report_problem(
pres_ex_id=proof_id,
body=V10PresentationProblemReportRequest(
description=reject_proof_request.problem_report
),
)
except Exception as e:
bound_logger.exception("An exception occurred while reporting problem.")
raise CloudApiException(f"Failed to report problem: {e}.") from e
try:
bound_logger.debug("Deleting v1 presentation exchange record")
await controller.present_proof_v1_0.delete_record(pres_ex_id=proof_id)
except Exception as e:
bound_logger.exception("An exception occurred while deleting record.")
raise CloudApiException(f"Failed to delete record: {e}.") from e
bound_logger.info("Successfully rejected v1 presentation exchange record.")
@classmethod
async def get_proof_records(cls, controller: AcaPyClient):
try:
logger.debug("Fetching v1 present-proof exchange records")
presentation_exchange = await controller.present_proof_v1_0.get_records()
result = [
record_to_model(rec) for rec in presentation_exchange.results or []
]
except Exception as e:
logger.exception("An exception occurred while getting records.")
raise CloudApiException(f"Failed to get proof records: {e}.") from e
if result:
logger.debug("Successfully got v1 present-proof records.")
else:
logger.info("No v1 present-proof records obtained.")
return result
@classmethod
async def get_proof_record(cls, controller: AcaPyClient, proof_id: str):
bound_logger = logger.bind(body={"proof_id": proof_id})
pres_ex_id = pres_id_no_version(proof_id)
try:
bound_logger.debug("Fetching single v1 present-proof exchange record")
presentation_exchange = await controller.present_proof_v1_0.get_record(
pres_ex_id=pres_ex_id
)
result = record_to_model(presentation_exchange)
except Exception as e:
bound_logger.exception("An exception occurred while getting record.")
raise CloudApiException(
f"Failed to get proof record with proof id `{proof_id}`: {e}."
) from e
if result:
bound_logger.debug("Successfully got v1 present-proof record.")
else:
bound_logger.info("No v1 present-proof record obtained.")
return result
@classmethod
async def delete_proof(cls, controller: AcaPyClient, proof_id: str):
bound_logger = logger.bind(body={"proof_id": proof_id})
pres_ex_id = pres_id_no_version(proof_id=proof_id)
try:
bound_logger.debug("Deleting v1 present-proof exchange record")
await controller.present_proof_v1_0.delete_record(pres_ex_id=pres_ex_id)
except Exception as e:
bound_logger.exception("An exception occurred while deleting record.")
raise CloudApiException(
f"Failed to delete record with proof id `{proof_id}`: {e}."
) from e
bound_logger.debug("Successfully deleted v1 present-proof record.")
@classmethod
async def get_credentials_by_proof_id(cls, controller: AcaPyClient, proof_id: str):
bound_logger = logger.bind(body={"proof_id": proof_id})
pres_ex_id = pres_id_no_version(proof_id=proof_id)
try:
bound_logger.debug("Getting v1 matching credentials from proof id")
result = await controller.present_proof_v1_0.get_matching_credentials(
pres_ex_id=pres_ex_id
)
except Exception as e:
bound_logger.exception(
"An exception occurred while getting matching credentials."
)
raise CloudApiException(
f"Failed to get credentials with proof id `{proof_id}`: {e}."
) from e
if result:
bound_logger.debug("Successfully got matching v1 credentials.")
else:
bound_logger.debug("No matching v1 credentials obtained.")
return result