-
Notifications
You must be signed in to change notification settings - Fork 8
/
sd_jws.py
101 lines (86 loc) · 3.32 KB
/
sd_jws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from aries_cloudcontroller import SDJWSCreate, SDJWSVerify
from fastapi import APIRouter, Depends
from pydantic import ValidationError
from app.dependencies.acapy_clients import client_from_auth
from app.dependencies.auth import AcaPyAuth, acapy_auth_from_header
from app.exceptions import CloudApiException, handle_acapy_call
from app.models.sd_jws import (
SDJWSCreateRequest,
SDJWSCreateResponse,
SDJWSVerifyRequest,
SDJWSVerifyResponse,
)
from app.util.extract_validation_error import extract_validation_error_msg
from shared.log_config import get_logger
# Module-level logger; each endpoint below binds its request context onto it.
logger = get_logger(__name__)
# Router for the SD-JWS endpoints: all routes share the /v1/wallet/sd-jws
# prefix and are tagged "wallet".
router = APIRouter(prefix="/v1/wallet/sd-jws", tags=["wallet"])
@router.post(
    "/sign",
    response_model=SDJWSCreateResponse,
    summary="Sign SD-JWS",
    description="""
Sign Select Disclosure for JWS (SD-JWS)
See https://www.ietf.org/archive/id/draft-ietf-oauth-selective-disclosure-jwt-07.html for the SD-JWT / SD-JWS spec.
""",
)
async def sign_sd_jws(
    body: SDJWSCreateRequest,
    auth: AcaPyAuth = Depends(acapy_auth_from_header),
) -> SDJWSCreateResponse:
    """Sign an SD-JWS through the caller's ACA-Py wallet client.

    Args:
        body: The signing request; its fields are used to build an
            `SDJWSCreate` model for the ACA-Py call.
        auth: Auth context supplied by the `acapy_auth_from_header`
            dependency; used to open the ACA-Py client.

    Returns:
        An `SDJWSCreateResponse` wrapping the value returned by the
        wallet's `sign_sd_jwt` call.

    Raises:
        CloudApiException: With status code 422 when building the
            `SDJWSCreate` model from the request body fails validation.
    """
    bound_logger = logger.bind(
        # Do not log payload. pydantic documents `exclude` as a set (or dict)
        # of field names, so pass a set rather than a bare string.
        body=body.model_dump(exclude={"payload"})
    )
    bound_logger.info("POST request received: Sign SD-JWS")

    try:
        sign_request = SDJWSCreate(**body.model_dump())
    except ValidationError as e:
        # Handle Pydantic validation error:
        error_msg = extract_validation_error_msg(e)
        bound_logger.info(
            "Bad request: Validation error from SDJWSCreateRequest body: {}", error_msg
        )
        raise CloudApiException(status_code=422, detail=error_msg) from e

    async with client_from_auth(auth) as aries_controller:
        sd_jws = await handle_acapy_call(
            logger=bound_logger,
            acapy_call=aries_controller.wallet.sign_sd_jwt,
            body=sign_request,
        )

    result = SDJWSCreateResponse(sd_jws=sd_jws)
    bound_logger.info("Successfully signed SD-JWS.")
    return result
@router.post(
    "/verify",
    response_model=SDJWSVerifyResponse,
    summary="Verify SD-JWS",
    description="""
Verify Select Disclosure for JWS (SD-JWS)
See https://www.ietf.org/archive/id/draft-ietf-oauth-selective-disclosure-jwt-07.html for the SD-JWT / SD-JWS spec.
""",
)
async def verify_sd_jws(
    body: SDJWSVerifyRequest,
    auth: AcaPyAuth = Depends(acapy_auth_from_header),
) -> SDJWSVerifyResponse:
    """Verify an SD-JWS through the caller's ACA-Py wallet client.

    Args:
        body: The verification request; its `sd_jws` value is forwarded to
            ACA-Py as the `sd_jwt` field of an `SDJWSVerify` model.
        auth: Auth context supplied by the `acapy_auth_from_header`
            dependency; used to open the ACA-Py client.

    Returns:
        An `SDJWSVerifyResponse` built from the dumped fields of the
        wallet's `verify_sd_jwt` result.

    Raises:
        CloudApiException: With status code 422 when building the
            `SDJWSVerify` model fails validation.
    """
    # NOTE(review): the whole body (including the SD-JWS token) is bound to
    # the logger here, whereas the sign endpoint excludes its payload from
    # logs — confirm this is intended.
    request_logger = logger.bind(body=body)
    request_logger.info("POST request received: Verify SD-JWS")

    try:
        acapy_request = SDJWSVerify(sd_jwt=body.sd_jws)
    except ValidationError as validation_error:
        # The ACA-Py model calls the field `sd_jwt`; rewrite the message so it
        # refers to `sd_jws`, the field name the caller actually sent.
        detail = extract_validation_error_msg(validation_error).replace(
            "sd_jwt", "sd_jws"
        )
        request_logger.info(
            "Bad request: Validation error from SDJWSVerifyRequest body: {}", detail
        )
        raise CloudApiException(status_code=422, detail=detail) from validation_error

    async with client_from_auth(auth) as controller:
        acapy_response = await handle_acapy_call(
            logger=request_logger,
            acapy_call=controller.wallet.verify_sd_jwt,
            body=acapy_request,
        )

    verified = SDJWSVerifyResponse(**acapy_response.model_dump())
    request_logger.info("Successfully verified SD-JWS.")
    return verified