diff --git a/helio b/helio index 8d0f0cc37908..0c0afcbc85a5 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 8d0f0cc37908623705125128d3c64d35d410fb0f +Subproject commit 0c0afcbc85a5ac3347b07a76bb06c0fc98d0fc79 diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index bdf1684d5ddd..5da7f3f0d510 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -17,6 +17,7 @@ #include "facade/redis_parser.h" #include "facade/service_interface.h" #include "util/fiber_sched_algo.h" +#include "util/fibers/fiber.h" #ifdef DFLY_USE_SSL #include "util/tls/tls_socket.h" @@ -29,7 +30,7 @@ ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console using namespace util; using namespace std; using nonstd::make_unexpected; -namespace this_fiber = boost::this_fiber; + namespace fibers = boost::fibers; namespace facade { @@ -267,7 +268,7 @@ void Connection::UnregisterShutdownHook(ShutdownHandle id) { } void Connection::HandleRequests() { - this_fiber::properties().set_name("DflyConnection"); + FiberProps::SetName("DflyConnection"); LinuxSocketBase* lsb = static_cast(socket_.get()); @@ -508,7 +509,7 @@ auto Connection::ParseRedis() -> ParserStatus { if (dispatch_q_.size() == 1) { evc_.notify(); } else if (dispatch_q_.size() > 10) { - this_fiber::yield(); + fibers_ext::Yield(); } } } @@ -715,7 +716,7 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) { // InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the // DispatchFiber. void Connection::DispatchFiber(util::FiberSocketBase* peer) { - this_fiber::properties().set_name("DispatchFiber"); + FiberProps::SetName("DispatchFiber"); SinkReplyBuilder* builder = cc_->reply_builder(); DispatchOperations dispatch_op{builder, this}; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 2bbe2a5166ae..d7be503867c3 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -21,6 +21,7 @@ #include "server/string_family.h" #include "server/transaction.h" #include "util/fiber_sched_algo.h" +#include "util/fibers/fiber.h" using namespace std; @@ -30,7 +31,6 @@ ABSL_DECLARE_FLAG(string, dbfilename); namespace dfly { using namespace util; -namespace this_fiber = ::boost::this_fiber; using boost::intrusive_ptr; using boost::fibers::fiber; using namespace facade; @@ -275,7 +275,7 @@ void DebugCmd::Populate(CmdArgList args) { } ranges.emplace_back(from, total_count - from); - vector fb_arr(ranges.size()); + vector fb_arr(ranges.size()); for (size_t i = 0; i < ranges.size(); ++i) { auto range = ranges[i]; @@ -285,14 +285,14 @@ void DebugCmd::Populate(CmdArgList args) { }); } for (auto& fb : fb_arr) - fb.join(); + fb.Join(); (*cntx_)->SendOk(); } void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix, unsigned value_len, bool populate_random_values) { - this_fiber::properties().set_name("populate_range"); + FiberProps::SetName("populate_range"); VLOG(1) << "PopulateRange: " << from << "-" << (from + len - 1); string key = absl::StrCat(prefix, ":"); @@ -313,7 +313,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view ess.Add(sid, [=] { DoPopulateBatch(prefix, value_len, populate_random_values, params, shard_batch); if (i % 50 == 0) { - this_fiber::yield(); + fibers_ext::Yield(); } }); diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 2a4489f57fa4..d68fc95795d2 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -27,7 +27,6 @@ using absl::StrCat; using ::io::Result; using testing::ElementsAre; using testing::HasSubstr; -namespace this_fiber = boost::this_fiber; namespace { @@ -178,7 +177,7 @@ TEST_F(DflyEngineTest, MultiConsistent) { auto fb = pp_->at(1)->LaunchFiber([&] { RespExpr resp = Run({"multi"}); ASSERT_EQ(resp, "OK"); - this_fiber::sleep_for(1ms); + fibers_ext::SleepFor(1ms); resp = Run({"get", kKey1}); ASSERT_EQ(resp, "QUEUED"); @@ -201,8 +200,8 @@ TEST_F(DflyEngineTest, MultiConsistent) { EXPECT_EQ(sub_arr[0].GetBuf(), resp_arr[0].GetBuf()); }); - mset_fb.join(); - fb.join(); + mset_fb.Join(); + fb.Join(); ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey4)); ASSERT_FALSE(service_->IsShardSetLocked()); @@ -271,8 +270,8 @@ TEST_F(DflyEngineTest, MultiHop) { } }); - p1_fb.join(); - p2_fb.join(); + p1_fb.Join(); + p2_fb.Join(); } TEST_F(DflyEngineTest, FlushDb) { @@ -294,7 +293,7 @@ TEST_F(DflyEngineTest, FlushDb) { } }); - fb0.join(); + fb0.Join(); ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey4)); @@ -472,12 +471,12 @@ TEST_F(DflyEngineTest, FlushAll) { for (size_t i = 1; i < 100; ++i) { RespExpr resp = Run({"set", "foo", "bar"}); ASSERT_EQ(resp, "OK"); - this_fiber::yield(); + fibers_ext::Yield(); } }); - fb0.join(); - fb1.join(); + fb0.Join(); + fb1.Join(); } TEST_F(DflyEngineTest, OOM) { @@ -791,7 +790,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) { shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) { EngineShard* shard = EngineShard::tlocal(); ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty! - this_fiber::sleep_for(100ms); + fibers_ext::SleepFor(100ms); EXPECT_EQ(shard->stats().defrag_realloc_total, 0); // we are expecting to have at least one try by now EXPECT_GT(shard->stats().defrag_task_invocation_total, 0); @@ -814,7 +813,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) { if (stats.defrag_realloc_total > 0) { break; } - this_fiber::sleep_for(220ms); + fibers_ext::SleepFor(220ms); } // make sure that we successfully found places to defrag in memory EXPECT_GT(stats.defrag_realloc_total, 0); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 9ec5fe6d536e..2c392841440a 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -45,7 +45,6 @@ ABSL_FLAG(float, mem_utilization_threshold, 0.8, namespace dfly { using namespace util; -namespace this_fiber = ::boost::this_fiber; namespace fibers = ::boost::fibers; using absl::GetFlag; @@ -179,7 +178,7 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) { fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] { - this_fiber::properties().set_name(absl::StrCat("shard_queue", index)); + FiberProps::SetName(absl::StrCat("shard_queue", index)); queue_.Run(); }); diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index e43d334bb280..f17b0596bce2 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -93,8 +93,8 @@ TEST_F(GenericFamilyTest, Del) { } }); - exist_fb.join(); - del_fb.join(); + exist_fb.Join(); + del_fb.Join(); } TEST_F(GenericFamilyTest, TTL) { @@ -160,8 +160,8 @@ TEST_F(GenericFamilyTest, Rename) { } }); - exist_fb.join(); - ren_fb.join(); + exist_fb.Join(); + ren_fb.Join(); } TEST_F(GenericFamilyTest, RenameNonString) { @@ -275,7 +275,7 @@ TEST_F(GenericFamilyTest, Move) { Run({"move", "l", "1"}); }); - fb_blpop.join(); + fb_blpop.Join(); } using testing::AnyOf; diff --git a/src/server/io_mgr.cc b/src/server/io_mgr.cc index 58ccfde5a378..67acf7724bb0 100644 --- a/src/server/io_mgr.cc +++ b/src/server/io_mgr.cc @@ -21,7 +21,6 @@ using namespace util; using namespace facade; using uring::FiberCall; using uring::Proactor; -namespace this_fiber = ::boost::this_fiber; namespace { @@ -137,7 +136,7 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) { void IoMgr::Shutdown() { while (flags_val) { - this_fiber::sleep_for(200us); // TODO: hacky for now. + fibers_ext::SleepFor(200us); // TODO: hacky for now. } } diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 4e2896bc6b89..c0116733ab60 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -19,7 +19,6 @@ using namespace testing; using namespace std; using namespace util; -namespace this_fiber = ::boost::this_fiber; namespace fibers = ::boost::fibers; namespace dfly { @@ -94,17 +93,17 @@ TEST_F(ListFamilyTest, BLPopBlocking) { LOG(INFO) << "pop0"; }); - this_fiber::sleep_for(50us); + fibers_ext::SleepFor(50us); auto fb1 = pp_->at(1)->LaunchFiber([&] { resp1 = Run({"blpop", "x", "0"}); LOG(INFO) << "pop1"; }); - this_fiber::sleep_for(30us); + fibers_ext::SleepFor(30us); pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "2", "1"}); }); - fb0.join(); - fb1.join(); + fb0.Join(); + fb1.Join(); // fb0 should start first and be the first transaction blocked. Therefore, it should pop '1'. // sometimes order is switched, need to think how to fix it. @@ -131,7 +130,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) { }); pp_->at(1)->Await([&] { Run({"lpush", kKey1, "1", "2", "3"}); }); - fb1.join(); + fb1.Join(); ASSERT_THAT(resp0, ArrLen(2)); EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3")); @@ -202,10 +201,10 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { ASSERT_THAT(resp, ArrLen(6)); }); - p1_fb.join(); - p2_fb.join(); + p1_fb.Join(); + p2_fb.Join(); - pop_fb.join(); + pop_fb.Join(); ASSERT_THAT(blpop_resp, ArrLen(2)); auto resp_arr = blpop_resp.GetVec(); @@ -267,10 +266,10 @@ TEST_F(ListFamilyTest, BLPopSerialize) { LOG(INFO) << "push2 ts: " << cl2; }); - p1_fb.join(); - p2_fb.join(); + p1_fb.Join(); + p2_fb.Join(); - pop_fb.join(); + pop_fb.Join(); ASSERT_THAT(blpop_resp, ArrLen(2)); auto resp_arr = blpop_resp.GetVec(); EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING))); @@ -303,8 +302,8 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { Run({"lpush", kKey1, "B"}); }); - p1_fb.join(); - pop_fb.join(); + p1_fb.Join(); + pop_fb.Join(); ASSERT_THAT(blpop_resp, ArrLen(2)); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "B")); } @@ -321,7 +320,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { WaitUntilLocked(0, kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); }); - pop_fb.join(); + pop_fb.Join(); ASSERT_THAT(blpop_resp, ArrLen(2)); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar")); @@ -333,7 +332,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { WaitUntilLocked(0, kKey1); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); }); - pop_fb.join(); + pop_fb.Join(); ASSERT_THAT(blpop_resp, ArrLen(2)); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey2, "bar")); @@ -355,7 +354,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) { WaitUntilLocked(0, "x"); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); }); - pop_fb.join(); + pop_fb.Join(); ASSERT_THAT(blpop_resp, ArrLen(2)); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre("x", "bar")); @@ -377,7 +376,7 @@ TEST_F(ListFamilyTest, BPopRename) { EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"})); Run({"rename", "a", kKey1}); }); - pop_fb.join(); + pop_fb.Join(); ASSERT_THAT(blpop_resp, ArrLen(2)); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar")); @@ -395,7 +394,7 @@ TEST_F(ListFamilyTest, BPopFlush) { Run({"flushdb"}); EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); }); - pop_fb.join(); + pop_fb.Join(); } TEST_F(ListFamilyTest, LRem) { @@ -641,24 +640,23 @@ TEST_F(ListFamilyTest, TwoQueueBug451) { for (int i = 0; i < 1000; i++) { Run(id, {"rpush", "a", "DATA"}); } - ::boost::this_fiber::sleep_for(100ms); + fibers_ext::SleepFor(100ms); running = false; }; - vector fbs; + vector fbs; // more likely to reproduce the bug if we start pop_fiber first. for (int i = 0; i < 2; i++) { fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber)); } - for (int i = 0; i < 2; i++) { fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber)); } for (auto& f : fbs) - f.join(); + f.Join(); } } // namespace dfly diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 3a6b87fe6b3b..2b18005c935f 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -56,7 +56,6 @@ using namespace util; using base::VarzValue; using ::boost::intrusive_ptr; namespace fibers = ::boost::fibers; -namespace this_fiber = ::boost::this_fiber; using absl::GetFlag; using absl::StrCat; using namespace facade; @@ -510,7 +509,7 @@ void Service::Shutdown() { shard_set->Shutdown(); // wait for all the pending callbacks to stop. - boost::this_fiber::sleep_for(10ms); + fibers_ext::SleepFor(10ms); } static void MultiSetError(ConnectionContext* cntx) { diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index cc82dec954e5..6a982670eb9a 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -226,7 +226,7 @@ TEST_F(RdbTest, SaveFlush) { } while (!service_->server_family().IsSaving()); Run({"flushdb"}); - save_fb.join(); + save_fb.Join(); auto save_info = service_->server_family().GetLastSaveInfo(); ASSERT_EQ(1, save_info->freq_map.size()); auto& k_v = save_info->freq_map.front(); @@ -261,7 +261,7 @@ TEST_F(RdbTest, SaveManyDbs) { } }); - save_fb.join(); + save_fb.Join(); auto save_info = service_->server_family().GetLastSaveInfo(); ASSERT_EQ(1, save_info->freq_map.size()); diff --git a/src/server/replica.cc b/src/server/replica.cc index c5562ea7c325..9f030ac74a37 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -28,7 +28,6 @@ using namespace util; using namespace boost::asio; using namespace facade; using absl::StrCat; -namespace this_fiber = ::boost::this_fiber; namespace { @@ -171,7 +170,7 @@ void Replica::MainReplicationFb() { while (state_mask_ & R_ENABLED) { // 1. Connect socket. if ((state_mask_ & R_TCP_CONNECTED) == 0) { - this_fiber::sleep_for(500ms); + fibers_ext::SleepFor(500ms); if (is_paused_) continue; @@ -210,7 +209,7 @@ void Replica::MainReplicationFb() { // triggered // before Redis is ready to transition to the streaming state and it silenty ignores "ACK // 0". We reduce the chance it happens with this delay. - this_fiber::sleep_for(50ms); + fibers_ext::SleepFor(50ms); } service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index cfba4d444108..a5cc98eac987 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -403,8 +403,8 @@ void ServerFamily::Shutdown() { load_result_.wait(); is_snapshot_done_.Notify(); - if (snapshot_fiber_.joinable()) { - snapshot_fiber_.join(); + if (snapshot_fiber_.IsJoinable()) { + snapshot_fiber_.Join(); } pb_task_->Await([this] { @@ -479,7 +479,7 @@ fibers::future ServerFamily::Load(const std::string& load_path) auto& pool = service_.proactor_pool(); - std::vector<::boost::fibers::fiber> load_fibers; + vector load_fibers; load_fibers.reserve(paths.size()); auto first_error = std::make_shared(); @@ -507,7 +507,7 @@ fibers::future ServerFamily::Load(const std::string& load_path) auto load_join_fiber = [this, first_error, load_fibers = std::move(load_fibers), ec_promise = std::move(ec_promise)]() mutable { for (auto& fiber : load_fibers) { - fiber.join(); + fiber.Join(); } VLOG(1) << "Load finished"; diff --git a/src/server/server_family.h b/src/server/server_family.h index 7490c8cf30cd..5f0b3a0606ff 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -9,6 +9,7 @@ #include "facade/conn_context.h" #include "facade/redis_parser.h" #include "server/engine_shard_set.h" +#include "util/fibers/fiber.h" #include "util/proactor_pool.h" namespace util { @@ -148,7 +149,7 @@ class ServerFamily { void SnapshotScheduling(const SnapshotSpec& time); - boost::fibers::fiber snapshot_fiber_; + util::fibers_ext::Fiber snapshot_fiber_; boost::fibers::future load_result_; uint32_t stats_caching_task_ = 0; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index cb0be677181b..5bbc6436fb2d 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -108,7 +108,7 @@ void SliceSnapshot::Join() { void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { { auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::GetIndex()); - this_fiber::properties().set_name(std::move(fiber_name)); + FiberProps::SetName(std::move(fiber_name)); } PrimeTable::Cursor cursor; @@ -138,7 +138,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { if (stats_.serialized >= last_yield + 100) { DVLOG(2) << "Before sleep " << this_fiber::properties().name(); - this_fiber::yield(); + fibers_ext::Yield(); DVLOG(2) << "After sleep"; last_yield = stats_.serialized; diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc index e03349360315..f914b8ca3dfa 100644 --- a/src/server/string_family_test.cc +++ b/src/server/string_family_test.cc @@ -223,8 +223,8 @@ TEST_F(StringFamilyTest, MGetSet) { } }); - mget_fb.join(); - set_fb.join(); + mget_fb.Join(); + set_fb.Join(); } TEST_F(StringFamilyTest, MSetGet) { @@ -260,8 +260,8 @@ TEST_F(StringFamilyTest, MSetGet) { } }); - mset_fb.join(); - get_fb.join(); + mset_fb.Join(); + get_fb.Join(); } TEST_F(StringFamilyTest, MSetDel) { @@ -277,8 +277,8 @@ TEST_F(StringFamilyTest, MSetDel) { } }); - mset_fb.join(); - del_fb.join(); + mset_fb.Join(); + del_fb.Join(); } TEST_F(StringFamilyTest, IntKey) { @@ -310,8 +310,8 @@ TEST_F(StringFamilyTest, SingleShard) { Run({"mget", "x", "b", "y"}); } }); - mset_fb.join(); - mget_fb.join(); + mset_fb.Join(); + mget_fb.Join(); } TEST_F(StringFamilyTest, MSetIncr) { @@ -352,8 +352,8 @@ TEST_F(StringFamilyTest, MSetIncr) { ASSERT_LE(a, c); } }); - mset_fb.join(); - get_fb.join(); + mset_fb.Join(); + get_fb.Join(); } TEST_F(StringFamilyTest, SetEx) { diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index c0de0bdae76f..a994d61a2c4c 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -166,7 +166,7 @@ void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double t auto timeout_micro = chrono::duration_cast(1000ms * timeout); int64_t steps = timeout_micro.count() / step.count(); do { - ::boost::this_fiber::sleep_for(step); + fibers_ext::SleepFor(step); } while (!IsLocked(db_index, key) && --steps > 0); CHECK(IsLocked(db_index, key)); }