diff --git a/src/v/storage/tests/log_segment_appender_test.cc b/src/v/storage/tests/log_segment_appender_test.cc index 38441b23e49e3..bc0337b759466 100644 --- a/src/v/storage/tests/log_segment_appender_test.cc +++ b/src/v/storage/tests/log_segment_appender_test.cc @@ -9,13 +9,16 @@ #include "bytes/iobuf.h" #include "bytes/iostream.h" +#include "config/configuration.h" #include "random/generators.h" #include "seastarx.h" #include "storage/chunk_cache.h" #include "storage/segment_appender.h" #include "storage/storage_resources.h" +#include #include +#include #include #include @@ -24,12 +27,17 @@ #include #include +#include #include #include #include +#include +#include +#include #include #include +#include using namespace storage; // NOLINT using namespace std::chrono; @@ -309,121 +317,204 @@ SEASTAR_THREAD_TEST_CASE( run_test_can_append_10MB_sequential_write_sequential_read(32_MiB); } +/** + * @brief Returns true iff the current test is currently passing. + * + * From https://stackoverflow.com/a/22102899 + * https://creativecommons.org/licenses/by-sa/3.0/ + */ +bool current_test_passing() { + using namespace boost::unit_test; + test_case::id_t id = framework::current_test_case().p_id; + test_results rez = results_collector.results(id); + return rez.passed(); +} + static void run_concurrent_append_flush( - size_t fallocate_size, - const size_t max_buf_size, - const size_t buf_count = 10000) { + size_t fallocate_size, const size_t max_buf_size, const size_t buf_count) { auto filename = fmt::format( "run_concurrent_append_flush_{}_{}.log", fallocate_size, max_buf_size); - auto f = open_file(filename); - storage::storage_resources resources( - config::mock_binding(std::move(fallocate_size))); - auto appender = make_segment_appender(f, resources); + auto seg_file = open_file(filename); + storage::storage_resources resources(config::mock_binding(+fallocate_size)); + auto appender = make_segment_appender(seg_file, resources); auto close = ss::defer([&appender] { appender.close().get(); }); - std::vector bufs(buf_count); - unsigned char v = 1; - for (auto& buf : bufs) { - buf = make_iobuf_with_char(random_generators::get_int(max_buf_size), v); - if (++v == 0) { - v = 1; + auto seed = random_generators::get_int(); + std::default_random_engine rng(seed); + + BOOST_TEST_CONTEXT( + "run_concurrent_append_flush, seed: " + << seed << ", fallocate_size: " << fallocate_size + << ", max_buf_size: " << max_buf_size << ", buf_count :" << buf_count) { + // the basic idea is we create a bunch of random buffers, then randomly + // perform actions on the segment appedner, like appending one of the + // random buffers, flushing the appender, yeilding, etc. + + std::vector bufs(buf_count); + unsigned char v = 1; + std::uniform_int_distribution bufdist(0, max_buf_size); + for (auto& buf : bufs) { + buf = make_iobuf_with_char(bufdist(rng), v); + if (++v == 0) { + v = 1; + } } - } - - // we do one of the following actions with equal probability, - // respecting the rule that any previous append must have resolved before a - // new one is invoked - enum action { APPEND, FLUSH, WAIT_APPEND, YIELD, LAST }; - std::optional> last_append; - std::vector> futs; - - size_t max_inflight = 0, max_dispatched = 0; - - for (size_t buf_index = 0; buf_index < bufs.size();) { - auto next_action = (action)random_generators::get_int(LAST - 1); - - max_inflight = std::max( - max_inflight, access(appender).inflight().size()); - max_dispatched = std::max( - max_dispatched, access(appender).inflight_dispatched()); - - switch (next_action) { - case APPEND: - if (!last_append) { // only if the previous append has finished + // At each iteration we chose an action to perform with equal + // probability, respecting the rules of the appender, e.g., that any + // previous append must have resolved before a new one is invoked. + struct action { + enum kind_enum { APPEND, FLUSH, WAIT_APPEND, SLEEP, LAST = SLEEP }; + + action(int kind) + : kind{(kind_enum)kind} {} + + kind_enum kind; + segment_appender_info info{}; + ss::sstring extra; + ss::future<> flush_future + = ss::make_ready_future<>(); // if kind == FLUSH + + ss::sstring to_string() const { + ss::sstring astr = [this]() { + switch (kind) { + case APPEND: + return "APPEND"; + case FLUSH: + return "FLUSH"; + case WAIT_APPEND: + return "WAIT_APPEND"; + case SLEEP: + return "SLEEP"; + } + vassert(false, "bad kind"); + }(); + + return fmt::format("{:12}: {}", astr + extra, info.to_string()); + }; + }; + + std::optional> last_append; + std::vector> futs; + + size_t max_inflight = 0, max_dispatched = 0; + + std::uniform_int_distribution dist(0, action::LAST); + + std::vector all_actions; + + for (size_t buf_index = 0; buf_index < bufs.size();) { + auto& current_action = all_actions.emplace_back(dist(rng)); + + max_inflight = std::max( + max_inflight, access(appender).inflight().size()); + max_dispatched = std::max( + max_dispatched, access(appender).inflight_dispatched()); + + switch (current_action.kind) { + case action::APPEND: + if (last_append) { + // skip, as we already have an unawaited append in progress + all_actions.pop_back(); // delete action + continue; + } last_append = appender.append(bufs[buf_index++]); - } - break; - case FLUSH: - futs.push_back(appender.flush()); - break; - case WAIT_APPEND: - if (last_append) { + break; + case action::FLUSH: + futs.push_back(appender.flush()); + break; + case action::WAIT_APPEND: + if (!last_append) { + // no append to wait for, skip + all_actions.pop_back(); + continue; + } last_append->get(); last_append.reset(); + break; + case action::SLEEP: { + // yield 99% of the time, sleep for 0-10 us the other 1% + auto sleep_us = std::uniform_int_distribution(0, 1000)(rng) + * 1us; + (sleep_us > 10us ? ss::yield() : ss::sleep(sleep_us)).get(); + current_action.extra += fmt::format(" ({} ms)", sleep_us); + break; + } + default: + BOOST_TEST_FAIL("bad action"); } - break; - case YIELD: - ss::yield().get(); - break; - default: - BOOST_TEST_FAIL("bad action"); + current_action.info = access(appender).info(); } - } - // check that we got some visible inflight and dispatched IOs - BOOST_CHECK_GT(max_inflight, 0); - BOOST_CHECK_GT(max_dispatched, 0); + // check that we got some visible inflight and dispatched IOs + BOOST_CHECK_GT(max_inflight, 0); + BOOST_CHECK_GT(max_dispatched, 0); - // now we need to wait for the last append, if any - if (last_append) { - last_append->get(); - } + // now we need to wait for the last append, if any + if (last_append) { + last_append->get(); + } - // do a final flush and wait for it - appender.flush().get(); + auto sa_state = fmt::format("{}", appender); - // now there should be nothing in-flight - BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0); - - // NOTE this is not 0 always, see issue/13035 - BOOST_TEST_INFO(fmt::format( - "appender inflight operations (should be empty): {}", - access(appender).inflight_str())); - BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0); - - // now we expect all the prior flush futures to be available - // we don't guarantee this is in the API currently but it is how it - // works currently and we might as well assert it - for (auto& f : futs) { - BOOST_REQUIRE(f.available()); - f.get(); // propagate any exception - } + // now there should be nothing in-flight + BOOST_CHECK_EQUAL(access(appender).inflight_dispatched(), 0); - // check that we got some writes and merges (we don't know how many) - BOOST_CHECK_GT(access(appender).total_dispatched(), 0); - BOOST_CHECK_GT(access(appender).total_merged(), 0); - - // verify the output - auto in = make_file_input_stream(f); - auto closefile = ss::defer([&] { in.close().get(); }); - for (auto& buf : bufs) { - size_t sz = buf.size_bytes(); - iobuf result = read_iobuf_exactly(in, sz).get(); - BOOST_REQUIRE_EQUAL(buf, result); + BOOST_TEST_INFO(fmt::format( + "appender inflight operations (should be empty): {}", + access(appender).inflight_str())); + BOOST_CHECK_EQUAL(access(appender).inflight().size(), 0); + + // check that we got some writes and merges (we don't know how many) + BOOST_CHECK_GT(access(appender).total_dispatched(), 0); + BOOST_CHECK_GT(access(appender).total_merged(), 0); + + // now we expect all the prior flush futures to be available + // we don't guarantee this is in the API currently but it is how it + // works currently and we might as well assert it + + // verify the output + auto in = make_file_input_stream(seg_file); + auto closefile = ss::defer([&] { in.close().get(); }); + for (auto& buf : bufs) { + size_t sz = buf.size_bytes(); + iobuf result = read_iobuf_exactly(in, sz).get(); + BOOST_CHECK_EQUAL(buf, result); + } + + if (!current_test_passing()) { + // test is about to fail, print details + // we jump through these hoops because I can't find a better way + // to defer generating the entire diagnosis string (which may be + // very large) until a test actually fails + std::string astr; + for (size_t aid = std::max(0, (int)all_actions.size() - 50); + aid < all_actions.size(); + aid++) { + auto& ar = all_actions.at(aid); + astr += fmt::format("action[{}]: {}\n", aid, ar.to_string()); + } + BOOST_TEST_INFO("actions: \n" << astr); + + BOOST_TEST_INFO("last_append: " << last_append.has_value()); + BOOST_TEST_INFO("fsize: " << futs.size()); + BOOST_TEST_INFO("segment_appender: " << sa_state); + BOOST_TEST_FAIL("failed see above"); + } } } SEASTAR_THREAD_TEST_CASE(test_concurrent_append_flush) { // we use smaller buffer counts for the large buffer size tests // to keep the runtime manageable (less than ~2 seconds for this test) - run_concurrent_append_flush(16_KiB, 1); - run_concurrent_append_flush(16_KiB, 1000); + + run_concurrent_append_flush(16_KiB, 1, 1000); + run_concurrent_append_flush(16_KiB, 1000, 100); run_concurrent_append_flush(16_KiB, 20000, 100); - run_concurrent_append_flush(64_KiB, 1000); + run_concurrent_append_flush(64_KiB, 1000, 100); - run_concurrent_append_flush(32_MiB, 1000, 1000); + run_concurrent_append_flush(32_MiB, 1000, 100); } static void run_test_can_append_little_data(size_t fallocate_size) {