From fc49fbff3f6b968d8cc5aabd1e229b7293effe91 Mon Sep 17 00:00:00 2001 From: Michael Lavers Date: Fri, 16 Aug 2019 13:26:18 -0700 Subject: [PATCH 1/4] Add mysqldb/pymysql wrappers Closes #345 --- iopipe/contrib/trace/auto_db.py | 193 ++++++++++++++++++++++---------- iopipe/contrib/trace/dbapi.py | 8 +- setup.py | 2 + 3 files changed, 142 insertions(+), 61 deletions(-) diff --git a/iopipe/contrib/trace/auto_db.py b/iopipe/contrib/trace/auto_db.py index 16a2e02d..bcce1043 100644 --- a/iopipe/contrib/trace/auto_db.py +++ b/iopipe/contrib/trace/auto_db.py @@ -10,6 +10,10 @@ ) +def collect_mysql_metrics(context, trace, instance): + pass + + def collect_psycopg2_metrics(context, trace, instance): from psycopg2.extensions import parse_dsn @@ -114,44 +118,45 @@ def collect_redis_metrics(context, trace, args, connection): context.iopipe.mark.db_trace(trace, "redis", request) -def patch_pymongo(context): +def patch_mysqldb(context): """ - Monkey patches pymongo client, if available. Overloads the - query methods to add tracing and metrics collection. + Monkey patches mysqldb client, if available. Overloads the + execute method to add tracing and metrics collection. """ - def wrapper(wrapped, instance, args, kwargs): - if not hasattr(context, "iopipe") or not hasattr( - context.iopipe, "mark" - ): # pragma: no cover - return wrapped(*args, **kwargs) + class _CursorProxy(CursorProxy): + def execute(self, *args, **kwargs): + if not hasattr(context, "iopipe") or not hasattr( + context.iopipe, "mark" + ): # pragma: no cover + self.__wrapped__.execute(*args, **kwargs) + return - id = ensure_utf8(str(uuid.uuid4())) - with context.iopipe.mark(id): - response = wrapped(*args, **kwargs) - trace = context.iopipe.mark.measure(id) - context.iopipe.mark.delete(id) - collect_pymongo_metrics(context, trace, instance, response) - return response + id = ensure_utf8(str(uuid.uuid4())) + with context.iopipe.mark(id): + self.__wrapped__.execute(*args, **kwargs) + trace = context.iopipe.mark.measure(id) + context.iopipe.mark.delete(id) + collect_mysql_metrics(context, trace, self) - try: - wrapt.wrap_function_wrapper("pymongo.collection", "Collection.find", wrapper) - except Exception: # pragma: no cover - pass - else: - for class_method in ( - "bulk_write", - "delete_many", - "delete_one", - "insert_many", - "insert_one", - "replace_one", - "update_many", - "update_one", - ): - wrapt.wrap_function_wrapper( - "pymongo.collection", "Collection.%s" % class_method, wrapper - ) + class _ConnectionProxy(ConnectionProxy): + def cursor(self, *args, **kwargs): + cursor = self.__wrapped__.cursor(*args, **kwargs) + return _CursorProxy(cursor, self) + + def connect_wrapper(wrapped, instance, args, kwargs): + connection = wrapped(*args, **kwargs) + return _ConnectionProxy(connection, args, kwargs) + + for module, attr, wrapper in [ + ("MySQLdb", "connect", connect_wrapper), + ("MySQLdb", "Connection", connect_wrapper), + ("MySQLdb", "Connect", connect_wrapper), + ]: + try: + wrapt.wrap_function_wrapper(module, attr, wrapper) + except Exception: # pragma: no cover + pass def patch_psycopg2(context): @@ -160,7 +165,7 @@ def patch_psycopg2(context): execute method to add tracing and metrics collection. """ - class PGCursorProxy(CursorProxy): + class _CursorProxy(CursorProxy): def execute(self, *args, **kwargs): if not hasattr(context, "iopipe") or not hasattr( context.iopipe, "mark" @@ -175,10 +180,10 @@ def execute(self, *args, **kwargs): context.iopipe.mark.delete(id) collect_psycopg2_metrics(context, trace, self) - class PGConnectionProxy(ConnectionProxy): + class _ConnectionProxy(ConnectionProxy): def cursor(self, *args, **kwargs): cursor = self.__wrapped__.cursor(*args, **kwargs) - return PGCursorProxy(cursor, self) + return _CursorProxy(cursor, self) def adapt_wrapper(wrapped, instance, args, kwargs): adapter = wrapped(*args, **kwargs) @@ -186,7 +191,7 @@ def adapt_wrapper(wrapped, instance, args, kwargs): def connect_wrapper(wrapped, instance, args, kwargs): connection = wrapped(*args, **kwargs) - return PGConnectionProxy(connection, args, kwargs) + return _ConnectionProxy(connection, args, kwargs) def register_type_wrapper(wrapped, instance, args, kwargs): def _extract_arguments(obj, scope=None): @@ -201,21 +206,90 @@ def _extract_arguments(obj, scope=None): return wrapped(obj) + for module, attr, wrapper in [ + ("psycopg2", "connect", connect_wrapper), + ("psycopg2.extensions", "adapt", adapt_wrapper), + ("psycopg2.extensions", "register_type", register_type_wrapper), + ("psycopg2._psycopg", "register_type", register_type_wrapper), + ("psycopg2._json", "register_type", register_type_wrapper), + ]: + try: + wrapt.wrap_function_wrapper(module, attr, wrapper) + except Exception: # pragma: no cover + pass + + +def patch_pymongo(context): + """ + Monkey patches pymongo client, if available. Overloads the + query methods to add tracing and metrics collection. + """ + + def wrapper(wrapped, instance, args, kwargs): + if not hasattr(context, "iopipe") or not hasattr( + context.iopipe, "mark" + ): # pragma: no cover + return wrapped(*args, **kwargs) + + id = ensure_utf8(str(uuid.uuid4())) + with context.iopipe.mark(id): + response = wrapped(*args, **kwargs) + trace = context.iopipe.mark.measure(id) + context.iopipe.mark.delete(id) + collect_pymongo_metrics(context, trace, instance, response) + return response + + for module, attr, _wrapper in [ + ("pymongo.collection", "Collection.find", wrapper), + ("pymongo.collection", "Collection.bulk_write", wrapper), + ("pymongo.collection", "Collection.delete_many", wrapper), + ("pymongo.collection", "Collection.delete_one", wrapper), + ("pymongo.collection", "Collection.insert_many", wrapper), + ("pymongo.collection", "Collection.insert_one", wrapper), + ("pymongo.collection", "Collection.replace_one", wrapper), + ("pymongo.collection", "Collection.update_many", wrapper), + ("pymongo.collection", "Collection.update_one", wrapper), + ]: + try: + wrapt.wrap_function_wrapper(module, attr, _wrapper) + except Exception: # pragma: no cover + pass + + +def patch_pymysql(context): + """ + Monkey patches pymysql client, if available. Overloads the + execute method to add tracing and metrics collection. + """ + + class _CursorProxy(CursorProxy): + def execute(self, *args, **kwargs): + if not hasattr(context, "iopipe") or not hasattr( + context.iopipe, "mark" + ): # pragma: no cover + self.__wrapped__.execute(*args, **kwargs) + return + + id = ensure_utf8(str(uuid.uuid4())) + with context.iopipe.mark(id): + self.__wrapped__.execute(*args, **kwargs) + trace = context.iopipe.mark.measure(id) + context.iopipe.mark.delete(id) + collect_mysql_metrics(context, trace, self) + + class _ConnectionProxy(ConnectionProxy): + def cursor(self, *args, **kwargs): + cursor = self.__wrapped__.cursor(*args, **kwargs) + return _CursorProxy(cursor, self) + + def connect_wrapper(wrapped, instance, args, kwargs): + connection = wrapped(*args, **kwargs) + return _ConnectionProxy(connection, args, kwargs) + try: - wrapt.wrap_function_wrapper("psycopg2", "connect", connect_wrapper) + wrapt.wrap_function_wrapper("pymysql", "connect", connect_wrapper) except Exception: # pragma: no cover pass - else: - wrapt.wrap_function_wrapper("psycopg2.extensions", "adapt", adapt_wrapper) - wrapt.wrap_function_wrapper( - "psycopg2.extensions", "register_type", register_type_wrapper - ) - wrapt.wrap_function_wrapper( - "psycopg2._psycopg", "register_type", register_type_wrapper - ) - wrapt.wrap_function_wrapper( - "psycopg2._json", "register_type", register_type_wrapper - ) def patch_redis(context): @@ -259,16 +333,15 @@ def pipeline_wrapper(wrapped, instance, args, kwargs): # pragma: no cover ) return response - try: - wrapt.wrap_function_wrapper("redis.client", "Redis.execute_command", wrapper) - except Exception: # pragma: no cover - pass - else: - for module_name, class_method in [ - ("redis.client", "Pipeline.execute"), - ("redis.client", "Pipeline.immediate_execute_command"), - ]: - wrapt.wrap_function_wrapper(module_name, class_method, pipeline_wrapper) + for module, attr, _wrapper in [ + ("redis.client", "Redis.execute_command", wrapper), + ("redis.client", "Pipeline.execute", wrapper), + ("redis.client", "Pipeline.immediate_execute_command", wrapper), + ]: + try: + wrapt.wrap_function_wrapper(module, attr, _wrapper) + except Exception: # pragma: no cover + pass def restore_psycopg2(): @@ -357,8 +430,10 @@ def patch_db_requests(context): if not hasattr(context, "iopipe"): return + patch_mysqldb(context) patch_psycopg2(context) patch_pymongo(context) + patch_pymysql(context) patch_redis(context) diff --git a/iopipe/contrib/trace/dbapi.py b/iopipe/contrib/trace/dbapi.py index 29023cba..e42f9236 100644 --- a/iopipe/contrib/trace/dbapi.py +++ b/iopipe/contrib/trace/dbapi.py @@ -43,13 +43,17 @@ def cursor(self, *args, **kwargs): # pragma: no cover cursor = self.__wrapped__.cursor(*args, **kwargs) return CursorProxy(cursor, self) + @property + def extract_dbname(self): # pragma: no cover + return self._self_kwargs.get("db", self._self_kwargs.get("database", "")) + @property def extract_hostname(self): # pragma: no cover return self._self_kwargs.get("host", "localhost") @property - def extract_dbname(self): # pragma: no cover - return self._self_kwargs.get("db", self._self_kwargs.get("database", "")) + def extract_port(self): # pragma: no cover + return self._self_kwargs.get("port") class AdapterProxy(wrapt.ObjectProxy): diff --git a/setup.py b/setup.py index fdd9b2c4..18e727e8 100644 --- a/setup.py +++ b/setup.py @@ -21,8 +21,10 @@ "mock", "mongomock==3.17.0", "more-itertools<6.0.0", + "mysqlclient==1.4.4", "psycopg2-binary==2.8.3", "pymongo==3.8.0", + "PyMySQL==0.9.3", "pytest==4.1.0", "pytest-benchmark==3.2.0", "redis==3.3.4", From 50e6528626695edbd43c9df9af6fbf3c4bbca9ac Mon Sep 17 00:00:00 2001 From: Michael Lavers Date: Mon, 19 Aug 2019 16:38:01 -0700 Subject: [PATCH 2/4] Add tests for mysql --- iopipe/contrib/trace/auto_db.py | 59 ++++++++++++++++++++++++++--- iopipe/contrib/trace/dbapi.py | 10 +++-- tests/contrib/trace/conftest.py | 42 +++++++++++++++++++- tests/contrib/trace/test_auto_db.py | 17 +++++++++ tests/contrib/trace/test_plugin.py | 56 +++++++++++++++++++++++++++ 5 files changed, 175 insertions(+), 9 deletions(-) diff --git a/iopipe/contrib/trace/auto_db.py b/iopipe/contrib/trace/auto_db.py index bcce1043..fdb8f972 100644 --- a/iopipe/contrib/trace/auto_db.py +++ b/iopipe/contrib/trace/auto_db.py @@ -10,12 +10,35 @@ ) -def collect_mysql_metrics(context, trace, instance): - pass +def collect_mysql_metrics(context, trace, instance, args): + connection = instance.connection_proxy + + db = connection.extract_db + hostname = connection.extract_hostname + port = connection.extract_port + + query = args[0] + command = query.split()[0].lower() + table = table_name(query, command) + + request = Request( + command=ensure_utf8(command), + key=None, + hostname=ensure_utf8(hostname), + port=ensure_utf8(port), + connectionName=None, + db=ensure_utf8(db), + table=ensure_utf8(table), + ) + request = request._asdict() + context.iopipe.mark.db_trace(trace, "mysql", request) def collect_psycopg2_metrics(context, trace, instance): - from psycopg2.extensions import parse_dsn + try: + from psycopg2.extensions import parse_dsn + except ImportError: # pragma: no cover + from .dbapi import parse_dsn connection = instance.connection_proxy dsn = parse_dsn(connection.dsn) @@ -137,7 +160,7 @@ def execute(self, *args, **kwargs): self.__wrapped__.execute(*args, **kwargs) trace = context.iopipe.mark.measure(id) context.iopipe.mark.delete(id) - collect_mysql_metrics(context, trace, self) + collect_mysql_metrics(context, trace, self, args) class _ConnectionProxy(ConnectionProxy): def cursor(self, *args, **kwargs): @@ -275,7 +298,7 @@ def execute(self, *args, **kwargs): self.__wrapped__.execute(*args, **kwargs) trace = context.iopipe.mark.measure(id) context.iopipe.mark.delete(id) - collect_mysql_metrics(context, trace, self) + collect_mysql_metrics(context, trace, self, args) class _ConnectionProxy(ConnectionProxy): def cursor(self, *args, **kwargs): @@ -344,6 +367,18 @@ def pipeline_wrapper(wrapped, instance, args, kwargs): # pragma: no cover pass +def restore_mysqldb(): + """Restores mysqldb""" + try: + import MySQLdb + except ImportError: # pragma: no cover + pass + else: + setattr( + MySQLdb, "connect", getattr(MySQLdb.connect, "__wrapped__", MySQLdb.connect) + ) + + def restore_psycopg2(): """Restores psycopg2""" try: @@ -410,6 +445,18 @@ def restore_pymongo(): ) +def restore_pymysql(): + """Restores pymysql""" + try: + import pymysql + except ImportError: # pragma: no cover + pass + else: + setattr( + pymysql, "connect", getattr(pymysql.connect, "__wrapped__", pymysql.connect) + ) + + def restore_redis(): """Restores the redis client""" try: @@ -438,6 +485,8 @@ def patch_db_requests(context): def restore_db_requests(): + restore_mysqldb() restore_psycopg2() restore_pymongo() + restore_pymysql() restore_redis() diff --git a/iopipe/contrib/trace/dbapi.py b/iopipe/contrib/trace/dbapi.py index e42f9236..e0de6249 100644 --- a/iopipe/contrib/trace/dbapi.py +++ b/iopipe/contrib/trace/dbapi.py @@ -9,6 +9,10 @@ } +def parse_dsn(dsn): + return dict(attr.split("=") for attr in dsn.split() if "=" in attr) + + def table_name(query, command): if command in COMMAND_KEYWORDS: keyword = COMMAND_KEYWORDS[command] @@ -44,15 +48,15 @@ def cursor(self, *args, **kwargs): # pragma: no cover return CursorProxy(cursor, self) @property - def extract_dbname(self): # pragma: no cover + def extract_db(self): return self._self_kwargs.get("db", self._self_kwargs.get("database", "")) @property - def extract_hostname(self): # pragma: no cover + def extract_hostname(self): return self._self_kwargs.get("host", "localhost") @property - def extract_port(self): # pragma: no cover + def extract_port(self): return self._self_kwargs.get("port") diff --git a/tests/contrib/trace/conftest.py b/tests/contrib/trace/conftest.py index 09b49ba7..b4b8572c 100644 --- a/tests/contrib/trace/conftest.py +++ b/tests/contrib/trace/conftest.py @@ -1,5 +1,7 @@ +import MySQLdb import psycopg2 import pymongo +import pymysql import pytest import redis import requests @@ -200,7 +202,45 @@ def _handler(event, context): conn = psycopg2.connect("postgres://username:password@localhost:5432/foobar") cur = conn.cursor() cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) - cur.execute("SELECT * FROM test;") + cur.execute("SELECT * FROM test") + cur.fetchone() + + return iopipe_with_trace_auto_db, _handler + + +@pytest.fixture +def handler_with_trace_auto_db_mysqldb(iopipe_with_trace_auto_db): + @iopipe_with_trace_auto_db + def _handler(event, context): + conn = MySQLdb.connect( + db="foobar", + host="localhost", + port="3306", + user="username", + passwd="password", + ) + cur = conn.cursor() + cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) + cur.execute("SELECT * FROM test") + cur.fetchone() + + return iopipe_with_trace_auto_db, _handler + + +@pytest.fixture +def handler_with_trace_auto_db_pymysql(iopipe_with_trace_auto_db): + @iopipe_with_trace_auto_db + def _handler(event, context): + conn = pymysql.connect( + db="foobar", + host="localhost", + port="3306", + user="username", + passwd="password", + ) + cur = conn.cursor() + cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (100, "abc'def")) + cur.execute("SELECT * FROM test") cur.fetchone() return iopipe_with_trace_auto_db, _handler diff --git a/tests/contrib/trace/test_auto_db.py b/tests/contrib/trace/test_auto_db.py index 6933aca8..9ad2ad0e 100644 --- a/tests/contrib/trace/test_auto_db.py +++ b/tests/contrib/trace/test_auto_db.py @@ -1,5 +1,7 @@ +import MySQLdb import psycopg2 from pymongo.collection import Collection +import pymysql from redis.client import Pipeline, Redis import wrapt @@ -19,6 +21,9 @@ def test_patch_db_requests(mock_context_wrapper,): """Asserts that monkey patching occurs if iopipe present""" + assert not hasattr(MySQLdb.connect, "__wrapped__") + assert not hasattr(pymysql.connect, "__wrapped__") + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) assert not hasattr(Redis.execute_command, "__wrapped__") @@ -30,6 +35,9 @@ def test_patch_db_requests(mock_context_wrapper,): patch_db_requests(mock_context_wrapper) + assert hasattr(MySQLdb.connect, "__wrapped__") + assert hasattr(pymysql.connect, "__wrapped__") + assert isinstance(psycopg2.connect, wrapt.ObjectProxy) assert hasattr(Redis.execute_command, "__wrapped__") @@ -41,6 +49,9 @@ def test_patch_db_requests(mock_context_wrapper,): restore_db_requests() + assert not hasattr(MySQLdb.connect, "__wrapped__") + assert not hasattr(pymysql.connect, "__wrapped__") + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) assert not hasattr(Redis.execute_command, "__wrapped__") @@ -53,6 +64,9 @@ def test_patch_db_requests(mock_context_wrapper,): def test_patch_db_requests_no_iopipe(mock_context,): """Asserts that monkey patching does not occur if IOpipe not present""" + assert not hasattr(MySQLdb.connect, "__wrapped__") + assert not hasattr(pymysql.connect, "__wrapped__") + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) assert not hasattr(Redis.execute_command, "__wrapped__") @@ -66,6 +80,9 @@ def test_patch_db_requests_no_iopipe(mock_context,): patch_db_requests(mock_context) + assert not hasattr(MySQLdb.connect, "__wrapped__") + assert not hasattr(pymysql.connect, "__wrapped__") + assert not isinstance(psycopg2.connect, wrapt.ObjectProxy) assert not hasattr(Redis.execute_command, "__wrapped__") diff --git a/tests/contrib/trace/test_plugin.py b/tests/contrib/trace/test_plugin.py index 581cc9d4..179a4744 100644 --- a/tests/contrib/trace/test_plugin.py +++ b/tests/contrib/trace/test_plugin.py @@ -311,3 +311,59 @@ def test_trace_plugin__auto_db__psycopg2( assert db_traces[0]["request"]["command"] == "insert" assert db_traces[1]["request"]["command"] == "select" + + +@mock.patch("MySQLdb.connect") +@mock.patch("iopipe.report.send_report", autospec=True) +def test_trace_plugin__auto_db__mysqldb( + mock_send_report, mock_connect, handler_with_trace_auto_db_mysqldb, mock_context +): + iopipe, handler = handler_with_trace_auto_db_mysqldb + + assert len(iopipe.config["plugins"]) == 1 + + handler({}, mock_context) + + assert len(iopipe.report.performance_entries) == 0 + + db_traces = iopipe.report.db_trace_entries + + assert len(db_traces) == 2 + + for db_trace in db_traces: + assert db_trace["dbType"] == "mysql" + assert db_trace["request"]["hostname"] == "localhost" + assert db_trace["request"]["port"] == "3306" + assert db_trace["request"]["db"] == "foobar" + assert db_trace["request"]["table"] == "test" + + assert db_traces[0]["request"]["command"] == "insert" + assert db_traces[1]["request"]["command"] == "select" + + +@mock.patch("pymysql.connect") +@mock.patch("iopipe.report.send_report", autospec=True) +def test_trace_plugin__auto_db__pymysql( + mock_send_report, mock_connect, handler_with_trace_auto_db_pymysql, mock_context +): + iopipe, handler = handler_with_trace_auto_db_pymysql + + assert len(iopipe.config["plugins"]) == 1 + + handler({}, mock_context) + + assert len(iopipe.report.performance_entries) == 0 + + db_traces = iopipe.report.db_trace_entries + + assert len(db_traces) == 2 + + for db_trace in db_traces: + assert db_trace["dbType"] == "mysql" + assert db_trace["request"]["hostname"] == "localhost" + assert db_trace["request"]["port"] == "3306" + assert db_trace["request"]["db"] == "foobar" + assert db_trace["request"]["table"] == "test" + + assert db_traces[0]["request"]["command"] == "insert" + assert db_traces[1]["request"]["command"] == "select" From e15b96c7f7342d524e4c6ed8b098ea3a3459b015 Mon Sep 17 00:00:00 2001 From: Michael Lavers Date: Mon, 19 Aug 2019 16:47:45 -0700 Subject: [PATCH 3/4] Install mysql-devel to make build happy --- Dockerfile.python2.7 | 2 ++ Dockerfile.python3.6 | 1 + Dockerfile.python3.7 | 1 + 3 files changed, 4 insertions(+) diff --git a/Dockerfile.python2.7 b/Dockerfile.python2.7 index 909c8fb2..49cad515 100644 --- a/Dockerfile.python2.7 +++ b/Dockerfile.python2.7 @@ -1,5 +1,7 @@ FROM lambci/lambda:build-python2.7 +RUN +RUN yum -y install mysql-devel RUN pip install -U pip setuptools RUN mkdir -p /var/lib/iopipe diff --git a/Dockerfile.python3.6 b/Dockerfile.python3.6 index e5aa4d42..7d460bbc 100644 --- a/Dockerfile.python3.6 +++ b/Dockerfile.python3.6 @@ -1,5 +1,6 @@ FROM lambci/lambda:build-python3.6 +RUN yum -y install mysql-devel RUN pip install -U pip setuptools RUN mkdir -p /var/lib/iopipe diff --git a/Dockerfile.python3.7 b/Dockerfile.python3.7 index 34b8deae..ab4d207f 100644 --- a/Dockerfile.python3.7 +++ b/Dockerfile.python3.7 @@ -1,5 +1,6 @@ FROM lambci/lambda:build-python3.7 +RUN yum -y install mysql-devel RUN pip install -U pip setuptools RUN mkdir -p /var/lib/iopipe From 98c442978273ac4f4601349e242c11f9067fbc60 Mon Sep 17 00:00:00 2001 From: Michael Lavers Date: Mon, 19 Aug 2019 18:16:51 -0700 Subject: [PATCH 4/4] Remove empty run directive --- Dockerfile.python2.7 | 1 - 1 file changed, 1 deletion(-) diff --git a/Dockerfile.python2.7 b/Dockerfile.python2.7 index 49cad515..77a6ea3a 100644 --- a/Dockerfile.python2.7 +++ b/Dockerfile.python2.7 @@ -1,6 +1,5 @@ FROM lambci/lambda:build-python2.7 -RUN RUN yum -y install mysql-devel RUN pip install -U pip setuptools