Skip to content

Commit

Permalink
chore: Update helio dependency (#553)
Browse files Browse the repository at this point in the history
Switch to using fibers_ext::Fiber.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Dec 9, 2022
1 parent c34270c commit 23c902d
Show file tree
Hide file tree
Showing 16 changed files with 74 additions and 79 deletions.
9 changes: 5 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
#include "util/fiber_sched_algo.h"
#include "util/fibers/fiber.h"

#ifdef DFLY_USE_SSL
#include "util/tls/tls_socket.h"
Expand All @@ -29,7 +30,7 @@ ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console
using namespace util;
using namespace std;
using nonstd::make_unexpected;
namespace this_fiber = boost::this_fiber;

namespace fibers = boost::fibers;

namespace facade {
Expand Down Expand Up @@ -267,7 +268,7 @@ void Connection::UnregisterShutdownHook(ShutdownHandle id) {
}

void Connection::HandleRequests() {
this_fiber::properties<FiberProps>().set_name("DflyConnection");
FiberProps::SetName("DflyConnection");

LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());

Expand Down Expand Up @@ -508,7 +509,7 @@ auto Connection::ParseRedis() -> ParserStatus {
if (dispatch_q_.size() == 1) {
evc_.notify();
} else if (dispatch_q_.size() > 10) {
this_fiber::yield();
fibers_ext::Yield();
}
}
}
Expand Down Expand Up @@ -715,7 +716,7 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
// DispatchFiber.
void Connection::DispatchFiber(util::FiberSocketBase* peer) {
this_fiber::properties<FiberProps>().set_name("DispatchFiber");
FiberProps::SetName("DispatchFiber");

SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this};
Expand Down
10 changes: 5 additions & 5 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "server/string_family.h"
#include "server/transaction.h"
#include "util/fiber_sched_algo.h"
#include "util/fibers/fiber.h"

using namespace std;

Expand All @@ -30,7 +31,6 @@ ABSL_DECLARE_FLAG(string, dbfilename);
namespace dfly {

using namespace util;
namespace this_fiber = ::boost::this_fiber;
using boost::intrusive_ptr;
using boost::fibers::fiber;
using namespace facade;
Expand Down Expand Up @@ -275,7 +275,7 @@ void DebugCmd::Populate(CmdArgList args) {
}
ranges.emplace_back(from, total_count - from);

vector<fiber> fb_arr(ranges.size());
vector<fibers_ext::Fiber> fb_arr(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) {
auto range = ranges[i];

Expand All @@ -285,14 +285,14 @@ void DebugCmd::Populate(CmdArgList args) {
});
}
for (auto& fb : fb_arr)
fb.join();
fb.Join();

(*cntx_)->SendOk();
}

void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix,
unsigned value_len, bool populate_random_values) {
this_fiber::properties<FiberProps>().set_name("populate_range");
FiberProps::SetName("populate_range");
VLOG(1) << "PopulateRange: " << from << "-" << (from + len - 1);

string key = absl::StrCat(prefix, ":");
Expand All @@ -313,7 +313,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
ess.Add(sid, [=] {
DoPopulateBatch(prefix, value_len, populate_random_values, params, shard_batch);
if (i % 50 == 0) {
this_fiber::yield();
fibers_ext::Yield();
}
});

Expand Down
23 changes: 11 additions & 12 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ using absl::StrCat;
using ::io::Result;
using testing::ElementsAre;
using testing::HasSubstr;
namespace this_fiber = boost::this_fiber;

namespace {

Expand Down Expand Up @@ -178,7 +177,7 @@ TEST_F(DflyEngineTest, MultiConsistent) {
auto fb = pp_->at(1)->LaunchFiber([&] {
RespExpr resp = Run({"multi"});
ASSERT_EQ(resp, "OK");
this_fiber::sleep_for(1ms);
fibers_ext::SleepFor(1ms);

resp = Run({"get", kKey1});
ASSERT_EQ(resp, "QUEUED");
Expand All @@ -201,8 +200,8 @@ TEST_F(DflyEngineTest, MultiConsistent) {
EXPECT_EQ(sub_arr[0].GetBuf(), resp_arr[0].GetBuf());
});

mset_fb.join();
fb.join();
mset_fb.Join();
fb.Join();
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
Expand Down Expand Up @@ -271,8 +270,8 @@ TEST_F(DflyEngineTest, MultiHop) {
}
});

p1_fb.join();
p2_fb.join();
p1_fb.Join();
p2_fb.Join();
}

TEST_F(DflyEngineTest, FlushDb) {
Expand All @@ -294,7 +293,7 @@ TEST_F(DflyEngineTest, FlushDb) {
}
});

fb0.join();
fb0.Join();

ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
Expand Down Expand Up @@ -472,12 +471,12 @@ TEST_F(DflyEngineTest, FlushAll) {
for (size_t i = 1; i < 100; ++i) {
RespExpr resp = Run({"set", "foo", "bar"});
ASSERT_EQ(resp, "OK");
this_fiber::yield();
fibers_ext::Yield();
}
});

fb0.join();
fb1.join();
fb0.Join();
fb1.Join();
}

