-
Notifications
You must be signed in to change notification settings - Fork 511
/
routes.py
183 lines (140 loc) · 5.66 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""jsonld admin routes."""
from aiohttp import web
from aiohttp_apispec import docs, request_schema, response_schema
from pydid.verification_method import Ed25519VerificationKey2018
from marshmallow import INCLUDE, Schema, fields
from ...admin.request_context import AdminRequestContext
from ...config.base import InjectionError
from ...resolver.base import ResolverError
from ...resolver.did_resolver import DIDResolver
from ...wallet.error import WalletError
from ..models.openapi import OpenAPISchema
from .credential import sign_credential, verify_credential
from .error import BaseJSONLDMessagingError
SUPPORTED_VERIFICATION_METHOD_TYPES = (Ed25519VerificationKey2018,)
class SignatureOptionsSchema(Schema):
"""Schema for LD signature options."""
type = fields.Str(required=False)
verificationMethod = fields.Str(required=True)
proofPurpose = fields.Str(required=True)
challenge = fields.Str(required=False)
domain = fields.Str(required=False)
class DocSchema(OpenAPISchema):
"""Schema for LD doc to sign."""
credential = fields.Dict(
required=True, metadata={"description": "Credential to sign"}
)
options = fields.Nested(
SignatureOptionsSchema,
required=True,
metadata={"description": "Signature options"},
)
class SignRequestSchema(OpenAPISchema):
"""Request schema for signing a jsonld doc."""
verkey = fields.Str(
required=True, metadata={"description": "Verkey to use for signing"}
)
doc = fields.Nested(DocSchema, required=True)
class SignResponseSchema(OpenAPISchema):
"""Response schema for a signed jsonld doc."""
signed_doc = fields.Dict(
required=False, metadata={"description": "Signed document"}
)
error = fields.Str(required=False, metadata={"description": "Error text"})
@docs(tags=["jsonld"], summary="Sign a JSON-LD structure and return it")
@request_schema(SignRequestSchema())
@response_schema(SignResponseSchema(), 200, description="")
async def sign(request: web.BaseRequest):
"""Request handler for signing a jsonld doc.
Args:
request: aiohttp request object
"""
response = {}
body = await request.json()
doc = body.get("doc")
try:
context: AdminRequestContext = request["context"]
async with context.session() as session:
doc_with_proof = await sign_credential(
session, doc.get("credential"), doc.get("options"), body.get("verkey")
)
response["signed_doc"] = doc_with_proof
except BaseJSONLDMessagingError as err:
response["error"] = str(err)
except (WalletError, InjectionError):
raise web.HTTPForbidden(reason="No wallet available")
return web.json_response(response)
class SignedDocSchema(OpenAPISchema):
"""Verifiable doc schema."""
class Meta:
"""Keep unknown values."""
unknown = INCLUDE
proof = fields.Nested(
SignatureOptionsSchema,
required=True,
metadata={"unknown": INCLUDE, "description": "Linked data proof"},
)
class VerifyRequestSchema(OpenAPISchema):
"""Request schema for signing a jsonld doc."""
verkey = fields.Str(
required=False, metadata={"description": "Verkey to use for doc verification"}
)
doc = fields.Nested(
SignedDocSchema, required=True, metadata={"description": "Signed document"}
)
class VerifyResponseSchema(OpenAPISchema):
"""Response schema for verification result."""
valid = fields.Bool(required=True)
error = fields.Str(required=False, metadata={"description": "Error text"})
@docs(tags=["jsonld"], summary="Verify a JSON-LD structure.")
@request_schema(VerifyRequestSchema())
@response_schema(VerifyResponseSchema(), 200, description="")
async def verify(request: web.BaseRequest):
"""Request handler for signing a jsonld doc.
Args:
request: aiohttp request object
"""
response = {"valid": False}
try:
context: AdminRequestContext = request["context"]
profile = context.profile
body = await request.json()
verkey = body.get("verkey")
doc = body.get("doc")
async with context.session() as session:
if verkey is None:
resolver = session.inject(DIDResolver)
vmethod = await resolver.dereference(
profile,
doc["proof"]["verificationMethod"],
)
if not isinstance(vmethod, SUPPORTED_VERIFICATION_METHOD_TYPES):
raise web.HTTPBadRequest(
reason="{} is not supported".format(vmethod.type)
)
verkey = vmethod.material
valid = await verify_credential(session, doc, verkey)
response["valid"] = valid
except (BaseJSONLDMessagingError, ResolverError, ValueError) as error:
response["error"] = str(error)
except (WalletError, InjectionError):
raise web.HTTPForbidden(reason="No wallet available")
return web.json_response(response)
async def register(app: web.Application):
"""Register routes."""
app.add_routes([web.post("/jsonld/sign", sign), web.post("/jsonld/verify", verify)])
def post_process_routes(app: web.Application):
"""Amend swagger API."""
# Add top-level tags description
if "tags" not in app._state["swagger_dict"]:
app._state["swagger_dict"]["tags"] = []
app._state["swagger_dict"]["tags"].append(
{
"name": "jsonld",
"description": "Sign and verify json-ld data",
"externalDocs": {
"description": "Specification",
"url": "https://tools.ietf.org/html/rfc7515",
},
}
)