diff --git a/cmake/oss.cmake.in b/cmake/oss.cmake.in index e7745c80d6855..9ecc8dba56978 100644 --- a/cmake/oss.cmake.in +++ b/cmake/oss.cmake.in @@ -179,7 +179,7 @@ ExternalProject_Add(fmt ExternalProject_Add(seastar GIT_REPOSITORY https://github.com/redpanda-data/seastar.git - GIT_TAG 777ad7c4c1e280c63877b80036a5b15fd0a6388a + GIT_TAG 6e869a2068ab27ca84ffbe0fe7f7f172fdcde01c INSTALL_DIR @REDPANDA_DEPS_INSTALL_DIR@ CMAKE_COMMAND ${CMAKE_COMMAND} -E env ${cmake_build_env} ${CMAKE_COMMAND} LIST_SEPARATOR | diff --git a/src/v/cluster/tests/local_monitor_fixture.h b/src/v/cluster/tests/local_monitor_fixture.h index b6889daf4f857..e7d0616938a39 100644 --- a/src/v/cluster/tests/local_monitor_fixture.h +++ b/src/v/cluster/tests/local_monitor_fixture.h @@ -11,9 +11,11 @@ #pragma once #include "cluster/node/local_monitor.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include +#include #include diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index dddea31ca3096..1b41f93de4c4c 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1881,6 +1881,19 @@ configuration::configuration() "exception is thrown instead.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, true) + , sampled_memory_profile( + *this, + "memory_enable_memory_sampling", + "If true, memory allocations will be sampled and tracked. A sampled live " + "set of allocations can then be retrieved from the Admin API. " + "Additionally, we will periodically log the top-n allocation sites", + {// Enabling/Disabling this dynamically doesn't make much sense as for the + // memory profile to be meaning full you'll want to have this on from the + // beginning. However, we still provide the option to be able to disable + // it dynamically in case something goes wrong + .needs_restart = needs_restart::no, + .visibility = visibility::tunable}, + true) , enable_metrics_reporter( *this, "enable_metrics_reporter", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 994dbe96eb592..84b2efb3e10aa 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -375,6 +375,7 @@ struct configuration final : public config_store { // memory related settings property memory_abort_on_alloc_failure; + property sampled_memory_profile; // metrics reporter property enable_metrics_reporter; diff --git a/src/v/raft/tests/bootstrap_configuration_test.cc b/src/v/raft/tests/bootstrap_configuration_test.cc index 514145e63912a..0248f927b727a 100644 --- a/src/v/raft/tests/bootstrap_configuration_test.cc +++ b/src/v/raft/tests/bootstrap_configuration_test.cc @@ -16,6 +16,7 @@ #include "raft/consensus_utils.h" #include "random/generators.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/log.h" #include "storage/log_manager.h" @@ -49,12 +50,16 @@ struct bootstrap_fixture : raft::simple_record_fixture { storage::with_cache::no, storage::make_sanitized_file_config()); }, - _feature_table) { + _feature_table, + _memory_sampling_service) { _feature_table.start().get(); _feature_table .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _storage.start().get(); // ignore the get_log() (void)_storage.log_mgr() @@ -81,10 +86,13 @@ struct bootstrap_fixture : raft::simple_record_fixture { ~bootstrap_fixture() { _storage.stop().get(); + _memory_sampling_service.stop().get(); _feature_table.stop().get(); } + seastar::logger _test_logger{"bootstrap-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::api _storage; ss::abort_source _as; }; diff --git a/src/v/raft/tests/configuration_manager_test.cc b/src/v/raft/tests/configuration_manager_test.cc index c74be452e9054..02a5d3608e79d 100644 --- a/src/v/raft/tests/configuration_manager_test.cc +++ b/src/v/raft/tests/configuration_manager_test.cc @@ -16,6 +16,7 @@ #include "raft/logger.h" #include "raft/types.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/kvstore.h" #include "storage/log_manager.h" @@ -25,6 +26,7 @@ #include "units.h" #include +#include #include @@ -51,7 +53,8 @@ struct config_manager_fixture { ss::default_priority_class(), storage::make_sanitized_file_config()); }, - _feature_table)) + _feature_table, + _memory_sampling_service)) , _logger( raft::group_id(1), model::ntp(model::ns("t"), model::topic("t"), model::partition_id(0))) @@ -66,19 +69,25 @@ struct config_manager_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _storage.start().get0(); } ss::sstring base_dir = "test_cfg_manager_" + random_generators::gen_alphanum_string(6); + ss::logger _test_logger{"config-mgmr-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::api _storage; raft::ctx_log _logger; raft::configuration_manager _cfg_mgr; ~config_manager_fixture() { - _feature_table.stop().get(); _storage.stop().get0(); + _memory_sampling_service.stop().get(); + _feature_table.stop().get(); } raft::group_configuration random_configuration() { diff --git a/src/v/raft/tests/foreign_entry_test.cc b/src/v/raft/tests/foreign_entry_test.cc index 2f5ce825e0616..281c29babb53e 100644 --- a/src/v/raft/tests/foreign_entry_test.cc +++ b/src/v/raft/tests/foreign_entry_test.cc @@ -21,6 +21,7 @@ #include "raft/types.h" #include "random/generators.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/log.h" #include "storage/log_manager.h" @@ -61,12 +62,16 @@ struct foreign_entry_fixture { ss::default_priority_class(), storage::make_sanitized_file_config()); }, - _feature_table) { + _feature_table, + _memory_sampling_service) { _feature_table.start().get(); _feature_table .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _storage.start().get(); (void)_storage.log_mgr() .manage(storage::ntp_config(_ntp, "test.dir")) @@ -136,10 +141,13 @@ struct foreign_entry_fixture { } ~foreign_entry_fixture() { _storage.stop().get(); + _memory_sampling_service.stop().get(); _feature_table.stop().get(); } model::offset _base_offset{0}; + ss::logger _test_logger{"foreign-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::api _storage; storage::log get_log() { return _storage.log_mgr().get(_ntp).value(); } model::ntp _ntp{ diff --git a/src/v/raft/tests/mux_state_machine_fixture.h b/src/v/raft/tests/mux_state_machine_fixture.h index faa3ade39c468..e57ae04a89225 100644 --- a/src/v/raft/tests/mux_state_machine_fixture.h +++ b/src/v/raft/tests/mux_state_machine_fixture.h @@ -19,6 +19,7 @@ #include "raft/mux_state_machine.h" #include "raft/types.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/connection_cache.h" #include "storage/api.h" #include "storage/kvstore.h" @@ -57,7 +58,8 @@ struct mux_state_machine_fixture { .start( [kv_conf]() { return kv_conf; }, [this]() { return default_log_cfg(); }, - std::ref(_feature_table)) + std::ref(_feature_table), + std::ref(_memory_sampling_service)) .get0(); _storage.invoke_on_all(&storage::api::start).get0(); _as.start().get(); @@ -74,6 +76,10 @@ struct mux_state_machine_fixture { [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); + _group_mgr .start( _self, @@ -139,9 +145,10 @@ struct mux_state_machine_fixture { if (_raft) { _raft.release(); } - _connections.stop().get0(); - _feature_table.stop().get0(); - _storage.stop().get0(); + _connections.stop().get(); + _storage.stop().get(); + _memory_sampling_service.stop().get(); + _feature_table.stop().get(); _as.stop().get(); } } @@ -189,11 +196,13 @@ struct mux_state_machine_fixture { model::ntp _ntp = model::ntp( model::ns("default"), model::topic("test"), model::partition_id(0)); + ss::logger _test_logger{"mux-test-logger"}; ss::sstring _data_dir; cluster::consensus_ptr _raft; ss::sharded _as; ss::sharded _connections; ss::sharded _storage; + ss::sharded _memory_sampling_service; ss::sharded _feature_table; ss::sharded _group_mgr; ss::sharded _recovery_throttle; diff --git a/src/v/raft/tests/offset_translator_tests.cc b/src/v/raft/tests/offset_translator_tests.cc index d7c60ae936387..eabdc31e77c30 100644 --- a/src/v/raft/tests/offset_translator_tests.cc +++ b/src/v/raft/tests/offset_translator_tests.cc @@ -10,6 +10,7 @@ #include "model/fundamental.h" #include "raft/offset_translator.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/fwd.h" #include "storage/kvstore.h" @@ -50,11 +51,15 @@ struct base_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); _api .start( [this]() { return make_kv_cfg(); }, [this]() { return make_log_cfg(); }, - std::ref(_feature_table)) + std::ref(_feature_table), + std::ref(_memory_sampling_service)) .get(); _api.invoke_on_all(&storage::api::start).get(); } @@ -87,11 +92,14 @@ struct base_fixture { model::ntp test_ntp = model::ntp( model::ns("test"), model::topic("tp"), model::partition_id(0)); ss::sstring _test_dir; + ss::logger _test_logger{"offset-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; ss::sharded _api; ~base_fixture() { _api.stop().get(); + _memory_sampling_service.stop().get(); _feature_table.stop().get(); } }; diff --git a/src/v/raft/tests/raft_group_fixture.h b/src/v/raft/tests/raft_group_fixture.h index 42f950e3c51a1..73a6f73ba053a 100644 --- a/src/v/raft/tests/raft_group_fixture.h +++ b/src/v/raft/tests/raft_group_fixture.h @@ -26,6 +26,7 @@ #include "raft/rpc_client_protocol.h" #include "raft/service.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/backoff_policy.h" #include "rpc/connection_cache.h" #include "rpc/rpc_server.h" @@ -130,7 +131,8 @@ struct raft_node { ss::default_priority_class(), storage::make_sanitized_file_config()); }, - std::ref(feature_table)) + std::ref(feature_table), + std::ref(memory_sampling_service)) .get(); storage.invoke_on_all(&storage::api::start).get(); @@ -362,6 +364,7 @@ struct raft_node { consensus_ptr consensus; std::unique_ptr _nop_stm; ss::sharded feature_table; + ss::sharded memory_sampling_service; ss::abort_source _as; }; diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json index 37df25896d225..66d41ce548e8a 100644 --- a/src/v/redpanda/admin/api-doc/debug.json +++ b/src/v/redpanda/admin/api-doc/debug.json @@ -205,6 +205,32 @@ } ] }, + { + "path": "/v1/debug/sampled_memory_profile", + "operations": [ + { + "method": "GET", + "summary": "Get the currently sampled live memory set for the specified or all shards", + "nickname": "sampled_memory_profile", + "produces": [ + "application/json" + ], + "type": "array", + "items": { + "type": "memory_profile" + }, + "parameters": [ + { + "name": "shard", + "in": "query", + "required": false, + "allowMultiple": false, + "type": "long" + } + ] + } + ] + }, { "path": "/v1/debug/refresh_disk_health_info", "operations": [ @@ -497,6 +523,40 @@ } } }, + "memory_profile": { + "id": "memory_profile", + "description": "Sampled memory profile of a shard", + "properties": { + "shard": { + "type": "long", + "description": "Id of the shard the profile is from" + }, + "allocation_sites": { + "type": "array", + "items": { + "type": "allocation_site" + } + } + } + }, + "allocation_site": { + "id": "allocation_site", + "description": "A single allocation site with backtrace, size and count", + "properties": { + "size": { + "type": "long", + "description": "Current bytes allocated at this allocation site (note this is the upscaled size and not the sampled one)" + }, + "count": { + "type": "long", + "description": "Live allocations at this site" + }, + "backtrace": { + "type": "string", + "description": "Backtrace of this allocation site" + } + } + }, "controller_status": { "id": "controller_status", "description": "Controller status", diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc index f0b0e36c99b90..d53cccedd1d62 100644 --- a/src/v/redpanda/admin_server.cc +++ b/src/v/redpanda/admin_server.cc @@ -78,6 +78,7 @@ #include "redpanda/admin/api-doc/status.json.h" #include "redpanda/admin/api-doc/transaction.json.h" #include "redpanda/admin/api-doc/usage.json.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/errc.h" #include "security/acl.h" #include "security/credential_store.h" @@ -93,9 +94,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -117,9 +120,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -203,7 +208,8 @@ admin_server::admin_server( ss::sharded& topic_recovery_status_frontend, ss::sharded& tx_registry_frontend, - ss::sharded& storage_node) + ss::sharded& storage_node, + ss::sharded& memory_sampling_service) : _log_level_timer([this] { log_level_timer_handler(); }) , _server("admin") , _cfg(std::move(cfg)) @@ -225,6 +231,7 @@ admin_server::admin_server( , _topic_recovery_status_frontend(topic_recovery_status_frontend) , _tx_registry_frontend(tx_registry_frontend) , _storage_node(storage_node) + , _memory_sampling_service(memory_sampling_service) , _default_blocked_reactor_notify( ss::engine().get_blocked_reactor_notify_ms()) {} @@ -4128,6 +4135,13 @@ void admin_server::register_debug_routes() { }); }); + register_route( + ss::httpd::debug_json::sampled_memory_profile, + [this](std::unique_ptr req) + -> ss::future { + return sampled_memory_profile_handler(std::move(req)); + }); + register_route( ss::httpd::debug_json::restart_service, [this](std::unique_ptr req) { @@ -5005,3 +5019,46 @@ admin_server::restart_service_handler(std::unique_ptr req) { co_await restart_redpanda_service(*service); co_return ss::json::json_return_type(ss::json::json_void()); } + +ss::future +admin_server::sampled_memory_profile_handler( + std::unique_ptr req) { + vlog(logger.info, "Request to sampled memory profile"); + + std::optional shard_id; + if (auto e = req->get_query_param("shard"); !e.empty()) { + try { + shard_id = boost::lexical_cast(e); + } catch (const boost::bad_lexical_cast&) { + throw ss::httpd::bad_param_exception( + fmt::format("Invalid parameter 'shard_id' value {{{}}}", e)); + } + } + + if (shard_id.has_value()) { + auto max_shard_id = ss::smp::count; + if (*shard_id > max_shard_id) { + throw ss::httpd::bad_param_exception(fmt::format( + "Shard id too high, max shard id is {}", max_shard_id)); + } + } + + auto profiles = co_await _memory_sampling_service.local() + .get_sampled_memory_profiles(shard_id); + + std::vector resp(profiles.size()); + for (size_t i = 0; i < resp.size(); ++i) { + resp[i].shard = profiles[i].shard_id; + + for (auto& allocation_sites : profiles[i].allocation_sites) { + ss::httpd::debug_json::allocation_site allocation_site; + allocation_site.size = allocation_sites.size; + allocation_site.count = allocation_sites.count; + allocation_site.backtrace = std::move(allocation_sites.backtrace); + resp[i].allocation_sites.push(allocation_site); + } + } + + co_return co_await ss::make_ready_future( + std::move(resp)); +} diff --git a/src/v/redpanda/admin_server.h b/src/v/redpanda/admin_server.h index f60eb5e0a8998..0dfcafc8a2b37 100644 --- a/src/v/redpanda/admin_server.h +++ b/src/v/redpanda/admin_server.h @@ -18,12 +18,14 @@ #include "model/metadata.h" #include "pandaproxy/rest/fwd.h" #include "pandaproxy/schema_registry/fwd.h" +#include "resource_mgmt/memory_sampling.h" #include "rpc/connection_cache.h" #include "seastarx.h" #include "storage/node.h" #include "utils/request_auth.h" #include +#include #include #include #include @@ -73,7 +75,8 @@ class admin_server { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future<> start(); ss::future<> stop(); @@ -460,6 +463,8 @@ class admin_server { cloud_storage_usage_handler(std::unique_ptr); ss::future restart_service_handler(std::unique_ptr); + ss::future + sampled_memory_profile_handler(std::unique_ptr); ss::future<> throw_on_error( ss::http::request& req, @@ -510,6 +515,7 @@ class admin_server { _topic_recovery_status_frontend; ss::sharded& _tx_registry_frontend; ss::sharded& _storage_node; + ss::sharded& _memory_sampling_service; // Value before the temporary override std::chrono::milliseconds _default_blocked_reactor_notify; ss::timer<> _blocked_reactor_notify_reset_timer; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 66317b13e4b17..6583195315e58 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -82,6 +82,7 @@ #include "raft/service.h" #include "redpanda/admin_server.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/thread_worker.h" #include "storage/backlog_controller.h" #include "storage/chunk_cache.h" @@ -111,6 +112,7 @@ #include #include +#include #include #include #include @@ -396,6 +398,13 @@ void application::initialize( std::optional schema_reg_cfg, std::optional schema_reg_client_cfg, std::optional groups) { + construct_service( + _memory_sampling, std::ref(_log), ss::sharded_parameter([]() { + return config::shard_local_cfg().sampled_memory_profile.bind(); + })) + .get(); + _memory_sampling.invoke_on_all(&memory_sampling::start).get(); + // Set up the abort_on_oom value based on the associated cluster config // property, and watch for changes. _abort_on_oom @@ -844,7 +853,8 @@ void application::configure_admin_server() { std::ref(topic_recovery_service), std::ref(topic_recovery_status_frontend), std::ref(tx_registry_frontend), - std::ref(storage_node)) + std::ref(storage_node), + std::ref(_memory_sampling)) .get(); } @@ -1673,7 +1683,8 @@ void application::wire_up_bootstrap_services() { = sched_groups.cache_background_reclaim_sg(); return log_cfg; }, - std::ref(feature_table)) + std::ref(feature_table), + std::ref(_memory_sampling)) .get(); // Hook up local_monitor to update storage_resources when disk state changes diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 6ea88e2d1ef66..4492dda2848c0 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -40,6 +40,7 @@ #include "redpanda/monitor_unsafe_log_flag.h" #include "resource_mgmt/cpu_scheduling.h" #include "resource_mgmt/memory_groups.h" +#include "resource_mgmt/memory_sampling.h" #include "resource_mgmt/scheduling_groups_probe.h" #include "resource_mgmt/smp_groups.h" #include "rpc/fwd.h" @@ -248,6 +249,7 @@ class application { std::optional> _abort_on_oom; + ss::sharded _memory_sampling; ss::sharded _connection_cache; ss::sharded _group_manager; ss::sharded _rpc; diff --git a/src/v/resource_mgmt/CMakeLists.txt b/src/v/resource_mgmt/CMakeLists.txt index 422f6d137e9e9..3b93f8a90d72b 100644 --- a/src/v/resource_mgmt/CMakeLists.txt +++ b/src/v/resource_mgmt/CMakeLists.txt @@ -2,6 +2,7 @@ v_cc_library( NAME resource_mgmt SRCS available_memory.cc + memory_sampling.cc DEPS Seastar::seastar ) diff --git a/src/v/resource_mgmt/memory_sampling.cc b/src/v/resource_mgmt/memory_sampling.cc new file mode 100644 index 0000000000000..63783a827844b --- /dev/null +++ b/src/v/resource_mgmt/memory_sampling.cc @@ -0,0 +1,222 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "resource_mgmt/memory_sampling.h" + +#include "resource_mgmt/available_memory.h" +#include "ssx/future-util.h" +#include "ssx/sformat.h" +#include "vlog.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +constexpr std::string_view diagnostics_header() { + return "Top-N alloc sites - size count stack:"; +} + +/// Put `top_n` allocation sites into the front of `allocation_sites` +static void top_n_allocation_sites( + std::vector& allocation_sites, size_t top_n) { + std::partial_sort( + allocation_sites.begin(), + allocation_sites.begin() + top_n, + allocation_sites.end(), + [](const auto& lhs, const auto& rhs) { return lhs.size > rhs.size; }); +} + +ss::noncopyable_function +memory_sampling::get_oom_diagnostics_callback() { + // preallocate those so that we don't allocate on OOM + std::vector allocation_sites(1000); + std::vector format_buf(1000); + + return [allocation_sites = std::move(allocation_sites), + format_buf = std::move(format_buf)]( + seastar::memory::memory_diagnostics_writer writer) mutable { + auto num_sites = ss::memory::sampled_memory_profile(allocation_sites); + + const size_t top_n = std::min(size_t(10), num_sites); + top_n_allocation_sites(allocation_sites, top_n); + + writer(diagnostics_header()); + writer("\n"); + + for (size_t i = 0; i < top_n; ++i) { + auto bytes_written = fmt::format_to_n( + format_buf.begin(), + format_buf.size(), + "{}", + allocation_sites[i]) + .size; + + writer(std::string_view(format_buf.data(), bytes_written)); + } + }; +} + +/// We want to print the top-n allocation sites on OOM +/// Set this up and make sure we don't allocate extra at that point +void setup_additional_oom_diagnostics() { + seastar::memory::set_additional_diagnostics_producer( + memory_sampling::get_oom_diagnostics_callback()); +} + +void memory_sampling::notify_of_reclaim() { _low_watermark_cond.signal(); } + +ss::future<> memory_sampling::start_low_available_memory_logging() { + // We want some periodic logging "on the way" to OOM. At the same time we + // don't want to spam the logs. Hence, we periodically look at the available + // memory low watermark (this is without the batch cache). If we see that we + // have crossed the 10% and 20% marks we log the allocation sites. We stop + // afterwards. + + size_t first_log_limit = _first_log_limit_fraction + * seastar::memory::stats().total_memory(); + size_t second_log_limit = _second_log_limit_fraction + * seastar::memory::stats().total_memory(); + size_t next_log_limit = first_log_limit; + + while (true) { + try { + co_await _low_watermark_cond.wait([&next_log_limit]() { + auto current_low_water_mark + = resources::available_memory::local() + .available_low_water_mark(); + + return current_low_water_mark <= next_log_limit; + }); + } catch (const ss::broken_condition_variable&) { + co_return; + } + + auto allocation_sites = ss::memory::sampled_memory_profile(); + const size_t top_n = std::min(size_t(5), allocation_sites.size()); + top_n_allocation_sites(allocation_sites, top_n); + + vlog( + _logger.info, + "{} {}", + diagnostics_header(), + fmt::join( + allocation_sites.begin(), allocation_sites.begin() + top_n, "|")); + + if (next_log_limit == first_log_limit) { + next_log_limit = second_log_limit; + } else { + co_return; + } + } +} + +memory_sampling::memory_sampling( + ss::logger& logger, config::binding enabled) + : memory_sampling(logger, std::move(enabled), 0.2, 0.1) {} + +memory_sampling::memory_sampling( + ss::logger& logger, + config::binding enabled, + double first_log_limit_fraction, + double second_log_limit_fraction) + : _logger(logger) + , _enabled(std::move(enabled)) + , _first_log_limit_fraction(first_log_limit_fraction) + , _second_log_limit_fraction(second_log_limit_fraction) { + _enabled.watch([this]() { on_enabled_change(); }); +} + +void memory_sampling::on_enabled_change() { + // We chose a sampling rate of ~3MB. From testing this has a very low + // overhead of something like ~1%. We could still get away with something + // smaller like 1MB and have acceptable overhead (~3%) but 3MB should be a + // safer default for the initial rollout. + const size_t sampling_rate = 3000037; + + // Note no logging here as seastar already logs about this + if (_enabled()) { + if (ss::memory::get_heap_profiling_sample_rate() == sampling_rate) { + return; + } + + ss::memory::set_heap_profiling_sampling_rate(sampling_rate); + } else { + if (ss::memory::get_heap_profiling_sample_rate() == 0) { + return; + } + + ss::memory::set_heap_profiling_sampling_rate(0); + } +} + +void memory_sampling::start() { + setup_additional_oom_diagnostics(); + + // start now if enabled + on_enabled_change(); + + ssx::spawn_with_gate(_low_watermark_gate, [this]() { + return start_low_available_memory_logging(); + }); +} + +ss::future<> memory_sampling::stop() { + _low_watermark_cond.broken(); + + co_await _low_watermark_gate.close(); +} + +memory_sampling::serialized_memory_profile +memory_sampling::get_sampled_memory_profile() { + auto stacks = ss::memory::sampled_memory_profile(); + + std::vector + allocation_sites; + allocation_sites.reserve(stacks.size()); + + for (auto& stack : stacks) { + allocation_sites.emplace_back( + stack.size, + stack.count, + ssx::sformat("{}", std::move(stack.backtrace))); + } + + return memory_sampling::serialized_memory_profile{ + ss::this_shard_id(), std::move(allocation_sites)}; +} + +ss::future> +memory_sampling::get_sampled_memory_profiles(std::optional shard_id) { + using result_t = memory_sampling::serialized_memory_profile; + std::vector resp; + + if (shard_id.has_value()) { + resp.push_back(co_await container().invoke_on(*shard_id, [](auto&) { + return memory_sampling::get_sampled_memory_profile(); + })); + } else { + resp = co_await container().map_reduce0( + [](auto&) { return memory_sampling::get_sampled_memory_profile(); }, + std::vector{}, + [](std::vector all, result_t result) { + all.push_back(std::move(result)); + return all; + }); + } + + co_return resp; +} diff --git a/src/v/resource_mgmt/memory_sampling.h b/src/v/resource_mgmt/memory_sampling.h new file mode 100644 index 0000000000000..022fd455079dd --- /dev/null +++ b/src/v/resource_mgmt/memory_sampling.h @@ -0,0 +1,128 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "config/property.h" +#include "seastarx.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +template<> +struct fmt::formatter + : fmt::formatter { + template + auto + format(const seastar::memory::allocation_site& site, FormatContext& ctx) { + return fmt::format_to( + ctx.out(), "{} {} {}", site.size, site.count, site.backtrace); + } +}; + +/// Very simple service enabling memory profiling on all shards. +class memory_sampling : public ss::peering_sharded_service { +public: + struct serialized_memory_profile { + struct allocation_site { + // cumulative size at the allocation site (upscaled not sampled) + size_t size; + // count at the allocation site + size_t count; + // backtrace of this allocation site + ss::sstring backtrace; + + allocation_site(size_t size, size_t count, ss::sstring backtrace) + : size(size) + , count(count) + , backtrace(std::move(backtrace)) {} + }; + + /// shard id of this profile + ss::shard_id shard_id; + /// Backtraces of this shard + std::vector allocation_sites; + + explicit serialized_memory_profile( + long shard_id, std::vector traces) + : shard_id(shard_id) + , allocation_sites(std::move(traces)) {} + }; + + /// Starts the service (enables memory sampling, sets up additional OOM + /// information and enables logging in highwatermark situations) + void start(); + ss::future<> stop(); + + /// Get the serialized memory profile for a shard or all shards if shard_id + /// is nullopt + ss::future> + get_sampled_memory_profiles(std::optional shard_id); + + /// Notify the memory sampling service that a memory reclaim from the + /// seastar allocator has happened. Used by the batch_cache + void notify_of_reclaim(); + + /// Constructs the service. Logger will be used to log top stacks under high + /// memory pressure + explicit memory_sampling(ss::logger& logger, config::binding enabled); + + /// Constructor as above but allows overriding high memory thresholds. Used + /// for testing. + explicit memory_sampling( + ss::logger& logger, + config::binding enabled, + double first_log_limit_fraction, + double second_log_limit_fraction); + + /// Returns the callback we register to run on OOM to add the memory + /// sampling output + static ss::noncopyable_function + get_oom_diagnostics_callback(); + +private: + /// Starts the background future running the allocation site logging on low + /// available memory + ss::future<> start_low_available_memory_logging(); + + /// Returns the serialized memory_profile for the current shard + static memory_sampling::serialized_memory_profile + get_sampled_memory_profile(); + + void on_enabled_change(); + + ss::logger& _logger; + + /// Are we currently sampling memory + config::binding _enabled; + + // When a memory reclaim from the seastar allocator happens the batch_cache + // notifies us via below condvar. If we see a new low watermark that's and + // we are below 20% then we log once and a second time the first time we saw + // a 10% lower watermark. Values are overridable for tests + double _first_log_limit_fraction; + double _second_log_limit_fraction; + ss::condition_variable _low_watermark_cond; + ss::gate _low_watermark_gate; +}; diff --git a/src/v/resource_mgmt/tests/CMakeLists.txt b/src/v/resource_mgmt/tests/CMakeLists.txt index 05f8627a90a1c..6605737470d37 100644 --- a/src/v/resource_mgmt/tests/CMakeLists.txt +++ b/src/v/resource_mgmt/tests/CMakeLists.txt @@ -6,3 +6,24 @@ rp_test( LIBRARIES v::seastar_testing_main v::resource_mgmt v::config LABELS resource_mgmt ) + +# NB: Some of these rely on global state (low watermark of available_memory) so need to run in a separate binary +rp_test( + UNIT_TEST + BINARY_NAME test_memory_sampling + SOURCES memory_sampling_tests.cc + DEFINITIONS BOOST_TEST_DYN_LINK + LIBRARIES v::seastar_testing_main v::application + LABELS memory_sampling + SKIP_BUILD_TYPES "Debug" +) + +rp_test( + UNIT_TEST + BINARY_NAME test_memory_sampling_reclaimer + SOURCES memory_sampling_reclaimer_test.cc + DEFINITIONS BOOST_TEST_DYN_LINK + LIBRARIES v::seastar_testing_main v::application + LABELS memory_sampling + SKIP_BUILD_TYPES "Debug" +) diff --git a/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc b/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc new file mode 100644 index 0000000000000..555885b18f7c8 --- /dev/null +++ b/src/v/resource_mgmt/tests/memory_sampling_reclaimer_test.cc @@ -0,0 +1,82 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "resource_mgmt/memory_sampling.h" +#include "storage/batch_cache.h" +#include "test_utils/async.h" + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#ifndef SEASTAR_DEFAULT_ALLOCATOR + +/// Test batch cache integration independently +SEASTAR_THREAD_TEST_CASE(reclaim_notifies_memory_sampling) { + std::stringstream logger_buf; + seastar::logger test_logger("test"); + test_logger.set_ostream(logger_buf); + ss::sharded memory_sampling_service; + + std::string_view needle("Top-N alloc"); + const auto first_log_limit = 0.80; + memory_sampling_service + .start( + std::ref(test_logger), + config::mock_binding(true), + first_log_limit, + 0.2) + .get(); + memory_sampling_service.invoke_on_all(&memory_sampling::start).get(); + + { + auto buf = logger_buf.str(); + auto view = std::string_view{buf}; + BOOST_REQUIRE_EQUAL(view.find(needle), std::string_view::npos); + } + + storage::batch_cache::reclaim_options opts = { + .growth_window = std::chrono::milliseconds(3000), + .stable_window = std::chrono::milliseconds(10000), + .min_size = 128 << 10, + .max_size = 4 << 20, + .min_free_memory = 1}; + storage::batch_cache cache(opts, memory_sampling_service); + storage::batch_cache_index index(cache); + + std::vector> dummy_bufs; + size_t total_memory = seastar::memory::stats().total_memory(); + auto allocate_till_limit = [&](size_t limit) { + while (seastar::memory::stats().free_memory() > limit) { + dummy_bufs.emplace_back(1000000); + } + }; + + allocate_till_limit(first_log_limit * total_memory); + + // simulate a callback from the allocator + cache.reclaim(1); + + tests::cooperative_spin_wait_with_timeout(std::chrono::seconds(10), [&]() { + auto buf = logger_buf.str(); + auto view = std::string_view{buf}; + return view.find(needle) != std::string_view::npos; + }).get(); // will throw if false at timeout +} + +#endif // SEASTAR_DEFAULT_ALLOCATOR diff --git a/src/v/resource_mgmt/tests/memory_sampling_tests.cc b/src/v/resource_mgmt/tests/memory_sampling_tests.cc new file mode 100644 index 0000000000000..205db00652c2a --- /dev/null +++ b/src/v/resource_mgmt/tests/memory_sampling_tests.cc @@ -0,0 +1,139 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "resource_mgmt/memory_sampling.h" +#include "storage/batch_cache.h" +#include "test_utils/async.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#ifndef SEASTAR_DEFAULT_ALLOCATOR + +SEASTAR_THREAD_TEST_CASE(test_no_allocs_in_oom_callback) { + seastar::memory::scoped_heap_profiling profiling(16000); + + std::vector> dummy_bufs; + std::vector output_buffer; + const size_t max_output_size = 1000000; + output_buffer.reserve(max_output_size); + + std::vector allocation_sites(1000); + size_t sampled_sites = 0; + + // make sure we have at least one allocation site + while (sampled_sites == 0) { + dummy_bufs.emplace_back(1000); + sampled_sites = seastar::memory::sampled_memory_profile( + allocation_sites); + } + + auto oom_callback = memory_sampling::get_oom_diagnostics_callback(); + auto writer = [&output_buffer](std::string_view bytes) { + if (output_buffer.size() + bytes.size() < max_output_size) { + output_buffer.insert( + output_buffer.end(), bytes.begin(), bytes.end()); + } + }; + + auto before = seastar::memory::stats(); + oom_callback(writer); + auto after = seastar::memory::stats(); + + // to make sure we are actually testing something + BOOST_REQUIRE_GT(before.mallocs(), 0); + BOOST_REQUIRE_GT(output_buffer.size(), 0); + BOOST_REQUIRE_EQUAL(before.large_allocations(), after.large_allocations()); + BOOST_REQUIRE_EQUAL(before.allocated_memory(), after.allocated_memory()); + BOOST_REQUIRE_EQUAL(before.mallocs(), after.mallocs()); + + // confirm an average allocation site fits into the oom writer line buffer + auto allocation_site_needle = fmt::format("{}", allocation_sites[0]); + BOOST_REQUIRE_NE( + std::string_view(output_buffer.data(), output_buffer.size()) + .find(allocation_site_needle), + std::string_view::npos); +} + +SEASTAR_THREAD_TEST_CASE(test_low_watermark_logging) { + seastar::logger dummy_logger("dummy"); + std::stringstream output_buf; + dummy_logger.set_ostream(output_buf); + + const auto first_log_limit = 0.90; + const auto second_log_limit = 0.80; + + memory_sampling sampling( + dummy_logger, + config::mock_binding(true), + first_log_limit, + second_log_limit); + sampling.start(); + + std::string_view needle("Top-N alloc"); + + { + // notification shouldn't do anything + sampling.notify_of_reclaim(); + auto buf = output_buf.str(); + auto view = std::string_view{buf}; + BOOST_REQUIRE_EQUAL(view.find(needle), std::string_view::npos); + } + + std::vector> dummy_bufs; + size_t total_memory = seastar::memory::stats().total_memory(); + + auto allocate_till_limit = [&](size_t limit) { + while (seastar::memory::stats().free_memory() > limit) { + dummy_bufs.emplace_back(1000000); + } + }; + + allocate_till_limit(first_log_limit * total_memory); + sampling.notify_of_reclaim(); + + tests::cooperative_spin_wait_with_timeout(std::chrono::seconds(10), [&]() { + auto buf = output_buf.str(); + auto view = std::string_view{buf}; + return view.find(needle) != std::string_view::npos; + }).get(); // will throw if false at timeout + + allocate_till_limit(second_log_limit * total_memory); + sampling.notify_of_reclaim(); + + tests::cooperative_spin_wait_with_timeout(std::chrono::seconds(10), [&]() { + auto buf = output_buf.str(); + auto view = std::string_view{buf}; + return view.find(needle) != view.rfind(needle); + }).get(); // will throw if false at timeout + + auto old_size = output_buf.str().size(); + + sampling.notify_of_reclaim(); + + { + // no more notifications should happen + auto buf = output_buf.str(); + BOOST_REQUIRE_EQUAL(buf.size(), old_size); + } +} + +#endif // SEASTAR_DEFAULT_ALLOCATOR diff --git a/src/v/storage/api.h b/src/v/storage/api.h index 4537cf6b104cd..c045bcdda9d53 100644 --- a/src/v/storage/api.h +++ b/src/v/storage/api.h @@ -13,6 +13,7 @@ #include "features/feature_table.h" #include "model/fundamental.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/kvstore.h" #include "storage/log_manager.h" @@ -28,17 +29,23 @@ class api : public ss::peering_sharded_service { explicit api( std::function kv_conf_cb, std::function log_conf_cb, - ss::sharded& feature_table) noexcept + ss::sharded& feature_table, + ss::sharded& memory_sampling_service) noexcept : _kv_conf_cb(std::move(kv_conf_cb)) , _log_conf_cb(std::move(log_conf_cb)) - , _feature_table(feature_table) {} + , _feature_table(feature_table) + , _memory_sampling_service(memory_sampling_service) {} ss::future<> start() { _kvstore = std::make_unique( _kv_conf_cb(), _resources, _feature_table); return _kvstore->start().then([this] { _log_mgr = std::make_unique( - _log_conf_cb(), kvs(), _resources, _feature_table); + _log_conf_cb(), + kvs(), + _resources, + _feature_table, + _memory_sampling_service); return _log_mgr->start(); }); } @@ -91,6 +98,7 @@ class api : public ss::peering_sharded_service { std::function _kv_conf_cb; std::function _log_conf_cb; ss::sharded& _feature_table; + ss::sharded& _memory_sampling_service; std::unique_ptr _kvstore; std::unique_ptr _log_mgr; diff --git a/src/v/storage/batch_cache.cc b/src/v/storage/batch_cache.cc index 75be14b7a683d..8586d5d743c20 100644 --- a/src/v/storage/batch_cache.cc +++ b/src/v/storage/batch_cache.cc @@ -12,6 +12,7 @@ #include "bytes/iobuf_parser.h" #include "model/adl_serde.h" #include "resource_mgmt/available_memory.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/future-util.h" #include "storage/logger.h" #include "utils/gate_guard.h" @@ -125,14 +126,17 @@ register_memory_reporter(const batch_cache& bc) { "batch_cache", [&bc] { return bc.size_bytes(); }); } -batch_cache::batch_cache(const reclaim_options& opts) +batch_cache::batch_cache( + const reclaim_options& opts, + ss::sharded& memory_sampling_service) : _reclaimer( [this](reclaimer::request r) { return reclaim(r); }, reclaim_scope::sync) , _reclaim_opts(opts) , _reclaim_size(_reclaim_opts.min_size) , _background_reclaimer( *this, opts.min_free_memory, opts.background_reclaimer_sg) - , _available_mem_deregister(register_memory_reporter(*this)) { + , _available_mem_deregister(register_memory_reporter(*this)) + , _memory_sampling_service(memory_sampling_service) { _background_reclaimer.start(); } @@ -300,6 +304,12 @@ size_t batch_cache::reclaim(size_t size) { } }); + // so that memory_sampling service can print top memory allocation sites for + // this shard + if (_memory_sampling_service.local_is_initialized()) { + _memory_sampling_service.local().notify_of_reclaim(); + } + _last_reclaim = ss::lowres_clock::now(); _size_bytes -= reclaimed; return reclaimed; diff --git a/src/v/storage/batch_cache.h b/src/v/storage/batch_cache.h index b73bee1d9b016..3d8b39df6914b 100644 --- a/src/v/storage/batch_cache.h +++ b/src/v/storage/batch_cache.h @@ -12,6 +12,7 @@ #pragma once #include "model/record.h" #include "resource_mgmt/available_memory.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/semaphore.h" #include "units.h" #include "utils/intrusive_list_helpers.h" @@ -226,7 +227,9 @@ class batch_cache { range_ptr _range; }; - explicit batch_cache(const reclaim_options& opts); + explicit batch_cache( + const reclaim_options& opts, + ss::sharded& memory_sampling_service); batch_cache(const batch_cache&) = delete; batch_cache& operator=(const batch_cache&) = delete; @@ -384,6 +387,7 @@ class batch_cache { size_t _reclaim_size; background_reclaimer _background_reclaimer; resources::available_memory::deregister_holder _available_mem_deregister; + ss::sharded& _memory_sampling_service; friend std::ostream& operator<<(std::ostream&, const reclaim_options&); friend std::ostream& operator<<(std::ostream&, const batch_cache&); diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index c9d8a8e447bce..ac5e50ba38bce 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -17,6 +17,7 @@ #include "model/metadata.h" #include "model/timestamp.h" #include "resource_mgmt/io_priority.h" +#include "resource_mgmt/memory_sampling.h" #include "ssx/async-clear.h" #include "ssx/future-util.h" #include "storage/batch_cache.h" @@ -133,13 +134,14 @@ log_manager::log_manager( log_config config, kvstore& kvstore, storage_resources& resources, - ss::sharded& feature_table) noexcept + ss::sharded& feature_table, + ss::sharded& memory_sampling) noexcept : _config(std::move(config)) , _kvstore(kvstore) , _resources(resources) , _feature_table(feature_table) , _jitter(_config.compaction_interval()) - , _batch_cache(config.reclaim_opts) { + , _batch_cache(config.reclaim_opts, memory_sampling) { _config.compaction_interval.watch([this]() { _jitter = simple_time_jitter{ _config.compaction_interval()}; diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 01173102c3abd..2bfac7fcb3113 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -17,6 +17,7 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "random/simple_time_jitter.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/batch_cache.h" #include "storage/file_sanitizer_types.h" @@ -166,7 +167,8 @@ class log_manager { log_config, kvstore& kvstore, storage_resources&, - ss::sharded&) noexcept; + ss::sharded&, + ss::sharded&) noexcept; ss::future manage(ntp_config); diff --git a/src/v/storage/tests/batch_cache_reclaim_test.cc b/src/v/storage/tests/batch_cache_reclaim_test.cc index 764b680de4eca..5d56802b27864 100644 --- a/src/v/storage/tests/batch_cache_reclaim_test.cc +++ b/src/v/storage/tests/batch_cache_reclaim_test.cc @@ -45,7 +45,13 @@ class fixture {}; FIXTURE_TEST(reclaim, fixture) { using namespace std::chrono_literals; - storage::batch_cache cache(opts); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); + + storage::batch_cache cache(opts, memory_sampling_service); storage::batch_cache_index index(cache); std::vector cache_entries; cache_entries.reserve(30); diff --git a/src/v/storage/tests/batch_cache_test.cc b/src/v/storage/tests/batch_cache_test.cc index e279263a1caf0..e418bbb35eeea 100644 --- a/src/v/storage/tests/batch_cache_test.cc +++ b/src/v/storage/tests/batch_cache_test.cc @@ -11,10 +11,15 @@ #include "model/fundamental.h" #include "model/record.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/batch_cache.h" #include "test_utils/fixture.h" +#include #include +#include + +#include static storage::batch_cache::reclaim_options opts = { .growth_window = std::chrono::milliseconds(3000), @@ -43,11 +48,21 @@ static model::record_batch make_random_batch( class batch_cache_test_fixture { public: batch_cache_test_fixture() - : cache(opts) {} + : test_logger("batch-cache-test") + , cache(opts, memory_sampling_service) { + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); + } auto& get_lru() { return cache._lru; }; - ~batch_cache_test_fixture() { cache.stop().get(); } + ~batch_cache_test_fixture() { + cache.stop().get(); + memory_sampling_service.stop().get(); + } + ss::logger test_logger; + ss::sharded memory_sampling_service; storage::batch_cache cache; }; @@ -130,7 +145,15 @@ SEASTAR_THREAD_TEST_CASE(touch) { std::unique_ptr index_1; std::unique_ptr index_2; - storage::batch_cache cache(opts); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); + auto action = ss::defer( + [&memory_sampling_service] { memory_sampling_service.stop().get(); }); + + storage::batch_cache cache(opts, memory_sampling_service); index_1 = std::make_unique(cache); index_2 = std::make_unique(cache); auto b0 = cache.put(*index_1, make_batch(10)); @@ -148,7 +171,15 @@ SEASTAR_THREAD_TEST_CASE(touch) { std::unique_ptr index_2; // build the cache the same way - storage::batch_cache cache(opts); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); + auto action = ss::defer( + [&memory_sampling_service] { memory_sampling_service.stop().get(); }); + + storage::batch_cache cache(opts, memory_sampling_service); index_1 = std::make_unique(cache); index_2 = std::make_unique(cache); auto b0 = cache.put(*index_1, make_batch(10)); diff --git a/src/v/storage/tests/log_manager_test.cc b/src/v/storage/tests/log_manager_test.cc index 4cb50462e9624..6fe7907cf7db2 100644 --- a/src/v/storage/tests/log_manager_test.cc +++ b/src/v/storage/tests/log_manager_test.cc @@ -11,6 +11,7 @@ #include "model/record_utils.h" #include "model/tests/random_batch.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "storage/api.h" #include "storage/directories.h" #include "storage/disk_log_appender.h" @@ -61,6 +62,7 @@ constexpr unsigned default_segment_readahead_count = 10; SEASTAR_THREAD_TEST_CASE(test_can_load_logs) { auto conf = make_config(); + ss::logger test_logger("test-logger"); ss::sharded feature_table; feature_table.start().get(); feature_table @@ -68,6 +70,11 @@ SEASTAR_THREAD_TEST_CASE(test_can_load_logs) { [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + ss::sharded memory_sampling_service; + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); + storage::api store( [conf]() { return storage::kvstore_config( @@ -77,12 +84,15 @@ SEASTAR_THREAD_TEST_CASE(test_can_load_logs) { storage::make_sanitized_file_config()); }, [conf]() { return conf; }, - feature_table); + feature_table, + memory_sampling_service); store.start().get(); - auto stop_kvstore = ss::defer([&store, &feature_table] { - store.stop().get(); - feature_table.stop().get(); - }); + auto stop_kvstore = ss::defer( + [&store, &feature_table, &memory_sampling_service] { + store.stop().get(); + memory_sampling_service.stop().get(); + feature_table.stop().get(); + }); auto& m = store.log_mgr(); std::vector ntps; ntps.reserve(4); diff --git a/src/v/storage/tests/storage_test_fixture.h b/src/v/storage/tests/storage_test_fixture.h index eac9f5050de6b..13c5c259f77fc 100644 --- a/src/v/storage/tests/storage_test_fixture.h +++ b/src/v/storage/tests/storage_test_fixture.h @@ -21,6 +21,7 @@ #include "model/tests/random_batch.h" #include "random/generators.h" #include "reflection/adl.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/kvstore.h" #include "storage/log_manager.h" @@ -188,6 +189,7 @@ class storage_test_fixture { storage::kvstore kvstore; storage::storage_resources resources; ss::sharded feature_table; + ss::sharded memory_sampling_service; std::optional ts_cursor; @@ -200,7 +202,8 @@ class storage_test_fixture { test_dir, storage::make_sanitized_file_config()), resources, - feature_table) { + feature_table) + , memory_sampling_service() { configure_unit_test_logging(); // avoid double metric registrations ss::smp::invoke_on_all([] { @@ -214,12 +217,16 @@ class storage_test_fixture { .invoke_on_all( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); + memory_sampling_service + .start(std::ref(tlog), config::mock_binding(false)) + .get(); kvstore.start().get(); } ~storage_test_fixture() { kvstore.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); } @@ -234,13 +241,21 @@ class storage_test_fixture { /// Creates a log manager in test directory storage::log_manager make_log_manager(storage::log_config cfg) { return storage::log_manager( - std::move(cfg), kvstore, resources, feature_table); + std::move(cfg), + kvstore, + resources, + feature_table, + memory_sampling_service); } /// Creates a log manager in test directory with default config storage::log_manager make_log_manager() { return storage::log_manager( - default_log_config(test_dir), kvstore, resources, feature_table); + default_log_config(test_dir), + kvstore, + resources, + feature_table, + memory_sampling_service); } /// \brief randomizes the configuration options diff --git a/src/v/storage/tests/utils/disk_log_builder.cc b/src/v/storage/tests/utils/disk_log_builder.cc index d8a6c90cc0a04..658dde59218a3 100644 --- a/src/v/storage/tests/utils/disk_log_builder.cc +++ b/src/v/storage/tests/utils/disk_log_builder.cc @@ -32,7 +32,12 @@ disk_log_builder::disk_log_builder(storage::log_config config) storage::make_sanitized_file_config()); }, [this]() { return _log_config; }, - _feature_table) {} + _feature_table, + _memory_sampling_service) { + _memory_sampling_service + .start(std::ref(_test_logger), config::mock_binding(false)) + .get(); +} // Batch generation ss::future<> disk_log_builder::add_random_batch( @@ -160,7 +165,9 @@ disk_log_builder::update_start_offset(model::offset start_offset) { } ss::future<> disk_log_builder::stop() { - return _storage.stop().then([this]() { return _feature_table.stop(); }); + return _storage.stop() + .then([this]() { return _memory_sampling_service.stop(); }) + .then([this]() { return _feature_table.stop(); }); } // Low lever interface access diff --git a/src/v/storage/tests/utils/disk_log_builder.h b/src/v/storage/tests/utils/disk_log_builder.h index 95c6fe8dfb168..49ab5ca7f57b6 100644 --- a/src/v/storage/tests/utils/disk_log_builder.h +++ b/src/v/storage/tests/utils/disk_log_builder.h @@ -16,6 +16,7 @@ #include "model/record_batch_reader.h" #include "model/tests/random_batch.h" #include "random/generators.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "ssx/sformat.h" #include "storage/api.h" @@ -404,7 +405,9 @@ class disk_log_builder { const log_append_config& config, should_flush_after flush); + ss::logger _test_logger{"disk-log-test-logger"}; ss::sharded _feature_table; + ss::sharded _memory_sampling_service; storage::log_config _log_config; storage::api _storage; std::optional _log; diff --git a/src/v/test_utils/logs.h b/src/v/test_utils/logs.h index 36a2efe4f0c8c..c46bcd4064152 100644 --- a/src/v/test_utils/logs.h +++ b/src/v/test_utils/logs.h @@ -12,6 +12,7 @@ #pragma once #include "model/fundamental.h" #include "model/record_batch_reader.h" +#include "resource_mgmt/memory_sampling.h" #include "seastarx.h" #include "storage/api.h" @@ -37,24 +38,35 @@ static inline ss::future<> persist_log_file( [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - storage::api storage( - [base_dir]() { - return storage::kvstore_config( - 1_MiB, - config::mock_binding(10ms), - base_dir, - storage::make_sanitized_file_config()); - }, - [base_dir]() { - return storage::log_config( - base_dir, - 1_GiB, - ss::default_priority_class(), - storage::make_sanitized_file_config()); - }, - feature_table); - storage.start().get(); - auto& mgr = storage.log_mgr(); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); + + ss::sharded storage; + storage + .start( + [base_dir]() { + return storage::kvstore_config( + 1_MiB, + config::mock_binding(10ms), + base_dir, + storage::make_sanitized_file_config()); + }, + [base_dir]() { + return storage::log_config( + base_dir, + 1_GiB, + ss::default_priority_class(), + storage::make_sanitized_file_config()); + }, + std::ref(feature_table), + std::ref(memory_sampling_service)) + .get(); + storage.invoke_on_all(&storage::api::start).get(); + + auto& mgr = storage.local().log_mgr(); try { mgr.manage(storage::ntp_config(file_ntp, mgr.config().base_dir)) .then([b = std::move(batches)](storage::log log) mutable { @@ -73,9 +85,11 @@ static inline ss::future<> persist_log_file( }) .get(); storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); } catch (...) { storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); throw; } @@ -108,24 +122,34 @@ read_log_file(ss::sstring base_dir, model::ntp file_ntp) { [](features::feature_table& f) { f.testing_activate_all(); }) .get(); - storage::api storage( - [base_dir]() { - return storage::kvstore_config( - 1_MiB, - config::mock_binding(10ms), - base_dir, - storage::make_sanitized_file_config()); - }, - [base_dir]() { - return storage::log_config( - base_dir, - 1_GiB, - ss::default_priority_class(), - storage::make_sanitized_file_config()); - }, - feature_table); - storage.start().get(); - auto& mgr = storage.log_mgr(); + seastar::logger test_logger("test"); + ss::sharded memory_sampling_service; + memory_sampling_service + .start(std::ref(test_logger), config::mock_binding(false)) + .get(); + + ss::sharded storage; + storage + .start( + [base_dir]() { + return storage::kvstore_config( + 1_MiB, + config::mock_binding(10ms), + base_dir, + storage::make_sanitized_file_config()); + }, + [base_dir]() { + return storage::log_config( + base_dir, + 1_GiB, + ss::default_priority_class(), + storage::make_sanitized_file_config()); + }, + std::ref(feature_table), + std::ref(memory_sampling_service)) + .get(); + storage.invoke_on_all(&storage::api::start).get(); + auto& mgr = storage.local().log_mgr(); try { auto batches = mgr.manage(storage::ntp_config(file_ntp, mgr.config().base_dir)) @@ -142,10 +166,12 @@ read_log_file(ss::sstring base_dir, model::ntp file_ntp) { }) .get0(); storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); return batches; } catch (...) { storage.stop().get(); + memory_sampling_service.stop().get(); feature_table.stop().get(); throw; } diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index fdce9fa1a1586..d40dfd51adbe3 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -993,3 +993,17 @@ def set_disk_stat_override(self, f"debug/storage/disk_stat/{disk_type}", json=json, node=node) + + def get_sampled_memory_profile(self, node=None, shard=None): + """ + Gets the sampled memory profile debug output + """ + if shard is not None: + kwargs = {"params": {"shard": shard}} + else: + kwargs = {} + + return self._request("get", + "debug/sampled_memory_profile", + node=node, + **kwargs).json() diff --git a/tests/rptest/tests/memory_sampling_test.py b/tests/rptest/tests/memory_sampling_test.py new file mode 100644 index 0000000000000..da24e0d5e1ed3 --- /dev/null +++ b/tests/rptest/tests/memory_sampling_test.py @@ -0,0 +1,66 @@ +# Copyright 2023 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.services.admin import Admin +from rptest.services.cluster import cluster +from rptest.tests.redpanda_test import RedpandaTest +from rptest.utils.mode_checks import skip_debug_mode + +BOOTSTRAP_CONFIG = { + 'memory_enable_memory_sampling': True, +} + + +class MemorySamplingTestTest(RedpandaTest): + def __init__(self, *args, **kwargs): + rp_conf = BOOTSTRAP_CONFIG.copy() + + super(MemorySamplingTestTest, self).__init__(*args, + extra_rp_conf=rp_conf, + **kwargs) + + self.admin = Admin(self.redpanda) + + @cluster(num_nodes=1) + @skip_debug_mode # not using seastar allocator in debug + def test_get_all_stacks(self): + """ + Verify that the sampled_memory_profile GET endpoint serves valid json + with some profile data. + + """ + admin = Admin(self.redpanda) + profile = admin.get_sampled_memory_profile() + + assert len(profile) == 2 + assert 'shard' in profile[0] + assert 'allocation_sites' in profile[0] + assert len(profile[0]['allocation_sites']) > 0 + assert 'size' in profile[0]['allocation_sites'][0] + assert 'count' in profile[0]['allocation_sites'][0] + assert 'backtrace' in profile[0]['allocation_sites'][0] + + @cluster(num_nodes=1) + @skip_debug_mode # not using seastar allocator in debug + def test_get_per_shard_stacks(self): + """ + Verify that the sampled_memory_profile GET endpoint serves valid json + with some profile data with a shard parameter + + """ + admin = Admin(self.redpanda) + profile = admin.get_sampled_memory_profile(shard=1) + + assert len(profile) == 1 + assert 'shard' in profile[0] + assert 'allocation_sites' in profile[0] + assert len(profile[0]['allocation_sites']) > 0 + assert 'size' in profile[0]['allocation_sites'][0] + assert 'count' in profile[0]['allocation_sites'][0] + assert 'backtrace' in profile[0]['allocation_sites'][0]