Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext/pymongo: Add instrumentor interface #598

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pymongo import MongoClient

from opentelemetry import trace as trace_api
from opentelemetry.ext.pymongo import trace_integration
from opentelemetry.ext.pymongo import PymongoInstrumentor
from opentelemetry.test.test_base import TestBase

MONGODB_HOST = os.getenv("MONGODB_HOST ", "localhost")
Expand All @@ -31,7 +31,7 @@ class TestFunctionalPymongo(TestBase):
def setUpClass(cls):
super().setUpClass()
cls._tracer = cls.tracer_provider.get_tracer(__name__)
trace_integration(cls.tracer_provider)
PymongoInstrumentor().instrument()
client = MongoClient(
MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
)
Expand Down Expand Up @@ -94,3 +94,23 @@ def test_delete(self):
with self._tracer.start_as_current_span("rootSpan"):
self._collection.delete_one({"name": "testName"})
self.validate_spans()

def test_uninstrument(self):
# check that integration is working
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.memory_exporter.clear()
self.assertEqual(len(spans), 1)

# uninstrument and check not new spans are created
PymongoInstrumentor().uninstrument()
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.memory_exporter.clear()
self.assertEqual(len(spans), 0)

# re-enable and check that it works again
PymongoInstrumentor().instrument()
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
5 changes: 5 additions & 0 deletions ext/opentelemetry-ext-pymongo/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ package_dir=
packages=find_namespace:
install_requires =
opentelemetry-api == 0.7.dev0
opentelemetry-auto-instrumentation == 0.7.dev0
pymongo ~= 3.1

[options.extras_require]
Expand All @@ -49,3 +50,7 @@ test =

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
pymongo = opentelemetry.instrumentation.pymongo:PymongoInstrumentor
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# limitations under the License.

"""
The integration with MongoDB supports the `pymongo`_ library and is specified
to ``trace_integration`` using ``'pymongo'``.
The integration with MongoDB supports the `pymongo`_ library, it can be
enabled using the ``PymongoInstrumentor``.

.. _pymongo: https://pypi.org/project/pymongo

Expand All @@ -26,11 +26,11 @@
from pymongo import MongoClient
from opentelemetry import trace
from opentelemetry.trace import TracerProvider
from opentelemetry.trace.ext.pymongo import trace_integration
from opentelemetry.trace.ext.pymongo import PymongoInstrumentor

trace.set_tracer_provider(TracerProvider())

trace_integration()
PymongoInstrumentor().instrument()
client = MongoClient()
db = client["MongoDB_Database"]
collection = db["MongoDB_Collection"]
Expand All @@ -42,6 +42,8 @@

from pymongo import monitoring

from opentelemetry import trace
from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.ext.pymongo.version import __version__
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.trace.status import Status, StatusCanonicalCode
Expand All @@ -50,27 +52,51 @@
COMMAND_ATTRIBUTES = ["filter", "sort", "skip", "limit", "pipeline"]


def trace_integration(tracer_provider=None):
"""Integrate with pymongo to trace it using event listener.
https://api.mongodb.com/python/current/api/pymongo/monitoring.html
class PymongoInstrumentor(BaseInstrumentor):
_commandtracer_instance = None
# The instrumentation for PyMongo is based on the event listener interface
# https://api.mongodb.com/python/current/api/pymongo/monitoring.html.
# This interface only allows to register listeners and does not provide
# an unregister API. In order to provide a mechanishm to disable
# instrumentation an enabled flag is implemented in CommandTracer,
# it's checked in the different listeners.

Args:
tracer_provider: The `TracerProvider` to use. If none is passed the
current configured one is used.
"""
def _instrument(self, **kwargs):
"""Integrate with pymongo to trace it using event listener.
https://api.mongodb.com/python/current/api/pymongo/monitoring.html

