Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the support of the BF.INSERT command #1768

Merged
merged 5 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CommandBFReserve : public Commander {
Status Parse(const std::vector<std::string> &args) override {
auto parse_error_rate = ParseFloat<double>(args[2]);
if (!parse_error_rate) {
return {Status::RedisParseErr, errValueIsNotFloat};
return {Status::RedisParseErr, "Bad error rate"};
zncleon marked this conversation as resolved.
Show resolved Hide resolved
}
error_rate_ = *parse_error_rate;
if (error_rate_ >= 1 || error_rate_ <= 0) {
Expand All @@ -40,7 +40,7 @@ class CommandBFReserve : public Commander {

auto parse_capacity = ParseInt<uint32_t>(args[3], 10);
if (!parse_capacity) {
return {Status::RedisParseErr, errValueNotInteger};
return {Status::RedisParseErr, "Bad capacity"};
}
capacity_ = *parse_capacity;
if (capacity_ <= 0) {
Expand All @@ -56,7 +56,11 @@ class CommandBFReserve : public Commander {
expansion_ = 0;
} else if (parser.EatEqICase("expansion")) {
has_expansion = true;
expansion_ = GET_OR_RET(parser.TakeInt<uint16_t>());
auto parse_expansion = parser.TakeInt<uint16_t>();
if (!parse_expansion.IsOK()) {
return {Status::RedisParseErr, "Bad expansion"};
}
expansion_ = parse_expansion.GetValue();
if (expansion_ < 1) {
return {Status::RedisParseErr, "expansion should be greater or equal to 1"};
}
Expand Down Expand Up @@ -147,6 +151,103 @@ class CommandBFMAdd : public Commander {
std::vector<Slice> items_;
};

/// Implements `BF.INSERT key [CAPACITY n] [ERROR r] [EXPANSION e] [NOCREATE] [NONSCALING] ITEMS item [item ...]`.
///
/// Parse() validates the optional creation settings and collects the items that follow the
/// ITEMS keyword; Execute() forwards them to BloomChain::InsertCommon and replies with one
/// entry per item.
class CommandBFInsert : public Commander {
 public:
  /// Parses the option list starting at args[2] and then the items after the ITEMS keyword.
  ///
  /// @param args full argument vector (command name, key, options..., ITEMS, items...).
  /// @return a RedisParseErr status for a malformed option value, an unknown option, a missing
  ///         ITEMS keyword, NONSCALING combined with EXPANSION, or an empty item list;
  ///         otherwise the result of Commander::Parse(args).
  Status Parse(const std::vector<std::string> &args) override {
    CommandParser parser(args, 2);
    bool is_nonscaling = false;
    bool has_expansion = false;
    bool has_items = false;
    while (parser.Good()) {
      if (parser.EatEqICase("capacity")) {
        auto parse_capacity = parser.TakeInt<uint32_t>();
        if (!parse_capacity.IsOK()) {
          return {Status::RedisParseErr, "Bad capacity"};
        }
        insert_options_.capacity = parse_capacity.GetValue();
        if (insert_options_.capacity <= 0) {
          return {Status::RedisParseErr, "capacity should be larger than 0"};
        }
      } else if (parser.EatEqICase("error")) {
        auto parse_error_rate = parser.TakeFloat<double>();
        if (!parse_error_rate.IsOK()) {
          return {Status::RedisParseErr, "Bad error rate"};
        }
        insert_options_.error_rate = parse_error_rate.GetValue();
        if (insert_options_.error_rate >= 1 || insert_options_.error_rate <= 0) {
          return {Status::RedisParseErr, "error rate should be between 0 and 1"};
        }
      } else if (parser.EatEqICase("nocreate")) {
        insert_options_.auto_create = false;
      } else if (parser.EatEqICase("nonscaling")) {
        is_nonscaling = true;
        insert_options_.expansion = 0;
      } else if (parser.EatEqICase("expansion")) {
        has_expansion = true;
        auto parse_expansion = parser.TakeInt<uint16_t>();
        if (!parse_expansion.IsOK()) {
          return {Status::RedisParseErr, "Bad expansion"};
        }
        insert_options_.expansion = parse_expansion.GetValue();
        if (insert_options_.expansion < 1) {
          return {Status::RedisParseErr, "expansion should be greater or equal to 1"};
        }
      } else if (parser.EatEqICase("items")) {
        // Everything after ITEMS is an item, never an option.
        has_items = true;
        break;
      } else {
        return {Status::RedisParseErr, errInvalidSyntax};
      }
    }

    if (is_nonscaling && has_expansion) {
      return {Status::RedisParseErr, "nonscaling filters cannot expand"};
    }

    if (!has_items) {
      return {Status::RedisParseErr, errInvalidSyntax};
    }

    // Locate the ITEMS keyword the option loop stopped at. The search starts at args[2] so a key
    // that happens to be named "items" is not mistaken for the keyword, and it ignores case just
    // like the option loop does. Any earlier token spelled "items" would have been consumed as an
    // option value and rejected above, so the first match is the keyword itself.
    const auto items_keyword = std::find_if(args.begin() + 2, args.end(), isItemsKeyword);
    if (items_keyword == args.end()) {
      return {Status::RedisParseErr, errInvalidSyntax};
    }

    const auto first_item = items_keyword + 1;
    items_.reserve(static_cast<size_t>(args.end() - first_item));
    // NOTE(review): items_ stores Slices built from the strings in `args`; this presumably relies
    // on `args` being the command's own argument vector that stays alive until Execute — confirm.
    for (auto item_iter = first_item; item_iter != args.end(); ++item_iter) {
      items_.emplace_back(*item_iter);
    }
    if (items_.empty()) {
      return {Status::RedisParseErr, "num of items should be greater than 0"};
    }

    return Commander::Parse(args);
  }

  /// Inserts the parsed items into the filter stored at args_[1].
  ///
  /// Replies with an array holding, per item, integer 1 (kOk), integer 0 (kExist), or an error
  /// entry (kFull). Returns a RedisExecErr status when InsertCommon reports NotFound or any
  /// other failure.
  Status Execute(Server *svr, Connection *conn, std::string *output) override {
    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
    std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
    auto s = bloom_db.InsertCommon(args_[1], items_, insert_options_, &rets);
    if (s.IsNotFound()) return {Status::RedisExecErr, "key is not found"};
    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

    *output = redis::MultiLen(items_.size());
    for (size_t i = 0; i < items_.size(); ++i) {
      switch (rets[i]) {
        case BloomFilterAddResult::kOk:
          *output += redis::Integer(1);
          break;
        case BloomFilterAddResult::kExist:
          *output += redis::Integer(0);
          break;
        case BloomFilterAddResult::kFull:
          *output += redis::Error("ERR nonscaling filter is full");
          break;
      }
    }
    return Status::OK();
  }

 private:
  /// Returns true when `token` spells "items" in any ASCII letter case.
  static bool isItemsKeyword(std::string_view token) {
    constexpr std::string_view keyword = "items";
    if (token.size() != keyword.size()) return false;
    for (size_t i = 0; i < keyword.size(); ++i) {
      char c = token[i];
      if (c >= 'A' && c <= 'Z') c = static_cast<char>(c - 'A' + 'a');
      if (c != keyword[i]) return false;
    }
    return true;
  }

  std::vector<Slice> items_;
  BloomFilterInsertOptions insert_options_;
};

class CommandBFExists : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -277,6 +378,7 @@ class CommandBFCard : public Commander {
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFMAdd>("bf.madd", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandBFInsert>("bf.insert", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
Expand Down
12 changes: 10 additions & 2 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,22 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, BloomF

// Adds every entry of `items` to the filter at `user_key`, writing one result per item into
// `rets`. This is InsertCommon invoked with default-constructed insert options.
rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice> &items,
                                 std::vector<BloomFilterAddResult> *rets) {
  return InsertCommon(user_key, items, BloomFilterInsertOptions{}, rets);
}

rocksdb::Status BloomChain::InsertCommon(const Slice &user_key, const std::vector<Slice> &items,
const BloomFilterInsertOptions &insert_options,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);

if (s.IsNotFound()) {
s = createBloomChain(ns_key, kBFDefaultErrorRate, kBFDefaultInitCapacity, kBFDefaultExpansion, &metadata);
if (s.IsNotFound() && insert_options.auto_create) {
s = createBloomChain(ns_key, insert_options.error_rate, insert_options.capacity, insert_options.expansion,
&metadata);
}
if (!s.ok()) return s;

Expand Down
9 changes: 9 additions & 0 deletions src/types/redis_bloom_chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ enum class BloomFilterAddResult {
kFull,
};

// Settings passed to BloomChain::InsertCommon. error_rate, capacity and expansion are handed to
// createBloomChain only when the key is missing and auto_create is true.
struct BloomFilterInsertOptions {
// Error rate used when the filter is created by the insert.
double error_rate = kBFDefaultErrorRate;
// Capacity used when the filter is created by the insert.
uint32_t capacity = kBFDefaultInitCapacity;
// Expansion used when the filter is created by the insert; BF.INSERT sets it to 0 for NONSCALING.
uint16_t expansion = kBFDefaultExpansion;
// When false (BF.INSERT NOCREATE), a missing key is not created and the NotFound status is returned.
bool auto_create = true;
};

struct BloomFilterInfo {
uint32_t capacity;
uint32_t bloom_bytes;
Expand All @@ -59,6 +66,8 @@ class BloomChain : public Database {
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret);
rocksdb::Status MAdd(const Slice &user_key, const std::vector<Slice> &items, std::vector<BloomFilterAddResult> *rets);
rocksdb::Status InsertCommon(const Slice &user_key, const std::vector<Slice> &items,
const BloomFilterInsertOptions &insert_options, std::vector<BloomFilterAddResult> *rets);
rocksdb::Status Exists(const Slice &user_key, const Slice &item, bool *exist);
rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists);
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);
Expand Down
94 changes: 86 additions & 8 deletions tests/gocase/unit/type/bloom/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ func TestBloom(t *testing.T) {

t.Run("Reserve a bloom filter with wrong error_rate", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "abc", "1000").Err(), "ERR value is not a valid float")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "abc", "1000").Err(), "ERR Bad error rate")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "-0.03", "1000").Err(), "ERR error rate should be between 0 and 1")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "1", "1000").Err(), "ERR error rate should be between 0 and 1")
})

