Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Move some functions from 'replica' to 'replica_stub' #1384

Merged
merged 1 commit into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/replica/disk_cleaner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "runtime/api_layer1.h"

#include <fmt/format.h>
#include "disk_cleaner.h"

namespace dsn {
Expand Down Expand Up @@ -125,5 +125,16 @@ error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
}
return error_s::ok();
}

void move_to_err_path(const std::string &path, const std::string &log_prefix)
{
const std::string new_path = fmt::format("{}.{}{}", path, dsn_now_us(), kFolderSuffixErr);
CHECK(dsn::utils::filesystem::rename_path(path, new_path),
"{}: failed to move directory from '{}' to '{}'",
log_prefix,
path,
new_path);
LOG_WARNING("{}: succeed to move directory from '{}' to '{}'", log_prefix, path, new_path);
}
} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/replica/disk_cleaner.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,7 @@ inline bool is_data_dir_invalid(const std::string &dir)
const std::string folder_suffix = dir.substr(dir.length() - 4);
return is_data_dir_removable(dir) || folder_suffix == kFolderSuffixBak;
}

void move_to_err_path(const std::string &path, const std::string &log_prefix);
} // namespace replication
} // namespace dsn
16 changes: 0 additions & 16 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
public:
~replica(void);

//
// routines for replica stub
//
static replica *load(replica_stub *stub, const char *dir);
// {parent_dir} is used in partition split for get_child_dir in replica_stub
static replica *newr(replica_stub *stub,
gpid gpid,
const app_info &app,
bool restore_if_necessary,
bool is_duplication_follower,
const std::string &parent_dir = "");

// return true when the mutation is valid for the current replica
bool replay_mutation(mutation_ptr &mu, bool is_private);
void reset_prepare_list_after_replay();
Expand Down Expand Up @@ -491,10 +479,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// path = "" means using the default directory (`_dir`/.app_info)
error_code store_app_info(app_info &info, const std::string &path = "");

// clear replica if open failed
static replica *
clear_on_failure(replica_stub *stub, replica *rep, const std::string &path, const gpid &pid);

void update_app_max_replica_count(int32_t max_replica_count);
void update_app_name(const std::string &app_name);

