From 7ffd3260bda93613c5bd758f704a182ffe5c1c88 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Mon, 30 Oct 2023 18:07:00 -0300 Subject: [PATCH] storage: record more information in LSA Relates to log_segment_appender_test::test_concurrent_append_flush, which is a fuzzer-style test, and output it when we fail. In storage_single_thread_rpunit concurrent flush test we now log test context which will be printed if the test fails. Critically this includes the seem used to generate the random series of actions to be performed on the appender. In addition we generate a single seed per invocation and then use that seed rather than the random helper methods which use an unspecified random seed each time. Finally we record more information about the operations performed in test and output the full action sequence on failure. Issue #13035. (cherry picked from commit b02c28c35a05121d8a82e495c89e81a4bb36c20e) --- .../tests/log_segment_appender_test.cc | 269 ++++++++++++------ 1 file changed, 180 insertions(+), 89 deletions(-) diff --git a/src/v/storage/tests/log_segment_appender_test.cc b/src/v/storage/tests/log_segment_appender_test.cc index 38441b23e49e3..bc0337b759466 100644 --- a/src/v/storage/tests/log_segment_appender_test.cc +++ b/src/v/storage/tests/log_segment_appender_test.cc @@ -9,13 +9,16 @@ #include "bytes/iobuf.h" #include "bytes/iostream.h" +#include "config/configuration.h" #include "random/generators.h" #include "seastarx.h" #include "storage/chunk_cache.h" #include "storage/segment_appender.h" #include "storage/storage_resources.h" +#include #include +#include #include #include @@ -24,12 +27,17 @@ #include #include +#include #include #include #include +#include +#include +#include #include #include +#include using namespace storage; // NOLINT using namespace std::chrono; @@ -309,121 +317,204 @@ SEASTAR_THREAD_TEST_CASE( run_test_can_append_10MB_sequential_write_sequential_read(32_MiB); } +/** + * @brief Returns true iff the current test is currently passing. + * + * From https://stackoverflow.com/a/22102899 + * https://creativecommons.org/licenses/by-sa/3.0/ + */ +bool current_test_passing() { + using namespace boost::unit_test; + test_case::id_t id = framework::current_test_case().p_id; + test_results rez = results_collector.results(id); + return rez.passed(); +} + static void run_concurrent_append_flush( - size_t fallocate_size, - const size_t max_buf_size, - const size_t buf_count = 10000) { + size_t fallocate_size, const size_t max_buf_size, const size_t buf_count) { auto filename = fmt::format( "run_concurrent_append_flush_{}_{}.log", fallocate_size, max_buf_size); - auto f = open_file(filename); - storage::storage_resources resources( - config::mock_binding(std::move(fallocate_size))); - auto appender = make_segment_appender(f, resources); + auto seg_file = open_file(filename); + storage::storage_resources resources(config::mock_binding(+fallocate_size)); + auto appender = make_segment_appender(seg_file, resources); auto close = ss::defer([&appender] { appender.close().get(); }); - std::vector bufs(buf_count); - unsigned char v = 1; - for (auto& buf : bufs) { - buf = make_iobuf_with_char(random_generators::get_int(max_buf_size), v); - if (++v == 0) { - v = 1; + auto seed = random_generators::get_int(); + std::default_random_engine rng(seed); + + BOOST_TEST_CONTEXT( + "run_concurrent_append_flush, seed: " + << seed << ", fallocate_size: " << fallocate_size + << ", max_buf_size: " << max_buf_size << ", buf_count :" << buf_count) { + // the basic idea is we create a bunch of random buffers, then randomly + // perform actions on the segment appedner, like appending one of the + // random buffers, flushing the appender, yeilding, etc. + + std::vector bufs(buf_count); + unsigned char v = 1; + std::uniform_int_distribution bufdist(0, max_buf_size); + for (auto& buf : bufs) { + buf = make_iobuf_with_char(bufdist(rng), v); + if (++v == 0) { + v = 1; + } } - } - - // we do one of the following actions with equal probability, - // respecting the rule that any previous append must have resolved before a - // new one is invoked - enum action { APPEND, FLUSH, WAIT_APPEND, YIELD, LAST }; - std::optional> last_append; - std::vector> futs; - - size_t max_inflight = 0, max_dispatched = 0; - - for (size_t buf_index = 0; buf_index < bufs.size();) { - auto next_action = (action)random_generators::get_int(LAST - 1); - - max_inflight = std::max( - max_inflight, access(appender).inflight().size()); - max_dispatched = std::max( - max_dispatched, access(appender).inflight_dispatched()); - - switch (next_action) { - case APPEND: - if (!last_append) { // only if the previous append has finished + // At each iteration we chose an action to perform with equal + // probability, respecting the rules of the appender, e.g., that any + // previous append must have resolved before a new one is invoked. + struct action { + enum kind_enum { APPEND, FLUSH, WAIT_APPEND, SLEEP, LAST = SLEEP }; + + action(int kind) + : kind{(kind_enum)kind} {} + + kind_enum kind; + segment_appender_info info{}; + ss::sstring extra; + ss::future<> flush_future + = ss::make_ready_future<>(); // if kind == FLUSH + + ss::sstring to_string() const { + ss::sstring astr = [this]() { + switch (kind) { + case APPEND: + return "APPEND"; + case FLUSH: + return "FLUSH"; + case WAIT_APPEND: + return "WAIT_APPEND"; + case SLEEP: + return "SLEEP"; + } + vassert(false, "bad kind"); + }(); + + return fmt::format("{:12}: {}", astr + extra, info.to_string()); + }; + }; + + std::optional> last_append; + std::vector> futs; + + size_t max_inflight = 0, max_dispatched = 0; + + std::uniform_int_distribution dist(0, action::LAST); + + std::vector all_actions; + + for (size_t buf_index = 0; buf_index < bufs.size();) { + auto& current_action = all_actions.emplace_back(dist(rng)); + + max_inflight = std::max( + max_inflight, access(appender).inflight().size()); + max_dispatched = std::max( + max_dispatched, access(appender).inflight_dispatched()); + + switch (current_action.kind) { + case action::APPEND: + if (last_append) { + // skip, as we already have an unawaited append in progress + all_actions.pop_back(); // delete action + continue; + } last_append = appender.append(bufs[buf_index++]); - } - break; - case FLUSH: - futs.push_back(appender.flush()); - break; - case WAIT_APPEND: - if (last_append) { + break; + case action::FLUSH: + futs.push_back(appender.flush()); + break; + case action::WAIT_APPEND: + if (!last_append) { + // no append to wait for, skip + all_actions.pop_back(); + continue; + } last_append->get(); last_append.reset(); + break; + case action::SLEEP: { + // yield 99% of the time, sleep for 0-10 us the other 1% + auto sleep_us = std::uniform_int_distribution(0, 1000)(rng) + * 1us; + (sleep_us > 10us ? ss::yield() : ss::sleep(sleep_us)).get(); + current_action.extra += fmt::format(" ({} ms)", sleep_us); + break; + } + default: + BOOST_TEST_FAIL("bad action"); } - break; - case YIELD: - ss::yield().get(); - break; - default: - BOOST_TEST_FAIL("bad action"); + current_action.info = access(appender).info(); } - } - // check that we got some visible inflight and dispatched IOs - BOOST_CHECK_GT(max_inflight, 0); - BOOST_CHECK_GT(max_dispatched, 0); + // check that we got some visible inflight and dispatched IOs + BOOST_CHECK_GT(max_inflight, 0); + BOOST_CHECK_GT(max_dispatched, 0); - // now we need to wait for the last append, if any - if (last_append) { - last_append->get(); - } + // now we need to wait for the last append, if any + if (last_append) { + last_append->get(); + } - // do a final flush and wait for it - appender.flush().get(); + auto sa_state = fmt::format("{}", appender); - // now there should be nothing in-flight - BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0); - - // NOTE this is not 0 always, see issue/13035 - BOOST_TEST_INFO(fmt::format( - "appender inflight operations (should be empty): {}", - access(appender).inflight_str())); - BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0); - - // now we expect all the prior flush futures to be available - // we don't guarantee this is in the API currently but it is how it - // works currently and we might as well assert it - for (auto& f : futs) { - BOOST_REQUIRE(f.available()); - f.get(); // propagate any exception - } + // now there should be nothing in-flight + BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0); - // check that we got some writes and merges (we don't know how many) - BOOST_CHECK_GT(access(appender).total_dispatched(), 0); - BOOST_CHECK_GT(access(appender).total_merged(), 0); - - // verify the output - auto in = make_file_input_stream(f); - auto closefile = ss::defer([&] { in.close().get(); }); - for (auto& buf : bufs) { - size_t sz = buf.size_bytes(); - iobuf result = read_iobuf_exactly(in, sz).get(); - BOOST_REQUIRE_EQUAL(buf, result); + BOOST_TEST_INFO(fmt::format( + "appender inflight operations (should be empty): {}", + access(appender).inflight_str())); + BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0); + + // check that we got some writes and merges (we don't know how many) + BOOST_CHECK_GT(access(appender).total_dispatched(), 0); + BOOST_CHECK_GT(access(appender).total_merged(), 0); + + // now we expect all the prior flush futures to be available + // we don't guarantee this is in the API currently but it is how it + // works currently and we might as well assert it + + // verify the output + auto in = make_file_input_stream(seg_file); + auto closefile = ss::defer([&] { in.close().get(); }); + for (auto& buf : bufs) { + size_t sz = buf.size_bytes(); + iobuf result = read_iobuf_exactly(in, sz).get(); + BOOST_CHECK_EQUAL(buf, result); + } + + if (!current_test_passing()) { + // test is about to fail, print details + // we jump through these hoops because I can't find a better way + // to defer generating the entire diagnosis string (which may be + // very large) until a test actually fails + std::string astr; + for (size_t aid = std::max(0, (int)all_actions.size() - 50); + aid < all_actions.size(); + aid++) { + auto& ar = all_actions.at(aid); + astr += fmt::format("action[{}]: {}\n", aid, ar.to_string()); + } + BOOST_TEST_INFO("actions: \n" << astr); + + BOOST_TEST_INFO("last_append: " << last_append.has_value()); + BOOST_TEST_INFO("fsize: " << futs.size()); + BOOST_TEST_INFO("segment_appender: " << sa_state); + BOOST_TEST_FAIL("failed see above"); + } } } SEASTAR_THREAD_TEST_CASE(test_concurrent_append_flush) { // we use smaller buffer counts for the large buffer size tests // to keep the runtime manageable (less than ~2 seconds for this test) - run_concurrent_append_flush(16_KiB, 1); - run_concurrent_append_flush(16_KiB, 1000); + + run_concurrent_append_flush(16_KiB, 1, 1000); + run_concurrent_append_flush(16_KiB, 1000, 100); run_concurrent_append_flush(16_KiB, 20000, 100); - run_concurrent_append_flush(64_KiB, 1000); + run_concurrent_append_flush(64_KiB, 1000, 100); - run_concurrent_append_flush(32_MiB, 1000, 1000); + run_concurrent_append_flush(32_MiB, 1000, 100); } static void run_test_can_append_little_data(size_t fallocate_size) {