Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:result wrong when commands call redis interface #2596

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ Status Redis::HDel(const Slice& key, const std::vector<std::string>& fields, int
std::string meta_value;
int32_t del_cnt = 0;
uint64_t version = 0;
ScopeRecordLock l(lock_mgr_, key);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;

Expand Down Expand Up @@ -280,6 +281,7 @@ Status Redis::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fvs, int
Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -357,6 +359,7 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64
Status Redis::HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by, std::string* new_value) {
new_value->clear();
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -550,6 +553,7 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -613,6 +617,7 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {

Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -677,6 +682,7 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int

Status Redis::HSetnx(const Slice& key, const Slice& field, const Slice& value, int32_t* ret) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -1024,6 +1030,8 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st

Status Redis::HashesExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1047,6 +1055,8 @@ Status Redis::HashesExpire(const Slice& key, int64_t ttl) {

Status Redis::HashesDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1067,6 +1077,8 @@ Status Redis::HashesDel(const Slice& key) {

Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1089,6 +1101,8 @@ Status Redis::HashesExpireat(const Slice& key, int64_t timestamp) {

Status Redis::HashesPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kHashesMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
17 changes: 17 additions & 0 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Status Redis::LInsert(const Slice& key, const BeforeOrAfter& before_or_after, co
const std::string& value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -265,6 +266,7 @@ Status Redis::LPop(const Slice& key, int64_t count, std::vector<std::string>* el
elements->clear();

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;

Expand Down Expand Up @@ -310,6 +312,7 @@ Status Redis::LPop(const Slice& key, int64_t count, std::vector<std::string>* el
Status Redis::LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t index = 0;
uint64_t version = 0;
Expand Down Expand Up @@ -357,6 +360,7 @@ Status Redis::LPush(const Slice& key, const std::vector<std::string>& values, ui
Status Redis::LPushx(const Slice& key, const std::vector<std::string>& values, uint64_t* len) {
*len = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;

Expand Down Expand Up @@ -501,6 +505,7 @@ Status Redis::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop, std::
Status Redis::LRem(const Slice& key, int64_t count, const Slice& value, uint64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -622,6 +627,7 @@ Status Redis::LRem(const Slice& key, int64_t count, const Slice& value, uint64_t

Status Redis::LSet(const Slice& key, int64_t index, const Slice& value) {
uint32_t statistic = 0;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -653,6 +659,7 @@ Status Redis::LSet(const Slice& key, int64_t index, const Slice& value) {

Status Redis::LTrim(const Slice& key, int64_t start, int64_t stop) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint32_t statistic = 0;
std::string meta_value;
Expand Down Expand Up @@ -716,6 +723,7 @@ Status Redis::RPop(const Slice& key, int64_t count, std::vector<std::string>* el
elements->clear();

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;

Expand Down Expand Up @@ -763,6 +771,7 @@ Status Redis::RPoplpush(const Slice& source, const Slice& destination, std::stri
uint32_t statistic = 0;
Status s;
rocksdb::WriteBatch batch;
MultiScopeRecordLock l(lock_mgr_, {source.ToString(), destination.ToString()});
if (source.compare(destination) == 0) {
std::string meta_value;
BaseMetaKey base_source(source);
Expand Down Expand Up @@ -928,6 +937,7 @@ Status Redis::RPushx(const Slice& key, const std::vector<std::string>& values, u
*len = 0;
rocksdb::WriteBatch batch;

ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;

BaseMetaKey base_meta_key(key);
Expand Down Expand Up @@ -958,6 +968,8 @@ Status Redis::RPushx(const Slice& key, const std::vector<std::string>& values, u

Status Redis::ListsExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -981,6 +993,8 @@ Status Redis::ListsExpire(const Slice& key, int64_t ttl) {

Status Redis::ListsDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1001,6 +1015,8 @@ Status Redis::ListsDel(const Slice& key) {

Status Redis::ListsExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1023,6 +1039,7 @@ Status Redis::ListsExpireat(const Slice& key, int64_t timestamp) {

Status Redis::ListsPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
BaseMetaKey base_meta_key(key);
Status s = db_->Get(default_read_options_, handles_[kListsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
16 changes: 16 additions & 0 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ rocksdb::Status Redis::SAdd(const Slice& key, const std::vector<std::string>& me
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
uint64_t version = 0;
std::string meta_value;

Expand Down Expand Up @@ -282,6 +283,7 @@ rocksdb::Status Redis::SDiffstore(const Slice& destination, const std::vector<st

std::string meta_value;
uint64_t version = 0;
ScopeRecordLock l(lock_mgr_, destination);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
std::vector<KeyVersion> vaild_sets;
Expand Down Expand Up @@ -462,6 +464,7 @@ rocksdb::Status Redis::SInterstore(const Slice& destination, const std::vector<s
std::string meta_value;
uint64_t version = 0;
bool have_invalid_sets = false;
ScopeRecordLock l(lock_mgr_, destination);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
std::vector<KeyVersion> vaild_sets;
Expand Down Expand Up @@ -684,6 +687,8 @@ rocksdb::Status Redis::SMove(const Slice& source, const Slice& destination, cons
uint32_t statistic = 0;
std::string meta_value;
std::vector<std::string> keys{source.ToString(), destination.ToString()};
MultiScopeRecordLock ml(lock_mgr_, keys);

if (source == destination) {
*ret = 1;
return rocksdb::Status::OK();
Expand Down Expand Up @@ -775,6 +780,7 @@ rocksdb::Status Redis::SPop(const Slice& key, std::vector<std::string>* members,

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t start_us = pstd::NowMicros();

Expand Down Expand Up @@ -880,6 +886,7 @@ rocksdb::Status Redis::SRandmember(const Slice& key, int32_t count, std::vector<

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::vector<int32_t> targets;
std::unordered_set<int32_t> unique;

Expand Down Expand Up @@ -942,6 +949,7 @@ rocksdb::Status Redis::SRandmember(const Slice& key, int32_t count, std::vector<
rocksdb::Status Redis::SRem(const Slice& key, const std::vector<std::string>& members, int32_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -1192,6 +1200,8 @@ rocksdb::Status Redis::SScan(const Slice& key, int64_t cursor, const std::string

rocksdb::Status Redis::SetsExpire(const Slice& key, int64_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1215,6 +1225,8 @@ rocksdb::Status Redis::SetsExpire(const Slice& key, int64_t ttl) {

rocksdb::Status Redis::SetsDel(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1235,6 +1247,8 @@ rocksdb::Status Redis::SetsDel(const Slice& key) {

rocksdb::Status Redis::SetsExpireat(const Slice& key, int64_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand All @@ -1257,6 +1271,8 @@ rocksdb::Status Redis::SetsExpireat(const Slice& key, int64_t timestamp) {

rocksdb::Status Redis::SetsPersist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);

BaseMetaKey base_meta_key(key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[kSetsMetaCF], base_meta_key.Encode(), &meta_value);
if (s.ok()) {
Expand Down
Loading
Loading