-
Notifications
You must be signed in to change notification settings - Fork 9
/
actors.py
297 lines (242 loc) · 10.2 KB
/
actors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
from typing import List, Optional
from app.exceptions import TrustRegistryException
from shared.constants import TRUST_REGISTRY_URL
from shared.log_config import get_logger
from shared.models.trustregistry import Actor, TrustRegistryRole
from shared.util.rich_async_client import RichAsyncClient
logger = get_logger(__name__)
async def register_actor(actor: Actor) -> None:
"""Register an actor in the trust registry
Args:
actor (Actor): the actor to register
Raises:
TrustRegistryException: If an error occurred while registering the schema
"""
bound_logger = logger.bind(body={"actor": actor})
bound_logger.debug("Registering actor on trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
actor_response = await client.post(
f"{TRUST_REGISTRY_URL}/registry/actors", json=actor.model_dump()
)
if actor_response.status_code == 422:
bound_logger.error(
"Unprocessable entity response when registering actor: `{}`.",
actor_response.json(),
)
raise TrustRegistryException(
f"Unprocessable response when registering actor: `{actor_response.json()}`.",
422,
)
if actor_response.is_error:
bound_logger.error(
"Error registering actor. Got status code {} with message `{}`.",
actor_response.status_code,
actor_response.text,
)
raise TrustRegistryException(
f"Error registering actor: `{actor_response.text}`.",
actor_response.status_code,
)
bound_logger.debug("Successfully registered actor on trust registry.")
async def update_actor(actor: Actor) -> None:
bound_logger = logger.bind(body={"actor": actor})
bound_logger.info("Updating actor on trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
update_response = await client.put(
f"{TRUST_REGISTRY_URL}/registry/actors/{actor.id}",
json=actor.model_dump(),
)
if update_response.status_code == 422:
bound_logger.error(
"Unprocessable entity response when updating actor: `{}`.",
update_response.json(),
)
raise TrustRegistryException(
f"Unprocessable response when updating actor: `{update_response.json()}`.",
422,
)
elif update_response.is_error:
bound_logger.error(
"Error removing actor. Got status code `{}` with message `{}`.",
update_response.status_code,
update_response.text,
)
raise TrustRegistryException(
f"Error updating actor in trust registry: `{update_response.text}`.",
update_response.status_code,
)
async def fetch_all_actors() -> List[Actor]:
"""Fetch all actors from the trust registry
Raises:
TrustRegistryException: If an error occurred while retrieving the actors
Returns:
List[Actor]: List of actors
"""
logger.debug("Fetching all actors from trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
actors_response = await client.get(f"{TRUST_REGISTRY_URL}/registry/actors")
if actors_response.is_error:
logger.error(
"Error fetching all actors. Got status code {} with message `{}`.",
actors_response.status_code,
actors_response.text,
)
raise TrustRegistryException(
f"Unable to retrieve actors from registry: `{actors_response.text}`.",
actors_response.status_code,
)
actors = [Actor.model_validate(actor) for actor in actors_response.json()]
if actors:
logger.debug("Successfully got all actors.")
else:
logger.debug("No actors found.")
return actors
async def fetch_actor_by_did(did: str) -> Optional[Actor]:
"""Retrieve actor by did from trust registry
Args:
actor_id (str): did of the actor to retrieve
Raises:
TrustRegistryException: If an error occurred while retrieving the actor.
Returns:
Actor: The actor with specified did.
"""
bound_logger = logger.bind(body={"did": did})
bound_logger.debug("Fetching actor by DID from trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
actor_response = await client.get(
f"{TRUST_REGISTRY_URL}/registry/actors/did/{did}"
)
if actor_response.status_code == 404:
bound_logger.info("Bad request: Actor not found.")
return None
elif actor_response.is_error:
bound_logger.error(
"Error fetching actor by DID. Got status code {} with message `{}`.",
actor_response.status_code,
actor_response.text,
)
raise TrustRegistryException(
f"Error fetching actor by DID: `{actor_response.text}`.",
actor_response.status_code,
)
bound_logger.debug("Successfully fetched actor from trust registry.")
return Actor.model_validate(actor_response.json())
async def fetch_actor_by_id(actor_id: str) -> Optional[Actor]:
"""Retrieve actor by id from trust registry
Args:
actor_id (str): Identifier of the actor to retrieve
Raises:
TrustRegistryException: If an error occurred while retrieving the actor.
Returns:
Actor: The actor with specified id.
"""
bound_logger = logger.bind(body={"actor_id": actor_id})
bound_logger.debug("Fetching actor by ID from trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
actor_response = await client.get(
f"{TRUST_REGISTRY_URL}/registry/actors/{actor_id}"
)
if actor_response.status_code == 404:
bound_logger.info("Bad request: actor not found.")
return None
elif actor_response.is_error:
bound_logger.error(
"Error fetching actor by id. Got status code {} with message `{}`.",
actor_response.status_code,
actor_response.text,
)
raise TrustRegistryException(
f"Error fetching actor by id: `{actor_response.text}`.",
actor_response.status_code,
)
bound_logger.debug("Successfully fetched actor from trust registry.")
return Actor.model_validate(actor_response.json())
async def fetch_actor_by_name(actor_name: str) -> Optional[Actor]:
"""Retrieve actor by name from trust registry
Args:
actor_name (str): Identifier of the actor to retrieve
Raises:
TrustRegistryException: If an error occurred while retrieving the actor.
Returns:
Actor: The actor with specified name.
"""
bound_logger = logger.bind(body={"actor_id": actor_name})
bound_logger.debug("Fetching actor by NAME from trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
actor_response = await client.get(
f"{TRUST_REGISTRY_URL}/registry/actors/name/{actor_name}"
)
if actor_response.status_code == 404:
bound_logger.info("Bad request: Actor with name not found in registry.")
return None
elif actor_response.is_error:
bound_logger.error(
"Error fetching actor by name. Got status code {} with message `{}`.",
actor_response.status_code,
actor_response.text,
)
raise TrustRegistryException(
f"Error fetching actor by name: `{actor_response.text}`.",
actor_response.status_code,
)
bound_logger.debug("Successfully fetched actor from trust registry.")
return Actor.model_validate(actor_response.json())
async def fetch_actors_with_role(role: TrustRegistryRole) -> List[Actor]:
"""Fetch all actors from the trust registry by role
Args:
role (Role): The role to filter actors by
Raises:
TrustRegistryException: If an error occurred while retrieving the actors
Returns:
List[Actor]: List of actors with specified role
"""
bound_logger = logger.bind(body={"role": role})
bound_logger.debug("Fetching all actors with requested role from trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
actors_response = await client.get(f"{TRUST_REGISTRY_URL}/registry/actors")
if actors_response.is_error:
bound_logger.error(
"Error fetching actors by role. Got status code {} with message `{}`.",
actors_response.status_code,
actors_response.text,
)
raise TrustRegistryException(
f"Unable to retrieve actors from registry: `{actors_response.text}`.",
actors_response.status_code,
)
actors = [Actor.model_validate(actor) for actor in actors_response.json()]
actors_with_role_list = [actor for actor in actors if role in actor.roles]
if actors_with_role_list:
bound_logger.debug("Successfully got actors with requested role.")
else:
bound_logger.debug("No actors found with requested role.")
return actors_with_role_list
async def remove_actor_by_id(actor_id: str) -> None:
"""Remove actor from trust registry by id
Args:
actor_id (str): identifier of the actor to remove
Raises:
TrustRegistryException: If an error occurred while removing the actor
"""
bound_logger = logger.bind(body={"actor_id": actor_id})
bound_logger.info("Removing actor from trust registry")
async with RichAsyncClient(raise_status_error=False) as client:
remove_response = await client.delete(
f"{TRUST_REGISTRY_URL}/registry/actors/{actor_id}"
)
if remove_response.status_code == 404:
bound_logger.info(
"Bad request: Tried to remove actor by id, but not found in registry."
)
return None
if remove_response.is_error:
bound_logger.error(
"Error removing actor. Got status code {} with message `{}`.",
remove_response.status_code,
remove_response.text,
)
raise TrustRegistryException(
f"Error removing actor from trust registry: `{remove_response.text}`.",
remove_response.status_code,
)
bound_logger.debug("Successfully removed actor from trust registry.")