Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update helio dependency #553

Merged
merged 1 commit into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
#include "util/fiber_sched_algo.h"
#include "util/fibers/fiber.h"

#ifdef DFLY_USE_SSL
#include "util/tls/tls_socket.h"
Expand All @@ -29,7 +30,7 @@ ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console
using namespace util;
using namespace std;
using nonstd::make_unexpected;
namespace this_fiber = boost::this_fiber;

namespace fibers = boost::fibers;

namespace facade {
Expand Down Expand Up @@ -267,7 +268,7 @@ void Connection::UnregisterShutdownHook(ShutdownHandle id) {
}

void Connection::HandleRequests() {
this_fiber::properties<FiberProps>().set_name("DflyConnection");
FiberProps::SetName("DflyConnection");

LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());

Expand Down Expand Up @@ -508,7 +509,7 @@ auto Connection::ParseRedis() -> ParserStatus {
if (dispatch_q_.size() == 1) {
evc_.notify();
} else if (dispatch_q_.size() > 10) {
this_fiber::yield();
fibers_ext::Yield();
}
}
}
Expand Down Expand Up @@ -715,7 +716,7 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
// DispatchFiber.
void Connection::DispatchFiber(util::FiberSocketBase* peer) {
this_fiber::properties<FiberProps>().set_name("DispatchFiber");
FiberProps::SetName("DispatchFiber");

SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this};
Expand Down
10 changes: 5 additions & 5 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "server/string_family.h"
#include "server/transaction.h"
#include "util/fiber_sched_algo.h"
#include "util/fibers/fiber.h"

using namespace std;

Expand All @@ -30,7 +31,6 @@ ABSL_DECLARE_FLAG(string, dbfilename);
namespace dfly {

using namespace util;
namespace this_fiber = ::boost::this_fiber;
using boost::intrusive_ptr;
using boost::fibers::fiber;
using namespace facade;
Expand Down Expand Up @@ -275,7 +275,7 @@ void DebugCmd::Populate(CmdArgList args) {
}
ranges.emplace_back(from, total_count - from);

vector<fiber> fb_arr(ranges.size());
vector<fibers_ext::Fiber> fb_arr(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) {
auto range = ranges[i];

Expand All @@ -285,14 +285,14 @@ void DebugCmd::Populate(CmdArgList args) {
});
}
for (auto& fb : fb_arr)
fb.join();
fb.Join();

(*cntx_)->SendOk();
}

void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix,
unsigned value_len, bool populate_random_values) {
this_fiber::properties<FiberProps>().set_name("populate_range");
FiberProps::SetName("populate_range");
VLOG(1) << "PopulateRange: " << from << "-" << (from + len - 1);

string key = absl::StrCat(prefix, ":");
Expand All @@ -313,7 +313,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
ess.Add(sid, [=] {
DoPopulateBatch(prefix, value_len, populate_random_values, params, shard_batch);
if (i % 50 == 0) {
this_fiber::yield();
fibers_ext::Yield();
}
});

Expand Down
23 changes: 11 additions & 12 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ using absl::StrCat;
using ::io::Result;
using testing::ElementsAre;
using testing::HasSubstr;
namespace this_fiber = boost::this_fiber;

namespace {

Expand Down Expand Up @@ -178,7 +177,7 @@ TEST_F(DflyEngineTest, MultiConsistent) {
auto fb = pp_->at(1)->LaunchFiber([&] {
RespExpr resp = Run({"multi"});
ASSERT_EQ(resp, "OK");
this_fiber::sleep_for(1ms);
fibers_ext::SleepFor(1ms);

resp = Run({"get", kKey1});
ASSERT_EQ(resp, "QUEUED");
Expand All @@ -201,8 +200,8 @@ TEST_F(DflyEngineTest, MultiConsistent) {
EXPECT_EQ(sub_arr[0].GetBuf(), resp_arr[0].GetBuf());
});

mset_fb.join();
fb.join();
mset_fb.Join();
fb.Join();
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
Expand Down Expand Up @@ -271,8 +270,8 @@ TEST_F(DflyEngineTest, MultiHop) {
}
});

p1_fb.join();
p2_fb.join();
p1_fb.Join();
p2_fb.Join();
}

TEST_F(DflyEngineTest, FlushDb) {
Expand All @@ -294,7 +293,7 @@ TEST_F(DflyEngineTest, FlushDb) {
}
});

fb0.join();
fb0.Join();

ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
Expand Down Expand Up @@ -472,12 +471,12 @@ TEST_F(DflyEngineTest, FlushAll) {
for (size_t i = 1; i < 100; ++i) {
RespExpr resp = Run({"set", "foo", "bar"});
ASSERT_EQ(resp, "OK");
this_fiber::yield();
fibers_ext::Yield();
}
});

fb0.join();
fb1.join();
fb0.Join();
fb1.Join();
}

