From e05d147feedb3ab3af7f021ca7b661ac810ff0ac Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Sun, 25 Oct 2020 11:15:11 +0000 Subject: [PATCH 1/8] Fix as stream id getter/setter --- synapse/storage/databases/main/appservice.py | 23 ++++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 43bf0f649abf..6bdde19ca55b 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -369,17 +369,22 @@ def get_new_events_for_appservice_txn(txn): async def get_type_stream_id_for_appservice( self, service: ApplicationService, type: str ) -> int: + if type not in ["read_receipt", "presence"]: + raise ValueError("type must be a valid application stream id type") + def get_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type txn.execute( - "SELECT ? FROM application_services_state WHERE as_id=?", - (stream_id_type, service.id,), + # We do NOT want to escape `stream_id_type`. + "SELECT %s FROM application_services_state WHERE as_id=?" + % stream_id_type, + (service.id,), ) - last_txn_id = txn.fetchone() - if last_txn_id is None or last_txn_id[0] is None: # no row exists + last_stream_id = txn.fetchone() + if last_stream_id is None or last_stream_id[0] is None: # no row exists return 0 else: - return int(last_txn_id[0]) + return int(last_stream_id[0]) return await self.db_pool.runInteraction( "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn @@ -388,11 +393,15 @@ def get_type_stream_id_for_appservice_txn(txn): async def set_type_stream_id_for_appservice( self, service: ApplicationService, type: str, pos: int ) -> None: + if type not in ["read_receipt", "presence"]: + raise ValueError("type must be a valid application stream id type") + def set_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type txn.execute( - "UPDATE ? SET device_list_stream_id = ? WHERE as_id=?", - (stream_id_type, pos, service.id), + "UPDATE application_services_state SET %s = ? WHERE as_id=?" + % stream_id_type, + (pos, service.id), ) await self.db_pool.runInteraction( From dbad18d377b8630d28ba48e5d2edf66a52c8aaf8 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Sun, 25 Oct 2020 11:15:27 +0000 Subject: [PATCH 2/8] Add comprehensive tests for functions --- tests/storage/test_appservice.py | 56 ++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index c5c79873495d..1ce29af5fd9d 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -410,6 +410,62 @@ def test_get_appservices_by_state_multiple(self): ) +class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver() + return hs + + def prepare(self, hs, reactor, clock): + self.service = Mock(id="foo") + self.store = self.hs.get_datastore() + self.get_success(self.store.set_appservice_state(self.service, "up")) + + def test_get_type_stream_id_for_appservice_no_value(self): + value = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "read_receipt") + ) + self.assertEquals(value, 0) + + value = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "presence") + ) + self.assertEquals(value, 0) + + def test_get_type_stream_id_for_appservice_invalid_type(self): + self.get_failure( + self.store.get_type_stream_id_for_appservice(self.service, "foobar"), + ValueError, + ) + + def test_set_type_stream_id_for_appservice(self): + read_receipt_value = 1024 + self.get_success( + self.store.set_type_stream_id_for_appservice( + self.service, "read_receipt", read_receipt_value + ) + ) + result = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "read_receipt") + ) + self.assertEqual(result, read_receipt_value) + + self.get_success( + self.store.set_type_stream_id_for_appservice( + self.service, "presence", read_receipt_value + ) + ) + result = self.get_success( + self.store.get_type_stream_id_for_appservice(self.service, "presence") + ) + self.assertEqual(result, read_receipt_value) + + def test_set_type_stream_id_for_appservice_invalid_type(self): + self.get_failure( + self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024), + ValueError, + ) + + # required for ApplicationServiceTransactionStoreTestCase tests class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore): def __init__(self, database: DatabasePool, db_conn, hs): From 7a5bf278aaf5eceb0e9c192e6587d2ae7ecabd95 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Sun, 25 Oct 2020 11:19:00 +0000 Subject: [PATCH 3/8] changelog --- changelog.d/8648.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8648.bugfix diff --git a/changelog.d/8648.bugfix b/changelog.d/8648.bugfix new file mode 100644 index 000000000000..98c2f91214a5 --- /dev/null +++ b/changelog.d/8648.bugfix @@ -0,0 +1 @@ +Fix an issue that would cause ephemeral events to not be sent to appservices. \ No newline at end of file From a21533cfc02bd93f64b32ef92e69a282825a634d Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Sun, 25 Oct 2020 12:03:39 +0000 Subject: [PATCH 4/8] Store stream_id regardless of outcome --- synapse/handlers/appservice.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 07240d3a14ba..8a04be45b0bc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -203,16 +203,16 @@ async def notify_interested_services_ephemeral( events = await self._handle_receipts(service) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token - ) + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) elif stream_key == "presence_key": events = await self._handle_presence(service, users) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing(self, service: ApplicationService, new_token: int): typing_source = self.event_sources.sources["typing"] From 26bcc234067e7864953448d0ca5891b359773aa6 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 26 Oct 2020 14:09:19 +0000 Subject: [PATCH 5/8] Improve assertions --- synapse/storage/databases/main/appservice.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 6bdde19ca55b..e5e1ed1f4b55 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -369,8 +369,8 @@ def get_new_events_for_appservice_txn(txn): async def get_type_stream_id_for_appservice( self, service: ApplicationService, type: str ) -> int: - if type not in ["read_receipt", "presence"]: - raise ValueError("type must be a valid application stream id type") + if type not in ("read_receipt", "presence"): + raise ValueError("Expected type to be a valid application stream id type, got %s" % type) def get_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type @@ -393,8 +393,8 @@ def get_type_stream_id_for_appservice_txn(txn): async def set_type_stream_id_for_appservice( self, service: ApplicationService, type: str, pos: int ) -> None: - if type not in ["read_receipt", "presence"]: - raise ValueError("type must be a valid application stream id type") + if type not in ("read_receipt", "presence"): + raise ValueError("Expected type to be a valid application stream id type, got %s" % type) def set_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type From 1999a9b5a76ce097c4f5a1dfe18db88cf0fe431a Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 26 Oct 2020 14:11:12 +0000 Subject: [PATCH 6/8] improve changelog --- changelog.d/8648.bugfix | 2 +- synapse/storage/databases/main/appservice.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/changelog.d/8648.bugfix b/changelog.d/8648.bugfix index 98c2f91214a5..aa71ad0ff2e8 100644 --- a/changelog.d/8648.bugfix +++ b/changelog.d/8648.bugfix @@ -1 +1 @@ -Fix an issue that would cause ephemeral events to not be sent to appservices. \ No newline at end of file +Fix a bug introduced in v1.22.0rc1 which would cause ephemeral events to not be sent to appservices. \ No newline at end of file diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index e5e1ed1f4b55..9a903317fff4 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -370,7 +370,9 @@ async def get_type_stream_id_for_appservice( self, service: ApplicationService, type: str ) -> int: if type not in ("read_receipt", "presence"): - raise ValueError("Expected type to be a valid application stream id type, got %s" % type) + raise ValueError( + "Expected type to be a valid application stream id type, got %s" % type + ) def get_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type @@ -394,7 +396,9 @@ async def set_type_stream_id_for_appservice( self, service: ApplicationService, type: str, pos: int ) -> None: if type not in ("read_receipt", "presence"): - raise ValueError("Expected type to be a valid application stream id type, got %s" % type) + raise ValueError( + "Expected type to be a valid application stream id type, got %s" % type + ) def set_type_stream_id_for_appservice_txn(txn): stream_id_type = "%s_stream_id" % type From 58dcdcd593cd336f7f6e06ed2bc55b4dde538583 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 26 Oct 2020 10:16:16 -0400 Subject: [PATCH 7/8] Wrap type in a tuple for formatting. --- synapse/storage/databases/main/appservice.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 9a903317fff4..2d997ca419ff 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -371,7 +371,7 @@ async def get_type_stream_id_for_appservice( ) -> int: if type not in ("read_receipt", "presence"): raise ValueError( - "Expected type to be a valid application stream id type, got %s" % type + "Expected type to be a valid application stream id type, got %s" % (type,) ) def get_type_stream_id_for_appservice_txn(txn): @@ -397,7 +397,7 @@ async def set_type_stream_id_for_appservice( ) -> None: if type not in ("read_receipt", "presence"): raise ValueError( - "Expected type to be a valid application stream id type, got %s" % type + "Expected type to be a valid application stream id type, got %s" % (type,) ) def set_type_stream_id_for_appservice_txn(txn): From b1716a4fdef3ad760e9c9f16134b3e9c20c55783 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 26 Oct 2020 10:21:18 -0400 Subject: [PATCH 8/8] Lint --- synapse/storage/databases/main/appservice.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 2d997ca419ff..637a938bacaa 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -371,7 +371,8 @@ async def get_type_stream_id_for_appservice( ) -> int: if type not in ("read_receipt", "presence"): raise ValueError( - "Expected type to be a valid application stream id type, got %s" % (type,) + "Expected type to be a valid application stream id type, got %s" + % (type,) ) def get_type_stream_id_for_appservice_txn(txn): @@ -397,7 +398,8 @@ async def set_type_stream_id_for_appservice( ) -> None: if type not in ("read_receipt", "presence"): raise ValueError( - "Expected type to be a valid application stream id type, got %s" % (type,) + "Expected type to be a valid application stream id type, got %s" + % (type,) ) def set_type_stream_id_for_appservice_txn(txn):