t.Run("Reserve a bloom filter with wrong capacity", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "qwe").Err(), "ERR value is not an integer")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "qwe").Err(), "ERR Bad capacity")
// capacity stored in uint32_t, if input is negative, the parser will make an error.
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "-1000").Err(), "ERR value is not an integer or out of range")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "-1000").Err(), "ERR Bad capacity")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "0").Err(), "ERR capacity should be larger than 0")
})

Expand All @@ -69,12 +69,12 @@ func TestBloom(t *testing.T) {

t.Run("Reserve a bloom filter with wrong expansion", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion").Err(), "ERR no more item to parse")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion").Err(), "Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "0").Err(), "ERR expansion should be greater or equal to 1")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "asd").Err(), "ERR not started as an integer")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "-1").Err(), "ERR out of range of integer type")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "1.5").Err(), "ERR encounter non-integer characters")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "123asd").Err(), "ERR encounter non-integer characters")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "asd").Err(), "ERR Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "-1").Err(), "ERR Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "1.5").Err(), "ERR Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "123asd").Err(), "ERR Bad expansion")
})

t.Run("Reserve a bloom filter with nonscaling and expansion", func(t *testing.T) {
Expand Down Expand Up @@ -227,6 +227,84 @@ func TestBloom(t *testing.T) {
require.Equal(t, []interface{}{"Capacity", int64(210), "Size", int64(896), "Number of filters", int64(3), "Number of items inserted", int64(91), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
})

// NOCREATE on a key that was just deleted must fail with "key is not found".
t.Run("Insert but not create", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "nocreate", "items", "items1").Err(), "key is not found")

})

