Skip to content

Commit

Permalink
Updated dbapi and psycopg2 instrumentations.
Browse files Browse the repository at this point in the history
Changes:

- Update dbapi instrumentation to use the SQL statement name as the span
instead of the entire SQL query.
- Renamed TracedCursor with CursorTracing. The class was not a valid
Cursor so the name was confusing.
- Updated CursorTracing's (previously TracedCursor) traced_execution
method to accept the cursor instance as the first argument. This is
required as for some dbapi implementations, we need a reference to the
cursor in order to correctly format the SQL query.
- Updated psycopg2 instrumentation to leverage dbapi's `cursor_factory`
mechanism instead of wrapping the cursor with wrapt. This results in a
simpler instrumentation without monkey patching objects at runtime and
allows psycopg2's type registration system to work. This should make it
possible to use psycopg2 instrumentation when using the JSONB feature or
with frameworks like Django.
  • Loading branch information
owais committed Dec 22, 2020
1 parent 7a1be91 commit ad2493d
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 88 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ lib64
__pycache__
venv*/
.venv*/
opentelemetry-python-core*/
/opentelemetry-python-core*/
/opentelemetry-python-core

# Installer logs
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#253](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/253))
- `opentelemetry-instrumentation-requests`, `opentelemetry-instrumentation-urllib` Fix span name callback parameters
- ([#259](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/259))
- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer`
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types.
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query.
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))

## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from aiopg.utils import _ContextManager, _PoolAcquireContextManager

from opentelemetry.instrumentation.dbapi import (
CursorTracer,
DatabaseApiIntegration,
TracedCursor,
)
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode
Expand Down Expand Up @@ -94,25 +94,29 @@ async def _acquire(self):
return TracedPoolProxy(pool, *args, **kwargs)


class AsyncTracedCursor(TracedCursor):
class AsyncCursorTracer(CursorTracer):
async def traced_execution(
self,
cursor,
query_method: typing.Callable[..., typing.Any],
*args: typing.Tuple[typing.Any, typing.Any],
**kwargs: typing.Dict[typing.Any, typing.Any]
):
name = ""
if len(args) > 0 and args[0]:
name = args[0]
elif self._db_api_integration.database:
name = self._db_api_integration.database
else:
name = self._db_api_integration.name
if args:
name = self.get_operation_name(cursor, args)

if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)

with self._db_api_integration.get_tracer().start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
self._populate_span(span, *args)
self._populate_span(cursor, span, *args)
try:
result = await query_method(*args, **kwargs)
return result
Expand All @@ -123,7 +127,7 @@ async def traced_execution(


def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = AsyncTracedCursor(db_api_integration)
_traced_cursor = AsyncCursorTracer(db_api_integration)

# pylint: disable=abstract-method
class AsyncTracedCursorProxy(AsyncProxyObject):
Expand All @@ -134,19 +138,19 @@ def __init__(self, cursor, *args, **kwargs):

async def execute(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
self, self.__wrapped__.execute, *args, **kwargs
)
return result

async def executemany(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
self, self.__wrapped__.executemany, *args, **kwargs
)
return result

async def callproc(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
self, self.__wrapped__.callproc, *args, **kwargs
)
return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def test_span_succeeded(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)

self.assertEqual(span.attributes["component"], "testcomponent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def trace_integration(
connection_attributes: typing.Dict = None,
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Expand All @@ -86,6 +87,7 @@ def trace_integration(
version=__version__,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
db_api_integration_factory=db_api_integration_factory,
)


Expand All @@ -99,6 +101,7 @@ def wrap_connect(
version: str = "",
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
Expand All @@ -115,6 +118,9 @@ def wrap_connect(
capture_parameters: Configure if db.statement.parameters should be captured.
"""
db_api_integration_factory = (
db_api_integration_factory or DatabaseApiIntegration
)

# pylint: disable=unused-argument
def wrap_connect_(
Expand All @@ -123,7 +129,7 @@ def wrap_connect_(
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
db_integration = DatabaseApiIntegration(
db_integration = db_api_integration_factory(
name,
database_component,
database_type=database_type,
Expand Down Expand Up @@ -314,16 +320,19 @@ def __exit__(self, *args, **kwargs):
return TracedConnectionProxy(connection, *args, **kwargs)


class TracedCursor:
class CursorTracer:
def __init__(self, db_api_integration: DatabaseApiIntegration):
self._db_api_integration = db_api_integration

def _populate_span(
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
self,
span: trace_api.Span,
cursor,
*args: typing.Tuple[typing.Any, typing.Any]
):
if not span.is_recording():
return
statement = args[0] if args else ""
statement = self.get_statement(cursor, args)
span.set_attribute(
"component", self._db_api_integration.database_component
)
Expand All @@ -342,24 +351,38 @@ def _populate_span(
if self._db_api_integration.capture_parameters and len(args) > 1:
span.set_attribute("db.statement.parameters", str(args[1]))

def get_operation_name(self, cursor, args):
if args and isinstance(args[0], str):
return args[0].split(" ")[0]
return ""

def get_statement(self, cursor, args):
if not args:
return ""
statement = args[0]
if isinstance(statement, bytes):
return statement.decode("utf8", "replace")
return statement

def traced_execution(
self,
cursor,
query_method: typing.Callable[..., typing.Any],
*args: typing.Tuple[typing.Any, typing.Any],
**kwargs: typing.Dict[typing.Any, typing.Any]
):
name = ""
if args:
name = args[0]
elif self._db_api_integration.database:
name = self._db_api_integration.database
else:
name = self._db_api_integration.name
name = self.get_operation_name(cursor, args)
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)

with self._db_api_integration.get_tracer().start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
self._populate_span(span, *args)
self._populate_span(span, cursor, *args)
try:
result = query_method(*args, **kwargs)
return result
Expand All @@ -370,7 +393,7 @@ def traced_execution(


def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = TracedCursor(db_api_integration)
_cursor_tracer = CursorTracer(db_api_integration)

# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):
Expand All @@ -380,18 +403,18 @@ def __init__(self, cursor, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, cursor)

def execute(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
)

def __enter__(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_span_succeeded(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)

self.assertEqual(span.attributes["component"], "testcomponent")
Expand Down Expand Up @@ -93,7 +93,7 @@ def test_span_succeeded_with_capture_of_statement_parameters(self):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)

self.assertEqual(span.attributes["component"], "testcomponent")
Expand Down
Loading

0 comments on commit ad2493d

Please sign in to comment.