Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dense_set.SetExpiryTime in preparation for fieldexpire #3780

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, bool is_end)
}
}

void DenseSet::IteratorBase::SetExpiryTime(uint32_t ttl_sec) {
if (!HasExpiry()) {
auto src = curr_entry_->GetObject();
void* new_obj = owner_->ObjectClone(src, false, true);
curr_entry_->SetObject(new_obj);
curr_entry_->SetTtl(true);
owner_->ObjDelete(src, false);
}
owner_->ObjUpdateExpireTime(curr_entry_->GetObject(), ttl_sec);
}

void DenseSet::IteratorBase::Advance() {
bool step_link = false;
DCHECK(curr_entry_);
Expand Down Expand Up @@ -211,7 +222,7 @@ void DenseSet::CloneBatch(unsigned len, CloneItem* items, DenseSet* other) const
auto& src = items[i];
if (src.obj) {
// The majority of the CPU is spent in this block.
void* new_obj = other->ObjectClone(src.obj, src.has_ttl);
void* new_obj = other->ObjectClone(src.obj, src.has_ttl, false);
uint64_t hash = Hash(src.obj, 0);
other->AddUnique(new_obj, src.has_ttl, hash);
src.obj = nullptr;
Expand Down
5 changes: 4 additions & 1 deletion src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ class DenseSet {
return curr_entry_->HasTtl() ? owner_->ObjExpireTime(curr_entry_->GetObject()) : UINT32_MAX;
}

void SetExpiryTime(uint32_t ttl_sec);

bool HasExpiry() const {
return curr_entry_->HasTtl();
}
Expand Down Expand Up @@ -265,8 +267,9 @@ class DenseSet {
virtual bool ObjEqual(const void* left, const void* right, uint32_t right_cookie) const = 0;
virtual size_t ObjectAllocSize(const void* obj) const = 0;
virtual uint32_t ObjExpireTime(const void* obj) const = 0;
virtual void ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) = 0;
virtual void ObjDelete(void* obj, bool has_ttl) const = 0;
virtual void* ObjectClone(const void* obj, bool has_ttl) const = 0;
virtual void* ObjectClone(const void* obj, bool has_ttl, bool add_ttl) const = 0;

void CollectExpired();

Expand Down
6 changes: 5 additions & 1 deletion src/core/score_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,16 @@ uint32_t ScoreMap::ObjExpireTime(const void* obj) const {
return UINT32_MAX;
}

void ScoreMap::ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) {
// Should not reach.
}

void ScoreMap::ObjDelete(void* obj, bool has_ttl) const {
sds s1 = (sds)obj;
sdsfree(s1);
}

void* ScoreMap::ObjectClone(const void* obj, bool has_ttl) const {
void* ScoreMap::ObjectClone(const void* obj, bool has_ttl, bool add_ttl) const {
return nullptr;
}

Expand Down
5 changes: 3 additions & 2 deletions src/core/score_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ class ScoreMap : public DenseSet {
bool ObjEqual(const void* left, const void* right, uint32_t right_cookie) const final;
size_t ObjectAllocSize(const void* obj) const final;
uint32_t ObjExpireTime(const void* obj) const final;
void ObjDelete(void* obj, bool has_ttl) const final;
void* ObjectClone(const void* obj, bool has_ttl) const final;
void ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) override;
void ObjDelete(void* obj, bool has_ttl) const override;
void* ObjectClone(const void* obj, bool has_ttl, bool add_ttl) const final;
};

} // namespace dfly
8 changes: 8 additions & 0 deletions src/core/sds_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "core/sds_utils.h"

#include "base/endian.h"

