Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Implement BITPOS. #907

Merged
merged 5 commits into from
Mar 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 158 additions & 13 deletions src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ OpResult<std::string> ReadValue(const DbContext& context, std::string_view key,
OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, uint32_t offset);
OpResult<std::size_t> CountBitsForValue(const OpArgs& op_args, std::string_view key, int64_t start,
int64_t end, bool bit_value);
OpResult<int64_t> FindFirstBitWithValue(const OpArgs& op_args, std::string_view key, bool value,
int64_t start, int64_t end, bool as_bit);
std::string GetString(const PrimeValue& pv, EngineShard* shard);
bool SetBitValue(uint32_t offset, bool bit_value, std::string* entry);
std::size_t CountBitSetByByteIndices(std::string_view at, std::size_t start, std::size_t end);
Expand All @@ -58,6 +60,23 @@ std::string RunBitOperationOnValues(std::string_view op, const BitsStrVec& value

// ------------------------------------------------------------------------- //

// Converts `args[i] to uppercase, then sets `*as_bit` to true if `args[i]` equals "BIT", false if
// `args[i]` equals "BYTE", or returns false if `args[i]` has some other invalid value.
bool ToUpperAndGetAsBit(CmdArgList args, size_t i, bool* as_bit) {
CHECK_NOTNULL(as_bit);
ToUpper(&args[i]);
std::string_view arg = ArgS(args, i);
if (arg == "BIT") {
*as_bit = true;
return true;
} else if (arg == "BYTE") {
*as_bit = false;
return true;
} else {
return false;
}
}

// This function can be used for any case where we allowing out of bound
// access where the default in this case would be 0 -such as bitop
uint8_t GetByteAt(std::string_view s, std::size_t at) {
Expand Down Expand Up @@ -184,19 +203,22 @@ std::size_t CountBitSetByBitIndices(std::string_view at, std::size_t start, std:
return count;
}

// Returns normalized offset of `offset` in `size`. `size` is assumed to be a size of a container,
// and as such the returned value is always in the range [0, size]. If `offset` is negative, it is
// treated as an offset from the end and is normalized to be a positive offset from the start.
int64_t NormalizedOffset(int64_t size, int64_t offset) {
if (offset < 0) {
offset = size + offset;
}
return std::min(std::max(offset, int64_t{0}), size);
}

// General purpose function to count the number of bits that are on.
// The parameters for start, end and bits are defaulted to the start of the string,
// end of the string and bits are false.
// Note that when bits is false, it means that we are looking on byte boundaries.
std::size_t CountBitSet(std::string_view str, int64_t start, int64_t end, bool bits) {
const int32_t size = bits ? str.size() * OFFSET_FACTOR : str.size();

auto NormalizedOffset = [size](int32_t orig) {
if (orig < 0) {
orig = size + orig;
}
return orig;
};
const int64_t size = bits ? str.size() * OFFSET_FACTOR : str.size();

if (start > 0 && end > 0 && end < start) {
return 0; // for illegal range with positive we just return 0
Expand All @@ -206,11 +228,11 @@ std::size_t CountBitSet(std::string_view str, int64_t start, int64_t end, bool b
return 0; // for illegal range with negative we just return 0
}

start = NormalizedOffset(start);
start = NormalizedOffset(size, start);
if (end > 0 && end < start) {
return 0;
}
end = NormalizedOffset(end);
end = NormalizedOffset(size, end);
if (start > end) {
std::swap(start, end); // we're going backward
}
Expand Down Expand Up @@ -483,7 +505,47 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
// ------------------------------------------------------------------------- //
// Impl for the command functions
void BitPos(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(0);
// Support for the command BITPOS
// See details at https://redis.io/commands/bitpos/

if (args.size() < 2 || args.size() > 6) {
return (*cntx)->SendError(kSyntaxErr);
}

std::string_view key = ArgS(args, 1);

int32_t value{0};
int64_t start = 0;
int64_t end = std::numeric_limits<int64_t>::max();
bool as_bit = false;

if (!absl::SimpleAtoi(ArgS(args, 2), &value)) {
return (*cntx)->SendError(kInvalidIntErr);
}

if (args.size() >= 4) {
if (!absl::SimpleAtoi(ArgS(args, 3), &start)) {
return (*cntx)->SendError(kInvalidIntErr);
}
if (args.size() >= 5) {
if (!absl::SimpleAtoi(ArgS(args, 4), &end)) {
return (*cntx)->SendError(kInvalidIntErr);
}

if (args.size() >= 6) {
if (!ToUpperAndGetAsBit(args, 5, &as_bit)) {
return (*cntx)->SendError(kSyntaxErr);
}
}
}
}

auto cb = [&](Transaction* t, EngineShard* shard) {
return FindFirstBitWithValue(t->GetOpArgs(shard), key, value, start, end, as_bit);
};
Transaction* trans = cntx->transaction;
OpResult<int64_t> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
}

void BitCount(CmdArgList args, ConnectionContext* cntx) {
Expand All @@ -505,8 +567,9 @@ void BitCount(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(kInvalidIntErr);
}
if (args.size() == 5) {
ToUpper(&args[4]);
as_bit = ArgS(args, 4) == "BIT";
if (!ToUpperAndGetAsBit(args, 4, &as_bit)) {
return (*cntx)->SendError(kSyntaxErr);
}
}
}
auto cb = [&](Transaction* t, EngineShard* shard) {
Expand Down Expand Up @@ -685,6 +748,88 @@ OpResult<std::size_t> CountBitsForValue(const OpArgs& op_args, std::string_view
}
}

// Returns the bit position (where MSB is 0, LSB is 7) of the leftmost bit that
// equals `value` in `byte`. Returns 8 if not found.
std::size_t GetFirstBitWithValueInByte(uint8_t byte, bool value) {
if (value) {
return absl::countl_zero(byte);
} else {
return absl::countl_one(byte);
}
}

int64_t FindFirstBitWithValueAsBit(std::string_view value_str, bool bit_value, int64_t start,
int64_t end) {
for (int64_t i = start; i <= end; ++i) {
if (static_cast<size_t>(GetByteIndex(i)) >= value_str.size()) {
break;
}
const uint8_t current_byte = GetByteValue(value_str, i);
bool current_bit = CheckBitStatus(current_byte, GetNormalizedBitIndex(i));
if (current_bit != bit_value) {
continue;
}

return i;
}

return -1;
}

int64_t FindFirstBitWithValueAsByte(std::string_view value_str, bool bit_value, int64_t start,
int64_t end) {
for (int64_t i = start; i <= end; ++i) {
if (static_cast<size_t>(i) >= value_str.size()) {
break;
}
const uint8_t current_byte = value_str[i];
const uint8_t kNotFoundByte = bit_value ? 0 : std::numeric_limits<uint8_t>::max();
if (current_byte == kNotFoundByte) {
continue;
}

return i * OFFSET_FACTOR + GetFirstBitWithValueInByte(current_byte, bit_value);
}

return -1;
}

OpResult<int64_t> FindFirstBitWithValue(const OpArgs& op_args, std::string_view key, bool bit_value,
int64_t start, int64_t end, bool as_bit) {
OpResult<std::string> value = ReadValue(op_args.db_cntx, key, op_args.shard);

std::string_view value_str;
if (value) { // non-existent keys are treated as empty strings, per Redis
value_str = value.value();
}

int64_t size = value_str.size();
if (as_bit) {
size *= OFFSET_FACTOR;
}

int64_t normalized_start = NormalizedOffset(size, start);
int64_t normalized_end = NormalizedOffset(size, end);
if (normalized_start > normalized_end) {
return -1; // Return -1 for negative ranges, per Redis
}

int64_t position;
if (as_bit) {
position = FindFirstBitWithValueAsBit(value_str, bit_value, normalized_start, normalized_end);
} else {
position = FindFirstBitWithValueAsByte(value_str, bit_value, normalized_start, normalized_end);
}

if (position == -1 && !bit_value && static_cast<size_t>(start) < value_str.size() &&
end == std::numeric_limits<int64_t>::max()) {
// Returning bit-size of the value, compatible with Redis (but is a weird API).
return value_str.size() * OFFSET_FACTOR;
} else {
return position;
}
}

} // namespace

void BitOpsFamily::Register(CommandRegistry* registry) {
Expand Down
110 changes: 110 additions & 0 deletions src/server/bitops_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,114 @@ TEST_F(BitOpsFamilyTest, BitOpsNot) {
EXPECT_EQ(res, NOT_RESULTS);
}

TEST_F(BitOpsFamilyTest, BitPos) {
ASSERT_EQ(Run({"set", "a", "\x00\x00\x06\xff\xf0"_b}), "OK");

// Find clear bits
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0"}));
EXPECT_EQ(8, CheckedInt({"bitpos", "a", "0", "1"}));
EXPECT_EQ(16, CheckedInt({"bitpos", "a", "0", "2"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "0", "100"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "0", "100", "103"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "0", "100", "0"}));
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0", "0", "100"}));
EXPECT_EQ(8, CheckedInt({"bitpos", "a", "0", "1", "100"}));
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0", "0", "-3"}));
EXPECT_EQ(8, CheckedInt({"bitpos", "a", "0", "1", "-2"}));
EXPECT_EQ(36, CheckedInt({"bitpos", "a", "0", "3"}));
EXPECT_EQ(36, CheckedInt({"bitpos", "a", "0", "4"}));
EXPECT_EQ(36, CheckedInt({"bitpos", "a", "0", "-2"}));
EXPECT_EQ(36, CheckedInt({"bitpos", "a", "0", "-2", "-1"}));
EXPECT_EQ(36, CheckedInt({"bitpos", "a", "0", "-1"}));
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0", "-100"}));

// Find clear bits, explicitly mention "BYTE"
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "0", "100", "103", "BYTE"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "0", "100", "0", "BYTE"}));
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0", "0", "100", "BYTE"}));
EXPECT_EQ(8, CheckedInt({"bitpos", "a", "0", "1", "100", "BYTE"}));
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0", "0", "-3", "BYTE"}));
EXPECT_EQ(8, CheckedInt({"bitpos", "a", "0", "1", "-2", "BYTE"}));
EXPECT_EQ(36, CheckedInt({"bitpos", "a", "0", "-2", "-1", "BYTE"}));

// Find clear bits using "BIT"
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "0", "100", "103", "BIT"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "0", "100", "0", "BIT"}));
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0", "0", "100", "BIT"}));
EXPECT_EQ(1, CheckedInt({"bitpos", "a", "0", "1", "100", "BIT"}));
EXPECT_EQ(2, CheckedInt({"bitpos", "a", "0", "2", "100", "BIT"}));
EXPECT_EQ(16, CheckedInt({"bitpos", "a", "0", "16", "100", "BIT"}));
EXPECT_EQ(23, CheckedInt({"bitpos", "a", "0", "21", "100", "BIT"}));
EXPECT_EQ(36, CheckedInt({"bitpos", "a", "0", "24", "100", "BIT"}));
EXPECT_EQ(0, CheckedInt({"bitpos", "a", "0", "0", "-3", "BIT"}));
EXPECT_EQ(1, CheckedInt({"bitpos", "a", "0", "1", "-2", "BIT"}));
EXPECT_EQ(38, CheckedInt({"bitpos", "a", "0", "-2", "-1", "BIT"}));