TEST_F(DflyEngineTest, OOM) {
Expand Down Expand Up @@ -791,7 +790,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
this_fiber::sleep_for(100ms);
fibers_ext::SleepFor(100ms);
EXPECT_EQ(shard->stats().defrag_realloc_total, 0);
// we are expecting to have at least one try by now
EXPECT_GT(shard->stats().defrag_task_invocation_total, 0);
Expand All @@ -814,7 +813,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
if (stats.defrag_realloc_total > 0) {
break;
}
this_fiber::sleep_for(220ms);
fibers_ext::SleepFor(220ms);
}
// make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.defrag_realloc_total, 0);
Expand Down
3 changes: 1 addition & 2 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ ABSL_FLAG(float, mem_utilization_threshold, 0.8,
namespace dfly {

using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;
using absl::GetFlag;

Expand Down Expand Up @@ -179,7 +178,7 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
FiberProps::SetName(absl::StrCat("shard_queue", index));
queue_.Run();
});

Expand Down
10 changes: 5 additions & 5 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ TEST_F(GenericFamilyTest, Del) {
}
});

exist_fb.join();
del_fb.join();
exist_fb.Join();
del_fb.Join();
}

TEST_F(GenericFamilyTest, TTL) {
Expand Down Expand Up @@ -160,8 +160,8 @@ TEST_F(GenericFamilyTest, Rename) {
}
});

exist_fb.join();
ren_fb.join();
exist_fb.Join();
ren_fb.Join();
}

TEST_F(GenericFamilyTest, RenameNonString) {
Expand Down Expand Up @@ -275,7 +275,7 @@ TEST_F(GenericFamilyTest, Move) {
Run({"move", "l", "1"});
});

fb_blpop.join();
fb_blpop.Join();
}

using testing::AnyOf;
Expand Down
3 changes: 1 addition & 2 deletions src/server/io_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ using namespace util;
using namespace facade;
using uring::FiberCall;
using uring::Proactor;
namespace this_fiber = ::boost::this_fiber;

namespace {

Expand Down Expand Up @@ -137,7 +136,7 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {

void IoMgr::Shutdown() {
while (flags_val) {
this_fiber::sleep_for(200us); // TODO: hacky for now.
fibers_ext::SleepFor(200us); // TODO: hacky for now.
}
}

Expand Down
44 changes: 21 additions & 23 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
using namespace testing;
using namespace std;
using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;

namespace dfly {
Expand Down Expand Up @@ -94,17 +93,17 @@ TEST_F(ListFamilyTest, BLPopBlocking) {
LOG(INFO) << "pop0";
});

this_fiber::sleep_for(50us);
fibers_ext::SleepFor(50us);
auto fb1 = pp_->at(1)->LaunchFiber([&] {
resp1 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop1";
});
this_fiber::sleep_for(30us);
fibers_ext::SleepFor(30us);

pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "2", "1"}); });

fb0.join();
fb1.join();
fb0.Join();
fb1.Join();

// fb0 should start first and be the first transaction blocked. Therefore, it should pop '1'.
// sometimes order is switched, need to think how to fix it.
Expand All @@ -131,7 +130,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) {
});

pp_->at(1)->Await([&] { Run({"lpush", kKey1, "1", "2", "3"}); });
fb1.join();
fb1.Join();

ASSERT_THAT(resp0, ArrLen(2));
EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3"));
Expand Down Expand Up @@ -202,10 +201,10 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
ASSERT_THAT(resp, ArrLen(6));
});

p1_fb.join();
p2_fb.join();
p1_fb.Join();
p2_fb.Join();

pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
Expand Down Expand Up @@ -267,10 +266,10 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
LOG(INFO) << "push2 ts: " << cl2;
});

p1_fb.join();
p2_fb.join();
p1_fb.Join();
p2_fb.Join();

pop_fb.join();
pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING)));
Expand Down Expand Up @@ -303,8 +302,8 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
Run({"lpush", kKey1, "B"});
});

p1_fb.join();
pop_fb.join();
p1_fb.Join();
pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "B"));
}
Expand All @@ -321,7 +320,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); });
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
Expand All @@ -333,7 +332,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); });
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey2, "bar"));
Expand All @@ -355,7 +354,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
WaitUntilLocked(0, "x");

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); });
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre("x", "bar"));
Expand All @@ -377,7 +376,7 @@ TEST_F(ListFamilyTest, BPopRename) {
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
Run({"rename", "a", kKey1});
});
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
Expand All @@ -395,7 +394,7 @@ TEST_F(ListFamilyTest, BPopFlush) {
Run({"flushdb"});
EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"}));
});
pop_fb.join();
pop_fb.Join();
}

TEST_F(ListFamilyTest, LRem) {
Expand Down Expand Up @@ -641,24 +640,23 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {
for (int i = 0; i < 1000; i++) {
Run(id, {"rpush", "a", "DATA"});
}
::boost::this_fiber::sleep_for(100ms);
fibers_ext::SleepFor(100ms);
running = false;
};

vector<boost::fibers::fiber> fbs;
vector<fibers_ext::Fiber> fbs;

// more likely to reproduce the bug if we start pop_fiber first.
for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber));
}


for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber));
}

for (auto& f : fbs)
f.join();
f.Join();
}

} // namespace dfly
Loading

0 comments on commit 23c902d

Please sign in to comment.