extern "C" {
#include "redis/sds.h"
#include "redis/zmalloc.h"
Expand Down Expand Up @@ -43,6 +45,12 @@ inline int SdsHdrSize(char type) {

} // namespace

void SdsUpdateExpireTime(const void* obj, uint32_t time_at, uint32_t offset) {
sds str = (sds)obj;
char* valptr = str + sdslen(str) + 1;
absl::little_endian::Store32(valptr + offset, time_at);
}

char* AllocSdsWithSpace(uint32_t strlen, uint32_t space) {
size_t usable;
char type = SdsReqType(strlen);
Expand Down
3 changes: 3 additions & 0 deletions src/core/sds_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ namespace dfly {
// sds string (keys) with metadata attached to them.
char* AllocSdsWithSpace(uint32_t strlen, uint32_t space);

// Updates the expire time of the sds object. The offset is the number of bytes
void SdsUpdateExpireTime(const void* obj, uint32_t time_at, uint32_t offset);

} // namespace dfly
13 changes: 11 additions & 2 deletions src/core/string_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,24 @@ uint32_t StringMap::ObjExpireTime(const void* obj) const {
return UINT32_MAX;
}

void StringMap::ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) {
return SdsUpdateExpireTime(obj, time_now() + ttl_sec, 8);
}

void StringMap::ObjDelete(void* obj, bool has_ttl) const {
sds s1 = (sds)obj;
sds value = GetValue(s1);
sdsfree(value);
sdsfree(s1);
}

void* StringMap::ObjectClone(const void* obj, bool has_ttl) const {
return nullptr;
void* StringMap::ObjectClone(const void* obj, bool has_ttl, bool add_ttl) const {
uint32_t ttl_sec = add_ttl ? 0 : (has_ttl ? ObjExpireTime(obj) : UINT32_MAX);
sds str = (sds)obj;
auto pair = detail::SdsPair(str, GetValue(str));
auto [newkey, sdsval_tag] = CreateEntry(pair->first, pair->second, time_now(), ttl_sec);

return (void*)newkey;
}

detail::SdsPair StringMap::iterator::BreakToPair(void* obj) {
Expand Down
9 changes: 5 additions & 4 deletions src/core/string_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ class StringMap : public DenseSet {

using IteratorBase::ExpiryTime;
using IteratorBase::HasExpiry;
using IteratorBase::SetExpiryTime;
};

// Returns true if field was added
// otherwise updates its value and returns false.
bool AddOrUpdate(std::string_view field, std::string_view value, uint32_t ttl_sec = UINT32_MAX);

Expand All @@ -114,7 +114,7 @@ class StringMap : public DenseSet {

bool Contains(std::string_view s1) const;

/// @brief Returns value of the key or nullptr if key not found.
/// @brief Returns value of the key or an empty iterator if key not found.
/// @param key
/// @return sds
iterator Find(std::string_view member) {
Expand Down Expand Up @@ -157,8 +157,9 @@ class StringMap : public DenseSet {
bool ObjEqual(const void* left, const void* right, uint32_t right_cookie) const final;
size_t ObjectAllocSize(const void* obj) const final;
uint32_t ObjExpireTime(const void* obj) const final;
void ObjDelete(void* obj, bool has_ttl) const final;
void* ObjectClone(const void* obj, bool has_ttl) const final;
void ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) override;
void ObjDelete(void* obj, bool has_ttl) const override;
void* ObjectClone(const void* obj, bool has_ttl, bool add_ttl) const final;
};

} // namespace dfly
19 changes: 19 additions & 0 deletions src/core/string_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,25 @@ TEST_F(StringMapTest, IterateExpired) {
EXPECT_EQ(it, sm_->end());
}

TEST_F(StringMapTest, SetFieldExpireHasExpiry) {
EXPECT_TRUE(sm_->AddOrUpdate("k1", "v1", 5));
auto k = sm_->Find("k1");
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 5);
k.SetExpiryTime(1);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 1);
}

TEST_F(StringMapTest, SetFieldExpireNoHasExpiry) {
EXPECT_TRUE(sm_->AddOrUpdate("k1", "v1"));
auto k = sm_->Find("k1");
EXPECT_FALSE(k.HasExpiry());
k.SetExpiryTime(1);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 1);
}

unsigned total_wasted_memory = 0;

