Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix get|set_type_stream_id_for_appservice store functions (#8648)
Browse files Browse the repository at this point in the history
  • Loading branch information
Half-Shot committed Oct 26, 2020
1 parent 73d8209 commit e8dbbcb
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 13 deletions.
1 change: 1 addition & 0 deletions changelog.d/8648.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in v1.22.0rc1 which would cause ephemeral events to not be sent to appservices.
12 changes: 6 additions & 6 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,16 @@ async def _notify_interested_services_ephemeral(
events = await self._handle_receipts(service)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)

async def _handle_typing(self, service: ApplicationService, new_token: int):
typing_source = self.event_sources.sources["typing"]
Expand Down
29 changes: 22 additions & 7 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,17 +369,25 @@ def get_new_events_for_appservice_txn(txn):
async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
if type not in ("read_receipt", "presence"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
)

def get_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"SELECT ? FROM application_services_state WHERE as_id=?",
(stream_id_type, service.id,),
# We do NOT want to escape `stream_id_type`.
"SELECT %s FROM application_services_state WHERE as_id=?"
% stream_id_type,
(service.id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
last_stream_id = txn.fetchone()
if last_stream_id is None or last_stream_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0])
return int(last_stream_id[0])

return await self.db_pool.runInteraction(
"get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
Expand All @@ -388,11 +396,18 @@ def get_type_stream_id_for_appservice_txn(txn):
async def set_type_stream_id_for_appservice(
self, service: ApplicationService, type: str, pos: int
) -> None:
if type not in ("read_receipt", "presence"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
)

def set_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
txn.execute(
"UPDATE ? SET device_list_stream_id = ? WHERE as_id=?",
(stream_id_type, pos, service.id),
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
% stream_id_type,
(pos, service.id),
)

await self.db_pool.runInteraction(
Expand Down
56 changes: 56 additions & 0 deletions tests/storage/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,62 @@ def test_get_appservices_by_state_multiple(self):
)


class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver()
return hs

def prepare(self, hs, reactor, clock):
self.service = Mock(id="foo")
self.store = self.hs.get_datastore()
self.get_success(self.store.set_appservice_state(self.service, "up"))

def test_get_type_stream_id_for_appservice_no_value(self):
value = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
)
self.assertEquals(value, 0)

value = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "presence")
)
self.assertEquals(value, 0)

def test_get_type_stream_id_for_appservice_invalid_type(self):
self.get_failure(
self.store.get_type_stream_id_for_appservice(self.service, "foobar"),
ValueError,
)

def test_set_type_stream_id_for_appservice(self):
read_receipt_value = 1024
self.get_success(
self.store.set_type_stream_id_for_appservice(
self.service, "read_receipt", read_receipt_value
)
)
result = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
)
self.assertEqual(result, read_receipt_value)

self.get_success(
self.store.set_type_stream_id_for_appservice(
self.service, "presence", read_receipt_value
)
)
result = self.get_success(
self.store.get_type_stream_id_for_appservice(self.service, "presence")
)
self.assertEqual(result, read_receipt_value)

def test_set_type_stream_id_for_appservice_invalid_type(self):
self.get_failure(
self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
ValueError,
)


# required for ApplicationServiceTransactionStoreTestCase tests
class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
def __init__(self, database: DatabasePool, db_conn, hs):
Expand Down

0 comments on commit e8dbbcb

Please sign in to comment.