Skip to content

Commit

Permalink
fix(namespace): didn't reload namespaces from DB after the full sync (#…
Browse files Browse the repository at this point in the history
…2527)

Namespaces would store inside DB if the repl-namespace-enabled was set to yes.
Then replicas will intercept and parse the increment replication log to
see if it needs to reload namespaces. But it won't have the namespace update log
when doing the full sync, so we need to reload from DB once the full replication is done.
  • Loading branch information
git-hulk authored Sep 8, 2024
1 parent 1d450ee commit a29df09
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
7 changes: 7 additions & 0 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,13 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish";
post_fullsync_cb_();

// It needs to reload namespaces from DB after the full sync is done,
// or namespaces are not visible in the replica.
s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to load and rewrite namespace: " << s.Msg();
}

// Switch to psync state machine again
psync_steps_.Start();
return CBState::QUIT;
Expand Down
3 changes: 2 additions & 1 deletion src/server/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ bool Namespace::IsAllowModify() const {
Status Namespace::loadFromDB(std::map<std::string, std::string>* db_tokens) const {
std::string value;
engine::Context ctx(storage_);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf_, kNamespaceDBKey, &value);
auto cf = storage_->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf, kNamespaceDBKey, &value);
if (!s.ok()) {
if (s.IsNotFound()) return Status::OK();
return {Status::NotOK, s.ToString()};
Expand Down
4 changes: 1 addition & 3 deletions src/server/namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__";

class Namespace {
public:
explicit Namespace(engine::Storage *storage)
: storage_(storage), cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {}
explicit Namespace(engine::Storage *storage) : storage_(storage) {}

~Namespace() = default;
Namespace(const Namespace &) = delete;
Expand All @@ -45,7 +44,6 @@ class Namespace {

private:
engine::Storage *storage_;
rocksdb::ColumnFamilyHandle *cf_ = nullptr;

std::shared_mutex tokens_mu_;
// mapping from token to namespace name
Expand Down
39 changes: 39 additions & 0 deletions tests/gocase/unit/namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package namespace

import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -252,6 +254,43 @@ func TestNamespaceReplicate(t *testing.T) {
})
}

func TestNamespaceReplicateWithFullSync(t *testing.T) {
config := map[string]string{
"rocksdb.write_buffer_size": "4",
"rocksdb.target_file_size_base": "16",
"rocksdb.max_write_buffer_number": "1",
"rocksdb.wal_ttl_seconds": "0",
"rocksdb.wal_size_limit_mb": "0",
"repl-namespace-enabled": "yes",
"requirepass": "123",
"masterauth": "123",
}
master := util.StartServer(t, config)
defer master.Close()
masterClient := master.NewClientWithOption(&redis.Options{Password: "123"})
defer func() { require.NoError(t, masterClient.Close()) }()

slave := util.StartServer(t, config)
defer slave.Close()
slaveClient := slave.NewClientWithOption(&redis.Options{Password: "123"})
defer func() { require.NoError(t, slaveClient.Close()) }()

ctx := context.Background()
value := strings.Repeat("a", 128*1024)
for i := 0; i < 1024; i++ {
require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err())
}
require.NoError(t, masterClient.Do(ctx, "NAMESPACE", "ADD", "foo", "bar").Err())

util.SlaveOf(t, slaveClient, master)
util.WaitForOffsetSync(t, masterClient, slaveClient, 60*time.Second)

// Namespaces should be replicated after the full sync
token, err := slaveClient.Do(ctx, "NAMESPACE", "GET", "foo").Result()
require.NoError(t, err)
require.EqualValues(t, "bar", token)
}

func TestNamespaceRewrite(t *testing.T) {
password := "pwd"
srv := util.StartServerWithCLIOptions(t, false, map[string]string{
Expand Down

0 comments on commit a29df09

Please sign in to comment.