From 994a45b4f419d24add76ec1d7b99ad1bc7eb005b Mon Sep 17 00:00:00 2001
From: Evgeny Seregin
Date: Tue, 11 Jul 2023 15:00:02 +0600
Subject: [PATCH] Redis: Add support for redis.asyncio (#1933)

---------

Co-authored-by: Anton Pirker
---
 .github/workflows/test-integration-redis.yml |   2 +-
 .../{redis.py => redis/__init__.py}           | 199 +++++++++++-------
 sentry_sdk/integrations/redis/asyncio.py      |  67 ++++++
 tests/integrations/redis/asyncio/__init__.py  |   3 +
 .../redis/asyncio/test_redis_asyncio.py       |  75 +++++++
 tests/integrations/redis/test_redis.py        |  23 +-
 .../rediscluster/test_rediscluster.py         |  19 +-
 tox.ini                                       |   5 +-
 8 files changed, 310 insertions(+), 83 deletions(-)
 rename sentry_sdk/integrations/{redis.py => redis/__init__.py} (53%)
 create mode 100644 sentry_sdk/integrations/redis/asyncio.py
 create mode 100644 tests/integrations/redis/asyncio/__init__.py
 create mode 100644 tests/integrations/redis/asyncio/test_redis_asyncio.py

diff --git a/.github/workflows/test-integration-redis.yml b/.github/workflows/test-integration-redis.yml
index 470a0408de..3a29033dcd 100644
--- a/.github/workflows/test-integration-redis.yml
+++ b/.github/workflows/test-integration-redis.yml
@@ -31,7 +31,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.7","3.8","3.9"]
+        python-version: ["3.7","3.8","3.9","3.10","3.11"]
         # python3.6 reached EOL and is no longer being supported on
         # new versions of hosted runners on Github Actions
         # ubuntu-20.04 is the last version that supported python3.6
diff --git a/sentry_sdk/integrations/redis.py b/sentry_sdk/integrations/redis/__init__.py
similarity index 53%
rename from sentry_sdk/integrations/redis.py
rename to sentry_sdk/integrations/redis/__init__.py
index 22464d8b4c..b0a4a8d1ed 100644
--- a/sentry_sdk/integrations/redis.py
+++ b/sentry_sdk/integrations/redis/__init__.py
@@ -14,6 +14,7 @@

 if TYPE_CHECKING:
     from typing import Any, Sequence
+    from sentry_sdk.tracing import Span

 _SINGLE_KEY_COMMANDS = frozenset(
     ["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"]
@@ -25,10 +26,64 @@
 ]

 _MAX_NUM_ARGS = 10  # Trim argument lists to this many values
+_MAX_NUM_COMMANDS = 10  # Trim command lists to this many values

 _DEFAULT_MAX_DATA_SIZE = 1024


+def _get_safe_command(name, args):
+    # type: (str, Sequence[Any]) -> str
+    command_parts = [name]
+
+    for i, arg in enumerate(args):
+        if i > _MAX_NUM_ARGS:
+            break
+
+        name_low = name.lower()
+
+        if name_low in _COMMANDS_INCLUDING_SENSITIVE_DATA:
+            command_parts.append(SENSITIVE_DATA_SUBSTITUTE)
+            continue
+
+        arg_is_the_key = i == 0
+        if arg_is_the_key:
+            command_parts.append(repr(arg))
+
+        else:
+            if _should_send_default_pii():
+                command_parts.append(repr(arg))
+            else:
+                command_parts.append(SENSITIVE_DATA_SUBSTITUTE)
+
+    command = " ".join(command_parts)
+    return command
+
+
+def _set_pipeline_data(
+    span, is_cluster, get_command_args_fn, is_transaction, command_stack
+):
+    # type: (Span, bool, Any, bool, Sequence[Any]) -> None
+    span.set_tag("redis.is_cluster", is_cluster)
+    transaction = is_transaction if not is_cluster else False
+    span.set_tag("redis.transaction", transaction)
+
+    commands = []
+    for i, arg in enumerate(command_stack):
+        if i >= _MAX_NUM_COMMANDS:
+            break
+
+        command = get_command_args_fn(arg)
+        commands.append(_get_safe_command(command[0], command[1:]))
+
+    span.set_data(
+        "redis.commands",
+        {
+            "count": len(command_stack),
+            "first_ten": commands,
+        },
+    )
+
+
 def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
     # type: (Any, bool, Any) -> None
     old_execute = pipeline_cls.execute
@@ -44,24 +99,12 @@ def sentry_patched_execute(self, *args, **kwargs):
             op=OP.DB_REDIS, description="redis.pipeline.execute"
         ) as span:
             with capture_internal_exceptions():
-                span.set_tag("redis.is_cluster", is_cluster)
-                transaction = self.transaction if not is_cluster else False
-                span.set_tag("redis.transaction", transaction)
-
-                commands = []
-                for i, arg in enumerate(self.command_stack):
-                    if i > _MAX_NUM_ARGS:
-                        break
-                    command_args = []
-                    for j, command_arg in enumerate(get_command_args_fn(arg)):
-                        if j > 0:
-                            command_arg = repr(command_arg)
-                        command_args.append(command_arg)
-                    commands.append(" ".join(command_args))
-
-                span.set_data(
-                    "redis.commands",
-                    {"count": len(self.command_stack), "first_ten": commands},
+                _set_pipeline_data(
+                    span,
+                    is_cluster,
+                    get_command_args_fn,
+                    self.transaction,
+                    self.command_stack,
                 )

             span.set_data(SPANDATA.DB_SYSTEM, "redis")
@@ -80,6 +123,43 @@ def _parse_rediscluster_command(command):
     return command.args


+def _patch_redis(StrictRedis, client):  # noqa: N803
+    # type: (Any, Any) -> None
+    patch_redis_client(StrictRedis, is_cluster=False)
+    patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
+    try:
+        strict_pipeline = client.StrictPipeline
+    except AttributeError:
+        pass
+    else:
+        patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
+
+    try:
+        import redis.asyncio
+    except ImportError:
+        pass
+    else:
+        from sentry_sdk.integrations.redis.asyncio import (
+            patch_redis_async_client,
+            patch_redis_async_pipeline,
+        )
+
+        patch_redis_async_client(redis.asyncio.client.StrictRedis)
+        patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
+
+
+def _patch_rb():
+    # type: () -> None
+    try:
+        import rb.clients  # type: ignore
+    except ImportError:
+        pass
+    else:
+        patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
+        patch_redis_client(rb.clients.MappingClient, is_cluster=False)
+        patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
+
+
 def _patch_rediscluster():
     # type: () -> None
     try:
@@ -119,23 +199,8 @@ def setup_once():
         except ImportError:
             raise DidNotEnable("Redis client not installed")

-        patch_redis_client(StrictRedis, is_cluster=False)
-        patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
-        try:
-            strict_pipeline = client.StrictPipeline  # type: ignore
-        except AttributeError:
-            pass
-        else:
-            patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
-
-        try:
-            import rb.clients  # type: ignore
-        except ImportError:
-            pass
-        else:
-            patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
-            patch_redis_client(rb.clients.MappingClient, is_cluster=False)
-            patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
+        _patch_redis(StrictRedis, client)
+        _patch_rb()

         try:
             _patch_rediscluster()
@@ -143,6 +208,31 @@ def setup_once():
             logger.exception("Error occurred while patching `rediscluster` library")


+def _get_span_description(name, *args):
+    # type: (str, *Any) -> str
+    description = name
+
+    with capture_internal_exceptions():
+        description = _get_safe_command(name, args)
+
+    return description
+
+
+def _set_client_data(span, is_cluster, name, *args):
+    # type: (Span, bool, str, *Any) -> None
+    span.set_tag("redis.is_cluster", is_cluster)
+    if name:
+        span.set_tag("redis.command", name)
+        span.set_tag(SPANDATA.DB_OPERATION, name)
+
+    if name and args:
+        name_low = name.lower()
+        if (name_low in _SINGLE_KEY_COMMANDS) or (
+            name_low in _MULTI_KEY_COMMANDS and len(args) == 1
+        ):
+            span.set_tag("redis.key", args[0])
+
+
 def patch_redis_client(cls, is_cluster):
     # type: (Any, bool) -> None
     """
@@ -159,31 +249,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
         if integration is None:
             return old_execute_command(self, name, *args, **kwargs)

-        description = name
-
-        with capture_internal_exceptions():
-            description_parts = [name]
-            for i, arg in enumerate(args):
-                if i > _MAX_NUM_ARGS:
-                    break
-
-                name_low = name.lower()
-
-                if name_low in _COMMANDS_INCLUDING_SENSITIVE_DATA:
-                    description_parts.append(SENSITIVE_DATA_SUBSTITUTE)
-                    continue
-
-                arg_is_the_key = i == 0
-                if arg_is_the_key:
-                    description_parts.append(repr(arg))
-
-                else:
-                    if _should_send_default_pii():
-                        description_parts.append(repr(arg))
-                    else:
-                        description_parts.append(SENSITIVE_DATA_SUBSTITUTE)
-
-            description = " ".join(description_parts)
+        description = _get_span_description(name, *args)

         data_should_be_truncated = (
             integration.max_data_size and len(description) > integration.max_data_size
@@ -192,18 +258,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
             description = description[: integration.max_data_size - len("...")] + "..."

         with hub.start_span(op=OP.DB_REDIS, description=description) as span:
-            span.set_tag("redis.is_cluster", is_cluster)
-
-            if name:
-                span.set_tag("redis.command", name)
-                span.set_tag(SPANDATA.DB_OPERATION, name)
-
-            if name and args:
-                name_low = name.lower()
-                if (name_low in _SINGLE_KEY_COMMANDS) or (
-                    name_low in _MULTI_KEY_COMMANDS and len(args) == 1
-                ):
-                    span.set_tag("redis.key", args[0])
+            _set_client_data(span, is_cluster, name, *args)

             return old_execute_command(self, name, *args, **kwargs)

diff --git a/sentry_sdk/integrations/redis/asyncio.py b/sentry_sdk/integrations/redis/asyncio.py
new file mode 100644
index 0000000000..d0e4e16a87
--- /dev/null
+++ b/sentry_sdk/integrations/redis/asyncio.py
@@ -0,0 +1,67 @@
+from __future__ import absolute_import
+
+from sentry_sdk import Hub
+from sentry_sdk.consts import OP
+from sentry_sdk.utils import capture_internal_exceptions
+from sentry_sdk.integrations.redis import (
+    RedisIntegration,
+    _get_redis_command_args,
+    _get_span_description,
+    _set_client_data,
+    _set_pipeline_data,
+)
+
+
+from sentry_sdk._types import MYPY
+
+if MYPY:
+    from typing import Any
+
+
+def patch_redis_async_pipeline(pipeline_cls):
+    # type: (Any) -> None
+    old_execute = pipeline_cls.execute
+
+    async def _sentry_execute(self, *args, **kwargs):
+        # type: (Any, *Any, **Any) -> Any
+        hub = Hub.current
+
+        if hub.get_integration(RedisIntegration) is None:
+            return await old_execute(self, *args, **kwargs)
+
+        with hub.start_span(
+            op=OP.DB_REDIS, description="redis.pipeline.execute"
+        ) as span:
+            with capture_internal_exceptions():
+                _set_pipeline_data(
+                    span,
+                    False,
+                    _get_redis_command_args,
+                    self.is_transaction,
+                    self.command_stack,
+                )
+
+            return await old_execute(self, *args, **kwargs)
+
+    pipeline_cls.execute = _sentry_execute
+
+
+def patch_redis_async_client(cls):
+    # type: (Any) -> None
+    old_execute_command = cls.execute_command
+
+    async def _sentry_execute_command(self, name, *args, **kwargs):
+        # type: (Any, str, *Any, **Any) -> Any
+        hub = Hub.current
+
+        if hub.get_integration(RedisIntegration) is None:
+            return await old_execute_command(self, name, *args, **kwargs)
+
+        description = _get_span_description(name, *args)
+
+        with hub.start_span(op=OP.DB_REDIS, description=description) as span:
+            _set_client_data(span, False, name, *args)
+
+            return await old_execute_command(self, name, *args, **kwargs)
+
+    cls.execute_command = _sentry_execute_command
diff --git a/tests/integrations/redis/asyncio/__init__.py b/tests/integrations/redis/asyncio/__init__.py
new file mode 100644
index 0000000000..bd93246a9a
--- /dev/null
+++ b/tests/integrations/redis/asyncio/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytest.importorskip("fakeredis.aioredis")
diff --git a/tests/integrations/redis/asyncio/test_redis_asyncio.py b/tests/integrations/redis/asyncio/test_redis_asyncio.py
new file mode 100644
index 0000000000..f97960f0eb
--- /dev/null
+++ b/tests/integrations/redis/asyncio/test_redis_asyncio.py
@@ -0,0 +1,75 @@
+import pytest
+
+from sentry_sdk import capture_message, start_transaction
+from sentry_sdk.integrations.redis import RedisIntegration
+
+from fakeredis.aioredis import FakeRedis
+
+
+@pytest.mark.asyncio
+async def test_async_basic(sentry_init, capture_events):
+    sentry_init(integrations=[RedisIntegration()])
+    events = capture_events()
+
+    connection = FakeRedis()
+
+    await connection.get("foobar")
+    capture_message("hi")
+
+    (event,) = events
+    (crumb,) = event["breadcrumbs"]["values"]
+
+    assert crumb == {
+        "category": "redis",
+        "message": "GET 'foobar'",
+        "data": {
+            "db.operation": "GET",
+            "redis.key": "foobar",
+            "redis.command": "GET",
+            "redis.is_cluster": False,
+        },
+        "timestamp": crumb["timestamp"],
+        "type": "redis",
+    }
+
+
+@pytest.mark.parametrize(
+    "is_transaction, send_default_pii, expected_first_ten",
+    [
+        (False, False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]),
+        (True, True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]),
+    ],
+)
+@pytest.mark.asyncio
+async def test_async_redis_pipeline(
+    sentry_init, capture_events, is_transaction, send_default_pii, expected_first_ten
+):
+    sentry_init(
+        integrations=[RedisIntegration()],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
+    events = capture_events()
+
+    connection = FakeRedis()
+    with start_transaction():
+        pipeline = connection.pipeline(transaction=is_transaction)
+        pipeline.get("foo")
+        pipeline.set("bar", 1)
+        pipeline.set("baz", 2)
+        await pipeline.execute()
+
+    (event,) = events
+    (span,) = event["spans"]
+    assert span["op"] == "db.redis"
+    assert span["description"] == "redis.pipeline.execute"
+    assert span["data"] == {
+        "redis.commands": {
+            "count": 3,
+            "first_ten": expected_first_ten,
+        }
+    }
+    assert span["tags"] == {
+        "redis.transaction": is_transaction,
+        "redis.is_cluster": False,
+    }
diff --git a/tests/integrations/redis/test_redis.py b/tests/integrations/redis/test_redis.py
index 37a886c224..e5d760b018 100644
--- a/tests/integrations/redis/test_redis.py
+++ b/tests/integrations/redis/test_redis.py
@@ -1,9 +1,10 @@
+import pytest
+
 from sentry_sdk import capture_message, start_transaction
 from sentry_sdk.consts import SPANDATA
 from sentry_sdk.integrations.redis import RedisIntegration

 from fakeredis import FakeStrictRedis
-import pytest

 try:
     from unittest import mock  # python 3.3 and above
@@ -37,9 +38,21 @@ def test_basic(sentry_init, capture_events):
     }


-@pytest.mark.parametrize("is_transaction", [False, True])
-def test_redis_pipeline(sentry_init, capture_events, is_transaction):
-    sentry_init(integrations=[RedisIntegration()], traces_sample_rate=1.0)
+@pytest.mark.parametrize(
+    "is_transaction, send_default_pii, expected_first_ten",
+    [
+        (False, False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]),
+        (True, True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]),
+    ],
+)
+def test_redis_pipeline(
+    sentry_init, capture_events, is_transaction, send_default_pii, expected_first_ten
+):
+    sentry_init(
+        integrations=[RedisIntegration()],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
     events = capture_events()

     connection = FakeStrictRedis()
@@ -57,7 +70,7 @@ def test_redis_pipeline(sentry_init, capture_events, is_transaction):
     assert span["data"] == {
         "redis.commands": {
             "count": 3,
-            "first_ten": ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"],
+            "first_ten": expected_first_ten,
         },
         SPANDATA.DB_SYSTEM: "redis",
     }
diff --git a/tests/integrations/rediscluster/test_rediscluster.py b/tests/integrations/rediscluster/test_rediscluster.py
index c4b5a8e7d3..32eb8c4fa5 100644
--- a/tests/integrations/rediscluster/test_rediscluster.py
+++ b/tests/integrations/rediscluster/test_rediscluster.py
@@ -52,8 +52,21 @@ def test_rediscluster_basic(rediscluster_cls, sentry_init, capture_events):
     }


-def test_rediscluster_pipeline(sentry_init, capture_events):
-    sentry_init(integrations=[RedisIntegration()], traces_sample_rate=1.0)
+@pytest.mark.parametrize(
+    "send_default_pii, expected_first_ten",
+    [
+        (False, ["GET 'foo'", "SET 'bar' [Filtered]", "SET 'baz' [Filtered]"]),
+        (True, ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"]),
+    ],
+)
+def test_rediscluster_pipeline(
+    sentry_init, capture_events, send_default_pii, expected_first_ten
+):
+    sentry_init(
+        integrations=[RedisIntegration()],
+        traces_sample_rate=1.0,
+        send_default_pii=send_default_pii,
+    )
     events = capture_events()

     rc = rediscluster.RedisCluster(connection_pool=True)
@@ -71,7 +84,7 @@ def test_rediscluster_pipeline(sentry_init, capture_events):
     assert span["data"] == {
         "redis.commands": {
             "count": 3,
-            "first_ten": ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"],
+            "first_ten": expected_first_ten,
         },
         SPANDATA.DB_SYSTEM: "redis",
     }
diff --git a/tox.ini b/tox.ini
index a1f307100f..65eb368c3d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -120,7 +120,7 @@ envlist =
     {py3.7,py3.8,py3.9,py3.10,py3.11}-quart-v{0.16,0.17,0.18}

     # Redis
-    {py2.7,py3.7,py3.8,py3.9}-redis
+    {py2.7,py3.7,py3.8,py3.9,py3.10,py3.11}-redis

     # Redis Cluster
     {py2.7,py3.7,py3.8,py3.9}-rediscluster-v{1,2.1.0,2}
@@ -364,7 +364,8 @@ deps =
     requests: requests>=2.0

     # Redis
-    redis: fakeredis<1.7.4
+    redis: fakeredis!=1.7.4
+    {py3.7,py3.8,py3.9,py3.10,py3.11}-redis: pytest-asyncio

     # Redis Cluster
     rediscluster-v1: redis-py-cluster>=1.0.0,<2.0.0
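
Usage sketch (not part of the patch above): a minimal script showing the behaviour this change enables, assuming redis>=4.2 (which provides redis.asyncio) and a sentry-sdk build containing this patch. The DSN, host, port, and transaction name are placeholders.

import asyncio

import redis.asyncio as redis

import sentry_sdk
from sentry_sdk.integrations.redis import RedisIntegration


async def main():
    # Placeholder DSN; send_default_pii=False keeps SET values as [Filtered],
    # mirroring the parametrized tests in the patch.
    sentry_sdk.init(
        dsn="https://<key>@<org>.ingest.sentry.io/<project>",
        integrations=[RedisIntegration()],
        traces_sample_rate=1.0,
        send_default_pii=False,
    )

    connection = redis.Redis(host="localhost", port=6379)  # assumed local server

    with sentry_sdk.start_transaction(name="redis-asyncio-demo"):
        # Each awaited command goes through the patched execute_command and is
        # recorded as a db.redis span (plus a redis breadcrumb).
        await connection.set("foo", "bar")
        await connection.get("foo")

        # Buffered pipeline commands are summarized under redis.commands on the
        # redis.pipeline.execute span by the patched Pipeline.execute.
        pipeline = connection.pipeline(transaction=False)
        pipeline.get("foo")
        pipeline.set("bar", 1)
        await pipeline.execute()


asyncio.run(main())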