From ca497b7ff9fd5888cec65f9f28e6617dc0f74a06 Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Tue, 28 May 2024 12:25:29 +0800
Subject: [PATCH 1/7] Create index before insertion

---
 python/benchmark/clients/infinity_client.py   | 10 +++++-----
 python/benchmark/configs/infinity_enwiki.json |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/benchmark/clients/infinity_client.py b/python/benchmark/clients/infinity_client.py
index e33e88f06f..b8b922cd8e 100644
--- a/python/benchmark/clients/infinity_client.py
+++ b/python/benchmark/clients/infinity_client.py
@@ -45,6 +45,11 @@ def upload(self):
         db_obj.drop_table(self.table_name)
         db_obj.create_table(self.table_name, self.data["schema"])
         table_obj = db_obj.get_table(self.table_name)
+        # create index
+        # indexs = self._parse_index_schema(self.data["index"])
+        # for i, idx in enumerate(indexs):
+        #     table_obj.create_index(f"index{i}", [idx])
+
         dataset_path = os.path.join(self.path_prefix, self.data["data_path"])
         if not os.path.exists(dataset_path):
             self.download_data(self.data["data_link"], dataset_path)
@@ -110,11 +115,6 @@ def upload(self):
         if current_batch:
             table_obj.insert(current_batch)
 
-        # create index
-        indexs = self._parse_index_schema(self.data["index"])
-        for i, idx in enumerate(indexs):
-            table_obj.create_index(f"index{i}", [idx])
-
     def setup_clients(self, num_threads=1):
         host, port = self.data["host"].split(":")
         self.clients = list()
diff --git a/python/benchmark/configs/infinity_enwiki.json b/python/benchmark/configs/infinity_enwiki.json
index e13660af62..fdb9f4d40c 100644
--- a/python/benchmark/configs/infinity_enwiki.json
+++ b/python/benchmark/configs/infinity_enwiki.json
@@ -10,7 +10,7 @@
     "query_link": "to_be_set",
     "mode": "fulltext",
     "topK": 10,
-    "use_import": true,
+    "use_import": false,
     "schema": {
         "doctitle": {"type": "varchar", "default":""},
         "docdate": {"type": "varchar", "default":""},

From a1edba8540d0613b13690a5a1c4f4035241915b5 Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Tue, 28 May 2024 16:22:42 +0800
Subject: [PATCH 2/7] Added NOATIME flag

---
 src/storage/io/local_file_system.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/storage/io/local_file_system.cpp b/src/storage/io/local_file_system.cpp
index efad247644..ba825ed434 100644
--- a/src/storage/io/local_file_system.cpp
+++ b/src/storage/io/local_file_system.cpp
@@ -61,7 +61,7 @@ Pair<UniquePtr<FileHandler>, Status> LocalFileSystem::OpenFile(const String &pat
     if (read_flag && write_flag) {
         file_flags = O_RDWR;
     } else if (read_flag) {
-        file_flags = O_RDONLY;
+        file_flags = O_RDONLY | O_NOATIME;
    } else if (write_flag) {
         file_flags = O_WRONLY;
     } else {
@@ -369,7 +369,7 @@ int LocalFileSystem::MmapFile(const String &file_path, u8 *&data_ptr, SizeT &dat
     long len_f = fs::file_size(file_path);
     if (len_f == 0)
         return -1;
-    int f = open(file_path.c_str(), O_RDONLY);
+    int f = open(file_path.c_str(), O_RDONLY | O_NOATIME);
     void *tmpd = mmap(NULL, len_f, PROT_READ, MAP_SHARED, f, 0);
     if (tmpd == MAP_FAILED)
         return -1;
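A note on PATCH 2: on Linux, O_NOATIME needs _GNU_SOURCE, and open(2) rejects the flag with EPERM when the caller's effective UID does not own the file (and the process lacks CAP_FOWNER). The patch applies the flag unconditionally; a minimal sketch of the fallback a caller could add, where OpenReadNoAtime is a hypothetical helper and not part of this change:

    #ifndef _GNU_SOURCE
    #define _GNU_SOURCE
    #endif
    #include <fcntl.h>
    #include <cerrno>

    // Sketch: try to avoid atime updates, but retry without the flag when
    // the kernel refuses it (EPERM for files the caller does not own).
    static int OpenReadNoAtime(const char *path) {
        int fd = open(path, O_RDONLY | O_NOATIME);
        if (fd < 0 && errno == EPERM) {
            fd = open(path, O_RDONLY); // correctness over atime savings
        }
        return fd;
    }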
From d5652d07e165788f0066bd54665e41d371145d9c Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Tue, 28 May 2024 18:44:33 +0800
Subject: [PATCH 3/7] Use infinity http client to insert data

Moved ctpl thread pool to InfinityContext

---
 python/benchmark/clients/infinity_client.py   | 42 ++++++++++++++++---
 src/main/infinity_context.cpp                 |  2 +
 src/main/infinity_context.cppm                |  7 ++++
 src/storage/invertedindex/memory_indexer.cpp  | 12 ++----
 src/storage/invertedindex/memory_indexer.cppm |  8 +---
 .../meta/entry/segment_index_entry.cpp        | 22 +++-------
 src/storage/meta/entry/table_index_entry.cpp  |  3 +-
 src/storage/meta/entry/table_index_entry.cppm |  4 --
 .../invertedindex/column_index_merger.cpp     |  4 +-
 .../storage/invertedindex/memory_indexer.cpp  | 13 +++---
 .../storage/invertedindex/posting_merger.cpp  |  7 +---
 11 files changed, 65 insertions(+), 59 deletions(-)

diff --git a/python/benchmark/clients/infinity_client.py b/python/benchmark/clients/infinity_client.py
index b8b922cd8e..331c2962f9 100644
--- a/python/benchmark/clients/infinity_client.py
+++ b/python/benchmark/clients/infinity_client.py
@@ -3,6 +3,7 @@ import h5py
 from typing import Any
 import logging
+import requests
 
 import infinity
 import infinity.index as index
@@ -10,6 +11,33 @@
 from .base_client import BaseClient
 
 
+class InfinityHttpClient:
+    def __init__(self, db_name, table_name):
+        self.url = (
+            "http://localhost:23820/" + f"databases/{db_name}/tables/{table_name}/docs"
+        )
+        self.headers = {
+            "accept": "application/json",
+            "content-type": "application/json",
+        }
+
+    def request(self, method, data={}):
+        match method:
+            case "get":
+                response = requests.get(self.url, headers=self.headers, json=data)
+            case "post":
+                response = requests.post(self.url, headers=self.headers, json=data)
+            case "put":
+                response = requests.put(self.url, headers=self.headers, json=data)
+            case "delete":
+                response = requests.delete(self.url, headers=self.headers, json=data)
+        return response
+
+    def insert(self, values=[]):
+        r = self.request("post", values)
+        return r
+
+
 class InfinityClient(BaseClient):
     def __init__(self, conf_path: str) -> None:
         """
@@ -46,9 +74,11 @@ def upload(self):
         db_obj.create_table(self.table_name, self.data["schema"])
         table_obj = db_obj.get_table(self.table_name)
         # create index
-        # indexs = self._parse_index_schema(self.data["index"])
-        # for i, idx in enumerate(indexs):
-        #     table_obj.create_index(f"index{i}", [idx])
+        indexs = self._parse_index_schema(self.data["index"])
+        for i, idx in enumerate(indexs):
+            table_obj.create_index(f"index{i}", [idx])
+
+        inf_http_client = InfinityHttpClient("default_db", self.table_name)
 
         dataset_path = os.path.join(self.path_prefix, self.data["data_path"])
         if not os.path.exists(dataset_path):
             self.download_data(self.data["data_link"], dataset_path)
@@ -109,11 +139,13 @@ def upload(self):
                 }
                 current_batch.append(row_dict)
                 if len(current_batch) >= batch_size:
-                    table_obj.insert(current_batch)
+                    # table_obj.insert(current_batch)
+                    inf_http_client.insert(current_batch)
                     current_batch = []
 
         if current_batch:
-            table_obj.insert(current_batch)
+            # table_obj.insert(current_batch)
+            inf_http_client.insert(current_batch)
 
     def setup_clients(self, num_threads=1):
         host, port = self.data["host"].split(":")
         self.clients = list()
diff --git a/src/main/infinity_context.cpp b/src/main/infinity_context.cpp
index 8ff84bab88..b1e28f5f78 100644
--- a/src/main/infinity_context.cpp
+++ b/src/main/infinity_context.cpp
@@ -56,6 +56,8 @@ void InfinityContext::Init(const SharedPtr<String> &config_path) {
         storage_ = MakeUnique<Storage>(config_.get());
         storage_->Init();
 
+        inverting_thread_pool_.resize(config_->CPULimit());
+        commiting_thread_pool_.resize(config_->CPULimit());
         initialized_ = true;
     }
 }
diff --git a/src/main/infinity_context.cppm b/src/main/infinity_context.cppm
index bbe7fab7c2..21a62c6d72 100644
--- a/src/main/infinity_context.cppm
+++ b/src/main/infinity_context.cppm
@@ -23,6 +23,7 @@ import task_scheduler;
 import storage;
 import singleton;
 import session_manager;
+import third_party;
 
 namespace infinity {
 
@@ -38,6 +39,9 @@ public:
     [[nodiscard]] inline SessionManager *session_manager() noexcept { return session_mgr_.get(); }
 
+    [[nodiscard]] inline ThreadPool &GetFulltextInvertingThreadPool() { return inverting_thread_pool_; }
+    [[nodiscard]] inline ThreadPool &GetFulltextCommitingThreadPool() { return commiting_thread_pool_; }
+
     void Init(const SharedPtr<String> &config_path);
 
     void UnInit();
@@ -52,6 +56,9 @@ private:
     UniquePtr<TaskScheduler> task_scheduler_{};
     UniquePtr<Storage> storage_{};
     UniquePtr<SessionManager> session_mgr_{};
+    // For fulltext index
+    ThreadPool inverting_thread_pool_{4};
+    ThreadPool commiting_thread_pool_{2};
 
     bool initialized_{false};
 };
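For reference, call sites are expected to reach the shared pools through the singleton roughly as sketched below. This is not code from the patch; it assumes the ThreadPool re-exported through third_party is the ctpl-style pool whose tasks take a thread id as their first argument:

    // Sketch only: enqueue one inverting task on the process-wide pool.
    auto &pool = InfinityContext::instance().GetFulltextInvertingThreadPool();
    auto done = pool.push([](int /*thread_id*/) {
        // invert one batch of documents here
    });
    done.get(); // block until the task has run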
diff --git a/src/storage/invertedindex/memory_indexer.cpp b/src/storage/invertedindex/memory_indexer.cpp
index 1fa570665b..96790895e9 100644
--- a/src/storage/invertedindex/memory_indexer.cpp
+++ b/src/storage/invertedindex/memory_indexer.cpp
@@ -59,6 +59,7 @@ import mmap;
 import buf_writer;
 import profiler;
 import third_party;
+import infinity_context;
 
 namespace infinity {
 constexpr int MAX_TUPLE_LENGTH = 1024; // we assume that analyzed term, together with docid/offset info, will never exceed such length
@@ -69,15 +70,10 @@ bool MemoryIndexer::KeyComp::operator()(const String &lhs, const String &rhs) co
 
 MemoryIndexer::PostingTable::PostingTable() {}
 
-MemoryIndexer::MemoryIndexer(const String &index_dir,
-                             const String &base_name,
-                             RowID base_row_id,
-                             optionflag_t flag,
-                             const String &analyzer,
-                             ThreadPool &inverting_thread_pool,
-                             ThreadPool &commiting_thread_pool)
+MemoryIndexer::MemoryIndexer(const String &index_dir, const String &base_name, RowID base_row_id, optionflag_t flag, const String &analyzer)
     : index_dir_(index_dir), base_name_(base_name), base_row_id_(base_row_id), flag_(flag), analyzer_(analyzer),
-      inverting_thread_pool_(inverting_thread_pool), commiting_thread_pool_(commiting_thread_pool), ring_inverted_(15UL), ring_sorted_(13UL) {
+      inverting_thread_pool_(infinity::InfinityContext::instance().GetFulltextInvertingThreadPool()),
+      commiting_thread_pool_(infinity::InfinityContext::instance().GetFulltextCommitingThreadPool()), ring_inverted_(15UL), ring_sorted_(13UL) {
     posting_table_ = MakeShared<PostingTable>();
     prepared_posting_ = MakeShared<PostingWriter>(PostingFormatOption(flag_), column_lengths_);
     Path path = Path(index_dir) / (base_name + ".tmp.merge");
diff --git a/src/storage/invertedindex/memory_indexer.cppm b/src/storage/invertedindex/memory_indexer.cppm
index c5b07ec8a3..84b3dd1a2d 100644
--- a/src/storage/invertedindex/memory_indexer.cppm
+++ b/src/storage/invertedindex/memory_indexer.cppm
@@ -49,13 +49,7 @@ public:
         PostingTableStore store_;
     };
 
-    MemoryIndexer(const String &index_dir,
-                  const String &base_name,
-                  RowID base_row_id,
-                  optionflag_t flag,
-                  const String &analyzer,
-                  ThreadPool &inverting_thread_pool,
-                  ThreadPool &commiting_thread_pool);
+    MemoryIndexer(const String &index_dir, const String &base_name, RowID base_row_id, optionflag_t flag, const String &analyzer);
 
     ~MemoryIndexer();
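The constructor change applies a common pattern: instead of threading pool references through every signature, each MemoryIndexer binds reference members to the process-wide pools at construction. A self-contained sketch of the pattern, with all names hypothetical:

    struct ThreadPool {};

    struct Context {
        static Context &instance() {
            static Context ctx; // one per process
            return ctx;
        }
        ThreadPool &inverting_pool() { return inverting_pool_; }

    private:
        ThreadPool inverting_pool_;
    };

    class Indexer {
    public:
        // No pool parameters: the reference is bound to the singleton's
        // pool, so every Indexer instance shares the same pool.
        Indexer() : pool_(Context::instance().inverting_pool()) {}

    private:
        ThreadPool &pool_;
    };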
diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp
index b8bbf7f6c2..5811a3ae8b 100644
--- a/src/storage/meta/entry/segment_index_entry.cpp
+++ b/src/storage/meta/entry/segment_index_entry.cpp
@@ -255,9 +255,7 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
                                                           base_name,
                                                           begin_row_id,
                                                           index_fulltext->flag_,
-                                                          index_fulltext->analyzer_,
-                                                          table_index_entry_->GetFulltextInvertingThreadPool(),
-                                                          table_index_entry_->GetFulltextCommitingThreadPool());
+                                                          index_fulltext->analyzer_);
             }
             table_index_entry_->UpdateFulltextSegmentTs(commit_ts);
         } else {
@@ -396,13 +394,8 @@ void SegmentIndexEntry::MemIndexLoad(const String &base_name, RowID base_row_id)
     // Init the mem index from previously spilled one.
     assert(memory_indexer_.get() == nullptr);
     const IndexFullText *index_fulltext = static_cast<const IndexFullText *>(index_base);
-    memory_indexer_ = MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(),
-                                                base_name,
-                                                base_row_id,
-                                                index_fulltext->flag_,
-                                                index_fulltext->analyzer_,
-                                                table_index_entry_->GetFulltextInvertingThreadPool(),
-                                                table_index_entry_->GetFulltextCommitingThreadPool());
+    memory_indexer_ =
+        MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(), base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_);
     memory_indexer_->Load();
 }
 
@@ -435,13 +428,8 @@ void SegmentIndexEntry::PopulateEntirely(const SegmentEntry *segment_entry, Txn
             u32 seg_id = segment_entry->segment_id();
             RowID base_row_id(seg_id, 0);
             String base_name = fmt::format("ft_{:016x}", base_row_id.ToUint64());
-            memory_indexer_ = MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(),
-                                                        base_name,
-                                                        base_row_id,
-                                                        index_fulltext->flag_,
-                                                        index_fulltext->analyzer_,
-                                                        table_index_entry_->GetFulltextInvertingThreadPool(),
-                                                        table_index_entry_->GetFulltextCommitingThreadPool());
+            memory_indexer_ =
+                MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(), base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_);
             u64 column_id = column_def->id();
             auto block_entry_iter = BlockEntryIter(segment_entry);
             for (const auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
diff --git a/src/storage/meta/entry/table_index_entry.cpp b/src/storage/meta/entry/table_index_entry.cpp
index 9f8e869d34..4925efcdca 100644
--- a/src/storage/meta/entry/table_index_entry.cpp
+++ b/src/storage/meta/entry/table_index_entry.cpp
@@ -70,8 +70,7 @@ TableIndexEntry::TableIndexEntry(const SharedPtr<IndexBase> &index_base,
                                  TransactionID txn_id,
                                  TxnTimeStamp begin_ts)
     : BaseEntry(EntryType::kTableIndex, is_delete, TableIndexEntry::EncodeIndex(*index_base->index_name_, table_index_meta)),
-      inverting_thread_pool_(4), commiting_thread_pool_(2), table_index_meta_(table_index_meta), index_base_(std::move(index_base)),
-      index_dir_(index_entry_dir) {
+      table_index_meta_(table_index_meta), index_base_(std::move(index_base)), index_dir_(index_entry_dir) {
     if (!is_delete) {
         assert(index_base.get() != nullptr);
         const String &column_name = index_base->column_name();
diff --git a/src/storage/meta/entry/table_index_entry.cppm b/src/storage/meta/entry/table_index_entry.cppm
index 41e7ce5f0a..365d5f7fbe 100644
--- a/src/storage/meta/entry/table_index_entry.cppm
+++ b/src/storage/meta/entry/table_index_entry.cppm
@@ -125,8 +125,6 @@ public:
 
     Status CreateIndexDo(BaseTableRef *table_ref, HashMap<SegmentID, atomic_u64> &create_index_idxes, Txn *txn);
 
-    ThreadPool &GetFulltextInvertingThreadPool() { return inverting_thread_pool_; }
-    ThreadPool &GetFulltextCommitingThreadPool() { return commiting_thread_pool_; }
     TxnTimeStamp GetFulltexSegmentUpdateTs() {
         std::shared_lock lock(segment_update_ts_mutex_);
         return segment_update_ts_;
@@ -148,8 +146,6 @@ private:
 
 private:
     // For fulltext index
-    ThreadPool inverting_thread_pool_{};
-    ThreadPool commiting_thread_pool_{};
     std::shared_mutex segment_update_ts_mutex_{};
     TxnTimeStamp segment_update_ts_{0};
diff --git a/src/unit_test/storage/invertedindex/column_index_merger.cpp b/src/unit_test/storage/invertedindex/column_index_merger.cpp
index 3863902c58..7312c0dd69 100644
--- a/src/unit_test/storage/invertedindex/column_index_merger.cpp
+++ b/src/unit_test/storage/invertedindex/column_index_merger.cpp
@@ -72,8 +72,6 @@ class ColumnIndexMergerTest : public BaseTest {
     String GetTerm(u32 n);
 
 protected:
-    ThreadPool inverting_thread_pool_{4};
-    ThreadPool commiting_thread_pool_{2};
     optionflag_t flag_{OPTION_FLAG_ALL};
     static constexpr SizeT BUFFER_SIZE_ = 1024;
 };
@@ -97,7 +95,7 @@ void ColumnIndexMergerTest::CreateIndex(const Vector<String> &paragraphs,
     auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(index_dir);
     for (SizeT i = 0; i < chunk_names.size(); ++i) {
-        MemoryIndexer indexer(index_dir, chunk_names[i], base_row_ids[i], flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+        MemoryIndexer indexer(index_dir, chunk_names[i], base_row_ids[i], flag_, "standard");
         indexer.Insert(column, row_offsets[i], row_counts[i]);
         indexer.Dump();
     }
diff --git a/src/unit_test/storage/invertedindex/memory_indexer.cpp b/src/unit_test/storage/invertedindex/memory_indexer.cpp
index e6bd97aa8a..08acd8ad87 100644
--- a/src/unit_test/storage/invertedindex/memory_indexer.cpp
+++ b/src/unit_test/storage/invertedindex/memory_indexer.cpp
@@ -110,13 +110,12 @@ class MemoryIndexerTest : public BaseTest {
 TEST_F(MemoryIndexerTest, Insert) {
     // prepare fake segment index entry
     auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetTmpDir());
-    MemoryIndexer indexer1(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+    MemoryIndexer indexer1(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
     indexer1.Insert(column_, 0, 1);
     indexer1.Insert(column_, 1, 3);
     indexer1.Dump();
 
-    auto indexer2 =
-        MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk2", RowID(0U, 4U), flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+    auto indexer2 = MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk2", RowID(0U, 4U), flag_, "standard");
     indexer2->Insert(column_, 4, 1);
     while (indexer2->GetInflightTasks() > 0) {
         sleep(1);
@@ -133,7 +132,7 @@ TEST_F(MemoryIndexerTest, Insert) {
 
 TEST_F(MemoryIndexerTest, test2) {
     auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetTmpDir());
-    MemoryIndexer indexer1(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+    MemoryIndexer indexer1(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
     indexer1.Insert(column_, 0, 2, true);
     indexer1.Insert(column_, 2, 2, true);
     indexer1.Insert(column_, 4, 1, true);
@@ -149,8 +148,7 @@ TEST_F(MemoryIndexerTest, test2) {
 
 TEST_F(MemoryIndexerTest, SpillLoadTest) {
     auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetTmpDir());
-    auto indexer1 =
-        MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+    auto indexer1 = MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
     bool offline = false;
     bool spill = true;
     indexer1->Insert(column_, 0, 2, offline);
@@ -162,8 +160,7 @@ TEST_F(MemoryIndexerTest, SpillLoadTest) {
     }
     indexer1->Dump(offline, spill);
 
-    UniquePtr<MemoryIndexer> loaded_indexer =
-        MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+    UniquePtr<MemoryIndexer> loaded_indexer = MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
     loaded_indexer->Load();
 
     SharedPtr<InMemIndexSegmentReader> segment_reader = MakeShared<InMemIndexSegmentReader>(loaded_indexer.get());
diff --git a/src/unit_test/storage/invertedindex/posting_merger.cpp b/src/unit_test/storage/invertedindex/posting_merger.cpp
index e9bf47019e..489f10d56c 100644
--- a/src/unit_test/storage/invertedindex/posting_merger.cpp
+++ b/src/unit_test/storage/invertedindex/posting_merger.cpp
@@ -47,8 +47,6 @@ class PostingMergerTest : public BaseTest {
     void CreateIndex();
 
 protected:
-    ThreadPool inverting_thread_pool_{4};
-    ThreadPool commiting_thread_pool_{4};
     optionflag_t flag_{OPTION_FLAG_ALL};
     static constexpr SizeT BUFFER_SIZE_ = 1024;
 };
@@ -69,13 +67,12 @@ void PostingMergerTest::CreateIndex() {
     }
 
     auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetTmpDir());
-    MemoryIndexer indexer1(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+    MemoryIndexer indexer1(GetTmpDir(), "chunk1", RowID(0U, 0U), flag_, "standard");
     indexer1.Insert(column, 0, 1);
     indexer1.Dump();
     fake_segment_index_entry_1->AddFtChunkIndexEntry("chunk1", RowID(0U, 0U).ToUint64(), 1U);
 
-    auto indexer2 =
-        MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk2", RowID(0U, 1U), flag_, "standard", inverting_thread_pool_, commiting_thread_pool_);
+    auto indexer2 = MakeUnique<MemoryIndexer>(GetTmpDir(), "chunk2", RowID(0U, 1U), flag_, "standard");
     indexer2->Insert(column, 1, 1);
     indexer2->Dump();
 }

From d91b7a1c9659ef3565873b5b87dc07a7972e99bb Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Wed, 29 May 2024 23:38:34 +0800
Subject: [PATCH 4/7] Always dump MemIndex if segment is sealed

---
 src/storage/meta/entry/table_entry.cpp | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp
index 65ec396252..37bd02c780 100644
--- a/src/storage/meta/entry/table_entry.cpp
+++ b/src/storage/meta/entry/table_entry.cpp
@@ -698,11 +698,13 @@ void TableEntry::MemIndexInsertInner(TableIndexEntry *table_index_entry, Txn *tx
         if (block_entry->GetAvailableCapacity() <= 0)
             dump_idx = i;
     }
+
     for (SizeT i = 0; i < num_ranges; i++) {
         AppendRange &range = append_ranges[i];
         SharedPtr<BlockEntry> block_entry = block_entries[i];
         segment_index_entry->MemIndexInsert(block_entry, range.start_offset_, range.row_count_, txn->CommitTS(), txn->buffer_mgr());
-        if (i == dump_idx && segment_index_entry->MemIndexRowCount() >= infinity::InfinityContext::instance().config()->MemIndexCapacity()) {
+        if ((i == dump_idx && segment_index_entry->MemIndexRowCount() >= infinity::InfinityContext::instance().config()->MemIndexCapacity()) ||
+            (i == num_ranges - 1 && segment_entry->Room() <= 0)) {
             SharedPtr<ChunkIndexEntry> chunk_index_entry = segment_index_entry->MemIndexDump();
             if (chunk_index_entry.get() != nullptr) {
                 chunk_index_entry->Commit(txn->CommitTS());
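The enlarged condition in PATCH 4 is two independent triggers. Restated as a standalone predicate, a sketch that mirrors the diff with simplified types, not code from the tree:

    #include <cstddef>

    // Dump the in-memory fulltext index when either trigger fires:
    // 1. capacity: checked only at the block chosen as dump_idx;
    // 2. sealed segment: checked only on the last appended range,
    //    when the segment has no room left for further appends.
    static bool ShouldDumpMemIndex(size_t i, size_t dump_idx, size_t num_ranges,
                                   size_t mem_index_rows, size_t mem_index_capacity,
                                   long segment_room) {
        const bool capacity_hit = (i == dump_idx) && (mem_index_rows >= mem_index_capacity);
        const bool segment_sealed = (i == num_ranges - 1) && (segment_room <= 0);
        return capacity_hit || segment_sealed;
    }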
From 6323f8cd3667e06f4708624f4e0cd8941bd339dd Mon Sep 17 00:00:00 2001
From: Yingfeng Zhang
Date: Thu, 30 May 2024 14:59:23 +0800
Subject: [PATCH 5/7] Fix ByteSliceReader::Seek

---
 .../format/inmem_doc_list_decoder.cpp         | 17 ++++++++++-------
 .../format/posting_byte_slice_reader.cppm     |  7 +++++--
 src/storage/io/byte_slice_reader.cpp          | 11 ++++++-----
 src/storage/io/byte_slice_reader.cppm         |  2 +-
 .../format/inmem_doc_list_decoder.cpp         |  4 ++--
 5 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/src/storage/invertedindex/format/inmem_doc_list_decoder.cpp b/src/storage/invertedindex/format/inmem_doc_list_decoder.cpp
index 32ab1a3e80..f508533824 100644
--- a/src/storage/invertedindex/format/inmem_doc_list_decoder.cpp
+++ b/src/storage/invertedindex/format/inmem_doc_list_decoder.cpp
@@ -34,7 +34,9 @@ bool InMemDocListDecoder::DecodeSkipList(docid_t start_doc_id, docid_t &prev_las
     if (skiplist_reader_ == nullptr) {
         prev_last_doc_id = 0;
         current_ttf = 0;
-        return DecodeSkipListWithoutSkipList(0, 0, start_doc_id, last_doc_id);
+        // If the skiplist is absent, we allow the doc_id buffer to be decoded only once.
+        // So here we pass zero as the encoded doc_id buffer offset.
+        return DecodeSkipListWithoutSkipList(last_doc_id_in_prev_record_, 0, start_doc_id, last_doc_id);
     }
     auto ret = skiplist_reader_->SkipTo((u32)start_doc_id, last_doc_id_, last_doc_id_in_prev_record_, offset_, record_len_);
     if (!ret) {
@@ -53,16 +55,18 @@
 }
 
 bool InMemDocListDecoder::DecodeSkipListWithoutSkipList(docid_t last_doc_id_in_prev_record, u32 offset, docid_t start_doc_id, docid_t &last_doc_id) {
-    if (finish_decoded_) {
-        return false;
-    }
     // allocate space
-    doc_buffer_to_copy_ = new docid_t[MAX_DOC_PER_RECORD];
+    if (doc_buffer_to_copy_ == nullptr)
+        doc_buffer_to_copy_ = new docid_t[MAX_DOC_PER_RECORD];
 
-    doc_list_reader_.Seek(offset);
+    finish_decoded_ = false;
+    if (!doc_list_reader_.Seek(offset)) {
+        return false;
+    }
     if (!doc_list_reader_.Decode(doc_buffer_to_copy_, MAX_DOC_PER_RECORD, decode_count_)) {
         return false;
     }
+    finish_decoded_ = true;
     last_doc_id = last_doc_id_in_prev_record;
     for (SizeT i = 0; i < decode_count_; ++i) {
         last_doc_id += doc_buffer_to_copy_[i];
@@ -70,7 +74,6 @@ bool InMemDocListDecoder::DecodeSkipListWithoutSkipList(docid_t last_doc_id_in_p
     if (start_doc_id > last_doc_id) {
         return false;
     }
-    finish_decoded_ = true;
     return true;
 }
diff --git a/src/storage/invertedindex/format/posting_byte_slice_reader.cppm b/src/storage/invertedindex/format/posting_byte_slice_reader.cppm
index 5fc2db8e92..0a17b61758 100644
--- a/src/storage/invertedindex/format/posting_byte_slice_reader.cppm
+++ b/src/storage/invertedindex/format/posting_byte_slice_reader.cppm
@@ -27,10 +27,13 @@ public:
         posting_fields_ = posting_byte_slice_->GetPostingFields();
     }
 
-    void Seek(u32 pos) {
-        byte_slice_reader_.Seek(pos);
+    bool Seek(u32 pos) {
+        SizeT ret = byte_slice_reader_.Seek(pos);
         location_cursor_ = 0;
         posting_buffer_cursor_ = 0;
+        if (ret == ByteSliceReader::BYTE_SLICE_EOF)
+            return false;
+        return true;
     }
 
     u32 Tell() const { return byte_slice_reader_.Tell(); }
diff --git a/src/storage/io/byte_slice_reader.cpp b/src/storage/io/byte_slice_reader.cpp
index 985cfa4c88..76edb0706a 100644
--- a/src/storage/io/byte_slice_reader.cpp
+++ b/src/storage/io/byte_slice_reader.cpp
@@ -1,5 +1,7 @@
 module;
 
+#include <cassert>
+
 module byte_slice_reader;
 
 import stl;
@@ -111,10 +113,8 @@ SizeT ByteSliceReader::ReadMayCopy(void *&value, SizeT len) {
 
 SizeT ByteSliceReader::Seek(SizeT offset) {
     if (offset < global_offset_) {
-        // fmt::format("invalid offset value: seek offset = {}, State: list length = {}, offset = {}", offset, GetSize(), global_offset_));
-        String error_message = "Invalid offset value";
-        LOG_CRITICAL(error_message);
-        UnrecoverableError(error_message);
+        // seeking backward is disallowed
+        return BYTE_SLICE_EOF;
     }
 
     SizeT len = offset - global_offset_;
@@ -125,6 +125,7 @@ SizeT ByteSliceReader::Seek(SizeT offset) {
     if (current_slice_offset_ + len < GetSliceDataSize(current_slice_)) {
         current_slice_offset_ += len;
         global_offset_ += len;
+        assert(global_offset_ == offset);
         return global_offset_;
     } else {
         // current byteslice is not long enough, seek to next byteslices
@@ -154,7 +155,7 @@ SizeT ByteSliceReader::Seek(SizeT offset) {
             current_slice_ = ByteSlice::GetEmptySlice();
             current_slice_offset_ = 0;
         }
-
+        assert(global_offset_ == offset);
         return global_offset_;
     }
 }
diff --git a/src/storage/io/byte_slice_reader.cppm b/src/storage/io/byte_slice_reader.cppm
index a3ae56734c..0e2727a473 100644
--- a/src/storage/io/byte_slice_reader.cppm
+++ b/src/storage/io/byte_slice_reader.cppm
@@ -11,7 +11,7 @@ namespace infinity {
 
 export class ByteSliceReader {
 public:
-    static const int BYTE_SLICE_EOF = -1;
+    static const SizeT BYTE_SLICE_EOF = -1;
 
 public:
     ByteSliceReader();
diff --git a/src/unit_test/storage/invertedindex/format/inmem_doc_list_decoder.cpp b/src/unit_test/storage/invertedindex/format/inmem_doc_list_decoder.cpp
index 7c0d011d0e..9c81478fd8 100644
--- a/src/unit_test/storage/invertedindex/format/inmem_doc_list_decoder.cpp
+++ b/src/unit_test/storage/invertedindex/format/inmem_doc_list_decoder.cpp
@@ -218,7 +218,7 @@ TEST_F(InMemDocListDecoderTest, test1) {
     ASSERT_EQ(doc1, doc_buffer[0]);
     ASSERT_EQ(doc2 - doc1, doc_buffer[1]);
 
-    ASSERT_TRUE(!doc_list_decoder.DecodeSkipList(doc2, prev_last_doc_id, last_doc_id, current_ttf));
+    ASSERT_TRUE(doc_list_decoder.DecodeSkipList(doc2, prev_last_doc_id, last_doc_id, current_ttf));
 }
 
 TEST_F(InMemDocListDecoderTest, test2) {
@@ -300,7 +300,7 @@ TEST_F(InMemDocListDecoderTest, test3) {
     ASSERT_EQ((ttf_t)0, current_ttf);
     ASSERT_EQ((u32)490, doc_buffer[0]);
 
-    ASSERT_TRUE(!doc_list_decoder_->DecodeSkipList(last_doc_id, prev_last_doc_id, last_doc_id, current_ttf));
+    ASSERT_TRUE(doc_list_decoder_->DecodeSkipList(last_doc_id, prev_last_doc_id, last_doc_id, current_ttf));
 
     delete doc_list_decoder_;
 }
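With PATCH 5, a backward seek is reported through the BYTE_SLICE_EOF sentinel instead of raising an unrecoverable error, so every caller must now check the return value. Note that BYTE_SLICE_EOF became a SizeT initialized with -1, i.e. the maximum SizeT value, which keeps the constant's type consistent with Seek's return type. The expected calling pattern, sketched as a fragment:

    // Sketch: treat a refused (backward) seek as a soft failure.
    SizeT pos = byte_slice_reader_.Seek(offset);
    if (pos == ByteSliceReader::BYTE_SLICE_EOF) {
        return false; // seeking backward is disallowed; report instead of crashing
    }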
From 5ffd1bd815bc61ac4d79b7f075722a8a59e2f3e2 Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Fri, 31 May 2024 13:50:24 +0800
Subject: [PATCH 6/7] Optimized BlockEntry::row_count

---
 src/storage/meta/entry/block_entry.cpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/storage/meta/entry/block_entry.cpp b/src/storage/meta/entry/block_entry.cpp
index 087241473d..d85b58f0a8 100644
--- a/src/storage/meta/entry/block_entry.cpp
+++ b/src/storage/meta/entry/block_entry.cpp
@@ -124,6 +124,8 @@ void BlockEntry::UpdateBlockReplay(SharedPtr<BlockEntry> block_entry, String blo
 
 SizeT BlockEntry::row_count(TxnTimeStamp check_ts) const {
     std::shared_lock lock(rw_locker_);
+    if (check_ts >= max_row_ts_)
+        return row_count_;
 
     auto block_version_handle = this->block_version_->Load();
     const auto *block_version = reinterpret_cast<const BlockVersion *>(block_version_handle.GetData());

From 0e58954a8f45773fd6ee67873213a0996168b2d0 Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Thu, 30 May 2024 15:16:18 +0800
Subject: [PATCH 7/7] Updated benchmark.md

---
 docs/references/benchmark.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/references/benchmark.md b/docs/references/benchmark.md
index 15295e2e50..80d5fdd10d 100644
--- a/docs/references/benchmark.md
+++ b/docs/references/benchmark.md
@@ -149,7 +149,7 @@ psql -h 0.0.0.0 -p 5432 -c "SELECT doctitle, ROW_ID(), SCORE() FROM infinity_enw
 
 | | Time to insert & build index | Time to import & build index | P95 Latency(ms)| QPS (16 python clients) | Memory | vCPU |
 | ----------------- | ---------------------------- | ---------------------------- | ---------------| ------------------------| --------| ----- |
 | **Elasticsearch** | 2289 s | N/A | 14.75 | 1340 | 21.0GB | 10.6 |
-| **Infinity** | 2321 s | 2890 s | 1.86 | 12328 | 10.0GB | 11.0 |
+| **Infinity** | 1562 s | 2244 s | 1.86 | 12328 | 10.0GB | 11.0 |

---