// ERROR option: rejects a non-numeric value and values outside the open interval (0, 1),
// then checks the bf.info output of a filter created with error rate 0.0001.
t.Run("Insert with error_rate", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "error", "abc", "items", "items1").Err(), "ERR Bad error rate")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "error", "-0.03", "items", "items1").Err(), "ERR error rate should be between 0 and 1")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "error", "1", "items", "items1").Err(), "ERR error rate should be between 0 and 1")

// A valid error rate creates the filter and inserts the single item.
require.NoError(t, rdb.Do(ctx, "bf.insert", key, "error", "0.0001", "items", "items1").Err())
require.Equal(t, []interface{}{"Capacity", int64(100), "Size", int64(512), "Number of filters", int64(1), "Number of items inserted", int64(1), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
})

// CAPACITY option: rejects non-integer, negative and zero values, then checks that a filter
// created with capacity 200 reports that capacity in bf.info.
t.Run("Insert with capacity", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "capacity", "qwe", "items", "items1").Err(), "ERR Bad capacity")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "capacity", "-1000", "items", "items1").Err(), "ERR Bad capacity")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "capacity", "0", "items", "items1").Err(), "ERR capacity should be larger than 0")

// A valid capacity creates the filter and inserts the single item.
require.NoError(t, rdb.Do(ctx, "bf.insert", key, "capacity", "200", "items", "items1").Err())
require.Equal(t, []interface{}{"Capacity", int64(200), "Size", int64(256), "Number of filters", int64(1), "Number of items inserted", int64(1), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
})