TEST_F(StringMapTest, ReallocIfNeeded) {
Expand Down
48 changes: 22 additions & 26 deletions src/core/string_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ inline bool MayHaveTtl(sds s) {

sds AllocImmutableWithTtl(uint32_t len, uint32_t at) {
sds res = AllocSdsWithSpace(len, sizeof(at));
absl::little_endian::Store32(res + len + 1, at);
absl::little_endian::Store32(res + len + 1, at); // Save TTL

return res;
}
Expand All @@ -43,22 +43,8 @@ bool StringSet::AddSds(sds s1) {
}

bool StringSet::Add(string_view src, uint32_t ttl_sec) {
DCHECK_GT(ttl_sec, 0u); // ttl_sec == 0 would mean find and delete immediately

sds newsds = nullptr;
bool has_ttl = false;

if (ttl_sec == UINT32_MAX) {
newsds = sdsnewlen(src.data(), src.size());
} else {
uint32_t at = time_now() + ttl_sec;
DCHECK_LT(time_now(), at);

newsds = AllocImmutableWithTtl(src.size(), at);
if (!src.empty())
memcpy(newsds, src.data(), src.size());
has_ttl = true;
}
sds newsds = MakeSetSds(src, ttl_sec);
bool has_ttl = ttl_sec != UINT32_MAX;

if (AddOrFindObj(newsds, has_ttl) != nullptr) {
sdsfree(newsds);
Expand Down Expand Up @@ -129,22 +115,32 @@ uint32_t StringSet::ObjExpireTime(const void* str) const {
return absl::little_endian::Load32(ttlptr);
}

void StringSet::ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) {
return SdsUpdateExpireTime(obj, time_now() + ttl_sec, 0);
}

void StringSet::ObjDelete(void* obj, bool has_ttl) const {
sdsfree((sds)obj);
}

void* StringSet::ObjectClone(const void* obj, bool has_ttl) const {
void* StringSet::ObjectClone(const void* obj, bool has_ttl, bool add_ttl) const {
sds src = (sds)obj;
if (has_ttl) {
size_t slen = sdslen(src);
char* ttlptr = src + slen + 1;
uint32_t at = absl::little_endian::Load32(ttlptr);
sds newsds = AllocImmutableWithTtl(slen, at);
if (slen)
memcpy(newsds, src, slen);
string_view sv{src, sdslen(src)};
uint32_t ttl_sec = add_ttl ? 0 : (has_ttl ? ObjExpireTime(obj) : UINT32_MAX);
return (void*)MakeSetSds(sv, ttl_sec);
}

sds StringSet::MakeSetSds(string_view src, uint32_t ttl_sec) const {
if (ttl_sec != UINT32_MAX) {
uint32_t at = time_now() + ttl_sec;

sds newsds = AllocImmutableWithTtl(src.size(), at);
if (!src.empty())
memcpy(newsds, src.data(), src.size());
return newsds;
}
return sdsnewlen(src, sdslen(src));

return sdsnewlen(src.data(), src.size());
}

} // namespace dfly
5 changes: 4 additions & 1 deletion src/core/string_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class StringSet : public DenseSet {

using IteratorBase::ExpiryTime;
using IteratorBase::HasExpiry;
using IteratorBase::SetExpiryTime;
};

iterator begin() {
Expand All @@ -111,8 +112,10 @@ class StringSet : public DenseSet {

size_t ObjectAllocSize(const void* s1) const override;
uint32_t ObjExpireTime(const void* obj) const override;
void ObjUpdateExpireTime(const void* obj, uint32_t ttl_sec) override;
void ObjDelete(void* obj, bool has_ttl) const override;
void* ObjectClone(const void* obj, bool has_ttl) const override;
void* ObjectClone(const void* obj, bool has_ttl, bool add_ttl) const override;
sds MakeSetSds(std::string_view src, uint32_t ttl_sec) const;
};

} // end namespace dfly
19 changes: 19 additions & 0 deletions src/core/string_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,25 @@ TEST_F(StringSetTest, Iteration) {
EXPECT_EQ(to_insert.size(), 0);
}

TEST_F(StringSetTest, SetFieldExpireHasExpiry) {
EXPECT_TRUE(ss_->Add("k1", 100));
auto k = ss_->Find("k1");
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 100);
k.SetExpiryTime(1);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 1);
}

TEST_F(StringSetTest, SetFieldExpireNoHasExpiry) {
EXPECT_TRUE(ss_->Add("k1"));
auto k = ss_->Find("k1");
EXPECT_FALSE(k.HasExpiry());
k.SetExpiryTime(10);
EXPECT_TRUE(k.HasExpiry());
EXPECT_EQ(k.ExpiryTime(), 10);
}

TEST_F(StringSetTest, Ttl) {
EXPECT_TRUE(ss_->Add("bla"sv, 1));
EXPECT_FALSE(ss_->Add("bla"sv, 1));
Expand Down
Loading