Skip to content

Commit

Permalink
storage: record more information in LSA
Browse files Browse the repository at this point in the history
Relates to log_segment_appender_test::test_concurrent_append_flush,
which is a fuzzer-style test, and output it when we fail.

In storage_single_thread_rpunit concurrent flush test we now log
test context which will be printed if the test fails. Critically this
includes the seem used to generate the random series of actions to
be performed on the appender.

In addition we generate a single seed per invocation and then use that
seed rather than the random helper methods which use an unspecified
random seed each time.

Finally we record more information about the operations performed in
test and output the full action sequence on failure.

Issue redpanda-data#13035.

(cherry picked from commit b02c28c)
  • Loading branch information
travisdowns authored and vbotbuildovich committed Dec 18, 2023
1 parent 674b0ab commit 7ffd326
Showing 1 changed file with 180 additions and 89 deletions.
269 changes: 180 additions & 89 deletions src/v/storage/tests/log_segment_appender_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@

#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "config/configuration.h"
#include "random/generators.h"
#include "seastarx.h"
#include "storage/chunk_cache.h"
#include "storage/segment_appender.h"
#include "storage/storage_resources.h"

#include <seastar/core/future.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/testing/thread_test_case.hh>

Expand All @@ -24,12 +27,17 @@
#include <seastar/util/defer.hh>
#include <seastar/util/later.hh>

#include <boost/test/results_collector.hpp>
#include <boost/test/tools/interface.hpp>
#include <boost/test/tools/old/interface.hpp>
#include <fmt/format.h>

#include <algorithm>
#include <chrono>
#include <random>
#include <ranges>
#include <string_view>
#include <vector>

using namespace storage; // NOLINT
using namespace std::chrono;
Expand Down Expand Up @@ -309,121 +317,204 @@ SEASTAR_THREAD_TEST_CASE(
run_test_can_append_10MB_sequential_write_sequential_read(32_MiB);
}

/**
* @brief Returns true iff the current test is currently passing.
*
* From https://stackoverflow.com/a/22102899
* https://creativecommons.org/licenses/by-sa/3.0/
*/
bool current_test_passing() {
using namespace boost::unit_test;
test_case::id_t id = framework::current_test_case().p_id;
test_results rez = results_collector.results(id);
return rez.passed();
}