TEST_F(DflyEngineTest, OOM) {
Expand Down Expand Up @@ -791,7 +790,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
this_fiber::sleep_for(100ms);
fibers_ext::SleepFor(100ms);
EXPECT_EQ(shard->stats().defrag_realloc_total, 0);
// we are expecting to have at least one try by now
EXPECT_GT(shard->stats().defrag_task_invocation_total, 0);
Expand All @@ -814,7 +813,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
if (stats.defrag_realloc_total > 0) {
break;
}
this_fiber::sleep_for(220ms);
fibers_ext::SleepFor(220ms);
}
// make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.defrag_realloc_total, 0);
Expand Down
3 changes: 1 addition & 2 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ ABSL_FLAG(float, mem_utilization_threshold, 0.8,
namespace dfly {

using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;
using absl::GetFlag;

Expand Down Expand Up @@ -179,7 +178,7 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
FiberProps::SetName(absl::StrCat("shard_queue", index));
queue_.Run();
});

Expand Down
10 changes: 5 additions & 5 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ TEST_F(GenericFamilyTest, Del) {
}
});

exist_fb.join();
del_fb.join();
exist_fb.Join();
del_fb.Join();
}

TEST_F(GenericFamilyTest, TTL) {
Expand Down Expand Up @@ -160,8 +160,8 @@ TEST_F(GenericFamilyTest, Rename) {
}
});

exist_fb.join();
ren_fb.join();
exist_fb.Join();
ren_fb.Join();
}

TEST_F(GenericFamilyTest, RenameNonString) {
Expand Down Expand Up @@ -275,7 +275,7 @@ TEST_F(GenericFamilyTest, Move) {
Run({"move", "l", "1"});
});

fb_blpop.join();
fb_blpop.Join();
}

using testing::AnyOf;
Expand Down
3 changes: 1 addition & 2 deletions src/server/io_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ using namespace util;
using namespace facade;
using uring::FiberCall;
using uring::Proactor;
namespace this_fiber = ::boost::this_fiber;

namespace {

Expand Down Expand Up @@ -137,7 +136,7 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {

void IoMgr::Shutdown() {
while (flags_val) {
this_fiber::sleep_for(200us); // TODO: hacky for now.
fibers_ext::SleepFor(200us); // TODO: hacky for now.
}
}

Expand Down
44 changes: 21 additions & 23 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
using namespace testing;
using namespace std;
using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;

namespace dfly {
Expand Down Expand Up @@ -94,17 +93,17 @@ TEST_F(ListFamilyTest, BLPopBlocking) {
LOG(INFO) << "pop0";
});

this_fiber::sleep_for(50us);
fibers_ext::SleepFor(50us);
auto fb1 = pp_->at(1)->LaunchFiber([&] {
resp1 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop1";
});
this_fiber::sleep_for(30us);
fibers_ext::SleepFor(30us);

pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "2", "1"}); });

fb0.join();
fb1.join();
fb0.Join();
fb1.Join();

// fb0 should start first and be the first transaction blocked. Therefore, it should pop '1'.
// sometimes order is switched, need to think how to fix it.
Expand All @@ -131,7 +130,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) {
});

pp_->at(1)->Await([&] { Run({"lpush", kKey1, "1", "2", "3"}); });
fb1.join();
fb1.Join();

ASSERT_THAT(resp0, ArrLen(2));
EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3"));
Expand Down Expand Up @@ -202,10 +201,10 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
ASSERT_THAT(resp, ArrLen(6));
});

p1_fb.join();
p2_fb.join();
p1_fb.Join();
p2_fb.Join();

pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
Expand Down Expand Up @@ -267,10 +266,10 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
LOG(INFO) << "push2 ts: " << cl2;
});

p1_fb.join();
p2_fb.join();
p1_fb.Join();
p2_fb.Join();

pop_fb.join();
pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING)));
Expand Down Expand Up @@ -303,8 +302,8 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
Run({"lpush", kKey1, "B"});
});

p1_fb.join();
pop_fb.join();
p1_fb.Join();
pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "B"));
}
Expand All @@ -321,7 +320,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); });
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
Expand All @@ -333,7 +332,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); });
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey2, "bar"));
Expand All @@ -355,7 +354,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
WaitUntilLocked(0, "x");

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); });
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre("x", "bar"));
Expand All @@ -377,7 +376,7 @@ TEST_F(ListFamilyTest, BPopRename) {
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
Run({"rename", "a", kKey1});
});
pop_fb.join();
pop_fb.Join();

ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
Expand All @@ -395,7 +394,7 @@ TEST_F(ListFamilyTest, BPopFlush) {
Run({"flushdb"});
EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"}));
});
pop_fb.join();
pop_fb.Join();
}

TEST_F(ListFamilyTest, LRem) {
Expand Down Expand Up @@ -641,24 +640,23 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {
for (int i = 0; i < 1000; i++) {
Run(id, {"rpush", "a", "DATA"});
}
::boost::this_fiber::sleep_for(100ms);
fibers_ext::SleepFor(100ms);
running = false;
};

vector<boost::fibers::fiber> fbs;
vector<fibers_ext::Fiber> fbs;

// more likely to reproduce the bug if we start pop_fiber first.
for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber));
}


for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber));
}

for (auto& f : fbs)
f.join();
f.Join();
}

} // namespace dfly
Loading