Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics instrumentation sqlalchemy #1645

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f08c3ad
Add metrics instrumentation sqlalchemy
shalevr Feb 6, 2023
28c8c2e
refactored the connection usage
shalevr Feb 6, 2023
cecafe1
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 6, 2023
ae56830
Add changelog entry and supports_metrics
shalevr Feb 6, 2023
eb5e1dc
Fix lint
shalevr Feb 6, 2023
b22f06a
Using semantic convention
shalevr Feb 7, 2023
85e81b0
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
srikanthccv Feb 13, 2023
454229a
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 13, 2023
2e59e58
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 13, 2023
281b933
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 13, 2023
d73eff3
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 13, 2023
a36873d
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 15, 2023
2af13ee
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 15, 2023
b5a9272
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 17, 2023
41a9b5a
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 17, 2023
176e8e6
Update test.yml
shalevr Feb 17, 2023
622f948
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
srikanthccv Feb 18, 2023
3104b4b
Fix Changelog
shalevr Feb 18, 2023
8c03ff9
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 18, 2023
35d1709
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 20, 2023
58532fc
Remove unused code
shalevr Feb 21, 2023
2ad2d06
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 22, 2023
dece77d
work with the metric basic test function from test_base.py
shalevr Feb 23, 2023
e9c6954
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 24, 2023
0a48b5a
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 25, 2023
244cde6
Merge branch 'main' into Metrics-instrumentation-sqlalchemy
shalevr Feb 25, 2023
71b234e
Fix merge issue
shalevr Feb 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,18 @@
from sqlalchemy.engine.base import Engine
from wrapt import wrap_function_wrapper as _w

from opentelemetry.metrics import Histogram, get_meter
from opentelemetry.trace import get_tracer
from opentelemetry.semconv.metrics import MetricInstruments

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.sqlalchemy.engine import (
EngineTracer,
_get_tracer,
_wrap_connect,
_wrap_create_async_engine,
_wrap_create_engine,
)
from opentelemetry.instrumentation.sqlalchemy.version import __version__
from opentelemetry.instrumentation.sqlalchemy.package import _instruments
from opentelemetry.instrumentation.utils import unwrap

Expand All @@ -136,32 +140,47 @@ def _instrument(self, **kwargs):
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise.
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!


meter_provider = kwargs.get("meter_provider")
meter = get_meter(__name__, __version__, meter_provider)

connections_usage = meter.create_up_down_counter(
name=MetricInstruments.DB_CLIENT_CONNECTIONS_USAGE,
unit="connections",
description="The number of connections that are currently in state described by the state attribute.",
)

enable_commenter = kwargs.get("enable_commenter", False)

