From d8c2c96dfbb730736f5f549ea2bdf2ad051db820 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Oct 2020 11:06:18 +0100 Subject: [PATCH 1/8] Fix uncommitted connections leaking to pool when first connected --- synapse/storage/database.py | 7 +++++++ synapse/storage/engines/_base.py | 6 ++++++ synapse/storage/engines/postgres.py | 7 ++++++- synapse/storage/engines/sqlite.py | 5 +++++ 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0d9d9b7cc07a..b36ea7ccdfe6 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -633,6 +633,13 @@ async def runWithConnection( start_time = monotonic_time() def inner_func(conn, *args, **kwargs): + # We shouldn't be in a transaction. If we are then something + # somewhere hasn't committed after doing work. (This is likely only + # possible during startup, as `run*` will ensure changes are + # committed/rolled back before putting the connection back in the + # pool). + assert not self.engine.in_transaction(conn) + with LoggingContext("runWithConnection", parent_context) as context: sched_duration_sec = monotonic_time() - start_time sql_scheduling_timer.observe(sched_duration_sec) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 908cbc79e322..314d2e8d80a8 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -97,3 +97,9 @@ def server_version(self) -> str: """Gets a string giving the server version. For example: '3.22.0' """ ... + + @abc.abstractmethod + def in_transaction(self, conn: Connection) -> bool: + """Whether the connection is currently in a transaction. + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index ff39281f8599..655f2a8b5f06 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -15,7 +15,8 @@ import logging -from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup +from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup +from synapse.storage.types import Connection logger = logging.getLogger(__name__) @@ -119,6 +120,7 @@ def on_new_connection(self, db_conn): cursor.execute("SET synchronous_commit TO OFF") cursor.close() + db_conn.commit() @property def can_native_upsert(self): @@ -171,3 +173,6 @@ def server_version(self): return "%i.%i" % (numver / 10000, numver % 10000) else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + + def in_transaction(self, conn: Connection) -> bool: + return conn.status != self.module.extensions.STATUS_READY # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 8a0f8c89d173..00bced3618f8 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -17,6 +17,7 @@ import typing from synapse.storage.engines import BaseDatabaseEngine +from synapse.storage.types import Connection if typing.TYPE_CHECKING: import sqlite3 # noqa: F401 @@ -86,6 +87,7 @@ def on_new_connection(self, db_conn): db_conn.create_function("rank", 1, _rank) db_conn.execute("PRAGMA foreign_keys = ON;") + db_conn.commit() def is_deadlock(self, error): return False @@ -105,6 +107,9 @@ def server_version(self): """ return "%i.%i.%i" % self.module.sqlite_version_info + def in_transaction(self, conn: Connection) -> bool: + return conn.in_transaction # type: ignore + # Following functions taken from: https://github.com/coleifer/peewee From 8644ce0c0cbfa667f194f2231b8c3f75c0ca4a5a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Oct 2020 16:45:48 +0100 Subject: [PATCH 2/8] Newsfile --- changelog.d/8456.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8456.misc diff --git a/changelog.d/8456.misc b/changelog.d/8456.misc new file mode 100644 index 000000000000..ccd260069ba9 --- /dev/null +++ b/changelog.d/8456.misc @@ -0,0 +1 @@ +Reduce number of serialization errors of `MultiWriterIdGenerator._update_table`. From 0e5c73b2a376165c817b37febcea0f0b3e328d9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Oct 2020 16:41:12 +0100 Subject: [PATCH 3/8] Allow running DB interactions in autocommit mode. This allows running queries outside of transactions, which is useful to avoid the overhead of transaction management (in terms of RTT and isolation levels). --- synapse/storage/database.py | 58 +++++++++++++++++++++++++---- synapse/storage/engines/_base.py | 8 ++++ synapse/storage/engines/postgres.py | 4 ++ synapse/storage/engines/sqlite.py | 5 +++ tests/storage/test_base.py | 1 + 5 files changed, 68 insertions(+), 8 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index b36ea7ccdfe6..fc4b85b9b678 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -461,8 +461,23 @@ def new_transaction( exception_callbacks: List[_CallbackListEntry], func: "Callable[..., R]", *args: Any, + db_retry: bool = True, **kwargs: Any ) -> R: + """Start a new database transaction with the given connection. + + Args: + conn + desc + after_callbacks + exception_callbacks + func + *args + db_retry: Whether to retry the transaction by calling `func` again. + This should be disabled if connection is in autocommit mode. + **kwargs + """ + start = monotonic_time() txn_id = self._TXN_ID @@ -493,7 +508,7 @@ def new_transaction( transaction_logger.warning( "[TXN OPERROR] {%s} %s %d/%d", name, e, i, N, ) - if i < N: + if db_retry and i < N: i += 1 try: conn.rollback() @@ -506,7 +521,7 @@ def new_transaction( transaction_logger.warning( "[TXN DEADLOCK] {%s} %d/%d", name, i, N ) - if i < N: + if db_retry and i < N: i += 1 try: conn.rollback() @@ -566,7 +581,12 @@ def new_transaction( sql_txn_timer.labels(desc).observe(duration) async def runInteraction( - self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any + self, + desc: str, + func: "Callable[..., R]", + *args: Any, + db_autocommit: bool = False, + **kwargs: Any ) -> R: """Starts a transaction on the database and runs a given function @@ -576,6 +596,12 @@ async def runInteraction( database transaction (twisted.enterprise.adbapi.Transaction) as its first argument, followed by `args` and `kwargs`. + db_autocommit: Whether to run the function in "autocommit" mode, + i.e. outside of a transaction. This is useful for transaction + that are only a single query. Currently only affects postgres. + WARNING: This means that if func fails half way through then + the changes will *not* be rolled back. + args: positional args to pass to `func` kwargs: named args to pass to `func` @@ -596,6 +622,8 @@ async def runInteraction( exception_callbacks, func, *args, + db_autocommit=db_autocommit, + db_retry=not db_autocommit, # Don't retry in auto commit mode. **kwargs ) @@ -609,7 +637,11 @@ async def runInteraction( return cast(R, result) async def runWithConnection( - self, func: "Callable[..., R]", *args: Any, **kwargs: Any + self, + func: "Callable[..., R]", + *args: Any, + db_autocommit: bool = False, + **kwargs: Any ) -> R: """Wraps the .runWithConnection() method on the underlying db_pool. @@ -618,6 +650,9 @@ async def runWithConnection( database connection (twisted.enterprise.adbapi.Connection) as its first argument, followed by `args` and `kwargs`. args: positional args to pass to `func` + db_autocommit: Whether to run the function in "autocommit" mode, + i.e. outside of a transaction. This is useful for transaction + that are only a single query. Currently only affects postgres. kwargs: named args to pass to `func` Returns: @@ -649,10 +684,17 @@ def inner_func(conn, *args, **kwargs): logger.debug("Reconnecting closed database connection") conn.reconnect() - db_conn = LoggingDatabaseConnection( - conn, self.engine, "runWithConnection" - ) - return func(db_conn, *args, **kwargs) + try: + if db_autocommit: + self.engine.set_autocommit(conn, True) + + db_conn = LoggingDatabaseConnection( + conn, self.engine, "runWithConnection" + ) + return func(db_conn, *args, **kwargs) + finally: + if db_autocommit: + self.engine.set_autocommit(conn, False) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 314d2e8d80a8..475270b6f398 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -103,3 +103,11 @@ def in_transaction(self, conn: Connection) -> bool: """Whether the connection is currently in a transaction. """ ... + + @abc.abstractmethod + def set_autocommit(self, conn: Connection, autocommit: bool): + """Set the connections autocommit mode. + + When True queries are run outside of transactions. + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 655f2a8b5f06..a8563faefd32 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -108,6 +108,7 @@ def on_new_connection(self, db_conn): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) + db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ) # Set the bytea output to escape, vs the default of hex cursor = db_conn.cursor() @@ -176,3 +177,6 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.status != self.module.extensions.STATUS_READY # type: ignore + + def set_autocommit(self, conn: Connection, autocommit: bool): + return conn.set_session(autocommit=autocommit) # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 00bced3618f8..c12f5225aeed 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -110,6 +110,11 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.in_transaction # type: ignore + def set_autocommit(self, conn: Connection, autocommit: bool): + # Twisted doesn't let us set attributes on the connections, so we can't + # set the connection to autocommit mode. + pass + # Following functions taken from: https://github.com/coleifer/peewee diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 40ba652248ce..eac7e4dcd2fa 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -56,6 +56,7 @@ def runWithConnection(func, *args, **kwargs): engine = create_engine(sqlite_config) fake_engine = Mock(wraps=engine) fake_engine.can_native_upsert = False + fake_engine.in_transaction.return_value = False db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine) db._db_pool = self.db_pool From 201d2da705a428bfba3a19d8d7733a70f0900db8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Oct 2020 14:19:09 +0100 Subject: [PATCH 4/8] Reduce serialization errors in MultiWriterIdGen We call `_update_stream_positions_table_txn` a lot, which is an UPSERT that can conflict in `REPEATABLE READ` isolation level. Instead of doing a transaction consisting of a single query we may as well run it outside of a transaction. --- synapse/storage/util/id_generators.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 51f680d05d81..ba72b8790b54 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -548,7 +548,7 @@ def _add_persisted_position(self, new_id: int): # do. break - def _update_stream_positions_table_txn(self, txn): + def _update_stream_positions_table_txn(self, txn: LoggingTransaction): """Update the `stream_positions` table with newly persisted position. """ @@ -632,10 +632,16 @@ async def __aexit__(self, exc_type, exc, tb): # # We only do this on the success path so that the persisted current # position points to a persisted row with the correct instance name. + # + # We do this in autocommit mode as a) the upsert works correctly outside + # transactions and b) reduces the amount of time the rows are locked + # for. If we don't do this then we'll often hit serialization errors due + # to the fact we default to REPEATABLE READ isolation levels. if self.id_gen._writers: await self.id_gen._db.runInteraction( "MultiWriterIdGenerator._update_table", self.id_gen._update_stream_positions_table_txn, + db_autocommit=True, ) return False From 9be577c4701414cc5ddc75368af93718083f0144 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Oct 2020 15:22:24 +0100 Subject: [PATCH 5/8] Use autocommit when fetching sequence values --- synapse/storage/util/id_generators.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index ba72b8790b54..bd32eea9d497 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -598,10 +598,13 @@ class _MultiWriterCtxManager: stream_ids = attr.ib(type=List[int], factory=list) async def __aenter__(self) -> Union[int, List[int]]: + # It's safe to run this in autocommit mode as fetching values from a + # sequence ignores transaction semantics anyway. self.stream_ids = await self.id_gen._db.runInteraction( "_load_next_mult_id", self.id_gen._load_next_mult_id_txn, self.multiple_ids or 1, + db_autocommit=True, ) # Assert the fetched ID is actually greater than any ID we've already From b16b7efb5eb7cbc3607228d865c87c338efa74a5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Oct 2020 17:34:50 +0100 Subject: [PATCH 6/8] Code review --- synapse/storage/database.py | 18 +++++++++++------- synapse/storage/engines/postgres.py | 1 - synapse/storage/util/id_generators.py | 3 ++- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index fc4b85b9b678..94570d1f1011 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -461,11 +461,16 @@ def new_transaction( exception_callbacks: List[_CallbackListEntry], func: "Callable[..., R]", *args: Any, - db_retry: bool = True, **kwargs: Any ) -> R: """Start a new database transaction with the given connection. + Note: The given func may be called multiple times under certain + failure modes. This is normally fine when in a standard transaction, + but care must be taken if the connection is in `autocommit` mode that + the function will correctly handle being aborted and retried half way + through its execution. + Args: conn desc @@ -473,8 +478,6 @@ def new_transaction( exception_callbacks func *args - db_retry: Whether to retry the transaction by calling `func` again. - This should be disabled if connection is in autocommit mode. **kwargs """ @@ -508,7 +511,7 @@ def new_transaction( transaction_logger.warning( "[TXN OPERROR] {%s} %s %d/%d", name, e, i, N, ) - if db_retry and i < N: + if i < N: i += 1 try: conn.rollback() @@ -521,7 +524,7 @@ def new_transaction( transaction_logger.warning( "[TXN DEADLOCK] {%s} %d/%d", name, i, N ) - if db_retry and i < N: + if i < N: i += 1 try: conn.rollback() @@ -600,7 +603,9 @@ async def runInteraction( i.e. outside of a transaction. This is useful for transaction that are only a single query. Currently only affects postgres. WARNING: This means that if func fails half way through then - the changes will *not* be rolled back. + the changes will *not* be rolled back. `func` may also get + called multiple times if the transaction is retried, so must + correctly handle that case. args: positional args to pass to `func` kwargs: named args to pass to `func` @@ -623,7 +628,6 @@ async def runInteraction( func, *args, db_autocommit=db_autocommit, - db_retry=not db_autocommit, # Don't retry in auto commit mode. **kwargs ) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a8563faefd32..e30e19d20300 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -108,7 +108,6 @@ def on_new_connection(self, db_conn): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) - db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ) # Set the bytea output to escape, vs the default of hex cursor = db_conn.cursor() diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index bd32eea9d497..d7e40aaa8b40 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -24,6 +24,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.database import DatabasePool, LoggingTransaction +from synapse.storage.types import Cursor from synapse.storage.util.sequence import PostgresSequenceGenerator logger = logging.getLogger(__name__) @@ -548,7 +549,7 @@ def _add_persisted_position(self, new_id: int): # do. break - def _update_stream_positions_table_txn(self, txn: LoggingTransaction): + def _update_stream_positions_table_txn(self, txn: Cursor): """Update the `stream_positions` table with newly persisted position. """ From 8385e95557936fe18b1c73673a3080d8bce19882 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 14:36:53 +0100 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/database.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 94570d1f1011..d3d9f1936ae0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -600,8 +600,12 @@ async def runInteraction( its first argument, followed by `args` and `kwargs`. db_autocommit: Whether to run the function in "autocommit" mode, - i.e. outside of a transaction. This is useful for transaction - that are only a single query. Currently only affects postgres. + i.e. outside of a transaction. This is useful for transactions + that are only a single query. + + Currently, this is only implemented for Postgres. SQLite will still + run the function inside a transaction. + WARNING: This means that if func fails half way through then the changes will *not* be rolled back. `func` may also get called multiple times if the transaction is retried, so must From 9741ca7270eb3f7b5b54feb22abd560e45fae5ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Oct 2020 14:40:29 +0100 Subject: [PATCH 8/8] s/set_autocommit/attempt_to_set_autocommit/ --- synapse/storage/database.py | 10 +++++----- synapse/storage/engines/_base.py | 7 +++++-- synapse/storage/engines/postgres.py | 2 +- synapse/storage/engines/sqlite.py | 2 +- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index d3d9f1936ae0..0ba3a025cf1c 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -601,11 +601,11 @@ async def runInteraction( db_autocommit: Whether to run the function in "autocommit" mode, i.e. outside of a transaction. This is useful for transactions - that are only a single query. - + that are only a single query. + Currently, this is only implemented for Postgres. SQLite will still run the function inside a transaction. - + WARNING: This means that if func fails half way through then the changes will *not* be rolled back. `func` may also get called multiple times if the transaction is retried, so must @@ -694,7 +694,7 @@ def inner_func(conn, *args, **kwargs): try: if db_autocommit: - self.engine.set_autocommit(conn, True) + self.engine.attempt_to_set_autocommit(conn, True) db_conn = LoggingDatabaseConnection( conn, self.engine, "runWithConnection" @@ -702,7 +702,7 @@ def inner_func(conn, *args, **kwargs): return func(db_conn, *args, **kwargs) finally: if db_autocommit: - self.engine.set_autocommit(conn, False) + self.engine.attempt_to_set_autocommit(conn, False) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 475270b6f398..d6d632dc10f6 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -105,9 +105,12 @@ def in_transaction(self, conn: Connection) -> bool: ... @abc.abstractmethod - def set_autocommit(self, conn: Connection, autocommit: bool): - """Set the connections autocommit mode. + def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): + """Attempt to set the connections autocommit mode. When True queries are run outside of transactions. + + Note: This has no effect on SQLite3, so callers still need to + commit/rollback the connections. """ ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index e30e19d20300..7719ac32f764 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -177,5 +177,5 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.status != self.module.extensions.STATUS_READY # type: ignore - def set_autocommit(self, conn: Connection, autocommit: bool): + def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): return conn.set_session(autocommit=autocommit) # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index c12f5225aeed..5db0f0b520db 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -110,7 +110,7 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.in_transaction # type: ignore - def set_autocommit(self, conn: Connection, autocommit: bool): + def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool): # Twisted doesn't let us set attributes on the connections, so we can't # set the connection to autocommit mode. pass