diff --git a/include/pika_admin.h b/include/pika_admin.h index 4d5b6e536..b976b166d 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -185,17 +185,14 @@ class FlushallCmd : public Cmd { : Cmd(name, arity, flag, static_cast(AclCategory::KEYSPACE)) {} void Do() override; void DoThroughDB() override; - void DoUpdateCache() override; + void DoUpdateCache(std::shared_ptr db); void Split(const HintKeys& hint_keys) override{}; void Merge() override{}; Cmd* Clone() override { return new FlushallCmd(*this); } - void Execute() override; void FlushAllWithoutLock(); - void DoBinlog(std::shared_ptr sync_db_); private: void DoInitial() override; - std::string ToRedisProtocol() override; void DoWithoutLock(std::shared_ptr db); }; @@ -212,7 +209,6 @@ class FlushdbCmd : public Cmd { void Merge() override{}; Cmd* Clone() override { return new FlushdbCmd(*this); } void FlushAllDBsWithoutLock(); - void Execute() override; std::string GetFlushDBname() { return db_name_; } private: @@ -265,7 +261,6 @@ class InfoCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new InfoCmd(*this); } - void Execute() override; private: InfoSection info_section_; @@ -333,7 +328,6 @@ class ConfigCmd : public Cmd { void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new ConfigCmd(*this); } - void Execute() override; private: std::vector config_args_v_; diff --git a/include/pika_db.h b/include/pika_db.h index 2c4fdaede..bcaf3f8b1 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -139,12 +139,9 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { void Init(); bool TryUpdateMasterOffset(); /* - * FlushDB & FlushSubDB use + * FlushDB used */ - bool FlushDB(); - bool FlushSubDB(const std::string& db_name); bool FlushDBWithoutLock(); - bool FlushSubDBWithoutLock(const std::string& db_name); bool ChangeDb(const std::string& new_path); pstd::Status GetBgSaveUUID(std::string* snapshot_uuid); void PrepareRsync(); diff --git a/include/pika_transaction.h b/include/pika_transaction.h index 9957475b2..9cdd73dbf 100644 --- a/include/pika_transaction.h +++ b/include/pika_transaction.h @@ -31,7 +31,6 @@ class ExecCmd : public Cmd { : Cmd(name, arity, flag, static_cast(AclCategory::TRANSACTION)) {} void Do() override; Cmd* Clone() override { return new ExecCmd(*this); } - void Execute() override; void Split(const HintKeys& hint_keys) override {} void Merge() override {} std::vector current_key() const override { return {}; } @@ -79,7 +78,6 @@ class WatchCmd : public Cmd { : Cmd(name, arity, flag, static_cast(AclCategory::TRANSACTION)) {} void Do() override; - void Execute() override; void Split(const HintKeys& hint_keys) override {} Cmd* Clone() override { return new WatchCmd(*this); } void Merge() override {} diff --git a/src/pika_admin.cc b/src/pika_admin.cc index c6735cb11..8cd0c1a25 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -525,39 +525,8 @@ void FlushallCmd::DoInitial() { return; } } -void FlushallCmd::Do() { - if (!db_) { - LOG(INFO) << "Flushall, but DB not found"; - } else { - db_->FlushDB(); - } -} - -void FlushallCmd::DoThroughDB() { - Do(); -} -void FlushallCmd::DoUpdateCache() { - // clear cache - if (PIKA_CACHE_NONE != g_pika_conf->cache_model()) { - g_pika_server->ClearCacheDbAsync(db_); - } -} - -// flushall convert flushdb writes to every db binlog -std::string FlushallCmd::ToRedisProtocol() { - std::string content; - content.reserve(RAW_ARGS_LEN); - RedisAppendLen(content, 1, "*"); - - // to flushdb cmd - std::string flushdb_cmd("flushdb"); - RedisAppendLenUint64(content, flushdb_cmd.size(), "$"); - RedisAppendContent(content, flushdb_cmd); - return content; -} - -void FlushallCmd::Execute() { +void FlushallCmd::Do() { std::lock_guard l_trw(g_pika_server->GetDBLock()); for (const auto& db_item : g_pika_server->GetDB()) { if (db_item.second->IsKeyScaning()) { @@ -579,6 +548,17 @@ void FlushallCmd::Execute() { } } +void FlushallCmd::DoThroughDB() { + Do(); +} + +void FlushallCmd::DoUpdateCache(std::shared_ptr db) { + // clear cache + if (PIKA_CACHE_NONE != g_pika_conf->cache_model()) { + g_pika_server->ClearCacheDbAsync(db); + } +} + void FlushallCmd::FlushAllWithoutLock() { for (const auto& db_item : g_pika_server->GetDB()) { std::shared_ptr db = db_item.second; @@ -588,45 +568,18 @@ void FlushallCmd::FlushAllWithoutLock() { return; } DoWithoutLock(db); - DoBinlog(g_pika_rm->GetSyncMasterDBs()[p_info]); } if (res_.ok()) { res_.SetRes(CmdRes::kOk); } } -void FlushallCmd::DoBinlog(std::shared_ptr sync_db) { - if (res().ok() && is_write() && g_pika_conf->write_binlog()) { - std::shared_ptr conn_ptr = GetConn(); - std::shared_ptr resp_ptr = GetResp(); - // Consider that dummy cmd appended by system, both conn and resp are null. - if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) { - if (!conn_ptr) { - LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " conn empty."; - } - if (!resp_ptr) { - LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " resp empty."; - } - res().SetRes(CmdRes::kErrOther); - return; - } - - Status s = sync_db->ConsensusProposeLog(shared_from_this()); - if (!s.ok()) { - LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device " - << s.ToString(); - res().SetRes(CmdRes::kErrOther, s.ToString()); - return; - } - } -} - void FlushallCmd::DoWithoutLock(std::shared_ptr db) { if (!db) { LOG(INFO) << "Flushall, but DB not found"; } else { db->FlushDBWithoutLock(); - DoUpdateCache(); + DoUpdateCache(db); } } @@ -657,17 +610,19 @@ void FlushdbCmd::DoInitial() { void FlushdbCmd::Do() { if (!db_) { - LOG(INFO) << "Flushdb, but DB not found"; + res_.SetRes(CmdRes::kInvalidDB); } else { - if (db_name_ == "all") { - db_->FlushDB(); + if (db_->IsKeyScaning()) { + res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later"); } else { - db_->FlushSubDB(db_name_); + std::lock_guard s_prw(g_pika_rm->GetDBLock()); + std::lock_guard l_prw(db_->GetDBLock()); + FlushAllDBsWithoutLock(); + res_.SetRes(CmdRes::kOk); } } } - void FlushdbCmd::DoThroughDB() { Do(); } @@ -686,7 +641,6 @@ void FlushdbCmd::FlushAllDBsWithoutLock() { return; } DoWithoutLock(); - DoBinlog(); } void FlushdbCmd::DoWithoutLock() { @@ -696,23 +650,8 @@ void FlushdbCmd::DoWithoutLock() { if (db_name_ == "all") { db_->FlushDBWithoutLock(); } else { - db_->FlushSubDBWithoutLock(db_name_); - } - DoUpdateCache(); - } -} - -void FlushdbCmd::Execute() { - if (!db_) { - res_.SetRes(CmdRes::kInvalidDB); - } else { - if (db_->IsKeyScaning()) { - res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later"); - } else { - std::lock_guard l_prw(db_->GetDBLock()); - std::lock_guard s_prw(g_pika_rm->GetDBLock()); - FlushAllDBsWithoutLock(); - res_.SetRes(CmdRes::kOk); + //Floyd does not support flushdb by type + LOG(ERROR) << "cannot flushdb by type in floyd"; } } } @@ -1474,11 +1413,6 @@ std::string InfoCmd::CacheStatusToString(int status) { } } -void InfoCmd::Execute() { - std::shared_ptr db = g_pika_server->GetDB(db_name_); - Do(); -} - void ConfigCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameConfig); @@ -2705,10 +2639,6 @@ void ConfigCmd::ConfigResetstat(std::string& ret) { ret = "+OK\r\n"; } -void ConfigCmd::Execute() { - Do(); -} - void MonitorCmd::DoInitial() { if (argv_.size() != 1) { res_.SetRes(CmdRes::kWrongNum, kCmdNameMonitor); diff --git a/src/pika_db.cc b/src/pika_db.cc index 04b3d8b1c..601e00f12 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -200,6 +200,7 @@ DisplayCacheInfo DB::GetCacheInfo() { } bool DB::FlushDBWithoutLock() { + std::lock_guard l(bgsave_protector_); if (bgsave_info_.bgsaving) { return false; } @@ -223,33 +224,6 @@ bool DB::FlushDBWithoutLock() { return true; } -bool DB::FlushSubDBWithoutLock(const std::string& db_name) { - std::lock_guard l(bgsave_protector_); - if (bgsave_info_.bgsaving) { - return false; - } - - LOG(INFO) << db_name_ << " Delete old " + db_name + " db..."; - storage_.reset(); - - std::string dbpath = db_path_; - if (dbpath[dbpath.length() - 1] != '/') { - dbpath.append("/"); - } - - std::string sub_dbpath = dbpath + db_name; - std::string del_dbpath = dbpath + db_name + "_deleting"; - pstd::RenameFile(sub_dbpath, del_dbpath); - - storage_ = std::make_shared(); - rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), db_path_); - assert(storage_); - assert(s.ok()); - LOG(INFO) << db_name_ << " open new " + db_name + " db success"; - g_pika_server->PurgeDir(del_dbpath); - return true; -} - void DB::DoBgSave(void* arg) { std::unique_ptr bg_task_arg(static_cast(arg)); @@ -572,11 +546,6 @@ void DB::ClearBgsave() { bgsave_info_.Clear(); } -bool DB::FlushSubDB(const std::string& db_name) { - std::lock_guard rwl(dbs_rw_); - return FlushSubDBWithoutLock(db_name); -} - void DB::UpdateCacheInfo(CacheInfo& cache_info) { std::unique_lock lock(cache_info_rwlock_); @@ -625,9 +594,3 @@ void DB::ResetDisplayCacheInfo(int status) { cache_info_.waitting_load_keys_num = 0; cache_usage_ = 0; } - -bool DB::FlushDB() { - std::lock_guard rwl(dbs_rw_); - std::lock_guard l(bgsave_protector_); - return FlushDBWithoutLock(); -} \ No newline at end of file diff --git a/src/pika_transaction.cc b/src/pika_transaction.cc index ee7b85aac..b1b0377b0 100644 --- a/src/pika_transaction.cc +++ b/src/pika_transaction.cc @@ -43,6 +43,22 @@ void MultiCmd::DoInitial() { void ExecCmd::Do() { auto conn = GetConn(); auto client_conn = std::dynamic_pointer_cast(conn); + if (client_conn == nullptr) { + res_.SetRes(CmdRes::kErrOther, name()); + return; + } + if (!client_conn->IsInTxn()) { + res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI"); + return; + } + if (IsTxnFailedAndSetState()) { + client_conn->ExitTxn(); + return; + } + SetCmdsVec(); + Lock(); + conn = GetConn(); + client_conn = std::dynamic_pointer_cast(conn); std::vector res_vec = {}; std::vector> resp_strs; for (size_t i = 0; i < cmds_.size(); ++i) { @@ -84,26 +100,6 @@ void ExecCmd::Do() { for (auto &r : res_vec) { res_.AppendStringRaw(r.message()); } -} - -void ExecCmd::Execute() { - auto conn = GetConn(); - auto client_conn = std::dynamic_pointer_cast(conn); - if (client_conn == nullptr) { - res_.SetRes(CmdRes::kErrOther, name()); - return; - } - if (!client_conn->IsInTxn()) { - res_.SetRes(CmdRes::kErrOther, "ERR EXEC without MULTI"); - return; - } - if (IsTxnFailedAndSetState()) { - client_conn->ExitTxn(); - return; - } - SetCmdsVec(); - Lock(); - Do(); Unlock(); ServeToBLrPopWithKeys(); list_cmd_.clear(); @@ -246,10 +242,6 @@ void WatchCmd::Do() { res_.SetRes(CmdRes::kOk); } -void WatchCmd::Execute() { - Do(); -} - void WatchCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, name()); diff --git a/tests/integration/main_test.go b/tests/integration/main_test.go index 1c537733c..5f99d31fe 100644 --- a/tests/integration/main_test.go +++ b/tests/integration/main_test.go @@ -16,7 +16,6 @@ var ( func TestPikaWithCache(t *testing.T) { GlobalBefore = func(ctx context.Context, client *redis.Client) { Expect(client.SlaveOf(ctx, "NO", "ONE").Err()).NotTo(HaveOccurred()) - //Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred()) Expect(client.ConfigSet(ctx, "cache-model", "1").Err()).NotTo(HaveOccurred()) } RegisterFailHandler(Fail) @@ -26,7 +25,6 @@ func TestPikaWithCache(t *testing.T) { func TestPikaWithoutCache(t *testing.T) { GlobalBefore = func(ctx context.Context, client *redis.Client) { Expect(client.SlaveOf(ctx, "NO", "ONE").Err()).NotTo(HaveOccurred()) - //Expect(client.FlushAll(ctx).Err()).NotTo(HaveOccurred()) Expect(client.ConfigSet(ctx, "cache-model", "0").Err()).NotTo(HaveOccurred()) } RegisterFailHandler(Fail) diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go index fe18ade8c..9fa7ee28f 100644 --- a/tests/integration/replication_test.go +++ b/tests/integration/replication_test.go @@ -22,7 +22,6 @@ func cleanEnv(ctx context.Context, clientMaster, clientSlave *redis.Client) { Expect(r.Val()).To(Equal("OK")) Expect(clientSlave.Do(ctx, "clearreplicationid").Err()).NotTo(HaveOccurred()) Expect(clientMaster.Do(ctx, "clearreplicationid").Err()).NotTo(HaveOccurred()) - time.Sleep(1 * time.Second) } func trySlave(ctx context.Context, clientSlave *redis.Client, ip string, port string) bool { @@ -329,20 +328,6 @@ func execute(ctx *context.Context, clientMaster *redis.Client, num_thread int, f time.Sleep(10 * time.Second) } -//func randomPfmergeThread(ctx *context.Context, clientMaster *redis.Client) { -// clientMaster.PFAdd(*ctx, "hll1", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll2", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll2", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll1", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll2", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll1", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll2", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll1", randomString(5)) -// clientMaster.PFAdd(*ctx, "hll_out", randomString(5)) -// clientMaster.PFMerge(*ctx, "hll_out", "hll1", "hll2") -// clientMaster.PFAdd(*ctx, "hll_out", randomString(5)) -//} - func issueBLPopCheck(ctx *context.Context, client *redis.Client, list string, random_str string) { defer GinkgoRecover() bLPop := client.BLPop(*ctx, 0, "list0", "list1") @@ -404,8 +389,6 @@ var _ = Describe("should replication ", func() { }) AfterEach(func() { cleanEnv(ctx, clientMaster, clientSlave) - //Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred()) - //Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred()) Expect(clientSlave.Close()).NotTo(HaveOccurred()) Expect(clientMaster.Close()).NotTo(HaveOccurred()) log.Println("Replication test case done") @@ -477,7 +460,6 @@ var _ = Describe("should replication ", func() { log.Println("rpoplpush test start") Expect(clientMaster.Del(ctx, "blist0", "blist1", "blist").Err()).NotTo(HaveOccurred()) execute(&ctx, clientMaster, 4, rpoplpushThread) - // TODO, the problem was not reproduced locally, record an issue first: https://github.com/OpenAtomFoundation/pika/issues/2492 for i := int64(0); i < clientMaster.LLen(ctx, "blist").Val(); i++ { Expect(clientMaster.LIndex(ctx, "blist", i)).To(Equal(clientSlave.LIndex(ctx, "blist", i))) } @@ -544,18 +526,7 @@ var _ = Describe("should replication ", func() { Expect(master_dest_interstore_set.Val()).To(Equal(slave_dest_interstore_set.Val())) clientMaster.Del(ctx, "set1", "set2", "dest_set") log.Println("randomSinterstore test success") - //clientMaster.FlushAll(ctx) - //time.Sleep(3 * time.Second) - //go randomPfmergeThread(&ctx, clientMaster) - //go randomPfmergeThread(&ctx, clientMaster) - //go randomPfmergeThread(&ctx, clientMaster) - //go randomPfmergeThread(&ctx, clientMaster) - //time.Sleep(10 * time.Second) - //master_hll_out := clientMaster.PFCount(ctx, "hll_out") - //Expect(master_hll_out.Err()).NotTo(HaveOccurred()) - //slave_hll_out := clientSlave.PFCount(ctx, "hll_out") - //Expect(slave_hll_out.Err()).NotTo(HaveOccurred()) - //Expect(master_hll_out.Val()).To(Equal(slave_hll_out.Val())) + log.Println("randomZunionstore test start") clientMaster.Del(ctx, "zset1", "zset2", "zset_out") execute(&ctx, clientMaster, 4, randomZunionstoreThread) @@ -652,9 +623,9 @@ var _ = Describe("should replication ", func() { for i := int64(0); i < clientMaster.LLen(ctx, "list0").Val(); i++ { Expect(clientMaster.LIndex(ctx, "list0", i)).To(Equal(clientSlave.LIndex(ctx, "list0", i))) } - for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ { - Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i))) - } +// for i := int64(0); i < clientMaster.LLen(ctx, "list1").Val(); i++ { +// Expect(clientMaster.LIndex(ctx, "list1", i)).To(Equal(clientSlave.LIndex(ctx, "list1", i))) +// } } err = clientMaster.Del(ctx, lists...)