static void run_concurrent_append_flush(
size_t fallocate_size,
const size_t max_buf_size,
const size_t buf_count = 10000) {
size_t fallocate_size, const size_t max_buf_size, const size_t buf_count) {
auto filename = fmt::format(
"run_concurrent_append_flush_{}_{}.log", fallocate_size, max_buf_size);
auto f = open_file(filename);
storage::storage_resources resources(
config::mock_binding<size_t>(std::move(fallocate_size)));
auto appender = make_segment_appender(f, resources);
auto seg_file = open_file(filename);
storage::storage_resources resources(config::mock_binding(+fallocate_size));
auto appender = make_segment_appender(seg_file, resources);
auto close = ss::defer([&appender] { appender.close().get(); });

std::vector<iobuf> bufs(buf_count);
unsigned char v = 1;
for (auto& buf : bufs) {
buf = make_iobuf_with_char(random_generators::get_int(max_buf_size), v);
if (++v == 0) {
v = 1;
auto seed = random_generators::get_int<size_t>();
std::default_random_engine rng(seed);

BOOST_TEST_CONTEXT(
"run_concurrent_append_flush, seed: "
<< seed << ", fallocate_size: " << fallocate_size
<< ", max_buf_size: " << max_buf_size << ", buf_count :" << buf_count) {
// the basic idea is we create a bunch of random buffers, then randomly
// perform actions on the segment appedner, like appending one of the
// random buffers, flushing the appender, yeilding, etc.

std::vector<iobuf> bufs(buf_count);
unsigned char v = 1;
std::uniform_int_distribution<size_t> bufdist(0, max_buf_size);
for (auto& buf : bufs) {
buf = make_iobuf_with_char(bufdist(rng), v);
if (++v == 0) {
v = 1;
}
}
}

// we do one of the following actions with equal probability,
// respecting the rule that any previous append must have resolved before a
// new one is invoked
enum action { APPEND, FLUSH, WAIT_APPEND, YIELD, LAST };

std::optional<ss::future<>> last_append;
std::vector<ss::future<>> futs;

size_t max_inflight = 0, max_dispatched = 0;

for (size_t buf_index = 0; buf_index < bufs.size();) {
auto next_action = (action)random_generators::get_int(LAST - 1);

max_inflight = std::max(
max_inflight, access(appender).inflight().size());
max_dispatched = std::max(
max_dispatched, access(appender).inflight_dispatched());

switch (next_action) {
case APPEND:
if (!last_append) { // only if the previous append has finished
// At each iteration we chose an action to perform with equal
// probability, respecting the rules of the appender, e.g., that any
// previous append must have resolved before a new one is invoked.
struct action {
enum kind_enum { APPEND, FLUSH, WAIT_APPEND, SLEEP, LAST = SLEEP };

action(int kind)
: kind{(kind_enum)kind} {}

kind_enum kind;
segment_appender_info info{};
ss::sstring extra;
ss::future<> flush_future
= ss::make_ready_future<>(); // if kind == FLUSH

ss::sstring to_string() const {
ss::sstring astr = [this]() {
switch (kind) {
case APPEND:
return "APPEND";
case FLUSH:
return "FLUSH";
case WAIT_APPEND:
return "WAIT_APPEND";
case SLEEP:
return "SLEEP";
}
vassert(false, "bad kind");
}();

return fmt::format("{:12}: {}", astr + extra, info.to_string());
};
};

std::optional<ss::future<>> last_append;
std::vector<ss::future<>> futs;

size_t max_inflight = 0, max_dispatched = 0;

std::uniform_int_distribution<int> dist(0, action::LAST);

std::vector<action> all_actions;

for (size_t buf_index = 0; buf_index < bufs.size();) {
auto& current_action = all_actions.emplace_back(dist(rng));

max_inflight = std::max(
max_inflight, access(appender).inflight().size());
max_dispatched = std::max(
max_dispatched, access(appender).inflight_dispatched());

switch (current_action.kind) {
case action::APPEND:
if (last_append) {
// skip, as we already have an unawaited append in progress
all_actions.pop_back(); // delete action
continue;
}
last_append = appender.append(bufs[buf_index++]);
}
break;
case FLUSH:
futs.push_back(appender.flush());
break;
case WAIT_APPEND:
if (last_append) {
break;
case action::FLUSH:
futs.push_back(appender.flush());
break;
case action::WAIT_APPEND:
if (!last_append) {
// no append to wait for, skip
all_actions.pop_back();
continue;
}
last_append->get();
last_append.reset();
break;
case action::SLEEP: {
// yield 99% of the time, sleep for 0-10 us the other 1%
auto sleep_us = std::uniform_int_distribution<int>(0, 1000)(rng)
* 1us;
(sleep_us > 10us ? ss::yield() : ss::sleep(sleep_us)).get();
current_action.extra += fmt::format(" ({} ms)", sleep_us);
break;
}
default:
BOOST_TEST_FAIL("bad action");
}
break;
case YIELD:
ss::yield().get();
break;
default:
BOOST_TEST_FAIL("bad action");
current_action.info = access(appender).info();
}
}

// check that we got some visible inflight and dispatched IOs
BOOST_CHECK_GT(max_inflight, 0);
BOOST_CHECK_GT(max_dispatched, 0);
// check that we got some visible inflight and dispatched IOs
BOOST_CHECK_GT(max_inflight, 0);
BOOST_CHECK_GT(max_dispatched, 0);

// now we need to wait for the last append, if any
if (last_append) {
last_append->get();
}
// now we need to wait for the last append, if any
if (last_append) {
last_append->get();
}

// do a final flush and wait for it
appender.flush().get();
auto sa_state = fmt::format("{}", appender);

// now there should be nothing in-flight
BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0);

// NOTE this is not 0 always, see issue/13035
BOOST_TEST_INFO(fmt::format(
"appender inflight operations (should be empty): {}",
access(appender).inflight_str()));
BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0);

// now we expect all the prior flush futures to be available
// we don't guarantee this is in the API currently but it is how it
// works currently and we might as well assert it
for (auto& f : futs) {
BOOST_REQUIRE(f.available());
f.get(); // propagate any exception
}
// now there should be nothing in-flight
BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0);

// check that we got some writes and merges (we don't know how many)
BOOST_CHECK_GT(access(appender).total_dispatched(), 0);
BOOST_CHECK_GT(access(appender).total_merged(), 0);

// verify the output
auto in = make_file_input_stream(f);
auto closefile = ss::defer([&] { in.close().get(); });
for (auto& buf : bufs) {
size_t sz = buf.size_bytes();
iobuf result = read_iobuf_exactly(in, sz).get();
BOOST_REQUIRE_EQUAL(buf, result);
BOOST_TEST_INFO(fmt::format(
"appender inflight operations (should be empty): {}",
access(appender).inflight_str()));
BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0);

// check that we got some writes and merges (we don't know how many)
BOOST_CHECK_GT(access(appender).total_dispatched(), 0);
BOOST_CHECK_GT(access(appender).total_merged(), 0);

// now we expect all the prior flush futures to be available
// we don't guarantee this is in the API currently but it is how it
// works currently and we might as well assert it

// verify the output
auto in = make_file_input_stream(seg_file);
auto closefile = ss::defer([&] { in.close().get(); });
for (auto& buf : bufs) {
size_t sz = buf.size_bytes();
iobuf result = read_iobuf_exactly(in, sz).get();
BOOST_CHECK_EQUAL(buf, result);
}

if (!current_test_passing()) {
// test is about to fail, print details
// we jump through these hoops because I can't find a better way
// to defer generating the entire diagnosis string (which may be
// very large) until a test actually fails
std::string astr;
for (size_t aid = std::max(0, (int)all_actions.size() - 50);
aid < all_actions.size();
aid++) {
auto& ar = all_actions.at(aid);
astr += fmt::format("action[{}]: {}\n", aid, ar.to_string());
}
BOOST_TEST_INFO("actions: \n" << astr);

BOOST_TEST_INFO("last_append: " << last_append.has_value());
BOOST_TEST_INFO("fsize: " << futs.size());
BOOST_TEST_INFO("segment_appender: " << sa_state);
BOOST_TEST_FAIL("failed see above");
}
}
}

SEASTAR_THREAD_TEST_CASE(test_concurrent_append_flush) {
// we use smaller buffer counts for the large buffer size tests
// to keep the runtime manageable (less than ~2 seconds for this test)
run_concurrent_append_flush(16_KiB, 1);
run_concurrent_append_flush(16_KiB, 1000);

run_concurrent_append_flush(16_KiB, 1, 1000);
run_concurrent_append_flush(16_KiB, 1000, 100);
run_concurrent_append_flush(16_KiB, 20000, 100);

run_concurrent_append_flush(64_KiB, 1000);
run_concurrent_append_flush(64_KiB, 1000, 100);

run_concurrent_append_flush(32_MiB, 1000, 1000);
run_concurrent_append_flush(32_MiB, 1000, 100);
}

static void run_test_can_append_little_data(size_t fallocate_size) {
Expand Down

0 comments on commit 7ffd326

Please sign in to comment.