Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Consolidate JSON decoding logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Mar 27, 2023
1 parent b909c91 commit d8a0d93
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 22 deletions.
10 changes: 4 additions & 6 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.roommember import MemberSummary
from synapse.types import JsonDict, StateMap, get_domain_from_id
from synapse.util import json_decoder, unwrapFirstError
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import parse_server_name
Expand Down Expand Up @@ -1015,14 +1015,12 @@ async def on_claim_client_keys(
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = await self._e2e_keys_handler.claim_local_one_time_keys(query)

json_result: Dict[str, Dict[str, dict]] = {}
json_result: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for result in results:
for user_id, device_keys in result.items():
for device_id, keys in device_keys.items():
for key_id, json_str in keys.items():
json_result.setdefault(user_id, {})[device_id] = {
key_id: json_decoder.decode(json_str)
}
for key_id, key in keys.items():
json_result.setdefault(user_id, {})[device_id] = {key_id: key}

logger.info(
"Claimed one-time-keys: %s",
Expand Down
9 changes: 3 additions & 6 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ async def on_federation_query_client_keys(

async def claim_local_one_time_keys(
self, local_query: List[Tuple[str, str, str]]
) -> Iterable[Dict[str, Dict[str, Dict[str, str]]]]:
) -> Iterable[Dict[str, Dict[str, Dict[str, JsonDict]]]]:
"""Claim one time keys for local users.
1. Attempt to claim OTKs from the database.
Expand Down Expand Up @@ -587,14 +587,11 @@ async def claim_one_time_keys(

# A map of user ID -> device ID -> key ID -> key.
json_result: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
failures: Dict[str, JsonDict] = {}
for result in results:
for user_id, device_keys in result.items():
for device_id, keys in device_keys.items():
for key_id, json_str in keys.items():
json_result.setdefault(user_id, {})[device_id] = {
key_id: json_decoder.decode(json_str)
}
for key_id, key in keys.items():
json_result.setdefault(user_id, {})[device_id] = {key_id: key}

@trace
async def claim_client_keys(destination: str) -> None:
Expand Down
20 changes: 10 additions & 10 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
Expand Down Expand Up @@ -1028,15 +1028,15 @@ def get_device_stream_token(self) -> int:

async def claim_e2e_one_time_keys(
self, query_list: Iterable[Tuple[str, str, str]]
) -> Tuple[Dict[str, Dict[str, Dict[str, str]]], List[Tuple[str, str, str]]]:
) -> Tuple[Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str]]]:
"""Take a list of one time keys out of the database.
Args:
query_list: An iterable of tuples of (user ID, device ID, algorithm).
Returns:
A tuple of:
A map of user ID -> a map device ID -> a map of key ID -> JSON bytes.
A tuple pf:
A map of user ID -> a map device ID -> a map of key ID -> JSON.
A copy of the input which has not been fulfilled.
"""
Expand Down Expand Up @@ -1118,7 +1118,7 @@ def _claim_e2e_one_time_key_returning(
key_id, key_json = otk_row
return f"{algorithm}:{key_id}", key_json

results: Dict[str, Dict[str, Dict[str, str]]] = {}
results: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
missing: List[Tuple[str, str, str]] = []
for user_id, device_id, algorithm in query_list:
if self.database_engine.supports_returning:
Expand All @@ -1142,24 +1142,24 @@ def _claim_e2e_one_time_key_returning(
device_results = results.setdefault(user_id, {}).setdefault(
device_id, {}
)
device_results[claim_row[0]] = claim_row[1]
device_results[claim_row[0]] = json_decoder.decode(claim_row[1])
else:
missing.append((user_id, device_id, algorithm))

return results, missing

async def claim_e2e_fallback_keys(
self, query_list: Iterable[Tuple[str, str, str]]
) -> Dict[str, Dict[str, Dict[str, str]]]:
) -> Dict[str, Dict[str, Dict[str, JsonDict]]]:
"""Take a list of fallback keys out of the database.
Args:
query_list: An iterable of tuples of (user ID, device ID, algorithm).
Returns:
A map of user ID -> a map device ID -> a map of key ID -> JSON bytes.
A map of user ID -> a map device ID -> a map of key ID -> JSON.
"""
results: Dict[str, Dict[str, Dict[str, str]]] = {}
results: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, device_id, algorithm in query_list:
# No one-time key available, so see if there's a fallback
# key
Expand Down Expand Up @@ -1199,7 +1199,7 @@ async def claim_e2e_fallback_keys(
)

device_results = results.setdefault(user_id, {}).setdefault(device_id, {})
device_results[f"{algorithm}:{key_id}"] = key_json
device_results[f"{algorithm}:{key_id}"] = json_decoder.decode(key_json)

return results

Expand Down

0 comments on commit d8a0d93

Please sign in to comment.