diff --git a/ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py b/ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py index 57649be7b4f..a5fda62dea0 100644 --- a/ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py +++ b/ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py @@ -174,7 +174,7 @@ def instrument_connection( connection_attributes=connection_attributes, ) db_integration.get_connection_attributes(connection) - return TracedConnectionProxy(connection, db_integration) + return get_traced_connection_proxy(connection, db_integration) def uninstrument_connection(connection): @@ -227,7 +227,7 @@ def wrapped_connection( """ connection = connect_method(*args, **kwargs) self.get_connection_attributes(connection) - return TracedConnectionProxy(connection, self) + return get_traced_connection_proxy(connection, self) def get_connection_attributes(self, connection): # Populate span fields using connection @@ -260,23 +260,21 @@ def get_connection_attributes(self, connection): self.span_attributes["net.peer.port"] = port -# pylint: disable=abstract-method -class TracedConnectionProxy(wrapt.ObjectProxy): - # pylint: disable=unused-argument - def __init__( - self, - connection, - db_api_integration: DatabaseApiIntegration, - *args, - **kwargs - ): - wrapt.ObjectProxy.__init__(self, connection) - self._db_api_integration = db_api_integration +def get_traced_connection_proxy( + connection, db_api_integration, *args, **kwargs +): + # pylint: disable=abstract-method + class TracedConnectionProxy(wrapt.ObjectProxy): + # pylint: disable=unused-argument + def __init__(self, connection, *args, **kwargs): + wrapt.ObjectProxy.__init__(self, connection) + + def cursor(self, *args, **kwargs): + return get_traced_cursor_proxy( + self.__wrapped__.cursor(*args, **kwargs), db_api_integration + ) - def cursor(self, *args, **kwargs): - return TracedCursorProxy( - self.__wrapped__.cursor(*args, **kwargs), self._db_api_integration - ) + return TracedConnectionProxy(connection, *args, **kwargs) class TracedCursor: @@ -323,31 +321,28 @@ def traced_execution( raise ex -# pylint: disable=abstract-method -class TracedCursorProxy(wrapt.ObjectProxy): +def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs): + _traced_cursor = TracedCursor(db_api_integration) + # pylint: disable=abstract-method + class TracedCursorProxy(wrapt.ObjectProxy): - # pylint: disable=unused-argument - def __init__( - self, - cursor, - db_api_integration: DatabaseApiIntegration, - *args, - **kwargs - ): - wrapt.ObjectProxy.__init__(self, cursor) - self._traced_cursor = TracedCursor(db_api_integration) + # pylint: disable=unused-argument + def __init__(self, cursor, *args, **kwargs): + wrapt.ObjectProxy.__init__(self, cursor) - def execute(self, *args, **kwargs): - return self._traced_cursor.traced_execution( - self.__wrapped__.execute, *args, **kwargs - ) + def execute(self, *args, **kwargs): + return _traced_cursor.traced_execution( + self.__wrapped__.execute, *args, **kwargs + ) - def executemany(self, *args, **kwargs): - return self._traced_cursor.traced_execution( - self.__wrapped__.executemany, *args, **kwargs - ) + def executemany(self, *args, **kwargs): + return _traced_cursor.traced_execution( + self.__wrapped__.executemany, *args, **kwargs + ) - def callproc(self, *args, **kwargs): - return self._traced_cursor.traced_execution( - self.__wrapped__.callproc, *args, **kwargs - ) + def callproc(self, *args, **kwargs): + return _traced_cursor.traced_execution( + self.__wrapped__.callproc, *args, **kwargs + ) + + return TracedCursorProxy(cursor, *args, **kwargs) diff --git a/ext/opentelemetry-ext-dbapi/tests/test_dbapi_integration.py b/ext/opentelemetry-ext-dbapi/tests/test_dbapi_integration.py index 88c243c5bfc..a2ba9f7d89d 100644 --- a/ext/opentelemetry-ext-dbapi/tests/test_dbapi_integration.py +++ b/ext/opentelemetry-ext-dbapi/tests/test_dbapi_integration.py @@ -125,7 +125,6 @@ def test_wrap_connect(self, mock_dbapi): dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-") connection = mock_dbapi.connect() self.assertEqual(mock_dbapi.connect.call_count, 1) - self.assertIsInstance(connection, dbapi.TracedConnectionProxy) self.assertIsInstance(connection.__wrapped__, mock.Mock) @mock.patch("opentelemetry.ext.dbapi") @@ -133,7 +132,6 @@ def test_unwrap_connect(self, mock_dbapi): dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-") connection = mock_dbapi.connect() self.assertEqual(mock_dbapi.connect.call_count, 1) - self.assertIsInstance(connection, dbapi.TracedConnectionProxy) dbapi.unwrap_connect(mock_dbapi, "connect") connection = mock_dbapi.connect() @@ -145,7 +143,6 @@ def test_instrument_connection(self): # Avoid get_attributes failing because can't concatenate mock connection.database = "-" connection2 = dbapi.instrument_connection(self.tracer, connection, "-") - self.assertIsInstance(connection2, dbapi.TracedConnectionProxy) self.assertIs(connection2.__wrapped__, connection) def test_uninstrument_connection(self): @@ -154,7 +151,6 @@ def test_uninstrument_connection(self): # be concatenated connection.database = "-" connection2 = dbapi.instrument_connection(self.tracer, connection, "-") - self.assertIsInstance(connection2, dbapi.TracedConnectionProxy) self.assertIs(connection2.__wrapped__, connection) connection3 = dbapi.uninstrument_connection(connection2) diff --git a/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py b/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py index e0537ad2935..d0bdd685f4a 100644 --- a/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py +++ b/ext/opentelemetry-ext-docker-tests/tests/postgres/test_psycopg_functional.py @@ -18,7 +18,7 @@ import psycopg2 from opentelemetry import trace as trace_api -from opentelemetry.ext.psycopg2 import trace_integration +from opentelemetry.ext.psycopg2 import Psycopg2Instrumentor from opentelemetry.test.test_base import TestBase POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost") @@ -35,7 +35,7 @@ def setUpClass(cls): cls._connection = None cls._cursor = None cls._tracer = cls.tracer_provider.get_tracer(__name__) - trace_integration(cls.tracer_provider) + Psycopg2Instrumentor().instrument(tracer_provider=cls.tracer_provider) cls._connection = psycopg2.connect( dbname=POSTGRES_DB_NAME, user=POSTGRES_USER, @@ -52,6 +52,7 @@ def tearDownClass(cls): cls._cursor.close() if cls._connection: cls._connection.close() + Psycopg2Instrumentor().uninstrument() def validate_spans(self): spans = self.memory_exporter.get_finished_spans() diff --git a/ext/opentelemetry-ext-psycopg2/CHANGELOG.md b/ext/opentelemetry-ext-psycopg2/CHANGELOG.md index f32ad5bd4c2..1a6748f7ba4 100644 --- a/ext/opentelemetry-ext-psycopg2/CHANGELOG.md +++ b/ext/opentelemetry-ext-psycopg2/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Implement instrumentor interface, enabling auto-instrumentation ([#694]https://github.com/open-telemetry/opentelemetry-python/pull/694) + ## 0.4a0 Released 2020-02-21 diff --git a/ext/opentelemetry-ext-psycopg2/setup.cfg b/ext/opentelemetry-ext-psycopg2/setup.cfg index 4b4627fa833..2a6e69605da 100644 --- a/ext/opentelemetry-ext-psycopg2/setup.cfg +++ b/ext/opentelemetry-ext-psycopg2/setup.cfg @@ -41,8 +41,18 @@ package_dir= packages=find_namespace: install_requires = opentelemetry-api == 0.8.dev0 + opentelemetry-ext-dbapi == 0.8.dev0 + opentelemetry-auto-instrumentation == 0.8.dev0 psycopg2-binary >= 2.7.3.1 wrapt >= 1.0.0, < 2.0.0 +[options.extras_require] +test = + opentelemetry-test == 0.8.dev0 + [options.packages.find] where = src + +[options.entry_points] +opentelemetry_instrumentor = + psycopg2 = opentelemetry.ext.psycopg2:Psycopg2Instrumentor diff --git a/ext/opentelemetry-ext-psycopg2/src/opentelemetry/ext/psycopg2/__init__.py b/ext/opentelemetry-ext-psycopg2/src/opentelemetry/ext/psycopg2/__init__.py index a0db2993ba7..582d147320c 100644 --- a/ext/opentelemetry-ext-psycopg2/src/opentelemetry/ext/psycopg2/__init__.py +++ b/ext/opentelemetry-ext-psycopg2/src/opentelemetry/ext/psycopg2/__init__.py @@ -13,8 +13,8 @@ # limitations under the License. """ -The integration with PostgreSQL supports the `Psycopg`_ library and is specified -to ``trace_integration`` using ``'PostgreSQL'``. +The integration with PostgreSQL supports the `Psycopg`_ library, it can be enabled by +using ``Psycopg2Instrumentor``. .. _Psycopg: http://initd.org/psycopg/ @@ -26,11 +26,12 @@ import psycopg2 from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.trace.ext.psycopg2 import trace_integration + from opentelemetry.trace.ext.psycopg2 import Psycopg2Instrumentor trace.set_tracer_provider(TracerProvider()) - trace_integration() + Psycopg2Instrumentor().instrument() + cnx = psycopg2.connect(database='Database') cursor = cnx.cursor() cursor.execute("INSERT INTO test (testField) VALUES (123)") @@ -41,83 +42,77 @@ --- """ -import logging import typing import psycopg2 import wrapt -from psycopg2.sql import Composable -from opentelemetry.ext.dbapi import DatabaseApiIntegration, TracedCursor +from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.ext import dbapi from opentelemetry.ext.psycopg2.version import __version__ -from opentelemetry.trace import Tracer, get_tracer - -logger = logging.getLogger(__name__) - -DATABASE_COMPONENT = "postgresql" -DATABASE_TYPE = "sql" +from opentelemetry.trace import TracerProvider, get_tracer -def trace_integration(tracer_provider=None): - """Integrate with PostgreSQL Psycopg library. - Psycopg: http://initd.org/psycopg/ - """ - - tracer = get_tracer(__name__, __version__, tracer_provider) - - connection_attributes = { +class Psycopg2Instrumentor(BaseInstrumentor): + _CONNECTION_ATTRIBUTES = { "database": "info.dbname", "port": "info.port", "host": "info.host", "user": "info.user", } - db_integration = DatabaseApiIntegration( - tracer, - DATABASE_COMPONENT, - database_type=DATABASE_TYPE, - connection_attributes=connection_attributes, - ) - - # pylint: disable=unused-argument - def wrap_connect( - connect_func: typing.Callable[..., any], - instance: typing.Any, - args: typing.Tuple[any, any], - kwargs: typing.Dict[any, any], - ): - connection = connect_func(*args, **kwargs) - db_integration.get_connection_attributes(connection) - connection.cursor_factory = PsycopgTraceCursor - return connection - - try: - wrapt.wrap_function_wrapper(psycopg2, "connect", wrap_connect) - except Exception as ex: # pylint: disable=broad-except - logger.warning("Failed to integrate with pyscopg2. %s", str(ex)) - - class PsycopgTraceCursor(psycopg2.extensions.cursor): - def __init__(self, *args, **kwargs): - self._traced_cursor = TracedCursor(db_integration) - super(PsycopgTraceCursor, self).__init__(*args, **kwargs) - - # pylint: disable=redefined-builtin - def execute(self, query, vars=None): - if isinstance(query, Composable): - query = query.as_string(self) - return self._traced_cursor.traced_execution( - super(PsycopgTraceCursor, self).execute, query, vars - ) - - # pylint: disable=redefined-builtin - def executemany(self, query, vars): - if isinstance(query, Composable): - query = query.as_string(self) - return self._traced_cursor.traced_execution( - super(PsycopgTraceCursor, self).executemany, query, vars - ) - - # pylint: disable=redefined-builtin - def callproc(self, procname, vars=None): - return self._traced_cursor.traced_execution( - super(PsycopgTraceCursor, self).callproc, procname, vars - ) + + _DATABASE_COMPONENT = "postgresql" + _DATABASE_TYPE = "sql" + + def _instrument(self, **kwargs): + """Integrate with PostgreSQL Psycopg library. + Psycopg: http://initd.org/psycopg/ + """ + + tracer_provider = kwargs.get("tracer_provider") + + tracer = get_tracer(__name__, __version__, tracer_provider) + + dbapi.wrap_connect( + tracer, + psycopg2, + "connect", + self._DATABASE_COMPONENT, + self._DATABASE_TYPE, + self._CONNECTION_ATTRIBUTES, + ) + + def _uninstrument(self, **kwargs): + """"Disable Psycopg2 instrumentation""" + dbapi.unwrap_connect(psycopg2, "connect") + + # pylint:disable=no-self-use + def instrument_connection(self, connection): + """Enable instrumentation in a Psycopg2 connection. + + Args: + connection: The connection to instrument. + + Returns: + An instrumented connection. + """ + tracer = get_tracer(__name__, __version__) + + return dbapi.instrument_connection( + tracer, + connection, + self._DATABASE_COMPONENT, + self._DATABASE_TYPE, + self._CONNECTION_ATTRIBUTES, + ) + + def uninstrument_connection(self, connection): + """Disable instrumentation in a Psycopg2 connection. + + Args: + connection: The connection to uninstrument. + + Returns: + An uninstrumented connection. + """ + return dbapi.uninstrument_connection(connection) diff --git a/ext/opentelemetry-ext-psycopg2/tests/test_psycopg2_integration.py b/ext/opentelemetry-ext-psycopg2/tests/test_psycopg2_integration.py index b4724e308b0..f854787bd9e 100644 --- a/ext/opentelemetry-ext-psycopg2/tests/test_psycopg2_integration.py +++ b/ext/opentelemetry-ext-psycopg2/tests/test_psycopg2_integration.py @@ -12,17 +12,105 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest from unittest import mock import psycopg2 -from opentelemetry.ext.psycopg2 import trace_integration +import opentelemetry.ext.psycopg2 +from opentelemetry.ext.psycopg2 import Psycopg2Instrumentor +from opentelemetry.sdk import resources +from opentelemetry.test.test_base import TestBase -class TestPostgresqlIntegration(unittest.TestCase): - def test_trace_integration(self): - with mock.patch("psycopg2.connect"): - trace_integration() - cnx = psycopg2.connect(database="test") - self.assertIsNotNone(cnx.cursor_factory) +class TestPostgresqlIntegration(TestBase): + def tearDown(self): + super().tearDown() + with self.disable_logging(): + Psycopg2Instrumentor().uninstrument() + + @mock.patch("psycopg2.connect") + # pylint: disable=unused-argument + def test_instrumentor(self, mock_connect): + Psycopg2Instrumentor().instrument() + + cnx = psycopg2.connect(database="test") + + cursor = cnx.cursor() + + query = "SELECT * FROM test" + cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info(span, opentelemetry.ext.psycopg2) + + # check that no spans are generated after uninstrument + Psycopg2Instrumentor().uninstrument() + + cnx = psycopg2.connect(database="test") + cursor = cnx.cursor() + query = "SELECT * FROM test" + cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + + @mock.patch("psycopg2.connect") + # pylint: disable=unused-argument + def test_custom_tracer_provider(self, mock_connect): + resource = resources.Resource.create({}) + result = self.create_tracer_provider(resource=resource) + tracer_provider, exporter = result + + Psycopg2Instrumentor().instrument(tracer_provider=tracer_provider) + + cnx = psycopg2.connect(database="test") + cursor = cnx.cursor() + query = "SELECT * FROM test" + cursor.execute(query) + + spans_list = exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertIs(span.resource, resource) + + @mock.patch("psycopg2.connect") + # pylint: disable=unused-argument + def test_instrument_connection(self, mock_connect): + cnx = psycopg2.connect(database="test") + query = "SELECT * FROM test" + cursor = cnx.cursor() + cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 0) + + cnx = Psycopg2Instrumentor().instrument_connection(cnx) + cursor = cnx.cursor() + cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + + @mock.patch("psycopg2.connect") + # pylint: disable=unused-argument + def test_uninstrument_connection(self, mock_connect): + Psycopg2Instrumentor().instrument() + cnx = psycopg2.connect(database="test") + query = "SELECT * FROM test" + cursor = cnx.cursor() + cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + + cnx = Psycopg2Instrumentor().uninstrument_connection(cnx) + cursor = cnx.cursor() + cursor.execute(query) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) diff --git a/tox.ini b/tox.ini index 04ea2b1650b..44944874078 100644 --- a/tox.ini +++ b/tox.ini @@ -207,8 +207,9 @@ commands_pre = pymongo: pip install {toxinidir}/opentelemetry-auto-instrumentation pymongo: pip install {toxinidir}/ext/opentelemetry-ext-pymongo[test] + psycopg2: pip install {toxinidir}/opentelemetry-auto-instrumentation psycopg2: pip install {toxinidir}/ext/opentelemetry-ext-dbapi - psycopg2: pip install {toxinidir}/ext/opentelemetry-ext-psycopg2 + psycopg2: pip install {toxinidir}/ext/opentelemetry-ext-psycopg2[test] pymysql: pip install {toxinidir}/opentelemetry-auto-instrumentation pymysql: pip install {toxinidir}/ext/opentelemetry-ext-dbapi