Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] CORE-1752: cst: improved logging (manual backport) #17794

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/v/cloud_storage/segment_chunk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@

namespace cloud_storage {

std::ostream& operator<<(std::ostream& os, chunk_state c) {
switch (c) {
case chunk_state::not_available:
return os << "not available";
case chunk_state::download_in_progress:
return os << "download in progress";
case chunk_state::hydrated:
return os << "hydrated";
}
}

std::strong_ordering
segment_chunk::operator<=>(const segment_chunk& chunk) const {
const auto cmp = required_by_readers_in_future
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_storage/segment_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ enum class chunk_state {
hydrated,
};

std::ostream& operator<<(std::ostream& os, chunk_state);

struct segment_chunk {
chunk_state current_state;

Expand Down
49 changes: 48 additions & 1 deletion src/v/cloud_storage/segment_chunk_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@

#include "cloud_storage/logger.h"
#include "cloud_storage/remote_segment.h"
#include "utils/gate_guard.h"
#include "ssx/watchdog.h"

#include <chrono>

using namespace std::chrono_literals;

namespace {
constexpr auto cache_backoff_duration = 5s;
Expand Down Expand Up @@ -124,8 +128,17 @@ ss::future<ss::file> segment_chunks::do_hydrate_and_materialize(

const auto prefetch = prefetch_override.value_or(
config::shard_local_cfg().cloud_storage_chunk_prefetch);

vlog(
_ctxlog.debug,
"hydrating chunk start: {}, prefetch: {}",
chunk_start,
prefetch);

co_await _segment.hydrate_chunk(
segment_chunk_range{_chunks, prefetch, chunk_start});

vlog(_ctxlog.debug, "materializing chunk start: {}", chunk_start, prefetch);
co_return co_await _segment.materialize_chunk(chunk_start);
}

Expand All @@ -141,6 +154,12 @@ ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(

auto& chunk = _chunks[chunk_start];
auto curr_state = chunk.current_state;

vlog(
_ctxlog.debug,
"hydrate_chunk for {}, current state: {}",
chunk_start,
curr_state);
if (curr_state == chunk_state::hydrated) {
vassert(
chunk.handle,
Expand All @@ -152,19 +171,42 @@ ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(
// If a download is already in progress, subsequent callers to hydrate are
// added to a wait list, and notified when the download finishes.
if (curr_state == chunk_state::download_in_progress) {
vlog(
_ctxlog.debug,
"adding waitor for {}, waiters before: {}",
chunk_start,
chunk.waiters.size());
co_return co_await add_waiter_to_chunk(chunk_start, chunk);
}

// Download is not in progress. Set the flag and begin download attempt.
try {
chunk.current_state = chunk_state::download_in_progress;

watchdog wd(
300s, [path = _segment.get_segment_path(), start = chunk_start] {
vlog(
cst_log.error,
"Stuck during do_hydrate_and_materialize for segment path: {}, "
"chunk start: {}",
path(),
start);
});

// Keep retrying if materialization fails.
bool done = false;
while (!done) {
vlog(
_ctxlog.debug,
"attempting do_hydrate_and_materialize for {}",
chunk_start);
auto handle = co_await do_hydrate_and_materialize(
chunk_start, prefetch_override);
if (handle) {
vlog(
_ctxlog.debug,
"do_hydrate_and_materialize for {} complete",
chunk_start);
done = true;
chunk.handle = ss::make_lw_shared(std::move(handle));
} else {
Expand All @@ -177,6 +219,11 @@ ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(
}
}
} catch (const std::exception& ex) {
vlog(
_ctxlog.error,
"Failed to hydrate chunk start {}, error: {}",
chunk_start,
ex.what());
chunk.current_state = chunk_state::not_available;
while (!chunk.waiters.empty()) {
chunk.waiters.front().set_to_current_exception();
Expand Down
15 changes: 15 additions & 0 deletions src/v/cloud_storage/segment_chunk_data_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ ss::future<ss::temporary_buffer<char>> chunk_data_source_impl::get() {
ss::future<>
chunk_data_source_impl::load_chunk_handle(chunk_start_offset_t chunk_start) {
try {
vlog(
_ctxlog.debug,
"Hydrating chunk {} with prefetch {}",
chunk_start,
_prefetch_override);
_current_data_file = co_await _chunks.hydrate_chunk(
chunk_start, _prefetch_override);
} catch (const ss::abort_requested_exception& ex) {
Expand All @@ -101,10 +106,20 @@ ss::future<> chunk_data_source_impl::load_stream_for_chunk(
co_await load_chunk_handle(chunk_start);
} catch (...) {
eptr = std::current_exception();
vlog(
_ctxlog.error,
"Hydrating chunk {} failed with error {}",
chunk_start,
eptr);
}

if (eptr) {
co_await maybe_close_stream();
vlog(
_ctxlog.debug,
"Closed stream after error {} while hydrating chunk start {}",
eptr,
chunk_start);
std::rethrow_exception(eptr);
}

Expand Down
Loading