diff --git a/src/go/rpk/go.mod b/src/go/rpk/go.mod index 2919b0e1836da..23a7c9d510d61 100644 --- a/src/go/rpk/go.mod +++ b/src/go/rpk/go.mod @@ -26,6 +26,7 @@ require ( github.com/hamba/avro/v2 v2.25.2 github.com/hashicorp/go-multierror v1.1.1 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 + github.com/kr/text v0.2.0 github.com/lestrrat-go/jwx v1.2.30 github.com/linkedin/goavro/v2 v2.13.0 github.com/lorenzosaino/go-sysctl v0.3.1 diff --git a/src/go/rpk/go.sum b/src/go/rpk/go.sum index 0eb79316a5340..a2c0e4d575f31 100644 --- a/src/go/rpk/go.sum +++ b/src/go/rpk/go.sum @@ -43,6 +43,7 @@ github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3 github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.17/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= diff --git a/src/go/rpk/pkg/adminapi/admin.go b/src/go/rpk/pkg/adminapi/admin.go index a29a2c653ecee..3348dca9c9367 100644 --- a/src/go/rpk/pkg/adminapi/admin.go +++ b/src/go/rpk/pkg/adminapi/admin.go @@ -16,8 +16,12 @@ import ( "fmt" "os" "strconv" + "strings" "time" + "github.com/kr/text" + mTerm "github.com/moby/term" + "go.uber.org/zap" "github.com/redpanda-data/common-go/rpadmin" @@ -137,22 +141,31 @@ func licenseFeatureChecks(ctx context.Context, fs afero.Fs, cl *rpadmin.AdminAPI // violation. (we only save successful responses). // 2. LicenseStatus was last checked more than 1 hour ago. if p.LicenseCheck == nil || p.LicenseCheck != nil && time.Unix(p.LicenseCheck.LastUpdate, 0).Add(1*time.Hour).Before(time.Now()) { - resp, err := cl.GetEnterpriseFeatures(ctx) + featResp, err := cl.GetEnterpriseFeatures(ctx) if err != nil { zap.L().Sugar().Warnf("unable to check licensed enterprise features in the cluster: %v", err) return "" } + info, err := cl.GetLicenseInfo(ctx) + if err != nil { + zap.L().Sugar().Warnf("unable to check license information: %v", err) + return "" + } // We don't write a profile if the config doesn't exist. y, exists := p.ActualConfig() var licenseCheck *config.LicenseStatusCache - if resp.Violation { - var features []string - for _, f := range resp.Features { - if f.Enabled { - features = append(features, f.Name) - } + var enabledFeatures []string + for _, f := range featResp.Features { + if f.Enabled { + enabledFeatures = append(enabledFeatures, f.Name) } - msg = fmt.Sprintf("\nWARNING: The following Enterprise features are being used in your Redpanda cluster: %v. These features require a license. To get a license, contact us at https://www.redpanda.com/contact. For more information, see https://docs.redpanda.com/current/get-started/licenses/#redpanda-enterprise-edition\n", features) + } + isTrialCheck := isTrialAboutToExpire(info, enabledFeatures) + + if featResp.Violation { + msg = fmt.Sprintf("\nWARNING: The following Enterprise features are being used in your Redpanda cluster: %v. These features require a license. To get a license, contact us at https://www.redpanda.com/contact. 
For more information, see https://docs.redpanda.com/current/get-started/licenses/#redpanda-enterprise-edition\n", enabledFeatures) + } else if isTrialCheck { + msg = fmt.Sprintf("\nWARNING: your TRIAL license is about to expire. The following Enterprise features are being used in your Redpanda cluster: %v. These features require a license. To get a license, contact us at https://www.redpanda.com/contact. For more information, see https://docs.redpanda.com/current/get-started/licenses/#redpanda-enterprise-edition\n", enabledFeatures) } else { licenseCheck = &config.LicenseStatusCache{ LastUpdate: time.Now().Unix(), @@ -168,5 +181,21 @@ func licenseFeatureChecks(ctx context.Context, fs afero.Fs, cl *rpadmin.AdminAPI } } } + if ws, err := mTerm.GetWinsize(0); err == nil && msg != "" { + // text.Wrap removes the newline from the text. We add it back. + msg = "\n" + text.Wrap(msg, int(ws.Width)) + "\n" + } return msg } + +// isTrialAboutToExpire returns true if we have a loaded free_trial license that +// expires in less than 15 days, and we have enterprise features enabled. +func isTrialAboutToExpire(info rpadmin.License, enabledFeatures []string) bool { + if len(enabledFeatures) > 0 && info.Loaded && strings.EqualFold(info.Properties.Type, "free_trial") { + ut := time.Unix(info.Properties.Expires, 0) + daysLeft := int(time.Until(ut).Hours() / 24) + + return daysLeft < 15 && !ut.Before(time.Now()) + } + return false +} diff --git a/src/go/rpk/pkg/adminapi/admin_test.go b/src/go/rpk/pkg/adminapi/admin_test.go index 1464596229507..9c8f417fae41f 100644 --- a/src/go/rpk/pkg/adminapi/admin_test.go +++ b/src/go/rpk/pkg/adminapi/admin_test.go @@ -2,6 +2,7 @@ package adminapi import ( "context" + "fmt" "net/http" "net/http/httptest" "testing" @@ -16,7 +17,7 @@ func Test_licenseFeatureChecks(t *testing.T) { tests := []struct { name string prof *config.RpkProfile - responseCase string // See the mapLicenseResponses below. + responseCase string // See the mapLicenseFeatureResponses below. 
expContain string withErr bool checkCache func(t *testing.T, before int64, after int64) @@ -27,6 +28,18 @@ func Test_licenseFeatureChecks(t *testing.T) { responseCase: "ok", expContain: "", }, + { + name: "free_trial about to expire, no features", + prof: &config.RpkProfile{}, + responseCase: "ok-free", + expContain: "", + }, + { + name: "free_trial about to expire, with features", + prof: &config.RpkProfile{}, + responseCase: "ok-features", + expContain: "WARNING: your TRIAL license is about to expire", + }, { name: "license ok, cache valid", prof: &config.RpkProfile{LicenseCheck: &config.LicenseStatusCache{LastUpdate: time.Now().Add(20 * time.Minute).Unix()}}, @@ -144,16 +157,33 @@ type response struct { body string } -var mapLicenseResponses = map[string]response{ - "ok": {http.StatusOK, `{"license_status": "valid", "violation": false}`}, +var mapLicenseFeatureResponses = map[string]response{ + "ok": {http.StatusOK, `{"license_status": "valid", "violation": false, "features": [{"name": "fips", "enabled": true},{"name": "partition_auto_balancing_continuous", "enabled": false}]}`}, "inViolation": {http.StatusOK, `{"license_status": "expired", "violation": true, "features": [{"name": "partition_auto_balancing_continuous", "enabled": true}]}`}, "failedRequest": {http.StatusBadRequest, ""}, + "ok-free": {http.StatusOK, `{"license_status": "valid", "violation": false}`}, + "ok-features": {http.StatusOK, `{"license_status": "valid", "violation": false, "features": [{"name": "partition_auto_balancing_continuous", "enabled": true}]}`}, +} + +var mapLicenseInfoResponses = map[string]response{ + "ok": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "enterprise", "expires": %d}}`, time.Now().Add(60*24*time.Hour).Unix())}, + "inViolation": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "enterprise", "expires": %d}}`, time.Now().Add(60*24*time.Hour).Unix())}, + "failedRequest": {http.StatusBadRequest, ""}, + "ok-free": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "free_trial", "expires": %d}}`, time.Now().Add(24*time.Hour).Unix())}, // expires in 1 day. 
+ "ok-features": {http.StatusOK, fmt.Sprintf(`{"loaded": true, "license": {"type": "free_trial", "expires": %d}}`, time.Now().Add(24*time.Hour).Unix())}, } func licenseHandler(respCase string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - resp := mapLicenseResponses[respCase] - w.WriteHeader(resp.status) - w.Write([]byte(resp.body)) + fmt.Println(r.URL.Path) + if r.URL.Path == "/v1/features/enterprise" { + resp := mapLicenseFeatureResponses[respCase] + w.WriteHeader(resp.status) + w.Write([]byte(resp.body)) + } else if r.URL.Path == "/v1/features/license" { + resp := mapLicenseInfoResponses[respCase] + w.WriteHeader(resp.status) + w.Write([]byte(resp.body)) + } } } diff --git a/src/go/rpk/pkg/cli/cluster/license/info.go b/src/go/rpk/pkg/cli/cluster/license/info.go index 6aa58f7693ff2..be69b4fa489ba 100644 --- a/src/go/rpk/pkg/cli/cluster/license/info.go +++ b/src/go/rpk/pkg/cli/cluster/license/info.go @@ -3,6 +3,7 @@ package license import ( "fmt" "os" + "strings" "time" "github.com/redpanda-data/common-go/rpadmin" @@ -133,17 +134,21 @@ func printTextLicenseInfo(resp infoResponse) { if *resp.Expired { tw.Print("License expired:", *resp.Expired) } - checkLicenseExpiry(resp.ExpiresUnix) + checkLicenseExpiry(resp.ExpiresUnix, resp.Type) } out.Section("LICENSE INFORMATION") tw.Flush() } -func checkLicenseExpiry(expiresUnix int64) { +func checkLicenseExpiry(expiresUnix int64, licenseType string) { ut := time.Unix(expiresUnix, 0) daysLeft := int(time.Until(ut).Hours() / 24) - if daysLeft < 30 && !ut.Before(time.Now()) { + dayThreshold := 30 + if strings.EqualFold(licenseType, "free_trial") { + dayThreshold = 15 + } + if daysLeft < dayThreshold && !ut.Before(time.Now()) { fmt.Fprintf(os.Stderr, "WARNING: your license will expire soon.\n\n") } } diff --git a/src/transform-sdk/go/transform/internal/testdata/CMakeLists.txt b/src/transform-sdk/go/transform/internal/testdata/CMakeLists.txt index fda5e629bfcbb..4c3d5a99f3792 100644 --- a/src/transform-sdk/go/transform/internal/testdata/CMakeLists.txt +++ b/src/transform-sdk/go/transform/internal/testdata/CMakeLists.txt @@ -7,7 +7,7 @@ find_package(Python3 REQUIRED COMPONENTS Interpreter) function(add_wasm_transform NAME) find_program(TINYGO_BIN "tinygo") set(wasm_output "${CMAKE_CURRENT_BINARY_DIR}/${NAME}.wasm") - set(tinygo_cmd ${TINYGO_BIN} build -o ${wasm_output} -quiet -target wasi -scheduler none "${NAME}/transform.go") + set(tinygo_cmd ${TINYGO_BIN} build -o ${wasm_output} -target wasi -scheduler none "${NAME}/transform.go") set(gopath ${CMAKE_CURRENT_BINARY_DIR}/${NAME}) add_custom_command(OUTPUT ${wasm_output} COMMAND Python3::Interpreter ${CMAKE_CURRENT_SOURCE_DIR}/retry.py diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 16fe70690e69a..336080ecc3754 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -1233,6 +1234,7 @@ ss::future<> cache::put( ss::this_shard_id(), (++_cnt), cache_tmp_file_extension)); + auto tmp_filepath = dir_path / tmp_filename; ss::file tmp_cache_file; while (true) { @@ -1247,14 +1249,14 @@ ss::future<> cache::put( | ss::open_flags::exclusive; tmp_cache_file = co_await ss::open_file_dma( - (dir_path / tmp_filename).native(), flags); + tmp_filepath.native(), flags); break; } catch (std::filesystem::filesystem_error& e) { if (e.code() == std::errc::no_such_file_or_directory) { vlog( cst_log.debug, "Couldn't open {}, gonna retry", - 
(dir_path / tmp_filename).native()); + tmp_filepath.native()); } else { throw; } @@ -1267,42 +1269,59 @@ ss::future<> cache::put( options.io_priority_class = io_priority; auto out = co_await ss::make_file_output_stream(tmp_cache_file, options); - std::exception_ptr disk_full_error; + std::exception_ptr eptr; + bool no_space_on_device = false; try { co_await ss::copy(data, out) .then([&out]() { return out.flush(); }) .finally([&out]() { return out.close(); }); } catch (std::filesystem::filesystem_error& e) { // For ENOSPC errors, delay handling so that we can do a trim - if (e.code() == std::errc::no_space_on_device) { - disk_full_error = std::current_exception(); - } else { - throw; + no_space_on_device = e.code() == std::errc::no_space_on_device; + eptr = std::current_exception(); + } catch (...) { + // For other errors, delay handling so that we can clean up the tmp file + eptr = std::current_exception(); + } + + // If we failed to write to the tmp file, we should delete it, maybe do an + // eager trim, and rethrow the exception. + if (eptr) { + if (!_gate.is_closed()) { + auto delete_tmp_fut = co_await ss::coroutine::as_future( + delete_file_and_empty_parents(tmp_filepath.native())); + if ( + delete_tmp_fut.failed() + && !ssx::is_shutdown_exception(delete_tmp_fut.get_exception())) { + vlog( + cst_log.error, + "Failed to delete tmp file {}: {}", + tmp_filepath.native(), + delete_tmp_fut.get_exception()); + } } - } - if (disk_full_error) { - vlog(cst_log.error, "Out of space while writing to cache"); + if (no_space_on_device) { + vlog(cst_log.error, "Out of space while writing to cache"); - // Block further puts from being attempted until notify_disk_status - // reports that there is space available. - set_block_puts(true); + // Block further puts from being attempted until notify_disk_status + // reports that there is space available. + set_block_puts(true); - // Trim proactively: if many fibers hit this concurrently, - // they'll contend for cleanup_sm and the losers will skip - // trim due to throttling. - co_await trim_throttled(); + // Trim proactively: if many fibers hit this concurrently, + // they'll contend for cleanup_sm and the losers will skip + // trim due to throttling. 
+ co_await trim_throttled(); + } - throw disk_full_error; + std::rethrow_exception(eptr); } // commit write transaction - auto src = (dir_path / tmp_filename).native(); - auto dest = (dir_path / filename).native(); + auto put_size = co_await ss::file_size(tmp_filepath.native()); - auto put_size = co_await ss::file_size(src); - - co_await ss::rename_file(src, dest); + auto dest = (dir_path / filename).native(); + co_await ss::rename_file(tmp_filepath.native(), dest); // We will now update reservation.wrote_data(put_size, 1); diff --git a/src/v/cloud_storage/tests/cache_test.cc b/src/v/cloud_storage/tests/cache_test.cc index 8612a2a6d0ec6..a697e5917baa0 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -18,7 +18,9 @@ #include "random/generators.h" #include "ssx/sformat.h" #include "test_utils/fixture.h" +#include "test_utils/iostream.h" #include "test_utils/scoped_config.h" +#include "utils/directory_walker.h" #include "utils/file_io.h" #include "utils/human.h" @@ -451,6 +453,41 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) { BOOST_REQUIRE_EQUAL(out.size(), 0); } +ss::future count_files(ss::sstring dirname) { + directory_walker walker; + size_t count = 0; + co_await walker.walk( + dirname, + [dirname, &count](const ss::directory_entry& entry) -> ss::future<> { + if (entry.type == ss::directory_entry_type::directory) { + return count_files(fmt::format("{}/{}", dirname, entry.name)) + .then([&count](size_t sub_count) { count += sub_count; }); + } else { + ++count; + return ss::now(); + } + }); + + co_return count; +} + +FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) { + auto s = tests::make_throwing_stream(ss::abort_requested_exception()); + auto reservation = sharded_cache.local().reserve_space(1, 1).get(); + BOOST_CHECK_THROW( + sharded_cache.local().put(KEY, s, reservation).get(), + ss::abort_requested_exception); + vlog(test_log.info, "Put failed as expected"); + + BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_bytes(), 0); + BOOST_CHECK_EQUAL(sharded_cache.local().get_usage_objects(), 0); + + vlog(test_log.info, "Counting files in cache directory"); + BOOST_CHECK_EQUAL(count_files(CACHE_DIR.native()).get(), 0); + + vlog(test_log.info, "Test passed"); +} + /** * Validate that .part files and empty directories are deleted if found during * the startup walk of the cache. 
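The cache_service.cc hunks above widen the error handling in cache::put: any failure while streaming into the temporary file is captured as an exception_ptr, the partially written tmp file is removed (with an eager trim only for ENOSPC), and the original exception is rethrown; the new cache_test fixture drives that path with a throwing stream and then checks that nothing is left in the cache directory. A minimal, self-contained sketch of the same capture/clean-up/rethrow shape, using plain C++17 and std::filesystem instead of Seastar, is shown below; the function and file names are illustrative only, not Redpanda APIs.

    // Sketch of the pattern cache::put now uses: capture any write error as an
    // exception_ptr, remove the partially written tmp file, then rethrow. Plain
    // std::filesystem stands in for Seastar here; names are illustrative only.
    #include <exception>
    #include <filesystem>
    #include <fstream>
    #include <string>
    #include <system_error>

    namespace fs = std::filesystem;

    void write_via_tmp(const fs::path& dest, const std::string& data) {
        const fs::path tmp = dest.string() + ".part";
        std::exception_ptr eptr;
        try {
            std::ofstream out(tmp, std::ios::binary | std::ios::trunc);
            out.exceptions(std::ios::failbit | std::ios::badbit);
            out << data;
            out.flush();
        } catch (...) {
            // Delay handling so clean-up always runs, as in the patch.
            eptr = std::current_exception();
        }
        if (eptr) {
            std::error_code ec;
            fs::remove(tmp, ec); // best effort, like delete_file_and_empty_parents
            std::rethrow_exception(eptr);
        }
        fs::rename(tmp, dest); // commit only after a fully successful write
    }

The new test_clean_up_on_stream_exception fixture exercises exactly this path: put() is handed a stream that throws on first read, and the test asserts that usage counters are zero and that count_files finds nothing left behind in the cache directory.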
diff --git a/src/v/cluster/metrics_reporter.cc b/src/v/cluster/metrics_reporter.cc index 7b52d59fb286c..f9f30481301ba 100644 --- a/src/v/cluster/metrics_reporter.cc +++ b/src/v/cluster/metrics_reporter.cc @@ -43,7 +43,9 @@ #include #include #include +#include #include +#include #include #include @@ -53,9 +55,38 @@ #include #include #include +#include +#include +#include #include +namespace { +ss::sstring get_hostname() { + std::array hostname{}; + if (::gethostname(hostname.data(), hostname.size()) != 0) { + return {}; + } + + return hostname.data(); +} + +ss::sstring get_domainname() { + std::array domainname{}; + if (::getdomainname(domainname.data(), domainname.size()) != 0) { + return {}; + } + + return domainname.data(); +} + +ss::future> get_fqdns(std::string_view hostname) { + ss::net::dns_resolver resolver; + auto hostent = co_await resolver.get_host_by_name(hostname.data()); + co_return hostent.names; +} +} // namespace + namespace cluster { namespace details { @@ -217,6 +248,14 @@ metrics_reporter::build_metrics_snapshot() { } metrics.uptime_ms = report->local_state.uptime / 1ms; + auto& advertised_listeners + = nm->get().broker.kafka_advertised_listeners(); + metrics.advertised_listeners.reserve(advertised_listeners.size()); + std::transform( + advertised_listeners.begin(), + advertised_listeners.end(), + std::back_inserter(metrics.advertised_listeners), + [](const model::broker_endpoint& ep) { return ep.address; }); } auto& topics = _topics.local().topics_map(); snapshot.topic_count = 0; @@ -274,6 +313,12 @@ metrics_reporter::build_metrics_snapshot() { && !license.value().is_expired(); snapshot.has_enterprise_features = feature_report.any(); + snapshot.enterprise_features.emplace(std::move(feature_report)); + + snapshot.host_name = get_hostname(); + snapshot.domain_name = get_domainname(); + snapshot.fqdns = co_await get_fqdns(snapshot.host_name); + co_return snapshot; } @@ -550,6 +595,24 @@ void rjson_serialize( w.Key("has_enterprise_features"); w.Bool(snapshot.has_enterprise_features); + if (snapshot.enterprise_features.has_value()) { + w.Key("enterprise_features"); + w.StartArray(); + for (const auto& f : snapshot.enterprise_features.value().enabled()) { + w.String(fmt::format("{}", f)); + } + w.EndArray(); + } + + w.Key("hostname"); + w.String(snapshot.host_name); + + w.Key("domainname"); + w.String(snapshot.domain_name); + + w.Key("fqdns"); + rjson_serialize(w, snapshot.fqdns); + w.EndObject(); } @@ -586,6 +649,8 @@ void rjson_serialize( rjson_serialize(w, d); } w.EndArray(); + w.Key("kafka_advertised_listeners"); + rjson_serialize(w, nm.advertised_listeners); w.EndObject(); } diff --git a/src/v/cluster/metrics_reporter.h b/src/v/cluster/metrics_reporter.h index a415242180411..bb67472304ef9 100644 --- a/src/v/cluster/metrics_reporter.h +++ b/src/v/cluster/metrics_reporter.h @@ -14,11 +14,13 @@ #include "cluster/fwd.h" #include "cluster/plugin_table.h" #include "cluster/types.h" +#include "features/enterprise_features.h" #include "features/fwd.h" #include "http/client.h" #include "model/metadata.h" #include "security/fwd.h" #include "utils/prefix_logger.h" +#include "utils/unresolved_address.h" #include #include @@ -57,6 +59,7 @@ class metrics_reporter { cluster_version logical_version{invalid_version}; std::vector disks; uint64_t uptime_ms{0}; + std::vector advertised_listeners; }; struct metrics_snapshot { @@ -82,6 +85,12 @@ class metrics_reporter { bool has_enterprise_features{false}; bool has_valid_license{false}; + + std::optional enterprise_features; + + ss::sstring 
host_name; + ss::sstring domain_name; + std::vector fqdns; }; static constexpr ss::shard_id shard = 0; diff --git a/src/v/kafka/group_probe.h b/src/v/kafka/group_probe.h index 0ba5da9272f7c..606c698d7fe48 100644 --- a/src/v/kafka/group_probe.h +++ b/src/v/kafka/group_probe.h @@ -78,11 +78,10 @@ class group_offset_probe { _public_metrics.add_group( prometheus_sanitize::metrics_name("kafka:consumer:group"), {sm::make_gauge( - "committed_offset", - [this] { return _offset; }, - sm::description("Consumer group committed offset"), - labels) - .aggregate({sm::shard_label})}); + "committed_offset", + [this] { return _offset; }, + sm::description("Consumer group committed offset"), + labels)}); } private: @@ -130,15 +129,13 @@ class group_probe { "consumers", [this] { return _members.size(); }, sm::description("Number of consumers in a group"), - labels) - .aggregate({sm::shard_label}), + labels), sm::make_gauge( "topics", [this] { return _offsets.size(); }, sm::description("Number of topics in a group"), - labels) - .aggregate({sm::shard_label})}); + labels)}); } private: diff --git a/src/v/raft/configuration_bootstrap_state.h b/src/v/raft/configuration_bootstrap_state.h index fe285f50b800a..aed5fd8f5300c 100644 --- a/src/v/raft/configuration_bootstrap_state.h +++ b/src/v/raft/configuration_bootstrap_state.h @@ -35,10 +35,10 @@ class configuration_bootstrap_state { model::term_id term() const { return _term; } model::offset prev_log_index() const { return _prev_log_index; } model::term_id prev_log_term() const { return _prev_log_term; } - const std::vector& configurations() const { + const chunked_vector& configurations() const { return _configurations; } - std::vector release_configurations() && { + chunked_vector release_configurations() && { return std::move(_configurations); } void set_term(model::term_id t) { _term = t; } @@ -76,7 +76,7 @@ class configuration_bootstrap_state { model::term_id _term{0}; model::offset _prev_log_index{0}; model::term_id _prev_log_term{0}; - std::vector _configurations; + chunked_vector _configurations; // we need to keep track of what we have processed in case we re-reprocess a // multiple segments diff --git a/src/v/raft/configuration_manager.cc b/src/v/raft/configuration_manager.cc index d224ee942931a..63fa35d877223 100644 --- a/src/v/raft/configuration_manager.cc +++ b/src/v/raft/configuration_manager.cc @@ -169,8 +169,8 @@ void configuration_manager::add_configuration( } } -ss::future<> -configuration_manager::add(std::vector configurations) { +ss::future<> configuration_manager::add( + chunked_vector configurations) { return _lock.with([this, configurations = std::move(configurations)]() mutable { for (auto& co : configurations) { diff --git a/src/v/raft/configuration_manager.h b/src/v/raft/configuration_manager.h index 4422232f4ac2c..38dcc1dcdce14 100644 --- a/src/v/raft/configuration_manager.h +++ b/src/v/raft/configuration_manager.h @@ -92,7 +92,7 @@ class configuration_manager { /** * Add all configurations */ - ss::future<> add(std::vector); + ss::future<> add(chunked_vector); /** * Get the configuration that is valid for given offset. 
This method return diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 0d3e59d8426ba..796f6789adb8a 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -2831,7 +2831,7 @@ ss::future consensus::disk_append( consumer(_log->make_appender(cfg)), cfg.timeout) .then([this, should_update_last_quorum_idx]( - std::tuple> t) { + std::tuple> t) { auto& [ret, configurations] = t; _pending_flush_bytes += ret.byte_size; if (should_update_last_quorum_idx) { diff --git a/src/v/raft/consensus_utils.h b/src/v/raft/consensus_utils.h index f02a5a2687ba9..b0a02c71c4716 100644 --- a/src/v/raft/consensus_utils.h +++ b/src/v/raft/consensus_utils.h @@ -170,7 +170,7 @@ auto for_each_ref_extract_configuration( ReferenceConsumer wrapped; model::offset next_offset; - std::vector configurations; + chunked_vector configurations; }; return std::move(rdr).for_each_ref( diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index d71620a43b860..10e9464440758 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -2535,7 +2535,7 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) { model::node_id node_id; if (config::node().node_id().has_value() && ever_ran_controller) { vlog( - _log.debug, + _log.info, "Running with already-established node ID {}", config::node().node_id()); node_id = config::node().node_id().value(); diff --git a/src/v/storage/BUILD b/src/v/storage/BUILD index db1142ec23324..53a10678be931 100644 --- a/src/v/storage/BUILD +++ b/src/v/storage/BUILD @@ -160,6 +160,7 @@ redpanda_cc_library( include_prefix = "storage", visibility = ["//visibility:public"], deps = [ + ":log_manager_probe", ":parser_utils", ":record_batch_builder", "//src/v/base", @@ -229,3 +230,19 @@ redpanda_cc_library( "@seastar", ], ) + +redpanda_cc_library( + name = "log_manager_probe", + srcs = [ + "log_manager_probe.cc", + ], + hdrs = [ + "log_manager_probe.h", + ], + include_prefix = "storage", + deps = [ + "//src/v/config", + "//src/v/metrics", + "@seastar", + ], +) diff --git a/src/v/storage/CMakeLists.txt b/src/v/storage/CMakeLists.txt index 1c04c5040592b..b19ccd5ba35db 100644 --- a/src/v/storage/CMakeLists.txt +++ b/src/v/storage/CMakeLists.txt @@ -5,6 +5,7 @@ v_cc_library( segment_reader.cc segment_deduplication_utils.cc log_manager.cc + log_manager_probe.cc disk_log_impl.cc disk_log_appender.cc parser.cc diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index f4d8135477122..f9277f487238f 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -27,6 +27,7 @@ #include "storage/key_offset_map.h" #include "storage/kvstore.h" #include "storage/log.h" +#include "storage/log_manager_probe.h" #include "storage/logger.h" #include "storage/segment.h" #include "storage/segment_appender.h" @@ -143,7 +144,8 @@ log_manager::log_manager( , _feature_table(feature_table) , _jitter(_config.compaction_interval()) , _trigger_gc_jitter(0s, 5s) - , _batch_cache(_config.reclaim_opts) { + , _batch_cache(_config.reclaim_opts) + , _probe(std::make_unique()) { _config.compaction_interval.watch([this]() { _jitter = simple_time_jitter{ _config.compaction_interval()}; @@ -151,6 +153,8 @@ log_manager::log_manager( }); } +log_manager::~log_manager() = default; + ss::future<> log_manager::clean_close(ss::shared_ptr log) { auto clean_segment = co_await log->close(); @@ -171,6 +175,7 @@ ss::future<> log_manager::clean_close(ss::shared_ptr log) { } ss::future<> log_manager::start() { + _probe->setup_metrics(); 
if (unlikely(config::shard_local_cfg() .log_disable_housekeeping_for_tests.value())) { co_return; @@ -195,6 +200,8 @@ ss::future<> log_manager::stop() { co_await _compaction_hash_key_map->initialize(0); _compaction_hash_key_map.reset(); } + + _probe->clear_metrics(); } /** @@ -279,6 +286,7 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { _abort_source, std::move(ntp_sanitizer_cfg), _compaction_hash_key_map.get())); + _probe->housekeeping_log_processed(); // bail out of compaction early in order to get back to gc if (_gc_triggered) { @@ -358,6 +366,7 @@ ss::future<> log_manager::housekeeping_loop() { // it is expected that callers set the flag whenever they want the // next round of housekeeping to priortize gc. _gc_triggered = false; + _probe->urgent_gc_run(); /* * build a schedule of partitions to gc ordered by amount of @@ -575,7 +584,7 @@ ss::future> log_manager::do_manage( auto [it, success] = _logs.emplace( l->config().ntp(), std::make_unique(l)); _logs_list.push_back(*it->second); - _resources.update_partition_count(_logs.size()); + update_log_count(); vassert(success, "Could not keep track of:{} - concurrency issue", l); co_return l; } @@ -595,7 +604,7 @@ ss::future<> log_manager::remove(model::ntp ntp) { vlog(stlog.info, "Asked to remove: {}", ntp); auto g = _gate.hold(); auto handle = _logs.extract(ntp); - _resources.update_partition_count(_logs.size()); + update_log_count(); if (handle.empty()) { co_return; } @@ -878,4 +887,11 @@ gc_config log_manager::default_gc_config() const { return {collection_threshold, _config.retention_bytes()}; } +void log_manager::update_log_count() { + auto count = _logs.size(); + + _resources.update_partition_count(count); + _probe->set_log_count(count); +} + } // namespace storage diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index ac066b26243d9..2aef5c7d661dd 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -47,6 +47,8 @@ namespace storage { +class log_manager_probe; + namespace testing_details { class log_manager_accessor; }; @@ -170,6 +172,7 @@ class log_manager { kvstore& kvstore, storage_resources&, ss::sharded&) noexcept; + ~log_manager(); ss::future> manage( ntp_config, @@ -280,6 +283,8 @@ class log_manager { ss::future<> housekeeping_scan(model::timestamp); + void update_log_count(); + log_config _config; kvstore& _kvstore; storage_resources& _resources; @@ -293,6 +298,10 @@ class log_manager { // Hash key-map to use across multiple compactions to reuse reserved memory // rather than reallocating repeatedly. std::unique_ptr _compaction_hash_key_map; + + // Metrics. + std::unique_ptr _probe; + ss::gate _gate; ss::abort_source _abort_source; diff --git a/src/v/storage/log_manager_probe.cc b/src/v/storage/log_manager_probe.cc new file mode 100644 index 0000000000000..e0a87708314e0 --- /dev/null +++ b/src/v/storage/log_manager_probe.cc @@ -0,0 +1,50 @@ +// Copyright 2024 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "storage/log_manager_probe.h" + +#include "config/configuration.h" +#include "metrics/prometheus_sanitize.h" + +#include + +namespace storage { + +void log_manager_probe::setup_metrics() { + if (config::shard_local_cfg().disable_metrics()) { + return; + } + + namespace sm = ss::metrics; + + auto group_name = prometheus_sanitize::metrics_name("storage:manager"); + + _metrics.add_group( + group_name, + { + sm::make_gauge( + "logs", + [this] { return _log_count; }, + sm::description("Number of logs managed")), + sm::make_counter( + "urgent_gc_runs", + [this] { return _urgent_gc_runs; }, + sm::description("Number of urgent GC runs")), + sm::make_counter( + "housekeeping_log_processed", + [this] { return _housekeeping_log_processed; }, + sm::description("Number of logs processed by housekeeping")), + }, + {}, + {}); +} + +void log_manager_probe::clear_metrics() { _metrics.clear(); } + +} // namespace storage diff --git a/src/v/storage/log_manager_probe.h b/src/v/storage/log_manager_probe.h new file mode 100644 index 0000000000000..dcaaf336b21f2 --- /dev/null +++ b/src/v/storage/log_manager_probe.h @@ -0,0 +1,45 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "metrics/metrics.h" + +#include + +namespace storage { + +/// Log manager per-shard storage probe. +class log_manager_probe { +public: + log_manager_probe() = default; + log_manager_probe(const log_manager_probe&) = delete; + log_manager_probe& operator=(const log_manager_probe&) = delete; + log_manager_probe(log_manager_probe&&) = delete; + log_manager_probe& operator=(log_manager_probe&&) = delete; + ~log_manager_probe() = default; + +public: + void setup_metrics(); + void clear_metrics(); + +public: + void set_log_count(uint32_t log_count) { _log_count = log_count; } + void housekeeping_log_processed() { ++_housekeeping_log_processed; } + void urgent_gc_run() { ++_urgent_gc_runs; } + +private: + uint32_t _log_count = 0; + uint64_t _urgent_gc_runs = 0; + uint64_t _housekeeping_log_processed = 0; + + metrics::internal_metric_groups _metrics; +}; + +}; // namespace storage diff --git a/src/v/test_utils/BUILD b/src/v/test_utils/BUILD index 684039b020f56..c9eee9d26f1e2 100644 --- a/src/v/test_utils/BUILD +++ b/src/v/test_utils/BUILD @@ -44,3 +44,16 @@ redpanda_cc_library( "@seastar//:testing", ], ) + +redpanda_test_cc_library( + name = "iostream", + hdrs = [ + "iostream.h", + ], + include_prefix = "test_utils", + visibility = ["//visibility:public"], + deps = [ + "//src/v/base", + "@seastar", + ], +) diff --git a/src/v/test_utils/iostream.h b/src/v/test_utils/iostream.h new file mode 100644 index 0000000000000..131ca6a9aa301 --- /dev/null +++ b/src/v/test_utils/iostream.h @@ -0,0 +1,42 @@ +// Copyright 2024 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include "base/seastarx.h" + +#include +#include +#include + +namespace tests { + +/// Create an input stream that throws an exception on first interaction. +template +ss::input_stream make_throwing_stream(Err err) { + struct throwing_stream final : ss::data_source_impl { + explicit throwing_stream(Err e) + : _err(std::move(e)) {} + + ss::future> skip(uint64_t) final { + return get(); + } + + ss::future> get() final { + return ss::make_exception_future>( + std::move(_err)); + } + + Err _err; + }; + auto ds = ss::data_source(std::make_unique(err)); + return ss::input_stream(std::move(ds)); +} + +} // namespace tests diff --git a/src/v/utils/tests/BUILD b/src/v/utils/tests/BUILD index 483c587bb70f9..e9de8a149012b 100644 --- a/src/v/utils/tests/BUILD +++ b/src/v/utils/tests/BUILD @@ -389,6 +389,7 @@ redpanda_cc_btest( "//src/v/bytes:iostream", "//src/v/bytes:random", "//src/v/random:generators", + "//src/v/test_utils:iostream", "//src/v/test_utils:seastar_boost", "//src/v/utils:stream_utils", "@boost//:test", diff --git a/src/v/utils/tests/input_stream_fanout_test.cc b/src/v/utils/tests/input_stream_fanout_test.cc index 9fd9cc4b2cc71..8d2d86b427cfe 100644 --- a/src/v/utils/tests/input_stream_fanout_test.cc +++ b/src/v/utils/tests/input_stream_fanout_test.cc @@ -13,6 +13,7 @@ #include "bytes/iostream.h" #include "bytes/random.h" #include "random/generators.h" +#include "test_utils/iostream.h" #include "utils/stream_utils.h" #include @@ -437,29 +438,8 @@ SEASTAR_THREAD_TEST_CASE(input_stream_fanout_detach_10_size_limit) { test_detached_consumer<10>(4, 1000); } -template -ss::input_stream make_throwing_stream(Err err) { - struct throwing_stream final : ss::data_source_impl { - explicit throwing_stream(Err e) - : _err(std::move(e)) {} - - ss::future> skip(uint64_t) final { - return get(); - } - - ss::future> get() final { - return ss::make_exception_future>( - std::move(_err)); - } - - Err _err; - }; - auto ds = ss::data_source(std::make_unique(err)); - return ss::input_stream(std::move(ds)); -} - SEASTAR_THREAD_TEST_CASE(input_stream_fanout_producer_throw) { - auto is = make_throwing_stream(ss::abort_requested_exception()); + auto is = tests::make_throwing_stream(ss::abort_requested_exception()); auto [s1, s2] = input_stream_fanout<2>(std::move(is), 4, 8); BOOST_REQUIRE_THROW(s1.read().get(), ss::abort_requested_exception); diff --git a/tests/rptest/tests/full_disk_test.py b/tests/rptest/tests/full_disk_test.py index ecc113ed2a15c..37d3641e1cd95 100644 --- a/tests/rptest/tests/full_disk_test.py +++ b/tests/rptest/tests/full_disk_test.py @@ -24,7 +24,7 @@ from rptest.services.admin import Admin from rptest.services.cluster import cluster from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer -from rptest.services.redpanda import LoggingConfig, RedpandaService, SISettings +from rptest.services.redpanda import LoggingConfig, MetricsEndpoint, RedpandaService, SISettings from rptest.tests.end_to_end import EndToEndTest from rptest.tests.redpanda_test import RedpandaTest from rptest.util import produce_total_bytes, search_logs_with_timeout @@ -315,6 +315,17 @@ def observed_data_size(pred): timeout_sec=30, backoff_sec=2) + assert self.redpanda.metric_sum( 
+ metric_name= + "vectorized_storage_manager_housekeeping_log_processed_total", + metrics_endpoint=MetricsEndpoint.METRICS + ) == 0, "Housekeeping should not have run yet" + + assert self.redpanda.metric_sum( + metric_name="vectorized_storage_manager_urgent_gc_runs_total", + metrics_endpoint=MetricsEndpoint.METRICS + ) == 0, "GC should not have run yet" + # now trigger the disk space alert on the same node. unlike the 30 # second delay above, we should almost immediately observe the data # be reclaimed from disk. @@ -331,6 +342,16 @@ def observed_data_size(pred): timeout_sec=10, backoff_sec=2) + assert self.redpanda.metric_sum( + metric_name= + "vectorized_storage_manager_housekeeping_log_processed_total", + metrics_endpoint=MetricsEndpoint.METRICS + ) > 0, "Housekeeping should have run" + + assert self.redpanda.metric_sum( + metric_name="vectorized_storage_manager_urgent_gc_runs_total", + metrics_endpoint=MetricsEndpoint.METRICS) > 0, "GC should have run" + class LocalDiskReportTimeTest(RedpandaTest): topics = (TopicSpec(segment_bytes=2**20, diff --git a/tests/rptest/tests/metrics_reporter_test.py b/tests/rptest/tests/metrics_reporter_test.py index 9b2927cc486ae..fa146c55360e6 100644 --- a/tests/rptest/tests/metrics_reporter_test.py +++ b/tests/rptest/tests/metrics_reporter_test.py @@ -137,6 +137,10 @@ def assert_fields_are_the_same(metadata, field): # license violation status should not change across requests assert_fields_are_the_same(metadata, 'has_valid_license') assert_fields_are_the_same(metadata, 'has_enterprise_features') + assert_fields_are_the_same(metadata, 'enterprise_features') + assert_fields_are_the_same(metadata, 'hostname') + assert_fields_are_the_same(metadata, 'domainname') + assert_fields_are_the_same(metadata, 'fqdns') # get the last report last = metadata.pop() assert last['topic_count'] == total_topics @@ -151,6 +155,11 @@ def assert_fields_are_the_same(metadata, field): # NOTE: value will vary depending on FIPS mode. we're confident that # the source of the value is sound, so assert on presence instead. assert 'has_enterprise_features' in last + assert 'enterprise_features' in last + assert type(last['enterprise_features']) == list + assert 'hostname' in last + assert 'domainname' in last + assert 'fqdns' in last nodes_meta = last['nodes'] assert len(last['nodes']) == len(self.redpanda.nodes) @@ -162,6 +171,7 @@ def assert_fields_are_the_same(metadata, field): assert all('uptime_ms' in n for n in nodes_meta) assert all('is_alive' in n for n in nodes_meta) assert all('disks' in n for n in nodes_meta) + assert all('kafka_advertised_listeners' in n for n in nodes_meta) # Check cluster UUID and creation time survive a restart for n in self.redpanda.nodes:
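The hostname, domainname, and fqdns fields asserted at the end of metrics_reporter_test.py come from the new anonymous-namespace helpers in metrics_reporter.cc, which wrap gethostname(2) and getdomainname(2) and resolve FQDNs through Seastar's dns_resolver (get_host_by_name). As a rough standalone analogue, and only as an illustration, similar information can be gathered with plain POSIX calls; the sketch below uses getaddrinfo with AI_CANONNAME in place of the Seastar resolver, so it returns just the canonical name rather than every name the resolver reports.

    // Illustrative only: POSIX analogue of the reporter's new host identity
    // helpers. The actual patch uses Seastar's dns_resolver for the FQDN lookup.
    #include <netdb.h>
    #include <sys/socket.h>
    #include <unistd.h>

    #include <string>
    #include <vector>

    std::string host_name() {
        char buf[256] = {}; // comfortably larger than HOST_NAME_MAX on Linux
        return ::gethostname(buf, sizeof(buf) - 1) == 0 ? std::string(buf)
                                                        : std::string{};
    }

    std::vector<std::string> host_fqdns(const std::string& host) {
        std::vector<std::string> names;
        addrinfo hints{};
        hints.ai_flags = AI_CANONNAME; // request the canonical (fully qualified) name
        addrinfo* res = nullptr;
        if (!host.empty()
            && ::getaddrinfo(host.c_str(), nullptr, &hints, &res) == 0) {
            if (res != nullptr && res->ai_canonname != nullptr) {
                names.emplace_back(res->ai_canonname);
            }
            ::freeaddrinfo(res);
        }
        return names;
    }

Like the helpers in the patch, both functions fall back to empty values when the lookup fails, so a metrics report is still produced on hosts without resolvable names.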