diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 5621ee30f208a..ab172470dff1d 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -496,12 +496,12 @@ configuration::configuration() , target_quota_byte_rate( *this, "target_quota_byte_rate", - "Target request size quota byte rate (bytes per second) - 2GB default", + "Target request size quota byte rate (bytes per second)", {.needs_restart = needs_restart::no, .example = "1073741824", .visibility = visibility::user}, target_produce_quota_byte_rate_default, - {.min = 1_MiB}) + {.min = 0}) , target_fetch_quota_byte_rate( *this, "target_fetch_quota_byte_rate", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 479b98c9345ba..92c65d77c7541 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -46,7 +46,8 @@ namespace config { /// can not depend on any other module to prevent cyclic dependencies. struct configuration final : public config_store { - constexpr static auto target_produce_quota_byte_rate_default = 2_GiB; + constexpr static auto target_produce_quota_byte_rate_default + = 0; // disabled // WAL bounded_property log_segment_size; diff --git a/src/v/kafka/server/client_quota_translator.cc b/src/v/kafka/server/client_quota_translator.cc index 509cbc612472c..2140b691f101c 100644 --- a/src/v/kafka/server/client_quota_translator.cc +++ b/src/v/kafka/server/client_quota_translator.cc @@ -18,6 +18,10 @@ #include #include +#include + +#include + namespace kafka { using cluster::client_quota::entity_key; @@ -292,8 +296,11 @@ client_quota_translator::get_quota_config(client_quota_type qt) const { std::optional client_quota_translator::get_default_config(client_quota_type qt) const { switch (qt) { - case kafka::client_quota_type::produce_quota: - return _default_target_produce_tp_rate(); + case kafka::client_quota_type::produce_quota: { + auto produce_quota = _default_target_produce_tp_rate(); + return (produce_quota > 0) ? std::make_optional(produce_quota) + : std::nullopt; + } case kafka::client_quota_type::fetch_quota: return _default_target_fetch_tp_rate(); case kafka::client_quota_type::partition_mutation_quota: @@ -325,4 +332,11 @@ void client_quota_translator::maybe_log_deprecated_configs_nag() const { } } +bool client_quota_translator::is_empty() const { + return _quota_store.local().size() == 0 + && absl::c_all_of(all_client_quota_types, [this](auto qt) { + return !get_default_config(qt); + }); +} + } // namespace kafka diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h index f47c1b397ea29..e0e8a019cd840 100644 --- a/src/v/kafka/server/client_quota_translator.h +++ b/src/v/kafka/server/client_quota_translator.h @@ -141,6 +141,9 @@ class client_quota_translator { /// `watch` can be used to register for quota changes void watch(on_change_fn&& fn); + /// Returns true if there are no quotas configured + bool is_empty() const; + private: using quota_config = std::unordered_map; diff --git a/src/v/kafka/server/quota_manager.cc b/src/v/kafka/server/quota_manager.cc index 96a135bf1fd6e..085a32d483b32 100644 --- a/src/v/kafka/server/quota_manager.cc +++ b/src/v/kafka/server/quota_manager.cc @@ -320,6 +320,9 @@ ss::future quota_manager::record_partition_mutations( /// KIP-599 throttles create_topics / delete_topics / create_partitions /// request. This delay should only be applied to these requests if the /// quota has been exceeded + if (_translator.is_empty()) { + co_return 0ms; + } auto ctx = client_quota_request_ctx{ .q_type = client_quota_type::partition_mutation_quota, .client_id = client_id, @@ -391,6 +394,9 @@ ss::future quota_manager::record_produce_tp_and_throttle( std::optional client_id, uint64_t bytes, clock::time_point now) { + if (_translator.is_empty()) { + co_return 0ms; + } auto ctx = client_quota_request_ctx{ .q_type = client_quota_type::produce_quota, .client_id = client_id, @@ -440,6 +446,9 @@ ss::future<> quota_manager::record_fetch_tp( std::optional client_id, uint64_t bytes, clock::time_point now) { + if (_translator.is_empty()) { + co_return; + } auto ctx = client_quota_request_ctx{ .q_type = client_quota_type::fetch_quota, .client_id = client_id, @@ -469,6 +478,9 @@ ss::future<> quota_manager::record_fetch_tp( ss::future quota_manager::throttle_fetch_tp( std::optional client_id, clock::time_point now) { + if (_translator.is_empty()) { + co_return 0ms; + } auto ctx = client_quota_request_ctx{ .q_type = client_quota_type::fetch_quota, .client_id = client_id, diff --git a/src/v/kafka/server/tests/client_quota_translator_test.cc b/src/v/kafka/server/tests/client_quota_translator_test.cc index f3ebe29e5cabe..d7808b66983d0 100644 --- a/src/v/kafka/server/tests/client_quota_translator_test.cc +++ b/src/v/kafka/server/tests/client_quota_translator_test.cc @@ -87,7 +87,7 @@ SEASTAR_THREAD_TEST_CASE(quota_translator_default_test) { fixture f; auto default_limits = client_quota_limits{ - .produce_limit = scale_to_smp_count(2147483648), + .produce_limit = std::nullopt, .fetch_limit = std::nullopt, .partition_mutation_limit = std::nullopt, }; @@ -96,6 +96,7 @@ SEASTAR_THREAD_TEST_CASE(quota_translator_default_test) { auto limits = f.tr.find_quota_value(key); BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(default_limits, limits); + BOOST_CHECK(f.tr.is_empty()); } SEASTAR_THREAD_TEST_CASE(quota_translator_modified_default_test) { @@ -116,6 +117,7 @@ SEASTAR_THREAD_TEST_CASE(quota_translator_modified_default_test) { auto limits = f.tr.find_quota_value(key); BOOST_CHECK_EQUAL(test_client_id_key, key); BOOST_CHECK_EQUAL(expected_limits, limits); + BOOST_CHECK(!f.tr.is_empty()); } void run_quota_translator_client_group_test(fixture& f) {