tracer = get_tracer(__name__, __version__, tracer_provider)
Args:
tracer_provider: The `TracerProvider` to use. If none is passed the
current configured one is used.
"""

monitoring.register(CommandTracer(tracer))
tracer_provider = kwargs.get("tracer_provider")

# Create and register a CommandTracer only the first time
if self._commandtracer_instance is None:
tracer = get_tracer(__name__, __version__, tracer_provider)

self._commandtracer_instance = CommandTracer(tracer)
monitoring.register(self._commandtracer_instance)

# If already created, just enable it
self._commandtracer_instance.enable = True

def _uninstrument(self, **kwargs):
if self._commandtracer_instance is not None:
self._commandtracer_instance.enable = False


class CommandTracer(monitoring.CommandListener):
def __init__(self, tracer):
self._tracer = tracer
self._span_dict = {}
self.enable = True

def started(self, event: monitoring.CommandStartedEvent):
""" Method to handle a pymongo CommandStartedEvent """
if not self.enable:
return
command = event.command.get(event.command_name, "")
name = DATABASE_TYPE + "." + event.command_name
statement = event.command_name
Expand Down Expand Up @@ -103,35 +129,32 @@ def started(self, event: monitoring.CommandStartedEvent):
if span is not None:
span.set_status(Status(StatusCanonicalCode.INTERNAL, str(ex)))
span.end()
self._remove_span(event)
self._pop_span(event)

def succeeded(self, event: monitoring.CommandSucceededEvent):
""" Method to handle a pymongo CommandSucceededEvent """
span = self._get_span(event)
if span is not None:
span.set_attribute(
"db.mongo.duration_micros", event.duration_micros
)
span.set_status(Status(StatusCanonicalCode.OK, event.reply))
span.end()
self._remove_span(event)
if not self.enable:
return
span = self._pop_span(event)
if span is None:
return
span.set_attribute("db.mongo.duration_micros", event.duration_micros)
span.set_status(Status(StatusCanonicalCode.OK, event.reply))
span.end()

def failed(self, event: monitoring.CommandFailedEvent):
""" Method to handle a pymongo CommandFailedEvent """
span = self._get_span(event)
if span is not None:
span.set_attribute(
"db.mongo.duration_micros", event.duration_micros
)
span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure))
span.end()
self._remove_span(event)

def _get_span(self, event):
return self._span_dict.get(_get_span_dict_key(event))

def _remove_span(self, event):
self._span_dict.pop(_get_span_dict_key(event))
if not self.enable:
return
span = self._pop_span(event)
if span is None:
return
span.set_attribute("db.mongo.duration_micros", event.duration_micros)
span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure))
span.end()

def _pop_span(self, event):
return self._span_dict.pop(_get_span_dict_key(event), None)


def _get_span_dict_key(event):
Expand Down
8 changes: 4 additions & 4 deletions ext/opentelemetry-ext-pymongo/tests/test_pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.ext.pymongo import CommandTracer, trace_integration
from opentelemetry.ext.pymongo import CommandTracer, PymongoInstrumentor
from opentelemetry.test.test_base import TestBase


Expand All @@ -24,13 +24,13 @@ def setUp(self):
super().setUp()
self.tracer = self.tracer_provider.get_tracer(__name__)

def test_trace_integration(self):
def test_pymongo_instrumentor(self):
mock_register = mock.Mock()
patch = mock.patch(
"pymongo.monitoring.register", side_effect=mock_register
)
with patch:
trace_integration(self.tracer_provider)
PymongoInstrumentor().instrument()

self.assertTrue(mock_register.called)

Expand All @@ -50,7 +50,7 @@ def test_started(self):
# the memory exporter can't be used here because the span isn't ended
# yet
# pylint: disable=protected-access
span = command_tracer._get_span(mock_event)
span = command_tracer._pop_span(mock_event)
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(span.name, "mongodb.command_name.find")
self.assertEqual(span.attributes["component"], "mongodb")
Expand Down
2 changes: 2 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ commands_pre =

prometheus: pip install {toxinidir}/ext/opentelemetry-ext-prometheus

pymongo: pip install {toxinidir}/opentelemetry-auto-instrumentation
pymongo: pip install {toxinidir}/ext/opentelemetry-ext-pymongo[test]

psycopg2: pip install {toxinidir}/ext/opentelemetry-ext-dbapi
Expand Down Expand Up @@ -276,6 +277,7 @@ changedir =
commands_pre =
pip install -e {toxinidir}/opentelemetry-api \
-e {toxinidir}/opentelemetry-sdk \
-e {toxinidir}/opentelemetry-auto-instrumentation \
-e {toxinidir}/tests/util \
-e {toxinidir}/ext/opentelemetry-ext-dbapi \
-e {toxinidir}/ext/opentelemetry-ext-mysql \
Expand Down