From 5aae63903b6396b00b734b4ed47892f0f1effa5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 14 May 2024 06:43:14 +0000 Subject: [PATCH 1/6] u/xid: use string_view in xid constructor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- src/v/utils/xid.cc | 4 ++-- src/v/utils/xid.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/v/utils/xid.cc b/src/v/utils/xid.cc index b4e62006b03b8..6015f2af2626b 100644 --- a/src/v/utils/xid.cc +++ b/src/v/utils/xid.cc @@ -39,10 +39,10 @@ constexpr decoder_t build_decoder_table() { static constexpr decoder_t decoder = build_decoder_table(); } // namespace -invalid_xid::invalid_xid(const ss::sstring& current_string) +invalid_xid::invalid_xid(std::string_view current_string) : _msg(ssx::sformat("String '{}' is not a valid xid", current_string)) {} -xid xid::from_string(const ss::sstring& str) { +xid xid::from_string(std::string_view str) { if (str.size() != str_size) { throw invalid_xid(str); } diff --git a/src/v/utils/xid.h b/src/v/utils/xid.h index 61600e7d057ca..42466f3782233 100644 --- a/src/v/utils/xid.h +++ b/src/v/utils/xid.h @@ -28,7 +28,7 @@ */ class invalid_xid final : public std::exception { public: - explicit invalid_xid(const ss::sstring&); + explicit invalid_xid(std::string_view); const char* what() const noexcept final { return _msg.c_str(); } private: @@ -72,7 +72,7 @@ class xid { * * @return an xid decoded from the string provided */ - static xid from_string(const ss::sstring&); + static xid from_string(std::string_view); friend bool operator==(const xid&, const xid&) = default; From 934c09c0cceb52125f652a0547cce157a985b927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 14 May 2024 08:44:39 +0000 Subject: [PATCH 2/6] k/request_context: introduced method to override client id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When using mpx protocol extension actual client id is only a part of the whole client id buffer sent by MPX to Redpanda. Added a method allowing overriding client id. Signed-off-by: Michał Maślanka --- src/v/kafka/server/request_context.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 7176654d397c3..f9582a0f2e3a2 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -106,6 +106,12 @@ class request_context { const request_header& header() const { return _header; } + // override the client id. This method is used when handling virtual + // connections and an actual client id is part of the client id buffer. + void override_client_id(std::optional new_client_id) { + _header.client_id = new_client_id; + } + ss::lw_shared_ptr connection() { return _conn; } ssx::sharded_abort_source& abort_source() { return _conn->abort_source(); } From b4cf84379017963f0fd46422a20ae4d9aad4cfa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 14 May 2024 08:54:51 +0000 Subject: [PATCH 3/6] k/connection_context: made client id parsing vcluster aware MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously Redpanda virtualized Kafka connections based on the full client_id string and the string was used as a client id in all downstream processing. Change the parsing logic to add context to the parsed client id. The client id format expected by Redpanda has the following structure: ``` [vcluster_id][connection_id][actual client id] ``` where: `vcluster_id` - string encoded XID representing virtual cluster (20 characters) `connection_id` - hex encoded 32 bit integer representing virtual connection id (8 characters) `client_id` - standard protocol defined client id If Redpanda fails to parse the client id while working with virtualized connections the whole connection is closed. Signed-off-by: Michał Maślanka --- src/v/kafka/server/connection_context.cc | 113 ++++++++++++++++++++--- src/v/kafka/server/connection_context.h | 25 ++++- 2 files changed, 124 insertions(+), 14 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 10e6027f9c320..f8ddab2a72549 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -64,24 +64,94 @@ class invalid_virtual_connection_id : public std::exception { explicit invalid_virtual_connection_id(ss::sstring msg) : _msg(std::move(msg)) {} + const char* what() const noexcept final { return _msg.c_str(); } + private: ss::sstring _msg; }; -bytes parse_virtual_connection_id(const kafka::request_header& header) { +// Tuple containing virtual connection id and client id. It is returned after +// parsing parts of virtual connection id. +struct virtual_connection_client_id { + virtual_connection_id v_connection_id; + std::optional client_id; +}; + +const std::regex hex_characters_regexp{R"REGEX(^[a-f0-9A-F]{8}$)REGEX"}; + +vcluster_connection_id +parse_vcluster_connection_id(const std::string& hex_str) { + std::smatch matches; + auto match = std::regex_match( + hex_str.cbegin(), hex_str.cend(), matches, hex_characters_regexp); + if (!match) { + throw invalid_virtual_connection_id(fmt::format( + "virtual cluster connection id '{}' is not a hexadecimal integer", + hex_str)); + } + + vcluster_connection_id cid; + + std::stringstream sstream(hex_str); + sstream >> std::hex >> cid; + return cid; +} + +/** + * Virtual connection id is encoded as with the following structure: + * + * [vcluster_id][connection_id][actual client id] + * + * vcluster_id - is a string encoded XID representing virtual cluster (20 + * characters) + * connection_id - is a hex encoded 32 bit integer representing virtual + * connection id (8 characters) + * + * client_id - standard protocol defined client id + */ +virtual_connection_client_id +parse_virtual_connection_id(const kafka::request_header& header) { + static constexpr size_t connection_id_str_size + = sizeof(vcluster_connection_id::type) * 2; + static constexpr size_t v_connection_id_size = xid::str_size + + connection_id_str_size; if (header.client_id_buffer.empty()) { throw invalid_virtual_connection_id( "virtual connection client id can not be empty"); } - // TODO: should we use vcluster_id here ? - return bytes{ - reinterpret_cast(header.client_id_buffer.begin()), - header.client_id_buffer.size()}; + if (header.client_id->size() < v_connection_id_size) { + throw invalid_virtual_connection_id(fmt::format( + "virtual connection client id size must contain at least {} " + "characters. Current size: {}", + v_connection_id_size, + header.client_id_buffer.size())); + } + try { + virtual_connection_id connection_id{ + .virtual_cluster_id = xid::from_string( + std::string_view(header.client_id->begin(), xid::str_size)), + .connection_id = parse_vcluster_connection_id(std::string( + std::next(header.client_id_buffer.begin(), xid::str_size), + connection_id_str_size))}; + + return virtual_connection_client_id{ + .v_connection_id = connection_id, + // a reminder of client id buffer is used as a standard protocol + // client_id. + .client_id + = header.client_id_buffer.size() == v_connection_id_size + ? std::nullopt + : std::make_optional( + std::next( + header.client_id_buffer.begin(), v_connection_id_size), + header.client_id_buffer.size() - v_connection_id_size), + }; + } catch (const invalid_xid& e) { + throw invalid_virtual_connection_id(e.what()); + } } - } // namespace - connection_context::connection_context( std::optional< std::reference_wrapper>> hook, @@ -637,17 +707,25 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { * Not virtualized connection, simply forward to protocol state for request * processing. */ - if (!_is_virtualized_connection) { + if ( + !_is_virtualized_connection + || rctx.header().client_id == multi_proxy_initial_client_id) { co_return co_await _protocol_state.process_request( shared_from_this(), std::move(rctx), sres); } - - auto v_connection_id = parse_virtual_connection_id(rctx.header()); + auto client_connection_id = parse_virtual_connection_id(rctx.header()); + rctx.override_client_id(client_connection_id.client_id); + vlog( + klog.trace, + "request from virtual connection {}, client id: {}", + client_connection_id.v_connection_id, + client_connection_id.client_id); auto it = _virtual_states.lazy_emplace( - v_connection_id, [v_connection_id](const auto& ctr) mutable { + client_connection_id.v_connection_id, + [v_connection_id = client_connection_id.v_connection_id]( + const auto& ctr) mutable { return ctr( - std::move(v_connection_id), - ss::make_lw_shared()); + v_connection_id, ss::make_lw_shared()); }); co_await it->second->process_request( @@ -865,4 +943,13 @@ ss::future<> connection_context::client_protocol_state::maybe_process_responses( }); } +std::ostream& operator<<(std::ostream& o, const virtual_connection_id& id) { + fmt::print( + o, + "{{virtual_cluster_id: {}, connection_id: {}}}", + id.virtual_cluster_id, + id.connection_id); + return o; +} + } // namespace kafka diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 37dbf6a1919cd..63977312ee1f6 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -120,7 +120,28 @@ struct session_resources { std::unique_ptr tracker; request_data request_data; }; +using vcluster_connection_id + = named_type; +/** + * Struct representing virtual connection identifier. Each virtual cluster may + * have multiple connections identified with connection_id. + */ +struct virtual_connection_id { + xid virtual_cluster_id; + vcluster_connection_id connection_id; + + template + friend H AbslHashValue(H h, const virtual_connection_id& id) { + return H::combine( + std::move(h), id.virtual_cluster_id, id.connection_id); + } + friend bool + operator==(const virtual_connection_id&, const virtual_connection_id&) + = default; + friend std::ostream& + operator<<(std::ostream& o, const virtual_connection_id& id); +}; class connection_context final : public ss::enable_lw_shared_from_this , public boost::intrusive::list_base_hook<> { @@ -416,7 +437,9 @@ class connection_context final * A map keeping virtual connection states, during default operation the map * is empty */ - absl::node_hash_map> + absl::node_hash_map< + virtual_connection_id, + ss::lw_shared_ptr> _virtual_states; ss::gate _gate; From 5a11716c3a6381bc84a2bded56a74c81cf0f9582 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 14 May 2024 08:59:32 +0000 Subject: [PATCH 4/6] tests: extracted xid generation to utils MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- .../tests/vcluster_topic_property_test.py | 11 +++-------- tests/rptest/utils/xid_utils.py | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 8 deletions(-) create mode 100644 tests/rptest/utils/xid_utils.py diff --git a/tests/rptest/tests/vcluster_topic_property_test.py b/tests/rptest/tests/vcluster_topic_property_test.py index 7d9511259f74d..4b8aee4f8aa66 100644 --- a/tests/rptest/tests/vcluster_topic_property_test.py +++ b/tests/rptest/tests/vcluster_topic_property_test.py @@ -14,7 +14,7 @@ from rptest.tests.redpanda_test import RedpandaTest from random import randbytes -BASE32_ALPHABET = "0123456789abcdefghijklmnopqrstuv" +from rptest.utils.xid_utils import random_xid_string def get_vcluster_id(topic_properties: dict): @@ -28,17 +28,12 @@ def __init__(self, test_context): super(VirtualClusterTopicPropertyTest, self).__init__(test_context=test_context, num_brokers=3) - def _random_xid_string(self): - xid = ''.join([random.choice(BASE32_ALPHABET) for _ in range(19)]) - - return xid + random.choice(['0', 'g']) - @cluster(num_nodes=3) def test_basic_virtual_cluster_property(self): rpk = RpkTool(self.redpanda) # try creating vcluster enabled topic with extensions disabled topic = TopicSpec() - vcluster_id = self._random_xid_string() + vcluster_id = random_xid_string() rpk.create_topic( topic=topic.name, partitions=1, @@ -72,7 +67,7 @@ def test_basic_virtual_cluster_property(self): try: rpk.alter_topic_config(topic.name, TopicSpec.PROPERTY_VIRTUAL_CLUSTER_ID, - self._random_xid_string()) + random_xid_string()) assert False, "Altering topic virtual cluster property should not be supported" except RpkException as e: assert "INVALID_CONFIG" in e.msg diff --git a/tests/rptest/utils/xid_utils.py b/tests/rptest/utils/xid_utils.py new file mode 100644 index 0000000000000..76ec6f672ea72 --- /dev/null +++ b/tests/rptest/utils/xid_utils.py @@ -0,0 +1,18 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import random + +BASE32_ALPHABET = "0123456789abcdefghijklmnopqrstuv" + + +def random_xid_string(): + xid = ''.join([random.choice(BASE32_ALPHABET) for _ in range(19)]) + + return xid + random.choice(['0', 'g']) From 953135b6f5a11db239bc6b3d2a5dec177a4b7830 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 14 May 2024 08:59:52 +0000 Subject: [PATCH 5/6] tests: updated test validating handling of virtualized connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- .../tests/connection_virtualizing_test.py | 88 +++++++++++++++++-- 1 file changed, 79 insertions(+), 9 deletions(-) diff --git a/tests/rptest/tests/connection_virtualizing_test.py b/tests/rptest/tests/connection_virtualizing_test.py index 6077e68aca6bd..c086b0d036fca 100644 --- a/tests/rptest/tests/connection_virtualizing_test.py +++ b/tests/rptest/tests/connection_virtualizing_test.py @@ -8,10 +8,12 @@ # by the Apache License, Version 2.0 from dataclasses import dataclass +import random from types import MethodType from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec from kafka.protocol.fetch import FetchRequest +from ducktape.mark import matrix from rptest.tests.redpanda_test import RedpandaTest from rptest.util import wait_until @@ -22,6 +24,8 @@ from kafka import KafkaClient, KafkaConsumer from kafka.protocol.produce import ProduceRequest +from rptest.utils.xid_utils import random_xid_string + @dataclass class PartitionInfo: @@ -157,6 +161,10 @@ def no_validation_process_response(self, read_buffer): return (recv_correlation_id, response) +def create_client_id(vcluster_id: str, connection_id: int, client_id: str): + return f"{vcluster_id}{connection_id:08x}{client_id}" + + class TestVirtualConnections(RedpandaTest): def __init__(self, test_context): super(TestVirtualConnections, self).__init__( @@ -187,7 +195,10 @@ def _fetch_and_produce(self, client: MpxMockClient, topic: str, return (fetch_fut, produce_fut) @cluster(num_nodes=3) - def test_no_head_of_line_blocking(self): + @matrix(different_clusters=[True, False], + different_connections=[True, False]) + def test_no_head_of_line_blocking(self, different_clusters, + different_connections): # create topic with single partition spec = TopicSpec(partition_count=1, replication_factor=3) @@ -195,14 +206,20 @@ def test_no_head_of_line_blocking(self): mpx_client = MpxMockClient(self.redpanda) mpx_client.start() + v_cluster_1 = random_xid_string() + v_cluster_2 = random_xid_string() + fetch_client = create_client_id(v_cluster_1, 0, "client-fetch") + produce_client = create_client_id( + v_cluster_1 if not different_clusters else v_cluster_2, + 0 if not different_connections else 1, "client-produce") # validate that fetch request is blocking produce request first as mpx extensions are disabled (fetch_fut, produce_fut) = self._fetch_and_produce( client=mpx_client, topic=spec.name, partition=0, - fetch_client_id="v-cluster-1", - produce_client_id="v-cluster-2") + fetch_client_id=fetch_client, + produce_client_id=produce_client) mpx_client.poll(produce_fut) assert produce_fut.is_done and produce_fut.succeeded @@ -231,8 +248,8 @@ def test_no_head_of_line_blocking(self): client=mpx_client, topic=spec.name, partition=0, - fetch_client_id="v-cluster-10", - produce_client_id="v-cluster-20") + fetch_client_id=fetch_client, + produce_client_id=produce_client) for connection in mpx_client.client._conns.values(): if len(connection._protocol.in_flight_requests) == 2: @@ -241,9 +258,11 @@ def test_no_head_of_line_blocking(self): no_validation_process_response, connection._protocol) # wait for fetch as it will be released after produce finishes + should_interleave_requests = different_clusters or different_connections def _produce_is_ready(): - mpx_client.poll(fetch_fut) + mpx_client.poll( + fetch_fut if should_interleave_requests else produce_fut) return produce_fut.is_done wait_until( @@ -260,7 +279,58 @@ def _produce_is_ready(): f_resp = fetch_fut.value - #assert produce_fut.is_done and produce_fut.succeeded, "produce future should be ready when fetch resolved" - assert f_resp.topics[0][1][0][ - 6] != b'', "Fetch should be unblocked by produce from another virtual connection" + if should_interleave_requests: + assert f_resp.topics[0][1][0][ + 6] != b'', "Fetch should be unblocked by produce from another virtual connection" + else: + assert f_resp.topics[0][1][0][ + 6] == b'', "Fetch should be executed before the produce finishes" mpx_client.close() + + @cluster(num_nodes=3) + def test_handling_invalid_ids(self): + self.redpanda.set_cluster_config({"enable_mpx_extensions": True}) + # create topic with single partition + spec = TopicSpec(partition_count=1, replication_factor=3) + topic = spec.name + self.client().create_topic(spec) + + def produce_with_client(client_id: str): + mpx_client = MpxMockClient(self.redpanda) + mpx_client.start() + partition_info = mpx_client.get_partition_info(topic, 0) + mpx_client.set_client_id(client_id) + produce_fut = mpx_client.send( + node_id=partition_info.leader_id, + request=mpx_client.create_produce_request(topic=topic, + partition=0)) + + mpx_client.poll(produce_fut) + assert produce_fut.is_done and produce_fut.succeeded + pi = mpx_client.get_partition_info(topic, 0) + mpx_client.close() + return pi + + v_cluster = random_xid_string() + valid_client_id = create_client_id(v_cluster, 0, "client-fetch") + p_info = produce_with_client(valid_client_id) + + assert p_info.end_offset > 0, "Produce request should be successful" + starting_end_offset = p_info.end_offset + invalid_xid_id = create_client_id("zzzzzzzzzzzzzzzzzzzz", 0, + "client-fetch") + + p_info = produce_with_client(invalid_xid_id) + + assert starting_end_offset == p_info.end_offset, "Produce request with invalid client id should fail" + + invalid_connection_id = f"{v_cluster}00blob00client" + + p_info = produce_with_client(invalid_connection_id) + + assert starting_end_offset == p_info.end_offset, "Produce request with invalid client id should fail" + + valid_client_id_empty = create_client_id(v_cluster, 0, "") + p_info = produce_with_client(valid_client_id_empty) + + assert starting_end_offset < p_info.end_offset, "Produce request with valid client id should succeed " From 187dcbf54ad2b5c65fe09d3fcf25a2864790f8c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Wed, 15 May 2024 14:25:40 +0000 Subject: [PATCH 6/6] k/connection_context: replaced node hash map with chunked hash map MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaced `node_hash_map` keeping state of virtualized connections with `chunked_hash_map`. The change will allow us to avoid large allocations when dealing with large virtual connection number. Signed-off-by: Michał Maślanka --- src/v/kafka/server/connection_context.cc | 15 ++++++++------- src/v/kafka/server/connection_context.h | 3 ++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index f8ddab2a72549..49344846ac02c 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -720,13 +720,14 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { "request from virtual connection {}, client id: {}", client_connection_id.v_connection_id, client_connection_id.client_id); - auto it = _virtual_states.lazy_emplace( - client_connection_id.v_connection_id, - [v_connection_id = client_connection_id.v_connection_id]( - const auto& ctr) mutable { - return ctr( - v_connection_id, ss::make_lw_shared()); - }); + + auto it = _virtual_states.find(client_connection_id.v_connection_id); + if (it == _virtual_states.end()) { + auto p = _virtual_states.emplace( + client_connection_id.v_connection_id, + ss::make_lw_shared()); + it = p.first; + } co_await it->second->process_request( shared_from_this(), std::move(rctx), sres); diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 63977312ee1f6..2451cbf408591 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -11,6 +11,7 @@ #pragma once #include "base/seastarx.h" #include "config/property.h" +#include "container/chunked_hash_map.h" #include "kafka/server/fwd.h" #include "kafka/server/handlers/handler_probe.h" #include "kafka/server/logger.h" @@ -437,7 +438,7 @@ class connection_context final * A map keeping virtual connection states, during default operation the map * is empty */ - absl::node_hash_map< + chunked_hash_map< virtual_connection_id, ss::lw_shared_ptr> _virtual_states;