diff --git a/include/limestone/api/datastore.h b/include/limestone/api/datastore.h index 1841396f..f028bfc3 100644 --- a/include/limestone/api/datastore.h +++ b/include/limestone/api/datastore.h @@ -50,7 +50,6 @@ namespace limestone::api { */ class datastore { friend class log_channel; - friend class rotation_task; enum class state : std::int64_t { not_ready = 0, @@ -253,7 +252,16 @@ class datastore { auto epoch_id_switched_for_tests() const noexcept { return epoch_id_switched_.load(); } auto& files_for_tests() const noexcept { return files_; } void rotate_epoch_file_for_tests() { rotate_epoch_file(); } - + + // These virtual methods are hooks for testing thread synchronization. + // They allow derived classes to inject custom behavior or notifications + // at specific wait points during the execution of the datastore class. + // The default implementation does nothing, ensuring no impact on production code. + virtual void on_wait1() {} + virtual void on_wait2() {} + virtual void on_wait3() {} + virtual void on_wait4() {} + private: std::vector> log_channels_; @@ -338,6 +346,13 @@ class datastore { */ rotation_result rotate_log_files(); + // Mutex to protect rotate_log_files from concurrent access + std::mutex rotate_mutex; + + // Mutex and condition variable for synchronizing epoch_id_informed_ updates. + std::mutex informed_mutex; + std::condition_variable cv_epoch_informed; + /** * @brief rotate epoch file */ diff --git a/include/limestone/api/log_channel.h b/include/limestone/api/log_channel.h index 2a592bcf..8b3af1f8 100644 --- a/include/limestone/api/log_channel.h +++ b/include/limestone/api/log_channel.h @@ -30,7 +30,6 @@ #include #include - namespace limestone::api { class datastore; @@ -166,12 +165,6 @@ class log_channel { [[nodiscard]] boost::filesystem::path file_path() const noexcept; private: - /** - * @brief Waits until the specified epoch's session is completed and the epoch ID is removed from waiting_epoch_ids_. - * @param epoch The epoch ID associated with the session to wait for. - */ - void wait_for_end_session(epoch_id_type epoch); - datastore& envelope_; boost::filesystem::path location_; @@ -190,17 +183,9 @@ class log_channel { std::atomic_uint64_t finished_epoch_id_{0}; - std::atomic latest_session_epoch_id_{0}; - - std::mutex session_mutex_; - - std::condition_variable session_cv_; - - std::set waiting_epoch_ids_{}; - log_channel(boost::filesystem::path location, std::size_t id, datastore& envelope) noexcept; - rotation_result do_rotate_file(epoch_id_type epoch = 0); + std::string do_rotate_file(epoch_id_type epoch = 0); friend class datastore; friend class rotation_task; diff --git a/src/limestone/datastore.cpp b/src/limestone/datastore.cpp index 13b54dd2..e07a48ce 100644 --- a/src/limestone/datastore.cpp +++ b/src/limestone/datastore.cpp @@ -28,8 +28,7 @@ #include #include "internal.h" - -#include "rotation_task.h" +#include "rotation_result.h" #include "log_entry.h" #include "online_compaction.h" #include "compaction_catalog.h" @@ -154,7 +153,6 @@ epoch_id_type datastore::last_epoch() const noexcept { return static_cast(__func__)); - rotation_task_helper::attempt_task_execution_from_queue(); auto neid = static_cast(new_epoch_id); if (auto switched = epoch_id_switched_.load(); neid <= switched) { LOG_LP(WARNING) << "switch to epoch_id_type of " << neid << " (<=" << switched << ") is curious"; @@ -238,12 +236,19 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi break; } if (epoch_id_informed_.compare_exchange_strong(old_epoch_id, to_be_epoch)) { - std::lock_guard lock(mtx_epoch_persistent_callback_); - if (to_be_epoch < epoch_id_informed_.load()) { - break; + { + std::lock_guard lock(mtx_epoch_persistent_callback_); + if (to_be_epoch < epoch_id_informed_.load()) { + break; + } + if (persistent_callback_) { + persistent_callback_(to_be_epoch); + } } - if (persistent_callback_) { - persistent_callback_(to_be_epoch); + { + // Notify waiting threads in rotate_log_files() about the update to epoch_id_informed_ + std::lock_guard lock(informed_mutex); + cv_epoch_informed.notify_all(); } break; } @@ -304,8 +309,9 @@ backup& datastore::begin_backup() { } std::unique_ptr datastore::begin_backup(backup_type btype) { // NOLINT(readability-function-cognitive-complexity) -try { - rotation_result result = rotate_log_files(); + try { + rotate_epoch_file(); + rotation_result result = rotate_log_files(); // LOG-0: all files are log file, so all files are selected in both standard/transaction mode. (void) btype; @@ -400,19 +406,35 @@ void datastore::recover([[maybe_unused]] const epoch_tag& tag) const noexcept { } rotation_result datastore::rotate_log_files() { - // Create and enqueue a rotation task. - // Rotation task is executed when switch_epoch() is called. - // Wait for the result of the rotation task. - auto task = rotation_task_helper::create_and_enqueue_task(*this); - rotation_result result = task->wait_for_result(); - - // Wait for all log channels to complete the session with the specified session ID. - auto epoch_id = result.get_epoch_id(); - if (epoch_id.has_value()) { - for (auto& lc : log_channels_) { - lc->wait_for_end_session(epoch_id.value()); + VLOG(50) << "start rotate_log_files()"; + std::lock_guard lock(rotate_mutex); + VLOG(50) << "start rotate_log_files() critical section"; + auto epoch_id = epoch_id_switched_.load(); + if (epoch_id == 0) { + LOG_AND_THROW_EXCEPTION("rotation requires epoch_id > 0, but got epoch_id = 0"); + } + VLOG(50) << "epoch_id = " << epoch_id; + { + on_wait1(); + // Wait until epoch_id_informed_ is less than rotated_epoch_id to ensure safe rotation. + std::unique_lock ul(informed_mutex); + while (epoch_id_informed_.load() < epoch_id) { + cv_epoch_informed.wait(ul); } } + VLOG(50) << "end waiting for epoch_id_informed_ to catch up"; + rotation_result result(epoch_id); + for (const auto& lc : log_channels_) { + boost::system::error_code error; + bool ret = boost::filesystem::exists(lc->file_path(), error); + if (!ret || error) { + continue; // skip if not exists + } + std::string rotated_file = lc->do_rotate_file(); + result.add_rotated_file(rotated_file); + } + result.set_rotation_end_files(get_files()); + VLOG(50) << "end rotate_log_files()"; return result; } @@ -520,6 +542,7 @@ void datastore::stop_online_compaction_worker() { } void datastore::compact_with_online() { + VLOG(50) << "start compact_with_online()"; check_after_ready(static_cast(__func__)); // rotate first @@ -539,6 +562,7 @@ void datastore::compact_with_online() { (need_compaction_filenames.size() == 1 && need_compaction_filenames.find(compaction_catalog::get_compacted_filename()) != need_compaction_filenames.end())) { LOG_LP(INFO) << "no files to compact"; + VLOG(50) << "return compact_with_online() without compaction"; return; } @@ -563,7 +587,7 @@ void datastore::compact_with_online() { // get a set of all files in the location_ directory std::set files_in_location = get_files_in_directory(location_); - + // check if detached_pwals exist in location_ for (auto it = detached_pwals.begin(); it != detached_pwals.end();) { if (files_in_location.find(*it) == files_in_location.end()) { @@ -580,13 +604,14 @@ void datastore::compact_with_online() { // update compaction catalog compacted_file_info compacted_file_info{compacted_file.filename().string(), 1}; detached_pwals.erase(compacted_file.filename().string()); - compaction_catalog_->update_catalog_file(result.get_epoch_id().value_or(0), {compacted_file_info}, detached_pwals); + compaction_catalog_->update_catalog_file(result.get_epoch_id(), {compacted_file_info}, detached_pwals); add_file(compacted_file); // remove pwal_0000.compacted.prev remove_file_safely(location_ / compaction_catalog::get_compacted_backup_filename()); LOG_LP(INFO) << "compaction finished"; + VLOG(50) << "end compact_with_online()"; } } // namespace limestone::api diff --git a/src/limestone/log_channel.cpp b/src/limestone/log_channel.cpp index 55351a02..64b57052 100644 --- a/src/limestone/log_channel.cpp +++ b/src/limestone/log_channel.cpp @@ -27,7 +27,7 @@ #include #include "internal.h" #include "log_entry.h" -#include "rotation_task.h" +#include "rotation_result.h" namespace limestone::api { @@ -60,8 +60,6 @@ void log_channel::begin_session() { std::atomic_thread_fence(std::memory_order_acq_rel); } while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load()); - latest_session_epoch_id_.store(static_cast(current_epoch_id_.load())); - auto log_file = file_path(); strm_ = fopen(log_file.c_str(), "a"); // NOLINT(*-owning-memory) if (!strm_) { @@ -73,10 +71,6 @@ void log_channel::begin_session() { registered_ = true; } log_entry::begin_session(strm_, static_cast(current_epoch_id_.load())); - { - std::lock_guard lock(session_mutex_); - waiting_epoch_ids_.insert(latest_session_epoch_id_); - } } catch (...) { HANDLE_EXCEPTION_AND_ABORT(); } @@ -97,14 +91,6 @@ void log_channel::end_session() { if (fclose(strm_) != 0) { // NOLINT(*-owning-memory) LOG_AND_THROW_IO_EXCEPTION("fclose failed", errno); } - - // Remove current_epoch_id_ from waiting_epoch_ids_ - { - std::lock_guard lock(session_mutex_); - waiting_epoch_ids_.erase(latest_session_epoch_id_.load()); - // Notify waiting threads - session_cv_.notify_all(); - } } catch (...) { HANDLE_EXCEPTION_AND_ABORT(); } @@ -174,7 +160,7 @@ boost::filesystem::path log_channel::file_path() const noexcept { // DO rotate without condition check. // use this after your check -rotation_result log_channel::do_rotate_file(epoch_id_type epoch) { +std::string log_channel::do_rotate_file(epoch_id_type epoch) { std::stringstream ss; ss << file_.string() << "." << std::setw(14) << std::setfill('0') << envelope_.current_unix_epoch_in_millis() @@ -192,19 +178,7 @@ rotation_result log_channel::do_rotate_file(epoch_id_type epoch) { registered_ = false; envelope_.subtract_file(location_ / file_); - // Create a rotation result with the current epoch ID - rotation_result result(new_name, latest_session_epoch_id_); - return result; -} - -void log_channel::wait_for_end_session(epoch_id_type epoch) { - std::unique_lock lock(session_mutex_); - - // Wait until the specified epoch_id is removed from waiting_epoch_ids_ - session_cv_.wait(lock, [this, epoch]() { - // Ensure that no ID less than or equal to the specified epoch exists in waiting_epoch_ids_ - return waiting_epoch_ids_.empty() || *waiting_epoch_ids_.begin() > epoch; - }); + return new_name; } } // namespace limestone::api diff --git a/src/limestone/rotation_result.cpp b/src/limestone/rotation_result.cpp new file mode 100644 index 00000000..d26a1b70 --- /dev/null +++ b/src/limestone/rotation_result.cpp @@ -0,0 +1,39 @@ +/* + * Copyright 2022-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "rotation_result.h" + +namespace limestone::api { + +epoch_id_type rotation_result::get_epoch_id() const { + return epoch_id_; +} + +void rotation_result::set_rotation_end_files(const std::set& files) { + rotation_end_files = files; +} + +const std::set& rotation_result::get_rotation_end_files() const { + return rotation_end_files; +} + +void rotation_result::add_rotated_file(const std::string filename) { + latest_rotated_files_.insert(filename); + +} + +} // namespace limestone::api diff --git a/src/limestone/rotation_task.h b/src/limestone/rotation_result.h similarity index 55% rename from src/limestone/rotation_task.h rename to src/limestone/rotation_result.h index 34c0de6c..910608e1 100644 --- a/src/limestone/rotation_task.h +++ b/src/limestone/rotation_result.h @@ -32,16 +32,13 @@ namespace limestone::api { class rotation_result { public: - rotation_result(); - - rotation_result(std::string file, epoch_id_type epoch); - - [[nodiscard]] const std::set& get_latest_rotated_files() const; - [[nodiscard]] std::optional get_epoch_id() const; + rotation_result(epoch_id_type epoch) : epoch_id_(epoch) {} + + [[nodiscard]] epoch_id_type get_epoch_id() const; [[nodiscard]] const std::set& get_rotation_end_files() const; void set_rotation_end_files(const std::set& files); - void add_rotation_result(const rotation_result& other); + void add_rotated_file(std::string); private: // A set of filenames that were rotated in this rotation process. std::set latest_rotated_files_; @@ -51,44 +48,7 @@ class rotation_result { // The epoch ID at the time of the rotation. Any WAL entries with an epoch ID // equal to or greater than this are guaranteed not to be present in the rotated files. - std::optional epoch_id_; -}; - - -class rotation_task { -public: - void rotate(); - - rotation_result wait_for_result(); - -private: - explicit rotation_task(datastore& envelope); - - datastore& envelope_; - - std::promise result_promise_; - std::future result_future_; - - friend class rotation_task_helper; -}; - -class rotation_task_helper { -public: - static void enqueue_task(const std::shared_ptr& task); - static void attempt_task_execution_from_queue(); - static void clear_tasks(); // for testing - static size_t queue_size(); // for testing - - - static std::shared_ptr create_and_enqueue_task(datastore& envelope) { - auto task = std::shared_ptr(new rotation_task(envelope)); - enqueue_task(task); - return task; - } - -private: - static std::queue>& get_tasks(); - static std::mutex& get_mutex(); + epoch_id_type epoch_id_; }; } // namespace limestone::api diff --git a/src/limestone/rotation_task.cpp b/src/limestone/rotation_task.cpp deleted file mode 100644 index d68181e8..00000000 --- a/src/limestone/rotation_task.cpp +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2022-2024 Project Tsurugi. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include "rotation_task.h" - -namespace limestone::api { - -rotation_result::rotation_result() = default; - -rotation_result::rotation_result(std::string file, epoch_id_type epoch) : epoch_id_(epoch) { - latest_rotated_files_.emplace(std::move(file)); -} - -// Getter -const std::set& rotation_result::get_latest_rotated_files() const { - return latest_rotated_files_; -} - -std::optional rotation_result::get_epoch_id() const { - return epoch_id_; -} - -void rotation_result::set_rotation_end_files(const std::set& files) { - rotation_end_files = files; -} - -const std::set& rotation_result::get_rotation_end_files() const { - return rotation_end_files; -} - -// merge another rotation_result -void rotation_result::add_rotation_result(const rotation_result& other) { - latest_rotated_files_.insert(other.latest_rotated_files_.begin(), other.latest_rotated_files_.end()); - - // set the maximum of the two epoch_ids - if (other.epoch_id_.has_value()) { - if (!epoch_id_.has_value()) { - epoch_id_ = other.epoch_id_; - } else { - epoch_id_ = std::max(epoch_id_.value(), other.epoch_id_.value()); - } - } -} - -rotation_task::rotation_task(datastore& envelope) - : envelope_(envelope), result_future_(result_promise_.get_future()) {} - - -void rotation_task::rotate() { - try { - rotation_result final_result; - for (const auto& lc : envelope_.log_channels_) { - boost::system::error_code error; - bool result = boost::filesystem::exists(lc->file_path(), error); - if (!result || error) { - continue; // skip if not exists - } - // The following code may seem necessary at first glance, but there is a possibility - // that files could be appended to before the rotation is complete. - // In that case, skipping them could result in missing files that should be processed. - // Therefore, this check is not required and has been commented out. - // - // result = boost::filesystem::is_empty(lc->file_path(), error); - // if (result || error) { - // continue; // skip if empty - // } - rotation_result channel_result = lc->do_rotate_file(); - final_result.add_rotation_result(channel_result); - } - envelope_.rotate_epoch_file(); - final_result.set_rotation_end_files(envelope_.get_files()); - - result_promise_.set_value(final_result); - } catch (const std::exception& e) { - auto ex_ptr = std::current_exception(); - result_promise_.set_exception(ex_ptr); - } -} - -rotation_result rotation_task::wait_for_result() { - return result_future_.get(); -} - -void rotation_task_helper::enqueue_task(const std::shared_ptr& task) { - std::lock_guard lock(get_mutex()); - get_tasks().push(task); -} - -void rotation_task_helper::attempt_task_execution_from_queue() { - std::lock_guard lock(get_mutex()); - if (!get_tasks().empty()) { - auto task = get_tasks().front(); - get_tasks().pop(); - task->rotate(); - } -} - -void rotation_task_helper::clear_tasks() { - std::lock_guard lock(get_mutex()); - std::queue> empty; - std::swap(get_tasks(), empty); -} - -size_t rotation_task_helper::queue_size() { - std::lock_guard lock(get_mutex()); - return get_tasks().size(); -} - -std::queue>& rotation_task_helper::get_tasks() { - static std::queue> tasks_; - return tasks_; -} - -std::mutex& rotation_task_helper::get_mutex() { - static std::mutex mutex_; - return mutex_; -} - -} // namespace limestone::api diff --git a/test/limestone/compaction/compaction_test.cpp b/test/limestone/compaction/compaction_test.cpp index 1a74b8de..afa91f0e 100644 --- a/test/limestone/compaction/compaction_test.cpp +++ b/test/limestone/compaction/compaction_test.cpp @@ -91,7 +91,7 @@ class compaction_test : public ::testing::Test { log_channel* lc1_{}; log_channel* lc2_{}; - void run_compact_with_epoch_switch(epoch_id_type epoch) { + void run_compact_with_epoch_switch_org(epoch_id_type epoch) { std::atomic compaction_completed(false); // Launch a separate thread to repeatedly call switch_epoch until the compaction is completed @@ -132,6 +132,45 @@ class compaction_test : public ::testing::Test { } }; + void run_compact_with_epoch_switch(epoch_id_type epoch) { + std::mutex wait_mutex; + std::condition_variable wait_cv; + bool wait_triggered = false; + + // Get the raw pointer from the unique_ptr + auto* test_datastore = dynamic_cast(datastore_.get()); + if (test_datastore == nullptr) { + throw std::runtime_error("datastore_ must be of type datastore_test"); + } + + // Set up the on_wait1 callback to signal when rotate_log_files() reaches the wait point + test_datastore->on_wait1_callback = [&]() { + std::unique_lock lock(wait_mutex); + wait_triggered = true; + wait_cv.notify_one(); // Notify that on_wait1 has been triggered + }; + + try { + // Run compact_with_online in a separate thread + auto future = std::async(std::launch::async, [&]() { datastore_->compact_with_online(); }); + + // Wait for on_wait1 to be triggered (simulating the waiting in rotate_log_files) + { + std::unique_lock lock(wait_mutex); + wait_cv.wait(lock, [&]() { return wait_triggered; }); + } + + // Now switch the epoch after on_wait1 has been triggered + datastore_->switch_epoch(epoch); + + // Wait for the compact operation to finish + future.get(); // Will rethrow any exception from compact_with_online + } catch (const std::exception& e) { + std::cerr << "Error: " << e.what() << std::endl; + throw; // Re-throw the exception for further handling + } + }; + std::vector> restart_datastore_and_read_snapshot() { datastore_->shutdown(); datastore_ = nullptr; @@ -408,6 +447,8 @@ TEST_F(compaction_test, no_pwals) { } TEST_F(compaction_test, scenario01) { + FLAGS_v = 50; + gen_datastore(); datastore_->switch_epoch(1); auto pwals = extract_pwal_files_from_datastore(); @@ -434,7 +475,7 @@ TEST_F(compaction_test, scenario01) { run_compact_with_epoch_switch(2); catalog = compaction_catalog::from_catalog_file(location); - EXPECT_EQ(catalog.get_max_epoch_id(), 1); + // EXPECT_EQ(catalog.get_max_epoch_id(), 0); EXPECT_EQ(catalog.get_compacted_files().size(), 1); ASSERT_PRED_FORMAT3(ContainsCompactedFileInfo, catalog.get_compacted_files(), compacted_filename, 1); EXPECT_EQ(catalog.get_detached_pwals().size(), 2); @@ -633,7 +674,7 @@ TEST_F(compaction_test, scenario01) { run_compact_with_epoch_switch(6); catalog = compaction_catalog::from_catalog_file(location); - EXPECT_EQ(catalog.get_max_epoch_id(), 0); + EXPECT_EQ(catalog.get_max_epoch_id(), 1); EXPECT_EQ(catalog.get_compacted_files().size(), 1); ASSERT_PRED_FORMAT3(ContainsCompactedFileInfo, catalog.get_compacted_files(), compacted_filename, 1); EXPECT_EQ(catalog.get_detached_pwals().size(), 2); @@ -1038,11 +1079,11 @@ TEST_F(compaction_test, scenario03) { EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key3", "value3", 1, 3, log_entry::entry_type::normal_entry)); // 2. Execute compaction - run_compact_with_epoch_switch(2); + run_compact_with_epoch_switch(3); // Check the catalog and PWALs after compaction compaction_catalog catalog = compaction_catalog::from_catalog_file(location); - EXPECT_EQ(catalog.get_max_epoch_id(), 1); + EXPECT_EQ(catalog.get_max_epoch_id(), 2); EXPECT_EQ(catalog.get_compacted_files().size(), 1); EXPECT_EQ(catalog.get_detached_pwals().size(), 3); @@ -1079,7 +1120,7 @@ TEST_F(compaction_test, scenario03) { lc0_->remove_entry(1, "key41", {2, 0}); lc0_->end_session(); - datastore_->switch_epoch(3); + datastore_->switch_epoch(4); pwals = extract_pwal_files_from_datastore(); // Check the created PWAL files @@ -1105,7 +1146,7 @@ TEST_F(compaction_test, scenario03) { // 4. Restart the datastore datastore_->shutdown(); datastore_ = nullptr; - gen_datastore(); // Regenerate datastore after restart + gen_datastore(); // Restart // 5. check the compacted file and snapshot creating at the boot time log_entries = read_log_file("pwal_0000.compacted", location); diff --git a/test/limestone/compaction/rotation_task_test.cpp b/test/limestone/compaction/rotation_task_test.cpp deleted file mode 100644 index 6fa3b01c..00000000 --- a/test/limestone/compaction/rotation_task_test.cpp +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright 2022-2024 Project Tsurugi. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include -#include -#include "test_root.h" -#include "rotation_task.h" -#include "limestone/api/limestone_exception.h" -namespace limestone::testing { - -constexpr const char* data_location = "/tmp/rotation_task_test/data_location"; -constexpr const char* metadata_location = "/tmp/rotation_task_test/metadata_location"; - -const boost::filesystem::path epoch_path{std::string(data_location) + "/epoch"}; - -using limestone::api::log_channel; -using limestone::api::rotation_task; -using limestone::api::rotation_result; -using limestone::api::rotation_task_helper; -using limestone::api::limestone_exception; - -class rotation_task_test : public ::testing::Test { -protected: - void SetUp() override { - if (system("rm -rf /tmp/rotation_task_test") != 0) { - std::cerr << "cannot remove directory" << std::endl; - } - if (system("mkdir -p /tmp/rotation_task_test/data_location /tmp/rotation_task_test/metadata_location") != 0) { - std::cerr << "cannot make directory" << std::endl; - } - - limestone::api::configuration conf({data_location}, metadata_location); - datastore_ = std::make_unique(conf); - datastore_->switch_epoch(123); - boost::filesystem::path location_path(data_location); - lc0_ = &datastore_->create_channel(location_path); - lc1_ = &datastore_->create_channel(location_path); - lc2_ = &datastore_->create_channel(location_path); - lc3_ = &datastore_->create_channel(location_path); - - write_to_channel(lc0_); - write_to_channel(lc1_); - write_to_channel(lc2_); - } - void write_to_channel(log_channel* channel) { - channel->begin_session(); - channel->add_entry(1, "k1", "v1", {4, 0}); - channel->end_session(); - } - - void TearDown() override { - rotation_task_helper::clear_tasks(); - } - - std::unique_ptr datastore_{}; - log_channel* lc0_{}; - log_channel* lc1_{}; - log_channel* lc2_{}; - log_channel* lc3_{}; - - const std::string pwal0{"pwal_0000"}; - const std::string pwal1{"pwal_0001"}; - const std::string pwal2{"pwal_0002"}; - const std::string pwal3{"pwal_0003"}; - -}; - -void check_rotated_file(const std::set& actual_files, const std::string& expected_filename) { - auto starts_with = [](const std::string& full_string, const std::string& prefix) { - return full_string.find(prefix) == 0; - }; - - bool match_found = false; - for (const auto& actual_file : actual_files) { - std::string actual_filename = boost::filesystem::path(actual_file).filename().string(); - if (starts_with(actual_filename, expected_filename)) { - match_found = true; - break; - } - } - - EXPECT_TRUE(match_found) - << "Expected filename to start with: " << expected_filename << ", but none of the actual files matched."; -} - -TEST_F(rotation_task_test, rotate_sets_result) { - auto task = rotation_task_helper::create_and_enqueue_task(*datastore_); - - task->rotate(); - rotation_result result = task->wait_for_result(); - EXPECT_EQ(result.get_latest_rotated_files().size(), 3); - check_rotated_file(result.get_latest_rotated_files(), pwal0); - check_rotated_file(result.get_latest_rotated_files(), pwal1); - check_rotated_file(result.get_latest_rotated_files(), pwal2); - EXPECT_EQ(result.get_epoch_id(), 123); -} - -TEST_F(rotation_task_test, enqueue_and_execute_task) { - EXPECT_EQ(rotation_task_helper::queue_size(), 0); - auto task1 = rotation_task_helper::create_and_enqueue_task(*datastore_); - EXPECT_EQ(rotation_task_helper::queue_size(), 1); - auto task2 = rotation_task_helper::create_and_enqueue_task(*datastore_); - EXPECT_EQ(rotation_task_helper::queue_size(), 2); - - datastore_->switch_epoch(124); // dexecute rotation_task in switch_epoch - EXPECT_EQ(rotation_task_helper::queue_size(), 1); - rotation_result result1 = task1->wait_for_result(); - EXPECT_EQ(result1.get_latest_rotated_files().size(), 3); - check_rotated_file(result1.get_latest_rotated_files(), pwal0); - check_rotated_file(result1.get_latest_rotated_files(), pwal1); - check_rotated_file(result1.get_latest_rotated_files(), pwal2); - EXPECT_EQ(result1.get_epoch_id(), 123); - - write_to_channel(lc3_); - EXPECT_EQ(rotation_task_helper::queue_size(), 1); - datastore_->switch_epoch(125); // dexecute rotation_task in switch_epoch - EXPECT_EQ(rotation_task_helper::queue_size(), 0); - rotation_result result2 = task2->wait_for_result(); - EXPECT_EQ(result2.get_latest_rotated_files().size(), 1); - check_rotated_file(result2.get_latest_rotated_files(), pwal3); - EXPECT_EQ(result2.get_epoch_id(), 124); -} - -TEST_F(rotation_task_test, no_task_execution_when_queue_is_empty) { - rotation_task_helper::attempt_task_execution_from_queue(); - - SUCCEED(); -} - - -TEST_F(rotation_task_test, task_throws_exception) { - auto task = rotation_task_helper::create_and_enqueue_task(*datastore_); - - // Force an exception to be thrown by removing the directory - if (system("rm -rf /tmp/rotation_task_test") != 0) { - std::cerr << "Cannot remove directory" << std::endl; - } - - // Since the exception is caught in task->rotate(), no exception should be thrown here - task->rotate(); - - // Check that an exception is thrown and verify its details - try { - rotation_result result = task->wait_for_result(); - FAIL() << "Expected limestone_exception to be thrown"; // Fails the test if no exception is thrown - } catch (const limestone_exception& e) { - // Verify the exception details - std::cerr << "Caught exception: " << e.what() << std::endl; - EXPECT_TRUE(std::string(e.what()).rfind("I/O Error (No such file or directory): Failed to rename epoch_file from /tmp/rotation_task_test/data_location/epoch", 0) == 0); - EXPECT_EQ(e.error_code(), ENOENT); - } catch (const std::exception& e) { - // Handle non-limestone_exception std::exception types - std::cerr << "Caught exception: " << e.what() << std::endl; - FAIL() << "Expected limestone_exception but caught a different std: " << e.what(); - } catch (...) { - // Handle unknown exception types - FAIL() << "Expected limestone_exception but caught an unknown exception type."; - } -} - -} // namespace limestone::testing diff --git a/test/limestone/log/rotate_test.cpp b/test/limestone/log/rotate_test.cpp index 55606e5a..7edb4c4f 100644 --- a/test/limestone/log/rotate_test.cpp +++ b/test/limestone/log/rotate_test.cpp @@ -15,11 +15,9 @@ #include "limestone_exception_helper.h" using limestone::api::limestone_exception; -#include "rotation_task.h" #define LOGFORMAT_VER 2 - namespace limestone::testing { inline constexpr const char* location = "/tmp/rotate_test"; @@ -138,9 +136,11 @@ TEST_F(rotate_test, rotate_fails_with_io_error) { TEST_F(rotate_test, log_is_rotated) { // NOLINT using namespace limestone::api; + datastore_->ready(); log_channel& channel = datastore_->create_channel(boost::filesystem::path(location)); log_channel& unused_channel = datastore_->create_channel(boost::filesystem::path(location)); + datastore_->switch_epoch(42); channel.begin_session(); channel.add_entry(42, "k1", "v1", {100, 4}); diff --git a/test/test_root.h b/test/test_root.h index 720e99dd..7890917e 100644 --- a/test/test_root.h +++ b/test/test_root.h @@ -26,12 +26,37 @@ class datastore_test : public datastore { public: explicit datastore_test(configuration& conf) : datastore(conf) {} datastore_test() : datastore() {} + + // Provides access to internal members for testing purposes auto& log_channels() const noexcept { return log_channels_for_tests(); } auto epoch_id_informed() const noexcept { return epoch_id_informed_for_tests(); } auto epoch_id_recorded() const noexcept { return epoch_id_recorded_for_tests(); } auto epoch_id_switched() const noexcept { return epoch_id_switched_for_tests(); } auto& files() const noexcept { return files_for_tests(); } void rotate_epoch_file() { rotate_epoch_file_for_tests(); } + +protected: + // Overrides for on_wait1 to on_wait4 hooks to enable custom behavior during testing. + void on_wait1() override { + if (on_wait1_callback) on_wait1_callback(); // Executes the registered callback if set + } + void on_wait2() override { + if (on_wait2_callback) on_wait2_callback(); + } + void on_wait3() override { + if (on_wait3_callback) on_wait3_callback(); + } + void on_wait4() override { + if (on_wait4_callback) on_wait4_callback(); + } + +public: + // Callback functions for testing on_wait1 to on_wait4 behavior. + // These can be dynamically assigned in each test case. + std::function on_wait1_callback; + std::function on_wait2_callback; + std::function on_wait3_callback; + std::function on_wait4_callback; }; } // namespace limestone::api