Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: simplify code by macros #1012

Merged
merged 4 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/dsn/dist/fmt_logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,19 @@
#define dcheck_le_replica(var1, var2) dassert_replica(var1 <= var2, "{} vs {}", var1, var2)
#define dcheck_gt_replica(var1, var2) dassert_replica(var1 > var2, "{} vs {}", var1, var2)
#define dcheck_lt_replica(var1, var2) dassert_replica(var1 < var2, "{} vs {}", var1, var2)

// Return the given status if condition is not true.
#define ERR_LOG_AND_RETURN_NOT_TRUE(s, err, ...) \
do { \
if (dsn_unlikely(!(s))) { \
derror_f("{}: {}", err, fmt::format(__VA_ARGS__)); \
return err; \
} \
} while (0)

// Return the given status if it is not ERR_OK.
#define ERR_LOG_AND_RETURN_NOT_OK(s, ...) \
do { \
error_code _err = (s); \
ERR_LOG_AND_RETURN_NOT_TRUE(_err == ERR_OK, _err, __VA_ARGS__); \
} while (0)
74 changes: 35 additions & 39 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ class replica_init_info
class replica_app_info
{
private:
dsn::app_info *_app;
app_info *_app;

public:
replica_app_info(dsn::app_info *app) { _app = app; }
replica_app_info(app_info *app) { _app = app; }
error_code load(const char *file);
error_code store(const char *file);
};

/// The store engine interface of Pegasus.
/// Inherited by pegasus::pegasus_server_impl
/// Inherited by dsn::apps::rrdb_service
/// Inherited by apps::rrdb_service
class replication_app_base : public replica_base
{
public:
Expand Down Expand Up @@ -109,7 +109,7 @@ class replication_app_base : public replica_base
//
// Open the app.
//
::dsn::error_code open();
error_code open();

//
// Cancel all background flush and compaction work.
Expand All @@ -122,22 +122,22 @@ class replication_app_base : public replica_base
//
// Must be thread safe.
//
::dsn::error_code close(bool clear_state);
error_code close(bool clear_state);

::dsn::error_code apply_checkpoint(chkpt_apply_mode mode, const learn_state &state);
::dsn::error_code apply_mutation(const mutation *mu);
error_code apply_checkpoint(chkpt_apply_mode mode, const learn_state &state);
error_code apply_mutation(const mutation *mu);

// methods need to implement on storage engine side
virtual ::dsn::error_code start(int argc, char **argv) = 0;
virtual ::dsn::error_code stop(bool clear_state) = 0;
virtual error_code start(int argc, char **argv) = 0;
virtual error_code stop(bool clear_state) = 0;
//
// synchonously checkpoint, and update last_durable_decree internally.
// which stops replication writes to the app concurrently.
//
// Postconditions:
// * last_committed_decree() == last_durable_decree()
//
virtual ::dsn::error_code sync_checkpoint() = 0;
virtual error_code sync_checkpoint() = 0;
//
// asynchonously checkpoint, which will not stall the normal write ops.
// replication layer will check last_durable_decree() later.
Expand All @@ -147,12 +147,12 @@ class replication_app_base : public replica_base
// It is not always necessary for the apps to implement this method,
// but if it is implemented, the checkpoint logic in replication will be much simpler.
//
virtual ::dsn::error_code async_checkpoint(bool flush_memtable) = 0;
virtual error_code async_checkpoint(bool flush_memtable) = 0;
//
// prepare an app-specific learning request (on learner, to be sent to learnee
// and used by method get_checkpoint), so that the learning process is more efficient
//
virtual ::dsn::error_code prepare_get_checkpoint(/*out*/ ::dsn::blob &learn_req) = 0;
virtual error_code prepare_get_checkpoint(/*out*/ blob &learn_req) = 0;
//
// Learn [start, infinite) from remote replicas (learner)
//
Expand All @@ -167,9 +167,9 @@ class replication_app_base : public replica_base
// get the
// full path of the files.
//
virtual ::dsn::error_code get_checkpoint(int64_t learn_start,
const ::dsn::blob &learn_request,
/*out*/ learn_state &state) = 0;
virtual error_code get_checkpoint(int64_t learn_start,
const blob &learn_request,
/*out*/ learn_state &state) = 0;
//
// [DSN_CHKPT_LEARN]
// after learn the state from learner, apply the learned state to the local app
Expand All @@ -188,25 +188,25 @@ class replication_app_base : public replica_base
// * if mode == DSN_CHKPT_LEARN, after apply_checkpoint() succeed:
// last_committed_decree() == last_durable_decree() == state.to_decree_included
//
virtual ::dsn::error_code storage_apply_checkpoint(chkpt_apply_mode mode,
const learn_state &state) = 0;
virtual error_code storage_apply_checkpoint(chkpt_apply_mode mode,
const learn_state &state) = 0;
//
// copy the latest checkpoint to checkpoint_dir, and the decree of the checkpoint
// copied will be assigned to checkpoint_decree if checkpoint_decree not null
//
// must be thread safe
//
virtual ::dsn::error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree,
bool flush_memtable = false) = 0;
virtual error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree,
bool flush_memtable = false) = 0;

