Skip to content

Commit

Permalink
Add span for connection phase (#1134)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl authored Jul 17, 2022
1 parent 9e2dbec commit 2ce69a6
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients
([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177))
- `opentelemetry-instrumentation-sqlalchemy` Added span for the connection phase ([#1133](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1133))

## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@

import sqlalchemy
from packaging.version import parse as parse_version
from sqlalchemy.engine.base import Engine
from wrapt import wrap_function_wrapper as _w

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.sqlalchemy.engine import (
EngineTracer,
_get_tracer,
_wrap_connect,
_wrap_create_async_engine,
_wrap_create_engine,
)
Expand Down Expand Up @@ -97,13 +99,17 @@ def _instrument(self, **kwargs):
"create_engine",
_wrap_create_engine(tracer_provider),
)
_w(
"sqlalchemy.engine.base",
"Engine.connect",
_wrap_connect(tracer_provider),
)
if parse_version(sqlalchemy.__version__).release >= (1, 4):
_w(
"sqlalchemy.ext.asyncio",
"create_async_engine",
_wrap_create_async_engine(tracer_provider),
)

if kwargs.get("engine") is not None:
return EngineTracer(
_get_tracer(tracer_provider),
Expand All @@ -127,5 +133,6 @@ def _instrument(self, **kwargs):
def _uninstrument(self, **kwargs):
unwrap(sqlalchemy, "create_engine")
unwrap(sqlalchemy.engine, "create_engine")
unwrap(Engine, "connect")
if parse_version(sqlalchemy.__version__).release >= (1, 4):
unwrap(sqlalchemy.ext.asyncio, "create_async_engine")
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ def _wrap_create_engine_internal(func, module, args, kwargs):
return _wrap_create_engine_internal


def _wrap_connect(tracer_provider=None):
tracer = trace.get_tracer(
_instrumenting_module_name,
__version__,
tracer_provider=tracer_provider,
)

# pylint: disable=unused-argument
def _wrap_connect_internal(func, module, args, kwargs):
with tracer.start_as_current_span(
"connect", kind=trace.SpanKind.CLIENT
):
return func(*args, **kwargs)

return _wrap_connect_internal


class EngineTracer:
def __init__(self, tracer, engine, enable_commenter=False):
self.tracer = tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ def test_trace_integration(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query itself
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)

def test_instrument_two_engines(self):
engine_1 = create_engine("sqlite:///:memory:")
Expand All @@ -65,8 +69,20 @@ def test_instrument_two_engines(self):
cnx_2.execute("SELECT 1 + 1;").fetchall()

spans = self.memory_exporter.get_finished_spans()
# 2 queries + 2 engine connect
self.assertEqual(len(spans), 4)

self.assertEqual(len(spans), 2)
def test_instrument_engine_connect(self):
engine = create_engine("sqlite:///:memory:")

SQLAlchemyInstrumentor().instrument(
engine=engine,
tracer_provider=self.tracer_provider,
)

engine.connect()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

@pytest.mark.skipif(
not sqlalchemy.__version__.startswith("1.4"),
Expand All @@ -85,11 +101,15 @@ async def run():
async with engine.connect() as cnx:
await cnx.execute(sqlalchemy.text("SELECT 1 + 1;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)
self.assertEqual(
spans[0].instrumentation_scope.name,
spans[1].instrumentation_scope.name,
"opentelemetry.instrumentation.sqlalchemy",
)

Expand All @@ -99,7 +119,10 @@ def test_not_recording(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_span.__enter__ = mock.Mock(return_value=(mock.Mock(), None))
mock_span.__exit__ = mock.Mock(return_value=None)
mock_tracer.start_span.return_value = mock_span
mock_tracer.start_as_current_span.return_value = mock_span
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
engine = create_engine("sqlite:///:memory:")
Expand All @@ -123,11 +146,15 @@ def test_create_engine_wrapper(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)
self.assertEqual(
spans[0].instrumentation_scope.name,
spans[1].instrumentation_scope.name,
"opentelemetry.instrumentation.sqlalchemy",
)

Expand All @@ -153,7 +180,7 @@ def test_custom_tracer_provider(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 1)
self.assertEqual(len(spans), 2)
self.assertEqual(spans[0].resource.attributes["service.name"], "test")
self.assertEqual(
spans[0].resource.attributes["deployment.environment"], "env"
Expand All @@ -177,11 +204,15 @@ async def run():
async with engine.connect() as cnx:
await cnx.execute(sqlalchemy.text("SELECT 1 + 1;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)
self.assertEqual(
spans[0].instrumentation_scope.name,
spans[1].instrumentation_scope.name,
"opentelemetry.instrumentation.sqlalchemy",
)

Expand All @@ -199,8 +230,8 @@ def test_generate_commenter(self):
cnx = engine.connect()
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(len(spans), 2)
span = spans[1]
self.assertIn(
EngineTracer._generate_comment(span),
self.caplog.records[-2].getMessage(),
Expand Down
22 changes: 13 additions & 9 deletions tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ def test_orm_insert(self):
self.session.commit()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
stmt = "INSERT INTO players (id, name) VALUES "
if span.attributes.get(SpanAttributes.DB_SYSTEM) == "sqlite":
stmt += "(?, ?)"
Expand All @@ -148,8 +149,9 @@ def test_session_query(self):
self.assertEqual(len(out), 0)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
stmt = "SELECT players.id AS players_id, players.name AS players_name \nFROM players \nWHERE players.name = "
if span.attributes.get(SpanAttributes.DB_SYSTEM) == "sqlite":
stmt += "?"
Expand All @@ -170,8 +172,9 @@ def test_engine_connect_execute(self):
self.assertEqual(len(rows), 0)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
self._check_span(span, "SELECT")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
Expand All @@ -190,8 +193,9 @@ def test_parent(self):
self.assertEqual(len(rows), 0)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans
# one span for the connection and two for the queries
self.assertEqual(len(spans), 3)
_, child_span, parent_span = spans

# confirm the parenting
self.assertIsNone(parent_span.parent)
Expand Down Expand Up @@ -247,5 +251,5 @@ def insert_players(session):
# batch inserts together which means `insert_players` only generates one span.
# See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#orm-batch-inserts-with-psycopg2-now-batch-statements-with-returning-in-most-cases
self.assertEqual(
len(spans), 5 if self.VENDOR not in ["postgresql"] else 3
len(spans), 8 if self.VENDOR not in ["postgresql"] else 6
)
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ def test_engine_execute_errors(self):
conn.execute("SELECT * FROM a_wrong_table").fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT opentelemetry-tests")
self.assertEqual(
Expand All @@ -96,9 +97,9 @@ def test_orm_insert(self):
self.session.commit()

spans = self.memory_exporter.get_finished_spans()
# identity insert on before the insert, insert, and identity insert off after the insert
self.assertEqual(len(spans), 3)
span = spans[1]
# connect, identity insert on before the insert, insert, and identity insert off after the insert
self.assertEqual(len(spans), 4)
span = spans[2]
self._check_span(span, "INSERT")
self.assertIn(
"INSERT INTO players",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ def test_engine_execute_errors(self):
conn.execute("SELECT * FROM a_wrong_table").fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT opentelemetry-tests")
self.assertEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ def test_engine_execute_errors(self):
conn.execute("SELECT * FROM a_wrong_table").fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT opentelemetry-tests")
self.assertEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def test_engine_execute_errors(self):
conn.execute(stmt).fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one span for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT :memory:")
self.assertEqual(
Expand Down

0 comments on commit 2ce69a6

Please sign in to comment.