Skip to content

Commit

Permalink
Merge branch 'unstable' into test/2662
Browse files Browse the repository at this point in the history
  • Loading branch information
LindaSummer authored Dec 11, 2024
2 parents fe5ce62 + 6bbdc55 commit bbfb73f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 32 deletions.
56 changes: 32 additions & 24 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
#include <atomic>
#include <csignal>
#include <future>
#include <memory>
#include <string>
#include <string_view>
#include <thread>

#include "commands/error_constants.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "scope_exit.h"
#include "server/redis_reply.h"
Expand Down Expand Up @@ -557,7 +560,6 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
}

ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *bev) {
char *bulk_data = nullptr;
repl_state_.store(kReplConnected, std::memory_order_relaxed);
auto input = bufferevent_get_input(bev);
while (true) {
Expand All @@ -576,31 +578,38 @@ ReplicationThread::CBState ReplicationThread::incrementBatchLoopCB(bufferevent *
}
case Incr_batch_data:
// Read bulk data (batch data)
if (incr_bulk_len_ + 2 <= evbuffer_get_length(input)) { // We got enough data
bulk_data = reinterpret_cast<char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
if (incr_bulk_len_ + 2 > evbuffer_get_length(input)) { // If data not enough
return CBState::AGAIN;
}

const char *bulk_data =
reinterpret_cast<const char *>(evbuffer_pullup(input, static_cast<ssize_t>(incr_bulk_len_ + 2)));
std::string bulk_string = std::string(bulk_data, incr_bulk_len_);
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;

if (bulk_string == "ping") {
// master would send the ping heartbeat packet to check whether the slave was alive or not,
// don't write ping to db here.
if (bulk_string != "ping") {
auto s = storage_->ReplicaApplyWriteBatch(std::string(bulk_data, incr_bulk_len_));
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(bulk_string);
return CBState::RESTART;
}

s = parseWriteBatch(bulk_string);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(bulk_string)
<< ": " << s.Msg();
return CBState::RESTART;
}
}
evbuffer_drain(input, incr_bulk_len_ + 2);
incr_state_ = Incr_batch_size;
} else {
return CBState::AGAIN;
}

rocksdb::WriteBatch batch(std::move(bulk_string));

auto s = storage_->ReplicaApplyWriteBatch(&batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - Failed to write batch to local, " << s.Msg() << ". batch: 0x"
<< util::StringToHex(batch.Data());
return CBState::RESTART;
}

s = parseWriteBatch(batch);
if (!s.IsOK()) {
LOG(ERROR) << "[replication] CRITICAL - failed to parse write batch 0x" << util::StringToHex(batch.Data())
<< ": " << s.Msg();
return CBState::RESTART;
}

break;
}
}
Expand Down Expand Up @@ -981,8 +990,7 @@ void ReplicationThread::TimerCB(int, int16_t) {
}
}

Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
rocksdb::WriteBatch write_batch(batch_string);
Status ReplicationThread::parseWriteBatch(const rocksdb::WriteBatch &write_batch) {
WriteBatchHandler write_batch_handler;

auto db_status = write_batch.Iterate(&write_batch_handler);
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "event_util.h"
#include "io_util.h"
#include "rocksdb/write_batch.h"
#include "server/redis_connection.h"
#include "status.h"
#include "storage/storage.h"
Expand Down Expand Up @@ -209,7 +210,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
static bool isWrongPsyncNum(std::string_view err);
static bool isUnknownOption(std::string_view err);

Status parseWriteBatch(const std::string &batch_string);
Status parseWriteBatch(const rocksdb::WriteBatch &write_batch);
};

/*
Expand Down
16 changes: 11 additions & 5 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include "redis_db.h"
#include "redis_metadata.h"
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"
#include "rocksdb_crc32c.h"
#include "server/server.h"
#include "storage/batch_indexer.h"
Expand Down Expand Up @@ -766,22 +768,26 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write
return Write(ctx, options, batch->GetWriteBatch());
}

Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
Status Storage::ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch) {
return applyWriteBatch(default_write_opts_, batch);
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
Status Storage::applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch) {
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
auto s = db_->Write(options, &batch);
auto s = db_->Write(options, batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
return Status::OK();
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
return applyWriteBatch(options, &batch);
}

void Storage::RecordStat(StatType type, uint64_t v) {
switch (type) {
case StatType::FlushCount:
Expand Down
6 changes: 4 additions & 2 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "config/config.h"
#include "lock_manager.h"
#include "observer_or_unique.h"
#include "rocksdb/write_batch.h"
#include "status.h"

#if defined(__sparc__) || defined(__arm__)
Expand Down Expand Up @@ -230,7 +231,7 @@ class Storage {
Status RestoreFromBackup();
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
Status ReplicaApplyWriteBatch(rocksdb::WriteBatch *batch);
Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();

Expand Down Expand Up @@ -380,13 +381,14 @@ class Storage {
// command, so it won't have multi transactions to be executed at the same time.
std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;

rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
rocksdb::WriteOptions default_write_opts_;

// rocksdb used global block cache
std::shared_ptr<rocksdb::Cache> shared_block_cache_;

rocksdb::Status writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
Status applyWriteBatch(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *batch);
};

/// Context passes fixed snapshot and batch between APIs
Expand Down

0 comments on commit bbfb73f

Please sign in to comment.