Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Allow running DB interactions in autocommit mode.
Browse files Browse the repository at this point in the history
This allows running queries outside of transactions, which is useful to
avoid the overhead of transaction management (in terms of RTT and
isolation levels).
  • Loading branch information
erikjohnston committed Oct 5, 2020
1 parent 8644ce0 commit 0e5c73b
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 8 deletions.
58 changes: 50 additions & 8 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,23 @@ def new_transaction(
exception_callbacks: List[_CallbackListEntry],
func: "Callable[..., R]",
*args: Any,
db_retry: bool = True,
**kwargs: Any
) -> R:
"""Start a new database transaction with the given connection.
Args:
conn
desc
after_callbacks
exception_callbacks
func
*args
db_retry: Whether to retry the transaction by calling `func` again.
This should be disabled if connection is in autocommit mode.
**kwargs
"""

start = monotonic_time()
txn_id = self._TXN_ID

Expand Down Expand Up @@ -493,7 +508,7 @@ def new_transaction(
transaction_logger.warning(
"[TXN OPERROR] {%s} %s %d/%d", name, e, i, N,
)
if i < N:
if db_retry and i < N:
i += 1
try:
conn.rollback()
Expand All @@ -506,7 +521,7 @@ def new_transaction(
transaction_logger.warning(
"[TXN DEADLOCK] {%s} %d/%d", name, i, N
)
if i < N:
if db_retry and i < N:
i += 1
try:
conn.rollback()
Expand Down Expand Up @@ -566,7 +581,12 @@ def new_transaction(
sql_txn_timer.labels(desc).observe(duration)

async def runInteraction(
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
desc: str,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Starts a transaction on the database and runs a given function
Expand All @@ -576,6 +596,12 @@ async def runInteraction(
database transaction (twisted.enterprise.adbapi.Transaction) as
its first argument, followed by `args` and `kwargs`.
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
WARNING: This means that if func fails half way through then
the changes will *not* be rolled back.
args: positional args to pass to `func`
kwargs: named args to pass to `func`
Expand All @@ -596,6 +622,8 @@ async def runInteraction(
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
db_retry=not db_autocommit, # Don't retry in auto commit mode.
**kwargs
)

Expand All @@ -609,7 +637,11 @@ async def runInteraction(
return cast(R, result)

async def runWithConnection(
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
self,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R:
"""Wraps the .runWithConnection() method on the underlying db_pool.
Expand All @@ -618,6 +650,9 @@ async def runWithConnection(
database connection (twisted.enterprise.adbapi.Connection) as
its first argument, followed by `args` and `kwargs`.
args: positional args to pass to `func`
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
kwargs: named args to pass to `func`
Returns:
Expand Down Expand Up @@ -649,10 +684,17 @@ def inner_func(conn, *args, **kwargs):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
try:
if db_autocommit:
self.engine.set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.set_autocommit(conn, False)

return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
Expand Down
8 changes: 8 additions & 0 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,11 @@ def in_transaction(self, conn: Connection) -> bool:
"""Whether the connection is currently in a transaction.
"""
...

@abc.abstractmethod
def set_autocommit(self, conn: Connection, autocommit: bool):
"""Set the connections autocommit mode.
When True queries are run outside of transactions.
"""
...
4 changes: 4 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def on_new_connection(self, db_conn):
db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
db_conn.set_session(self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ)

# Set the bytea output to escape, vs the default of hex
cursor = db_conn.cursor()
Expand Down Expand Up @@ -176,3 +177,6 @@ def server_version(self):

def in_transaction(self, conn: Connection) -> bool:
return conn.status != self.module.extensions.STATUS_READY # type: ignore

def set_autocommit(self, conn: Connection, autocommit: bool):
return conn.set_session(autocommit=autocommit) # type: ignore
5 changes: 5 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def server_version(self):
def in_transaction(self, conn: Connection) -> bool:
return conn.in_transaction # type: ignore

def set_autocommit(self, conn: Connection, autocommit: bool):
# Twisted doesn't let us set attributes on the connections, so we can't
# set the connection to autocommit mode.
pass


# Following functions taken from: https://github.com/coleifer/peewee

Expand Down
1 change: 1 addition & 0 deletions tests/storage/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def runWithConnection(func, *args, **kwargs):
engine = create_engine(sqlite_config)
fake_engine = Mock(wraps=engine)
fake_engine.can_native_upsert = False
fake_engine.in_transaction.return_value = False

db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
db._db_pool = self.db_pool
Expand Down

0 comments on commit 0e5c73b

Please sign in to comment.