// NONSCALING option: the insert succeeds and bf.info for the "expansion" field yields redis.Nil.
t.Run("Insert with nonscaling", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.insert", key, "nonscaling", "items", "items1").Err())

require.Equal(t, redis.Nil, rdb.Do(ctx, "bf.info", key, "expansion").Err())
})

// EXPANSION option: rejects a missing value (the next token is "items"), zero, non-numeric,
// negative, fractional and trailing-garbage values, then checks a filter created with expansion 3.
t.Run("Insert with expansion", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "expansion", "items", "items1").Err(), "ERR Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "expansion", "0", "items", "items1").Err(), "ERR expansion should be greater or equal to 1")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "expansion", "asd", "items", "items1").Err(), "ERR Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "expansion", "-1", "items", "items1").Err(), "ERR Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "expansion", "1.5", "items", "items1").Err(), "ERR Bad expansion")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "expansion", "123asd", "items", "items1").Err(), "ERR Bad expansion")

// A valid expansion creates the filter and is reported back as the expansion rate.
require.NoError(t, rdb.Do(ctx, "bf.insert", key, "expansion", "3", "items", "items1").Err())
require.Equal(t, []interface{}{"Capacity", int64(100), "Size", int64(128), "Number of filters", int64(1), "Number of items inserted", int64(1), "Expansion rate", int64(3)}, rdb.Do(ctx, "bf.info", key).Val())
})

// NONSCALING and EXPANSION are mutually exclusive, whichever order they are given in.
t.Run("Insert with nonscaling and expansion", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "expansion", "1", "nonscaling", "items", "items1").Err(), "ERR nonscaling filters cannot expand")
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "nonscaling", "expansion", "1", "items", "items1").Err(), "ERR nonscaling filters cannot expand")
})

// ITEMS handling: an empty item list is rejected; new items reply 1, items already present
// reply 0, and a value repeated within one call replies 1 then 0.
t.Run("Insert items", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.insert", key, "capacity", "100", "items").Err(), "ERR num of items should be greater than 0")

// Nothing exists before the first successful insert.
require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

require.Equal(t, []interface{}{int64(1), int64(1)}, rdb.Do(ctx, "bf.insert", key, "items", "xxx", "zzz").Val())
require.Equal(t, int64(2), rdb.Do(ctx, "bf.card", key).Val())
require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

// add the existed value
require.Equal(t, []interface{}{int64(0)}, rdb.Do(ctx, "bf.insert", key, "items", "zzz").Val())
require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())

// add the same value
require.Equal(t, []interface{}{int64(1), int64(0)}, rdb.Do(ctx, "bf.insert", key, "items", "yyy", "yyy").Val())
require.Equal(t, []interface{}{int64(1), int64(1), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
})

// Creation options given to bf.insert are ignored when the filter already exists: bf.info
// still reports the capacity (1000) and expansion (3) set by the earlier bf.reserve.
t.Run("Insert would not change existed bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "3").Err())
require.NoError(t, rdb.Do(ctx, "bf.insert", key, "error", "0.01", "capacity", "2000", "expansion", "4", "items", "xxx", "zzz").Err())
require.Equal(t, []interface{}{"Capacity", int64(1000), "Size", int64(2048), "Number of filters", int64(1), "Number of items inserted", int64(2), "Expansion rate", int64(3)}, rdb.Do(ctx, "bf.info", key).Val())
})

t.Run("MExists Basic Test", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
Expand Down
Loading