diff --git a/synapse/storage/database.py b/synapse/storage/database.py index b36ea7ccdfe6..fc4b85b9b678 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -461,8 +461,23 @@ def new_transaction( exception_callbacks: List[_CallbackListEntry], func: "Callable[..., R]", *args: Any, + db_retry: bool = True, **kwargs: Any ) -> R: + """Start a new database transaction with the given connection. + + Args: + conn + desc + after_callbacks + exception_callbacks + func + *args + db_retry: Whether to retry the transaction by calling `func` again. + This should be disabled if connection is in autocommit mode. + **kwargs + """ + start = monotonic_time() txn_id = self._TXN_ID @@ -493,7 +508,7 @@ def new_transaction( transaction_logger.warning( "[TXN OPERROR] {%s} %s %d/%d", name, e, i, N, ) - if i < N: + if db_retry and i < N: i += 1 try: conn.rollback() @@ -506,7 +521,7 @@ def new_transaction( transaction_logger.warning( "[TXN DEADLOCK] {%s} %d/%d", name, i, N ) - if i < N: + if db_retry and i < N: i += 1 try: conn.rollback() @@ -566,7 +581,12 @@ def new_transaction( sql_txn_timer.labels(desc).observe(duration) async def runInteraction( - self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any + self, + desc: str, + func: "Callable[..., R]", + *args: Any, + db_autocommit: bool = False, + **kwargs: Any ) -> R: """Starts a transaction on the database and runs a given function @@ -576,6 +596,12 @@ async def runInteraction( database transaction (twisted.enterprise.adbapi.Transaction) as its first argument, followed by `args` and `kwargs`. + db_autocommit: Whether to run the function in "autocommit" mode, + i.e. outside of a transaction. This is useful for transaction + that are only a single query. Currently only affects postgres. + WARNING: This means that if func fails half way through then + the changes will *not* be rolled back. + args: positional args to pass to `func` kwargs: named args to pass to `func` @@ -596,6 +622,8 @@ async def runInteraction( exception_callbacks, func, *args, + db_autocommit=db_autocommit, + db_retry=not db_autocommit, # Don't retry in auto commit mode. **kwargs ) @@ -609,7 +637,11 @@ async def runInteraction( return cast(R, result) async def runWithConnection( - self, func: "Callable[..., R]", *args: Any, **kwargs: Any + self, + func: "Callable[..., R]", + *args: Any, + db_autocommit: bool = False, + **kwargs: Any ) -> R: """Wraps the .runWithConnection() method on the underlying db_pool. @@ -618,6 +650,9 @@ async def runWithConnection( database connection (twisted.enterprise.adbapi.Connection) as its first argument, followed by `args` and `kwargs`. args: positional args to pass to `func` + db_autocommit: Whether to run the function in "autocommit" mode, + i.e. outside of a transaction. This is useful for transaction + that are only a single query. Currently only affects postgres. kwargs: named args to pass to `func` Returns: @@ -649,10 +684,17 @@ def inner_func(conn, *args, **kwargs): logger.debug("Reconnecting closed database connection") conn.reconnect() - db_conn = LoggingDatabaseConnection( - conn, self.engine, "runWithConnection" - ) - return func(db_conn, *args, **kwargs) + try: + if db_autocommit: + self.engine.set_autocommit(conn, True) + + db_conn = LoggingDatabaseConnection( + conn, self.engine, "runWithConnection" + ) + return func(db_conn, *args, **kwargs) + finally: + if db_autocommit: + self.engine.set_autocommit(conn, False) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 314d2e8d80a8..475270b6f398 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -103,3 +103,11 @@ def in_transaction(self, conn: Connection) -> bool: """Whether the connection is currently in a transaction. """ ... + + @abc.abstractmethod + def set_autocommit(self, conn: Connection, autocommit: bool): + """Set the connections autocommit mode. + + When True queries are run outside of transactions. + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 655f2a8b5f06..a8563faefd32 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -108,6 +108,7 @@ def on_new_connection(self, db_conn): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) + db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ) # Set the bytea output to escape, vs the default of hex cursor = db_conn.cursor() @@ -176,3 +177,6 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.status != self.module.extensions.STATUS_READY # type: ignore + + def set_autocommit(self, conn: Connection, autocommit: bool): + return conn.set_session(autocommit=autocommit) # type: ignore diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 00bced3618f8..c12f5225aeed 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -110,6 +110,11 @@ def server_version(self): def in_transaction(self, conn: Connection) -> bool: return conn.in_transaction # type: ignore + def set_autocommit(self, conn: Connection, autocommit: bool): + # Twisted doesn't let us set attributes on the connections, so we can't + # set the connection to autocommit mode. + pass + # Following functions taken from: https://github.com/coleifer/peewee diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 40ba652248ce..eac7e4dcd2fa 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -56,6 +56,7 @@ def runWithConnection(func, *args, **kwargs): engine = create_engine(sqlite_config) fake_engine = Mock(wraps=engine) fake_engine.can_native_upsert = False + fake_engine.in_transaction.return_value = False db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine) db._db_pool = self.db_pool