//
// Query methods.
//
virtual ::dsn::replication::decree last_durable_decree() const = 0;
virtual ::dsn::replication::decree last_flushed_decree() const { return last_durable_decree(); }
virtual replication::decree last_durable_decree() const = 0;
virtual replication::decree last_flushed_decree() const { return last_durable_decree(); }
// return the error generated by storage engine
virtual int on_request(dsn::message_ex *request) = 0;
virtual int on_request(message_ex *request) = 0;

//
// Parameters:
Expand All @@ -217,7 +217,7 @@ class replication_app_base : public replica_base
//
virtual int on_batched_write_requests(int64_t decree,
uint64_t timstamp,
dsn::message_ex **requests,
message_ex **requests,
int request_length);

// query compact state.
Expand All @@ -237,7 +237,7 @@ class replication_app_base : public replica_base
virtual void set_partition_version(int32_t partition_version){};

// dump the write request some info to string, it may need overload
virtual std::string dump_write_request(dsn::message_ex *request) { return "write request"; };
virtual std::string dump_write_request(message_ex *request) { return "write request"; };

virtual void set_ingestion_status(ingestion_status::type status) {}

Expand All @@ -246,7 +246,7 @@ class replication_app_base : public replica_base
virtual void on_detect_hotkey(const detect_hotkey_request &req,
/*out*/ detect_hotkey_response &resp)
{
resp.err = dsn::ERR_OBJECT_NOT_FOUND;
resp.err = ERR_OBJECT_NOT_FOUND;
resp.__set_err_hint("on_detect_hotkey implementation not found");
}

Expand All @@ -264,10 +264,7 @@ class replication_app_base : public replica_base
const std::string &backup_dir() const { return _dir_backup; }
const std::string &bulk_load_dir() const { return _dir_bulk_load; }
const app_info *get_app_info() const;
::dsn::replication::decree last_committed_decree() const
{
return _last_committed_decree.load();
}
replication::decree last_committed_decree() const { return _last_committed_decree.load(); }

private:
// routines for replica internal usage
Expand All @@ -276,16 +273,15 @@ class replication_app_base : public replica_base
friend class mock_replica;
friend class replica_disk_migrator;

::dsn::error_code open_internal(replica *r);
::dsn::error_code
open_new_internal(replica *r, int64_t shared_log_start, int64_t private_log_start);
error_code open_internal(replica *r);
error_code open_new_internal(replica *r, int64_t shared_log_start, int64_t private_log_start);

const replica_init_info &init_info() const { return _info; }
::dsn::error_code update_init_info(replica *r,
int64_t shared_log_offset,
int64_t private_log_offset,
int64_t durable_decree);
::dsn::error_code update_init_info_ballot_and_decree(replica *r);
error_code update_init_info(replica *r,
int64_t shared_log_offset,
int64_t private_log_offset,
int64_t durable_decree);
error_code update_init_info_ballot_and_decree(replica *r);

protected:
std::string _dir_data; // ${replica_dir}/data
Expand All @@ -296,7 +292,7 @@ class replication_app_base : public replica_base
std::atomic<int64_t> _last_committed_decree;
replica_init_info _info;

explicit replication_app_base(::dsn::replication::replica *replica);
explicit replication_app_base(replication::replica *replica);
};

} // namespace replication
Expand Down
Loading