Consistently use db_to_json to convert from database values to JSON objects. (#7849)
clokep committed Jul 16, 2020
1 parent b0f031f commit f460da6
Showing 22 changed files with 80 additions and 82 deletions.
1 change: 1 addition & 0 deletions changelog.d/7849.misc
@@ -0,0 +1 @@
+Consistently use `db_to_json` to convert from database values to JSON objects.
4 changes: 2 additions & 2 deletions synapse/storage/_base.py
@@ -100,8 +100,8 @@ def db_to_json(db_content):
     if isinstance(db_content, memoryview):
         db_content = db_content.tobytes()
 
-    # Decode it to a Unicode string before feeding it to json.loads, so we
-    # consistenty get a Unicode-containing object out.
+    # Decode it to a Unicode string before feeding it to json.loads, since
+    # Python 3.5 does not support deserializing bytes.
    if isinstance(db_content, (bytes, bytearray)):
         db_content = db_content.decode("utf8")
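For context, the hunk above shows only the normalization half of the helper. Below is a minimal sketch of what the complete function plausibly looks like: the memoryview and bytes branches are taken from the diff, while the final json.loads call and the error handling are assumptions about the unshown remainder of the function, not part of this commit.

    import json
    import logging

    def db_to_json(db_content):
        """Turn a JSON column value into a Python object.

        Database drivers may hand the column back as str (SQLite),
        bytes, or memoryview (e.g. psycopg2 for bytea columns), so
        normalize to str before parsing.
        """
        if isinstance(db_content, memoryview):
            db_content = db_content.tobytes()

        # Decode it to a Unicode string before feeding it to json.loads, since
        # Python 3.5 does not support deserializing bytes.
        if isinstance(db_content, (bytes, bytearray)):
            db_content = db_content.decode("utf8")

        # Assumed tail: parse the normalized string, surfacing bad rows loudly.
        try:
            return json.loads(db_content)
        except Exception:
            logging.warning("Tried to decode %r as JSON and failed", db_content)
            raise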
5 changes: 4 additions & 1 deletion synapse/storage/background_updates.py
@@ -249,7 +249,10 @@ async def _do_background_update(self, desired_duration_ms: float) -> int:
             retcol="progress_json",
         )
 
-        progress = json.loads(progress_json)
+        # Avoid a circular import.
+        from synapse.storage._base import db_to_json
+
+        progress = db_to_json(progress_json)
 
         time_start = self._clock.time_msec()
         items_updated = await update_handler(progress, batch_size)
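The function-local import above is the standard way to break an import cycle in Python: the name is resolved when the function runs, after both modules have finished loading. A toy sketch of the general pattern, with hypothetical modules a and b:

    # a.py
    import b

    VALUE = 42

    def use_b():
        return b.helper()

    # b.py
    def helper():
        # Deferred import: a top-level "from a import VALUE" here would fail,
        # because a.py is only partially initialized while it is importing b.
        # By the time helper() is called, a.py has finished loading.
        from a import VALUE
        return VALUE + 1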
16 changes: 8 additions & 8 deletions synapse/storage/data_stores/main/account_data.py
@@ -22,7 +22,7 @@
 
 from twisted.internet import defer
 
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -77,7 +77,7 @@ def get_account_data_for_user_txn(txn):
             )
 
             global_account_data = {
-                row["account_data_type"]: json.loads(row["content"]) for row in rows
+                row["account_data_type"]: db_to_json(row["content"]) for row in rows
             }
 
             rows = self.db.simple_select_list_txn(
@@ -90,7 +90,7 @@ def get_account_data_for_user_txn(txn):
             by_room = {}
             for row in rows:
                 room_data = by_room.setdefault(row["room_id"], {})
-                room_data[row["account_data_type"]] = json.loads(row["content"])
+                room_data[row["account_data_type"]] = db_to_json(row["content"])
 
             return global_account_data, by_room
@@ -113,7 +113,7 @@ def get_global_account_data_by_type_for_user(self, data_type, user_id):
         )
 
         if result:
-            return json.loads(result)
+            return db_to_json(result)
         else:
             return None
@@ -137,7 +137,7 @@ def get_account_data_for_room_txn(txn):
             )
 
             return {
-                row["account_data_type"]: json.loads(row["content"]) for row in rows
+                row["account_data_type"]: db_to_json(row["content"]) for row in rows
             }
 
         return self.db.runInteraction(
@@ -170,7 +170,7 @@ def get_account_data_for_room_and_type_txn(txn):
                 allow_none=True,
             )
 
-            return json.loads(content_json) if content_json else None
+            return db_to_json(content_json) if content_json else None
 
         return self.db.runInteraction(
             "get_account_data_for_room_and_type", get_account_data_for_room_and_type_txn
@@ -255,7 +255,7 @@ def get_updated_account_data_for_user_txn(txn):
 
             txn.execute(sql, (user_id, stream_id))
 
-            global_account_data = {row[0]: json.loads(row[1]) for row in txn}
+            global_account_data = {row[0]: db_to_json(row[1]) for row in txn}
 
             sql = (
                 "SELECT room_id, account_data_type, content FROM room_account_data"
@@ -267,7 +267,7 @@ def get_updated_account_data_for_user_txn(txn):
             account_data_by_room = {}
             for row in txn:
                 room_account_data = account_data_by_room.setdefault(row[0], {})
-                room_account_data[row[1]] = json.loads(row[2])
+                room_account_data[row[1]] = db_to_json(row[2])
 
             return global_account_data, account_data_by_room
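The account-data hunks above repeatedly fold flat rows into a nested dict with setdefault. A standalone illustration of that pattern with made-up rows; plain json.loads stands in for db_to_json:

    import json

    rows = [
        ("!room_a:hs", "m.tag", '{"tags": {"u.work": {}}}'),
        ("!room_a:hs", "m.fully_read", '{"event_id": "$ev"}'),
        ("!room_b:hs", "m.tag", '{"tags": {}}'),
    ]

    by_room = {}
    for room_id, data_type, content in rows:
        # setdefault creates the per-room dict the first time room_id is seen.
        by_room.setdefault(room_id, {})[data_type] = json.loads(content)

    assert by_room["!room_a:hs"]["m.fully_read"] == {"event_id": "$ev"}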
4 changes: 2 additions & 2 deletions synapse/storage/data_stores/main/appservice.py
@@ -22,7 +22,7 @@
 
 from synapse.appservice import AppServiceTransaction
 from synapse.config.appservice import load_appservices
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.database import Database
@@ -303,7 +303,7 @@ def _get_oldest_unsent_txn(txn):
         if not entry:
             return None
 
-        event_ids = json.loads(entry["event_ids"])
+        event_ids = db_to_json(entry["event_ids"])
 
         events = yield self.get_events_as_list(event_ids)
6 changes: 3 additions & 3 deletions synapse/storage/data_stores/main/deviceinbox.py
@@ -21,7 +21,7 @@
 
 from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -65,7 +65,7 @@ def get_new_messages_for_device_txn(txn):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(json.loads(row[1]))
+                messages.append(db_to_json(row[1]))
             if len(messages) < limit:
                 stream_pos = current_stream_id
             return messages, stream_pos
@@ -173,7 +173,7 @@ def get_new_messages_for_remote_destination_txn(txn):
             messages = []
             for row in txn:
                 stream_pos = row[0]
-                messages.append(json.loads(row[1]))
+                messages.append(db_to_json(row[1]))
             if len(messages) < limit:
                 log_kv({"message": "Set stream position to current position"})
                 stream_pos = current_stream_id
2 changes: 1 addition & 1 deletion synapse/storage/data_stores/main/devices.py
@@ -577,7 +577,7 @@ def get_users_whose_signatures_changed(self, user_id, from_key):
             rows = yield self.db.execute(
                 "get_users_whose_signatures_changed", None, sql, user_id, from_key
             )
-            return {user for row in rows for user in json.loads(row[0])}
+            return {user for row in rows for user in db_to_json(row[0])}
         else:
             return set()
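The changed line in devices.py uses a doubly nested set comprehension to union JSON-encoded lists from many rows into one set. A self-contained illustration with made-up rows; plain json.loads stands in for db_to_json:

    import json

    rows = [('["@a:hs", "@b:hs"]',), ('["@b:hs", "@c:hs"]',)]

    # A user may appear in several rows; the set deduplicates as it flattens.
    users = {user for row in rows for user in json.loads(row[0])}
    assert users == {"@a:hs", "@b:hs", "@c:hs"}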
10 changes: 5 additions & 5 deletions synapse/storage/data_stores/main/e2e_room_keys.py
@@ -14,13 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
+from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.logging.opentracing import log_kv, trace
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore, db_to_json
 
 
 class EndToEndRoomKeyStore(SQLBaseStore):
@@ -148,7 +148,7 @@ def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
                 "forwarded_count": row["forwarded_count"],
                 # is_verified must be returned to the client as a boolean
                 "is_verified": bool(row["is_verified"]),
-                "session_data": json.loads(row["session_data"]),
+                "session_data": db_to_json(row["session_data"]),
             }
 
         return sessions
@@ -222,7 +222,7 @@ def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
                 "first_message_index": row[2],
                 "forwarded_count": row[3],
                 "is_verified": row[4],
-                "session_data": json.loads(row[5]),
+                "session_data": db_to_json(row[5]),
             }
 
         return ret
@@ -319,7 +319,7 @@ def _get_e2e_room_keys_version_info_txn(txn):
             keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
             retcols=("version", "algorithm", "auth_data", "etag"),
         )
-        result["auth_data"] = json.loads(result["auth_data"])
+        result["auth_data"] = db_to_json(result["auth_data"])
         result["version"] = str(result["version"])
         if result["etag"] is None:
             result["etag"] = 0
2 changes: 1 addition & 1 deletion synapse/storage/data_stores/main/end_to_end_keys.py
@@ -366,7 +366,7 @@ def _get_bare_e2e_cross_signing_keys_bulk_txn(
         for row in rows:
             user_id = row["user_id"]
             key_type = row["keytype"]
-            key = json.loads(row["keydata"])
+            key = db_to_json(row["keydata"])
             user_info = result.setdefault(user_id, {})
             user_info[key_type] = key
4 changes: 2 additions & 2 deletions synapse/storage/data_stores/main/event_push_actions.py
@@ -21,7 +21,7 @@
 
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
 from synapse.storage.database import Database
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -58,7 +58,7 @@ def _deserialize_action(actions, is_highlight):
     """Custom deserializer for actions. This allows us to "compress" common actions
     """
     if actions:
-        return json.loads(actions)
+        return db_to_json(actions)
 
     if is_highlight:
         return DEFAULT_HIGHLIGHT_ACTION
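_deserialize_action above reconstructs the two most common action lists from the row's is_highlight flag alone, which implies a serializer that stores an empty string for those cases. A sketch of that counterpart; the constant values and the serializer body are inferred from the deserializer's behaviour, not shown in this diff:

    import json

    DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
    DEFAULT_HIGHLIGHT_ACTION = [
        "notify",
        {"set_tweak": "sound", "value": "default"},
        {"set_tweak": "highlight"},
    ]

    def _serialize_action(actions, is_highlight):
        # Common action lists are "compressed" to an empty string, so reads
        # can skip JSON parsing entirely for the vast majority of rows.
        default = DEFAULT_HIGHLIGHT_ACTION if is_highlight else DEFAULT_NOTIF_ACTION
        if actions == default:
            return ""
        return json.dumps(actions)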
9 changes: 4 additions & 5 deletions synapse/storage/data_stores/main/events.py
@@ -20,7 +20,6 @@
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
 import attr
-from canonicaljson import json
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -32,7 +31,7 @@
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.logging.utils import log_function
-from synapse.storage._base import make_in_list_sql_clause
+from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.data_stores.main.search import SearchEntry
 from synapse.storage.database import Database, LoggingTransaction
 from synapse.storage.util.id_generators import StreamIdGenerator
@@ -236,7 +235,7 @@ def _get_events_which_are_prevs_txn(txn, batch):
             )
 
             txn.execute(sql + clause, args)
-            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
+            results.extend(r[0] for r in txn if not db_to_json(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.db.runInteraction(
@@ -297,7 +296,7 @@ def _get_prevs_before_rejected_txn(txn, batch):
                 if prev_event_id in existing_prevs:
                     continue
 
-                soft_failed = json.loads(metadata).get("soft_failed")
+                soft_failed = db_to_json(metadata).get("soft_failed")
                 if soft_failed or rejected:
                     to_recursively_check.append(prev_event_id)
                     existing_prevs.add(prev_event_id)
@@ -583,7 +582,7 @@ def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
         txn.execute(sql, (room_id, EventTypes.Create, ""))
         row = txn.fetchone()
         if row:
-            event_json = json.loads(row[0])
+            event_json = db_to_json(row[0])
             content = event_json.get("content", {})
             creator = content.get("creator")
             room_version_id = content.get("room_version", RoomVersions.V1.identifier)
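_upsert_room_version_txn above recovers the room version from the m.room.create event's content, defaulting to v1 when the field is absent. A toy walk-through with a made-up event body; the literal "1" stands in for RoomVersions.V1.identifier:

    import json

    row = ('{"content": {"creator": "@alice:hs", "room_version": "5"}}',)

    event_json = json.loads(row[0])
    content = event_json.get("content", {})
    creator = content.get("creator")                    # "@alice:hs"
    room_version_id = content.get("room_version", "1")  # "5"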
14 changes: 6 additions & 8 deletions synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,12 +15,10 @@
 
 import logging
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventContentFields
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 
 logger = logging.getLogger(__name__)
@@ -125,7 +123,7 @@ def reindex_txn(txn):
             for row in rows:
                 try:
                     event_id = row[1]
-                    event_json = json.loads(row[2])
+                    event_json = db_to_json(row[2])
                     sender = event_json["sender"]
                     content = event_json["content"]
@@ -208,7 +206,7 @@ def reindex_search_txn(txn):
 
             for row in ev_rows:
                 event_id = row["event_id"]
-                event_json = json.loads(row["json"])
+                event_json = db_to_json(row["json"])
                 try:
                     origin_server_ts = event_json["origin_server_ts"]
                 except (KeyError, AttributeError):
@@ -317,7 +315,7 @@ def _cleanup_extremities_bg_update_txn(txn):
 
                 soft_failed = False
                 if metadata:
-                    soft_failed = json.loads(metadata).get("soft_failed")
+                    soft_failed = db_to_json(metadata).get("soft_failed")
 
                 if soft_failed or rejected:
                     soft_failed_events_to_lookup.add(event_id)
@@ -358,7 +356,7 @@ def _cleanup_extremities_bg_update_txn(txn):
 
                     graph[event_id] = {prev_event_id}
 
-                    soft_failed = json.loads(metadata).get("soft_failed")
+                    soft_failed = db_to_json(metadata).get("soft_failed")
                     if soft_failed or rejected:
                         soft_failed_events_to_lookup.add(event_id)
                     else:
@@ -543,7 +541,7 @@ def _event_store_labels_txn(txn):
             last_row_event_id = ""
             for (event_id, event_json_raw) in results:
                 try:
-                    event_json = json.loads(event_json_raw)
+                    event_json = db_to_json(event_json_raw)
 
                     self.db.simple_insert_many_txn(
                         txn=txn,
7 changes: 3 additions & 4 deletions synapse/storage/data_stores/main/events_worker.py
@@ -21,7 +21,6 @@
 from collections import namedtuple
 from typing import List, Optional, Tuple
 
-from canonicaljson import json
 from constantly import NamedConstant, Names
 
 from twisted.internet import defer
@@ -40,7 +39,7 @@
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
-from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import get_domain_from_id
@@ -611,8 +610,8 @@ def _get_events_from_db(self, event_ids, allow_rejected=False):
             if not allow_rejected and rejected_reason:
                 continue
 
-            d = json.loads(row["json"])
-            internal_metadata = json.loads(row["internal_metadata"])
+            d = db_to_json(row["json"])
+            internal_metadata = db_to_json(row["internal_metadata"])
 
             format_version = row["format_version"]
             if format_version is None:
(Diff truncated: the remaining nine changed files are not shown.)