From 0b79515669d632291d7b3a52631c70145b6807d7 Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Thu, 27 Apr 2023 16:13:31 +0100 Subject: [PATCH 1/5] application: Enable memory sampling Adds a new memory_sampling service. The service enables seastar sampled memory profiling. It hooks into the batch cache which calls back into the service whenever a reclaim is run. The service then prints top-n allocation sites once we reach the thresholds of 10% and 20% of available memory left. --- src/v/cluster/tests/local_monitor_fixture.h | 2 + .../tests/bootstrap_configuration_test.cc | 8 +- .../raft/tests/configuration_manager_test.cc | 11 +- src/v/raft/tests/foreign_entry_test.cc | 8 +- src/v/raft/tests/mux_state_machine_fixture.h | 15 ++- src/v/raft/tests/offset_translator_tests.cc | 8 +- src/v/raft/tests/raft_group_fixture.h | 5 +- src/v/redpanda/application.cc | 8 +- src/v/redpanda/application.h | 2 + src/v/resource_mgmt/CMakeLists.txt | 1 + src/v/resource_mgmt/memory_sampling.cc | 127 ++++++++++++++++++ src/v/resource_mgmt/memory_sampling.h | 79 +++++++++++ src/v/resource_mgmt/tests/CMakeLists.txt | 21 +++ .../tests/memory_sampling_reclaimer_test.cc | 77 +++++++++++ .../tests/memory_sampling_tests.cc | 89 ++++++++++++ src/v/storage/api.h | 14 +- src/v/storage/batch_cache.cc | 14 +- src/v/storage/batch_cache.h | 6 +- src/v/storage/log_manager.cc | 6 +- src/v/storage/log_manager.h | 4 +- .../storage/tests/batch_cache_reclaim_test.cc | 6 +- src/v/storage/tests/batch_cache_test.cc | 33 ++++- src/v/storage/tests/log_manager_test.cc | 18 ++- src/v/storage/tests/storage_test_fixture.h | 19 ++- src/v/storage/tests/utils/disk_log_builder.cc | 9 +- src/v/storage/tests/utils/disk_log_builder.h | 3 + src/v/test_utils/logs.h | 94 ++++++++----- 27 files changed, 616 insertions(+), 71 deletions(-) create mode 100644 src/v/resource_mgmt/memory_sampling.cc create mode 100644 src/v/resource_mgmt/memory_sampling.h create mode 100644 src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc create mode 100644 src/v/resource_mgmt/tests/memory_sampling_tests.cc diff --git a/src/v/cluster/tests/local_monitor_fixture.h b/src/v/cluster/tests/local_monitor_fixture.h index b6889daf4f85..e7d0616938a3 100644 --- a/src/v/cluster/tests/local_monitor_fixture.h +++ b/src/v/cluster/tests/local_monitor_fixture.h @@ -11,9 +11,11 @@ #pragma once #include "cluster/node/local_monitor.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include +#include #include diff --git a/src/v/raft/tests/bootstrap_configuration_test.cc b/src/v/raft/tests/bootstrap_configuration_test.cc index 514145e63912..f9d2923ef0c8 100644 --- a/src/v/raft/tests/bootstrap_configuration_test.cc +++ b/src/v/raft/tests/bootstrap_configuration_test.cc @@ -16,6 +16,7 @@ #include "raft/consensus_utils.h" #include "random/generators.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/log.h" #include "storage/log_manager.h" @@ -49,12 +50,14 @@ struct bootstrap_fixture : raft::simple_record_fixture { storage::with_cache::no, storage::make_sanitized_file_config()); }, - _feature_table) { + _feature_table, + _memory_sampling_service) { _feature_table.start().get(); _feature_table .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service.start(std::ref(_test_logger)).get(); _storage.start().get(); // ignore the get_log() (void)_storage.log_mgr() @@ -81,10 +84,13 @@ struct bootstrap_fixture : raft::simple_record_fixture 
{ ~bootstrap_fixture() { _storage.stop().get(); + _memory_sampling_service.stop().get(); _feature_table.stop().get(); } + seastar::logger _test_logger{"bootstrap-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::api _storage; ss::abort_source _as; }; diff --git a/src/v/raft/tests/configuration_manager_test.cc b/src/v/raft/tests/configuration_manager_test.cc index c74be452e905..fde7bcfb5a16 100644 --- a/src/v/raft/tests/configuration_manager_test.cc +++ b/src/v/raft/tests/configuration_manager_test.cc @@ -16,6 +16,7 @@ #include "raft/logger.h" #include "raft/types.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/kvstore.h" #include "storage/log_manager.h" @@ -25,6 +26,7 @@ #include "units.h" #include +#include #include @@ -51,7 +53,8 @@ struct config_manager_fixture { ss::default_priority_class(), storage::make_sanitized_file_config()); }, - _feature_table)) + _feature_table, + _memory_sampling_service)) , _logger( raft::group_id(1), model::ntp(model::ns("t"), model::topic("t"), model::partition_id(0))) @@ -66,19 +69,23 @@ struct config_manager_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service.start(std::ref(_test_logger)).get(); _storage.start().get0(); } ss::sstring base_dir = "test_cfg_manager_" + random_generators::gen_alphanum_string(6); + ss::logger _test_logger{"config-mgmr-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::api _storage; raft::ctx_log _logger; raft::configuration_manager _cfg_mgr; ~config_manager_fixture() { - _feature_table.stop().get(); _storage.stop().get0(); + _memory_sampling_service.stop().get(); + _feature_table.stop().get(); } raft::group_configuration random_configuration() { diff --git a/src/v/raft/tests/foreign_entry_test.cc b/src/v/raft/tests/foreign_entry_test.cc index 2f5ce825e061..637c7a139f4b 100644 --- a/src/v/raft/tests/foreign_entry_test.cc +++ b/src/v/raft/tests/foreign_entry_test.cc @@ -21,6 +21,7 @@ #include "raft/types.h" #include "random/generators.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/log.h" #include "storage/log_manager.h" @@ -61,12 +62,14 @@ struct foreign_entry_fixture { ss::default_priority_class(), storage::make_sanitized_file_config()); }, - _feature_table) { + _feature_table, + _memory_sampling_service) { _feature_table.start().get(); _feature_table .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service.start(std::ref(_test_logger)).get(); _storage.start().get(); (void)_storage.log_mgr() .manage(storage::ntp_config(_ntp, "test.dir")) @@ -136,10 +139,13 @@ struct foreign_entry_fixture { } ~foreign_entry_fixture() { _storage.stop().get(); + _memory_sampling_service.stop().get(); _feature_table.stop().get(); } model::offset _base_offset{0}; + ss::logger _test_logger{"foreign-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::api _storage; storage::log get_log() { return _storage.log_mgr().get(_ntp).value(); } model::ntp _ntp{ diff --git a/src/v/raft/tests/mux_state_machine_fixture.h b/src/v/raft/tests/mux_state_machine_fixture.h index faa3ade39c46..4e218614c8e9 100644 --- a/src/v/raft/tests/mux_state_machine_fixture.h +++ b/src/v/raft/tests/mux_state_machine_fixture.h @@ -19,6 +19,7 @@ #include "raft/mux_state_machine.h" 
#include "raft/types.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/connection_cache.h" #include "storage/api.h" #include "storage/kvstore.h" @@ -57,7 +58,8 @@ struct mux_state_machine_fixture { .start( [kv_conf]() { return kv_conf; }, [this]() { return default_log_cfg(); }, - std::ref(_feature_table)) + std::ref(_feature_table), + std::ref(_memory_sampling_service)) .get0(); _storage.invoke_on_all(&storage::api::start).get0(); _as.start().get(); @@ -74,6 +76,8 @@ struct mux_state_machine_fixture { [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service.start(std::ref(_test_logger)).get(); + _group_mgr .start( _self, @@ -139,9 +143,10 @@ struct mux_state_machine_fixture { if (_raft) { _raft.release(); } - _connections.stop().get0(); - _feature_table.stop().get0(); - _storage.stop().get0(); + _connections.stop().get(); + _storage.stop().get(); + _memory_sampling_service.stop().get(); + _feature_table.stop().get(); _as.stop().get(); } } @@ -189,11 +194,13 @@ struct mux_state_machine_fixture { model::ntp _ntp = model::ntp( model::ns("default"), model::topic("test"), model::partition_id(0)); + ss::logger _test_logger{"mux-test-logger"}; ss::sstring _data_dir; cluster::consensus_ptr _raft; ss::sharded _as; ss::sharded _connections; ss::sharded _storage; + ss::sharded _memory_sampling_service; ss::sharded _feature_table; ss::sharded _group_mgr; ss::sharded _recovery_throttle; diff --git a/src/v/raft/tests/offset_translator_tests.cc b/src/v/raft/tests/offset_translator_tests.cc index d7c60ae93638..930a1aef9836 100644 --- a/src/v/raft/tests/offset_translator_tests.cc +++ b/src/v/raft/tests/offset_translator_tests.cc @@ -10,6 +10,7 @@ #include "model/fundamental.h" #include "raft/offset_translator.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/fwd.h" #include "storage/kvstore.h" @@ -50,11 +51,13 @@ struct base_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service.start(std::ref(_test_logger)).get(); _api .start( [this]() { return make_kv_cfg(); }, [this]() { return make_log_cfg(); }, - std::ref(_feature_table)) + std::ref(_feature_table), + std::ref(_memory_sampling_service)) .get(); _api.invoke_on_all(&storage::api::start).get(); } @@ -87,11 +90,14 @@ struct base_fixture { model::ntp test_ntp = model::ntp( model::ns("test"), model::topic("tp"), model::partition_id(0)); ss::sstring _test_dir; + ss::logger _test_logger{"offset-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; ss::sharded _api; ~base_fixture() { _api.stop().get(); + _memory_sampling_service.stop().get(); _feature_table.stop().get(); } }; diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h index 42f950e3c51a..73a6f73ba053 100644 --- a/src/v/raft/tests/raft_group_fixture.h +++ b/src/v/raft/tests/raft_group_fixture.h @@ -26,6 +26,7 @@ #include "raft/rpc_client_protocol.h" #include "raft/service.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/backoff_policy.h" #include "rpc/connection_cache.h" #include "rpc/rpc_server.h" @@ -130,7 +131,8 @@ struct raft_node { ss::default_priority_class(), storage::make_sanitized_file_config()); }, - std::ref(feature_table)) + std::ref(feature_table), + std::ref(memory_sampling_service)) .get(); storage.invoke_on_all(&storage::api::start).get(); @@ -362,6 +364,7 @@ 
struct raft_node { consensus_ptr consensus; std::unique_ptr _nop_stm; ss::sharded feature_table; + ss::sharded memory_sampling_service; ss::abort_source _as; }; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 66317b13e4b1..89097ee1798c 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -82,6 +82,7 @@ #include "raft/service.h" #include "redpanda/admin_server.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/thread_worker.h" #include "storage/backlog_controller.h" #include "storage/chunk_cache.h" @@ -111,6 +112,7 @@ #include #include +#include #include #include #include @@ -396,6 +398,9 @@ void application::initialize( std::optional schema_reg_cfg, std::optional schema_reg_client_cfg, std::optional groups) { + construct_service(_memory_sampling, std::ref(_log)).get(); + _memory_sampling.invoke_on_all(&memory_sampling::start).get(); + // Set up the abort_on_oom value based on the associated cluster config // property, and watch for changes. _abort_on_oom @@ -1673,7 +1678,8 @@ void application::wire_up_bootstrap_services() { = sched_groups.cache_background_reclaim_sg(); return log_cfg; }, - std::ref(feature_table)) + std::ref(feature_table), + std::ref(_memory_sampling)) .get(); // Hook up local_monitor to update storage_resources when disk state changes diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 6ea88e2d1ef6..4492dda2848c 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -40,6 +40,7 @@ #include "redpanda/monitor_unsafe_log_flag.h" #include "resource_mgmt/cpu_scheduling.h" #include "resource_mgmt/memory_groups.h" +#include "resource_mgmt/memory_sampling.h" #include "resource_mgmt/scheduling_groups_probe.h" #include "resource_mgmt/smp_groups.h" #include "rpc/fwd.h" @@ -248,6 +249,7 @@ class application { std::optional> _abort_on_oom; + ss::sharded _memory_sampling; ss::sharded _connection_cache; ss::sharded _group_manager; ss::sharded _rpc; diff --git a/src/v/resource_mgmt/CMakeLists.txt b/src/v/resource_mgmt/CMakeLists.txt index 422f6d137e9e..3b93f8a90d72 100644 --- a/src/v/resource_mgmt/CMakeLists.txt +++ b/src/v/resource_mgmt/CMakeLists.txt @@ -2,6 +2,7 @@ v_cc_library( NAME resource_mgmt SRCS available_memory.cc + memory_sampling.cc DEPS Seastar::seastar ) diff --git a/src/v/resource_mgmt/memory_sampling.cc b/src/v/resource_mgmt/memory_sampling.cc new file mode 100644 index 000000000000..cc10d2832a9d --- /dev/null +++ b/src/v/resource_mgmt/memory_sampling.cc @@ -0,0 +1,127 @@ +/* + * Copyright 2023 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "resource_mgmt/memory_sampling.h" + +#include "resource_mgmt/available_memory.h" +#include "ssx/future-util.h" +#include "vlog.h" + +#include +#include +#include +#include +#include + +#include + +constexpr std::string_view diagnostics_header() { + return "Top-N alloc sites - size count stack:"; +} + +constexpr std::string_view allocation_site_format_str() { return "{} {} {}\n"; } + +/// Put `top_n` allocation sites into the front of `allocation_sites` +static void top_n_allocation_sites( + std::vector& allocation_sites, size_t top_n) { + std::partial_sort( + allocation_sites.begin(), + allocation_sites.begin() + top_n, + allocation_sites.end(), + [](const auto& lhs, const auto& rhs) { return lhs.size > rhs.size; }); +} + +ss::sstring memory_sampling::format_allocation_site( + const ss::memory::allocation_site& alloc_site) { + return fmt::format( + allocation_site_format_str(), + alloc_site.size, + alloc_site.count, + alloc_site.backtrace); +} + +void memory_sampling::notify_of_reclaim() { _low_watermark_cond.signal(); } + +ss::future<> memory_sampling::start_low_available_memory_logging() { + // We want some periodic logging "on the way" to OOM. At the same time we + // don't want to spam the logs. Hence, we periodically look at the available + // memory low watermark (this is without the batch cache). If we see that we + // have crossed the 10% and 20% marks we log the allocation sites. We stop + // afterwards. + + size_t first_log_limit = _first_log_limit_fraction + * seastar::memory::stats().total_memory(); + size_t second_log_limit = _second_log_limit_fraction + * seastar::memory::stats().total_memory(); + size_t next_log_limit = first_log_limit; + + while (true) { + try { + co_await _low_watermark_cond.wait([&next_log_limit]() { + auto current_low_water_mark + = resources::available_memory::local() + .available_low_water_mark(); + + return current_low_water_mark <= next_log_limit; + }); + } catch (const ss::broken_condition_variable&) { + co_return; + } + + auto allocation_sites = ss::memory::sampled_memory_profile(); + const size_t top_n = std::min(size_t(5), allocation_sites.size()); + top_n_allocation_sites(allocation_sites, top_n); + + vlog( + _logger.info, + "{} {}", + diagnostics_header(), + fmt::join( + allocation_sites.begin(), allocation_sites.begin() + top_n, "|")); + + if (next_log_limit == first_log_limit) { + next_log_limit = second_log_limit; + } else { + co_return; + } + } +} + +memory_sampling::memory_sampling(ss::logger& logger) + : _logger(logger) + , _first_log_limit_fraction(0.2) + , _second_log_limit_fraction(0.1) {} + +memory_sampling::memory_sampling( + ss::logger& logger, + double first_log_limit_fraction, + double second_log_limit_fraction) + : _logger(logger) + , _first_log_limit_fraction(first_log_limit_fraction) + , _second_log_limit_fraction(second_log_limit_fraction) {} + +void memory_sampling::start() { + // We chose a sampling rate of ~3MB. From testing this has a very low + // overhead of something like ~1%. We could still get away with something + // smaller like 1MB and have acceptable overhead (~3%) but 3MB should be a + // safer default for the initial rollout. 
+ ss::memory::set_heap_profiling_sampling_rate(3000037); + + ssx::spawn_with_gate(_low_watermark_gate, [this]() { + return start_low_available_memory_logging(); + }); +} + +ss::future<> memory_sampling::stop() { + _low_watermark_cond.broken(); + + co_await _low_watermark_gate.close(); +} diff --git a/src/v/resource_mgmt/memory_sampling.h b/src/v/resource_mgmt/memory_sampling.h new file mode 100644 index 000000000000..42930a7fa62b --- /dev/null +++ b/src/v/resource_mgmt/memory_sampling.h @@ -0,0 +1,79 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "seastarx.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +template<> +struct fmt::formatter + : fmt::formatter { + template + auto + format(const seastar::memory::allocation_site& site, FormatContext& ctx) { + return fmt::format_to( + ctx.out(), "{} {} {}", site.size, site.count, site.backtrace); + } +}; + +/// Very simple service enabling memory profiling on all shards. +class memory_sampling : public ss::peering_sharded_service { +public: + ss::sstring + format_allocation_site(const ss::memory::allocation_site& backtrace); + + /// Starts the service (enables memory sampling, sets up additional OOM + /// information and enables logging in highwatermark situations) + void start(); + ss::future<> stop(); + + /// Notify the memory sampling service that a memory reclaim from the + /// seastar allocator has happened. Used by the batch_cache + void notify_of_reclaim(); + + /// Constructs the service. Logger will be used to log top stacks under high + /// memory pressure + explicit memory_sampling(ss::logger& logger); + + /// Constructor as above but allows overriding high memory thresholds. Used + /// for testing. + explicit memory_sampling( + ss::logger& logger, + double first_log_limit_fraction, + double second_log_limit_fraction); + +private: + /// Starts the background future running the allocation site logging on low + /// available memory + ss::future<> start_low_available_memory_logging(); + + ss::logger& _logger; + + // When a memory reclaim from the seastar allocator happens the batch_cache + // notifies us via below condvar. If we see a new low watermark that's and + // we are below 20% then we log once and a second time the first time we saw + // a 10% lower watermark. 
Values are overridable for tests + double _first_log_limit_fraction; + double _second_log_limit_fraction; + ss::condition_variable _low_watermark_cond; + ss::gate _low_watermark_gate; +}; diff --git a/src/v/resource_mgmt/tests/CMakeLists.txt b/src/v/resource_mgmt/tests/CMakeLists.txt index 05f8627a90a1..6605737470d3 100644 --- a/src/v/resource_mgmt/tests/CMakeLists.txt +++ b/src/v/resource_mgmt/tests/CMakeLists.txt @@ -6,3 +6,24 @@ rp_test( LIBRARIES v::seastar_testing_main v::resource_mgmt v::config LABELS resource_mgmt ) + +# NB: Some of these rely on global state (low watermark of available_memory) so need to run in a separate binary +rp_test( + UNIT_TEST + BINARY_NAME test_memory_sampling + SOURCES memory_sampling_tests.cc + DEFINITIONS BOOST_TEST_DYN_LINK + LIBRARIES v::seastar_testing_main v::application + LABELS memory_sampling + SKIP_BUILD_TYPES "Debug" +) + +rp_test( + UNIT_TEST + BINARY_NAME test_memory_sampling_reclaimer + SOURCES memory_sampling_reclaimer_test.cc + DEFINITIONS BOOST_TEST_DYN_LINK + LIBRARIES v::seastar_testing_main v::application + LABELS memory_sampling + SKIP_BUILD_TYPES "Debug" +) diff --git a/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc b/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc new file mode 100644 index 000000000000..f244e40f7e4c --- /dev/null +++ b/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc @@ -0,0 +1,77 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "resource_mgmt/memory_sampling.h" +#include "storage/batch_cache.h" +#include "test_utils/async.h" + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#ifndef SEASTAR_DEFAULT_ALLOCATOR + +/// Test batch cache integration independently +SEASTAR_THREAD_TEST_CASE(reclaim_notifies_memory_sampling) { + std::stringstream logger_buf; + seastar::logger test_logger("test"); + test_logger.set_ostream(logger_buf); + ss::sharded memory_sampling_service; + + std::string_view needle("Top-N alloc"); + const auto first_log_limit = 0.80; + memory_sampling_service.start(std::ref(test_logger), first_log_limit, 0.2) + .get(); + memory_sampling_service.invoke_on_all(&memory_sampling::start).get(); + + { + auto buf = logger_buf.str(); + auto view = std::string_view{buf}; + BOOST_REQUIRE_EQUAL(view.find(needle), std::string_view::npos); + } + + storage::batch_cache::reclaim_options opts = { + .growth_window = std::chrono::milliseconds(3000), + .stable_window = std::chrono::milliseconds(10000), + .min_size = 128 << 10, + .max_size = 4 << 20, + .min_free_memory = 1}; + storage::batch_cache cache(opts, memory_sampling_service); + storage::batch_cache_index index(cache); + + std::vector> dummy_bufs; + size_t total_memory = seastar::memory::stats().total_memory(); + auto allocate_till_limit = [&](size_t limit) { + while (seastar::memory::stats().free_memory() > limit) { + dummy_bufs.emplace_back(1000000); + } + }; + + allocate_till_limit(first_log_limit * total_memory); + + // simulate a callback from the allocator + cache.reclaim(1); + + tests::cooperative_spin_wait_with_timeout(std::chrono::seconds(10), [&]() { + auto buf = logger_buf.str(); + auto view = std::string_view{buf}; + return view.find(needle) != std::string_view::npos; + 
}).get(); // will throw if false at timeout +} + +#endif // SEASTAR_DEFAULT_ALLOCATOR diff --git a/src/v/resource_mgmt/tests/memory_sampling_tests.cc b/src/v/resource_mgmt/tests/memory_sampling_tests.cc new file mode 100644 index 000000000000..d2d22a0586d6 --- /dev/null +++ b/src/v/resource_mgmt/tests/memory_sampling_tests.cc @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "resource_mgmt/memory_sampling.h" +#include "storage/batch_cache.h" +#include "test_utils/async.h" + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#ifndef SEASTAR_DEFAULT_ALLOCATOR + +SEASTAR_THREAD_TEST_CASE(test_low_watermark_logging) { + seastar::logger dummy_logger("dummy"); + std::stringstream output_buf; + dummy_logger.set_ostream(output_buf); + + const auto first_log_limit = 0.90; + const auto second_log_limit = 0.80; + + memory_sampling sampling(dummy_logger, first_log_limit, second_log_limit); + sampling.start(); + + std::string_view needle("Top-N alloc"); + + { + // notification shouldn't do anything + sampling.notify_of_reclaim(); + auto buf = output_buf.str(); + auto view = std::string_view{buf}; + BOOST_REQUIRE_EQUAL(view.find(needle), std::string_view::npos); + } + + std::vector> dummy_bufs; + size_t total_memory = seastar::memory::stats().total_memory(); + + auto allocate_till_limit = [&](size_t limit) { + while (seastar::memory::stats().free_memory() > limit) { + dummy_bufs.emplace_back(1000000); + } + }; + + allocate_till_limit(first_log_limit * total_memory); + sampling.notify_of_reclaim(); + + tests::cooperative_spin_wait_with_timeout(std::chrono::seconds(10), [&]() { + auto buf = output_buf.str(); + auto view = std::string_view{buf}; + return view.find(needle) != std::string_view::npos; + }).get(); // will throw if false at timeout + + allocate_till_limit(second_log_limit * total_memory); + sampling.notify_of_reclaim(); + + tests::cooperative_spin_wait_with_timeout(std::chrono::seconds(10), [&]() { + auto buf = output_buf.str(); + auto view = std::string_view{buf}; + return view.find(needle) != view.rfind(needle); + }).get(); // will throw if false at timeout + + auto old_size = output_buf.str().size(); + + sampling.notify_of_reclaim(); + + { + // no more notifications should happen + auto buf = output_buf.str(); + BOOST_REQUIRE_EQUAL(buf.size(), old_size); + } +} + +#endif // SEASTAR_DEFAULT_ALLOCATOR diff --git a/src/v/storage/api.h b/src/v/storage/api.h index 4537cf6b104c..c045bcdda9d5 100644 --- a/src/v/storage/api.h +++ b/src/v/storage/api.h @@ -13,6 +13,7 @@ #include "features/feature_table.h" #include "model/fundamental.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/kvstore.h" #include "storage/log_manager.h" @@ -28,17 +29,23 @@ class api : public ss::peering_sharded_service { explicit api( std::function kv_conf_cb, std::function log_conf_cb, - ss::sharded& feature_table) noexcept + ss::sharded& feature_table, + ss::sharded& memory_sampling_service) noexcept : _kv_conf_cb(std::move(kv_conf_cb)) , _log_conf_cb(std::move(log_conf_cb)) - , _feature_table(feature_table) {} + , _feature_table(feature_table) + , _memory_sampling_service(memory_sampling_service) {} ss::future<> start() { 
_kvstore = std::make_unique( _kv_conf_cb(), _resources, _feature_table); return _kvstore->start().then([this] { _log_mgr = std::make_unique( - _log_conf_cb(), kvs(), _resources, _feature_table); + _log_conf_cb(), + kvs(), + _resources, + _feature_table, + _memory_sampling_service); return _log_mgr->start(); }); } @@ -91,6 +98,7 @@ class api : public ss::peering_sharded_service { std::function _kv_conf_cb; std::function _log_conf_cb; ss::sharded& _feature_table; + ss::sharded& _memory_sampling_service; std::unique_ptr _kvstore; std::unique_ptr _log_mgr; diff --git a/src/v/storage/batch_cache.cc b/src/v/storage/batch_cache.cc index 75be14b7a683..8586d5d743c2 100644 --- a/src/v/storage/batch_cache.cc +++ b/src/v/storage/batch_cache.cc @@ -12,6 +12,7 @@ #include "bytes/iobuf_parser.h" #include "model/adl_serde.h" #include "resource_mgmt/available_memory.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/future-util.h" #include "storage/logger.h" #include "utils/gate_guard.h" @@ -125,14 +126,17 @@ register_memory_reporter(const batch_cache& bc) { "batch_cache", [&bc] { return bc.size_bytes(); }); } -batch_cache::batch_cache(const reclaim_options& opts) +batch_cache::batch_cache( + const reclaim_options& opts, + ss::sharded& memory_sampling_service) : _reclaimer( [this](reclaimer::request r) { return reclaim(r); }, reclaim_scope::sync) , _reclaim_opts(opts) , _reclaim_size(_reclaim_opts.min_size) , _background_reclaimer( *this, opts.min_free_memory, opts.background_reclaimer_sg) - , _available_mem_deregister(register_memory_reporter(*this)) { + , _available_mem_deregister(register_memory_reporter(*this)) + , _memory_sampling_service(memory_sampling_service) { _background_reclaimer.start(); } @@ -300,6 +304,12 @@ size_t batch_cache::reclaim(size_t size) { } }); + // so that memory_sampling service can print top memory allocation sites for + // this shard + if (_memory_sampling_service.local_is_initialized()) { + _memory_sampling_service.local().notify_of_reclaim(); + } + _last_reclaim = ss::lowres_clock::now(); _size_bytes -= reclaimed; return reclaimed; diff --git a/src/v/storage/batch_cache.h b/src/v/storage/batch_cache.h index b73bee1d9b01..3d8b39df6914 100644 --- a/src/v/storage/batch_cache.h +++ b/src/v/storage/batch_cache.h @@ -12,6 +12,7 @@ #pragma once #include "model/record.h" #include "resource_mgmt/available_memory.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/semaphore.h" #include "units.h" #include "utils/intrusive_list_helpers.h" @@ -226,7 +227,9 @@ class batch_cache { range_ptr _range; }; - explicit batch_cache(const reclaim_options& opts); + explicit batch_cache( + const reclaim_options& opts, + ss::sharded& memory_sampling_service); batch_cache(const batch_cache&) = delete; batch_cache& operator=(const batch_cache&) = delete; @@ -384,6 +387,7 @@ class batch_cache { size_t _reclaim_size; background_reclaimer _background_reclaimer; resources::available_memory::deregister_holder _available_mem_deregister; + ss::sharded& _memory_sampling_service; friend std::ostream& operator<<(std::ostream&, const reclaim_options&); friend std::ostream& operator<<(std::ostream&, const batch_cache&); diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index c9d8a8e447bc..ac5e50ba38bc 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -17,6 +17,7 @@ #include "model/metadata.h" #include "model/timestamp.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/async-clear.h" #include 
"ssx/future-util.h" #include "storage/batch_cache.h" @@ -133,13 +134,14 @@ log_manager::log_manager( log_config config, kvstore& kvstore, storage_resources& resources, - ss::sharded& feature_table) noexcept + ss::sharded& feature_table, + ss::sharded& memory_sampling) noexcept : _config(std::move(config)) , _kvstore(kvstore) , _resources(resources) , _feature_table(feature_table) , _jitter(_config.compaction_interval()) - , _batch_cache(config.reclaim_opts) { + , _batch_cache(config.reclaim_opts, memory_sampling) { _config.compaction_interval.watch([this]() { _jitter = simple_time_jitter{ _config.compaction_interval()}; diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 01173102c3ab..2bfac7fcb311 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -17,6 +17,7 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "random/simple_time_jitter.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/batch_cache.h" #include "storage/file_sanitizer_types.h" @@ -166,7 +167,8 @@ class log_manager { log_config, kvstore& kvstore, storage_resources&, - ss::sharded&) noexcept; + ss::sharded&, + ss::sharded&) noexcept; ss::future manage(ntp_config); diff --git a/src/v/storage/tests/batch_cache_reclaim_test.cc b/src/v/storage/tests/batch_cache_reclaim_test.cc index 764b680de4ec..d64d86a4df91 100644 --- a/src/v/storage/tests/batch_cache_reclaim_test.cc +++ b/src/v/storage/tests/batch_cache_reclaim_test.cc @@ -45,7 +45,11 @@ class fixture {}; FIXTURE_TEST(reclaim, fixture) { using namespace std::chrono_literals; - storage::batch_cache cache(opts); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service.start(std::ref(test_logger)).get(); + + storage::batch_cache cache(opts, memory_sampling_service); storage::batch_cache_index index(cache); std::vector cache_entries; cache_entries.reserve(30); diff --git a/src/v/storage/tests/batch_cache_test.cc b/src/v/storage/tests/batch_cache_test.cc index e279263a1caf..2edd6ddef999 100644 --- a/src/v/storage/tests/batch_cache_test.cc +++ b/src/v/storage/tests/batch_cache_test.cc @@ -11,10 +11,15 @@ #include "model/fundamental.h" #include "model/record.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/batch_cache.h" #include "test_utils/fixture.h" +#include #include +#include + +#include static storage::batch_cache::reclaim_options opts = { .growth_window = std::chrono::milliseconds(3000), @@ -43,11 +48,19 @@ static model::record_batch make_random_batch( class batch_cache_test_fixture { public: batch_cache_test_fixture() - : cache(opts) {} + : test_logger("batch-cache-test") + , cache(opts, memory_sampling_service) { + memory_sampling_service.start(std::ref(test_logger)).get(); + } auto& get_lru() { return cache._lru; }; - ~batch_cache_test_fixture() { cache.stop().get(); } + ~batch_cache_test_fixture() { + cache.stop().get(); + memory_sampling_service.stop().get(); + } + ss::logger test_logger; + ss::sharded memory_sampling_service; storage::batch_cache cache; }; @@ -130,7 +143,13 @@ SEASTAR_THREAD_TEST_CASE(touch) { std::unique_ptr index_1; std::unique_ptr index_2; - storage::batch_cache cache(opts); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service.start(std::ref(test_logger)).get(); + auto action = ss::defer( + [&memory_sampling_service] { memory_sampling_service.stop().get(); }); + + storage::batch_cache cache(opts, 
memory_sampling_service); index_1 = std::make_unique(cache); index_2 = std::make_unique(cache); auto b0 = cache.put(*index_1, make_batch(10)); @@ -148,7 +167,13 @@ SEASTAR_THREAD_TEST_CASE(touch) { std::unique_ptr index_2; // build the cache the same way - storage::batch_cache cache(opts); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service.start(std::ref(test_logger)).get(); + auto action = ss::defer( + [&memory_sampling_service] { memory_sampling_service.stop().get(); }); + + storage::batch_cache cache(opts, memory_sampling_service); index_1 = std::make_unique(cache); index_2 = std::make_unique(cache); auto b0 = cache.put(*index_1, make_batch(10)); diff --git a/src/v/storage/tests/log_manager_test.cc b/src/v/storage/tests/log_manager_test.cc index 4cb50462e962..2a440a090cb4 100644 --- a/src/v/storage/tests/log_manager_test.cc +++ b/src/v/storage/tests/log_manager_test.cc @@ -11,6 +11,7 @@ #include "model/record_utils.h" #include "model/tests/random_batch.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/directories.h" #include "storage/disk_log_appender.h" @@ -61,6 +62,7 @@ constexpr unsigned default_segment_readahead_count = 10; SEASTAR_THREAD_TEST_CASE(test_can_load_logs) { auto conf = make_config(); + ss::logger test_logger("test-logger"); ss::sharded feature_table; feature_table.start().get(); feature_table @@ -68,6 +70,9 @@ SEASTAR_THREAD_TEST_CASE(test_can_load_logs) { [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + ss::sharded memory_sampling_service; + memory_sampling_service.start(std::ref(test_logger)).get(); + storage::api store( [conf]() { return storage::kvstore_config( @@ -77,12 +82,15 @@ SEASTAR_THREAD_TEST_CASE(test_can_load_logs) { storage::make_sanitized_file_config()); }, [conf]() { return conf; }, - feature_table); + feature_table, + memory_sampling_service); store.start().get(); - auto stop_kvstore = ss::defer([&store, &feature_table] { - store.stop().get(); - feature_table.stop().get(); - }); + auto stop_kvstore = ss::defer( + [&store, &feature_table, &memory_sampling_service] { + store.stop().get(); + memory_sampling_service.stop().get(); + feature_table.stop().get(); + }); auto& m = store.log_mgr(); std::vector ntps; ntps.reserve(4); diff --git a/src/v/storage/tests/storage_test_fixture.h b/src/v/storage/tests/storage_test_fixture.h index eac9f5050de6..45a58ece51aa 100644 --- a/src/v/storage/tests/storage_test_fixture.h +++ b/src/v/storage/tests/storage_test_fixture.h @@ -21,6 +21,7 @@ #include "model/tests/random_batch.h" #include "random/generators.h" #include "reflection/adl.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/kvstore.h" #include "storage/log_manager.h" @@ -188,6 +189,7 @@ class storage_test_fixture { storage::kvstore kvstore; storage::storage_resources resources; ss::sharded feature_table; + ss::sharded memory_sampling_service; std::optional ts_cursor; @@ -200,7 +202,8 @@ class storage_test_fixture { test_dir, storage::make_sanitized_file_config()), resources, - feature_table) { + feature_table) + , memory_sampling_service() { configure_unit_test_logging(); // avoid double metric registrations ss::smp::invoke_on_all([] { @@ -214,12 +217,14 @@ class storage_test_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + memory_sampling_service.start(std::ref(tlog)).get(); kvstore.start().get(); } ~storage_test_fixture() { 
kvstore.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); } @@ -234,13 +239,21 @@ class storage_test_fixture { /// Creates a log manager in test directory storage::log_manager make_log_manager(storage::log_config cfg) { return storage::log_manager( - std::move(cfg), kvstore, resources, feature_table); + std::move(cfg), + kvstore, + resources, + feature_table, + memory_sampling_service); } /// Creates a log manager in test directory with default config storage::log_manager make_log_manager() { return storage::log_manager( - default_log_config(test_dir), kvstore, resources, feature_table); + default_log_config(test_dir), + kvstore, + resources, + feature_table, + memory_sampling_service); } /// \brief randomizes the configuration options diff --git a/src/v/storage/tests/utils/disk_log_builder.cc b/src/v/storage/tests/utils/disk_log_builder.cc index d8a6c90cc0a0..6fe96f212d2e 100644 --- a/src/v/storage/tests/utils/disk_log_builder.cc +++ b/src/v/storage/tests/utils/disk_log_builder.cc @@ -32,7 +32,10 @@ disk_log_builder::disk_log_builder(storage::log_config config) storage::make_sanitized_file_config()); }, [this]() { return _log_config; }, - _feature_table) {} + _feature_table, + _memory_sampling_service) { + _memory_sampling_service.start(std::ref(_test_logger)).get(); +} // Batch generation ss::future<> disk_log_builder::add_random_batch( @@ -160,7 +163,9 @@ disk_log_builder::update_start_offset(model::offset start_offset) { } ss::future<> disk_log_builder::stop() { - return _storage.stop().then([this]() { return _feature_table.stop(); }); + return _storage.stop() + .then([this]() { return _memory_sampling_service.stop(); }) + .then([this]() { return _feature_table.stop(); }); } // Low lever interface access diff --git a/src/v/storage/tests/utils/disk_log_builder.h b/src/v/storage/tests/utils/disk_log_builder.h index 95c6fe8dfb16..49ab5ca7f57b 100644 --- a/src/v/storage/tests/utils/disk_log_builder.h +++ b/src/v/storage/tests/utils/disk_log_builder.h @@ -16,6 +16,7 @@ #include "model/record_batch_reader.h" #include "model/tests/random_batch.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "ssx/sformat.h" #include "storage/api.h" @@ -404,7 +405,9 @@ class disk_log_builder { const log_append_config& config, should_flush_after flush); + ss::logger _test_logger{"disk-log-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::log_config _log_config; storage::api _storage; std::optional _log; diff --git a/src/v/test_utils/logs.h b/src/v/test_utils/logs.h index 36a2efe4f0c8..69158e44f744 100644 --- a/src/v/test_utils/logs.h +++ b/src/v/test_utils/logs.h @@ -12,6 +12,7 @@ #pragma once #include "model/fundamental.h" #include "model/record_batch_reader.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/api.h" @@ -37,24 +38,33 @@ static inline ss::future<> persist_log_file( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - storage::api storage( - [base_dir]() { - return storage::kvstore_config( - 1_MiB, - config::mock_binding(10ms), - base_dir, - storage::make_sanitized_file_config()); - }, - [base_dir]() { - return storage::log_config( - base_dir, - 1_GiB, - ss::default_priority_class(), - storage::make_sanitized_file_config()); - }, - feature_table); - storage.start().get(); - auto& mgr = storage.log_mgr(); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + 
memory_sampling_service.start(std::ref(test_logger)).get(); + + ss::sharded storage; + storage + .start( + [base_dir]() { + return storage::kvstore_config( + 1_MiB, + config::mock_binding(10ms), + base_dir, + storage::make_sanitized_file_config()); + }, + [base_dir]() { + return storage::log_config( + base_dir, + 1_GiB, + ss::default_priority_class(), + storage::make_sanitized_file_config()); + }, + std::ref(feature_table), + std::ref(memory_sampling_service)) + .get(); + storage.invoke_on_all(&storage::api::start).get(); + + auto& mgr = storage.local().log_mgr(); try { mgr.manage(storage::ntp_config(file_ntp, mgr.config().base_dir)) .then([b = std::move(batches)](storage::log log) mutable { @@ -73,9 +83,11 @@ static inline ss::future<> persist_log_file( }) .get(); storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); } catch (...) { storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); throw; } @@ -108,24 +120,32 @@ read_log_file(ss::sstring base_dir, model::ntp file_ntp) { [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - storage::api storage( - [base_dir]() { - return storage::kvstore_config( - 1_MiB, - config::mock_binding(10ms), - base_dir, - storage::make_sanitized_file_config()); - }, - [base_dir]() { - return storage::log_config( - base_dir, - 1_GiB, - ss::default_priority_class(), - storage::make_sanitized_file_config()); - }, - feature_table); - storage.start().get(); - auto& mgr = storage.log_mgr(); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service.start(std::ref(test_logger)).get(); + + ss::sharded storage; + storage + .start( + [base_dir]() { + return storage::kvstore_config( + 1_MiB, + config::mock_binding(10ms), + base_dir, + storage::make_sanitized_file_config()); + }, + [base_dir]() { + return storage::log_config( + base_dir, + 1_GiB, + ss::default_priority_class(), + storage::make_sanitized_file_config()); + }, + std::ref(feature_table), + std::ref(memory_sampling_service)) + .get(); + storage.invoke_on_all(&storage::api::start).get(); + auto& mgr = storage.local().log_mgr(); try { auto batches = mgr.manage(storage::ntp_config(file_ntp, mgr.config().base_dir)) @@ -142,10 +162,12 @@ read_log_file(ss::sstring base_dir, model::ntp file_ntp) { }) .get0(); storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); return batches; } catch (...) { storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); throw; } From 5462773128cfb5e75a8878328cef84a349a951fb Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Thu, 4 May 2023 12:00:44 +0100 Subject: [PATCH 2/5] memory-sampling: Add on-OOM diagnostics Print top-10 allocation sites on OOM. Uses the seastar on-OOM hook to add additional output when we fail to alloc. The output looks similar to what we print when reaching the low watermarks. 
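For reference, a rough sketch of the hook shape used here (simplified from the
implementation below; buffer sizes and the top-10 cut-off are illustrative, and
headers are omitted). The buffers are captured up front so the callback itself
does not allocate while the allocator is already failing:

```
seastar::memory::set_additional_diagnostics_producer(
  [sites = std::vector<seastar::memory::allocation_site>(1000),
   buf = std::vector<char>(1000)](
      seastar::memory::memory_diagnostics_writer writer) mutable {
      auto n = seastar::memory::sampled_memory_profile(sites);
      // the real code partial-sorts by size first; only the top sites matter
      for (size_t i = 0; i < std::min<size_t>(10, n); ++i) {
          auto res = fmt::format_to_n(
            buf.begin(), buf.size(), "{} {} {}\n",
            sites[i].size, sites[i].count, sites[i].backtrace);
          writer(std::string_view(buf.data(), std::min(res.size, buf.size())));
      }
  });
```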
--- src/v/resource_mgmt/memory_sampling.cc | 49 ++++++++++++++----- src/v/resource_mgmt/memory_sampling.h | 7 +++ .../tests/memory_sampling_tests.cc | 46 +++++++++++++++++ 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/src/v/resource_mgmt/memory_sampling.cc b/src/v/resource_mgmt/memory_sampling.cc index cc10d2832a9d..afeb5e6f0981 100644 --- a/src/v/resource_mgmt/memory_sampling.cc +++ b/src/v/resource_mgmt/memory_sampling.cc @@ -13,6 +13,7 @@ #include "resource_mgmt/available_memory.h" #include "ssx/future-util.h" +#include "ssx/sformat.h" #include "vlog.h" #include @@ -21,14 +22,10 @@ #include #include -#include - constexpr std::string_view diagnostics_header() { return "Top-N alloc sites - size count stack:"; } -constexpr std::string_view allocation_site_format_str() { return "{} {} {}\n"; } - /// Put `top_n` allocation sites into the front of `allocation_sites` static void top_n_allocation_sites( std::vector& allocation_sites, size_t top_n) { @@ -39,13 +36,41 @@ static void top_n_allocation_sites( [](const auto& lhs, const auto& rhs) { return lhs.size > rhs.size; }); } -ss::sstring memory_sampling::format_allocation_site( - const ss::memory::allocation_site& alloc_site) { - return fmt::format( - allocation_site_format_str(), - alloc_site.size, - alloc_site.count, - alloc_site.backtrace); +ss::noncopyable_function +memory_sampling::get_oom_diagnostics_callback() { + // preallocate those so that we don't allocate on OOM + std::vector allocation_sites(1000); + std::vector format_buf(1000); + + return [allocation_sites = std::move(allocation_sites), + format_buf = std::move(format_buf)]( + seastar::memory::memory_diagnostics_writer writer) mutable { + auto num_sites = ss::memory::sampled_memory_profile(allocation_sites); + + const size_t top_n = std::min(size_t(10), num_sites); + top_n_allocation_sites(allocation_sites, top_n); + + writer(diagnostics_header()); + writer("\n"); + + for (size_t i = 0; i < top_n; ++i) { + auto bytes_written = fmt::format_to_n( + format_buf.begin(), + format_buf.size(), + "{}", + allocation_sites[i]) + .size; + + writer(std::string_view(format_buf.data(), bytes_written)); + } + }; +} + +/// We want to print the top-n allocation sites on OOM +/// Set this up and make sure we don't allocate extra at that point +void setup_additional_oom_diagnostics() { + seastar::memory::set_additional_diagnostics_producer( + memory_sampling::get_oom_diagnostics_callback()); } void memory_sampling::notify_of_reclaim() { _low_watermark_cond.signal(); } @@ -109,6 +134,8 @@ memory_sampling::memory_sampling( , _second_log_limit_fraction(second_log_limit_fraction) {} void memory_sampling::start() { + setup_additional_oom_diagnostics(); + // We chose a sampling rate of ~3MB. From testing this has a very low // overhead of something like ~1%. 
We could still get away with something // smaller like 1MB and have acceptable overhead (~3%) but 3MB should be a diff --git a/src/v/resource_mgmt/memory_sampling.h b/src/v/resource_mgmt/memory_sampling.h index 42930a7fa62b..71b612c80c8e 100644 --- a/src/v/resource_mgmt/memory_sampling.h +++ b/src/v/resource_mgmt/memory_sampling.h @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include @@ -61,6 +63,11 @@ class memory_sampling : public ss::peering_sharded_service { double first_log_limit_fraction, double second_log_limit_fraction); + /// Returns the callback we register to run on OOM to add the memory + /// sampling output + static ss::noncopyable_function + get_oom_diagnostics_callback(); + private: /// Starts the background future running the allocation site logging on low /// available memory diff --git a/src/v/resource_mgmt/tests/memory_sampling_tests.cc b/src/v/resource_mgmt/tests/memory_sampling_tests.cc index d2d22a0586d6..71fa38442976 100644 --- a/src/v/resource_mgmt/tests/memory_sampling_tests.cc +++ b/src/v/resource_mgmt/tests/memory_sampling_tests.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -27,6 +28,51 @@ #ifndef SEASTAR_DEFAULT_ALLOCATOR +SEASTAR_THREAD_TEST_CASE(test_no_allocs_in_oom_callback) { + seastar::memory::scoped_heap_profiling profiling(16000); + + std::vector> dummy_bufs; + std::vector output_buffer; + const size_t max_output_size = 1000000; + output_buffer.reserve(max_output_size); + + std::vector allocation_sites(1000); + size_t sampled_sites = 0; + + // make sure we have at least one allocation site + while (sampled_sites == 0) { + dummy_bufs.emplace_back(1000); + sampled_sites = seastar::memory::sampled_memory_profile( + allocation_sites); + } + + auto oom_callback = memory_sampling::get_oom_diagnostics_callback(); + auto writer = [&output_buffer](std::string_view bytes) { + if (output_buffer.size() + bytes.size() < max_output_size) { + output_buffer.insert( + output_buffer.end(), bytes.begin(), bytes.end()); + } + }; + + auto before = seastar::memory::stats(); + oom_callback(writer); + auto after = seastar::memory::stats(); + + // to make sure we are actually testing something + BOOST_REQUIRE_GT(before.mallocs(), 0); + BOOST_REQUIRE_GT(output_buffer.size(), 0); + BOOST_REQUIRE_EQUAL(before.large_allocations(), after.large_allocations()); + BOOST_REQUIRE_EQUAL(before.allocated_memory(), after.allocated_memory()); + BOOST_REQUIRE_EQUAL(before.mallocs(), after.mallocs()); + + // confirm an average allocation site fits into the oom writer line buffer + auto allocation_site_needle = fmt::format("{}", allocation_sites[0]); + BOOST_REQUIRE_NE( + std::string_view(output_buffer.data(), output_buffer.size()) + .find(allocation_site_needle), + std::string_view::npos); +} + SEASTAR_THREAD_TEST_CASE(test_low_watermark_logging) { seastar::logger dummy_logger("dummy"); std::stringstream output_buf; From 4306a0db9fec24e98b0e97e25e1a380c20df77ea Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Fri, 28 Apr 2023 15:29:38 +0100 Subject: [PATCH 3/5] admin_server: Add sampled_memory_profile endpoint Adds an endpoint that the collects the current sampled memory profile from all shards and returns them to the caller. For now the stack is just stringified but we could make this a proper json structure all the way down. Keeping it simple for now. 
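For reference, each element of the response body looks roughly like this (field
names follow the debug.json schema added in this patch; the values are
illustrative):

```
[
  {
    "shard": 3,
    "allocation_sites": [
      {"size": 1048576, "count": 42, "backtrace": "0x4a1b2c 0x4a1d3e ..."}
    ]
  }
]
```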
The following (or some form thereof) can be used to get a flamegraph: ``` curl localhost:9644/v1/debug/sampled_memory_profile?shard=3 \ | jq -r .[0].profile \ | ./symbolize_mem_dump.py /home/stephan/build/redpanda/vbuild/release/clang/bin/redpanda \ | flamegraph.pl - > flamegraph.svg ``` --- src/v/redpanda/admin/api-doc/debug.json | 60 ++++++++++++++++++++ src/v/redpanda/admin_server.cc | 59 ++++++++++++++++++- src/v/redpanda/admin_server.h | 8 ++- src/v/redpanda/application.cc | 3 +- src/v/resource_mgmt/memory_sampling.cc | 45 +++++++++++++++ src/v/resource_mgmt/memory_sampling.h | 39 ++++++++++++- tests/rptest/services/admin.py | 14 +++++ tests/rptest/tests/memory_sampling_test.py | 66 ++++++++++++++++++++++ 8 files changed, 289 insertions(+), 5 deletions(-) create mode 100644 tests/rptest/tests/memory_sampling_test.py diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json index 37df25896d22..66d41ce548e8 100644 --- a/src/v/redpanda/admin/api-doc/debug.json +++ b/src/v/redpanda/admin/api-doc/debug.json @@ -205,6 +205,32 @@ } ] }, + { + "path": "/v1/debug/sampled_memory_profile", + "operations": [ + { + "method": "GET", + "summary": "Get the currently sampled live memory set for the specified or all shards", + "nickname": "sampled_memory_profile", + "produces": [ + "application/json" + ], + "type": "array", + "items": { + "type": "memory_profile" + }, + "parameters": [ + { + "name": "shard", + "in": "query", + "required": false, + "allowMultiple": false, + "type": "long" + } + ] + } + ] + }, { "path": "/v1/debug/refresh_disk_health_info", "operations": [ @@ -497,6 +523,40 @@ } } }, + "memory_profile": { + "id": "memory_profile", + "description": "Sampled memory profile of a shard", + "properties": { + "shard": { + "type": "long", + "description": "Id of the shard the profile is from" + }, + "allocation_sites": { + "type": "array", + "items": { + "type": "allocation_site" + } + } + } + }, + "allocation_site": { + "id": "allocation_site", + "description": "A single allocation site with backtrace, size and count", + "properties": { + "size": { + "type": "long", + "description": "Current bytes allocated at this allocation site (note this is the upscaled size and not the sampled one)" + }, + "count": { + "type": "long", + "description": "Live allocations at this site" + }, + "backtrace": { + "type": "string", + "description": "Backtrace of this allocation site" + } + } + }, "controller_status": { "id": "controller_status", "description": "Controller status", diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index f0b0e36c99b9..d53cccedd1d6 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -78,6 +78,7 @@ #include "redpanda/admin/api-doc/status.json.h" #include "redpanda/admin/api-doc/transaction.json.h" #include "redpanda/admin/api-doc/usage.json.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/errc.h" #include "security/acl.h" #include "security/credential_store.h" @@ -93,9 +94,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -117,9 +120,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -203,7 +208,8 @@ admin_server::admin_server( ss::sharded& topic_recovery_status_frontend, ss::sharded& tx_registry_frontend, - ss::sharded& storage_node) + ss::sharded& storage_node, + ss::sharded& memory_sampling_service) : _log_level_timer([this] { log_level_timer_handler(); }) , _server("admin") 
, _cfg(std::move(cfg)) @@ -225,6 +231,7 @@ admin_server::admin_server( , _topic_recovery_status_frontend(topic_recovery_status_frontend) , _tx_registry_frontend(tx_registry_frontend) , _storage_node(storage_node) + , _memory_sampling_service(memory_sampling_service) , _default_blocked_reactor_notify( ss::engine().get_blocked_reactor_notify_ms()) {} @@ -4128,6 +4135,13 @@ void admin_server::register_debug_routes() { }); }); + register_route( + ss::httpd::debug_json::sampled_memory_profile, + [this](std::unique_ptr req) + -> ss::future { + return sampled_memory_profile_handler(std::move(req)); + }); + register_route( ss::httpd::debug_json::restart_service, [this](std::unique_ptr req) { @@ -5005,3 +5019,46 @@ admin_server::restart_service_handler(std::unique_ptr req) { co_await restart_redpanda_service(*service); co_return ss::json::json_return_type(ss::json::json_void()); } + +ss::future +admin_server::sampled_memory_profile_handler( + std::unique_ptr req) { + vlog(logger.info, "Request to sampled memory profile"); + + std::optional shard_id; + if (auto e = req->get_query_param("shard"); !e.empty()) { + try { + shard_id = boost::lexical_cast(e); + } catch (const boost::bad_lexical_cast&) { + throw ss::httpd::bad_param_exception( + fmt::format("Invalid parameter 'shard_id' value {{{}}}", e)); + } + } + + if (shard_id.has_value()) { + auto max_shard_id = ss::smp::count; + if (*shard_id > max_shard_id) { + throw ss::httpd::bad_param_exception(fmt::format( + "Shard id too high, max shard id is {}", max_shard_id)); + } + } + + auto profiles = co_await _memory_sampling_service.local() + .get_sampled_memory_profiles(shard_id); + + std::vector resp(profiles.size()); + for (size_t i = 0; i < resp.size(); ++i) { + resp[i].shard = profiles[i].shard_id; + + for (auto& allocation_sites : profiles[i].allocation_sites) { + ss::httpd::debug_json::allocation_site allocation_site; + allocation_site.size = allocation_sites.size; + allocation_site.count = allocation_sites.count; + allocation_site.backtrace = std::move(allocation_sites.backtrace); + resp[i].allocation_sites.push(allocation_site); + } + } + + co_return co_await ss::make_ready_future( + std::move(resp)); +} diff --git a/src/v/redpanda/admin_server.h b/src/v/redpanda/admin_server.h index f60eb5e0a899..0dfcafc8a2b3 100644 --- a/src/v/redpanda/admin_server.h +++ b/src/v/redpanda/admin_server.h @@ -18,12 +18,14 @@ #include "model/metadata.h" #include "pandaproxy/rest/fwd.h" #include "pandaproxy/schema_registry/fwd.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/connection_cache.h" #include "seastarx.h" #include "storage/node.h" #include "utils/request_auth.h" #include +#include #include #include #include @@ -73,7 +75,8 @@ class admin_server { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future<> start(); ss::future<> stop(); @@ -460,6 +463,8 @@ class admin_server { cloud_storage_usage_handler(std::unique_ptr); ss::future restart_service_handler(std::unique_ptr); + ss::future + sampled_memory_profile_handler(std::unique_ptr); ss::future<> throw_on_error( ss::http::request& req, @@ -510,6 +515,7 @@ class admin_server { _topic_recovery_status_frontend; ss::sharded& _tx_registry_frontend; ss::sharded& _storage_node; + ss::sharded& _memory_sampling_service; // Value before the temporary override std::chrono::milliseconds _default_blocked_reactor_notify; ss::timer<> _blocked_reactor_notify_reset_timer; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 
89097ee1798c..c318fb9b3aa6 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -849,7 +849,8 @@ void application::configure_admin_server() { std::ref(topic_recovery_service), std::ref(topic_recovery_status_frontend), std::ref(tx_registry_frontend), - std::ref(storage_node)) + std::ref(storage_node), + std::ref(_memory_sampling)) .get(); } diff --git a/src/v/resource_mgmt/memory_sampling.cc b/src/v/resource_mgmt/memory_sampling.cc index afeb5e6f0981..b08e8961093a 100644 --- a/src/v/resource_mgmt/memory_sampling.cc +++ b/src/v/resource_mgmt/memory_sampling.cc @@ -19,9 +19,13 @@ #include #include #include +#include #include #include +#include +#include + constexpr std::string_view diagnostics_header() { return "Top-N alloc sites - size count stack:"; } @@ -152,3 +156,44 @@ ss::future<> memory_sampling::stop() { co_await _low_watermark_gate.close(); } + +memory_sampling::serialized_memory_profile +memory_sampling::get_sampled_memory_profile() { + auto stacks = ss::memory::sampled_memory_profile(); + + std::vector + allocation_sites; + allocation_sites.reserve(stacks.size()); + + for (auto& stack : stacks) { + allocation_sites.emplace_back( + stack.size, + stack.count, + ssx::sformat("{}", std::move(stack.backtrace))); + } + + return memory_sampling::serialized_memory_profile{ + ss::this_shard_id(), std::move(allocation_sites)}; +} + +ss::future> +memory_sampling::get_sampled_memory_profiles(std::optional shard_id) { + using result_t = memory_sampling::serialized_memory_profile; + std::vector resp; + + if (shard_id.has_value()) { + resp.push_back(co_await container().invoke_on(*shard_id, [](auto&) { + return memory_sampling::get_sampled_memory_profile(); + })); + } else { + resp = co_await container().map_reduce0( + [](auto&) { return memory_sampling::get_sampled_memory_profile(); }, + std::vector{}, + [](std::vector all, result_t result) { + all.push_back(std::move(result)); + return all; + }); + } + + co_return resp; +} diff --git a/src/v/resource_mgmt/memory_sampling.h b/src/v/resource_mgmt/memory_sampling.h index 71b612c80c8e..c1cab72784a2 100644 --- a/src/v/resource_mgmt/memory_sampling.h +++ b/src/v/resource_mgmt/memory_sampling.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,8 @@ #include #include +#include + template<> struct fmt::formatter : fmt::formatter { @@ -40,14 +43,42 @@ struct fmt::formatter /// Very simple service enabling memory profiling on all shards. 
class memory_sampling : public ss::peering_sharded_service { public: - ss::sstring - format_allocation_site(const ss::memory::allocation_site& backtrace); + struct serialized_memory_profile { + struct allocation_site { + // cumulative size at the allocation site (upscaled not sampled) + size_t size; + // count at the allocation site + size_t count; + // backtrace of this allocation site + ss::sstring backtrace; + + allocation_site(size_t size, size_t count, ss::sstring backtrace) + : size(size) + , count(count) + , backtrace(std::move(backtrace)) {} + }; + + /// shard id of this profile + ss::shard_id shard_id; + /// Backtraces of this shard + std::vector allocation_sites; + + explicit serialized_memory_profile( + long shard_id, std::vector traces) + : shard_id(shard_id) + , allocation_sites(std::move(traces)) {} + }; /// Starts the service (enables memory sampling, sets up additional OOM /// information and enables logging in highwatermark situations) void start(); ss::future<> stop(); + /// Get the serialized memory profile for a shard or all shards if shard_id + /// is nullopt + ss::future> + get_sampled_memory_profiles(std::optional shard_id); + /// Notify the memory sampling service that a memory reclaim from the /// seastar allocator has happened. Used by the batch_cache void notify_of_reclaim(); @@ -73,6 +104,10 @@ class memory_sampling : public ss::peering_sharded_service { /// available memory ss::future<> start_low_available_memory_logging(); + /// Returns the serialized memory_profile for the current shard + static memory_sampling::serialized_memory_profile + get_sampled_memory_profile(); + ss::logger& _logger; // When a memory reclaim from the seastar allocator happens the batch_cache diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index fdce9fa1a158..d40dfd51adbe 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -993,3 +993,17 @@ def set_disk_stat_override(self, f"debug/storage/disk_stat/{disk_type}", json=json, node=node) + + def get_sampled_memory_profile(self, node=None, shard=None): + """ + Gets the sampled memory profile debug output + """ + if shard is not None: + kwargs = {"params": {"shard": shard}} + else: + kwargs = {} + + return self._request("get", + "debug/sampled_memory_profile", + node=node, + **kwargs).json() diff --git a/tests/rptest/tests/memory_sampling_test.py b/tests/rptest/tests/memory_sampling_test.py new file mode 100644 index 000000000000..da24e0d5e1ed --- /dev/null +++ b/tests/rptest/tests/memory_sampling_test.py @@ -0,0 +1,66 @@ +# Copyright 2023 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.services.admin import Admin +from rptest.services.cluster import cluster +from rptest.tests.redpanda_test import RedpandaTest +from rptest.utils.mode_checks import skip_debug_mode + +BOOTSTRAP_CONFIG = { + 'memory_enable_memory_sampling': True, +} + + +class MemorySamplingTestTest(RedpandaTest): + def __init__(self, *args, **kwargs): + rp_conf = BOOTSTRAP_CONFIG.copy() + + super(MemorySamplingTestTest, self).__init__(*args, + extra_rp_conf=rp_conf, + **kwargs) + + self.admin = Admin(self.redpanda) + + @cluster(num_nodes=1) + @skip_debug_mode # not using seastar allocator in debug + def test_get_all_stacks(self): + """ + Verify that the sampled_memory_profile GET endpoint serves valid json + with some profile data. + + """ + admin = Admin(self.redpanda) + profile = admin.get_sampled_memory_profile() + + assert len(profile) == 2 + assert 'shard' in profile[0] + assert 'allocation_sites' in profile[0] + assert len(profile[0]['allocation_sites']) > 0 + assert 'size' in profile[0]['allocation_sites'][0] + assert 'count' in profile[0]['allocation_sites'][0] + assert 'backtrace' in profile[0]['allocation_sites'][0] + + @cluster(num_nodes=1) + @skip_debug_mode # not using seastar allocator in debug + def test_get_per_shard_stacks(self): + """ + Verify that the sampled_memory_profile GET endpoint serves valid json + with some profile data with a shard parameter + + """ + admin = Admin(self.redpanda) + profile = admin.get_sampled_memory_profile(shard=1) + + assert len(profile) == 1 + assert 'shard' in profile[0] + assert 'allocation_sites' in profile[0] + assert len(profile[0]['allocation_sites']) > 0 + assert 'size' in profile[0]['allocation_sites'][0] + assert 'count' in profile[0]['allocation_sites'][0] + assert 'backtrace' in profile[0]['allocation_sites'][0] From 6c0f1c0960b61ac0d2710dfb1a1ec7b60040f215 Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Tue, 2 May 2023 11:48:46 +0100 Subject: [PATCH 4/5] config: Add config flag for memory profiling Adds a config flag to enable/disable the memory sampling. Flag can be changed without a restart. However, turning it on after the fact is not that useful because we will be missing lots of live allocations. It's still useful if profiling needs to be turned off for whatever reason. 
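For example, a minimal sketch of toggling the property at runtime (assuming the generic `rpk cluster config set` command; the property name is the one added below, and `needs_restart::no` means the change applies without a restart):

```
# Hedged sketch: disable sampled memory profiling on a running cluster.
rpk cluster config set memory_enable_memory_sampling false

# Re-enable it later; note the profile will be missing any allocations
# made while sampling was off.
rpk cluster config set memory_enable_memory_sampling true
```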
--- src/v/config/configuration.cc | 13 ++++++ src/v/config/configuration.h | 1 + .../tests/bootstrap_configuration_test.cc | 4 +- .../raft/tests/configuration_manager_test.cc | 4 +- src/v/raft/tests/foreign_entry_test.cc | 4 +- src/v/raft/tests/mux_state_machine_fixture.h | 4 +- src/v/raft/tests/offset_translator_tests.cc | 4 +- src/v/redpanda/application.cc | 6 ++- src/v/resource_mgmt/memory_sampling.cc | 41 +++++++++++++++---- src/v/resource_mgmt/memory_sampling.h | 9 +++- .../tests/memory_sampling_reclaimer_test.cc | 7 +++- .../tests/memory_sampling_tests.cc | 6 ++- .../storage/tests/batch_cache_reclaim_test.cc | 4 +- src/v/storage/tests/batch_cache_test.cc | 12 ++++-- src/v/storage/tests/log_manager_test.cc | 4 +- src/v/storage/tests/storage_test_fixture.h | 4 +- src/v/storage/tests/utils/disk_log_builder.cc | 4 +- src/v/test_utils/logs.h | 8 +++- 18 files changed, 112 insertions(+), 27 deletions(-) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index dddea31ca309..1b41f93de4c4 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1881,6 +1881,19 @@ configuration::configuration() "exception is thrown instead.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, true) + , sampled_memory_profile( + *this, + "memory_enable_memory_sampling", + "If true, memory allocations will be sampled and tracked. A sampled live " + "set of allocations can then be retrieved from the Admin API. " + "Additionally, we will periodically log the top-n allocation sites", + {// Enabling/Disabling this dynamically doesn't make much sense as for the + // memory profile to be meaning full you'll want to have this on from the + // beginning. However, we still provide the option to be able to disable + // it dynamically in case something goes wrong + .needs_restart = needs_restart::no, + .visibility = visibility::tunable}, + true) , enable_metrics_reporter( *this, "enable_metrics_reporter", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 994dbe96eb59..84b2efb3e10a 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -375,6 +375,7 @@ struct configuration final : public config_store { // memory related settings property memory_abort_on_alloc_failure; + property sampled_memory_profile; // metrics reporter property enable_metrics_reporter; diff --git a/src/v/raft/tests/bootstrap_configuration_test.cc b/src/v/raft/tests/bootstrap_configuration_test.cc index f9d2923ef0c8..0248f927b727 100644 --- a/src/v/raft/tests/bootstrap_configuration_test.cc +++ b/src/v/raft/tests/bootstrap_configuration_test.cc @@ -57,7 +57,9 @@ struct bootstrap_fixture : raft::simple_record_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - _memory_sampling_service.start(std::ref(_test_logger)).get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _storage.start().get(); // ignore the get_log() (void)_storage.log_mgr() diff --git a/src/v/raft/tests/configuration_manager_test.cc b/src/v/raft/tests/configuration_manager_test.cc index fde7bcfb5a16..02a5d3608e79 100644 --- a/src/v/raft/tests/configuration_manager_test.cc +++ b/src/v/raft/tests/configuration_manager_test.cc @@ -69,7 +69,9 @@ struct config_manager_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - _memory_sampling_service.start(std::ref(_test_logger)).get(); + _memory_sampling_service + 
.start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _storage.start().get0(); } diff --git a/src/v/raft/tests/foreign_entry_test.cc b/src/v/raft/tests/foreign_entry_test.cc index 637c7a139f4b..281c29babb53 100644 --- a/src/v/raft/tests/foreign_entry_test.cc +++ b/src/v/raft/tests/foreign_entry_test.cc @@ -69,7 +69,9 @@ struct foreign_entry_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - _memory_sampling_service.start(std::ref(_test_logger)).get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _storage.start().get(); (void)_storage.log_mgr() .manage(storage::ntp_config(_ntp, "test.dir")) diff --git a/src/v/raft/tests/mux_state_machine_fixture.h b/src/v/raft/tests/mux_state_machine_fixture.h index 4e218614c8e9..e57ae04a8922 100644 --- a/src/v/raft/tests/mux_state_machine_fixture.h +++ b/src/v/raft/tests/mux_state_machine_fixture.h @@ -76,7 +76,9 @@ struct mux_state_machine_fixture { [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - _memory_sampling_service.start(std::ref(_test_logger)).get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _group_mgr .start( diff --git a/src/v/raft/tests/offset_translator_tests.cc b/src/v/raft/tests/offset_translator_tests.cc index 930a1aef9836..eabdc31e77c3 100644 --- a/src/v/raft/tests/offset_translator_tests.cc +++ b/src/v/raft/tests/offset_translator_tests.cc @@ -51,7 +51,9 @@ struct base_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - _memory_sampling_service.start(std::ref(_test_logger)).get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _api .start( [this]() { return make_kv_cfg(); }, diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index c318fb9b3aa6..6583195315e5 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -398,7 +398,11 @@ void application::initialize( std::optional schema_reg_cfg, std::optional schema_reg_client_cfg, std::optional groups) { - construct_service(_memory_sampling, std::ref(_log)).get(); + construct_service( + _memory_sampling, std::ref(_log), ss::sharded_parameter([]() { + return config::shard_local_cfg().sampled_memory_profile.bind(); + })) + .get(); _memory_sampling.invoke_on_all(&memory_sampling::start).get(); // Set up the abort_on_oom value based on the associated cluster config diff --git a/src/v/resource_mgmt/memory_sampling.cc b/src/v/resource_mgmt/memory_sampling.cc index b08e8961093a..63783a827844 100644 --- a/src/v/resource_mgmt/memory_sampling.cc +++ b/src/v/resource_mgmt/memory_sampling.cc @@ -124,27 +124,50 @@ ss::future<> memory_sampling::start_low_available_memory_logging() { } } -memory_sampling::memory_sampling(ss::logger& logger) - : _logger(logger) - , _first_log_limit_fraction(0.2) - , _second_log_limit_fraction(0.1) {} +memory_sampling::memory_sampling( + ss::logger& logger, config::binding enabled) + : memory_sampling(logger, std::move(enabled), 0.2, 0.1) {} memory_sampling::memory_sampling( ss::logger& logger, + config::binding enabled, double first_log_limit_fraction, double second_log_limit_fraction) : _logger(logger) + , _enabled(std::move(enabled)) , _first_log_limit_fraction(first_log_limit_fraction) - , _second_log_limit_fraction(second_log_limit_fraction) {} - -void memory_sampling::start() { - setup_additional_oom_diagnostics(); + , 
_second_log_limit_fraction(second_log_limit_fraction) { + _enabled.watch([this]() { on_enabled_change(); }); +} +void memory_sampling::on_enabled_change() { // We chose a sampling rate of ~3MB. From testing this has a very low // overhead of something like ~1%. We could still get away with something // smaller like 1MB and have acceptable overhead (~3%) but 3MB should be a // safer default for the initial rollout. - ss::memory::set_heap_profiling_sampling_rate(3000037); + const size_t sampling_rate = 3000037; + + // Note no logging here as seastar already logs about this + if (_enabled()) { + if (ss::memory::get_heap_profiling_sample_rate() == sampling_rate) { + return; + } + + ss::memory::set_heap_profiling_sampling_rate(sampling_rate); + } else { + if (ss::memory::get_heap_profiling_sample_rate() == 0) { + return; + } + + ss::memory::set_heap_profiling_sampling_rate(0); + } +} + +void memory_sampling::start() { + setup_additional_oom_diagnostics(); + + // start now if enabled + on_enabled_change(); ssx::spawn_with_gate(_low_watermark_gate, [this]() { return start_low_available_memory_logging(); diff --git a/src/v/resource_mgmt/memory_sampling.h b/src/v/resource_mgmt/memory_sampling.h index c1cab72784a2..022fd455079d 100644 --- a/src/v/resource_mgmt/memory_sampling.h +++ b/src/v/resource_mgmt/memory_sampling.h @@ -11,6 +11,7 @@ #pragma once +#include "config/property.h" #include "seastarx.h" #include @@ -85,12 +86,13 @@ class memory_sampling : public ss::peering_sharded_service { /// Constructs the service. Logger will be used to log top stacks under high /// memory pressure - explicit memory_sampling(ss::logger& logger); + explicit memory_sampling(ss::logger& logger, config::binding enabled); /// Constructor as above but allows overriding high memory thresholds. Used /// for testing. explicit memory_sampling( ss::logger& logger, + config::binding enabled, double first_log_limit_fraction, double second_log_limit_fraction); @@ -108,8 +110,13 @@ class memory_sampling : public ss::peering_sharded_service { static memory_sampling::serialized_memory_profile get_sampled_memory_profile(); + void on_enabled_change(); + ss::logger& _logger; + /// Are we currently sampling memory + config::binding _enabled; + // When a memory reclaim from the seastar allocator happens the batch_cache // notifies us via below condvar. 
If we see a new low watermark that's and // we are below 20% then we log once and a second time the first time we saw diff --git a/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc b/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc index f244e40f7e4c..555885b18f7c 100644 --- a/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc +++ b/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc @@ -35,7 +35,12 @@ SEASTAR_THREAD_TEST_CASE(reclaim_notifies_memory_sampling) { std::string_view needle("Top-N alloc"); const auto first_log_limit = 0.80; - memory_sampling_service.start(std::ref(test_logger), first_log_limit, 0.2) + memory_sampling_service + .start( + std::ref(test_logger), + config::mock_binding(true), + first_log_limit, + 0.2) .get(); memory_sampling_service.invoke_on_all(&memory_sampling::start).get(); diff --git a/src/v/resource_mgmt/tests/memory_sampling_tests.cc b/src/v/resource_mgmt/tests/memory_sampling_tests.cc index 71fa38442976..205db00652c2 100644 --- a/src/v/resource_mgmt/tests/memory_sampling_tests.cc +++ b/src/v/resource_mgmt/tests/memory_sampling_tests.cc @@ -81,7 +81,11 @@ SEASTAR_THREAD_TEST_CASE(test_low_watermark_logging) { const auto first_log_limit = 0.90; const auto second_log_limit = 0.80; - memory_sampling sampling(dummy_logger, first_log_limit, second_log_limit); + memory_sampling sampling( + dummy_logger, + config::mock_binding(true), + first_log_limit, + second_log_limit); sampling.start(); std::string_view needle("Top-N alloc"); diff --git a/src/v/storage/tests/batch_cache_reclaim_test.cc b/src/v/storage/tests/batch_cache_reclaim_test.cc index d64d86a4df91..5d56802b2786 100644 --- a/src/v/storage/tests/batch_cache_reclaim_test.cc +++ b/src/v/storage/tests/batch_cache_reclaim_test.cc @@ -47,7 +47,9 @@ FIXTURE_TEST(reclaim, fixture) { seastar::logger test_logger("test"); ss::sharded memory_sampling_service; - memory_sampling_service.start(std::ref(test_logger)).get(); + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); storage::batch_cache cache(opts, memory_sampling_service); storage::batch_cache_index index(cache); diff --git a/src/v/storage/tests/batch_cache_test.cc b/src/v/storage/tests/batch_cache_test.cc index 2edd6ddef999..e418bbb35eee 100644 --- a/src/v/storage/tests/batch_cache_test.cc +++ b/src/v/storage/tests/batch_cache_test.cc @@ -50,7 +50,9 @@ class batch_cache_test_fixture { batch_cache_test_fixture() : test_logger("batch-cache-test") , cache(opts, memory_sampling_service) { - memory_sampling_service.start(std::ref(test_logger)).get(); + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); } auto& get_lru() { return cache._lru; }; @@ -145,7 +147,9 @@ SEASTAR_THREAD_TEST_CASE(touch) { seastar::logger test_logger("test"); ss::sharded memory_sampling_service; - memory_sampling_service.start(std::ref(test_logger)).get(); + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); auto action = ss::defer( [&memory_sampling_service] { memory_sampling_service.stop().get(); }); @@ -169,7 +173,9 @@ SEASTAR_THREAD_TEST_CASE(touch) { // build the cache the same way seastar::logger test_logger("test"); ss::sharded memory_sampling_service; - memory_sampling_service.start(std::ref(test_logger)).get(); + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); auto action = ss::defer( [&memory_sampling_service] { memory_sampling_service.stop().get(); }); diff --git 
a/src/v/storage/tests/log_manager_test.cc b/src/v/storage/tests/log_manager_test.cc index 2a440a090cb4..6fe7907cf7db 100644 --- a/src/v/storage/tests/log_manager_test.cc +++ b/src/v/storage/tests/log_manager_test.cc @@ -71,7 +71,9 @@ SEASTAR_THREAD_TEST_CASE(test_can_load_logs) { .get(); ss::sharded memory_sampling_service; - memory_sampling_service.start(std::ref(test_logger)).get(); + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); storage::api store( [conf]() { diff --git a/src/v/storage/tests/storage_test_fixture.h b/src/v/storage/tests/storage_test_fixture.h index 45a58ece51aa..13c5c259f77f 100644 --- a/src/v/storage/tests/storage_test_fixture.h +++ b/src/v/storage/tests/storage_test_fixture.h @@ -217,7 +217,9 @@ class storage_test_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - memory_sampling_service.start(std::ref(tlog)).get(); + memory_sampling_service + .start(std::ref(tlog), config::mock_binding(false)) + .get(); kvstore.start().get(); } diff --git a/src/v/storage/tests/utils/disk_log_builder.cc b/src/v/storage/tests/utils/disk_log_builder.cc index 6fe96f212d2e..658dde59218a 100644 --- a/src/v/storage/tests/utils/disk_log_builder.cc +++ b/src/v/storage/tests/utils/disk_log_builder.cc @@ -34,7 +34,9 @@ disk_log_builder::disk_log_builder(storage::log_config config) [this]() { return _log_config; }, _feature_table, _memory_sampling_service) { - _memory_sampling_service.start(std::ref(_test_logger)).get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); } // Batch generation diff --git a/src/v/test_utils/logs.h b/src/v/test_utils/logs.h index 69158e44f744..c46bcd406415 100644 --- a/src/v/test_utils/logs.h +++ b/src/v/test_utils/logs.h @@ -40,7 +40,9 @@ static inline ss::future<> persist_log_file( seastar::logger test_logger("test"); ss::sharded memory_sampling_service; - memory_sampling_service.start(std::ref(test_logger)).get(); + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); ss::sharded storage; storage @@ -122,7 +124,9 @@ read_log_file(ss::sstring base_dir, model::ntp file_ntp) { seastar::logger test_logger("test"); ss::sharded memory_sampling_service; - memory_sampling_service.start(std::ref(test_logger)).get(); + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); ss::sharded storage; storage From ec26919b33984f5c5c8c5606e48ad97ad66e142f Mon Sep 17 00:00:00 2001 From: Stephan Dollberg Date: Wed, 14 Jun 2023 09:43:11 +0100 Subject: [PATCH 5/5] cmake: Update OSS cmake seastar pointer Match vtools seastar pointer and pull in the memory sampling changes. --- cmake/oss.cmake.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/oss.cmake.in b/cmake/oss.cmake.in index e7745c80d685..9ecc8dba5697 100644 --- a/cmake/oss.cmake.in +++ b/cmake/oss.cmake.in @@ -179,7 +179,7 @@ ExternalProject_Add(fmt ExternalProject_Add(seastar GIT_REPOSITORY https://github.com/redpanda-data/seastar.git - GIT_TAG 777ad7c4c1e280c63877b80036a5b15fd0a6388a + GIT_TAG 6e869a2068ab27ca84ffbe0fe7f7f172fdcde01c INSTALL_DIR @REDPANDA_DEPS_INSTALL_DIR@ CMAKE_COMMAND ${CMAKE_COMMAND} -E env ${cmake_build_env} ${CMAKE_COMMAND} LIST_SEPARATOR |
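As a complement to the flamegraph pipeline shown earlier in the series, the raw profile can also be inspected straight from the new admin endpoint. A hedged sketch (the jq expression is illustrative; the port matches the earlier curl example and the field names match the `memory_profile`/`allocation_site` schema added in debug.json):

```
# Sketch: print the ten largest sampled allocation sites on shard 0,
# ordered by upscaled size in bytes, without symbolizing the backtraces.
curl -s "localhost:9644/v1/debug/sampled_memory_profile?shard=0" \
  | jq -r '.[0].allocation_sites
           | sort_by(-.size) | .[:10][]
           | "\(.size)\t\(.count)\t\(.backtrace)"'
```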