Skip to content

Commit

Permalink
Merge pull request #17794 from abhijat/backport-pr-17785-v23.2.x-211
Browse files Browse the repository at this point in the history
[v23.2.x] CORE-1752: cst: improved logging (manual backport)
  • Loading branch information
piyushredpanda authored Apr 14, 2024
2 parents 1226817 + fa441d1 commit 7808db5
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/v/cloud_storage/segment_chunk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@

namespace cloud_storage {

// Writes a human-readable name for the chunk state `c` to `os` and returns
// `os` so that further insertions can be chained.
//
// The switch intentionally has no `default` label, so compilers that diagnose
// unhandled enumerators in a switch can still report a missing case here.
std::ostream& operator<<(std::ostream& os, chunk_state c) {
    switch (c) {
    case chunk_state::not_available:
        return os << "not available";
    case chunk_state::download_in_progress:
        return os << "download in progress";
    case chunk_state::hydrated:
        return os << "hydrated";
    }
    // Reached only when `c` holds a value that matches none of the cases
    // above. Without this return, control would flow off the end of a
    // non-void function, which is undefined behaviour.
    return os << "unknown";
}

std::strong_ordering
segment_chunk::operator<=>(const segment_chunk& chunk) const {
const auto cmp = required_by_readers_in_future
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_storage/segment_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ enum class chunk_state {
hydrated,
};

// Writes a textual description of the given chunk_state to `os` and returns
// `os`.
std::ostream& operator<<(std::ostream& os, chunk_state);

struct segment_chunk {
chunk_state current_state;

Expand Down
49 changes: 48 additions & 1 deletion src/v/cloud_storage/segment_chunk_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@

#include "cloud_storage/logger.h"
#include "cloud_storage/remote_segment.h"
#include "utils/gate_guard.h"
#include "ssx/watchdog.h"

#include <chrono>

using namespace std::chrono_literals;

namespace {
constexpr auto cache_backoff_duration = 5s;
Expand Down Expand Up @@ -124,8 +128,17 @@ ss::future<ss::file> segment_chunks::do_hydrate_and_materialize(

const auto prefetch = prefetch_override.value_or(
config::shard_local_cfg().cloud_storage_chunk_prefetch);

vlog(
_ctxlog.debug,
"hydrating chunk start: {}, prefetch: {}",
chunk_start,
prefetch);

co_await _segment.hydrate_chunk(
segment_chunk_range{_chunks, prefetch, chunk_start});

vlog(_ctxlog.debug, "materializing chunk start: {}", chunk_start, prefetch);
co_return co_await _segment.materialize_chunk(chunk_start);
}

Expand All @@ -141,6 +154,12 @@ ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(

auto& chunk = _chunks[chunk_start];
auto curr_state = chunk.current_state;

vlog(
_ctxlog.debug,
"hydrate_chunk for {}, current state: {}",
chunk_start,
curr_state);
if (curr_state == chunk_state::hydrated) {
vassert(
chunk.handle,
Expand All @@ -152,19 +171,42 @@ ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(
// If a download is already in progress, subsequent callers to hydrate are
// added to a wait list, and notified when the download finishes.
if (curr_state == chunk_state::download_in_progress) {
vlog(
_ctxlog.debug,
"adding waitor for {}, waiters before: {}",
chunk_start,
chunk.waiters.size());
co_return co_await add_waiter_to_chunk(chunk_start, chunk);
}

// Download is not in progress. Set the flag and begin download attempt.
try {
chunk.current_state = chunk_state::download_in_progress;

watchdog wd(
300s, [path = _segment.get_segment_path(), start = chunk_start] {
vlog(
cst_log.error,
"Stuck during do_hydrate_and_materialize for segment path: {}, "
"chunk start: {}",
path(),
start);
});

// Keep retrying if materialization fails.
bool done = false;
while (!done) {
vlog(
_ctxlog.debug,
"attempting do_hydrate_and_materialize for {}",
chunk_start);
auto handle = co_await do_hydrate_and_materialize(
chunk_start, prefetch_override);
if (handle) {
vlog(
_ctxlog.debug,
"do_hydrate_and_materialize for {} complete",
chunk_start);
done = true;
chunk.handle = ss::make_lw_shared(std::move(handle));
} else {
Expand All @@ -177,6 +219,11 @@ ss::future<segment_chunk::handle_t> segment_chunks::hydrate_chunk(
}
}
} catch (const std::exception& ex) {
vlog(
_ctxlog.error,
"Failed to hydrate chunk start {}, error: {}",
chunk_start,
ex.what());
chunk.current_state = chunk_state::not_available;
while (!chunk.waiters.empty()) {
chunk.waiters.front().set_to_current_exception();
Expand Down
15 changes: 15 additions & 0 deletions src/v/cloud_storage/segment_chunk_data_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ ss::future<ss::temporary_buffer<char>> chunk_data_source_impl::get() {
ss::future<>
chunk_data_source_impl::load_chunk_handle(chunk_start_offset_t chunk_start) {
try {
vlog(
_ctxlog.debug,
"Hydrating chunk {} with prefetch {}",
chunk_start,
_prefetch_override);
_current_data_file = co_await _chunks.hydrate_chunk(
chunk_start, _prefetch_override);
} catch (const ss::abort_requested_exception& ex) {
Expand All @@ -101,10 +106,20 @@ ss::future<> chunk_data_source_impl::load_stream_for_chunk(
co_await load_chunk_handle(chunk_start);
} catch (...) {
eptr = std::current_exception();
vlog(
_ctxlog.error,
"Hydrating chunk {} failed with error {}",
chunk_start,
eptr);
}

if (eptr) {
co_await maybe_close_stream();
vlog(
_ctxlog.debug,
"Closed stream after error {} while hydrating chunk start {}",
eptr,
chunk_start);
std::rethrow_exception(eptr);
}

Expand Down

0 comments on commit 7808db5

Please sign in to comment.