Skip to content

Commit

Permalink
Implement the new encoding for search framework (#2338)
Browse files: browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored May 30, 2024
1 parent 5e04455 commit bd3c053
Show file tree
Hide file tree
Showing 16 changed files with 358 additions and 301 deletions.
2 changes: 1 addition & 1 deletion src/search/executors/filter_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ struct QueryExprEvaluator {

StatusOr<bool> Visit(TagContainExpr *v) const {
auto val = GET_OR_RET(ctx->Retrieve(row, v->field->info));
auto meta = v->field->info->MetadataAs<redis::SearchTagFieldMetadata>();
auto meta = v->field->info->MetadataAs<redis::TagFieldMetadata>();

auto split = util::Split(val, std::string(1, meta->separator));
return std::find(split.begin(), split.end(), v->tag->val) != split.end();
Expand Down
45 changes: 24 additions & 21 deletions src/search/executors/numeric_field_scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,38 @@ struct NumericFieldScanExecutor : ExecutorNode {
util::UniqueIterator iter{nullptr};

IndexInfo *index;
std::string ns_key;
redis::SearchKey search_key;

NumericFieldScanExecutor(ExecutorContext *ctx, NumericFieldScan *scan)
: ExecutorNode(ctx), scan(scan), ss(ctx->storage), index(scan->field->info->index) {
ns_key = ComposeNamespaceKey(index->ns, index->name, ctx->storage->IsSlotIdEncoded());
}
: ExecutorNode(ctx),
scan(scan),
ss(ctx->storage),
index(scan->field->info->index),
search_key(index->ns, index->name, scan->field->name) {}

std::string IndexKey(double num) {
return InternalKey(ns_key, redis::ConstructNumericFieldSubkey(scan->field->name, num, {}), index->metadata.version,
ctx->storage->IsSlotIdEncoded())
.Encode();
}
std::string IndexKey(double num) const { return search_key.ConstructNumericFieldData(num, {}); }

bool InRangeDecode(Slice key, Slice field, double num, double *curr, Slice *user_key) {
auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
if (ikey.GetVersion() != index->metadata.version) return false;
auto subkey = ikey.GetSubKey();
bool InRangeDecode(Slice key, double *curr, Slice *user_key) const {
uint8_t ns_size = 0;
if (!GetFixed8(&key, &ns_size)) return false;
if (ns_size != index->ns.size()) return false;
if (!key.starts_with(index->ns)) return false;
key.remove_prefix(ns_size);

uint8_t flag = 0;
if (!GetFixed8(&subkey, &flag)) return false;
if (flag != (uint8_t)redis::SearchSubkeyType::NUMERIC_FIELD) return false;
uint8_t subkey_type = 0;
if (!GetFixed8(&key, &subkey_type)) return false;
if (subkey_type != (uint8_t)redis::SearchSubkeyType::FIELD) return false;

Slice value;
if (!GetSizedString(&subkey, &value)) return false;
if (value != field) return false;
if (!GetSizedString(&key, &value)) return false;
if (value != index->name) return false;

if (!GetSizedString(&key, &value)) return false;
if (value != scan->field->name) return false;

if (!GetDouble(&subkey, curr)) return false;
if (!GetDouble(&key, curr)) return false;

if (!GetSizedString(&subkey, user_key)) return false;
if (!GetSizedString(&key, user_key)) return false;

return true;
}
Expand All @@ -90,7 +93,7 @@ struct NumericFieldScanExecutor : ExecutorNode {

double curr = 0;
Slice user_key;
if (!InRangeDecode(iter->key(), scan->field->name, scan->range.r, &curr, &user_key)) {
if (!InRangeDecode(iter->key(), &curr, &user_key)) {
return end;
}

Expand Down
48 changes: 25 additions & 23 deletions src/search/executors/tag_field_scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,37 @@ struct TagFieldScanExecutor : ExecutorNode {
util::UniqueIterator iter{nullptr};

IndexInfo *index;
std::string ns_key;
std::string index_key;

TagFieldScanExecutor(ExecutorContext *ctx, TagFieldScan *scan)
: ExecutorNode(ctx), scan(scan), ss(ctx->storage), index(scan->field->info->index) {
ns_key = ComposeNamespaceKey(index->ns, index->name, ctx->storage->IsSlotIdEncoded());
index_key = InternalKey(ns_key, redis::ConstructTagFieldSubkey(scan->field->name, scan->tag, {}),
index->metadata.version, ctx->storage->IsSlotIdEncoded())
.Encode();
}

bool InRangeDecode(Slice key, Slice field, Slice *user_key) {
auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
if (ikey.GetVersion() != index->metadata.version) return false;
auto subkey = ikey.GetSubKey();

uint8_t flag = 0;
if (!GetFixed8(&subkey, &flag)) return false;
if (flag != (uint8_t)redis::SearchSubkeyType::TAG_FIELD) return false;
: ExecutorNode(ctx),
scan(scan),
ss(ctx->storage),
index(scan->field->info->index),
index_key(redis::SearchKey(index->ns, index->name, scan->field->name).ConstructTagFieldData(scan->tag, {})) {}

bool InRangeDecode(Slice key, Slice *user_key) const {
uint8_t ns_size = 0;
if (!GetFixed8(&key, &ns_size)) return false;
if (ns_size != index->ns.size()) return false;
if (!key.starts_with(index->ns)) return false;
key.remove_prefix(ns_size);

uint8_t subkey_type = 0;
if (!GetFixed8(&key, &subkey_type)) return false;
if (subkey_type != (uint8_t)redis::SearchSubkeyType::FIELD) return false;

Slice value;
if (!GetSizedString(&subkey, &value)) return false;
if (value != field) return false;
if (!GetSizedString(&key, &value)) return false;
if (value != index->name) return false;

if (!GetSizedString(&key, &value)) return false;
if (value != scan->field->name) return false;

Slice tag;
if (!GetSizedString(&subkey, &tag)) return false;
if (tag != scan->tag) return false;
if (!GetSizedString(&key, &value)) return false;
if (value != scan->tag) return false;

if (!GetSizedString(&subkey, user_key)) return false;
if (!GetSizedString(&key, user_key)) return false;

return true;
}
Expand All @@ -85,7 +87,7 @@ struct TagFieldScanExecutor : ExecutorNode {
}

Slice user_key;
if (!InRangeDecode(iter->key(), scan->field->name, &user_key)) {
if (!InRangeDecode(iter->key(), &user_key)) {
return end;
}

Expand Down
12 changes: 6 additions & 6 deletions src/search/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ struct IndexInfo;
struct FieldInfo {
std::string name;
IndexInfo *index = nullptr;
std::unique_ptr<redis::SearchFieldMetadata> metadata;
std::unique_ptr<redis::IndexFieldMetadata> metadata;

FieldInfo(std::string name, std::unique_ptr<redis::SearchFieldMetadata> &&metadata)
FieldInfo(std::string name, std::unique_ptr<redis::IndexFieldMetadata> &&metadata)
: name(std::move(name)), metadata(std::move(metadata)) {}

bool IsSortable() const { return dynamic_cast<redis::SearchSortableFieldMetadata *>(metadata.get()) != nullptr; }
bool IsSortable() const { return metadata->IsSortable(); }
bool HasIndex() const { return !metadata->noindex; }

template <typename T>
Expand All @@ -51,12 +51,12 @@ struct IndexInfo {
using FieldMap = std::map<std::string, FieldInfo>;

std::string name;
SearchMetadata metadata;
redis::IndexMetadata metadata;
FieldMap fields;
redis::SearchPrefixesMetadata prefixes;
redis::IndexPrefixes prefixes;
std::string ns;

IndexInfo(std::string name, SearchMetadata metadata) : name(std::move(name)), metadata(std::move(metadata)) {}
IndexInfo(std::string name, redis::IndexMetadata metadata) : name(std::move(name)), metadata(std::move(metadata)) {}

void Add(FieldInfo &&field) {
const auto &name = field.name;
Expand Down
159 changes: 84 additions & 75 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@

namespace redis {

StatusOr<FieldValueRetriever> FieldValueRetriever::Create(SearchOnDataType type, std::string_view key,
StatusOr<FieldValueRetriever> FieldValueRetriever::Create(IndexOnDataType type, std::string_view key,
engine::Storage *storage, const std::string &ns) {
if (type == SearchOnDataType::HASH) {
if (type == IndexOnDataType::HASH) {
Hash db(storage, ns);
std::string ns_key = db.AppendNamespacePrefix(key);
HashMetadata metadata(false);
auto s = db.GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return {Status::NotOK, s.ToString()};
return FieldValueRetriever(db, metadata, key);
} else if (type == SearchOnDataType::JSON) {
} else if (type == IndexOnDataType::JSON) {
Json db(storage, ns);
std::string ns_key = db.AppendNamespacePrefix(key);
JsonMetadata metadata(false);
Expand All @@ -50,7 +50,7 @@ StatusOr<FieldValueRetriever> FieldValueRetriever::Create(SearchOnDataType type,
if (!s.ok()) return {Status::NotOK, s.ToString()};
return FieldValueRetriever(value);
} else {
assert(false && "unreachable code: unexpected SearchOnDataType");
assert(false && "unreachable code: unexpected IndexOnDataType");
__builtin_unreachable();
}
}
Expand Down Expand Up @@ -111,94 +111,103 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
return values;
}

Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns) const {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
Status IndexUpdater::UpdateTagIndex(std::string_view key, std::string_view original, std::string_view current,
const SearchKey &search_key, const TagFieldMetadata *tag) const {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);

auto to_tag_set = [](const std::vector<std::string> &tags, bool case_sensitive) -> std::set<std::string> {
if (case_sensitive) {
return {tags.begin(), tags.end()};
} else {
std::set<std::string> res;
std::transform(tags.begin(), tags.end(), std::inserter(res, res.begin()), util::ToLower);
return res;
}
};

std::set<std::string> tags_to_delete = to_tag_set(original_tags, tag->case_sensitive);
std::set<std::string> tags_to_add = to_tag_set(current_tags, tag->case_sensitive);

for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
it = tags_to_delete.erase(it);
tags_to_add.erase(jt);
} else {
++it;
}
}

auto iter = info->fields.find(field);
if (iter == info->fields.end()) {
return {Status::NotOK, "No such field to do index updating"};
if (tags_to_add.empty() && tags_to_delete.empty()) {
// no change, skip index updating
return Status::OK();
}

auto *metadata = iter->second.metadata.get();
auto *storage = indexer->storage;
auto ns_key = ComposeNamespaceKey(ns, info->name, storage->IsSlotIdEncoded());
if (auto tag = dynamic_cast<SearchTagFieldMetadata *>(metadata)) {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);

auto to_tag_set = [](const std::vector<std::string> &tags, bool case_sensitive) -> std::set<std::string> {
if (case_sensitive) {
return {tags.begin(), tags.end()};
} else {
std::set<std::string> res;
std::transform(tags.begin(), tags.end(), std::inserter(res, res.begin()), util::ToLower);
return res;
}
};

std::set<std::string> tags_to_delete = to_tag_set(original_tags, tag->case_sensitive);
std::set<std::string> tags_to_add = to_tag_set(current_tags, tag->case_sensitive);

for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
it = tags_to_delete.erase(it);
tags_to_add.erase(jt);
} else {
++it;
}
}
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

if (tags_to_add.empty() && tags_to_delete.empty()) {
// no change, skip index updating
return Status::OK();
}
for (const auto &tag : tags_to_delete) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
batch->Delete(cf_handle, index_key);
}

for (const auto &tag : tags_to_delete) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
for (const auto &tag : tags_to_add) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Delete(cf_handle, index_key.Encode());
}
batch->Put(cf_handle, index_key, Slice());
}

for (const auto &tag : tags_to_add) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
return Status::OK();
}

batch->Put(cf_handle, index_key.Encode(), Slice());
}
Status IndexUpdater::UpdateNumericIndex(std::string_view key, std::string_view original, std::string_view current,
const SearchKey &search_key, const NumericFieldMetadata *num) const {
auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
} else if (auto numeric [[maybe_unused]] = dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto index_key = search_key.ConstructNumericFieldData(original_num, key);

if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto sub_key = ConstructNumericFieldSubkey(field, original_num, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
batch->Delete(cf_handle, index_key);
}

batch->Delete(cf_handle, index_key.Encode());
}
if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto index_key = search_key.ConstructNumericFieldData(current_num, key);

if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto sub_key = ConstructNumericFieldSubkey(field, current_num, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
batch->Put(cf_handle, index_key, Slice());
}

batch->Put(cf_handle, index_key.Encode(), Slice());
}
auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
return Status::OK();
}

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns) const {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
}

auto iter = info->fields.find(field);
if (iter == info->fields.end()) {
return {Status::NotOK, "No such field to do index updating"};
}

auto *metadata = iter->second.metadata.get();
SearchKey search_key(ns, info->name, field);
if (auto tag = dynamic_cast<TagFieldMetadata *>(metadata)) {
GET_OR_RET(UpdateTagIndex(key, original, current, search_key, tag));
} else if (auto numeric [[maybe_unused]] = dynamic_cast<NumericFieldMetadata *>(metadata)) {
GET_OR_RET(UpdateNumericIndex(key, original, current, search_key, numeric));
} else {
return {Status::NotOK, "Unexpected field type"};
}
Expand Down
Loading

0 comments on commit bd3c053

Please sign in to comment.