Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add OpenTracing for database activity. #10113

Merged
merged 6 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10113.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Report OpenTracing spans for database activity.
6 changes: 6 additions & 0 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ class SynapseTags:
# HTTP request tag (used to distinguish full vs incremental syncs, etc)
REQUEST_TAG = "request_tag"

# Text description of a database transaction
DB_TXN_DESC = "db.txn_desc"

# Uniqueish ID of a database transaction
DB_TXN_ID = "db.txn_id"


# Block everything by default
# A regex which matches the server_names to expose traces for.
Expand Down
86 changes: 53 additions & 33 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

from synapse.api.errors import StoreError
from synapse.config.database import DatabaseConnectionConfig
from synapse.logging import opentracing
from synapse.logging.context import (
LoggingContext,
current_context,
Expand Down Expand Up @@ -313,7 +314,14 @@ def _do_execute(self, func: Callable[..., R], sql: str, *args: Any) -> R:
start = time.time()

try:
return func(sql, *args)
with opentracing.start_active_span(
"db.query",
tags={
opentracing.tags.DATABASE_TYPE: "sql",
opentracing.tags.DATABASE_STATEMENT: sql,
},
):
return func(sql, *args)
except Exception as e:
sql_logger.debug("[SQL FAIL] {%s} %s", self.name, e)
raise
Expand Down Expand Up @@ -525,9 +533,16 @@ def new_transaction(
exception_callbacks=exception_callbacks,
)
try:
r = func(cursor, *args, **kwargs)
conn.commit()
return r
with opentracing.start_active_span(
"db.txn",
tags={
opentracing.SynapseTags.DB_TXN_DESC: desc,
opentracing.SynapseTags.DB_TXN_ID: name,
},
):
r = func(cursor, *args, **kwargs)
conn.commit()
return r
except self.engine.module.OperationalError as e:
# This can happen if the database disappears mid
# transaction.
Expand Down Expand Up @@ -653,16 +668,17 @@ async def runInteraction(
logger.warning("Starting db txn '%s' from sentinel context", desc)

try:
result = await self.runWithConnection(
self.new_transaction,
desc,
after_callbacks,
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
**kwargs,
)
with opentracing.start_active_span(f"db.{desc}"):
result = await self.runWithConnection(
self.new_transaction,
desc,
after_callbacks,
exception_callbacks,
func,
*args,
db_autocommit=db_autocommit,
**kwargs,
)

for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
Expand Down Expand Up @@ -718,25 +734,29 @@ def inner_func(conn, *args, **kwargs):
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)

if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

try:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, False)
with opentracing.start_active_span(
operation_name="db.connection",
):
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)

if self.engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
opentracing.log_kv({"message", "reconnected"})
richvdh marked this conversation as resolved.
Show resolved Hide resolved

try:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, True)

db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, False)

return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
Expand Down