Skip to content

Commit

Permalink
tests: updated test validating handling of virtualized connections
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed May 14, 2024
1 parent 99ccf3e commit b28ebec
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions tests/rptest/tests/connection_virtualizing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
# by the Apache License, Version 2.0

from dataclasses import dataclass
import random
from types import MethodType
from rptest.services.cluster import cluster
from rptest.clients.types import TopicSpec
from kafka.protocol.fetch import FetchRequest
from ducktape.mark import matrix

from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import wait_until
Expand All @@ -22,6 +24,8 @@
from kafka import KafkaClient, KafkaConsumer
from kafka.protocol.produce import ProduceRequest

from rptest.utils.xid_utils import random_xid_string


@dataclass
class PartitionInfo:
Expand Down Expand Up @@ -157,6 +161,10 @@ def no_validation_process_response(self, read_buffer):
return (recv_correlation_id, response)


def create_client_id(vcluster_id: str, connection_id: int, client_id: str):
return f"{vcluster_id}{connection_id:08x}{client_id}"


class TestVirtualConnections(RedpandaTest):
def __init__(self, test_context):
super(TestVirtualConnections, self).__init__(
Expand Down Expand Up @@ -187,22 +195,31 @@ def _fetch_and_produce(self, client: MpxMockClient, topic: str,
return (fetch_fut, produce_fut)

@cluster(num_nodes=3)
def test_no_head_of_line_blocking(self):
@matrix(different_clusters=[True, False],
different_connections=[True, False])
def test_no_head_of_line_blocking(self, different_clusters,
different_connections):

# create topic with single partition
spec = TopicSpec(partition_count=1, replication_factor=3)
self.client().create_topic(spec)

mpx_client = MpxMockClient(self.redpanda)
mpx_client.start()
v_cluster_1 = random_xid_string()
v_cluster_2 = random_xid_string()

fetch_client = create_client_id(v_cluster_1, 0, "client-fetch")
produce_client = create_client_id(
v_cluster_1 if not different_clusters else v_cluster_2,
0 if not different_connections else 1, "client-produce")
# validate that fetch request is blocking produce request first as mpx extensions are disabled
(fetch_fut, produce_fut) = self._fetch_and_produce(
client=mpx_client,
topic=spec.name,
partition=0,
fetch_client_id="v-cluster-1",
produce_client_id="v-cluster-2")
fetch_client_id=fetch_client,
produce_client_id=produce_client)

mpx_client.poll(produce_fut)
assert produce_fut.is_done and produce_fut.succeeded
Expand Down Expand Up @@ -231,8 +248,8 @@ def test_no_head_of_line_blocking(self):
client=mpx_client,
topic=spec.name,
partition=0,
fetch_client_id="v-cluster-10",
produce_client_id="v-cluster-20")
fetch_client_id=fetch_client,
produce_client_id=produce_client)

for connection in mpx_client.client._conns.values():
if len(connection._protocol.in_flight_requests) == 2:
Expand All @@ -241,9 +258,11 @@ def test_no_head_of_line_blocking(self):
no_validation_process_response, connection._protocol)

# wait for fetch as it will be released after produce finishes
should_interleave_requests = different_clusters or different_connections

def _produce_is_ready():
mpx_client.poll(fetch_fut)
mpx_client.poll(
fetch_fut if should_interleave_requests else produce_fut)
return produce_fut.is_done

wait_until(
Expand All @@ -260,7 +279,10 @@ def _produce_is_ready():

f_resp = fetch_fut.value

#assert produce_fut.is_done and produce_fut.succeeded, "produce future should be ready when fetch resolved"
assert f_resp.topics[0][1][0][
6] != b'', "Fetch should be unblocked by produce from another virtual connection"
if should_interleave_requests:
assert f_resp.topics[0][1][0][
6] != b'', "Fetch should be unblocked by produce from another virtual connection"
else:
assert f_resp.topics[0][1][0][
6] == b'', "Fetch should be executed before the produce finishes"
mpx_client.close()

0 comments on commit b28ebec

Please sign in to comment.