From 963a5d3e30abd2a9714d84cd098afd442a486cae Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Thu, 11 Apr 2024 12:34:33 +0530 Subject: [PATCH 1/2] cst: Add ostream operator for chunk state --- src/v/cloud_storage/segment_chunk.cc | 11 +++++++++++ src/v/cloud_storage/segment_chunk.h | 2 ++ 2 files changed, 13 insertions(+) diff --git a/src/v/cloud_storage/segment_chunk.cc b/src/v/cloud_storage/segment_chunk.cc index fb0f951cdf8c..6594a9db19ed 100644 --- a/src/v/cloud_storage/segment_chunk.cc +++ b/src/v/cloud_storage/segment_chunk.cc @@ -12,6 +12,17 @@ namespace cloud_storage { +std::ostream& operator<<(std::ostream& os, chunk_state c) { + switch (c) { + case chunk_state::not_available: + return os << "not available"; + case chunk_state::download_in_progress: + return os << "download in progress"; + case chunk_state::hydrated: + return os << "hydrated"; + } +} + std::strong_ordering segment_chunk::operator<=>(const segment_chunk& chunk) const { const auto cmp = required_by_readers_in_future diff --git a/src/v/cloud_storage/segment_chunk.h b/src/v/cloud_storage/segment_chunk.h index 9bc36e2ae8f5..b4ad480b3bbb 100644 --- a/src/v/cloud_storage/segment_chunk.h +++ b/src/v/cloud_storage/segment_chunk.h @@ -30,6 +30,8 @@ enum class chunk_state { hydrated, }; +std::ostream& operator<<(std::ostream& os, chunk_state); + struct segment_chunk { chunk_state current_state; From 33eeca9b05452b7fde1d13898008c334c87c3c4d Mon Sep 17 00:00:00 2001 From: Abhijat Malviya Date: Thu, 11 Apr 2024 12:35:01 +0530 Subject: [PATCH 2/2] cst: Add debug logs during chunk hydration --- src/v/cloud_storage/segment_chunk_api.cc | 44 +++++++++++++++++++ .../segment_chunk_data_source.cc | 15 +++++++ 2 files changed, 59 insertions(+) diff --git a/src/v/cloud_storage/segment_chunk_api.cc b/src/v/cloud_storage/segment_chunk_api.cc index b95441cf48c8..e54256f6bf2c 100644 --- a/src/v/cloud_storage/segment_chunk_api.cc +++ b/src/v/cloud_storage/segment_chunk_api.cc @@ -12,6 +12,7 @@ #include "cloud_storage/logger.h" #include "cloud_storage/remote_segment.h" +#include "ssx/watchdog.h" #include @@ -127,8 +128,17 @@ ss::future segment_chunks::do_hydrate_and_materialize( const auto prefetch = prefetch_override.value_or( config::shard_local_cfg().cloud_storage_chunk_prefetch); + + vlog( + _ctxlog.debug, + "hydrating chunk start: {}, prefetch: {}", + chunk_start, + prefetch); + co_await _segment.hydrate_chunk( segment_chunk_range{_chunks, prefetch, chunk_start}); + + vlog(_ctxlog.debug, "materializing chunk start: {}", chunk_start, prefetch); co_return co_await _segment.materialize_chunk(chunk_start); } @@ -144,6 +154,12 @@ ss::future segment_chunks::hydrate_chunk( auto& chunk = _chunks[chunk_start]; auto curr_state = chunk.current_state; + + vlog( + _ctxlog.debug, + "hydrate_chunk for {}, current state: {}", + chunk_start, + curr_state); if (curr_state == chunk_state::hydrated) { vassert( chunk.handle, @@ -155,6 +171,11 @@ ss::future segment_chunks::hydrate_chunk( // If a download is already in progress, subsequent callers to hydrate are // added to a wait list, and notified when the download finishes. if (curr_state == chunk_state::download_in_progress) { + vlog( + _ctxlog.debug, + "adding waitor for {}, waiters before: {}", + chunk_start, + chunk.waiters.size()); co_return co_await add_waiter_to_chunk(chunk_start, chunk); } @@ -162,12 +183,30 @@ ss::future segment_chunks::hydrate_chunk( try { chunk.current_state = chunk_state::download_in_progress; + watchdog wd( + 300s, [path = _segment.get_segment_path(), start = chunk_start] { + vlog( + cst_log.error, + "Stuck during do_hydrate_and_materialize for segment path: {}, " + "chunk start: {}", + path(), + start); + }); + // Keep retrying if materialization fails. bool done = false; while (!done) { + vlog( + _ctxlog.debug, + "attempting do_hydrate_and_materialize for {}", + chunk_start); auto handle = co_await do_hydrate_and_materialize( chunk_start, prefetch_override); if (handle) { + vlog( + _ctxlog.debug, + "do_hydrate_and_materialize for {} complete", + chunk_start); done = true; chunk.handle = ss::make_lw_shared(std::move(handle)); } else { @@ -180,6 +219,11 @@ ss::future segment_chunks::hydrate_chunk( } } } catch (const std::exception& ex) { + vlog( + _ctxlog.error, + "Failed to hydrate chunk start {}, error: {}", + chunk_start, + ex.what()); chunk.current_state = chunk_state::not_available; while (!chunk.waiters.empty()) { chunk.waiters.front().set_to_current_exception(); diff --git a/src/v/cloud_storage/segment_chunk_data_source.cc b/src/v/cloud_storage/segment_chunk_data_source.cc index b59c8863936c..bf880250fe4d 100644 --- a/src/v/cloud_storage/segment_chunk_data_source.cc +++ b/src/v/cloud_storage/segment_chunk_data_source.cc @@ -74,6 +74,11 @@ ss::future> chunk_data_source_impl::get() { ss::future<> chunk_data_source_impl::load_chunk_handle(chunk_start_offset_t chunk_start) { try { + vlog( + _ctxlog.debug, + "Hydrating chunk {} with prefetch {}", + chunk_start, + _prefetch_override); _current_data_file = co_await _chunks.hydrate_chunk( chunk_start, _prefetch_override); } catch (const ss::abort_requested_exception& ex) { @@ -100,10 +105,20 @@ ss::future<> chunk_data_source_impl::load_stream_for_chunk( co_await load_chunk_handle(chunk_start); } catch (...) { eptr = std::current_exception(); + vlog( + _ctxlog.error, + "Hydrating chunk {} failed with error {}", + chunk_start, + eptr); } if (eptr) { co_await maybe_close_stream(); + vlog( + _ctxlog.debug, + "Closed stream after error {} while hydrating chunk start {}", + eptr, + chunk_start); std::rethrow_exception(eptr); }