Skip to content

Commit

Permalink
Add a worker store for search insertion. (matrix-org#7516)
Browse files Browse the repository at this point in the history
This is required as both event persistence and the background update needs access to this function. It should be perfectly safe for two workers to write to that table at the same time.
  • Loading branch information
erikjohnston authored and phil-flex committed Jun 16, 2020
1 parent 1ab7dc4 commit 10d521b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 47 deletions.
1 change: 1 addition & 0 deletions changelog.d/7516.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a worker store for search insertion, required for moving event persistence off master.
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.storage.data_stores.main.search import SearchWorkerStore
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
Expand Down Expand Up @@ -451,6 +452,7 @@ class GenericWorkerSlavedStore(
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
SearchWorkerStore,
BaseSlavedStore,
):
def __init__(self, database, db_conn, hs):
Expand Down
96 changes: 49 additions & 47 deletions synapse/storage/data_stores/main/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,55 @@
)


class SearchBackgroundUpdateStore(SQLBaseStore):
class SearchWorkerStore(SQLBaseStore):
def store_search_entries_txn(self, txn, entries):
"""Add entries to the search table
Args:
txn (cursor):
entries (iterable[SearchEntry]):
entries to be added to the table
"""
if not self.hs.config.enable_search:
return
if isinstance(self.database_engine, PostgresEngine):
sql = (
"INSERT INTO event_search"
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
)

args = (
(
entry.event_id,
entry.room_id,
entry.key,
entry.value,
entry.stream_ordering,
entry.origin_server_ts,
)
for entry in entries
)

txn.executemany(sql, args)

elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
args = (
(entry.event_id, entry.room_id, entry.key, entry.value)
for entry in entries
)

txn.executemany(sql, args)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")


class SearchBackgroundUpdateStore(SearchWorkerStore):

EVENT_SEARCH_UPDATE_NAME = "event_search"
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
Expand Down Expand Up @@ -296,52 +344,6 @@ def reindex_search_txn(txn):

return num_rows

def store_search_entries_txn(self, txn, entries):
"""Add entries to the search table
Args:
txn (cursor):
entries (iterable[SearchEntry]):
entries to be added to the table
"""
if not self.hs.config.enable_search:
return
if isinstance(self.database_engine, PostgresEngine):
sql = (
"INSERT INTO event_search"
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
)

args = (
(
entry.event_id,
entry.room_id,
entry.key,
entry.value,
entry.stream_ordering,
entry.origin_server_ts,
)
for entry in entries
)

txn.executemany(sql, args)

elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
args = (
(entry.event_id, entry.room_id, entry.key, entry.value)
for entry in entries
)

txn.executemany(sql, args)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")


class SearchStore(SearchBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
Expand Down

0 comments on commit 10d521b

Please sign in to comment.