Skip to content

Commit

Permalink
find metric
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijat committed Sep 18, 2023
1 parent f7df767 commit 3c51228
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 14 deletions.
159 changes: 159 additions & 0 deletions src/v/cloud_storage/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/metrics_api.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
Expand Down Expand Up @@ -66,6 +67,164 @@ std::ostream& operator<<(std::ostream& o, cache_element_status s) {

static constexpr std::string_view tmp_extension{".part"};

namespace {

using namespace std::chrono_literals;

[[maybe_unused]] bool has_label(
const ss::metrics::impl::labels_type& labels,
std::string_view k,
std::string_view v) {
return std::find_if(
labels.cbegin(),
labels.cend(),
[&](const auto& pair) {
return pair.first == k && pair.second == v;
})
!= labels.cend();
}

struct cache_control {
static constexpr ss::lowres_clock::duration limit = 10s;

public:
explicit cache_control()
: _updated(ss::lowres_clock::now())
, _duration(limit) {}

bool should_skip_cache() {
const auto now = ss::lowres_clock::now();
const auto delta = now - _updated;

if (delta > _duration) {
_updated = now;
if (_throttled) {
_duration = 3s;
vlog(
cst_log.info,
"un-throttling cache access for {}ms",
_duration);
_throttled = false;
} else {
auto curr_delta = current_consumption_delta();
_throttled = curr_delta <= 0.0;
_duration = limit;
vlog(
cst_log.info,
"calculated consumption delta: {}, throttling: {}",
curr_delta,
_throttled);
}
}

return _throttled;
}

private:
double queue_adjusted_consumption(std::string_view class_name) {
auto all_metrics = *ss::metrics::impl::get_values();
auto metric_values_v = all_metrics.values;
auto metadata_v = all_metrics.metadata;

auto values_it = metric_values_v.cbegin();

for (auto it = metadata_v->cbegin();
it != metadata_v->cend() && values_it != metric_values_v.cend();
++it, ++values_it) {
if (it->mf.name != "io_queue_adjusted_consumption") {
continue;
}

auto values = values_it->begin();
for (const auto& metric_info : it->metrics) {
const auto& labels = metric_info.id.labels();
if (
has_label(labels, "mountpoint", "/var/lib/redpanda/data")
&& has_label(labels, "class", class_name)) {
return values->d();
}
values++;
}
}

return 0;
}

double current_consumption_delta() {
const auto raft_cons = queue_adjusted_consumption("raft");
const auto si_cons = queue_adjusted_consumption("shadow-indexing");
const auto delta = raft_cons - si_cons;
vlog(
cst_log.info,
"raft cons: {} SI cons: {} delta: {}",
raft_cons,
si_cons,
delta);
return delta;
}

std::optional<uint64_t> current_p95_latency() {
auto all_metrics = *ss::metrics::impl::get_values();
auto metric_values_v = all_metrics.values;
auto metadata_v = all_metrics.metadata;

std::vector<std::pair<double, long>> counts{};
bool found_family = false;
auto values_it = metric_values_v.cbegin();
for (auto it = metadata_v->cbegin();
it != metadata_v->cend() && values_it != metric_values_v.cend()
&& !found_family;
++it, ++values_it) {
if (it->mf.name != "kafka_latency_produce_latency_us") {
continue;
}

found_family = true;
for (const auto& bucket :
values_it->begin()->get_histogram().buckets) {
counts.emplace_back(bucket.upper_bound, bucket.count);
}
}

if (counts.empty()) {
vlog(cst_log.error, "metrics no count!!");
return std::nullopt;
}

auto result = counts.front().first;
const auto p95_count = counts.back().second * 0.95;
for (const auto& [u, c] : counts) {
if (c < p95_count) {
result = u;
} else {
break;
}
}
return result;
}

ss::lowres_clock::time_point _updated;
uint64_t _current_p95_latency{0};
ss::lowres_clock::duration _duration;
bool _throttled{false};
};

static thread_local std::unique_ptr<cache_control> cache_controller;

void init_cc() { cache_controller = std::make_unique<cache_control>(); }

bool control_skip_cache() {
if (unlikely(!cache_controller)) {
init_cc();
}

return cache_controller->should_skip_cache();
}

} // namespace

bool cache::should_skip_cache() const { return control_skip_cache(); }

cache::cache(
std::filesystem::path cache_dir,
size_t disk_size,
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_storage/cache_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class space_reservation_guard {

class cache : public ss::peering_sharded_service<cache> {
public:
bool should_skip_cache() const;

/// C-tor.
///
/// \param cache_dir is a directory where cached data is stored
Expand Down
62 changes: 51 additions & 11 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "model/record.h"
#include "model/record_batch_types.h"
#include "raft/consensus.h"
#include "random/generators.h"
#include "resource_mgmt/io_priority.h"
#include "ssx/future-util.h"
#include "ssx/sformat.h"
Expand All @@ -39,6 +40,7 @@
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/metrics_api.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/temporary_buffer.hh>
Expand Down Expand Up @@ -305,25 +307,49 @@ remote_segment::offset_data_stream(
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp> first_timestamp,
ss::io_priority_class io_priority,
bool skip_cache) {
ss::io_priority_class io_priority) {
vlog(_ctxlog.debug, "remote segment file input stream at offset {}", start);
ss::gate::holder g(_gate);
co_await hydrate();

std::optional<offset_index::find_result> indexed_pos;
std::optional<uint16_t> prefetch_override = std::nullopt;
// skip_cache = random_generators::get_int(100)
// >= config::shard_local_cfg().skip_cache_chance();

auto skip_cache = _cache.should_skip_cache();

bool is_materialized = is_state_materialized();
if (skip_cache && !is_legacy_mode_engaged() && _coarse_index.has_value()) {
is_materialized = cache_element_status::available
== co_await _cache.is_cached(get_path_to_chunk(
get_chunk_start_for_kafka_offset(start)));
if (is_state_materialized() && !is_materialized) {
vlog(
_ctxlog.info,
"index present but chunk for kafka offset {} absent, cache will "
"be skipped",
start);
}
}

if (skip_cache) {
vlog(_ctxlog.info, "using direct stream to serve request");
if (skip_cache & !is_materialized) {
vlog(
_ctxlog.info,
"using direct stream to serve request {{{}-{}}}",
start,
end);
auto stream = std::make_unique<stream_with_leased_client>(
co_await build_stream());
co_return input_stream_with_offsets{
.stream = ss::input_stream<char>{ss::data_source{std::move(stream)}},
.rp_offset = _base_rp_offset,
.kafka_offset = _base_rp_offset - _base_offset_delta};
.kafka_offset = _base_rp_offset - _base_offset_delta,
.persistent = false};
}

co_await hydrate();

std::optional<offset_index::find_result> indexed_pos;
std::optional<uint16_t> prefetch_override = std::nullopt;

// Perform index lookup by timestamp or offset. This reduces the number
// of hydrated chunks required to serve the request.
if (first_timestamp) {
Expand Down Expand Up @@ -506,7 +532,14 @@ ss::future<> remote_segment::put_chunk_in_cache(
ss::input_stream<char> stream,
chunk_start_offset_t chunk_start) {
try {
co_await _cache.put(get_path_to_chunk(chunk_start), stream, reservation)
co_await _cache
.put(
get_path_to_chunk(chunk_start),
stream,
reservation,
priority_manager::local().shadow_indexing_priority(),
128_KiB,
1)
.finally([&stream] { return stream.close(); });
} catch (...) {
auto put_exception = std::current_exception();
Expand Down Expand Up @@ -1366,6 +1399,11 @@ remote_segment_batch_reader::read_some(
co_return ss::circular_buffer<model::record_batch>{};
}
_bytes_consumed = new_bytes_consumed.value();

// if (_config.over_budget && should_reset_parser) {
// co_await _parser->close();
// _parser.reset();
// }
}
_total_size = 0;
co_return std::move(_ringbuf);
Expand All @@ -1383,8 +1421,10 @@ remote_segment_batch_reader::init_parser() {
model::offset_cast(_config.start_offset),
model::offset_cast(_config.max_offset),
_config.first_timestamp,
priority_manager::local().shadow_indexing_priority(),
true);
priority_manager::local().shadow_indexing_priority());

// Reset parser if overbudget when stream is not disk-based
should_reset_parser = !stream_off.persistent;

auto parser = std::make_unique<storage::continuous_batch_parser>(
std::make_unique<remote_segment_batch_consumer>(
Expand Down
17 changes: 14 additions & 3 deletions src/v/cloud_storage/remote_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,16 @@ struct stream_with_leased_client : public ss::data_source_impl {
co_return co_await stream.read();
}

ss::future<> close() override { co_return co_await stream.close(); }
ss::future<> close() override {
auto is_eof = stream.eof();
co_await stream.close();
if (!is_eof) {
vlog(
cst_log.info,
"stopping leased client because it did not consume stream");
lease->client->shutdown();
}
}

lease_t lease;
ss::input_stream<char> stream;
Expand Down Expand Up @@ -123,15 +132,15 @@ class remote_segment final {
ss::input_stream<char> stream;
model::offset rp_offset;
kafka::offset kafka_offset;
bool persistent{true};
};
/// create an input stream _sharing_ the underlying file handle
/// starting at position @pos
ss::future<input_stream_with_offsets> offset_data_stream(
kafka::offset start,
kafka::offset end,
std::optional<model::timestamp>,
ss::io_priority_class,
bool skip_cache = false);
ss::io_priority_class);

/// Hydrate the segment for segment meta version v2 or lower. For v3 or
/// higher, only hydrate the index. If the index hydration fails, fall back
Expand Down Expand Up @@ -346,6 +355,8 @@ class remote_segment_batch_reader final {
friend class remote_segment_batch_consumer;

public:
bool should_reset_parser{false};

remote_segment_batch_reader(
ss::lw_shared_ptr<remote_segment>,
const storage::log_reader_config& config,
Expand Down

0 comments on commit 3c51228

Please sign in to comment.