diff --git a/src/config.cc b/src/config.cc
index b3befd317c8..050585ac59f 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -194,6 +194,8 @@ void Config::initFieldCallback() {
       db_dir = dir + "/db";
       if (backup_dir.empty()) backup_dir = dir + "/backup";
       if (log_dir.empty()) log_dir = dir;
+      checkpoint_dir = dir + "/checkpoint";
+      sync_checkpoint_dir = dir + "/sync_checkpoint";
       return Status::OK();
     }},
     {"bind", [this](Server* srv, const std::string &k, const std::string& v)->Status {
diff --git a/src/config.h b/src/config.h
index 79d509765b6..6357cccbe0e 100644
--- a/src/config.h
+++ b/src/config.h
@@ -68,6 +68,8 @@ struct Config{
   std::string dir;
   std::string db_dir;
   std::string backup_dir;
+  std::string checkpoint_dir;
+  std::string sync_checkpoint_dir;
   std::string log_dir;
   std::string pidfile;
   std::string db_name;
diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index c7c118aaaa9..d244c16ffb3 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -4068,31 +4068,26 @@ class CommandFetchMeta : public Commander {
     // Feed-replica-meta thread
     std::thread t = std::thread([svr, repl_fd, ip]() {
-      Util::ThreadSetName("feed-replica-meta");
-      int fd;
-      uint64_t file_size;
-      rocksdb::BackupID meta_id;
-      auto s = Engine::Storage::BackupManager::OpenLatestMeta(
-          svr->storage_, &fd, &meta_id, &file_size);
+      Util::ThreadSetName("feed-replica-data-info");
+      std::string files;
+      auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo(
+          svr->storage_, &files);
       if (!s.IsOK()) {
-        const char *message = "-ERR can't create db backup";
+        const char *message = "-ERR can't create db checkpoint";
         write(repl_fd, message, strlen(message));
-        LOG(ERROR) << "[replication] Failed to open latest meta, err: "
-                   << s.Msg();
+        LOG(WARNING) << "[replication] Failed to get full data file info,"
+                     << " error: " << s.Msg();
         close(repl_fd);
         return;
       }
-      // Send the meta ID, meta file size and content
-      if (Util::SockSend(repl_fd, std::to_string(meta_id)+CRLF).IsOK() &&
-          Util::SockSend(repl_fd, std::to_string(file_size)+CRLF).IsOK() &&
-          Util::SockSendFile(repl_fd, fd, file_size).IsOK()) {
-        LOG(INFO) << "[replication] Succeed sending backup meta " << meta_id
-                  << " to " << ip;
+      // Send full data file info
+      if (Util::SockSend(repl_fd, files+CRLF).IsOK()) {
+        LOG(INFO) << "[replication] Succeed sending full data file info to " << ip;
       } else {
-        LOG(WARNING) << "[replication] Fail to send backup meta" << meta_id
+        LOG(WARNING) << "[replication] Fail to send full data file info " << ip
                      << ", error: " << strerror(errno);
       }
-      close(fd);
+      svr->storage_->SetCheckpointAccessTime(std::time(nullptr));
       close(repl_fd);
     });
     t.detach();
@@ -4132,7 +4127,7 @@ class CommandFetchFile : public Commander {
             svr->GetFetchFileThreadNum();
       }
       auto start = std::chrono::high_resolution_clock::now();
-      auto fd = Engine::Storage::BackupManager::OpenDataFile(svr->storage_,
+      auto fd = Engine::Storage::ReplDataManager::OpenDataFile(svr->storage_,
                                                              file, &file_size);
       if (fd < 0) break;
@@ -4160,6 +4155,7 @@ class CommandFetchFile : public Commander {
           usleep(shortest - duration);
         }
       }
+      svr->storage_->SetCheckpointAccessTime(std::time(nullptr));
       svr->DecrFetchFileThread();
       close(repl_fd);
     });
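Reviewer note: with this change the master's `_fetch_meta` handler no longer sends a backup meta ID, size, and meta file; it replies with a single CRLF-terminated line containing a comma-separated list of checkpoint file names. Below is a minimal sketch (not the project's code) of how such a reply line can be assembled from a directory listing; it uses std::filesystem as a stand-in for the storage Env, and the function name is hypothetical.

#include <filesystem>
#include <string>

// Build the "_fetch_meta" reply from a checkpoint directory listing.
std::string BuildFileListReply(const std::string &checkpoint_dir) {
  std::string files;
  for (const auto &entry : std::filesystem::directory_iterator(checkpoint_dir)) {
    if (!files.empty()) files.push_back(',');
    files += entry.path().filename().string();  // e.g. "000012.sst,CURRENT,MANIFEST-000005"
  }
  return files;  // sent to the replica followed by CRLF
}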
diff --git a/src/replication.cc b/src/replication.cc
index 7f6ab2285f2..97213eae76f 100644
--- a/src/replication.cc
+++ b/src/replication.cc
@@ -277,6 +277,12 @@ Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb,
   pre_fullsync_cb_ = std::move(pre_fullsync_cb);
   post_fullsync_cb_ = std::move(post_fullsync_cb);
 
+  // Clean synced checkpoint from old master because replica starts to follow new master
+  auto s = rocksdb::DestroyDB(srv_->GetConfig()->sync_checkpoint_dir, rocksdb::Options());
+  if (!s.ok()) {
+    LOG(WARNING) << "Can't clean synced checkpoint from master, error: " << s.ToString();
+  }
+
   // cleanup the old backups, so we can start replication in a clean state
   storage_->PurgeOldBackups(0, 0);
 
@@ -532,6 +538,11 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
   auto input = bufferevent_get_input(bev);
   switch (self->fullsync_state_) {
     case kFetchMetaID:
+      // New version master only sends meta file content
+      if (!self->srv_->GetConfig()->master_use_repl_port) {
+        self->fullsync_state_ = kFetchMetaContent;
+        return CBState::AGAIN;
+      }
       line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
       if (!line) return CBState::AGAIN;
       if (line[0] == '-') {
@@ -566,17 +577,51 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
       self->fullsync_state_ = kFetchMetaContent;
       LOG(INFO) << "[replication] Succeed fetching meta size: " << self->fullsync_filesize_;
     case kFetchMetaContent:
-      if (evbuffer_get_length(input) < self->fullsync_filesize_) {
-        return CBState::AGAIN;
+      std::string target_dir;
+      Engine::Storage::ReplDataManager::MetaInfo meta;
+      // Master using old version
+      if (self->srv_->GetConfig()->master_use_repl_port) {
+        if (evbuffer_get_length(input) < self->fullsync_filesize_) {
+          return CBState::AGAIN;
+        }
+        meta = Engine::Storage::ReplDataManager::ParseMetaAndSave(
+            self->storage_, self->fullsync_meta_id_, input);
+        target_dir = self->srv_->GetConfig()->backup_dir;
+      } else {
+        // Master using new version
+        line = evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF_STRICT);
+        if (!line) return CBState::AGAIN;
+        if (line[0] == '-') {
+          LOG(ERROR) << "[replication] Failed to fetch meta info: " << line;
+          free(line);
+          return CBState::RESTART;
+        }
+        std::vector<std::string> need_files;
+        Util::Split(std::string(line), ",", &need_files);
+        for (auto f : need_files) {
+          meta.files.emplace_back(f, 0);
+        }
+        target_dir = self->srv_->GetConfig()->sync_checkpoint_dir;
+        // Clean invalid files of the checkpoint
+        auto s = Engine::Storage::ReplDataManager::CleanInvalidFiles(
+            self->storage_, target_dir, need_files);
+        if (!s.IsOK()) {
+          LOG(WARNING) << "[replication] Failed to clean up invalid files of the old checkpoint,"
+                       << " error: " << s.Msg();
+          LOG(WARNING) << "[replication] Try to clean all checkpoint files";
+          auto s = rocksdb::DestroyDB(target_dir, rocksdb::Options());
+          if (!s.ok()) {
+            LOG(WARNING) << "[replication] Failed to clean all checkpoint files, error: "
+                         << s.ToString();
+          }
+        }
       }
-      auto meta = Engine::Storage::BackupManager::ParseMetaAndSave(
-          self->storage_, self->fullsync_meta_id_, input);
       assert(evbuffer_get_length(input) == 0);
       self->fullsync_state_ = kFetchMetaID;
-      LOG(INFO) << "[replication] Succeeded fetching meta file, fetching files in parallel";
+      LOG(INFO) << "[replication] Succeeded fetching full data files info, fetching files in parallel";
       self->repl_state_ = kReplFetchSST;
-      auto s = self->parallelFetchFile(meta.files);
+      auto s = self->parallelFetchFile(target_dir, meta.files);
       if (!s.IsOK()) {
         LOG(ERROR) << "[replication] Failed to parallel fetch files while " + s.Msg();
         return CBState::RESTART;
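Reviewer note: on the replica side, a new-version master's reply is one line of comma-separated file names, which the callback splits with Util::Split and records with a CRC of 0 (0 later means "skip checksum verification"). A minimal sketch of the equivalent parsing using only the standard library (std::getline stands in for the project's Util::Split helper):

#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Split "f1,f2,f3" into (name, crc) pairs with crc = 0, mirroring what
// fullSyncReadCB builds into MetaInfo::files for a new-version master.
std::vector<std::pair<std::string, uint32_t>> ParseFileList(const std::string &line) {
  std::vector<std::pair<std::string, uint32_t>> files;
  std::stringstream ss(line);
  std::string name;
  while (std::getline(ss, name, ',')) {
    if (!name.empty()) files.emplace_back(name, 0);  // 0 = no CRC to verify
  }
  return files;
}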
@@ -585,7 +630,12 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
       // Restore DB from backup
       self->pre_fullsync_cb_();
-      s = self->storage_->RestoreFromBackup();
+      // For old version, master uses rocksdb backup to implement data snapshot
+      if (self->srv_->GetConfig()->master_use_repl_port) {
+        s = self->storage_->RestoreFromBackup();
+      } else {
+        s = self->storage_->RestoreFromCheckpoint();
+      }
       if (!s.IsOK()) {
         LOG(ERROR) << "[replication] Failed to restore backup while " + s.Msg() + ", restart fullsync";
         return CBState::RESTART;
@@ -603,7 +653,8 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev,
   return CBState::QUIT;
 }
 
-Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::string, uint32_t>> &files) {
+Status ReplicationThread::parallelFetchFile(const std::string &dir,
+                                            const std::vector<std::pair<std::string, uint32_t>> &files) {
   size_t concurrency = 1;
   if (files.size() > 20) {
     // Use 4 threads to download files in parallel
@@ -614,7 +665,7 @@ Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::string, uint32_t>> &files) {
   std::vector<std::future<Status>> results;
   for (size_t tid = 0; tid < concurrency; ++tid) {
     results.push_back(std::async(
-        std::launch::async, [this, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status {
+        std::launch::async, [this, dir, &files, tid, concurrency, &fetch_cnt, &skip_cnt]() -> Status {
          if (this->stop_flag_) {
            return Status(Status::NotOK, "replication thread was stopped");
          }
@@ -637,7 +688,7 @@ Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::string, uint32_t>> &files) {
-            if (Engine::Storage::BackupManager::FileExists(this->storage_, f_name, f_crc)) {
+            if (Engine::Storage::ReplDataManager::FileExists(this->storage_, dir, f_name, f_crc)) {
              skip_cnt.fetch_add(1);
              uint32_t cur_skip_cnt = skip_cnt.load();
              uint32_t cur_fetch_cnt = fetch_cnt.load();
@@ -663,11 +714,13 @@ Status ReplicationThread::parallelFetchFile(const std::vector<std::pair<std::string, uint32_t>> &files) {
          if (this->srv_->GetConfig()->master_use_repl_port) {
            for (unsigned i = 0; i < fetch_files.size(); i++) {
-              s = this->fetchFiles(sock_fd, {fetch_files[i]}, {crcs[i]}, fn);
+              s = this->fetchFiles(sock_fd, dir, {fetch_files[i]}, {crcs[i]}, fn);
              if (!s.IsOK()) break;
            }
          } else {
-            if (!fetch_files.empty()) s = this->fetchFiles(sock_fd, fetch_files, crcs, fn);
+            if (!fetch_files.empty()) {
+              s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn);
+            }
          }
          close(sock_fd);
          return s;
@@ -711,8 +764,9 @@ Status ReplicationThread::sendAuth(int sock_fd) {
   return Status::OK();
 }
 
-Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
-                                    std::string file, uint32_t crc, fetch_file_callback fn) {
+Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
+                                    const std::string &dir, std::string file,
+                                    uint32_t crc, fetch_file_callback fn) {
   size_t line_len, file_size;
 
   // Read file size line
@@ -735,7 +789,7 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
   }
 
   // Write to tmp file
-  auto tmp_file = Engine::Storage::BackupManager::NewTmpFile(storage_, file);
+  auto tmp_file = Engine::Storage::ReplDataManager::NewTmpFile(storage_, dir, file);
   if (!tmp_file) {
     return Status(Status::NotOK, "unable to create tmp file");
   }
@@ -759,13 +813,14 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
       }
     }
   }
-  if (crc != tmp_crc) {
+  // Verify file crc checksum if crc is not 0
+  if (crc && crc != tmp_crc) {
     char err_buf[64];
     snprintf(err_buf, sizeof(err_buf), "CRC mismatched, %u was expected but got %u", crc, tmp_crc);
     return Status(Status::NotOK, err_buf);
   }
   // File is OK, rename to formal name
-  auto s = Engine::Storage::BackupManager::SwapTmpFile(storage_, file);
+  auto s = Engine::Storage::ReplDataManager::SwapTmpFile(storage_, dir, file);
   if (!s.IsOK()) return s;
 
   // Call fetch file callback function
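Reviewer note: fetchFile now writes into a caller-supplied target directory and only verifies the CRC when a non-zero value was provided, since checkpoint files are advertised without checksums. The download pattern itself is unchanged: write to "<name>.tmp", verify, then rename into place. A sketch of that pattern under stated assumptions (std::filesystem and std::ofstream stand in for rocksdb::Env; the function name and the externally computed CRC parameters are hypothetical):

#include <cstdint>
#include <filesystem>
#include <fstream>
#include <string>

// Store a fetched payload via a tmp file and an atomic-ish rename, verifying
// the CRC only when the master supplied one (expected_crc != 0).
bool StoreFetchedFile(const std::string &dir, const std::string &name,
                      const std::string &payload, uint32_t expected_crc,
                      uint32_t actual_crc) {
  std::filesystem::create_directories(dir);
  std::filesystem::path tmp = std::filesystem::path(dir) / (name + ".tmp");
  std::ofstream out(tmp, std::ios::binary);
  out.write(payload.data(), payload.size());
  out.close();
  // A CRC of 0 means "nothing to verify" (checkpoint files are sent without one).
  if (expected_crc != 0 && expected_crc != actual_crc) {
    std::filesystem::remove(tmp);
    return false;
  }
  std::filesystem::rename(tmp, std::filesystem::path(dir) / name);
  return true;
}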
@@ -773,8 +828,9 @@ Status ReplicationThread::fetchFile(int sock_fd, evbuffer *evbuf,
   return Status::OK();
 }
 
-Status ReplicationThread::fetchFiles(int sock_fd, const std::vector<std::string> &files,
-                                     const std::vector<uint32_t> &crcs, fetch_file_callback fn) {
+Status ReplicationThread::fetchFiles(int sock_fd, const std::string &dir,
+                                     const std::vector<std::string> &files, const std::vector<uint32_t> &crcs,
+                                     fetch_file_callback fn) {
   std::string files_str;
   for (auto file : files) {
     files_str += file;
@@ -789,7 +845,7 @@ Status ReplicationThread::fetchFiles(int sock_fd, const std::vector<std::string> &files,
   evbuffer *evbuf = evbuffer_new();
   for (unsigned i = 0; i < files.size(); i++) {
     DLOG(INFO) << "[fetch] Start to fetch file " << files[i];
-    s = fetchFile(sock_fd, evbuf, files[i], crcs[i], fn);
+    s = fetchFile(sock_fd, evbuf, dir, files[i], crcs[i], fn);
     if (!s.IsOK()) {
       s = Status(Status::NotOK, "fetch file err: " + s.Msg());
       LOG(WARNING) << "[fetch] Fail to fetch file " << files[i] << ", err: " << s.Msg();
diff --git a/src/replication.h b/src/replication.h
index 1beaf0b4867..40704547dee 100644
--- a/src/replication.h
+++ b/src/replication.h
@@ -163,11 +163,14 @@ class ReplicationThread {
   // Synchronized-Blocking ops
   Status sendAuth(int sock_fd);
-  Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string file,
-                   uint32_t crc, fetch_file_callback fn);
-  Status fetchFiles(int sock_fd, const std::vector<std::string> &files,
-                    const std::vector<uint32_t> &crcs, fetch_file_callback fn);
-  Status parallelFetchFile(const std::vector<std::pair<std::string, uint32_t>> &files);
+  Status fetchFile(int sock_fd, evbuffer *evbuf, const std::string &dir,
+                   const std::string file, uint32_t crc, fetch_file_callback fn);
+  Status fetchFiles(int sock_fd, const std::string &dir,
+                    const std::vector<std::string> &files,
+                    const std::vector<uint32_t> &crcs,
+                    fetch_file_callback fn);
+  Status parallelFetchFile(const std::string &dir,
+                           const std::vector<std::pair<std::string, uint32_t>> &files);
 
   static bool isRestoringError(const char *err);
   static void EventTimerCB(int, int16_t, void *ctx);
diff --git a/src/server.cc b/src/server.cc
index 28b7d17228e..58317b8293e 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -501,6 +501,26 @@ void Server::cron() {
       Status s = dynamicResizeBlockAndSST();
       LOG(INFO) << "[server] Schedule to dynamic resize block and sst, result: " << s.Msg();
     }
+
+    // Remove the checkpoint if no replica is using it
+    if (counter != 0 && counter % 10 == 0) {
+      time_t create_time = storage_->GetCheckpointCreateTime();
+      time_t access_time = storage_->GetCheckpointAccessTime();
+
+      // Creating a checkpoint may take a long time if the target dir is on another
+      // disk partition, so before cleaning up the checkpoint we must make sure that
+      // kvrocks is not still creating it, even though the checkpoint dir already exists.
+      if (storage_->ExistCheckpoint() && storage_->IsCreatingCheckpoint() == false) {
+        // TODO(shooterit): support to config the alive time of checkpoint
+        if ((GetFetchFileThreadNum() == 0 && std::time(nullptr) - access_time > 30) ||
+            (std::time(nullptr) - create_time > 24 * 60 * 60)) {
+          auto s = rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
+          if (!s.ok()) {
+            LOG(WARNING) << "Fail to clean checkpoint, error: " << s.ToString();
+          }
+        }
+      }
+    }
     cleanupExitedSlaves();
     counter++;
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
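Reviewer note: the cron loop above garbage-collects the master-side checkpoint roughly once per second (counter % 10 with a 100 ms sleep). The checkpoint is dropped when no replica has fetched from it for about 30 seconds, or unconditionally once it is a day old, and only while no checkpoint creation is in flight. A sketch of that decision extracted as a pure function (function name is hypothetical; the 30 s / 24 h thresholds follow the patch):

#include <ctime>

// Decide whether the master-side checkpoint can be removed.
bool ShouldRemoveCheckpoint(bool exists, bool creating, int fetching_threads,
                            std::time_t create_time, std::time_t access_time,
                            std::time_t now) {
  if (!exists || creating) return false;
  bool idle_too_long = fetching_threads == 0 && now - access_time > 30;
  bool too_old = now - create_time > 24 * 60 * 60;
  return idle_too_long || too_old;
}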
diff --git a/src/storage.cc b/src/storage.cc
index 91ebebddaa1..f11deb4c29d 100644
--- a/src/storage.cc
+++ b/src/storage.cc
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 
 #include "config.h"
 #include "redis_db.h"
@@ -43,6 +44,9 @@ Storage::Storage(Config *config)
       lock_mgr_(16) {
   InitCRC32Table();
   Metadata::InitVersionCounter();
+  SetCreatingCheckpoint(false);
+  SetCheckpointCreateTime(0);
+  SetCheckpointAccessTime(0);
 }
 
 Storage::~Storage() {
@@ -314,6 +318,51 @@ Status Storage::RestoreFromBackup() {
   return s.ok() ? Status::OK() : Status(Status::DBBackupErr, s.ToString());
 }
 
+Status Storage::RestoreFromCheckpoint() {
+  std::string dir = config_->sync_checkpoint_dir;
+  std::string tmp_dir = config_->db_dir + ".tmp";
+
+  // Clean old backups and checkpoints because server will work on the new db
+  PurgeOldBackups(0, 0);
+  rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
+
+  // Close db
+  CloseDB();
+
+  // Rename the db dir to tmp, so we can restore it if the replica fails to load
+  // the checkpoint from the master. This is only best-effort to keep the data safe.
+  auto s = backup_env_->RenameFile(config_->db_dir, tmp_dir);
+  if (!s.ok()) {
+    if (!Open().IsOK()) LOG(ERROR) << "[storage] Fail to reopen db";
+    return Status(Status::NotOK, "Fail to rename db dir, error: " + s.ToString());
+  }
+
+  // Rename checkpoint dir to db dir
+  if (!(s = backup_env_->RenameFile(dir, config_->db_dir)).ok()) {
+    backup_env_->RenameFile(tmp_dir, config_->db_dir);
+    if (!Open().IsOK()) LOG(ERROR) << "[storage] Fail to reopen db";
+    return Status(Status::NotOK, "Fail to rename checkpoint dir, error: " + s.ToString());
+  }
+
+  // Open the new db, restore if replica fails to open db
+  auto s2 = Open();
+  if (!s2.IsOK()) {
+    LOG(WARNING) << "[storage] Fail to open master checkpoint, error: " << s2.Msg();
+    rocksdb::DestroyDB(config_->db_dir, rocksdb::Options());
+    backup_env_->RenameFile(tmp_dir, config_->db_dir);
+    if (!Open().IsOK()) LOG(ERROR) << "[storage] Fail to reopen db";
+    return Status(Status::DBOpenErr,
+                  "Fail to open master checkpoint, error: " + s2.Msg());
+  }
+
+  // Destroy origin db
+  if (!(s = rocksdb::DestroyDB(tmp_dir, rocksdb::Options())).ok()) {
+    LOG(WARNING) << "[storage] Fail to destroy " << tmp_dir << ", error:" << s.ToString();
+  }
+  return Status::OK();
+}
+
 void Storage::PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours) {
   time_t now = time(nullptr);
   std::vector<rocksdb::BackupInfo> backup_infos;
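Reviewer note: RestoreFromCheckpoint swaps the synced checkpoint into place instead of replaying a backup: close the DB, rename the current db dir to a ".tmp" sibling, rename the checkpoint onto the db dir, then reopen; every failure path renames the old dir back and reopens so the replica keeps serving its previous data. A compact sketch of that swap-with-rollback skeleton, assuming std::filesystem in place of rocksdb::Env and hypothetical OpenDb/CloseDb callbacks:

#include <filesystem>
#include <string>
namespace fs = std::filesystem;

// Promote a synced checkpoint to be the live db dir, rolling back on failure.
bool SwapInCheckpoint(const std::string &db_dir, const std::string &checkpoint_dir,
                      bool (*OpenDb)(), void (*CloseDb)()) {
  const std::string tmp_dir = db_dir + ".tmp";
  CloseDb();
  std::error_code ec;
  fs::rename(db_dir, tmp_dir, ec);              // keep the old data around
  if (ec) { OpenDb(); return false; }
  fs::rename(checkpoint_dir, db_dir, ec);       // promote the checkpoint
  if (ec) { fs::rename(tmp_dir, db_dir); OpenDb(); return false; }
  if (!OpenDb()) {                              // roll back if the new db is unusable
    fs::remove_all(db_dir);
    fs::rename(tmp_dir, db_dir);
    OpenDb();
    return false;
  }
  fs::remove_all(tmp_dir);                      // drop the old data on success
  return true;
}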
@@ -538,36 +587,97 @@ Status Storage::DecrDBRefs() {
   return Status::OK();
 }
 
-Status Storage::BackupManager::OpenLatestMeta(Storage *storage,
-                                              int *fd,
-                                              rocksdb::BackupID *meta_id,
-                                              uint64_t *file_size) {
-  Status status = storage->CreateBackup();
-  if (!status.IsOK()) return status;
-  std::vector<rocksdb::BackupInfo> backup_infos;
-  storage->backup_->GetBackupInfo(&backup_infos);
-  auto latest_backup = backup_infos.back();
-  rocksdb::Status r_status = storage->backup_->VerifyBackup(latest_backup.backup_id);
-  if (!r_status.ok()) {
-    return Status(Status::NotOK, r_status.ToString());
-  }
-  *meta_id = latest_backup.backup_id;
-  std::string meta_file =
-      storage->config_->backup_dir + "/meta/" + std::to_string(*meta_id);
-  auto s = storage->backup_env_->FileExists(meta_file);
-  storage->backup_env_->GetFileSize(meta_file, file_size);
-  // NOTE: here we use the system's open instead of using rocksdb::Env to open
-  // a sequential file, because we want to use sendfile syscall.
-  *fd = open(meta_file.c_str(), O_RDONLY);
-  if (*fd < 0) {
-    return Status(Status::NotOK, strerror(errno));
+Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::string *files) {
+  std::string data_files_dir = storage->config_->checkpoint_dir;
+  std::unique_lock<std::mutex> ulm(storage->checkpoint_mu_);
+
+  // Create checkpoint if not exist
+  if (!storage->backup_env_->FileExists(data_files_dir).ok()) {
+    rocksdb::Checkpoint* checkpoint = NULL;
+    rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_, &checkpoint);
+    if (!s.ok()) {
+      LOG(WARNING) << "Fail to create checkpoint, error:" << s.ToString();
+      return Status(Status::NotOK, s.ToString());
+    }
+    std::unique_ptr<rocksdb::Checkpoint> checkpoint_guard(checkpoint);
+
+    // Create checkpoint of rocksdb
+    storage->SetCreatingCheckpoint(true);
+    s = checkpoint->CreateCheckpoint(data_files_dir);
+    storage->SetCheckpointCreateTime(std::time(nullptr));
+    storage->SetCheckpointAccessTime(std::time(nullptr));
+    storage->SetCreatingCheckpoint(false);
+    if (!s.ok()) {
+      LOG(WARNING) << "[storage] Fail to create checkpoint, error:" << s.ToString();
+      return Status(Status::NotOK, s.ToString());
+    }
+    LOG(INFO) << "[storage] Create checkpoint successfully";
+  } else {
+    // Replicas can share the checkpoint for replication if its age is less
+    // than half of the WAL TTL.
+    int64_t can_shared_time = storage->config_->RocksDB.WAL_ttl_seconds / 2;
+    if (can_shared_time > 60 * 60) can_shared_time = 60 * 60;
+    if (can_shared_time < 10 * 60) can_shared_time = 10 * 60;
+    if (std::time(nullptr) - storage->GetCheckpointCreateTime() > can_shared_time) {
+      LOG(WARNING) << "[storage] Can't use current checkpoint, waiting next checkpoint";
+      return Status(Status::NotOK, "Can't use current checkpoint, waiting for next checkpoint");
+    }
+    LOG(INFO) << "[storage] Use current existing checkpoint";
   }
+  ulm.unlock();
+
+  // Get checkpoint file list
+  std::vector<std::string> result;
+  storage->backup_env_->GetChildren(data_files_dir, &result);
+  for (auto f : result) {
+    if (f == "." || f == "..") continue;
+    files->append(f);
+    files->push_back(',');
+  }
+  files->pop_back();
   return Status::OK();
 }
 
-int Storage::BackupManager::OpenDataFile(Storage *storage, const std::string &rel_path,
-                                         uint64_t *file_size) {
-  std::string abs_path = storage->config_->backup_dir + "/" + rel_path;
+Status Storage::ReplDataManager::CleanInvalidFiles(Storage *storage,
+    const std::string &dir, std::vector<std::string> valid_files) {
+  if (!storage->backup_env_->FileExists(dir).ok()) {
+    return Status::OK();
+  }
+
+  std::vector<std::string> tmp_files, files;
+  storage->backup_env_->GetChildren(dir, &tmp_files);
+  for (auto file : tmp_files) {
+    if (file == "." || file == "..") continue;
+    files.push_back(file);
+  }
+
+  // Find invalid files
+  std::sort(files.begin(), files.end());
+  std::sort(valid_files.begin(), valid_files.end());
+  std::vector<std::string> invalid_files(files.size() + valid_files.size());
+  auto it = std::set_difference(files.begin(), files.end(),
+                                valid_files.begin(), valid_files.end(), invalid_files.begin());
+
+  // Delete invalid files
+  Status ret;
+  invalid_files.resize(it - invalid_files.begin());
+  for (it = invalid_files.begin(); it != invalid_files.end(); ++it) {
+    auto s = storage->backup_env_->DeleteFile(dir + "/" + *it);
+    if (!s.ok()) {
+      ret = Status(Status::NotOK, s.ToString());
+      LOG(INFO) << "[storage] Fail to delete invalid file "
+                << *it << " of master checkpoint";
+    } else {
+      LOG(INFO) << "[storage] Succeed deleting invalid file "
+                << *it << " of master checkpoint";
+    }
+  }
+  return ret;
+}
+
+int Storage::ReplDataManager::OpenDataFile(Storage *storage,
+    const std::string &repl_file, uint64_t *file_size) {
+  std::string abs_path = storage->config_->checkpoint_dir + "/" + repl_file;
   auto s = storage->backup_env_->FileExists(abs_path);
   if (!s.ok()) {
     LOG(ERROR) << "[storage] Data file [" << abs_path << "] not found";
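Reviewer note: GetFullReplDataInfo creates the checkpoint lazily under checkpoint_mu_ and, if one already exists, reuses it only while it is fresh enough (half the WAL TTL, clamped to the range of 10 to 60 minutes). The core of it is RocksDB's Checkpoint utility, shown below in isolation as a minimal sketch; the wrapper function name is hypothetical, but Checkpoint::Create and CreateCheckpoint are the real RocksDB calls the patch uses.

#include <memory>
#include <string>
#include <rocksdb/db.h>
#include <rocksdb/utilities/checkpoint.h>

// Create a RocksDB checkpoint of `db` under `dir`.
rocksdb::Status CreateCheckpointDir(rocksdb::DB *db, const std::string &dir) {
  rocksdb::Checkpoint *raw = nullptr;
  rocksdb::Status s = rocksdb::Checkpoint::Create(db, &raw);
  if (!s.ok()) return s;
  std::unique_ptr<rocksdb::Checkpoint> checkpoint(raw);
  // Hard-links SST files into `dir` and copies MANIFEST/CURRENT/OPTIONS/WAL files.
  return checkpoint->CreateCheckpoint(dir);
}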
@@ -581,16 +691,16 @@ int Storage::BackupManager::OpenDataFile(Storage *storage, const std::string &re
   return rv;
 }
 
-Storage::BackupManager::MetaInfo Storage::BackupManager::ParseMetaAndSave(
+Storage::ReplDataManager::MetaInfo Storage::ReplDataManager::ParseMetaAndSave(
     Storage *storage, rocksdb::BackupID meta_id, evbuffer *evbuf) {
   char *line;
   size_t len;
-  Storage::BackupManager::MetaInfo meta;
+  Storage::ReplDataManager::MetaInfo meta;
   auto meta_file = "meta/" + std::to_string(meta_id);
 
   DLOG(INFO) << "[meta] id: " << meta_id;
 
   // Save the meta to tmp file
-  auto wf = NewTmpFile(storage, meta_file);
+  auto wf = NewTmpFile(storage, storage->config_->backup_dir, meta_file);
   auto data = evbuffer_pullup(evbuf, -1);
   wf->Append(rocksdb::Slice(reinterpret_cast<char *>(data),
                             evbuffer_get_length(evbuf)));
@@ -631,7 +741,7 @@ Storage::BackupManager::MetaInfo Storage::BackupManager::ParseMetaAndSave(
     meta.files.emplace_back(filename, crc32);
     free(line);
   }
-  SwapTmpFile(storage, meta_file);
+  SwapTmpFile(storage, storage->config_->backup_dir, meta_file);
   return meta;
 }
 
@@ -651,21 +761,21 @@ Status MkdirRecursively(rocksdb::Env *env, const std::string &dir) {
   return Status(Status::NotOK);
 }
 
-std::unique_ptr<rocksdb::WritableFile> Storage::BackupManager::NewTmpFile(
-    Storage *storage, const std::string &rel_path) {
-  std::string tmp_path = storage->config_->backup_dir + "/" + rel_path + ".tmp";
-  auto s = storage->backup_env_->FileExists(tmp_path);
+std::unique_ptr<rocksdb::WritableFile> Storage::ReplDataManager::NewTmpFile(
+    Storage *storage, const std::string &dir, const std::string &repl_file) {
+  std::string tmp_file = dir + "/" + repl_file + ".tmp";
+  auto s = storage->backup_env_->FileExists(tmp_file);
   if (s.ok()) {
     LOG(ERROR) << "[storage] Data file exists, override";
-    storage->backup_env_->DeleteFile(tmp_path);
+    storage->backup_env_->DeleteFile(tmp_file);
   }
   // Create directory if missing
-  auto abs_dir = tmp_path.substr(0, tmp_path.rfind('/'));
+  auto abs_dir = tmp_file.substr(0, tmp_file.rfind('/'));
   if (!MkdirRecursively(storage->backup_env_, abs_dir).IsOK()) {
     return nullptr;
   }
   std::unique_ptr<rocksdb::WritableFile> wf;
-  s = storage->backup_env_->NewWritableFile(tmp_path, &wf, rocksdb::EnvOptions());
+  s = storage->backup_env_->NewWritableFile(tmp_file, &wf, rocksdb::EnvOptions());
   if (!s.ok()) {
     LOG(ERROR) << "[storage] Failed to create data file: " << s.ToString();
     return nullptr;
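Reviewer note: CleanInvalidFiles, defined earlier in this file, removes everything in the local sync-checkpoint directory that is not part of the master's advertised file list, so interrupted syncs can resume without fetching already-valid files. A standalone sketch of the same std::set_difference pattern (function name hypothetical, standard library only):

#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

// Return the locally present files that are not in the master's valid list.
std::vector<std::string> InvalidFiles(std::vector<std::string> local,
                                      std::vector<std::string> valid) {
  std::sort(local.begin(), local.end());
  std::sort(valid.begin(), valid.end());
  std::vector<std::string> invalid;
  std::set_difference(local.begin(), local.end(),
                      valid.begin(), valid.end(),
                      std::back_inserter(invalid));
  return invalid;  // every entry here is safe to delete before fetching
}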
@@ -673,23 +783,27 @@ std::unique_ptr<rocksdb::WritableFile> Storage::BackupManager::NewTmpFile(
   return wf;
 }
 
-Status Storage::BackupManager::SwapTmpFile(Storage *storage,
-                                           const std::string &rel_path) {
-  std::string tmp_path = storage->config_->backup_dir + "/" + rel_path + ".tmp";
-  std::string orig_path = storage->config_->backup_dir + "/" + rel_path;
-  if (!storage->backup_env_->RenameFile(tmp_path, orig_path).ok()) {
-    return Status(Status::NotOK, "unable to rename: "+tmp_path);
+Status Storage::ReplDataManager::SwapTmpFile(Storage *storage,
+    const std::string &dir, const std::string &repl_file) {
+  std::string tmp_file = dir + "/" + repl_file + ".tmp";
+  std::string orig_file = dir + "/" + repl_file;
+  if (!storage->backup_env_->RenameFile(tmp_file, orig_file).ok()) {
+    return Status(Status::NotOK, "unable to rename: "+tmp_file);
   }
   return Status::OK();
 }
 
-bool Storage::BackupManager::FileExists(Storage *storage, const std::string &rel_path, uint32_t crc) {
+bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &dir,
+                                          const std::string &repl_file, uint32_t crc) {
   if (storage->IsClosing()) return false;
 
-  auto file_path = storage->config_->backup_dir + "/" + rel_path;
+  auto file_path = dir + "/" + repl_file;
   auto s = storage->backup_env_->FileExists(file_path);
   if (!s.ok()) return false;
 
+  // If crc is 0, we needn't verify, return true directly.
+  if (crc == 0) return true;
+
   std::unique_ptr<rocksdb::SequentialFile> src_file;
   const rocksdb::EnvOptions soptions;
   s = storage->GetDB()->GetEnv()->NewSequentialFile(file_path, &src_file, soptions);
diff --git a/src/storage.h b/src/storage.h
index d9f0cd95091..afcae392cfb 100644
--- a/src/storage.h
+++ b/src/storage.h
@@ -47,6 +47,7 @@ class Storage {
   Status CreateBackup();
   Status DestroyBackup();
   Status RestoreFromBackup();
+  Status RestoreFromCheckpoint();
   Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
   Status WriteBatch(std::string &&raw_batch);
@@ -82,15 +83,20 @@ class Storage {
   Storage(const Storage &) = delete;
   Storage &operator=(const Storage &) = delete;
 
-  class BackupManager {
+  // Full replication data files manager
+  class ReplDataManager {
    public:
     // Master side
-    static Status OpenLatestMeta(Storage *storage,
-                                 int *fd,
-                                 rocksdb::BackupID *meta_id,
-                                 uint64_t *file_size);
-    static int OpenDataFile(Storage *storage, const std::string &rel_path,
+    static Status GetFullReplDataInfo(Storage *storage, std::string *files);
+    static int OpenDataFile(Storage *storage, const std::string &rel_file,
                             uint64_t *file_size);
+    static Status CleanInvalidFiles(Storage *storage,
+        const std::string &dir, std::vector<std::string> valid_files);
+    struct CheckpointInfo {
+      std::atomic<bool> is_creating;
+      std::atomic<time_t> create_time;
+      std::atomic<time_t> access_time;
+    };
 
     // Slave side
     struct MetaInfo {
@@ -104,11 +110,21 @@ class Storage {
         rocksdb::BackupID meta_id, evbuffer *evbuf);
     static std::unique_ptr<rocksdb::WritableFile> NewTmpFile(
-        Storage *storage, const std::string &rel_path);
-    static Status SwapTmpFile(Storage *storage, const std::string &rel_path);
-    static bool FileExists(Storage *storage, const std::string &rel_path, uint32_t crc);
+        Storage *storage, const std::string &dir, const std::string &repl_file);
+    static Status SwapTmpFile(Storage *storage, const std::string &dir,
+                              const std::string &repl_file);
+    static bool FileExists(Storage *storage, const std::string &dir,
+                           const std::string &repl_file, uint32_t crc);
   };
 
+  bool ExistCheckpoint() { return backup_env_->FileExists(config_->checkpoint_dir).ok(); }
+  void SetCreatingCheckpoint(bool yes_or_no) { checkpoint_info_.is_creating = yes_or_no; }
+  bool IsCreatingCheckpoint() { return checkpoint_info_.is_creating; }
+  void SetCheckpointCreateTime(time_t t) { checkpoint_info_.create_time = t; }
+  time_t GetCheckpointCreateTime() { return checkpoint_info_.create_time; }
+  void SetCheckpointAccessTime(time_t t) { checkpoint_info_.access_time = t; }
+  time_t GetCheckpointAccessTime() { return checkpoint_info_.access_time; }
+
  private:
   rocksdb::DB *db_ = nullptr;
   std::mutex backup_mu_;
@@ -116,6 +132,8 @@ class Storage {
   rocksdb::Env *backup_env_;
   std::shared_ptr<rocksdb::SstFileManager> sst_file_manager_;
   std::shared_ptr<rocksdb::RateLimiter> rate_limiter_;
+  ReplDataManager::CheckpointInfo checkpoint_info_;
+  std::mutex checkpoint_mu_;
   Config *config_ = nullptr;
   std::vector<rocksdb::ColumnFamilyHandle *> cf_handles_;
   LockManager lock_mgr_;
diff --git a/tests/tcl/tests/integration/replication.tcl b/tests/tcl/tests/integration/replication.tcl
index cf8c0235284..c19d23a29b6 100644
--- a/tests/tcl/tests/integration/replication.tcl
+++ b/tests/tcl/tests/integration/replication.tcl
@@ -30,9 +30,7 @@ start_server {tags {"repl"}} {
     start_server {} {
         test {Second server should have role master at first} {
             # Can't statify partial replication
-            for {set i 0} {$i < 1000} {incr i} {
-                r set $i $i
-            }
+            populate 100 "" 10
            s role
        } {master}
@@ -94,3 +92,35 @@ start_server {tags {"repl"}} {
        }
    }
 }
+
+start_server {tags {"repl"}} {
+    set A [srv 0 client]
+    populate 100 "" 10
+
+    start_server {} {
+        set B [srv 0 client]
+        populate 100 "" 10
+
+        start_server {} {
+            set C [srv 0 client]
+            set C_host [srv 0 host]
+            set C_port [srv 0 port]
+            populate 50 "" 10
+
+            test {Multi slaves full sync with master at the same time} {
+                $A slaveof $C_host $C_port
+                $B slaveof $C_host $C_port
+
+                # Wait for finishing full replication
+                wait_for_condition 500 100 {
+                    [string match {*connected*} [$A role]] &&
+                    [string match {*connected*} [$B role]]
+                } else {
+                    fail "Slaves can't sync with master"
+                }
+                # Only 2 full sync
+                assert_equal 2 [s sync_full]
+            }
+        }
+    }
+}