Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix get|set_type_stream_id_for_appservice store functions #8648

Merged
merged 8 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8648.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an issue that would cause ephemeral events to not be sent to appservices.
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved
23 changes: 16 additions & 7 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,17 +369,22 @@ def get_new_events_for_appservice_txn(txn):
async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
if type not in ["read_receipt", "presence"]:
raise ValueError("type must be a valid application stream id type")
Half-Shot marked this conversation as resolved.
Show resolved Hide resolved

def get_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"SELECT ? FROM application_services_state WHERE as_id=?",
(stream_id_type, service.id,),
# We do NOT want to escape `stream_id_type`.
"SELECT %s FROM application_services_state WHERE as_id=?"
% stream_id_type,
(service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
last_stream_id = txn.fetchone()
if last_stream_id is None or last_stream_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0])
return int(last_stream_id[0])

return await self.db_pool.runInteraction(
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
Expand All @@ -388,11 +393,15 @@ def get_type_stream_id_for_appservice_txn(txn):
async def set_type_stream_id_for_appservice(
self, service: ApplicationService, type: str, pos: int
) -> None:
if type not in ["read_receipt", "presence"]:
raise ValueError("type must be a valid application stream id type")

def set_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
(stream_id_type, pos, service.id),
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
% stream_id_type,
(pos, service.id),
)

await self.db_pool.runInteraction(
Expand Down
56 changes: 56 additions & 0 deletions tests/storage/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,62 @@ def test_get_appservices_by_state_multiple(self):
)


class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver()
return hs

def prepare(self, hs, reactor, clock):
self.service = Mock(id="foo")
self.store = self.hs.get_datastore()
self.get_success(self.store.set_appservice_state(self.service, "up"))

def test_get_type_stream_id_for_appservice_no_value(self):
value = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
)
self.assertEquals(value, 0)

value = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "presence")
)
self.assertEquals(value, 0)

def test_get_type_stream_id_for_appservice_invalid_type(self):
self.get_failure(
self.store.get_type_stream_id_for_appservice(self.service, "foobar"),
ValueError,
)

def test_set_type_stream_id_for_appservice(self):
read_receipt_value = 1024
self.get_success(
self.store.set_type_stream_id_for_appservice(
self.service, "read_receipt", read_receipt_value
)
)
result = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
)
self.assertEqual(result, read_receipt_value)

self.get_success(
self.store.set_type_stream_id_for_appservice(
self.service, "presence", read_receipt_value
)
)
result = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "presence")
)
self.assertEqual(result, read_receipt_value)

def test_set_type_stream_id_for_appservice_invalid_type(self):
self.get_failure(
self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
ValueError,
)


# required for ApplicationServiceTransactionStoreTestCase tests
class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
def __init__(self, database: DatabasePool, db_conn, hs):
Expand Down