-
Notifications
You must be signed in to change notification settings - Fork 641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add metrics instrumentation sqlalchemy #1645
Changes from 23 commits
f08c3ad
28c8c2e
cecafe1
ae56830
eb5e1dc
b22f06a
85e81b0
454229a
2e59e58
281b933
d73eff3
a36873d
2af13ee
b5a9272
41a9b5a
176e8e6
622f948
3104b4b
8c03ff9
35d1709
58532fc
2ad2d06
dece77d
e9c6954
0a48b5a
244cde6
71b234e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,13 +105,16 @@ | |
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor | ||
from opentelemetry.instrumentation.sqlalchemy.engine import ( | ||
EngineTracer, | ||
_get_tracer, | ||
_wrap_connect, | ||
_wrap_create_async_engine, | ||
_wrap_create_engine, | ||
) | ||
from opentelemetry.instrumentation.sqlalchemy.package import _instruments | ||
from opentelemetry.instrumentation.sqlalchemy.version import __version__ | ||
from opentelemetry.instrumentation.utils import unwrap | ||
from opentelemetry.metrics import get_meter | ||
from opentelemetry.semconv.metrics import MetricInstruments | ||
from opentelemetry.trace import get_tracer | ||
|
||
|
||
class SQLAlchemyInstrumentor(BaseInstrumentor): | ||
|
@@ -136,32 +139,47 @@ def _instrument(self, **kwargs): | |
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise. | ||
""" | ||
tracer_provider = kwargs.get("tracer_provider") | ||
tracer = get_tracer(__name__, __version__, tracer_provider) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
|
||
meter_provider = kwargs.get("meter_provider") | ||
meter = get_meter(__name__, __version__, meter_provider) | ||
|
||
connections_usage = meter.create_up_down_counter( | ||
name=MetricInstruments.DB_CLIENT_CONNECTIONS_USAGE, | ||
unit="connections", | ||
description="The number of connections that are currently in state described by the state attribute.", | ||
) | ||
|
||
enable_commenter = kwargs.get("enable_commenter", False) | ||
|
||
_w( | ||
"sqlalchemy", | ||
"create_engine", | ||
_wrap_create_engine(tracer_provider, enable_commenter), | ||
_wrap_create_engine(tracer, connections_usage, enable_commenter), | ||
) | ||
_w( | ||
"sqlalchemy.engine", | ||
"create_engine", | ||
_wrap_create_engine(tracer_provider, enable_commenter), | ||
_wrap_create_engine(tracer, connections_usage, enable_commenter), | ||
) | ||
_w( | ||
"sqlalchemy.engine.base", | ||
"Engine.connect", | ||
_wrap_connect(tracer_provider), | ||
_wrap_connect(tracer), | ||
) | ||
if parse_version(sqlalchemy.__version__).release >= (1, 4): | ||
_w( | ||
"sqlalchemy.ext.asyncio", | ||
"create_async_engine", | ||
_wrap_create_async_engine(tracer_provider, enable_commenter), | ||
_wrap_create_async_engine( | ||
tracer, connections_usage, enable_commenter | ||
), | ||
) | ||
if kwargs.get("engine") is not None: | ||
return EngineTracer( | ||
_get_tracer(tracer_provider), | ||
tracer, | ||
kwargs.get("engine"), | ||
connections_usage, | ||
kwargs.get("enable_commenter", False), | ||
kwargs.get("commenter_options", {}), | ||
) | ||
|
@@ -170,8 +188,9 @@ def _instrument(self, **kwargs): | |
): | ||
return [ | ||
EngineTracer( | ||
_get_tracer(tracer_provider), | ||
tracer, | ||
engine, | ||
connections_usage, | ||
kwargs.get("enable_commenter", False), | ||
kwargs.get("commenter_options", {}), | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,9 +20,6 @@ | |
) | ||
|
||
from opentelemetry import trace | ||
from opentelemetry.instrumentation.sqlalchemy.package import ( | ||
_instrumenting_module_name, | ||
) | ||
from opentelemetry.instrumentation.sqlalchemy.version import __version__ | ||
from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment | ||
from opentelemetry.instrumentation.utils import _get_opentelemetry_values | ||
|
@@ -44,49 +41,36 @@ def _normalize_vendor(vendor): | |
return vendor | ||
|
||
|
||
def _get_tracer(tracer_provider=None): | ||
return trace.get_tracer( | ||
_instrumenting_module_name, | ||
__version__, | ||
tracer_provider=tracer_provider, | ||
) | ||
|
||
|
||
def _wrap_create_async_engine(tracer_provider=None, enable_commenter=False): | ||
def _wrap_create_async_engine( | ||
tracer, connections_usage, enable_commenter=False | ||
): | ||
# pylint: disable=unused-argument | ||
def _wrap_create_async_engine_internal(func, module, args, kwargs): | ||
"""Trace the SQLAlchemy engine, creating an `EngineTracer` | ||
object that will listen to SQLAlchemy events. | ||
""" | ||
engine = func(*args, **kwargs) | ||
EngineTracer( | ||
_get_tracer(tracer_provider), engine.sync_engine, enable_commenter | ||
tracer, engine.sync_engine, connections_usage, enable_commenter | ||
) | ||
return engine | ||
|
||
return _wrap_create_async_engine_internal | ||
|
||
|
||
def _wrap_create_engine(tracer_provider=None, enable_commenter=False): | ||
# pylint: disable=unused-argument | ||
def _wrap_create_engine_internal(func, module, args, kwargs): | ||
def _wrap_create_engine(tracer, connections_usage, enable_commenter=False): | ||
def _wrap_create_engine_internal(func, _module, args, kwargs): | ||
"""Trace the SQLAlchemy engine, creating an `EngineTracer` | ||
object that will listen to SQLAlchemy events. | ||
""" | ||
engine = func(*args, **kwargs) | ||
EngineTracer(_get_tracer(tracer_provider), engine, enable_commenter) | ||
EngineTracer(tracer, engine, connections_usage, enable_commenter) | ||
return engine | ||
|
||
return _wrap_create_engine_internal | ||
|
||
|
||
def _wrap_connect(tracer_provider=None): | ||
tracer = trace.get_tracer( | ||
_instrumenting_module_name, | ||
__version__, | ||
tracer_provider=tracer_provider, | ||
) | ||
|
||
def _wrap_connect(tracer): | ||
# pylint: disable=unused-argument | ||
def _wrap_connect_internal(func, module, args, kwargs): | ||
with tracer.start_as_current_span( | ||
|
@@ -107,10 +91,16 @@ class EngineTracer: | |
_remove_event_listener_params = [] | ||
|
||
def __init__( | ||
self, tracer, engine, enable_commenter=False, commenter_options=None | ||
self, | ||
tracer, | ||
engine, | ||
connections_usage, | ||
enable_commenter=False, | ||
commenter_options=None, | ||
): | ||
self.tracer = tracer | ||
self.engine = engine | ||
self.connections_usage = connections_usage | ||
self.vendor = _normalize_vendor(engine.name) | ||
self.enable_commenter = enable_commenter | ||
self.commenter_options = commenter_options if commenter_options else {} | ||
|
@@ -123,6 +113,49 @@ def __init__( | |
engine, "after_cursor_execute", _after_cur_exec | ||
) | ||
self._register_event_listener(engine, "handle_error", _handle_error) | ||
self._register_event_listener(engine, "connect", self._pool_connect) | ||
self._register_event_listener(engine, "close", self._pool_close) | ||
self._register_event_listener(engine, "checkin", self._pool_checkin) | ||
self._register_event_listener(engine, "checkout", self._pool_checkout) | ||
|
||
def _get_pool_name(self): | ||
return self.engine.pool.logging_name or "" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The specification didn't write what to do in case the pool_name is not provided, so I kept it empty in this case. |
||
|
||
def _add_idle_to_connection_usage(self, value): | ||
self.connections_usage.add( | ||
value, | ||
attributes={ | ||
"pool.name": self._get_pool_name(), | ||
"state": "idle", | ||
}, | ||
) | ||
|
||
def _add_used_to_connection_usage(self, value): | ||
self.connections_usage.add( | ||
value, | ||
attributes={ | ||
"pool.name": self._get_pool_name(), | ||
"state": "used", | ||
}, | ||
) | ||
|
||
def _pool_connect(self, _dbapi_connection, _connection_record): | ||
self._add_idle_to_connection_usage(1) | ||
|
||
def _pool_close(self, _dbapi_connection, _connection_record): | ||
self._add_idle_to_connection_usage(-1) | ||
|
||
# Called when a connection returns to the pool. | ||
def _pool_checkin(self, _dbapi_connection, _connection_record): | ||
self._add_used_to_connection_usage(-1) | ||
self._add_idle_to_connection_usage(1) | ||
|
||
# Called when a connection is retrieved from the Pool. | ||
def _pool_checkout( | ||
self, _dbapi_connection, _connection_record, _connection_proxy | ||
): | ||
self._add_idle_to_connection_usage(-1) | ||
self._add_used_to_connection_usage(1) | ||
|
||
@classmethod | ||
def _register_event_listener(cls, target, identifier, func, *args, **kw): | ||
|
@@ -153,9 +186,8 @@ def _operation_name(self, db_name, statement): | |
return self.vendor | ||
return " ".join(parts) | ||
|
||
# pylint: disable=unused-argument | ||
def _before_cur_exec( | ||
self, conn, cursor, statement, params, context, executemany | ||
self, conn, cursor, statement, params, context, _executemany | ||
shalevr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
attrs, found = _get_attributes_from_url(conn.engine.url) | ||
if not found: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
open-telemetry/opentelemetry-python#3182