_w(
"sqlalchemy",
"create_engine",
_wrap_create_engine(tracer_provider, enable_commenter),
_wrap_create_engine(tracer, connections_usage, enable_commenter),
)
_w(
"sqlalchemy.engine",
"create_engine",
_wrap_create_engine(tracer_provider, enable_commenter),
_wrap_create_engine(tracer, connections_usage, enable_commenter),
)
_w(
"sqlalchemy.engine.base",
"Engine.connect",
_wrap_connect(tracer_provider),
_wrap_connect(tracer, connections_usage),
)
if parse_version(sqlalchemy.__version__).release >= (1, 4):
_w(
"sqlalchemy.ext.asyncio",
"create_async_engine",
_wrap_create_async_engine(tracer_provider, enable_commenter),
_wrap_create_async_engine(
tracer, connections_usage, enable_commenter
),
)
if kwargs.get("engine") is not None:
return EngineTracer(
_get_tracer(tracer_provider),
tracer,
kwargs.get("engine"),
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
Expand All @@ -170,8 +189,9 @@ def _instrument(self, **kwargs):
):
return [
EngineTracer(
_get_tracer(tracer_provider),
tracer,
engine,
connections_usage,
kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
)

from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy.package import (
_instrumenting_module_name,
)
from opentelemetry.instrumentation.sqlalchemy.version import __version__
from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment
from opentelemetry.instrumentation.utils import _get_opentelemetry_values
Expand All @@ -44,48 +41,36 @@ def _normalize_vendor(vendor):
return vendor


def _get_tracer(tracer_provider=None):
return trace.get_tracer(
_instrumenting_module_name,
__version__,
tracer_provider=tracer_provider,
)


def _wrap_create_async_engine(tracer_provider=None, enable_commenter=False):
def _wrap_create_async_engine(
tracer, connections_usage, enable_commenter=False
):
# pylint: disable=unused-argument
def _wrap_create_async_engine_internal(func, module, args, kwargs):
"""Trace the SQLAlchemy engine, creating an `EngineTracer`
object that will listen to SQLAlchemy events.
"""
engine = func(*args, **kwargs)
EngineTracer(
_get_tracer(tracer_provider), engine.sync_engine, enable_commenter
tracer, engine.sync_engine, connections_usage, enable_commenter
)
return engine

return _wrap_create_async_engine_internal


def _wrap_create_engine(tracer_provider=None, enable_commenter=False):
# pylint: disable=unused-argument
def _wrap_create_engine_internal(func, module, args, kwargs):
def _wrap_create_engine(tracer, connections_usage, enable_commenter=False):
def _wrap_create_engine_internal(func, _module, args, kwargs):
"""Trace the SQLAlchemy engine, creating an `EngineTracer`
object that will listen to SQLAlchemy events.
"""
engine = func(*args, **kwargs)
EngineTracer(_get_tracer(tracer_provider), engine, enable_commenter)
EngineTracer(tracer, engine, connections_usage, enable_commenter)
return engine

return _wrap_create_engine_internal


def _wrap_connect(tracer_provider=None):
tracer = trace.get_tracer(
_instrumenting_module_name,
__version__,
tracer_provider=tracer_provider,
)
def _wrap_connect(tracer, connections_usage):

# pylint: disable=unused-argument
def _wrap_connect_internal(func, module, args, kwargs):
Expand All @@ -101,10 +86,16 @@ class EngineTracer:
_remove_event_listener_params = []

def __init__(
self, tracer, engine, enable_commenter=False, commenter_options=None
self,
tracer,
engine,
connections_usage,
enable_commenter=False,
commenter_options=None,
):
self.tracer = tracer
self.engine = engine
self.connections_usage = connections_usage
self.vendor = _normalize_vendor(engine.name)
self.enable_commenter = enable_commenter
self.commenter_options = commenter_options if commenter_options else {}
Expand All @@ -117,6 +108,49 @@ def __init__(
engine, "after_cursor_execute", _after_cur_exec
)
self._register_event_listener(engine, "handle_error", _handle_error)
self._register_event_listener(engine, "connect", self._pool_connect)
self._register_event_listener(engine, "close", self._pool_close)
self._register_event_listener(engine, "checkin", self._pool_checkin)
self._register_event_listener(engine, "checkout", self._pool_checkout)

def _get_pool_name(self):
return self.engine.pool.logging_name or ""
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specification didn't write what to do in case the pool_name is not provided, so I kept it empty in this case.
There is a discussion about it in the specification, I'm waiting to see what will be decided


def _add_idle_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
"state": "idle",
},
)

def _add_used_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
"state": "used",
},
)

def _pool_connect(self, _dbapi_connection, _connection_record):
self._add_idle_to_connection_usage(1)

def _pool_close(self, _dbapi_connection, _connection_record):
self._add_idle_to_connection_usage(-1)

# Called when a connection returns to the pool.
def _pool_checkin(self, _dbapi_connection, _connection_record):
self._add_used_to_connection_usage(-1)
self._add_idle_to_connection_usage(1)

# Called when a connection is retrieved from the Pool.
def _pool_checkout(
self, _dbapi_connection, _connection_record, _connection_proxy
):
self._add_idle_to_connection_usage(-1)
self._add_used_to_connection_usage(1)

@classmethod
def _register_event_listener(cls, target, identifier, func, *args, **kw):
Expand Down Expand Up @@ -147,9 +181,8 @@ def _operation_name(self, db_name, statement):
return self.vendor
return " ".join(parts)

# pylint: disable=unused-argument
def _before_cur_exec(
self, conn, cursor, statement, params, context, executemany
self, conn, cursor, statement, params, context, _executemany
shalevr marked this conversation as resolved.
Show resolved Hide resolved
):
attrs, found = _get_attributes_from_url(conn.engine.url)
if not found:
Expand Down
Loading