diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc index bf4e527f27f..8d3309abfb5 100644 --- a/src/commands/cmd_search.cc +++ b/src/commands/cmd_search.cc @@ -333,7 +333,7 @@ static StatusOr> ParseRediSearchQuery(const std::vec kqir::ParamMap param_map; while (parser.Good()) { - if (parser.EatEqICase("RETURNS")) { + if (parser.EatEqICase("RETURN")) { auto count = GET_OR_RET(parser.TakeInt()); for (size_t i = 0; i < count; ++i) { diff --git a/src/search/index_manager.h b/src/search/index_manager.h index eb5d5dcc30d..a89445ea435 100644 --- a/src/search/index_manager.h +++ b/src/search/index_manager.h @@ -124,8 +124,8 @@ struct IndexManager { info->Add(kqir::FieldInfo(field_name.ToString(), std::move(field_meta))); } - IndexUpdater updater(info.get()); - indexer->Add(updater); + auto updater = std::make_unique(info.get()); + indexer->Add(std::move(updater)); index_map.Insert(std::move(info)); } @@ -180,12 +180,12 @@ struct IndexManager { return {Status::NotOK, fmt::format("failed to write index metadata: {}", s.ToString())}; } - IndexUpdater updater(info.get()); - indexer->Add(updater); + auto updater = std::make_unique(info.get()); + indexer->Add(std::move(updater)); index_map.Insert(std::move(info)); - for (auto updater : indexer->updater_list) { - GET_OR_RET(updater.Build(ctx)); + for (const auto &updater : indexer->updater_list) { + GET_OR_RET(updater->Build(ctx)); } return Status::OK(); diff --git a/src/search/indexer.cc b/src/search/indexer.cc index 0f771026216..630e4c1ea81 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -280,10 +280,14 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original, const kqir::Value &current, const SearchKey &search_key, - HnswVectorFieldMetadata *vector) const { + HnswVectorFieldMetadata *vector) { CHECK(original.IsNull() || original.Is()); CHECK(current.IsNull() 
|| current.Is()); + // TODO: we can remove the lock if we solve the race problem + // inside the HNSW indexer, refer to #2481 and #2489 + std::unique_lock lock(update_mutex); + auto storage = indexer->storage; auto hnsw = HnswIndex(search_key, vector, storage); @@ -305,7 +309,7 @@ Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, std::string_vie } Status IndexUpdater::UpdateIndex(engine::Context &ctx, const std::string &field, std::string_view key, - const kqir::Value &original, const kqir::Value &current) const { + const kqir::Value &original, const kqir::Value &current) { if (original == current) { // the value of this field is unchanged, no need to update return Status::OK(); @@ -331,7 +335,7 @@ Status IndexUpdater::UpdateIndex(engine::Context &ctx, const std::string &field, return Status::OK(); } -Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const { +Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, std::string_view key) { auto current = GET_OR_RET(Record(ctx, key)); for (const auto &[field, i] : info->fields) { @@ -354,7 +358,7 @@ Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, s return Status::OK(); } -Status IndexUpdater::Build(engine::Context &ctx) const { +Status IndexUpdater::Build(engine::Context &ctx) { auto storage = indexer->storage; util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), ColumnFamilyID::Metadata); @@ -380,26 +384,27 @@ Status IndexUpdater::Build(engine::Context &ctx) const { return Status::OK(); } -void GlobalIndexer::Add(IndexUpdater updater) { - updater.indexer = this; - for (const auto &prefix : updater.info->prefixes) { - prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false), updater); +void GlobalIndexer::Add(std::unique_ptr updater) { + updater->indexer = this; + for (const auto &prefix : updater->info->prefixes) { + prefix_map.insert(ComposeNamespaceKey(updater->info->ns, prefix, false), 
updater.get()); } - updater_list.push_back(updater); + updater_list.push_back(std::move(updater)); } void GlobalIndexer::Remove(const kqir::IndexInfo *index) { for (auto iter = prefix_map.begin(); iter != prefix_map.end();) { - if (iter->info == index) { + if ((*iter)->info == index) { iter = prefix_map.erase(iter); } else { ++iter; } } - updater_list.erase(std::remove_if(updater_list.begin(), updater_list.end(), - [index](IndexUpdater updater) { return updater.info == index; }), - updater_list.end()); + updater_list.erase( + std::remove_if(updater_list.begin(), updater_list.end(), + [index](const std::unique_ptr &updater) { return updater->info == index; }), + updater_list.end()); } StatusOr GlobalIndexer::Record(engine::Context &ctx, std::string_view key, @@ -411,14 +416,14 @@ StatusOr GlobalIndexer::Record(engine::Context &ctx auto iter = prefix_map.longest_prefix(ComposeNamespaceKey(ns, key, false)); if (iter != prefix_map.end()) { auto updater = iter.value(); - return RecordResult{updater, std::string(key.begin(), key.end()), GET_OR_RET(updater.Record(ctx, key))}; + return RecordResult{updater, std::string(key.begin(), key.end()), GET_OR_RET(updater->Record(ctx, key))}; } return {Status::NoPrefixMatched}; } Status GlobalIndexer::Update(engine::Context &ctx, const RecordResult &original) { - return original.updater.Update(ctx, original.fields, original.key); + return original.updater->Update(ctx, original.fields, original.key); } } // namespace redis diff --git a/src/search/indexer.h b/src/search/indexer.h index 20819a2f279..b6d0043103c 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -75,15 +75,16 @@ struct IndexUpdater { const kqir::IndexInfo *info = nullptr; GlobalIndexer *indexer = nullptr; + std::mutex update_mutex; explicit IndexUpdater(const kqir::IndexInfo *info) : info(info) {} StatusOr Record(engine::Context &ctx, std::string_view key) const; Status UpdateIndex(engine::Context &ctx, const std::string &field, std::string_view key, const 
kqir::Value &original, - const kqir::Value &current) const; - Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const; + const kqir::Value &current); + Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key); - Status Build(engine::Context &ctx) const; + Status Build(engine::Context &ctx); Status UpdateTagIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original, const kqir::Value &current, const SearchKey &search_key, const TagFieldMetadata *tag) const; @@ -92,25 +93,25 @@ struct IndexUpdater { const NumericFieldMetadata *num) const; Status UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original, const kqir::Value &current, const SearchKey &search_key, - HnswVectorFieldMetadata *vector) const; + HnswVectorFieldMetadata *vector); }; struct GlobalIndexer { using FieldValues = IndexUpdater::FieldValues; struct RecordResult { - IndexUpdater updater; + IndexUpdater *updater; std::string key; FieldValues fields; }; - tsl::htrie_map prefix_map; - std::vector updater_list; + tsl::htrie_map prefix_map; + std::vector> updater_list; engine::Storage *storage = nullptr; explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {} - void Add(IndexUpdater updater); + void Add(std::unique_ptr updater); void Remove(const kqir::IndexInfo *index); StatusOr Record(engine::Context &ctx, std::string_view key, const std::string &ns); diff --git a/src/search/redis_query_transformer.h b/src/search/redis_query_transformer.h index ed7c8fc651b..7cc3a90302a 100644 --- a/src/search/redis_query_transformer.h +++ b/src/search/redis_query_transformer.h @@ -171,7 +171,7 @@ struct Transformer : ir::TreeTransformer { if (Is(knn_search->children[1])) { k = *ParseInt(knn_search->children[1]->string()); } else { - k = *ParseInt(GET_OR_RET(GetParam(node))); + k = *ParseInt(GET_OR_RET(GetParam(knn_search->children[1]))); } return 
std::make_unique(std::make_unique(knn_search->children[2]->string()), diff --git a/tests/cppunit/indexer_test.cc b/tests/cppunit/indexer_test.cc index 4e5ea3fce8d..5e7ecf1a5a3 100644 --- a/tests/cppunit/indexer_test.cc +++ b/tests/cppunit/indexer_test.cc @@ -48,7 +48,7 @@ struct IndexerTest : TestBase { map.emplace("hashtest", std::move(hash_info)); - redis::IndexUpdater hash_updater{map.at("hashtest").get()}; + auto hash_updater = std::make_unique(map.at("hashtest").get()); redis::IndexMetadata json_field_meta; json_field_meta.on_data_type = redis::IndexOnDataType::JSON; @@ -65,7 +65,7 @@ struct IndexerTest : TestBase { map.emplace("jsontest", std::move(json_info)); - redis::IndexUpdater json_updater{map.at("jsontest").get()}; + auto json_updater = std::make_unique(map.at("jsontest").get()); indexer.Add(std::move(hash_updater)); indexer.Add(std::move(json_updater)); @@ -87,7 +87,7 @@ TEST_F(IndexerTest, HashTag) { { auto s = indexer.Record(*ctx_, key1, ns); ASSERT_EQ(s.Msg(), Status::ok_msg); - ASSERT_EQ(s->updater.info->name, idxname); + ASSERT_EQ(s->updater->info->name, idxname); ASSERT_TRUE(s->fields.empty()); uint64_t cnt = 0; @@ -120,7 +120,7 @@ TEST_F(IndexerTest, HashTag) { { auto s = indexer.Record(*ctx_, key1, ns); ASSERT_TRUE(s); - ASSERT_EQ(s->updater.info->name, idxname); + ASSERT_EQ(s->updater->info->name, idxname); ASSERT_EQ(s->fields.size(), 1); ASSERT_EQ(s->fields["x"], T("food,kitChen,Beauty")); @@ -178,7 +178,7 @@ TEST_F(IndexerTest, JsonTag) { { auto s = indexer.Record(*ctx_, key1, ns); ASSERT_TRUE(s); - ASSERT_EQ(s->updater.info->name, idxname); + ASSERT_EQ(s->updater->info->name, idxname); ASSERT_TRUE(s->fields.empty()); auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})"); @@ -210,7 +210,7 @@ TEST_F(IndexerTest, JsonTag) { { auto s = indexer.Record(*ctx_, key1, ns); ASSERT_TRUE(s); - ASSERT_EQ(s->updater.info->name, idxname); + ASSERT_EQ(s->updater->info->name, idxname); ASSERT_EQ(s->fields.size(), 1); 
ASSERT_EQ(s->fields["$.x"], T("food,kitChen,Beauty")); @@ -262,7 +262,7 @@ TEST_F(IndexerTest, JsonTagBuildIndex) { auto s_set = db.Set(*ctx_, key1, "$", R"({"x": "food,kitChen,Beauty"})"); ASSERT_TRUE(s_set.ok()); - auto s2 = indexer.updater_list[1].Build(*ctx_); + auto s2 = indexer.updater_list[1]->Build(*ctx_); ASSERT_EQ(s2.Msg(), Status::ok_msg); auto key = redis::SearchKey(ns, idxname, "$.x").ConstructTagFieldData("food", key1); @@ -301,7 +301,7 @@ TEST_F(IndexerTest, JsonHnswVector) { { auto s = indexer.Record(*ctx_, key3, ns); ASSERT_TRUE(s); - ASSERT_EQ(s->updater.info->name, idxname); + ASSERT_EQ(s->updater->info->name, idxname); ASSERT_TRUE(s->fields.empty()); auto s_set = db.Set(*ctx_, key3, "$", R"({"z": [1,2,3]})"); diff --git a/tests/cppunit/plan_executor_test.cc b/tests/cppunit/plan_executor_test.cc index 3524c14393e..ecff4bb179e 100644 --- a/tests/cppunit/plan_executor_test.cc +++ b/tests/cppunit/plan_executor_test.cc @@ -386,7 +386,7 @@ std::vector> ScopedUpdates(engine::Context& ctx, r TEST_F(PlanExecutorTestC, NumericFieldScan) { redis::GlobalIndexer indexer(storage_.get()); - indexer.Add(redis::IndexUpdater(IndexI())); + indexer.Add(std::make_unique(IndexI())); { engine::Context ctx(storage_.get()); @@ -428,7 +428,7 @@ TEST_F(PlanExecutorTestC, NumericFieldScan) { TEST_F(PlanExecutorTestC, TagFieldScan) { redis::GlobalIndexer indexer(storage_.get()); - indexer.Add(redis::IndexUpdater(IndexI())); + indexer.Add(std::make_unique(IndexI())); { engine::Context ctx(storage_.get()); @@ -467,7 +467,7 @@ TEST_F(PlanExecutorTestC, TagFieldScan) { TEST_F(PlanExecutorTestC, HnswVectorFieldScans) { redis::GlobalIndexer indexer(storage_.get()); - indexer.Add(redis::IndexUpdater(IndexI())); + indexer.Add(std::make_unique(IndexI())); { auto updates = ScopedUpdates(*ctx_, indexer,