diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index df228b43e2..85f7038baf 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -52,7 +52,7 @@ jobs: test-ubuntu-latest: runs-on: ubuntu-latest needs: load-engine-matrix - timeout-minutes: 25 + timeout-minutes: 35 strategy: fail-fast: false matrix: @@ -128,6 +128,46 @@ jobs: with: language-flag: -python + test-pubsub-ubuntu-latest: + runs-on: ubuntu-latest + needs: load-engine-matrix + timeout-minutes: 35 + strategy: + fail-fast: false + matrix: + engine: ${{ fromJson(needs.load-engine-matrix.outputs.matrix) }} + python: + - "3.8" + - "3.9" + - "3.10" + - "3.11" + - "3.12" + + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python }} + + - name: Build Python wrapper + uses: ./.github/workflows/build-python-wrapper + with: + os: "ubuntu" + target: "x86_64-unknown-linux-gnu" + github-token: ${{ secrets.GITHUB_TOKEN }} + engine-version: ${{ matrix.engine.version }} + + - name: Test pubsub with pytest + working-directory: ./python + run: | + source .env/bin/activate + cd python/tests/ + pytest -c ../../pytest_pubsub.ini --asyncio-mode=auto test_pubsub.py::TestPubSub + lint-rust: runs-on: ubuntu-latest timeout-minutes: 15 @@ -141,9 +181,14 @@ jobs: cargo-toml-folder: ./python name: lint python-rust - build-macos-latest: + test-macos-latest: runs-on: macos-latest - timeout-minutes: 25 + needs: load-engine-matrix + timeout-minutes: 35 + strategy: + fail-fast: false + matrix: + engine: ${{ fromJson(needs.load-engine-matrix.outputs.matrix) }} steps: - uses: actions/checkout@v4 with: @@ -157,13 +202,43 @@ jobs: os: "macos" target: "aarch64-apple-darwin" github-token: ${{ secrets.GITHUB_TOKEN }} - engine-version: "7.2.5" + engine-version: ${{ matrix.engine.version }} - - name: Test compatibility with pytest + - name: Test with pytest working-directory: ./python run: | source .env/bin/activate - pytest --asyncio-mode=auto -m smoke_test + pytest --asyncio-mode=auto + + test-pubsub-macos-latest: + runs-on: macos-latest + needs: load-engine-matrix + timeout-minutes: 35 + strategy: + fail-fast: false + matrix: + engine: ${{ fromJson(needs.load-engine-matrix.outputs.matrix) }} + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + - name: Set up Homebrew + uses: Homebrew/actions/setup-homebrew@master + + - name: Build Python wrapper + uses: ./.github/workflows/build-python-wrapper + with: + os: "macos" + target: "aarch64-apple-darwin" + github-token: ${{ secrets.GITHUB_TOKEN }} + engine-version: ${{ matrix.engine.version }} + + - name: Test pubsub with pytest + working-directory: ./python + run: | + source .env/bin/activate + cd python/tests/ + pytest -c ../../pytest_pubsub.ini --asyncio-mode=auto test_pubsub.py::TestPubSub build-amazonlinux-latest: runs-on: ubuntu-latest diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index bce4c8bf6c..ba60676391 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -117,9 +117,15 @@ impl UnixStreamListener { return ReadSocketClosed.into(); } Ok(_) => { - return match self.rotating_buffer.get_requests() { - Ok(requests) => ReceivedValues(requests), - Err(err) => UnhandledError(err.into()).into(), + match self.rotating_buffer.get_requests() { + Ok(requests) => { + if !requests.is_empty() { + // continue to read from socket + return ReceivedValues(requests); + } + continue; + } + Err(err) => return UnhandledError(err.into()).into(), }; } Err(ref e) diff --git a/python/pytest.ini b/python/pytest.ini index 7c29dff74f..236b85a8ad 100644 --- a/python/pytest.ini +++ b/python/pytest.ini @@ -1,5 +1,4 @@ [pytest] markers = smoke_test: mark a test as a build verification testing. -# TODO: Remove pubsub exclusion after the flakey tests are fixed addopts = -k "not redis_modules and not pubsub" diff --git a/python/pytest_pubsub.ini b/python/pytest_pubsub.ini new file mode 100644 index 0000000000..bf42185756 --- /dev/null +++ b/python/pytest_pubsub.ini @@ -0,0 +1,4 @@ +[pytest] +markers = + smoke_test: mark a test as a build verification testing. +addopts = -k "not redis_modules" diff --git a/python/python/glide/glide_client.py b/python/python/glide/glide_client.py index 2ee030c555..0ba5a87c73 100644 --- a/python/python/glide/glide_client.py +++ b/python/python/glide/glide_client.py @@ -152,7 +152,7 @@ async def close(self, err_message: Optional[str] = None) -> None: try: self._pubsub_lock.acquire() for pubsub_future in self._pubsub_futures: - if not response_future.done() and not pubsub_future.cancelled(): + if not pubsub_future.done() and not pubsub_future.cancelled(): pubsub_future.set_exception(ClosingError("")) finally: self._pubsub_lock.release() diff --git a/python/python/tests/test_pubsub.py b/python/python/tests/test_pubsub.py index ae33e5ae0e..87a37dc954 100644 --- a/python/python/tests/test_pubsub.py +++ b/python/python/tests/test_pubsub.py @@ -15,7 +15,7 @@ ) from glide.constants import OK from glide.exceptions import ConfigurationError -from glide.glide_client import GlideClient, GlideClusterClient, TGlideClient +from glide.glide_client import BaseClient, GlideClient, GlideClusterClient, TGlideClient from tests.conftest import create_client from tests.utils.utils import check_if_server_version_lt, get_random_string @@ -33,49 +33,57 @@ class MethodTesting(IntEnum): "Uses callback-based subscription method." -async def create_two_clients( +async def create_two_clients_with_pubsub( request, cluster_mode, - pub_sub, - pub_sub2: Optional[Any] = None, + client1_pubsub: Optional[Any] = None, + client2_pubsub: Optional[Any] = None, protocol: ProtocolVersion = ProtocolVersion.RESP3, + timeout: Optional[int] = None, ) -> Tuple[ Union[GlideClient, GlideClusterClient], Union[GlideClient, GlideClusterClient] ]: """ - Sets 2 up clients for testing purposes. + Sets 2 up clients for testing purposes with optional pubsub configuration. Args: request: pytest request for creating a client. cluster_mode: the cluster mode. - pub_sub: pubsub configuration subscription for a client. - pub_sub2: pubsub configuration subscription for a client. + client1_pubsub: pubsub configuration subscription for the first client. + client2_pubsub: pubsub configuration subscription for the second client. protocol: what protocol to use, used for the test: `test_pubsub_resp2_raise_an_error`. """ - cluster_mode_pubsub, standalone_mode_pubsub = None, None + cluster_mode_pubsub1, standalone_mode_pubsub1 = None, None cluster_mode_pubsub2, standalone_mode_pubsub2 = None, None if cluster_mode: - cluster_mode_pubsub = pub_sub - cluster_mode_pubsub2 = pub_sub2 + cluster_mode_pubsub1 = client1_pubsub + cluster_mode_pubsub2 = client2_pubsub else: - standalone_mode_pubsub = pub_sub - standalone_mode_pubsub2 = pub_sub2 + standalone_mode_pubsub1 = client1_pubsub + standalone_mode_pubsub2 = client2_pubsub - client = await create_client( + client1 = await create_client( request, cluster_mode=cluster_mode, - cluster_mode_pubsub=cluster_mode_pubsub2, - standalone_mode_pubsub=standalone_mode_pubsub2, + cluster_mode_pubsub=cluster_mode_pubsub1, + standalone_mode_pubsub=standalone_mode_pubsub1, protocol=protocol, + timeout=timeout, ) - client2 = await create_client( - request, - cluster_mode=cluster_mode, - cluster_mode_pubsub=cluster_mode_pubsub, - standalone_mode_pubsub=standalone_mode_pubsub, - protocol=protocol, - ) - return client, client2 + try: + client2 = await create_client( + request, + cluster_mode=cluster_mode, + cluster_mode_pubsub=cluster_mode_pubsub2, + standalone_mode_pubsub=standalone_mode_pubsub2, + protocol=protocol, + timeout=timeout, + ) + except Exception as e: + await client1.close() + raise e + + return client1, client2 def decode_pubsub_msg(msg: Optional[CoreCommands.PubSubMsg]) -> CoreCommands.PubSubMsg: @@ -148,6 +156,64 @@ def new_message(msg: CoreCommands.PubSubMsg, context: Any): received_messages.append(msg) +async def client_cleanup( + client: Union[GlideClient, GlideClusterClient], + cluster_mode_subs: Optional[ + GlideClusterClientConfiguration.PubSubSubscriptions + ] = None, +): + """ + This function tries its best to clear state assosiated with client + Its explicitly calls client.close() and deletes the object + In addition, it tries to clean up cluster mode subsciptions since it was found the closing the client via close() is not enough. + Note that unsubscribing is not feasible in the current implementation since its unknown on which node the subs are configured + """ + # if is_cluster: + # pubsub_subs = cast(ClusterClientConfiguration.PubSubSubscriptions, pubsub_subs) + # else : + # pubsub_subs = cast(GlideClientConfiguration.PubSubSubscriptions, pubsub_subs) + + # for channel_type, channel_patterns in pubsub_subs: + # if channel_type == ClusterClientConfiguration.PubSubChannelModes.Exact or channel_type == GlideClientConfiguration.PubSubChannelModes.Exact: + # cmd = "UNSUBSCRIBE" + # elif channel_type == ClusterClientConfiguration.PubSubChannelModes.Pattern or channel_type == GlideClientConfiguration.PubSubChannelModes.Pattern: + # cmd = "PUNSUBSCRIBE" + # else: + # cmd = "SUNSUBSCRIBE" + + # # we need unsubscribe commands because close might + # # UNSUBSCRIBE commands are unsupported, also, the routing might be wrong in cluster mode + # for channel_patern in channel_patterns: + + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # ) + + if cluster_mode_subs: + for ( + channel_type, + channel_patterns, + ) in cluster_mode_subs.channels_and_patterns.items(): + if channel_type == GlideClusterClientConfiguration.PubSubChannelModes.Exact: + cmd = "UNSUBSCRIBE" + elif ( + channel_type + == GlideClusterClientConfiguration.PubSubChannelModes.Pattern + ): + cmd = "PUNSUBSCRIBE" + elif not await check_if_server_version_lt(client, "7.0.0"): + cmd = "SUNSUBSCRIBE" + else: + continue + + for channel_patern in channel_patterns: + await client.custom_command([cmd, channel_patern]) + + await client.close() + del client + await asyncio.sleep(1) + + @pytest.mark.asyncio class TestPubSub: @pytest.mark.parametrize("cluster_mode", [True, False]) @@ -167,29 +233,31 @@ async def test_pubsub_exact_happy_path( Async, Sync, and Callback. It verifies that a message published to a specific channel is correctly received by a subscriber. """ - channel = get_random_string(10) - message = get_random_string(5) - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, + {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 # allow the message to propagate @@ -205,11 +273,18 @@ async def test_pubsub_exact_happy_path( await check_no_messages_left(method, listening_client, callback_messages, 1) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_happy_path_coexistence( @@ -222,23 +297,25 @@ async def test_pubsub_exact_happy_path_coexistence( and received using both async and sync methods to ensure that both methods can coexist and function correctly. """ - channel = get_random_string(10) - message = get_random_string(5) - message2 = get_random_string(7) - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + message2 = get_random_string(7) + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, + {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for msg in [message, message2]: result = await publishing_client.publish(msg, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -267,11 +344,18 @@ async def test_pubsub_exact_happy_path_coexistence( assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -287,44 +371,46 @@ async def test_pubsub_exact_happy_path_many_channels( unique message. It verifies that messages are correctly published and received using different retrieval methods: async, sync, and callback. """ - NUM_CHANNELS = 256 - shard_prefix = "{same-shard}" - - # Create a map of channels to random messages with shard prefix - channels_and_messages = { - f"{shard_prefix}{get_random_string(10)}": get_random_string(5) - for _ in range(NUM_CHANNELS) - } + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + shard_prefix = "{same-shard}" - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + # Create a map of channels to random messages with shard prefix + channels_and_messages = { + f"{shard_prefix}{get_random_string(10)}": get_random_string(5) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - callback=callback, - context=context, - ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + callback=callback, + context=context, + ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: # Publish messages to each channel for channel, message in channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -349,14 +435,21 @@ async def test_pubsub_exact_happy_path_many_channels( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_happy_path_many_channels_co_existence( self, request, cluster_mode: bool @@ -368,37 +461,39 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( It verifies that messages are correctly published and received using both async and sync methods to ensure that both methods can coexist and function correctly. """ - NUM_CHANNELS = 256 - shard_prefix = "{same-shard}" + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + shard_prefix = "{same-shard}" - # Create a map of channels to random messages with shard prefix - channels_and_messages = { - f"{shard_prefix}{get_random_string(10)}": get_random_string(5) - for _ in range(NUM_CHANNELS) - } + # Create a map of channels to random messages with shard prefix + channels_and_messages = { + f"{shard_prefix}{get_random_string(10)}": get_random_string(5) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - channels_and_messages.keys() - ) - }, - ) + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + channels_and_messages.keys() + ) + }, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: # Publish messages to each channel for channel, message in channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -424,14 +519,21 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -446,38 +548,45 @@ async def test_sharded_pubsub( Async, Sync, and Callback. It verifies that a message published to a specific sharded channel is correctly received by a subscriber. """ - channel = get_random_string(10) - message = get_random_string(5) - publish_response = 1 - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + publish_response = 1 + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) - min_version = "7.0.0" - if await check_if_server_version_lt(publishing_client, min_version): - pytest.skip(reason=f"Valkey version required >= {min_version}") + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) + min_version = "7.0.0" + if await check_if_server_version_lt(publishing_client, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) == publish_response ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the message to propagate await asyncio.sleep(1) @@ -488,14 +597,22 @@ async def test_sharded_pubsub( assert pubsub_msg.channel == channel assert pubsub_msg.pattern is None - finally: # assert there are no messages to read await check_no_messages_left(method, listening_client, callback_messages, 1) - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + + finally: + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True]) async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): @@ -510,38 +627,48 @@ async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): both async and sync methods. This ensures that the asynchronous and synchronous message retrieval methods can coexist without interfering with each other and operate as expected. """ - channel = get_random_string(10) - message = get_random_string(5) - message2 = get_random_string(7) - publish_response = 1 if cluster_mode else OK - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = get_random_string(5) + message2 = get_random_string(7) + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - min_version = "7.0.0" - if await check_if_server_version_lt(publishing_client, min_version): - pytest.skip(reason=f"Valkey version required >= {min_version}") + min_version = "7.0.0" + if await check_if_server_version_lt(publishing_client, min_version): + pytest.skip(reason=f"Valkey version required >= {min_version}") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) - == publish_response + == 1 ) assert ( await cast(GlideClusterClient, publishing_client).publish( message2, channel, sharded=True ) - == publish_response + == 1 ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message2, channel, sharded=True + # ) + # allow the messages to propagate await asyncio.sleep(1) @@ -567,11 +694,18 @@ async def test_sharded_pubsub_co_existence(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["SUNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -587,45 +721,47 @@ async def test_sharded_pubsub_many_channels( across multiple sharded channels. It covers three different message retrieval methods: Async, Sync, and Callback. """ - NUM_CHANNELS = 256 - shard_prefix = "{same-shard}" - publish_response = 1 - - # Create a map of channels to random messages with shard prefix - channels_and_messages = { - f"{shard_prefix}{get_random_string(10)}": get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + shard_prefix = "{same-shard}" + publish_response = 1 + + # Create a map of channels to random messages with shard prefix + channels_and_messages = { + f"{shard_prefix}{get_random_string(10)}": get_random_string(5) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( - channels_and_messages.keys() - ) - }, - {}, - callback=callback, - context=context, - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( + channels_and_messages.keys() + ) + }, + {}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - min_version = "7.0.0" - if await check_if_server_version_lt(publishing_client, min_version): - pytest.skip(reason=f"Valkey version required >= {min_version}") + min_version = "7.0.0" + if await check_if_server_version_lt(publishing_client, min_version): + pytest.skip(reason=f"Redis version required >= {min_version}") - try: # Publish messages to each channel for channel, message in channels_and_messages.items(): + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -633,6 +769,10 @@ async def test_sharded_pubsub_many_channels( == publish_response ) + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # Allow the messages to propagate await asyncio.sleep(1) @@ -655,14 +795,21 @@ async def test_sharded_pubsub_many_channels( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["SUNSUBSCRIBE", *list(channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["SUNSUBSCRIBE", *list(channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -676,32 +823,38 @@ async def test_pubsub_pattern( This test verifies the behavior of PUBSUB when subscribing to a pattern and receiving messages using three different methods: Async, Sync, and Callback. """ - PATTERN = "{{{}}}:{}".format("channel", "*") - channels = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + listening_client, publishing_client = None, None + try: + PATTERN = "{{{}}}:{}".format("channel", "*") + channels = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + } - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for channel, message in channels.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -724,11 +877,18 @@ async def test_pubsub_pattern( await check_no_messages_left(method, listening_client, callback_messages, 2) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): @@ -739,25 +899,31 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): and received using both async and sync methods to ensure that both methods can coexist and function correctly. """ - PATTERN = "{{{}}}:{}".format("channel", "*") - channels = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), - } + listening_client, publishing_client = None, None + try: + PATTERN = "{{{}}}:{}".format("channel", "*") + channels = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ), + } - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - ) + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for channel, message in channels.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -784,11 +950,18 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -804,33 +977,37 @@ async def test_pubsub_pattern_many_channels( and received. It verifies that messages are correctly published and received using different retrieval methods: async, sync, and callback. """ - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("channel", "*") - channels = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("channel", "*") + channels = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 5 + ) + for _ in range(NUM_CHANNELS) + } - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub + ) - try: for channel, message in channels.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -855,11 +1032,18 @@ async def test_pubsub_pattern_many_channels( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -877,60 +1061,68 @@ async def test_pubsub_combined_exact_and_pattern_one_client( - Subscribing to channels using a pattern and verifying message reception. - Ensuring that messages are correctly published and received using different retrieval methods (async, sync, callback). """ - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - all_channels_and_messages = { - **exact_channels_and_messages, - **pattern_channels_and_messages, - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() - ), - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() - ), - GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, - }, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") - publishing_client, listening_client = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 + ) + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 + ) + for _ in range(NUM_CHANNELS) + } + + all_channels_and_messages = { + **exact_channels_and_messages, + **pattern_channels_and_messages, + } + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ), + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + PATTERN + }, + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ), + GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, + }, + callback=callback, + context=context, + ) + + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) - try: # Publish messages to all channels for channel, message in all_channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -961,14 +1153,21 @@ async def test_pubsub_combined_exact_and_pattern_one_client( method, listening_client, callback_messages, NUM_CHANNELS * 2 ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client: + await client_cleanup( + listening_client, pub_sub_exact if cluster_mode else None ) - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( @@ -988,76 +1187,91 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - - all_channels_and_messages = { - **exact_channels_and_messages, - **pattern_channels_and_messages, - } - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + ( + listening_client_exact, + publishing_client, + listening_client_pattern, + client_dont_care, + ) = (None, None, None, None) + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") + + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 ) - }, - callback=callback, - context=context, - ) + for _ in range(NUM_CHANNELS) + } - publishing_client, listening_client_exact = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + all_channels_and_messages = { + **exact_channels_and_messages, + **pattern_channels_and_messages, + } - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages_pattern + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages - _, listening_client_pattern = await create_two_clients( - request, cluster_mode, pub_sub_pattern - ) + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + callback=callback, + context=context, + ) + + listening_client_exact, publishing_client = ( + await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) + ) + + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages_pattern + + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + + listening_client_pattern, client_dont_care = ( + await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_pattern + ) + ) - try: # Publish messages to all channels for channel, message in all_channels_and_messages.items(): result = await publishing_client.publish(message, channel) + # TODO: enable when client closing works if cluster_mode: assert result == 1 @@ -1107,14 +1321,29 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client_exact.custom_command( - ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client_exact.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) + if listening_client_exact: + await client_cleanup( + listening_client_exact, pub_sub_exact if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) + + if listening_client_pattern: + await client_cleanup( + listening_client_pattern, pub_sub_pattern if cluster_mode else None ) - await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) + + if client_dont_care: + await client_cleanup(client_dont_care, None) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -1133,73 +1362,83 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client( - Subscribing to channels using a with sharded subscription and verifying message reception. - Ensuring that messages are correctly published and received using different retrieval methods (async, sync, callback). """ - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - SHARD_PREFIX = "{same-shard}" - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - sharded_channels_and_messages = { - f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) - for _ in range(NUM_CHANNELS) - } - - publish_response = 1 - - callback, context = None, None - callback_messages: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() - ), - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}, - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( - sharded_channels_and_messages.keys() - ), - }, - {}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") + SHARD_PREFIX = "{same-shard}" + + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 + ) + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 + ) + for _ in range(NUM_CHANNELS) + } + sharded_channels_and_messages = { + f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) + for _ in range(NUM_CHANNELS) + } - publishing_client, listening_client = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + publish_response = 1 + + callback, context = None, None + callback_messages: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages + + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ), + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + PATTERN + }, + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( + sharded_channels_and_messages.keys() + ), + }, + {}, + callback=callback, + context=context, + ) - # Setup PUBSUB for sharded channels (Redis version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Redis version required >= 7.0.0") + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) + + # Setup PUBSUB for sharded channels (Redis version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Redis version required >= 7.0.0") - try: # Publish messages to all channels for channel, message in { **exact_channels_and_messages, **pattern_channels_and_messages, }.items(): + # TODO: enable when client closing works assert ( await publishing_client.publish(message, channel) == publish_response ) + # await publishing_client.publish(message, channel) # Publish sharded messages to all channels for channel, message in sharded_channels_and_messages.items(): + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -1207,6 +1446,10 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client( == publish_response ) + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the messages to propagate await asyncio.sleep(1) @@ -1240,18 +1483,25 @@ async def test_pubsub_combined_exact_pattern_and_sharded_one_client( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command( - ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] - ) - await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) - await listening_client.custom_command( - ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client.custom_command(["PUNSUBSCRIBE", PATTERN]) + # await listening_client.custom_command( + # ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + # ) + if listening_client: + await client_cleanup( + listening_client, pub_sub_exact if cluster_mode else None ) + if publishing_client: + await client_cleanup(publishing_client, None) + @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -1271,106 +1521,123 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - NUM_CHANNELS = 256 - PATTERN = "{{{}}}:{}".format("pattern", "*") - SHARD_PREFIX = "{same-shard}" - - # Create dictionaries of channels and their corresponding messages - exact_channels_and_messages = { - "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(10) - for _ in range(NUM_CHANNELS) - } - pattern_channels_and_messages = { - "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string(5) - for _ in range(NUM_CHANNELS) - } - sharded_channels_and_messages = { - f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) - for _ in range(NUM_CHANNELS) - } - - publish_response = 1 - - callback, context = None, None - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages_exact - - # Setup PUBSUB for exact channels - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + ( + listening_client_exact, + publishing_client, + listening_client_pattern, + listening_client_sharded, + ) = (None, None, None, None) + try: + NUM_CHANNELS = 256 + PATTERN = "{{{}}}:{}".format("pattern", "*") + SHARD_PREFIX = "{same-shard}" + + # Create dictionaries of channels and their corresponding messages + exact_channels_and_messages = { + "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string( + 10 ) - }, - { - GlideClientConfiguration.PubSubChannelModes.Exact: set( - exact_channels_and_messages.keys() + for _ in range(NUM_CHANNELS) + } + pattern_channels_and_messages = { + "{{{}}}:{}".format("pattern", get_random_string(5)): get_random_string( + 5 ) - }, - callback=callback, - context=context, - ) - - publishing_client, listening_client_exact = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) + for _ in range(NUM_CHANNELS) + } + sharded_channels_and_messages = { + f"{SHARD_PREFIX}:{get_random_string(10)}": get_random_string(7) + for _ in range(NUM_CHANNELS) + } - # Setup PUBSUB for sharded channels (Valkey version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") + publish_response = 1 - if method == MethodTesting.Callback: - context = callback_messages_pattern + callback, context = None, None + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, - callback=callback, - context=context, - ) + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages_exact - if method == MethodTesting.Callback: - context = callback_messages_sharded + # Setup PUBSUB for exact channels + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + { + GlideClientConfiguration.PubSubChannelModes.Exact: set( + exact_channels_and_messages.keys() + ) + }, + callback=callback, + context=context, + ) - pub_sub_sharded = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( - sharded_channels_and_messages.keys() + listening_client_exact, publishing_client = ( + await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, ) - }, - {}, - callback=callback, - context=context, - ) + ) - listening_client_sharded, listening_client_pattern = await create_two_clients( - request, cluster_mode, pub_sub_pattern, pub_sub_sharded - ) + # Setup PUBSUB for sharded channels (Valkey version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") + + if method == MethodTesting.Callback: + context = callback_messages_pattern + + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {PATTERN}}, + callback=callback, + context=context, + ) + + if method == MethodTesting.Callback: + context = callback_messages_sharded + + pub_sub_sharded = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: set( + sharded_channels_and_messages.keys() + ) + }, + {}, + callback=callback, + context=context, + ) + + listening_client_pattern, listening_client_sharded = ( + await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_pattern, pub_sub_sharded + ) + ) - try: # Publish messages to all channels for channel, message in { **exact_channels_and_messages, **pattern_channels_and_messages, }.items(): + # TODO: enable when client closing works assert ( await publishing_client.publish(message, channel) == publish_response ) + # await publishing_client.publish(message, channel) # Publish sharded messages to all channels for channel, message in sharded_channels_and_messages.items(): + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True @@ -1378,6 +1645,10 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client( == publish_response ) + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the messages to propagate await asyncio.sleep(1) @@ -1446,18 +1717,33 @@ async def test_pubsub_combined_exact_pattern_and_sharded_multi_client( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client_exact.custom_command( - ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client_exact.custom_command( + # ["UNSUBSCRIBE", *list(exact_channels_and_messages.keys())] + # ) + # await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) + # await listening_client_sharded.custom_command( + # ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + # ) + if listening_client_exact: + await client_cleanup( + listening_client_exact, pub_sub_exact if cluster_mode else None ) - await listening_client_pattern.custom_command(["PUNSUBSCRIBE", PATTERN]) - await listening_client_sharded.custom_command( - ["SUNSUBSCRIBE", *list(sharded_channels_and_messages.keys())] + + if publishing_client: + await client_cleanup(publishing_client, None) + + if listening_client_pattern: + await client_cleanup( + listening_client_pattern, pub_sub_pattern if cluster_mode else None ) + if listening_client_sharded: + await client_cleanup(listening_client_sharded, None) + @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -1478,77 +1764,92 @@ async def test_pubsub_combined_different_channels_with_same_name( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - CHANNEL_NAME = "same-channel-name" - MESSAGE_EXACT = get_random_string(10) - MESSAGE_PATTERN = get_random_string(7) - MESSAGE_SHARDED = get_random_string(5) - - callback, context = None, None - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context = callback_messages_exact - - # Setup PUBSUB for exact channel - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - callback=callback, - context=context, - ) - - publishing_client, listening_client_exact = await create_two_clients( - request, - cluster_mode, - pub_sub_exact, - ) - - # (Valkey version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") - - # Setup PUBSUB for pattern channel - if method == MethodTesting.Callback: - context = callback_messages_pattern + ( + listening_client_exact, + publishing_client, + listening_client_pattern, + listening_client_sharded, + ) = (None, None, None, None) + try: + CHANNEL_NAME = "same-channel-name" + MESSAGE_EXACT = get_random_string(10) + MESSAGE_PATTERN = get_random_string(7) + MESSAGE_SHARDED = get_random_string(5) + + callback, context = None, None + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context = callback_messages_exact + + # Setup PUBSUB for exact channel + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, + callback=callback, + context=context, + ) - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { - CHANNEL_NAME - } - }, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, - callback=callback, - context=context, - ) + listening_client_exact, publishing_client = ( + await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub_exact, + ) + ) - if method == MethodTesting.Callback: - context = callback_messages_sharded + # (Valkey version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") + + # Setup PUBSUB for pattern channel + if method == MethodTesting.Callback: + context = callback_messages_pattern + + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, + callback=callback, + context=context, + ) - pub_sub_sharded = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { - CHANNEL_NAME - } - }, - {}, - callback=callback, - context=context, - ) + if method == MethodTesting.Callback: + context = callback_messages_sharded + + pub_sub_sharded = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + CHANNEL_NAME + } + }, + {}, + callback=callback, + context=context, + ) - listening_client_sharded, listening_client_pattern = await create_two_clients( - request, cluster_mode, pub_sub_pattern, pub_sub_sharded - ) + listening_client_pattern, listening_client_sharded = ( + await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_pattern, pub_sub_sharded + ) + ) - try: # Publish messages to each channel + # TODO: enable when client closing works assert await publishing_client.publish(MESSAGE_EXACT, CHANNEL_NAME) == 2 assert await publishing_client.publish(MESSAGE_PATTERN, CHANNEL_NAME) == 2 assert ( @@ -1558,6 +1859,12 @@ async def test_pubsub_combined_different_channels_with_same_name( == 1 ) + # await publishing_client.publish(MESSAGE_EXACT, CHANNEL_NAME) + # await publishing_client.publish(MESSAGE_PATTERN, CHANNEL_NAME) + # await cast(GlideClusterClient, publishing_client).publish( + # MESSAGE_SHARDED, CHANNEL_NAME, sharded=True + # ) + # allow the message to propagate await asyncio.sleep(1) @@ -1594,20 +1901,35 @@ async def test_pubsub_combined_different_channels_with_same_name( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client_exact.custom_command( - ["UNSUBSCRIBE", CHANNEL_NAME] - ) - await listening_client_pattern.custom_command( - ["PUNSUBSCRIBE", CHANNEL_NAME] + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client_exact.custom_command( + # ["UNSUBSCRIBE", CHANNEL_NAME] + # ) + # await listening_client_pattern.custom_command( + # ["PUNSUBSCRIBE", CHANNEL_NAME] + # ) + # await listening_client_sharded.custom_command( + # ["SUNSUBSCRIBE", CHANNEL_NAME] + # ) + if listening_client_exact: + await client_cleanup( + listening_client_exact, pub_sub_exact if cluster_mode else None ) - await listening_client_sharded.custom_command( - ["SUNSUBSCRIBE", CHANNEL_NAME] + + if publishing_client: + await client_cleanup(publishing_client, None) + + if listening_client_pattern: + await client_cleanup( + listening_client_pattern, pub_sub_pattern if cluster_mode else None ) + if listening_client_sharded: + await client_cleanup(listening_client_sharded, None) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize( "method", [MethodTesting.Async, MethodTesting.Sync, MethodTesting.Callback] @@ -1628,47 +1950,53 @@ async def test_pubsub_two_publishing_clients_same_name( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - CHANNEL_NAME = "channel-name" - MESSAGE_EXACT = get_random_string(10) - MESSAGE_PATTERN = get_random_string(7) - callback, context_exact, context_pattern = None, None, None - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context_exact = callback_messages_exact - context_pattern = callback_messages_pattern - - # Setup PUBSUB for exact channel - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - callback=callback, - context=context_exact, - ) - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { - CHANNEL_NAME - } - }, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, - callback=callback, - context=context_pattern, - ) + client_exact, client_pattern = None, None + try: + CHANNEL_NAME = "channel-name" + MESSAGE_EXACT = get_random_string(10) + MESSAGE_PATTERN = get_random_string(7) + callback, context_exact, context_pattern = None, None, None + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context_exact = callback_messages_exact + context_pattern = callback_messages_pattern + + # Setup PUBSUB for exact channel + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, + callback=callback, + context=context_exact, + ) + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, + callback=callback, + context=context_pattern, + ) - client_pattern, client_exact = await create_two_clients( - request, cluster_mode, pub_sub_exact, pub_sub_pattern - ) + client_exact, client_pattern = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_exact, pub_sub_pattern + ) - try: # Publish messages to each channel - both clients publishing for msg in [MESSAGE_EXACT, MESSAGE_PATTERN]: result = await client_pattern.publish(msg, CHANNEL_NAME) + # TODO: enable when client closing works if cluster_mode: assert result == 2 @@ -1697,12 +2025,21 @@ async def test_pubsub_two_publishing_clients_same_name( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) - await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) + # await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) + if client_exact: + await client_cleanup( + client_exact, pub_sub_exact if cluster_mode else None + ) + + if client_pattern: + await client_cleanup( + client_pattern, pub_sub_pattern if cluster_mode else None + ) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize( @@ -1724,72 +2061,83 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( - Verifying that no messages are left unread. - Properly unsubscribing from all channels to avoid interference with other tests. """ - CHANNEL_NAME = "same-channel-name" - MESSAGE_EXACT = get_random_string(10) - MESSAGE_PATTERN = get_random_string(7) - MESSAGE_SHARDED = get_random_string(5) - publish_response = 2 if cluster_mode else OK - callback, context_exact, context_pattern, context_sharded = ( + client_exact, client_pattern, client_sharded, client_dont_care = ( None, None, None, None, ) - callback_messages_exact: List[CoreCommands.PubSubMsg] = [] - callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] - callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] - - if method == MethodTesting.Callback: - callback = new_message - context_exact = callback_messages_exact - context_pattern = callback_messages_pattern - context_sharded = callback_messages_sharded - - # Setup PUBSUB for exact channel - pub_sub_exact = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, - callback=callback, - context=context_exact, - ) - # Setup PUBSUB for pattern channels - pub_sub_pattern = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { - CHANNEL_NAME - } - }, - {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, - callback=callback, - context=context_pattern, - ) - # Setup PUBSUB for pattern channels - pub_sub_sharded = create_pubsub_subscription( - cluster_mode, - { - GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { - CHANNEL_NAME - } - }, - {}, - callback=callback, - context=context_sharded, - ) + try: + CHANNEL_NAME = "same-channel-name" + MESSAGE_EXACT = get_random_string(10) + MESSAGE_PATTERN = get_random_string(7) + MESSAGE_SHARDED = get_random_string(5) + publish_response = 2 if cluster_mode else OK + callback, context_exact, context_pattern, context_sharded = ( + None, + None, + None, + None, + ) + callback_messages_exact: List[CoreCommands.PubSubMsg] = [] + callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] + callback_messages_sharded: List[CoreCommands.PubSubMsg] = [] + + if method == MethodTesting.Callback: + callback = new_message + context_exact = callback_messages_exact + context_pattern = callback_messages_pattern + context_sharded = callback_messages_sharded + + # Setup PUBSUB for exact channel + pub_sub_exact = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Exact: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Exact: {CHANNEL_NAME}}, + callback=callback, + context=context_exact, + ) + # Setup PUBSUB for pattern channels + pub_sub_pattern = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Pattern: { + CHANNEL_NAME + } + }, + {GlideClientConfiguration.PubSubChannelModes.Pattern: {CHANNEL_NAME}}, + callback=callback, + context=context_pattern, + ) + # Setup PUBSUB for pattern channels + pub_sub_sharded = create_pubsub_subscription( + cluster_mode, + { + GlideClusterClientConfiguration.PubSubChannelModes.Sharded: { + CHANNEL_NAME + } + }, + {}, + callback=callback, + context=context_sharded, + ) - client_pattern, client_exact = await create_two_clients( - request, cluster_mode, pub_sub_exact, pub_sub_pattern - ) - _, client_sharded = await create_two_clients( - request, cluster_mode, pub_sub_sharded - ) - # (Valkey version > 7) - if await check_if_server_version_lt(client_pattern, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") + client_exact, client_pattern = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_exact, pub_sub_pattern + ) + client_sharded, client_dont_care = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub_sharded + ) + # (Valkey version > 7) + if await check_if_server_version_lt(client_pattern, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") - try: # Publish messages to each channel - both clients publishing + # TODO: enable when client closing works assert ( await client_pattern.publish(MESSAGE_EXACT, CHANNEL_NAME) == publish_response @@ -1805,6 +2153,12 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( == 1 ) + # await client_pattern.publish(MESSAGE_EXACT, CHANNEL_NAME) + # await client_sharded.publish(MESSAGE_PATTERN, CHANNEL_NAME) + # await cast(GlideClusterClient, client_exact).publish( + # MESSAGE_SHARDED, CHANNEL_NAME, sharded=True + # ) + # allow the message to propagate await asyncio.sleep(1) @@ -1840,16 +2194,33 @@ async def test_pubsub_three_publishing_clients_same_name_with_sharded( ) finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) - await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) - await client_sharded.custom_command(["SUNSUBSCRIBE", CHANNEL_NAME]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await client_exact.custom_command(["UNSUBSCRIBE", CHANNEL_NAME]) + # await client_pattern.custom_command(["PUNSUBSCRIBE", CHANNEL_NAME]) + # await client_sharded.custom_command(["SUNSUBSCRIBE", CHANNEL_NAME]) + if client_exact: + await client_cleanup( + client_exact, pub_sub_exact if cluster_mode else None + ) + + if client_pattern: + await client_cleanup( + client_pattern, pub_sub_pattern if cluster_mode else None + ) + + if client_sharded: + await client_cleanup( + client_sharded, pub_sub_sharded if cluster_mode else None + ) + + if client_dont_care: + await client_cleanup(client_dont_care, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): @@ -1868,9 +2239,8 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): - Ensuring that no additional messages are left after the expected messages are received. """ channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - message2 = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK + message = "1" * 512 * 1024 * 1024 + message2 = "2" * 512 * 1024 * 1024 pub_sub = create_pubsub_subscription( cluster_mode, @@ -1878,26 +2248,35 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub, + timeout=10000, ) try: - assert await publishing_client.publish(message, channel) == publish_response - assert ( - await publishing_client.publish(message2, channel) == publish_response - ) + # TODO: enable when client closing works + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 + + result = await publishing_client.publish(message2, channel) + if cluster_mode: + assert result == 1 # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) - async_msg = await listening_client.get_pubsub_message() - sync_msg = listening_client.try_get_pubsub_message() - assert sync_msg + # await publishing_client.publish(message, channel) + # await publishing_client.publish(message2, channel) + async_msg = await listening_client.get_pubsub_message() assert async_msg.message == message.encode() assert async_msg.channel == channel.encode() assert async_msg.pattern is None + sync_msg = listening_client.try_get_pubsub_message() + assert sync_msg assert sync_msg.message == message2.encode() assert sync_msg.channel == channel.encode() assert sync_msg.pattern is None @@ -1909,14 +2288,21 @@ async def test_pubsub_exact_max_size_message(self, request, cluster_mode: bool): assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True]) async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool): @@ -1934,37 +2320,51 @@ async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool - Verifying that the messages are received correctly using both async and sync methods. - Ensuring that no additional messages are left after the expected messages are received. """ - channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - message2 = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - ) + publishing_client, listening_client = None, None + try: + channel = get_random_string(10) + message = "1" * 512 * 1024 * 1024 + message2 = "2" * 512 * 1024 * 1024 + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, + cluster_mode, + pub_sub, + timeout=10000, + ) - # (Redis version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Redis version required >= 7.0.0") + # (Redis version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Redis version required >= 7.0.0") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) - == publish_response + == 1 ) + assert ( - await publishing_client.publish(message2, channel) == publish_response + await cast(GlideClusterClient, publishing_client).publish( + message2, channel, sharded=True + ) + == 1 ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # await publishing_client.publish(message2, channel) + # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) async_msg = await listening_client.get_pubsub_message() sync_msg = listening_client.try_get_pubsub_message() @@ -1985,14 +2385,21 @@ async def test_pubsub_sharded_max_size_message(self, request, cluster_mode: bool assert listening_client.try_get_pubsub_message() is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_exact_max_size_message_callback( @@ -2011,45 +2418,56 @@ async def test_pubsub_exact_max_size_message_callback( - Publishing a maximum size message to the channel. - Verifying that the message is received correctly using the callback method. """ - channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK - - callback_messages: List[CoreCommands.PubSubMsg] = [] - callback, context = new_message, callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, - {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, - callback=callback, - context=context, - ) + listening_client, publishing_client = None, None + try: + channel = get_random_string(10) + message = "0" * 12 * 1024 * 1024 + + callback_messages: List[CoreCommands.PubSubMsg] = [] + callback, context = new_message, callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Exact: {channel}}, + {GlideClientConfiguration.PubSubChannelModes.Exact: {channel}}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub, timeout=10000 + ) - try: - assert await publishing_client.publish(message, channel) == publish_response + # TODO: enable when client closing works + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 + # await publishing_client.publish(message, channel) # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) assert len(callback_messages) == 1 - assert callback_messages[0].message == message - assert callback_messages[0].channel == channel + assert callback_messages[0].message == message.encode() + assert callback_messages[0].channel == channel.encode() assert callback_messages[0].pattern is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.skip( - reason="no way of currently testing this, see https://github.com/aws/glide-for-redis/issues/1649" + reason="This test requires special configuration for client-output-buffer-limit for valkey-server and timeouts seems to vary across platforms and server versions" ) @pytest.mark.parametrize("cluster_mode", [True]) async def test_pubsub_sharded_max_size_message_callback( @@ -2068,51 +2486,64 @@ async def test_pubsub_sharded_max_size_message_callback( - Publishing a maximum size message to the channel. - Verifying that the message is received correctly using the callback method. """ - channel = get_random_string(10) - message = get_random_string(512 * 1024 * 1024) - publish_response = 1 if cluster_mode else OK - - callback_messages: List[CoreCommands.PubSubMsg] = [] - callback, context = new_message, callback_messages - - pub_sub = create_pubsub_subscription( - cluster_mode, - {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, - {}, - callback=callback, - context=context, - ) + publishing_client, listening_client = None, None + try: + channel = get_random_string(10) + message = "0" * 512 * 1024 * 1024 + + callback_messages: List[CoreCommands.PubSubMsg] = [] + callback, context = new_message, callback_messages + + pub_sub = create_pubsub_subscription( + cluster_mode, + {GlideClusterClientConfiguration.PubSubChannelModes.Sharded: {channel}}, + {}, + callback=callback, + context=context, + ) - publishing_client, listening_client = await create_two_clients( - request, cluster_mode, pub_sub - ) + listening_client, publishing_client = await create_two_clients_with_pubsub( + request, cluster_mode, pub_sub, timeout=10000 + ) - # (Valkey version > 7) - if await check_if_server_version_lt(publishing_client, "7.0.0"): - pytest.skip("Valkey version required >= 7.0.0") + # (Valkey version > 7) + if await check_if_server_version_lt(publishing_client, "7.0.0"): + pytest.skip("Valkey version required >= 7.0.0") - try: + # TODO: enable when client closing works assert ( await cast(GlideClusterClient, publishing_client).publish( message, channel, sharded=True ) - == publish_response + == 1 ) + + # await cast(GlideClusterClient, publishing_client).publish( + # message, channel, sharded=True + # ) + # allow the message to propagate - await asyncio.sleep(5) + await asyncio.sleep(15) assert len(callback_messages) == 1 - assert callback_messages[0].message == message - assert callback_messages[0].channel == channel + assert callback_messages[0].message == message.encode() + assert callback_messages[0].channel == channel.encode() assert callback_messages[0].pattern is None finally: - if cluster_mode: - # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running - # In cluster mode, we check how many subscriptions received the message - # So to avoid flakiness, we make sure to unsubscribe from the channels - await listening_client.custom_command(["UNSUBSCRIBE", channel]) + # if cluster_mode: + # # Since all tests run on the same cluster, when closing the client, garbage collector can be called after another test will start running + # # In cluster mode, we check how many subscriptions received the message + # # So to avoid flakiness, we make sure to unsubscribe from the channels + # await listening_client.custom_command(["UNSUBSCRIBE", channel]) + if listening_client: + await client_cleanup( + listening_client, pub_sub if cluster_mode else None + ) + + if publishing_client: + await client_cleanup(publishing_client, None) @pytest.mark.parametrize("cluster_mode", [True, False]) async def test_pubsub_resp2_raise_an_error(self, request, cluster_mode: bool): @@ -2126,7 +2557,7 @@ async def test_pubsub_resp2_raise_an_error(self, request, cluster_mode: bool): ) with pytest.raises(ConfigurationError): - await create_two_clients( + await create_two_clients_with_pubsub( request, cluster_mode, pub_sub_exact, protocol=ProtocolVersion.RESP2 ) @@ -2145,4 +2576,4 @@ async def test_pubsub_context_with_no_callback_raise_error( ) with pytest.raises(ConfigurationError): - await create_two_clients(request, cluster_mode, pub_sub_exact) + await create_two_clients_with_pubsub(request, cluster_mode, pub_sub_exact)