From a3a9a73398b6f585d7bf97d3d02c5aaf44221886 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Sun, 26 Mar 2023 12:45:15 +0300 Subject: [PATCH 01/17] try to fix --- redis/asyncio/client.py | 48 ++++++++++++++++++++++++++-------------- redis/asyncio/cluster.py | 9 ++++++-- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 9e16ee08de..bb946cbce9 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -511,12 +511,17 @@ async def execute_command(self, *args, **options): if self.single_connection_client: await self._single_conn_lock.acquire() try: - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, command_name, *args, **options - ), - lambda error: self._disconnect_raise(conn, error), + return await asyncio.shield( + conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, command_name, *args, **options + ), + lambda error: self._disconnect_raise(conn, error), + ) ) + except asyncio.CancelledError: + await conn.disconnect(nowait=True) + raise finally: if self.single_connection_client: self._single_conn_lock.release() @@ -772,10 +777,16 @@ async def _execute(self, conn, command, *args, **kwargs): called by the # connection to resubscribe us to any channels and patterns we were previously listening to """ - return await conn.retry.call_with_retry( - lambda: command(*args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), - ) + try: + return await asyncio.shield( + conn.retry.call_with_retry( + lambda: command(*args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error), + ) + ) + except asyncio.CancelledError: + await conn.disconnect() + raise async def parse_response(self, block: bool = True, timeout: float = 0): """Parse the response from a publish/subscribe command""" @@ -1191,13 +1202,18 @@ async def immediate_execute_command(self, *args, **options): command_name, self.shard_hint ) self.connection = conn - - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, command_name, *args, **options - ), - lambda error: self._disconnect_reset_raise(conn, error), - ) + try: + return await asyncio.shield( + conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, command_name, *args, **options + ), + lambda error: self._disconnect_reset_raise(conn, error), + ) + ) + except asyncio.CancelledError: + await conn.disconnect() + raise def pipeline_execute_command(self, *args, **options): """ diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 569a0765f8..84d986f601 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1029,9 +1029,14 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: ret = False for cmd in commands: try: - cmd.result = await self.parse_response( - connection, cmd.args[0], **cmd.kwargs + cmd.result = await asyncio.shield( + self.parse_response( + connection, cmd.args[0], **cmd.kwargs + ) ) + except asyncio.CancelledError: + await connection.disconnect(nowait=True) + raise except Exception as e: cmd.result = e ret = True From 0cf8aaeb6fc3fcac001938e3178cfa320c7a3f7d Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Sun, 26 Mar 2023 13:08:18 +0300 Subject: [PATCH 02/17] starting testing --- tests/test_asyncio/test_connection.py | 21 ------- tests/test_asyncio/test_cwe_404.py | 86 +++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 21 deletions(-) create mode 100644 tests/test_asyncio/test_cwe_404.py diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index d3b6285cfb..e2d77fc1c3 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -44,27 +44,6 @@ async def test_invalid_response(create_redis): await r.connection.disconnect() -async def test_asynckills(): - - for b in [True, False]: - r = Redis(single_connection_client=b) - - await r.set("foo", "foo") - await r.set("bar", "bar") - - t = asyncio.create_task(r.get("foo")) - await asyncio.sleep(1) - t.cancel() - try: - await t - except asyncio.CancelledError: - pytest.fail("connection left open with unread response") - - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" - - @pytest.mark.onlynoncluster async def test_single_connection(): """Test that concurrent requests on a single client are synchronised.""" diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py new file mode 100644 index 0000000000..3d9ac20544 --- /dev/null +++ b/tests/test_asyncio/test_cwe_404.py @@ -0,0 +1,86 @@ +import asyncio +import sys +import pytest +from redis.asyncio import Redis + +async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name=''): + while data := await reader.read(1000): + # print(name, 'received:', data) + await asyncio.sleep(delay) + writer.write(data) + await writer.drain() + + +class DelayProxy: + + def __init__(self, addr, redis_addr, delay: float): + self.addr = addr + self.redis_addr = redis_addr + self.delay = delay + + async def start(self): + server = await asyncio.start_server(self.handle, *self.addr) + self.ROUTINE = asyncio.create_task(server.serve_forever()) + + async def handle(self, reader, writer): + # establish connection to redis + redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) + pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, 'to redis:')) + pipe2 = asyncio.create_task(pipe(redis_reader, writer, self.delay, 'from redis:')) + await asyncio.gather(pipe1, pipe2) + + async def kill(self): + self.ROUTINE.cancel() + +async def test_standalone(): + + # create a tcp socket proxy that relays data to Redis and back, inserting 0.1 seconds of delay + dp = DelayProxy(addr=('localhost', 6380), redis_addr=('localhost', 6379), delay=0.1) + await dp.start() + + for b in [True, False]: + # note that we connect to proxy, rather than to Redis directly + async with Redis(host='localhost', port=6380, single_connection_client=b) as r: + + await r.set('foo', 'foo') + await r.set('bar', 'bar') + + t = asyncio.create_task(r.get('foo')) + await asyncio.sleep(0.050) + t.cancel() + try: + await t + sys.stderr.write('try again, we did not cancel the task in time\n') + except asyncio.CancelledError: + sys.stderr.write('managed to cancel the task, connection is left open with unread response\n') + + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" + + await dp.kill() + +async def test_standalone_pipeline(): + dp = DelayProxy(addr=('localhost', 6380), redis_addr=('localhost', 6379), delay=0.1) + await dp.start() + async with Redis(host='localhost', port=6380) as r: + await r.set('foo', 'foo') + await r.set('bar', 'bar') + + pipe = r.pipeline() + pipe.get("bar") + pipe.ping() + pipe.get("foo") + + pipe2 = r.pipeline() + pipe2.get("bar") + pipe2.ping() + pipe2.get("foo") + + t = asyncio.create_task(r.get('foo')) + await asyncio.sleep(0.050) + t.cancel() + assert await pipe.execute() == [b"bar", True, b"foo"] + assert await pipe2.execute() == [b"bar", True, b"foo"] + + await dp.kill() From 4163e8595d9da50e78b35ddeaeed272c64be3f38 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Sun, 26 Mar 2023 14:53:41 +0300 Subject: [PATCH 03/17] pipeline --- redis/asyncio/cluster.py | 4 +- tests/test_asyncio/test_cwe_404.py | 75 +++++++++++++++++------------- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 84d986f601..325f741030 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1030,9 +1030,7 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: for cmd in commands: try: cmd.result = await asyncio.shield( - self.parse_response( - connection, cmd.args[0], **cmd.kwargs - ) + self.parse_response(connection, cmd.args[0], **cmd.kwargs) ) except asyncio.CancelledError: await connection.disconnect(nowait=True) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 3d9ac20544..cb478a06f3 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -3,84 +3,95 @@ import pytest from redis.asyncio import Redis -async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name=''): + +async def pipe( + reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name="" +): while data := await reader.read(1000): - # print(name, 'received:', data) await asyncio.sleep(delay) writer.write(data) await writer.drain() class DelayProxy: - def __init__(self, addr, redis_addr, delay: float): self.addr = addr self.redis_addr = redis_addr self.delay = delay async def start(self): - server = await asyncio.start_server(self.handle, *self.addr) - self.ROUTINE = asyncio.create_task(server.serve_forever()) + self.server = await asyncio.start_server(self.handle, *self.addr) + self.ROUTINE = asyncio.create_task(self.server.serve_forever()) async def handle(self, reader, writer): # establish connection to redis redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) - pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, 'to redis:')) - pipe2 = asyncio.create_task(pipe(redis_reader, writer, self.delay, 'from redis:')) + pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, "to redis:")) + pipe2 = asyncio.create_task( + pipe(redis_reader, writer, self.delay, "from redis:") + ) await asyncio.gather(pipe1, pipe2) - + async def kill(self): + # clean up enough so that we can reuse the looper self.ROUTINE.cancel() - + loop = self.server.get_loop() + await loop.shutdown_asyncgens() + + async def test_standalone(): - + # create a tcp socket proxy that relays data to Redis and back, inserting 0.1 seconds of delay - dp = DelayProxy(addr=('localhost', 6380), redis_addr=('localhost', 6379), delay=0.1) + dp = DelayProxy(addr=("localhost", 6380), redis_addr=("localhost", 6379), delay=0.1) await dp.start() for b in [True, False]: - # note that we connect to proxy, rather than to Redis directly - async with Redis(host='localhost', port=6380, single_connection_client=b) as r: + # note that we connect to proxy, rather than to Redis directly + async with Redis(host="localhost", port=6380, single_connection_client=b) as r: - await r.set('foo', 'foo') - await r.set('bar', 'bar') + await r.set("foo", "foo") + await r.set("bar", "bar") - t = asyncio.create_task(r.get('foo')) + t = asyncio.create_task(r.get("foo")) await asyncio.sleep(0.050) t.cancel() try: await t - sys.stderr.write('try again, we did not cancel the task in time\n') + sys.stderr.write("try again, we did not cancel the task in time\n") except asyncio.CancelledError: - sys.stderr.write('managed to cancel the task, connection is left open with unread response\n') + sys.stderr.write( + "managed to cancel the task, connection is left open with unread response\n" + ) assert await r.get("bar") == b"bar" assert await r.ping() assert await r.get("foo") == b"foo" - + await dp.kill() - + + async def test_standalone_pipeline(): - dp = DelayProxy(addr=('localhost', 6380), redis_addr=('localhost', 6379), delay=0.1) + dp = DelayProxy(addr=("localhost", 6380), redis_addr=("localhost", 6379), delay=0.1) await dp.start() - async with Redis(host='localhost', port=6380) as r: - await r.set('foo', 'foo') - await r.set('bar', 'bar') - + async with Redis(host="localhost", port=6380) as r: + await r.set("foo", "foo") + await r.set("bar", "bar") + pipe = r.pipeline() - pipe.get("bar") - pipe.ping() - pipe.get("foo") - + pipe2 = r.pipeline() pipe2.get("bar") pipe2.ping() pipe2.get("foo") - - t = asyncio.create_task(r.get('foo')) + + t = asyncio.create_task(pipe.get("foo").execute()) await asyncio.sleep(0.050) t.cancel() - assert await pipe.execute() == [b"bar", True, b"foo"] + + pipe.get("bar") + pipe.ping() + pipe.get("foo") + assert await pipe.execute() == [b"foo", b"bar", True, b"foo"] assert await pipe2.execute() == [b"bar", True, b"foo"] await dp.kill() From 20893a4710a29a2372a9eef6e5b27b7e1dd1d40a Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Sun, 26 Mar 2023 16:52:12 +0300 Subject: [PATCH 04/17] partial fix --- tests/test_asyncio/test_cluster.py | 17 ----------- tests/test_asyncio/test_cwe_404.py | 47 ++++++++++++++++++++++++------ 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 0857c056c2..13e5e26ae3 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -340,23 +340,6 @@ async def test_from_url(self, request: FixtureRequest) -> None: rc = RedisCluster.from_url("rediss://localhost:16379") assert rc.connection_kwargs["connection_class"] is SSLConnection - async def test_asynckills(self, r) -> None: - - await r.set("foo", "foo") - await r.set("bar", "bar") - - t = asyncio.create_task(r.get("foo")) - await asyncio.sleep(1) - t.cancel() - try: - await t - except asyncio.CancelledError: - pytest.fail("connection is left open with unread response") - - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" - async def test_max_connections( self, create_redis: Callable[..., RedisCluster] ) -> None: diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index cb478a06f3..60ad32f63d 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -1,7 +1,10 @@ import asyncio import sys + import pytest + from redis.asyncio import Redis +from redis.asyncio.cluster import RedisCluster async def pipe( @@ -32,7 +35,7 @@ async def handle(self, reader, writer): ) await asyncio.gather(pipe1, pipe2) - async def kill(self): + async def stop(self): # clean up enough so that we can reuse the looper self.ROUTINE.cancel() loop = self.server.get_loop() @@ -41,13 +44,14 @@ async def kill(self): async def test_standalone(): - # create a tcp socket proxy that relays data to Redis and back, inserting 0.1 seconds of delay - dp = DelayProxy(addr=("localhost", 6380), redis_addr=("localhost", 6379), delay=0.1) + # create a tcp socket proxy that relays data to Redis and back, + # inserting 0.1 seconds of delay + dp = DelayProxy(addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=0.1) await dp.start() for b in [True, False]: # note that we connect to proxy, rather than to Redis directly - async with Redis(host="localhost", port=6380, single_connection_client=b) as r: + async with Redis(host="localhost", port=5380, single_connection_client=b) as r: await r.set("foo", "foo") await r.set("bar", "bar") @@ -60,20 +64,20 @@ async def test_standalone(): sys.stderr.write("try again, we did not cancel the task in time\n") except asyncio.CancelledError: sys.stderr.write( - "managed to cancel the task, connection is left open with unread response\n" + "canceled task, connection is left open with unread response\n" ) assert await r.get("bar") == b"bar" assert await r.ping() assert await r.get("foo") == b"foo" - await dp.kill() + await dp.stop() async def test_standalone_pipeline(): - dp = DelayProxy(addr=("localhost", 6380), redis_addr=("localhost", 6379), delay=0.1) + dp = DelayProxy(addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=0.1) await dp.start() - async with Redis(host="localhost", port=6380) as r: + async with Redis(host="localhost", port=5380) as r: await r.set("foo", "foo") await r.set("bar", "bar") @@ -94,4 +98,29 @@ async def test_standalone_pipeline(): assert await pipe.execute() == [b"foo", b"bar", True, b"foo"] assert await pipe2.execute() == [b"bar", True, b"foo"] - await dp.kill() + await dp.stop() + + +async def test_cluster(request): + + dp = DelayProxy(addr=("localhost", 5381), redis_addr=("localhost", 6372), delay=0.1) + await dp.start() + + r = RedisCluster.from_url("redis://localhost:5381") + await r.initialize() + await r.set("foo", "foo") + await r.set("bar", "bar") + + t = asyncio.create_task(r.get("foo")) + await asyncio.sleep(0.050) + t.cancel() + try: + await t + except asyncio.CancelledError: + pytest.fail("connection is left open with unread response") + + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" + + await dp.stop() From 38f795764df36b30e4bfdc427f5ae47daca28b2a Mon Sep 17 00:00:00 2001 From: Chayim Date: Mon, 27 Mar 2023 08:47:46 +0300 Subject: [PATCH 05/17] Update tests/test_asyncio/test_cwe_404.py Co-authored-by: James R T --- tests/test_asyncio/test_cwe_404.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 60ad32f63d..f2c7eb9aa3 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -10,7 +10,10 @@ async def pipe( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name="" ): - while data := await reader.read(1000): + while True: + data = await reader.read(1000) + if not data: + break await asyncio.sleep(delay) writer.write(data) await writer.drain() From e54d3bba77700f9e433fb365901d49af849794a0 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Mon, 27 Mar 2023 10:06:50 +0300 Subject: [PATCH 06/17] fail-fast false --- .github/workflows/integration.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 0f9db8fb1a..9ff6de7899 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -51,6 +51,7 @@ jobs: timeout-minutes: 30 strategy: max-parallel: 15 + fail-fast: false matrix: python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9'] test-type: ['standalone', 'cluster'] @@ -110,6 +111,7 @@ jobs: strategy: matrix: python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9'] + fail-fast: false steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v4 From b9e92f0d23b198b23218570712621174555e28b6 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Mon, 27 Mar 2023 10:37:18 +0300 Subject: [PATCH 07/17] syntax --- .github/workflows/integration.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 9ff6de7899..f49a4fcd46 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -109,9 +109,9 @@ jobs: name: Install package from commit hash runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', 'pypy-3.7', 'pypy-3.8', 'pypy-3.9'] - fail-fast: false steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v4 From f3739629f4f1d3f9c009c172e18850ac16383c62 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Mon, 27 Mar 2023 16:24:28 +0300 Subject: [PATCH 08/17] con reconnect in one state --- redis/asyncio/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index bb946cbce9..f283d8b2f6 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -787,6 +787,9 @@ async def _execute(self, conn, command, *args, **kwargs): except asyncio.CancelledError: await conn.disconnect() raise + except TimeoutError: + await conn.disconnect() + await conn.connect() async def parse_response(self, block: bool = True, timeout: float = 0): """Parse the response from a publish/subscribe command""" From e4854de7f5c335b0000f2b79901bf600118a1468 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Mon, 27 Mar 2023 16:45:12 +0300 Subject: [PATCH 09/17] moved the shield --- redis/asyncio/client.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index f283d8b2f6..30f2648431 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -764,7 +764,12 @@ async def _disconnect_raise_connect(self, conn, error): if retry_on_timeout is not set or the error is not a TimeoutError. Otherwise, try to reconnect """ - await conn.disconnect() + try: + await asyncio.shield(conn.disconnect()) + except asyncio.CancelledError: + await conn.disconnect() + raise + if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): raise error await conn.connect() @@ -777,19 +782,10 @@ async def _execute(self, conn, command, *args, **kwargs): called by the # connection to resubscribe us to any channels and patterns we were previously listening to """ - try: - return await asyncio.shield( - conn.retry.call_with_retry( + return await conn.retry.call_with_retry( lambda: command(*args, **kwargs), lambda error: self._disconnect_raise_connect(conn, error), - ) - ) - except asyncio.CancelledError: - await conn.disconnect() - raise - except TimeoutError: - await conn.disconnect() - await conn.connect() + ) async def parse_response(self, block: bool = True, timeout: float = 0): """Parse the response from a publish/subscribe command""" From 6a92da78d5459d202ff3f957c7af421b133795d0 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Tue, 28 Mar 2023 10:41:49 +0300 Subject: [PATCH 10/17] linter fix --- redis/asyncio/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 30f2648431..20ab45812e 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -783,8 +783,8 @@ async def _execute(self, conn, command, *args, **kwargs): patterns we were previously listening to """ return await conn.retry.call_with_retry( - lambda: command(*args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), + lambda: command(*args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error), ) async def parse_response(self, block: bool = True, timeout: float = 0): From 7744dc6c5d385472d0806c5ad5b3de97c007bef5 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Tue, 28 Mar 2023 15:17:37 +0300 Subject: [PATCH 11/17] move try-except inside the asyncio.shield --- redis/asyncio/client.py | 107 +++++++++++++++++++++++---------------- redis/asyncio/cluster.py | 24 +++++---- 2 files changed, 77 insertions(+), 54 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 20ab45812e..3e0c2212a0 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -500,6 +500,23 @@ async def _disconnect_raise(self, conn: Connection, error: Exception): ): raise error + async def _try_send_command_parse_response(self, conn, *args, **options): + try: + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, args[0], *args, **options + ), + lambda error: self._disconnect_raise(conn, error), + ) + except asyncio.CancelledError: + await conn.disconnect(nowait=True) + raise + finally: + if self.single_connection_client: + self._single_conn_lock.release() + if not self.connection: + await self.connection_pool.release(conn) + # COMMAND EXECUTION AND PROTOCOL PARSING async def execute_command(self, *args, **options): """Execute a command and return a parsed response""" @@ -510,23 +527,10 @@ async def execute_command(self, *args, **options): if self.single_connection_client: await self._single_conn_lock.acquire() - try: - return await asyncio.shield( - conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, command_name, *args, **options - ), - lambda error: self._disconnect_raise(conn, error), - ) - ) - except asyncio.CancelledError: - await conn.disconnect(nowait=True) - raise - finally: - if self.single_connection_client: - self._single_conn_lock.release() - if not self.connection: - await pool.release(conn) + + return await asyncio.shield( + self._try_send_command_parse_response(conn, *args, **options) + ) async def parse_response( self, connection: Connection, command_name: Union[str, bytes], **options @@ -764,16 +768,19 @@ async def _disconnect_raise_connect(self, conn, error): if retry_on_timeout is not set or the error is not a TimeoutError. Otherwise, try to reconnect """ - try: - await asyncio.shield(conn.disconnect()) - except asyncio.CancelledError: - await conn.disconnect() - raise + await conn.disconnect() if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): raise error await conn.connect() + async def _try_execute(self, conn, command, *arg, **kwargs): + try: + return await command(*arg, **kwargs) + except asyncio.CancelledError: + await conn.disconnect() + raise + async def _execute(self, conn, command, *args, **kwargs): """ Connect manually upon disconnection. If the Redis server is down, @@ -782,9 +789,11 @@ async def _execute(self, conn, command, *args, **kwargs): called by the # connection to resubscribe us to any channels and patterns we were previously listening to """ - return await conn.retry.call_with_retry( - lambda: command(*args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), + return await asyncio.shield( + conn.retry.call_with_retry( + lambda: self._try_execute(conn, command, *args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error), + ) ) async def parse_response(self, block: bool = True, timeout: float = 0): @@ -1186,6 +1195,18 @@ async def _disconnect_reset_raise(self, conn, error): await self.reset() raise + async def _try_send_command_parse_response(self, conn, *args, **options): + try: + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, args[0], *args, **options + ), + lambda error: self._disconnect_reset_raise(conn, error), + ) + except asyncio.CancelledError: + await conn.disconnect() + raise + async def immediate_execute_command(self, *args, **options): """ Execute a command immediately, but don't auto-retry on a @@ -1203,12 +1224,7 @@ async def immediate_execute_command(self, *args, **options): self.connection = conn try: return await asyncio.shield( - conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, command_name, *args, **options - ), - lambda error: self._disconnect_reset_raise(conn, error), - ) + self._try_send_command_parse_response(conn, *args, **options) ) except asyncio.CancelledError: await conn.disconnect() @@ -1379,6 +1395,19 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception): await self.reset() raise + async def _try_execute(self, conn, execute, stack, raise_on_error): + try: + conn.retry.call_with_retry( + lambda: execute(conn, stack, raise_on_error), + lambda error: self._disconnect_raise_reset(conn, error), + ) + except asyncio.CancelledError: + # not supposed to be possible, yet here we are + await conn.disconnect(nowait=True) + raise + finally: + await self.reset() + async def execute(self, raise_on_error: bool = True): """Execute all the commands in the current pipeline""" stack = self.command_stack @@ -1399,19 +1428,9 @@ async def execute(self, raise_on_error: bool = True): self.connection = conn conn = cast(Connection, conn) - try: - return await asyncio.shield( - conn.retry.call_with_retry( - lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), - ) - ) - except asyncio.CancelledError: - # not supposed to be possible, yet here we are - await conn.disconnect(nowait=True) - raise - finally: - await self.reset() + return await asyncio.shield( + self._try_execute(conn, execute, stack, raise_on_error) + ) async def discard(self): """Flushes all previously queued commands diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 325f741030..a4a9561cf1 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1016,6 +1016,19 @@ async def _parse_and_release(self, connection, *args, **kwargs): finally: self._free.append(connection) + async def _try_parse_response(self, cmd, connection, ret): + try: + cmd.result = await asyncio.shield( + self.parse_response(connection, cmd.args[0], **cmd.kwargs) + ) + except asyncio.CancelledError: + await connection.disconnect(nowait=True) + raise + except Exception as e: + cmd.result = e + ret = True + return ret + async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: # Acquire connection connection = self.acquire_connection() @@ -1028,16 +1041,7 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: # Read responses ret = False for cmd in commands: - try: - cmd.result = await asyncio.shield( - self.parse_response(connection, cmd.args[0], **cmd.kwargs) - ) - except asyncio.CancelledError: - await connection.disconnect(nowait=True) - raise - except Exception as e: - cmd.result = e - ret = True + ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret)) # Release connection self._free.append(connection) From 7f820521ea672c6593a6b5266fdde5b8290f37ca Mon Sep 17 00:00:00 2001 From: dvora-h Date: Tue, 28 Mar 2023 15:29:22 +0300 Subject: [PATCH 12/17] fix pipeline --- redis/asyncio/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 3e0c2212a0..4585c78148 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1397,7 +1397,7 @@ async def _disconnect_raise_reset(self, conn: Connection, error: Exception): async def _try_execute(self, conn, execute, stack, raise_on_error): try: - conn.retry.call_with_retry( + return await conn.retry.call_with_retry( lambda: execute(conn, stack, raise_on_error), lambda error: self._disconnect_raise_reset(conn, error), ) From f89c0a7fa5e5941e15e8be8b23ee0ee367639929 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Tue, 28 Mar 2023 22:01:35 +0300 Subject: [PATCH 13/17] catching, handling, and validating the error --- redis/asyncio/client.py | 9 ++++++--- tests/test_asyncio/test_cwe_404.py | 28 +++++++++++++++++++++------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 4585c78148..0667ea05dd 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1428,9 +1428,12 @@ async def execute(self, raise_on_error: bool = True): self.connection = conn conn = cast(Connection, conn) - return await asyncio.shield( - self._try_execute(conn, execute, stack, raise_on_error) - ) + try: + return await asyncio.shield( + self._try_execute(conn, execute, stack, raise_on_error) + ) + except RuntimeError: + self.reset() async def discard(self): """Flushes all previously queued commands diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index f2c7eb9aa3..5b3d0a3b43 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -45,11 +45,14 @@ async def stop(self): await loop.shutdown_asyncgens() -async def test_standalone(): +@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) +async def test_standalone(delay): # create a tcp socket proxy that relays data to Redis and back, # inserting 0.1 seconds of delay - dp = DelayProxy(addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=0.1) + dp = DelayProxy( + addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2 + ) await dp.start() for b in [True, False]: @@ -60,7 +63,7 @@ async def test_standalone(): await r.set("bar", "bar") t = asyncio.create_task(r.get("foo")) - await asyncio.sleep(0.050) + await asyncio.sleep(delay) t.cancel() try: await t @@ -77,8 +80,11 @@ async def test_standalone(): await dp.stop() -async def test_standalone_pipeline(): - dp = DelayProxy(addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=0.1) +@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) +async def test_standalone_pipeline(delay): + dp = DelayProxy( + addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2 + ) await dp.start() async with Redis(host="localhost", port=5380) as r: await r.set("foo", "foo") @@ -92,13 +98,21 @@ async def test_standalone_pipeline(): pipe2.get("foo") t = asyncio.create_task(pipe.get("foo").execute()) - await asyncio.sleep(0.050) + await asyncio.sleep(delay) t.cancel() pipe.get("bar") pipe.ping() pipe.get("foo") - assert await pipe.execute() == [b"foo", b"bar", True, b"foo"] + pipe.reset() + + assert await pipe.execute() is None + + # validating that the pipeline can be used as it could previously + pipe.get("bar") + pipe.ping() + pipe.get("foo") + assert await pipe.execute() == [b"bar", True, b"foo"] assert await pipe2.execute() == [b"bar", True, b"foo"] await dp.stop() From 9516f2122e84c0c46ac156fde9017d9b8a769743 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Wed, 29 Mar 2023 10:10:56 +0300 Subject: [PATCH 14/17] skip specific test on python 3.7 --- tests/test_asyncio/test_pubsub.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py index 8f3817a569..a9a398372c 100644 --- a/tests/test_asyncio/test_pubsub.py +++ b/tests/test_asyncio/test_pubsub.py @@ -973,6 +973,7 @@ async def get_msg_or_timeout(timeout=0.1): # the timeout on the read should not cause disconnect assert pubsub.connection.is_connected + @pytest.mark.skipif(sys.version_info < (3, 8), reason="desired state in 3.7") async def test_base_exception(self, r: redis.Redis): """ Manually trigger a BaseException inside the parser's .read_response method From e0517a8aec407b53af0a155460ccb3ce05715c0f Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Wed, 29 Mar 2023 10:35:56 +0300 Subject: [PATCH 15/17] except and the finally --- redis/asyncio/client.py | 2 ++ tests/test_asyncio/test_pubsub.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 0667ea05dd..6a16195d5e 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1434,6 +1434,8 @@ async def execute(self, raise_on_error: bool = True): ) except RuntimeError: self.reset() + finally: + self.reset() async def discard(self): """Flushes all previously queued commands diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py index a9a398372c..8f3817a569 100644 --- a/tests/test_asyncio/test_pubsub.py +++ b/tests/test_asyncio/test_pubsub.py @@ -973,7 +973,6 @@ async def get_msg_or_timeout(timeout=0.1): # the timeout on the read should not cause disconnect assert pubsub.connection.is_connected - @pytest.mark.skipif(sys.version_info < (3, 8), reason="desired state in 3.7") async def test_base_exception(self, r: redis.Redis): """ Manually trigger a BaseException inside the parser's .read_response method From ceda6ac73e6fe504a2f629f8a1df7c08b93b33a5 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Wed, 29 Mar 2023 10:57:42 +0300 Subject: [PATCH 16/17] tagging tests appropriately --- tests/test_asyncio/test_cwe_404.py | 3 +++ tests/test_asyncio/test_pubsub.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 5b3d0a3b43..668344042d 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -45,6 +45,7 @@ async def stop(self): await loop.shutdown_asyncgens() +@pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) async def test_standalone(delay): @@ -80,6 +81,7 @@ async def test_standalone(delay): await dp.stop() +@pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) async def test_standalone_pipeline(delay): dp = DelayProxy( @@ -118,6 +120,7 @@ async def test_standalone_pipeline(delay): await dp.stop() +@pytest.mark.onlycluster async def test_cluster(request): dp = DelayProxy(addr=("localhost", 5381), redis_addr=("localhost", 6372), delay=0.1) diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py index 8f3817a569..ba70782e42 100644 --- a/tests/test_asyncio/test_pubsub.py +++ b/tests/test_asyncio/test_pubsub.py @@ -973,6 +973,9 @@ async def get_msg_or_timeout(timeout=0.1): # the timeout on the read should not cause disconnect assert pubsub.connection.is_connected + @pytest.mark.skipif( + sys.version_info < (3, 8), reason="requires python 3.8 or higher" + ) async def test_base_exception(self, r: redis.Redis): """ Manually trigger a BaseException inside the parser's .read_response method From bb65a25d6d38d7f71111679ffd50a4d40b7816a3 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Wed, 29 Mar 2023 11:25:41 +0300 Subject: [PATCH 17/17] awaits ho --- redis/asyncio/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 6a16195d5e..b61033e561 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1433,9 +1433,9 @@ async def execute(self, raise_on_error: bool = True): self._try_execute(conn, execute, stack, raise_on_error) ) except RuntimeError: - self.reset() + await self.reset() finally: - self.reset() + await self.reset() async def discard(self): """Flushes all previously queued commands