Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the new encoding for search framework #2338

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/search/executors/filter_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ struct QueryExprEvaluator {

StatusOr<bool> Visit(TagContainExpr *v) const {
auto val = GET_OR_RET(ctx->Retrieve(row, v->field->info));
auto meta = v->field->info->MetadataAs<redis::SearchTagFieldMetadata>();
auto meta = v->field->info->MetadataAs<redis::TagFieldMetadata>();

auto split = util::Split(val, std::string(1, meta->separator));
return std::find(split.begin(), split.end(), v->tag->val) != split.end();
Expand Down
45 changes: 24 additions & 21 deletions src/search/executors/numeric_field_scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,38 @@ struct NumericFieldScanExecutor : ExecutorNode {
util::UniqueIterator iter{nullptr};

IndexInfo *index;
std::string ns_key;
redis::SearchKey search_key;

NumericFieldScanExecutor(ExecutorContext *ctx, NumericFieldScan *scan)
: ExecutorNode(ctx), scan(scan), ss(ctx->storage), index(scan->field->info->index) {
ns_key = ComposeNamespaceKey(index->ns, index->name, ctx->storage->IsSlotIdEncoded());
}
: ExecutorNode(ctx),
scan(scan),
ss(ctx->storage),
index(scan->field->info->index),
search_key(index->ns, index->name, scan->field->name) {}

std::string IndexKey(double num) {
return InternalKey(ns_key, redis::ConstructNumericFieldSubkey(scan->field->name, num, {}), index->metadata.version,
ctx->storage->IsSlotIdEncoded())
.Encode();
}
std::string IndexKey(double num) const { return search_key.ConstructNumericFieldData(num, {}); }

bool InRangeDecode(Slice key, Slice field, double num, double *curr, Slice *user_key) {
auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
if (ikey.GetVersion() != index->metadata.version) return false;
auto subkey = ikey.GetSubKey();
bool InRangeDecode(Slice key, double *curr, Slice *user_key) const {
uint8_t ns_size = 0;
if (!GetFixed8(&key, &ns_size)) return false;
if (ns_size != index->ns.size()) return false;
if (!key.starts_with(index->ns)) return false;
key.remove_prefix(ns_size);

uint8_t flag = 0;
if (!GetFixed8(&subkey, &flag)) return false;
if (flag != (uint8_t)redis::SearchSubkeyType::NUMERIC_FIELD) return false;
uint8_t subkey_type = 0;
if (!GetFixed8(&key, &subkey_type)) return false;
if (subkey_type != (uint8_t)redis::SearchSubkeyType::FIELD) return false;

Slice value;
if (!GetSizedString(&subkey, &value)) return false;
if (value != field) return false;
if (!GetSizedString(&key, &value)) return false;
if (value != index->name) return false;

if (!GetSizedString(&key, &value)) return false;
if (value != scan->field->name) return false;

if (!GetDouble(&subkey, curr)) return false;
if (!GetDouble(&key, curr)) return false;

if (!GetSizedString(&subkey, user_key)) return false;
if (!GetSizedString(&key, user_key)) return false;

return true;
}
Expand All @@ -90,7 +93,7 @@ struct NumericFieldScanExecutor : ExecutorNode {

double curr = 0;
Slice user_key;
if (!InRangeDecode(iter->key(), scan->field->name, scan->range.r, &curr, &user_key)) {
if (!InRangeDecode(iter->key(), &curr, &user_key)) {
return end;
}

Expand Down
48 changes: 25 additions & 23 deletions src/search/executors/tag_field_scan_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,35 +38,37 @@ struct TagFieldScanExecutor : ExecutorNode {
util::UniqueIterator iter{nullptr};

IndexInfo *index;
std::string ns_key;
std::string index_key;

TagFieldScanExecutor(ExecutorContext *ctx, TagFieldScan *scan)
: ExecutorNode(ctx), scan(scan), ss(ctx->storage), index(scan->field->info->index) {
ns_key = ComposeNamespaceKey(index->ns, index->name, ctx->storage->IsSlotIdEncoded());
index_key = InternalKey(ns_key, redis::ConstructTagFieldSubkey(scan->field->name, scan->tag, {}),
index->metadata.version, ctx->storage->IsSlotIdEncoded())
.Encode();
}

bool InRangeDecode(Slice key, Slice field, Slice *user_key) {
auto ikey = InternalKey(key, ctx->storage->IsSlotIdEncoded());
if (ikey.GetVersion() != index->metadata.version) return false;
auto subkey = ikey.GetSubKey();

uint8_t flag = 0;
if (!GetFixed8(&subkey, &flag)) return false;
if (flag != (uint8_t)redis::SearchSubkeyType::TAG_FIELD) return false;
: ExecutorNode(ctx),
scan(scan),
ss(ctx->storage),
index(scan->field->info->index),
index_key(redis::SearchKey(index->ns, index->name, scan->field->name).ConstructTagFieldData(scan->tag, {})) {}

bool InRangeDecode(Slice key, Slice *user_key) const {
uint8_t ns_size = 0;
if (!GetFixed8(&key, &ns_size)) return false;
if (ns_size != index->ns.size()) return false;
if (!key.starts_with(index->ns)) return false;
key.remove_prefix(ns_size);

uint8_t subkey_type = 0;
if (!GetFixed8(&key, &subkey_type)) return false;
if (subkey_type != (uint8_t)redis::SearchSubkeyType::FIELD) return false;

Slice value;
if (!GetSizedString(&subkey, &value)) return false;
if (value != field) return false;
if (!GetSizedString(&key, &value)) return false;
if (value != index->name) return false;

if (!GetSizedString(&key, &value)) return false;
if (value != scan->field->name) return false;

Slice tag;
if (!GetSizedString(&subkey, &tag)) return false;
if (tag != scan->tag) return false;
if (!GetSizedString(&key, &value)) return false;
if (value != scan->tag) return false;

if (!GetSizedString(&subkey, user_key)) return false;
if (!GetSizedString(&key, user_key)) return false;

return true;
}
Expand All @@ -85,7 +87,7 @@ struct TagFieldScanExecutor : ExecutorNode {
}

Slice user_key;
if (!InRangeDecode(iter->key(), scan->field->name, &user_key)) {
if (!InRangeDecode(iter->key(), &user_key)) {
return end;
}

Expand Down
12 changes: 6 additions & 6 deletions src/search/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ struct IndexInfo;
struct FieldInfo {
std::string name;
IndexInfo *index = nullptr;
std::unique_ptr<redis::SearchFieldMetadata> metadata;
std::unique_ptr<redis::IndexFieldMetadata> metadata;

FieldInfo(std::string name, std::unique_ptr<redis::SearchFieldMetadata> &&metadata)
FieldInfo(std::string name, std::unique_ptr<redis::IndexFieldMetadata> &&metadata)
: name(std::move(name)), metadata(std::move(metadata)) {}

bool IsSortable() const { return dynamic_cast<redis::SearchSortableFieldMetadata *>(metadata.get()) != nullptr; }
bool IsSortable() const { return metadata->IsSortable(); }
bool HasIndex() const { return !metadata->noindex; }

template <typename T>
Expand All @@ -51,12 +51,12 @@ struct IndexInfo {
using FieldMap = std::map<std::string, FieldInfo>;

std::string name;
SearchMetadata metadata;
redis::IndexMetadata metadata;
FieldMap fields;
redis::SearchPrefixesMetadata prefixes;
redis::IndexPrefixes prefixes;
std::string ns;

IndexInfo(std::string name, SearchMetadata metadata) : name(std::move(name)), metadata(std::move(metadata)) {}
IndexInfo(std::string name, redis::IndexMetadata metadata) : name(std::move(name)), metadata(std::move(metadata)) {}

void Add(FieldInfo &&field) {
const auto &name = field.name;
Expand Down
159 changes: 84 additions & 75 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@

namespace redis {

StatusOr<FieldValueRetriever> FieldValueRetriever::Create(SearchOnDataType type, std::string_view key,
StatusOr<FieldValueRetriever> FieldValueRetriever::Create(IndexOnDataType type, std::string_view key,
engine::Storage *storage, const std::string &ns) {
if (type == SearchOnDataType::HASH) {
if (type == IndexOnDataType::HASH) {
Hash db(storage, ns);
std::string ns_key = db.AppendNamespacePrefix(key);
HashMetadata metadata(false);
auto s = db.GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return {Status::NotOK, s.ToString()};
return FieldValueRetriever(db, metadata, key);
} else if (type == SearchOnDataType::JSON) {
} else if (type == IndexOnDataType::JSON) {
Json db(storage, ns);
std::string ns_key = db.AppendNamespacePrefix(key);
JsonMetadata metadata(false);
Expand All @@ -50,7 +50,7 @@ StatusOr<FieldValueRetriever> FieldValueRetriever::Create(SearchOnDataType type,
if (!s.ok()) return {Status::NotOK, s.ToString()};
return FieldValueRetriever(value);
} else {
assert(false && "unreachable code: unexpected SearchOnDataType");
assert(false && "unreachable code: unexpected IndexOnDataType");
__builtin_unreachable();
}
}
Expand Down Expand Up @@ -111,94 +111,103 @@ StatusOr<IndexUpdater::FieldValues> IndexUpdater::Record(std::string_view key, c
return values;
}

Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns) const {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
Status IndexUpdater::UpdateTagIndex(std::string_view key, std::string_view original, std::string_view current,
const SearchKey &search_key, const TagFieldMetadata *tag) const {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);

auto to_tag_set = [](const std::vector<std::string> &tags, bool case_sensitive) -> std::set<std::string> {
if (case_sensitive) {
return {tags.begin(), tags.end()};
} else {
std::set<std::string> res;
std::transform(tags.begin(), tags.end(), std::inserter(res, res.begin()), util::ToLower);
return res;
}
};

std::set<std::string> tags_to_delete = to_tag_set(original_tags, tag->case_sensitive);
std::set<std::string> tags_to_add = to_tag_set(current_tags, tag->case_sensitive);

for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
it = tags_to_delete.erase(it);
tags_to_add.erase(jt);
} else {
++it;
}
}

auto iter = info->fields.find(field);
if (iter == info->fields.end()) {
return {Status::NotOK, "No such field to do index updating"};
if (tags_to_add.empty() && tags_to_delete.empty()) {
// no change, skip index updating
return Status::OK();
}

auto *metadata = iter->second.metadata.get();
auto *storage = indexer->storage;
auto ns_key = ComposeNamespaceKey(ns, info->name, storage->IsSlotIdEncoded());
if (auto tag = dynamic_cast<SearchTagFieldMetadata *>(metadata)) {
const char delim[] = {tag->separator, '\0'};
auto original_tags = util::Split(original, delim);
auto current_tags = util::Split(current, delim);

auto to_tag_set = [](const std::vector<std::string> &tags, bool case_sensitive) -> std::set<std::string> {
if (case_sensitive) {
return {tags.begin(), tags.end()};
} else {
std::set<std::string> res;
std::transform(tags.begin(), tags.end(), std::inserter(res, res.begin()), util::ToLower);
return res;
}
};

std::set<std::string> tags_to_delete = to_tag_set(original_tags, tag->case_sensitive);
std::set<std::string> tags_to_add = to_tag_set(current_tags, tag->case_sensitive);

for (auto it = tags_to_delete.begin(); it != tags_to_delete.end();) {
if (auto jt = tags_to_add.find(*it); jt != tags_to_add.end()) {
it = tags_to_delete.erase(it);
tags_to_add.erase(jt);
} else {
++it;
}
}
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

if (tags_to_add.empty() && tags_to_delete.empty()) {
// no change, skip index updating
return Status::OK();
}
for (const auto &tag : tags_to_delete) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
batch->Delete(cf_handle, index_key);
}

for (const auto &tag : tags_to_delete) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
for (const auto &tag : tags_to_add) {
auto index_key = search_key.ConstructTagFieldData(tag, key);

batch->Delete(cf_handle, index_key.Encode());
}
batch->Put(cf_handle, index_key, Slice());
}

for (const auto &tag : tags_to_add) {
auto sub_key = ConstructTagFieldSubkey(field, tag, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
return Status::OK();
}

batch->Put(cf_handle, index_key.Encode(), Slice());
}
Status IndexUpdater::UpdateNumericIndex(std::string_view key, std::string_view original, std::string_view current,
const SearchKey &search_key, const NumericFieldMetadata *num) const {
auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
} else if (auto numeric [[maybe_unused]] = dynamic_cast<SearchNumericFieldMetadata *>(metadata)) {
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);
if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto index_key = search_key.ConstructNumericFieldData(original_num, key);

if (!original.empty()) {
auto original_num = GET_OR_RET(ParseFloat(std::string(original.begin(), original.end())));
auto sub_key = ConstructNumericFieldSubkey(field, original_num, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
batch->Delete(cf_handle, index_key);
}

batch->Delete(cf_handle, index_key.Encode());
}
if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto index_key = search_key.ConstructNumericFieldData(current_num, key);

if (!current.empty()) {
auto current_num = GET_OR_RET(ParseFloat(std::string(current.begin(), current.end())));
auto sub_key = ConstructNumericFieldSubkey(field, current_num, key);
auto index_key = InternalKey(ns_key, sub_key, info->metadata.version, storage->IsSlotIdEncoded());
batch->Put(cf_handle, index_key, Slice());
}

batch->Put(cf_handle, index_key.Encode(), Slice());
}
auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
return Status::OK();
}

auto s = storage->Write(storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
Status IndexUpdater::UpdateIndex(const std::string &field, std::string_view key, std::string_view original,
std::string_view current, const std::string &ns) const {
if (original == current) {
// the value of this field is unchanged, no need to update
return Status::OK();
}

auto iter = info->fields.find(field);
if (iter == info->fields.end()) {
return {Status::NotOK, "No such field to do index updating"};
}

auto *metadata = iter->second.metadata.get();
SearchKey search_key(ns, info->name, field);
if (auto tag = dynamic_cast<TagFieldMetadata *>(metadata)) {
GET_OR_RET(UpdateTagIndex(key, original, current, search_key, tag));
} else if (auto numeric [[maybe_unused]] = dynamic_cast<NumericFieldMetadata *>(metadata)) {
GET_OR_RET(UpdateNumericIndex(key, original, current, search_key, numeric));
} else {
return {Status::NotOK, "Unexpected field type"};
}
Expand Down
Loading
Loading