Expand Down
23 changes: 12 additions & 11 deletions src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,13 +634,14 @@ bool replica::update_local_configuration(const replica_configuration &config,
partition_status::type old_status = status();
ballot old_ballot = get_ballot();

// skip unncessary configuration change
if (old_status == config.status && old_ballot == config.ballot)
// skip unnecessary configuration change
if (old_status == config.status && old_ballot == config.ballot) {
return true;
}

// skip invalid change
// but do not disable transitions to partition_status::PS_ERROR as errors
// must be handled immmediately
// must be handled immediately
switch (old_status) {
case partition_status::PS_ERROR: {
LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed",
Expand Down Expand Up @@ -716,8 +717,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
break;
}

bool r = false;
uint64_t oldTs = _last_config_change_time_ms;
uint64_t old_ts = _last_config_change_time_ms;
_config = config;
// we should durable the new ballot to prevent the inconsistent state
if (_config.ballot > old_ballot) {
Expand Down Expand Up @@ -827,8 +827,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
_prepare_list->truncate(_app->last_committed_decree());

// using force cleanup now as all tasks must be done already
r = _potential_secondary_states.cleanup(true);
CHECK(r, "{}: potential secondary context cleanup failed", name());
CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
"potential secondary context cleanup failed");

check_state_completeness();
break;
Expand All @@ -840,8 +840,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
_prepare_list->reset(_app->last_committed_decree());
_potential_secondary_states.cleanup(false);
// => do this in close as it may block
// r = _potential_secondary_states.cleanup(true);
// CHECK(r, "{}: potential secondary context cleanup failed", name());
// CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
// "potential secondary context cleanup failed");
break;
default:
CHECK(false, "invalid execution path");
Expand Down Expand Up @@ -942,7 +942,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
_prepare_list->last_committed_decree(),
_app->last_committed_decree(),
_app->last_durable_decree(),
_last_config_change_time_ms - oldTs,
_last_config_change_time_ms - old_ts,
boost::lexical_cast<std::string>(_config));

if (status() != old_status) {
Expand Down Expand Up @@ -985,8 +985,9 @@ bool replica::update_local_configuration(const replica_configuration &config,

bool replica::update_local_configuration_with_no_ballot_change(partition_status::type s)
{
if (status() == s)
if (status() == s) {
return false;
}

auto config = _config;
config.status = s;
Expand Down
132 changes: 0 additions & 132 deletions src/replica/replica_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,61 +76,6 @@ error_code replica::initialize_on_new()
return init_app_and_prepare_list(true);
}

/*static*/ replica *replica::newr(replica_stub *stub,
gpid gpid,
const app_info &app,
bool restore_if_necessary,
bool is_duplication_follower,
const std::string &parent_dir)
{
std::string dir;
if (parent_dir.empty()) {
dir = stub->get_replica_dir(app.app_type.c_str(), gpid);
} else {
dir = stub->get_child_dir(app.app_type.c_str(), gpid, parent_dir);
}
replica *rep =
new replica(stub, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
error_code err;
if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err.to_string());
return clear_on_failure(stub, rep, dir, gpid);
}

if (is_duplication_follower &&
(err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) {
LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check "
"previous detail error log",
rep->name(),
err.to_string());
return clear_on_failure(stub, rep, dir, gpid);
}

err = rep->initialize_on_new();
if (err == ERR_OK) {
LOG_DEBUG("{}: new replica succeed", rep->name());
return rep;
} else {
LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err.to_string());
return clear_on_failure(stub, rep, dir, gpid);
}
}

/* static */ replica *replica::clear_on_failure(replica_stub *stub,
replica *rep,
const std::string &path,
const gpid &pid)
{
rep->close();
delete rep;
rep = nullptr;

// clear work on failure
utils::filesystem::remove_path(path);
stub->_fs_manager.remove_replica(pid);
return nullptr;
}

error_code replica::initialize_on_load()
{
LOG_INFO_PREFIX("initialize replica on load, dir = {}", _dir);
Expand All @@ -143,83 +88,6 @@ error_code replica::initialize_on_load()
return init_app_and_prepare_list(false);
}

/*static*/ replica *replica::load(replica_stub *stub, const char *dir)
{
FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; });

char splitters[] = {'\\', '/', 0};
std::string name = utils::get_last_component(std::string(dir), splitters);
if (name == "") {
LOG_ERROR("invalid replica dir {}", dir);
return nullptr;
}

char app_type[128];
int32_t app_id, pidx;
if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
LOG_ERROR("invalid replica dir {}", dir);
return nullptr;
}

gpid pid(app_id, pidx);
if (!utils::filesystem::directory_exists(dir)) {
LOG_ERROR("replica dir {} not exist", dir);
return nullptr;
}

dsn::app_info info;
replica_app_info info2(&info);
std::string path = utils::filesystem::path_combine(dir, kAppInfo);
auto err = info2.load(path);
if (ERR_OK != err) {
LOG_ERROR("load app-info from {} failed, err = {}", path, err);
return nullptr;
}

if (info.app_type != app_type) {
LOG_ERROR("unmatched app type {} for {}", info.app_type, path);
return nullptr;
}

if (info.partition_count < pidx) {
LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, "
"ignore it",
pid,
info.partition_count);
return nullptr;
}

replica *rep = new replica(stub, pid, info, dir, false);

err = rep->initialize_on_load();
if (err == ERR_OK) {
LOG_INFO("{}: load replica succeed", rep->name());
return rep;
} else {
LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
rep->close();
delete rep;
rep = nullptr;

// clear work on failure
if (dsn::utils::filesystem::directory_exists(dir)) {
char rename_dir[1024];
sprintf(rename_dir, "%s.%" PRIu64 ".err", dir, dsn_now_us());
CHECK(dsn::utils::filesystem::rename_path(dir, rename_dir),
"load_replica: failed to move directory '{}' to '{}'",
dir,
rename_dir);
LOG_WARNING("load_replica: replica_dir_op succeed to move directory '{}' to '{}'",
dir,
rename_dir);
stub->_counter_replicas_recent_replica_move_error_count->increment();
stub->_fs_manager.remove_replica(pid);
}

return nullptr;
}
}

decree replica::get_replay_start_decree()
{
decree replay_start_decree = _app->last_committed_decree();
Expand Down
Loading