Skip to content

Commit

Permalink
Optimize server.db_mu_ (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShooterIT authored May 26, 2021
1 parent a7c49bf commit bc6446e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 16 deletions.
21 changes: 6 additions & 15 deletions src/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -898,13 +898,11 @@ Status Server::AsyncCompactDB(const std::string &begin_key, const std::string &e
if (is_loading_) {
return Status(Status::NotOK, "loading in-progress");
}
db_mu_.lock();
std::lock_guard<std::mutex> lg(db_job_mu_);
if (db_compacting_) {
db_mu_.unlock();
return Status(Status::NotOK, "compact in-progress");
}
db_compacting_ = true;
db_mu_.unlock();

Task task;
task.arg = this;
Expand All @@ -914,32 +912,28 @@ Status Server::AsyncCompactDB(const std::string &begin_key, const std::string &e
if (!begin_key.empty()) begin = new Slice(begin_key);
if (!end_key.empty()) end = new Slice(end_key);
svr->storage_->Compact(begin, end);
svr->db_mu_.lock();
std::lock_guard<std::mutex> lg(svr->db_job_mu_);
svr->db_compacting_ = false;
svr->db_mu_.unlock();
delete begin;
delete end;
};
return task_runner_.Publish(task);
}

Status Server::AsyncBgsaveDB() {
db_mu_.lock();
std::lock_guard<std::mutex> lg(db_job_mu_);
if (db_bgsave_) {
db_mu_.unlock();
return Status(Status::NotOK, "bgsave in-progress");
}
db_bgsave_ = true;
db_mu_.unlock();

Task task;
task.arg = this;
task.callback = [](void *arg) {
auto svr = static_cast<Server*>(arg);
svr->storage_->CreateBackup();
svr->db_mu_.lock();
std::lock_guard<std::mutex> lg(svr->db_job_mu_);
svr->db_bgsave_ = false;
svr->db_mu_.unlock();
};
return task_runner_.Publish(task);
}
Expand All @@ -955,17 +949,15 @@ Status Server::AsyncPurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backu
}

Status Server::AsyncScanDBSize(const std::string &ns) {
db_mu_.lock();
std::lock_guard<std::mutex> lg(db_job_mu_);
auto iter = db_scan_infos_.find(ns);
if (iter == db_scan_infos_.end()) {
db_scan_infos_[ns] = DBScanInfo{};
}
if (db_scan_infos_[ns].is_scanning) {
db_mu_.unlock();
return Status(Status::NotOK, "scanning the db now");
}
db_scan_infos_[ns].is_scanning = true;
db_mu_.unlock();

Task task;
task.arg = this;
Expand All @@ -975,11 +967,10 @@ Status Server::AsyncScanDBSize(const std::string &ns) {
KeyNumStats stats;
db.GetKeyNumStats("", &stats);

svr->db_mu_.lock();
std::lock_guard<std::mutex> lg(svr->db_job_mu_);
svr->db_scan_infos_[ns].key_num_stats = stats;
time(&svr->db_scan_infos_[ns].last_scan_time);
svr->db_scan_infos_[ns].is_scanning = false;
svr->db_mu_.unlock();
};
return task_runner_.Publish(task);
}
Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ class Server {
std::list<FeedSlaveThread *> slave_threads_;
std::atomic<int> fetch_file_threads_num_;

std::mutex db_mu_;
// Some jobs to operate DB should be unique
std::mutex db_job_mu_;
bool db_compacting_ = false;
bool db_bgsave_ = false;
std::map<std::string, DBScanInfo> db_scan_infos_;
Expand Down

0 comments on commit bc6446e

Please sign in to comment.