// Find set bits
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "0"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "1"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "2"}));
EXPECT_EQ(24, CheckedInt({"bitpos", "a", "1", "3"}));
EXPECT_EQ(32, CheckedInt({"bitpos", "a", "1", "4"}));
EXPECT_EQ(32, CheckedInt({"bitpos", "a", "1", "-1"}));
EXPECT_EQ(24, CheckedInt({"bitpos", "a", "1", "-2"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "-3"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "-4"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "-5"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "-6"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "-100"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "0", "0"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "0", "1"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "0", "3"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "0", "100"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "2", "2"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "2", "3"}));
EXPECT_EQ(32, CheckedInt({"bitpos", "a", "1", "-1", "-1"}));
EXPECT_EQ(24, CheckedInt({"bitpos", "a", "1", "-2", "-1"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "-1", "-2"}));

// Find set bits, explicitly mention "BYTE"
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "0", "0", "BYTE"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "0", "1", "BYTE"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "0", "3", "BYTE"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "0", "100", "BYTE"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "2", "2", "BYTE"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "2", "3", "BYTE"}));
EXPECT_EQ(32, CheckedInt({"bitpos", "a", "1", "-1", "-1", "BYTE"}));
EXPECT_EQ(24, CheckedInt({"bitpos", "a", "1", "-2", "-1", "BYTE"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "-1", "-2", "BYTE"}));

// Find set bits using "BIT"
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "0", "0", "BIT"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "0", "1", "BIT"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "0", "21", "BIT"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "21", "21", "BIT"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "21", "100", "BIT"}));
EXPECT_EQ(21, CheckedInt({"bitpos", "a", "1", "0", "100", "BIT"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "-1", "-1", "BIT"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "a", "1", "-4", "-1", "BIT"}));
EXPECT_EQ(35, CheckedInt({"bitpos", "a", "1", "-5", "-1", "BIT"}));
EXPECT_EQ(34, CheckedInt({"bitpos", "a", "1", "-6", "-1", "BIT"}));

// Make sure we behave like Redis does when looking for clear bits in an all-set string.
ASSERT_EQ(Run({"set", "b", "\xff\xff\xff"_b}), "OK");
EXPECT_EQ(24, CheckedInt({"bitpos", "b", "0"}));
EXPECT_EQ(24, CheckedInt({"bitpos", "b", "0", "0"}));
EXPECT_EQ(24, CheckedInt({"bitpos", "b", "0", "1"}));
EXPECT_EQ(24, CheckedInt({"bitpos", "b", "0", "2"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "b", "0", "3"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "b", "0", "0", "1"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "b", "0", "0", "1", "BYTE"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "b", "0", "0", "3"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "b", "0", "0", "3", "BYTE"}));

ASSERT_EQ(Run({"set", "empty", ""_b}), "OK");
EXPECT_EQ(-1, CheckedInt({"bitpos", "empty", "0"}));
EXPECT_EQ(-1, CheckedInt({"bitpos", "empty", "0", "1"}));

// Non-existent key should be treated like an empty string.
EXPECT_EQ(-1, CheckedInt({"bitpos", "d", "0"}));
}

} // end of namespace dfly