From 7f65a8ce670f1ee0915054118e88a06084030e27 Mon Sep 17 00:00:00 2001 From: madschemas <155993105+MadSchemas@users.noreply.github.com> Date: Mon, 12 Feb 2024 11:48:20 +0300 Subject: [PATCH] Update to version v3.22.1 --- .github/workflows/test.yml | 16 +- bindings/consts.go | 2 +- changelog.md | 23 +- cpp_src/CMakeLists.txt | 4 +- cpp_src/core/index/indexordered.cc | 2 +- cpp_src/core/namespace/namespaceimpl.cc | 6 +- cpp_src/core/namespace/namespaceimpl.h | 1 + cpp_src/core/nsselecter/nsselecter.cc | 34 +-- cpp_src/core/nsselecter/querypreprocessor.cc | 18 +- cpp_src/core/nsselecter/querypreprocessor.h | 1 + cpp_src/core/query/queryentry.cc | 2 +- cpp_src/gtests/tests/CMakeLists.txt | 2 +- cpp_src/gtests/tests/fixtures/ft_api.cc | 1 - cpp_src/gtests/tests/fixtures/reindexer_api.h | 9 - .../gtests/tests/fixtures/replication_api.cc | 10 +- .../tests/fixtures/replication_load_api.h | 3 +- .../gtests/tests/fixtures/servercontrol.cc | 10 +- cpp_src/gtests/tests/fixtures/servercontrol.h | 10 +- cpp_src/gtests/tests/unit/ft/ft_generic.cc | 126 ++++++++-- cpp_src/gtests/tests/unit/namespace_test.cc | 1 + .../unit/replication_master_master_test.cc | 232 +++++++++++------- cpp_src/readme.md | 4 +- cpp_src/replicator/replicator.cc | 120 +++++---- cpp_src/replicator/replicator.h | 11 +- cpp_src/replicator/waltracker.h | 11 +- cpp_src/server/CMakeLists.txt | 4 +- cpp_src/server/contrib/server.md | 5 +- cpp_src/server/contrib/server.yml | 5 +- cpp_src/tools/errors.h | 55 +++-- fulltext.md | 48 +++- 30 files changed, 512 insertions(+), 264 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7b4c2f4ea..7ef5e7c82 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,7 +10,7 @@ jobs: fail-fast: false steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Build Reindexer run: | mkdir build && cd build @@ -27,7 +27,7 @@ jobs: fail-fast: false steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Build Reindexer run: | mkdir build && cd build @@ -57,7 +57,7 @@ jobs: CXX: ${{matrix.cxx}} steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Prepare Environment run: | ./.github/workflows/install_gtest.sh @@ -94,7 +94,7 @@ jobs: rm -rf ./build/_CPack_Packages ./build/cpp_src/server/reindexer_server_resources tar -cvf artifacts.tar build/ bindings/builtin/builtin_posix.go bindings/builtinserver/builtinserver_posix.go dependencies.sh - name: Archive Artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{matrix.os}}${{matrix.sanitizer}} path: artifacts.tar @@ -133,10 +133,10 @@ jobs: steps: - name: Checkout repository if: ${{ matrix.os != 'macos-latest' || matrix.test == 'GO' }} - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Download ${{matrix.os}}${{matrix.sanitizer}} Artifacts if: ${{ matrix.os != 'macos-latest' || matrix.test == 'GO' }} - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: ${{matrix.os}}${{matrix.sanitizer}} - name: 'Untar Artifacts' @@ -183,7 +183,7 @@ jobs: OS: ${{matrix.os}} steps: - name: Download ${{matrix.os}} Artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: ${{matrix.os}} - name: 'Untar Artifacts' @@ -217,7 +217,7 @@ jobs: cp ./usr/local/etc/reindexer.conf.pkg /usr/local/etc/reindexer.conf.pkg fi - name: Clone PyReindexer - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: repository: restream/reindexer-py - name: Install PyReindexer diff --git a/bindings/consts.go b/bindings/consts.go index 8c5423746..90f704419 100644 --- a/bindings/consts.go +++ b/bindings/consts.go @@ -2,7 +2,7 @@ package bindings const CInt32Max = int(^uint32(0) >> 1) -const ReindexerVersion = "v3.22.0" +const ReindexerVersion = "v3.22.1" // public go consts from type_consts.h and reindexer_ctypes.h const ( diff --git a/changelog.md b/changelog.md index 1ad127caa..25487c92d 100644 --- a/changelog.md +++ b/changelog.md @@ -1,21 +1,34 @@ +# Version 3.22.1 (12.02.2024) +## Core +- [fix] Fixed `COUNT()` for fulltext queries, when `preselect_before_ft` is enabled in the index config +- [fix] Fixed `LIMIT`/`OFFSET` behavior for `MERGE`-queries, when `preselect_before_ft` is enabled in the index config +- [fix] Disabled in-memory WAL for the system namespaces + +## Replication +- [fix] Improved transactions replication via WAL queries: from now transactions will not be splitted into separate insert/updates on the follower + +## Face +- [fix] Fixed the Check request +- [fix] Added bottom padding to the Performance Updates chart + # Version 3.22.0 (02.02.2024) ## Core - [fea] Added `explain` results for the [subqueries](readme.md#subqueries-nested-queries) - [fea] Added support for limit/offset in `Delete` and `Update` queries - [fea] Optimized ordered indexes' cache logic to achive more cache hits and more compact cache size - [fea] Added support for `COUNT_CACHED(*)`/`CachedTotal()` aggregation in the queries with INNER JOINS. Now it's possible to cache total count results for such queries -- [fix] Fixed SQL parsing for combintaions of the [subqueries](readme.md#subqueries-nested-queries) and other conditions in the main query -- [fix] Fixed [select functions](fulltext.md#using-select-functions) with '.' delimiter. Previously those functios were actually expected '=' as a delimiter +- [fix] Fixed SQL parsing for combinations of the [subqueries](readme.md#subqueries-nested-queries) and other conditions in the main query +- [fix] Fixed [select functions](fulltext.md#using-select-functions) with '.' delimiter. Previously those functions actually expected '=' as a delimiter ## Fulltext - [fea] Reworked logic for the stop-words. [More details](fulltext.md#stopwords-details) - [fea] Added config for the base ranking algorithm. Check `bm25_config` field in the [fulltext settings](fulltext.md#base-config-parameters) ## Replication -- [fea] Fixed sync logic to allow runtime server ID changing +- [fea] Fixed sync logic to allow modification of the server's ID at runtime ## Reindexer server -- [fix] Fixed SQL suggests for subqueries and some kinds of the JOIN-queries +- [fix] Fixed SQL suggests for subqueries and certain types of JOIN-queries ## Docker - [fea] Base docker image was updated to alpine 3.19 @@ -26,7 +39,7 @@ ## Face - [fea] Added the subqueries field to the explain mode -- [fea] Upgraded the Webpack to 5.х +- [fea] Upgraded Webpack to 5.х - [fea] Added the default values to the NS config during the mode changing - [fix] Fixed the message about the outdated browser version after Chrome upgraded to v120. - [fix] Fixed the settings panel layout on the Performance page, which was overlapped by the message about the outdated browser version diff --git a/cpp_src/CMakeLists.txt b/cpp_src/CMakeLists.txt index d60ebefc5..df3a0e387 100644 --- a/cpp_src/CMakeLists.txt +++ b/cpp_src/CMakeLists.txt @@ -43,7 +43,7 @@ else() option (LINK_RESOURCES "Link web resources as binary data" ON) endif() -set (REINDEXER_VERSION_DEFAULT "3.22.0") +set (REINDEXER_VERSION_DEFAULT "3.22.1") if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "RelWithDebInfo") @@ -449,7 +449,7 @@ if (WITH_CPPTRACE) ExternalProject_Add( cpptrace_lib GIT_REPOSITORY "https://github.com/jeremy-rifkin/cpptrace.git" - GIT_TAG "main" + GIT_TAG "v0.3.1" CMAKE_ARGS -DCMAKE_INSTALL_LIBDIR=${CMAKE_CURRENT_BINARY_DIR} -DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR} -DCPPTRACE_STATIC=On diff --git a/cpp_src/core/index/indexordered.cc b/cpp_src/core/index/indexordered.cc index b0ac203ce..9b0af20e0 100644 --- a/cpp_src/core/index/indexordered.cc +++ b/cpp_src/core/index/indexordered.cc @@ -136,7 +136,7 @@ SelectKeyResults IndexOrdered::SelectKey(const VariantArray &keys, CondType c idsCount = 0; res.reserve(count); for (auto it = ctx.startIt; it != ctx.endIt; ++it) { - assertrx(it != ctx.i_map->end()); // FIXME: assertrx_dbg + assertrx_dbg(it != ctx.i_map->end()); idsCount += it->second.Unsorted().Size(); res.emplace_back(it->second, ctx.sortId); } diff --git a/cpp_src/core/namespace/namespaceimpl.cc b/cpp_src/core/namespace/namespaceimpl.cc index 8e782a694..3994bb7b6 100644 --- a/cpp_src/core/namespace/namespaceimpl.cc +++ b/cpp_src/core/namespace/namespaceimpl.cc @@ -115,7 +115,7 @@ NamespaceImpl::NamespaceImpl(const std::string& name, UpdatesObservers& observer queryCountCache_( std::make_unique(config_.cacheConfig.queryCountCacheSize, config_.cacheConfig.queryCountHitsToCache)), joinCache_(std::make_unique(config_.cacheConfig.joinCacheSize, config_.cacheConfig.joinHitsToCache)), - wal_(config_.walSize), + wal_(getWalSize(config_)), observers_(&observers), lastSelectTime_{0}, cancelCommitCnt_{0}, @@ -281,7 +281,7 @@ void NamespaceImpl::OnConfigUpdated(DBConfigProvider& configProvider, const RdxC updateSortedIdxCount(); } - if (wal_.Resize(config_.walSize)) { + if (wal_.Resize(getWalSize(config_))) { logPrintf(LogInfo, "[%s] WAL has been resized lsn #%s, max size %ld", name_, repl_.lastLsn, wal_.Capacity()); } @@ -2641,7 +2641,7 @@ void NamespaceImpl::LoadFromStorage(unsigned threadsCount, const RdxContext& ctx } void NamespaceImpl::initWAL(int64_t minLSN, int64_t maxLSN) { - wal_.Init(config_.walSize, minLSN, maxLSN, storage_); + wal_.Init(getWalSize(config_), minLSN, maxLSN, storage_); // Fill existing records for (IdType rowId = 0; rowId < IdType(items_.size()); rowId++) { if (!items_[rowId].IsFree()) { diff --git a/cpp_src/core/namespace/namespaceimpl.h b/cpp_src/core/namespace/namespaceimpl.h index 72f53cd46..55ee3aaed 100644 --- a/cpp_src/core/namespace/namespaceimpl.h +++ b/cpp_src/core/namespace/namespaceimpl.h @@ -472,6 +472,7 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance. storage_.TryForceFlush(); } } + size_t getWalSize(const NamespaceConfigData &cfg) const noexcept { return isSystem() ? int64_t(1) : std::max(cfg.walSize, int64_t(1)); } PerfStatCounterMT updatePerfCounter_, selectPerfCounter_; std::atomic enablePerfCounters_; diff --git a/cpp_src/core/nsselecter/nsselecter.cc b/cpp_src/core/nsselecter/nsselecter.cc index 9f69f7b0d..f21bf8400 100644 --- a/cpp_src/core/nsselecter/nsselecter.cc +++ b/cpp_src/core/nsselecter/nsselecter.cc @@ -43,9 +43,8 @@ void NsSelecter::operator()(QueryResults &result, SelectCtx &ctx, const RdxConte bool needPutCachedTotal = false; const auto initTotalCount = result.totalCount; - bool containAggCount = containSomeAggCount(AggCount); - bool containAggCountCached = containAggCount ? false : containSomeAggCount(AggCountCached); - + const bool containAggCount = containSomeAggCount(AggCount); + const bool containAggCountCached = containAggCount ? false : containSomeAggCount(AggCountCached); bool needCalcTotal = aggregationQueryRef.CalcTotal() == ModeAccurateTotal || containAggCount; QueryCacheKey ckey; @@ -100,6 +99,7 @@ void NsSelecter::operator()(QueryResults &result, SelectCtx &ctx, const RdxConte if (isFt) { qPreproc.CheckUniqueFtQuery(); qPreproc.ExcludeFtQuery(rdxCtx); + result.haveRank = true; } qPreproc.ConvertWhereValues(); @@ -109,7 +109,6 @@ void NsSelecter::operator()(QueryResults &result, SelectCtx &ctx, const RdxConte result.addNSContext(ns_->payloadType_, ns_->tagsMatcher_, FieldsSet(ns_->tagsMatcher_, ctx.query.SelectFilters()), ns_->schema_); } - if (isFt) result.haveRank = true; if (ctx.query.IsWithRank()) { if (isFt) { result.needOutputRank = true; @@ -321,16 +320,17 @@ void NsSelecter::operator()(QueryResults &result, SelectCtx &ctx, const RdxConte } else if (!reverse && !hasComparators && !aggregationsOnly) { selectLoop(lctx, result, rdxCtx); } - } - // Get total count for simple query with 1 condition and 1 idset - if (needCalcTotal && !lctx.calcTotal) { - if (!ctx.query.Entries().Empty()) { - result.totalCount += qres.Get(0).GetMaxIterations(); - } else { - result.totalCount += ns_->items_.size() - ns_->free_.size(); + // Get total count for simple query with 1 condition and 1 idset + if (needCalcTotal && !lctx.calcTotal) { + if (!ctx.query.Entries().Empty()) { + result.totalCount += qres.Get(0).GetMaxIterations(); + } else { + result.totalCount += ns_->items_.size() - ns_->free_.size(); + } } } + explain.AddLoopTime(); explain.AddIterations(maxIterations); if (!ctx.inTransaction) { @@ -402,7 +402,9 @@ void NsSelecter::operator()(QueryResults &result, SelectCtx &ctx, const RdxConte } if (needPutCachedTotal) { - logPrintf(LogTrace, "[%s] put totalCount value into query cache: %d ", ns_->name_, result.totalCount); + if rx_unlikely (logLevel >= LogTrace) { + logPrintf(LogInfo, "[%s] put totalCount value into query cache: %d ", ns_->name_, result.totalCount); + } ns_->queryCountCache_->Put(ckey, {static_cast(result.totalCount - initTotalCount)}); } if (ctx.preResult && ctx.preResult->executionMode == JoinPreResult::ModeBuild) { @@ -1018,7 +1020,7 @@ void NsSelecter::selectLoop(LoopCtx &ctx, ResultsT &result, const RdxContext &rd SelectIterator &firstIterator = qres.begin()->Value(); IdType rowId = firstIterator.Val(); while (firstIterator.Next(rowId) && !finish) { - if (!sctx.inTransaction && (rowId % kCancelCheckFrequency == 0)) ThrowOnCancel(rdxCtx); + if ((rowId % kCancelCheckFrequency == 0) && !sctx.inTransaction) ThrowOnCancel(rdxCtx); rowId = firstIterator.Val(); IdType properRowId = rowId; @@ -1082,8 +1084,10 @@ void NsSelecter::selectLoop(LoopCtx &ctx, ResultsT &result, const RdxContext &rd getSortIndexValue(sctx.sortingContext, properRowId, prevValues, proc, sctx.nsid < result.joined_.size() ? &result.joined_[sctx.nsid] : nullptr, joinedSelectors); } - if (!ctx.count && !ctx.calcTotal && multiSortFinished) break; - if (ctx.calcTotal) result.totalCount++; + if (!ctx.count && !ctx.calcTotal && multiSortFinished) { + break; + } + result.totalCount += int(ctx.calcTotal); } else { assertf(static_cast(properRowId) < result.rowId2Vdoc->size(), "properRowId = %d; rowId = %d; result.rowId2Vdoc->size() = %d", properRowId, rowId, result.rowId2Vdoc->size()); diff --git a/cpp_src/core/nsselecter/querypreprocessor.cc b/cpp_src/core/nsselecter/querypreprocessor.cc index 5eda4812a..69876b365 100644 --- a/cpp_src/core/nsselecter/querypreprocessor.cc +++ b/cpp_src/core/nsselecter/querypreprocessor.cc @@ -23,7 +23,8 @@ QueryPreprocessor::QueryPreprocessor(QueryEntries &&queries, NamespaceImpl *ns, start_(query_.Offset()), count_(query_.Limit()), forcedSortOrder_(!query_.forcedSortOrder_.empty()), - reqMatchedOnce_(ctx.reqMatchedOnceFlag) { + reqMatchedOnce_(ctx.reqMatchedOnceFlag), + isMergeQuery_(ctx.isMergeQuery == IsMergeQuery::Yes) { if (forcedSortOrder_ && (start_ > QueryEntry::kDefaultOffset || count_ < QueryEntry::kDefaultLimit)) { assertrx_throw(!query_.sortingEntries_.empty()); static const std::vector emptyJoinedSelectors; @@ -40,7 +41,7 @@ QueryPreprocessor::QueryPreprocessor(QueryEntries &&queries, NamespaceImpl *ns, queryEntryAddedByForcedSortOptimization_ = true; } } - if (ctx.isMergeQuery == IsMergeQuery::Yes) { + if (isMergeQuery_) { if (QueryEntry::kDefaultLimit - start_ > count_) { count_ += start_; } else { @@ -83,8 +84,17 @@ bool QueryPreprocessor::NeedNextEvaluation(unsigned start, unsigned count, bool } else if (ftEntry_) { if (!matchedAtLeastOnce) return false; qresHolder.BackupContainer(); - start_ = query_.Offset(); - count_ = query_.Limit(); + if (isMergeQuery_) { + if (QueryEntry::kDefaultLimit - query_.Offset() > query_.Limit()) { + count_ = query_.Limit() + query_.Offset(); + } else { + count_ = QueryEntry::kDefaultLimit; + } + start_ = QueryEntry::kDefaultOffset; + } else { + start_ = query_.Offset(); + count_ = query_.Limit(); + } forcedSortOrder_ = !query_.forcedSortOrder_.empty(); clear(); Append(OpAnd, std::move(*ftEntry_)); diff --git a/cpp_src/core/nsselecter/querypreprocessor.h b/cpp_src/core/nsselecter/querypreprocessor.h index 7248bdd80..09efb12a7 100644 --- a/cpp_src/core/nsselecter/querypreprocessor.h +++ b/cpp_src/core/nsselecter/querypreprocessor.h @@ -119,6 +119,7 @@ class QueryPreprocessor : private QueryEntries { bool desc_ = false; bool forcedSortOrder_ = false; bool reqMatchedOnce_ = false; + const bool isMergeQuery_ = false; std::optional ftEntry_; std::optional ftPreselect_; }; diff --git a/cpp_src/core/query/queryentry.cc b/cpp_src/core/query/queryentry.cc index 74b62dfab..4611731f7 100644 --- a/cpp_src/core/query/queryentry.cc +++ b/cpp_src/core/query/queryentry.cc @@ -144,7 +144,7 @@ void VerifyQueryEntryValues(CondType cond, const VariantArray &values) { case CondLike: checkArgsCount(1); if (!values[0].Type().Is()) { - throw Error{errLogic, "Condition %s must have string argument, but %d argument was provided", CondTypeToStr(cond), + throw Error{errLogic, "Condition %s must have string argument, but %s argument was provided", CondTypeToStr(cond), values[0].Type().Name()}; } break; diff --git a/cpp_src/gtests/tests/CMakeLists.txt b/cpp_src/gtests/tests/CMakeLists.txt index fb82d5f18..5fa7e98fc 100644 --- a/cpp_src/gtests/tests/CMakeLists.txt +++ b/cpp_src/gtests/tests/CMakeLists.txt @@ -56,7 +56,7 @@ endif () set(GTEST_TIMEOUT 600) if (WITH_ASAN OR WITH_TSAN OR WITH_STDLIB_DEBUG) - set(GTEST_TIMEOUT 1200) + set(GTEST_TIMEOUT 2000) endif () find_program(GTEST_PARALLEL "gtest-parallel") diff --git a/cpp_src/gtests/tests/fixtures/ft_api.cc b/cpp_src/gtests/tests/fixtures/ft_api.cc index 764c90118..7fcf7d089 100644 --- a/cpp_src/gtests/tests/fixtures/ft_api.cc +++ b/cpp_src/gtests/tests/fixtures/ft_api.cc @@ -1,5 +1,4 @@ #include "ft_api.h" -#include "core/cjson/jsonbuilder.h" void FTApi::Init(const reindexer::FtFastConfig& ftCfg, unsigned nses, const std::string& storage) { rt.reindexer.reset(new reindexer::Reindexer); diff --git a/cpp_src/gtests/tests/fixtures/reindexer_api.h b/cpp_src/gtests/tests/fixtures/reindexer_api.h index f7cdc01fd..7b8455a37 100644 --- a/cpp_src/gtests/tests/fixtures/reindexer_api.h +++ b/cpp_src/gtests/tests/fixtures/reindexer_api.h @@ -1,17 +1,8 @@ #pragma once -#include -#include -#include -#include - -#include -#include "core/keyvalue/variant.h" -#include "core/query/query.h" #include "core/reindexer.h" #include "reindexertestapi.h" #include "servercontrol.h" -#include "tools/errors.h" using reindexer::Error; using reindexer::Item; diff --git a/cpp_src/gtests/tests/fixtures/replication_api.cc b/cpp_src/gtests/tests/fixtures/replication_api.cc index f37846a54..abfc31b5b 100644 --- a/cpp_src/gtests/tests/fixtures/replication_api.cc +++ b/cpp_src/gtests/tests/fixtures/replication_api.cc @@ -1,10 +1,4 @@ #include "replication_api.h" -#include -#include -#include "core/cjson/jsonbuilder.h" -#include "core/dbconfig.h" -#include "tools/fsops.h" -#include "vendor/gason/gason.h" const std::string ReplicationApi::kConfigNs = "#config"; @@ -112,7 +106,7 @@ void ReplicationApi::SwitchMaster(size_t id, const ReplicationConfigTest::NsSet& for (size_t i = 0; i < svc_.size(); i++) { std::string masterDsn = "cproto://127.0.0.1:" + std::to_string(kDefaultRpcPort + masterId_) + "/node" + std::to_string(masterId_); ReplicationConfigTest config("slave", false, true, i, masterDsn, "server_" + std::to_string(i), namespaces); - if (i != masterId_) GetSrv(i)->MakeSlave(masterId_, config); + if (i != masterId_) GetSrv(i)->MakeSlave(config); } } @@ -148,7 +142,7 @@ void ReplicationApi::SetUp() { } else { std::string masterDsn = "cproto://127.0.0.1:" + std::to_string(kDefaultRpcPort + 0) + "/node" + std::to_string(0); ReplicationConfigTest config("slave", false, true, i, masterDsn); - svc_.back().Get()->MakeSlave(0, config); + svc_.back().Get()->MakeSlave(config); } } } diff --git a/cpp_src/gtests/tests/fixtures/replication_load_api.h b/cpp_src/gtests/tests/fixtures/replication_load_api.h index ddbbf1fa2..394494cdc 100644 --- a/cpp_src/gtests/tests/fixtures/replication_load_api.h +++ b/cpp_src/gtests/tests/fixtures/replication_load_api.h @@ -3,7 +3,6 @@ #include "gtests/tools.h" #include "replication_api.h" #include "replicator/updatesobserver.h" -#include "vendor/hopscotch/hopscotch_map.h" class ReplicationLoadApi : public ReplicationApi { public: @@ -121,7 +120,7 @@ class ReplicationLoadApi : public ReplicationApi { void SetServerConfig(size_t num, const ReplicationConfigTest &config) { auto srv = GetSrv(num); if (num) { - srv->MakeSlave(0, config); + srv->MakeSlave(config); } else { srv->MakeMaster(config); } diff --git a/cpp_src/gtests/tests/fixtures/servercontrol.cc b/cpp_src/gtests/tests/fixtures/servercontrol.cc index 718de3ad9..c47ea008e 100644 --- a/cpp_src/gtests/tests/fixtures/servercontrol.cc +++ b/cpp_src/gtests/tests/fixtures/servercontrol.cc @@ -1,6 +1,7 @@ #include "servercontrol.h" #include #include "core/cjson/jsonbuilder.h" +#include "core/dbconfig.h" #include "systemhelpers.h" #include "tools/fsops.h" #include "vendor/gason/gason.h" @@ -263,12 +264,12 @@ ServerControl::Interface::Interface(size_t id, std::atomic_bool& stopped, const void ServerControl::Interface::MakeMaster(const ReplicationConfigTest& config) { assertrx(config.role_ == "master"); - setReplicationConfig(id_, config); + setReplicationConfig(config); } -void ServerControl::Interface::MakeSlave(size_t masterId, const ReplicationConfigTest& config) { +void ServerControl::Interface::MakeSlave(const ReplicationConfigTest& config) { assertrx(config.role_ == "slave"); assertrx(!config.dsn_.empty()); - setReplicationConfig(masterId, config); + setReplicationConfig(config); } template @@ -297,8 +298,7 @@ void ServerControl::Interface::setNamespaceConfigItem(std::string_view nsName, s ASSERT_TRUE(err.ok()) << err.what(); } -void ServerControl::Interface::setReplicationConfig(size_t masterId, const ReplicationConfigTest& config) { - (void)masterId; +void ServerControl::Interface::setReplicationConfig(const ReplicationConfigTest& config) { auto item = api.NewItem(kConfigNs); ASSERT_TRUE(item.Status().ok()) << item.Status().what(); WrSerializer ser; diff --git a/cpp_src/gtests/tests/fixtures/servercontrol.h b/cpp_src/gtests/tests/fixtures/servercontrol.h index c034588a3..7a35836c2 100644 --- a/cpp_src/gtests/tests/fixtures/servercontrol.h +++ b/cpp_src/gtests/tests/fixtures/servercontrol.h @@ -1,16 +1,10 @@ #pragma once -#include -#include -#include -#include #include "client/synccororeindexer.h" -#include "core/dbconfig.h" #include "estl/shared_mutex.h" #include "reindexertestapi.h" #include "server/server.h" #include "tools/fsops.h" -#include "tools/stringstools.h" #ifdef REINDEXER_WITH_SC_AS_PROCESS const bool kAsServerProcess = true; @@ -104,7 +98,7 @@ class ServerControl { // Make this server master void MakeMaster(const ReplicationConfigTest& config = ReplicationConfigTest("master")); // Make this server slave - void MakeSlave(size_t masterId, const ReplicationConfigTest& config); + void MakeSlave(const ReplicationConfigTest& config); // check with master or slave that sync complete ReplicationStateApi GetState(const std::string& ns); // Force sync (restart slave's replicator) @@ -130,7 +124,7 @@ class ServerControl { private: template void setNamespaceConfigItem(std::string_view nsName, std::string_view paramName, ValueT&& value); - void setReplicationConfig(size_t masterId, const ReplicationConfigTest& config); + void setReplicationConfig(const ReplicationConfigTest& config); std::string getLogName(const std::string& log, bool core = false); std::vector getCLIParamArray(bool enableStats, size_t maxUpdatesSize); diff --git a/cpp_src/gtests/tests/unit/ft/ft_generic.cc b/cpp_src/gtests/tests/unit/ft/ft_generic.cc index 63d9e9c86..e1c6ff62d 100644 --- a/cpp_src/gtests/tests/unit/ft/ft_generic.cc +++ b/cpp_src/gtests/tests/unit/ft/ft_generic.cc @@ -1036,10 +1036,12 @@ TEST_P(FTGenericApi, ConfigBm25Coefficients) { ""sv); CheckResults("targetword", - {{"otherword !targetword! otherword otherword otherword !targetword! otherword !targetword! otherword !targetword! otherword otherword otherword otherword otherword otherword otherword otherword !targetword!", ""}, - {"otherword !targetword! otherword !targetword targetword!", ""}, - {"otherword !targetword! otherword !targetword!", ""}, - {"otherword !targetword!", ""}}, + {{"otherword !targetword! otherword otherword otherword !targetword! otherword !targetword! otherword !targetword! " + "otherword otherword otherword otherword otherword otherword otherword otherword !targetword!", + ""}, + {"otherword !targetword! otherword !targetword targetword!", ""}, + {"otherword !targetword! otherword !targetword!", ""}, + {"otherword !targetword!", ""}}, true); cfg = cfgDef; @@ -1048,24 +1050,26 @@ TEST_P(FTGenericApi, ConfigBm25Coefficients) { ASSERT_TRUE(err.ok()) << err.what(); CheckResults("targetword", - { - {"otherword !targetword! otherword !targetword targetword!", ""}, - {"otherword !targetword! otherword !targetword!", ""}, - {"otherword !targetword! otherword otherword otherword !targetword! otherword !targetword! otherword !targetword! otherword otherword otherword otherword otherword otherword otherword otherword !targetword!", ""}, - {"otherword !targetword!", ""} - }, + {{"otherword !targetword! otherword !targetword targetword!", ""}, + {"otherword !targetword! otherword !targetword!", ""}, + {"otherword !targetword! otherword otherword otherword !targetword! otherword !targetword! otherword !targetword! " + "otherword otherword otherword otherword otherword otherword otherword otherword !targetword!", + ""}, + {"otherword !targetword!", ""}}, true); cfg = cfgDef; cfg.bm25Config.bm25Type = reindexer::FtFastConfig::Bm25Config::Bm25Type::wordCount; cfg.fieldsCfg[0].positionWeight = 0.0; - cfg.fullMatchBoost=1.0; + cfg.fullMatchBoost = 1.0; err = SetFTConfig(cfg, "nm1", "ft3", {"ft1", "ft2"}); ASSERT_TRUE(err.ok()) << err.what(); CheckResults("targetword", { - {"otherword !targetword! otherword otherword otherword !targetword! otherword !targetword! otherword !targetword! otherword otherword otherword otherword otherword otherword otherword otherword !targetword!", ""}, + {"otherword !targetword! otherword otherword otherword !targetword! otherword !targetword! otherword !targetword! " + "otherword otherword otherword otherword otherword otherword otherword otherword !targetword!", + ""}, {"otherword !targetword! otherword !targetword targetword!", ""}, {"otherword !targetword! otherword !targetword!", ""}, {"otherword !targetword!", ""}, @@ -1073,11 +1077,9 @@ TEST_P(FTGenericApi, ConfigBm25Coefficients) { }, true); - CheckResults("словах",{{"!слово! пусто !слово!",""},{"!слово! пусто !слова! пусто !словами!",""}},true); - + CheckResults("словах", {{"!слово! пусто !слово!", ""}, {"!слово! пусто !слова! пусто !словами!", ""}}, true); } - TEST_P(FTGenericApi, ConfigFtProc) { reindexer::FtFastConfig cfgDef = GetDefaultConfig(); cfgDef.synonyms = {{{"тестов"}, {"задача"}}}; @@ -1371,6 +1373,100 @@ TEST_P(FTGenericApi, ExplainWithFtPreselect) { } } +TEST_P(FTGenericApi, TotalCountWithFtPreselect) { + using reindexer::Query; + using reindexer::QueryResults; + using reindexer::Variant; + + auto cfg = GetDefaultConfig(); + auto preselectIsEnabled = true; + cfg.enablePreselectBeforeFt = preselectIsEnabled; + Init(cfg); + const int firstId = counter_; + Add("word5"sv); + Add("word1 word2 word3"sv); + Add("word3 word4"sv); + Add("word2 word5 word7"sv); + const int lastId = counter_; + + const std::string kJoinedNs = "ns_for_joins"; + const std::string kMainNs = "nm1"; + CreateAndFillSimpleNs(kJoinedNs, 0, 10, nullptr); + + for (auto preselect : {true, false}) { + if (preselectIsEnabled != preselect) { + auto cfg = GetDefaultConfig(); + preselectIsEnabled = preselect; + cfg.enablePreselectBeforeFt = preselectIsEnabled; + SetFTConfig(cfg); + } + std::string_view kPreselectStr = preselect ? " (with ft preselect) " : " (no ft preselect) "; + + struct Case { + Query query; + int limit; + int expectedTotalCount; + }; + std::vector cases = {{.query = Query(kMainNs).Where("ft3", CondEq, "word2 word4"), .limit = 2, .expectedTotalCount = 3}, + {.query = Query(kMainNs).Where("ft3", CondEq, "word2").Where("id", CondEq, {Variant{lastId - 3}}), + .limit = 0, + .expectedTotalCount = 1}, + {.query = Query(kMainNs) + .Where("ft3", CondEq, "word2") + .InnerJoin("id", "id", CondEq, Query(kJoinedNs).Where("id", CondLt, firstId + 2).Limit(0)), + .limit = 0, + .expectedTotalCount = 1}, + {.query = Query(kMainNs) + .Where("ft3", CondEq, "word2 word3") + .OpenBracket() + .InnerJoin("id", "id", CondEq, Query(kJoinedNs).Where("id", CondLt, firstId + 2).Limit(0)) + .Or() + .Where("id", CondSet, {Variant{lastId - 1}, Variant{lastId - 2}}) + .CloseBracket(), + .limit = 1, + .expectedTotalCount = 3}, + {.query = Query(kMainNs) + .Where("ft3", CondEq, "word2 word3") + .InnerJoin("id", "id", CondEq, Query(kJoinedNs).Where("id", CondLt, lastId).Limit(0)) + .Where("id", CondSet, {Variant{lastId - 1}, Variant{lastId - 2}}), + .limit = 1, + .expectedTotalCount = 2}, + {.query = Query(kMainNs) + .OpenBracket() + .Where("ft3", CondEq, "word2") + .CloseBracket() + .OpenBracket() + .InnerJoin("id", "id", CondEq, Query(kJoinedNs).Where("id", CondLt, firstId + 2)) + .Or() + .Where("id", CondEq, lastId - 1) + .CloseBracket(), + .limit = 0, + .expectedTotalCount = 2}}; + + for (auto& c : cases) { + c.query.ReqTotal(); + // Execute initial query + { + QueryResults qr; + auto err = rt.reindexer->Select(c.query, qr); + ASSERT_TRUE(err.ok()) << kPreselectStr << err.what() << "\n" << c.query.GetSQL(); + EXPECT_EQ(qr.Count(), c.expectedTotalCount) << kPreselectStr << c.query.GetSQL(); + EXPECT_EQ(qr.TotalCount(), c.expectedTotalCount) << kPreselectStr << c.query.GetSQL(); + } + + // Execute query with limit + const Query q = Query(c.query).Limit(c.limit); + { + QueryResults qr; + auto err = rt.reindexer->Select(q, qr); + ASSERT_TRUE(err.ok()) << kPreselectStr << err.what() << "\n" << c.query.GetSQL(); + EXPECT_EQ(qr.Count(), c.limit) << kPreselectStr << c.query.GetSQL(); + EXPECT_EQ(qr.TotalCount(), c.expectedTotalCount) << kPreselectStr << c.query.GetSQL(); + } + } + } +} + TEST_P(FTGenericApi, StopWordsWithMorphemes) { reindexer::FtFastConfig cfg = GetDefaultConfig(); diff --git a/cpp_src/gtests/tests/unit/namespace_test.cc b/cpp_src/gtests/tests/unit/namespace_test.cc index 2e3fbcf93..ce63238f2 100644 --- a/cpp_src/gtests/tests/unit/namespace_test.cc +++ b/cpp_src/gtests/tests/unit/namespace_test.cc @@ -5,6 +5,7 @@ #include "core/cjson/jsonbuilder.h" #include "core/cjson/msgpackbuilder.h" #include "core/cjson/msgpackdecoder.h" +#include "estl/fast_hash_set.h" #include "estl/span.h" #include "ns_api.h" #include "tools/jsontools.h" diff --git a/cpp_src/gtests/tests/unit/replication_master_master_test.cc b/cpp_src/gtests/tests/unit/replication_master_master_test.cc index b868c3a9c..317719190 100644 --- a/cpp_src/gtests/tests/unit/replication_master_master_test.cc +++ b/cpp_src/gtests/tests/unit/replication_master_master_test.cc @@ -1,26 +1,23 @@ #include -#include -#include -#include -#include "core/cjson/jsonbuilder.h" -#include "core/dbconfig.h" -#include "core/keyvalue/p_string.h" -#include "core/queryresults/queryresults.h" -#include "server/server.h" +#include "core/defnsconfigs.h" #include "servercontrol.h" -#include "tools/fsops.h" -#include "vendor/gason/gason.h" using namespace reindexer; class ReplicationSlaveSlaveApi : public ::testing::Test { protected: - void SetUp() { fs::RmDirAll(kBaseTestsetDbPath); } + void SetUp() override { fs::RmDirAll(kBaseTestsetDbPath); } - void TearDown() {} + void TearDown() override {} public: - ReplicationSlaveSlaveApi() {} + class ServerControlVec : public std::vector { + public: + ~ServerControlVec() { + for (auto& node : *this) node.Stop(); + } + }; + void WaitSync(ServerControl& s1, ServerControl& s2, const std::string& nsName) { auto now = std::chrono::milliseconds(0); const auto pause = std::chrono::milliseconds(100); @@ -41,7 +38,7 @@ class ReplicationSlaveSlaveApi : public ::testing::Test { } } - void CreateConfiguration(std::vector& nodes, const std::vector& slaveConfiguration, int basePort, int baseServerId, + void CreateConfiguration(ServerControlVec& nodes, const std::vector& slaveConfiguration, int basePort, int baseServerId, const std::string& dbPathMaster) { for (size_t i = 0; i < slaveConfiguration.size(); i++) { nodes.push_back(ServerControl()); @@ -52,12 +49,12 @@ class ReplicationSlaveSlaveApi : public ::testing::Test { } else { std::string masterDsn = "cproto://127.0.0.1:" + std::to_string(slaveConfiguration[i]) + "/db"; ReplicationConfigTest config("slave", false, true, baseServerId + i, masterDsn); - nodes.back().Get()->MakeSlave(0, config); + nodes.back().Get()->MakeSlave(config); } } } - void RestartServer(size_t id, std::vector& nodes, int port, const std::string& dbPathMaster) { + void RestartServer(size_t id, ServerControlVec& nodes, int port, const std::string& dbPathMaster) { assertrx(id < nodes.size()); if (nodes[id].Get()) { nodes[id].Stop(); @@ -75,6 +72,29 @@ class ReplicationSlaveSlaveApi : public ::testing::Test { nodes[id].InitServer(id, port + id, port + 1000 + id, dbPathMaster + std::to_string(id), "db", true); } + void ApplyConfig(ServerControl& sc, std::string_view json) { + auto& rx = *sc.Get()->api.reindexer; + auto item = rx.NewItem(kConfigNamespace); + ASSERT_TRUE(item.Status().ok()) << item.Status().what(); + auto err = item.FromJSON(json); + ASSERT_TRUE(err.ok()) << err.what(); + err = rx.Upsert(kConfigNamespace, item); + ASSERT_TRUE(err.ok()) << err.what(); + } + void CheckTxCopyEventsCount(ServerControl& sc, int expectedCount) { + auto& rx = *sc.Get()->api.reindexer; + client::SyncCoroQueryResults qr(&rx); + auto err = rx.Select(Query(kPerfStatsNamespace), qr); + ASSERT_TRUE(err.ok()) << err.what(); + ASSERT_EQ(qr.Count(), 1); + WrSerializer ser; + err = qr.begin().GetJSON(ser, false); + ASSERT_TRUE(err.ok()) << err.what(); + gason::JsonParser parser; + auto resJS = parser.Parse(ser.Slice()); + ASSERT_EQ(resJS["transactions"]["total_copy_count"].As(-1), expectedCount) << ser.Slice(); + } + protected: const std::string kBaseTestsetDbPath = fs::JoinPath(fs::GetTempDir(), "rx_test/ReplicationSlaveSlaveApi"); @@ -103,6 +123,22 @@ class TestNamespace1 { } } + void AddRowsTx(ServerControl& masterControl, int from, unsigned int count, size_t dataLen = 8) { + auto& rx = *masterControl.Get()->api.reindexer; + reindexer::client::SyncCoroTransaction tr = rx.NewTransaction(nsName_); + ASSERT_TRUE(tr.Status().ok()) << tr.Status().what(); + for (unsigned int i = 0; i < count; i++) { + reindexer::client::Item item = tr.NewItem(); + auto err = item.FromJSON("{\"id\":" + std::to_string(from + i) + + (dataLen ? (",\"data\":\"" + reindexer::randStringAlph(dataLen) + "\"") : "") + "}"); + ASSERT_TRUE(err.ok()) << err.what(); + err = tr.Upsert(std::move(item)); + ASSERT_TRUE(err.ok()) << err.what(); + } + auto err = rx.CommitTransaction(tr); + ASSERT_TRUE(err.ok()) << err.what(); + } + std::function AddRow1msSleep = [](ServerControl& masterControl, int from, unsigned int count, std::string_view nsName) { auto master = masterControl.Get(); @@ -157,7 +193,7 @@ TEST_F(ReplicationSlaveSlaveApi, MasterSlaveSyncByWalAddRow) { const int port = 9999; std::vector slaveConfiguration = {-1, port}; - std::vector nodes; + ServerControlVec nodes; CreateConfiguration(nodes, slaveConfiguration, port, 10, kDbPathMaster); TestNamespace1 ns1(nodes[0]); @@ -214,7 +250,7 @@ TEST_F(ReplicationSlaveSlaveApi, MasterSlaveStart) { const int port = 9999; std::vector slaveConfiguration = {-1, port}; - std::vector nodes; + ServerControlVec nodes; CreateConfiguration(nodes, slaveConfiguration, port, 10, kDbPathMaster); // Insert 100 rows @@ -264,7 +300,7 @@ TEST_F(ReplicationSlaveSlaveApi, MasterSlaveSlave2) { const std::string kBaseDbPath(fs::JoinPath(kBaseTestsetDbPath, "MasterSlaveSlave2")); const std::string kDbPathMaster(kBaseDbPath + "/test_"); int serverId = 5; - std::vector nodes; + ServerControlVec nodes; CreateConfiguration(nodes, slaveConfiguration, port, serverId, kDbPathMaster); @@ -289,7 +325,6 @@ TEST_F(ReplicationSlaveSlaveApi, MasterSlaveSlave2) { for (size_t i = 1; i < results.size(); ++i) { EXPECT_TRUE((results[0] == results[i])); } - for (auto& node : nodes) node.Stop(); }; const int port = 9999; @@ -329,7 +364,7 @@ TEST_F(ReplicationSlaveSlaveApi, MasterSlaveSlaveReload) { const std::string kBaseDbPath(fs::JoinPath(kBaseTestsetDbPath, "MasterSlaveSlaveReload")); const std::string kDbPathMaster(kBaseDbPath + "/test_"); const int serverId = 5; - std::vector nodes; + ServerControlVec nodes; std::atomic_bool stopRestartServerThread(false); /* @@ -382,7 +417,6 @@ TEST_F(ReplicationSlaveSlaveApi, MasterSlaveSlaveReload) { for (size_t i = 1; i < results.size(); ++i) { EXPECT_TRUE((results[0] == results[i])) << i << "; size[0]: " << results[0].size() << "; size[i]: " << results[i].size(); } - for (auto& node : nodes) node.Stop(); } #endif @@ -407,7 +441,7 @@ TEST_F(ReplicationSlaveSlaveApi, TransactionTest) { std::vector slaveConfiguration = {-1, port, port + 1, port + 2, port + 3}; - std::vector nodes; + ServerControlVec nodes; CreateConfiguration(nodes, slaveConfiguration, port, serverId, kDbPathMaster); @@ -423,14 +457,7 @@ TEST_F(ReplicationSlaveSlaveApi, TransactionTest) { WaitSync(nodes[0], nodes[i], nsName); } - reindexer::client::SyncCoroTransaction tr = master.Get()->api.reindexer->NewTransaction(nsName); - - for (unsigned int i = 0; i < kRows; i++) { - reindexer::client::Item item = tr.NewItem(); - auto err = item.FromJSON("{\"id\":" + std::to_string(i + kRows * 10) + "}"); - tr.Upsert(std::move(item)); - } - master.Get()->api.reindexer->CommitTransaction(tr); + ns1.AddRowsTx(master, 0, kRows); for (size_t i = 1; i < nodes.size(); i++) { WaitSync(nodes[0], nodes[i], nsName); @@ -445,7 +472,6 @@ TEST_F(ReplicationSlaveSlaveApi, TransactionTest) { for (size_t i = 1; i < results.size(); ++i) { EXPECT_TRUE((results[0] == results[i])); } - for (auto& node : nodes) node.Stop(); } TEST_F(ReplicationSlaveSlaveApi, TransactionCopyPolicyForceSync) { @@ -475,29 +501,20 @@ TEST_F(ReplicationSlaveSlaveApi, TransactionCopyPolicyForceSync) { "type": "namespaces" })="; constexpr int port = 9999; - const std::string kBaseDbPath(fs::JoinPath(kBaseTestsetDbPath, "TransactionCopyPolicyForceSync")); - const std::string kDbPathMaster(kBaseDbPath + "/test_"); + const std::string kDbPathMaster(fs::JoinPath(fs::JoinPath(kBaseTestsetDbPath, "TransactionCopyPolicyForceSync"), "test_")); constexpr int serverId = 5; constexpr size_t kRows = 100; const std::string nsName("ns1"); std::vector slaveConfiguration = {-1, port, port + 1}; - std::vector nodes; + ServerControlVec nodes; for (size_t i = 0; i < slaveConfiguration.size(); i++) { - nodes.emplace_back(); - nodes.back().InitServer(i, port + i, port + 1000 + i, kDbPathMaster + std::to_string(i), "db", true); + nodes.emplace_back().InitServer(i, port + i, port + 1000 + i, kDbPathMaster + std::to_string(i), "db", true); nodes.back().Get()->EnableAllProfilings(); } // Set tx copy policy for the node '2' to 'always copy' - { - auto item = nodes[2].Get()->api.reindexer->NewItem("#config"); - ASSERT_TRUE(item.Status().ok()) << item.Status().what(); - auto err = item.FromJSON(kJsonCfgNss); - ASSERT_TRUE(err.ok()) << err.what(); - err = nodes[2].Get()->api.reindexer->Upsert("#config", item); - ASSERT_TRUE(err.ok()) << err.what(); - } + ApplyConfig(nodes[2], kJsonCfgNss); for (size_t i = 0; i < slaveConfiguration.size(); i++) { if (i == 0) { @@ -506,7 +523,7 @@ TEST_F(ReplicationSlaveSlaveApi, TransactionCopyPolicyForceSync) { } else { std::string masterDsn = "cproto://127.0.0.1:" + std::to_string(slaveConfiguration[i]) + "/db"; ReplicationConfigTest config("slave", false, true, serverId + i, masterDsn); - nodes[i].Get()->MakeSlave(slaveConfiguration[i], config); + nodes[i].Get()->MakeSlave(config); } } nodes[2].Drop(); @@ -519,32 +536,79 @@ TEST_F(ReplicationSlaveSlaveApi, TransactionCopyPolicyForceSync) { nodes[2].InitServer(2, port + 2, port + 1000 + 2, kDbPathMaster + std::to_string(2), "db", true); std::string masterDsn = "cproto://127.0.0.1:" + std::to_string(slaveConfiguration[2]) + "/db"; ReplicationConfigTest config("slave", false, true, serverId + 2, masterDsn); - nodes[2].Get()->MakeSlave(1, config); + nodes[2].Get()->MakeSlave(config); WaitSync(nodes[0], nodes[2], nsName); + // Check copy tx event in the perfstats before tx + CheckTxCopyEventsCount(nodes[2], 0); + // Apply tx - reindexer::client::SyncCoroTransaction tr = master.Get()->api.reindexer->NewTransaction(nsName); - for (unsigned int i = 0; i < kRows; i++) { - reindexer::client::Item item = tr.NewItem(); - auto err = item.FromJSON("{\"id\":" + std::to_string(i + kRows * 10) + "}"); - tr.Upsert(std::move(item)); - } - master.Get()->api.reindexer->CommitTransaction(tr); + ns1.AddRowsTx(master, 0, kRows); WaitSync(nodes[0], nodes[2], nsName); - // Check copy tx event in the perfstats - client::SyncCoroQueryResults qr(nodes[2].Get()->api.reindexer.get()); - auto err = nodes[2].Get()->api.reindexer->Select("select * from #perfstats", qr); - ASSERT_TRUE(err.ok()) << err.what(); - ASSERT_EQ(qr.Count(), 1); - WrSerializer ser; - err = qr.begin().GetJSON(ser, false); - ASSERT_TRUE(err.ok()) << err.what(); - gason::JsonParser parser; - auto resJS = parser.Parse(ser.Slice()); - ASSERT_EQ(resJS["transactions"]["total_copy_count"].As(-1), 1) << ser.Slice(); + // Check copy tx event in the perfstats after tx + CheckTxCopyEventsCount(nodes[2], 1); +} + +TEST_F(ReplicationSlaveSlaveApi, TransactionCopyPolicyWalSync) { + // Check transactions copy policy during the wal sync + /* + m + | + 1 + */ + constexpr std::string_view kJsonCfgNss = R"=({ + "namespaces": [ + { + "namespace": "*", + "start_copy_policy_tx_size": 10000, + "copy_policy_multiplier": 5, + "tx_size_to_always_copy": 100000 + }, + { + "namespace": "ns1", + "start_copy_policy_tx_size": 10000, + "copy_policy_multiplier": 5, + "tx_size_to_always_copy": 1 + } + ], + "type": "namespaces" + })="; + constexpr int port = 9999; + const std::string kDbPathMaster(fs::JoinPath(fs::JoinPath(kBaseTestsetDbPath, "TransactionCopyPolicyWalSync"), "test_")); + constexpr int serverId = 5; + constexpr size_t kRows = 100; + const std::string nsName("ns1"); + + std::vector slaveConfiguration = {-1, port}; + ServerControlVec nodes; + for (size_t i = 0; i < slaveConfiguration.size(); i++) { + nodes.emplace_back().InitServer(i, port + i, port + 1000 + i, kDbPathMaster + std::to_string(i), "db", true); + nodes.back().Get()->EnableAllProfilings(); + } + const std::string masterDsn = "cproto://127.0.0.1:" + std::to_string(slaveConfiguration[1]) + "/db"; + + // Set tx copy policy for the node '1' to 'always copy' + ApplyConfig(nodes[1], kJsonCfgNss); - for (auto& node : nodes) node.Stop(); + nodes[0].Get()->MakeMaster(ReplicationConfigTest("master")); + nodes[1].Get()->MakeSlave(ReplicationConfigTest("slave", false, true, serverId + 1, masterDsn)); + + ServerControl& master = nodes[0]; + TestNamespace1 ns1(master, nsName); + WaitSync(master, nodes[1], nsName); + + nodes[1].Drop(); + // Apply tx + ns1.AddRowsTx(master, 0, kRows); + + // Restart node '1' + nodes[1].InitServer(1, port + 1, port + 1000 + 1, kDbPathMaster + std::to_string(1), "db", true); + nodes[1].Get()->MakeSlave(ReplicationConfigTest("slave", false, true, serverId + 1, masterDsn)); + WaitSync(master, nodes[1], nsName); + + // Check copy tx event in the perfstats + CheckTxCopyEventsCount(nodes[1], 1); } TEST_F(ReplicationSlaveSlaveApi, ForceSync3Node) { @@ -577,15 +641,15 @@ TEST_F(ReplicationSlaveSlaveApi, ForceSync3Node) { slave2.InitServer(2, 7772, 7882, kBaseDbPath + "/slave2", "db", true); std::string upDsn2 = "cproto://127.0.0.1:7771/db"; ReplicationConfigTest configSlave2("slave", false, true, 2, upDsn2); - slave2.Get()->MakeSlave(0, configSlave2); + slave2.Get()->MakeSlave(configSlave2); ServerControl slave3; slave3.InitServer(3, 7773, 7883, kBaseDbPath + "/slave3", "db", true); std::string upDsn3 = "cproto://127.0.0.1:7772/db"; ReplicationConfigTest configSlave3("slave", false, true, 3, upDsn3); - slave3.Get()->MakeSlave(0, configSlave3); + slave3.Get()->MakeSlave(configSlave3); - slave1.Get()->MakeSlave(0, configSlave1); + slave1.Get()->MakeSlave(configSlave1); WaitSync(master, slave1, "ns1"); WaitSync(master, slave2, "ns1"); @@ -628,7 +692,7 @@ TEST_F(ReplicationSlaveSlaveApi, NodeWithMasterAndSlaveNs1) { testns3.AddRows(slave, c1, n); std::string upDsn = "cproto://127.0.0.1:7770/db"; ReplicationConfigTest configSlave("slave", false, true, 0, upDsn); - slave.Get()->MakeSlave(0, configSlave); + slave.Get()->MakeSlave(configSlave); testns3.AddRows(slave, c2, n); WaitSync(master, slave, "ns1"); @@ -688,7 +752,7 @@ TEST_F(ReplicationSlaveSlaveApi, NodeWithMasterAndSlaveNs2) { std::string upDsn = "cproto://127.0.0.1:7770/db"; ReplicationConfigTest::NsSet nsSet = {"ns1"}; ReplicationConfigTest configSlave("slave", false, true, 0, upDsn, "slave", nsSet); - slave.Get()->MakeSlave(0, configSlave); + slave.Get()->MakeSlave(configSlave); testns3.AddRows(slave, c2, n); WaitSync(master, slave, "ns1"); @@ -740,7 +804,7 @@ TEST_F(ReplicationSlaveSlaveApi, NodeWithMasterAndSlaveNs3) { std::string upDsn = "cproto://127.0.0.1:7770/db"; ReplicationConfigTest::NsSet nsSet = {"ns1"}; ReplicationConfigTest configSlave("slave", false, true, 0, upDsn, "slave", nsSet); - slave.Get()->MakeSlave(0, configSlave); + slave.Get()->MakeSlave(configSlave); testns3.AddRows(slave, c2, n); WaitSync(master, slave, "ns1"); @@ -782,7 +846,7 @@ TEST_F(ReplicationSlaveSlaveApi, RenameSlaveNs) { std::string upDsn = "cproto://127.0.0.1:7770/db"; ReplicationConfigTest::NsSet nsSet = {"ns1"}; ReplicationConfigTest configSlave("slave", false, true, 0, upDsn, "slave", nsSet); - slave.Get()->MakeSlave(0, configSlave); + slave.Get()->MakeSlave(configSlave); WaitSync(master, slave, "ns1"); @@ -852,9 +916,9 @@ TEST_F(ReplicationSlaveSlaveApi, Node3ApplyWal) { slave1.InitServer(1, 7771, 7881, kBaseDbPath + "/slave1", "db", true); slave2.InitServer(2, 7772, 7882, kBaseDbPath + "/slave2", "db", true); ReplicationConfigTest configSlave1("slave", false, true, 1, upDsn1, "slave1"); - slave1.Get()->MakeSlave(0, configSlave1); + slave1.Get()->MakeSlave(configSlave1); ReplicationConfigTest configSlave2("slave", false, true, 2, upDsn2, "slave2"); - slave2.Get()->MakeSlave(0, configSlave2); + slave2.Get()->MakeSlave(configSlave2); WaitSync(master, slave1, "ns1"); WaitSync(master, slave2, "ns1"); } @@ -925,7 +989,7 @@ TEST_F(ReplicationSlaveSlaveApi, RestrictUpdates) { ServerControl slave; slave.InitServer(0, 7771, 7881, reindexer::fs::JoinPath(kBaseStoragePath, "slave"), "db", true); ReplicationConfigTest configSlave("slave", false, true, 0, upDsn, "slave"); - slave.Get()->MakeSlave(0, configSlave); + slave.Get()->MakeSlave(configSlave); insertThread.join(); WaitSync(master, slave, "ns1"); @@ -948,7 +1012,7 @@ TEST_F(ReplicationSlaveSlaveApi, ConcurrentForceSync) { const size_t kNsSyncCount = 3; const int kBaseServerId = 5; - std::vector nodes; + ServerControlVec nodes; auto createSlave = [&kBaseDbPath, &kDbName, &nodes, &kNsList](const std::string& masterDsn) { size_t id = nodes.size(); nodes.push_back(ServerControl()); @@ -958,7 +1022,7 @@ TEST_F(ReplicationSlaveSlaveApi, ConcurrentForceSync) { nsSet.emplace(kNsList[i]); } ReplicationConfigTest config("slave", false, true, kBaseServerId + id, masterDsn, "slave" + std::to_string(id), nsSet); - nodes.back().Get()->MakeSlave(0, config); + nodes.back().Get()->MakeSlave(config); }; // Create master @@ -1019,7 +1083,6 @@ TEST_F(ReplicationSlaveSlaveApi, ConcurrentForceSync) { EXPECT_EQ(nsDefs.size(), kNsSyncCount); } } - for (auto& node : nodes) node.Stop(); } #endif @@ -1032,7 +1095,7 @@ TEST_F(ReplicationSlaveSlaveApi, WriteIntoSlaveNsAfterReconfiguration) { const std::string kNs1 = "ns1"; const std::string kNs2 = "ns2"; int manualItemId = 5; - std::vector nodes; + ServerControlVec nodes; CreateConfiguration(nodes, {-1, kBasePort}, kBasePort, kServerId, kBaseDbPath); TestNamespace1 testns1(nodes[0], kNs1); testns1.AddRows(nodes[0], 0, n); @@ -1061,7 +1124,7 @@ TEST_F(ReplicationSlaveSlaveApi, WriteIntoSlaveNsAfterReconfiguration) { { ReplicationConfigTest::NsSet nsSet = {"ns2"}; ReplicationConfigTest configSlave("slave", false, true, kServerId + 1, kUpDsn, "slave", nsSet); - nodes[1].Get()->MakeSlave(0, configSlave); + nodes[1].Get()->MakeSlave(configSlave); } item = createItem(nodes[1], kNs1, manualItemId); @@ -1100,7 +1163,7 @@ TEST_F(ReplicationSlaveSlaveApi, WriteIntoSlaveNsAfterReconfiguration) { { ReplicationConfigTest::NsSet nsSet = {"ns1", "ns2"}; ReplicationConfigTest configSlave("slave", false, true, kServerId + 1, kUpDsn, "slave", nsSet); - nodes[1].Get()->MakeSlave(0, configSlave); + nodes[1].Get()->MakeSlave(configSlave); WaitSync(nodes[0], nodes[1], kNs1); WaitSync(nodes[0], nodes[1], kNs2); } @@ -1120,7 +1183,6 @@ TEST_F(ReplicationSlaveSlaveApi, WriteIntoSlaveNsAfterReconfiguration) { validateItemsCount(nodes[0], kNs2, 3 * n); validateItemsCount(nodes[1], kNs1, 3 * n); validateItemsCount(nodes[1], kNs2, 3 * n); - for (auto& node : nodes) node.Stop(); } struct DataStore { @@ -1169,7 +1231,7 @@ class ServerIdChange : public ReplicationSlaveSlaveApi, public ::testing::WithPa } else { std::string masterDsn = "cproto://127.0.0.1:" + std::to_string(port) + "/db"; ReplicationConfigTest config("slave", false, true, newServerId, masterDsn); - node.Get()->MakeSlave(0, config); + node.Get()->MakeSlave(config); } } @@ -1293,4 +1355,4 @@ TEST_P(ServerIdChange, UpdateServerId) { } } -INSTANTIATE_TEST_SUITE_P(WalSize, ServerIdChange, ::testing::Values(1, 4000000)); \ No newline at end of file +INSTANTIATE_TEST_SUITE_P(WalSize, ServerIdChange, ::testing::Values(1, 4000000)); diff --git a/cpp_src/readme.md b/cpp_src/readme.md index dba0b23d3..8ca43c2fe 100644 --- a/cpp_src/readme.md +++ b/cpp_src/readme.md @@ -3,8 +3,8 @@ **Reindexer** is an embeddable, in-memory, document-oriented database with a high-level Query builder interface. Reindexer's goal is to provide fast search with complex queries. -The Reindexer is compact and fast. It has not heavy dependencies. Complete reindexer docker image with all libraries and web interface size is just 15MB. -Reindexer is fast. Up to 5x times faster, than mongodb, and 10x times than elastic search. See [benchmaks section](../benchmarks) for details. +Reindexer is compact, fast and it does not have heavy dependencies. +reindexer is up to 5x times faster, than mongodb, and 10x times than elastic search. See [benchmaks section](https://github.com/Restream/reindexer-benchmarks) for details. # Installation diff --git a/cpp_src/replicator/replicator.cc b/cpp_src/replicator/replicator.cc index 3d94564a8..1b8196f15 100644 --- a/cpp_src/replicator/replicator.cc +++ b/cpp_src/replicator/replicator.cc @@ -1,14 +1,9 @@ - #include "replicator.h" #include "client/itemimpl.h" #include "client/reindexer.h" -#include "core/itemimpl.h" #include "core/namespace/namespaceimpl.h" -#include "core/namespacedef.h" #include "core/reindexer_impl/reindexerimpl.h" #include "tools/logger.h" -#include "tools/stringstools.h" -#include "walrecord.h" namespace reindexer { @@ -33,7 +28,7 @@ Replicator::Replicator(ReindexerImpl *slave) Replicator::~Replicator() { Stop(); } Error Replicator::Start() { - std::lock_guard lck(masterMtx_); + std::lock_guard lck(masterMtx_); if (master_) { return Error(errLogic, "Replicator is already started"); } @@ -67,7 +62,7 @@ bool Replicator::Configure(const ReplicationConfigData &config) { if (!enabled_.load(std::memory_order_acquire)) { return false; } - std::unique_lock lck(masterMtx_); + std::unique_lock lck(masterMtx_); bool changed = (config_ != config); if (changed) { @@ -79,7 +74,7 @@ bool Replicator::Configure(const ReplicationConfigData &config) { } void Replicator::Stop() { - std::unique_lock lck(masterMtx_); + std::unique_lock lck(masterMtx_); stop(); } @@ -118,7 +113,7 @@ void Replicator::run() { } } { - std::lock_guard lck(syncMtx_); + std::lock_guard lck(syncMtx_); state_.store(StateInit, std::memory_order_release); } @@ -262,7 +257,7 @@ Error Replicator::syncNamespace(const NamespaceDef &ns, std::string_view forceSy LSNPair lastLsn; while (err.ok() && !terminate_) { { - std::unique_lock lck(syncMtx_); + std::unique_lock lck(syncMtx_); auto updatesIt = pendedUpdates_.find(ns.name); if (updatesIt != pendedUpdates_.end()) { if (updatesIt.value().UpdatesLost) { @@ -349,7 +344,7 @@ Error Replicator::syncDatabase() { state_.load()); { - std::lock_guard lck(syncMtx_); + std::lock_guard lck(syncMtx_); state_.store(StateSyncing, std::memory_order_release); resyncUpdatesLostFlag_ = false; transactions_.clear(); @@ -406,7 +401,7 @@ Error Replicator::syncDatabase() { subscribeUpdatesIfRequired(ns.name); { - std::lock_guard lck(syncMtx_); + std::lock_guard lck(syncMtx_); currentSyncNs_ = ns.name; } @@ -482,11 +477,10 @@ Error Replicator::syncNamespaceByWAL(const NamespaceDef &nsDef) { case errOutdatedWAL: // Check if WAL has been outdated, if yes, then force resync return syncNamespaceForced(nsDef, err.what()); - case errOK: { - auto err = applyWAL(slaveNs, qr); + case errOK: + err = applyWAL(slaveNs, qr); if (err.ok()) slave_->syncDownstream(nsDef.name, false); return err; - } case errNoWAL: terminate_ = true; return err; @@ -620,7 +614,15 @@ Error Replicator::applyWAL(Namespace::Ptr &slaveNs, client::QueryResults &qr, co // Simple item updated ser.Reset(); err = it.GetCJSON(ser, false); - if (err.ok()) err = modifyItem(LSNPair(), slaveNs, ser.Slice(), ModeUpsert, qr.getTagsMatcher(0), stat); + if (err.ok()) { + std::unique_lock lck(syncMtx_); + if (auto txIt = transactions_.find(slaveNs.get()); txIt == transactions_.end() || txIt->second.IsFree()) { + lck.unlock(); + err = modifyItem(LSNPair(), slaveNs, ser.Slice(), ModeUpsert, qr.getTagsMatcher(0), stat); + } else { + err = modifyItemTx(LSNPair(), txIt->second, ser.Slice(), ModeUpsert, qr.getTagsMatcher(0), stat); + } + } } } catch (const Error &e) { err = e; @@ -671,7 +673,7 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp switch (rec.type) { // Modify item case WalItemModify: { - std::lock_guard lck(syncMtx_); + std::lock_guard lck(syncMtx_); Transaction &tx = transactions_[slaveNs.get()]; if (tx.IsFree()) return Error(errLogic, "[repl:%s]:%d Transaction was not initiated.", nsName, config_.serverId); Item item = tx.NewItem(); @@ -683,13 +685,13 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp case WalUpdateQuery: { QueryResults result; Query q = Query::FromSQL(rec.data); - std::lock_guard lck(syncMtx_); + std::lock_guard lck(syncMtx_); Transaction &tx = transactions_[slaveNs.get()]; if (tx.IsFree()) return Error(errLogic, "[repl:%s]:%d Transaction was not initiated.", nsName, config_.serverId); tx.Modify(std::move(q)); } break; case WalInitTransaction: { - std::lock_guard lck(syncMtx_); + std::lock_guard lck(syncMtx_); Transaction &tx = transactions_[slaveNs.get()]; if (!tx.IsFree()) logPrintf(LogError, "[repl:%s]:%d Init transaction befor commit of previous one.", nsName, config_.serverId); RdxContext rdxContext(true, LSNs); @@ -697,7 +699,7 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp } break; case WalCommitTransaction: { QueryResults res; - std::lock_guard lck(syncMtx_); + std::lock_guard lck(syncMtx_); Transaction &tx = transactions_[slaveNs.get()]; if (tx.IsFree()) return Error(errLogic, "[repl:%s]:%d Commit of transaction befor initiate it.", nsName, config_.serverId); RdxContext rdxContext(true, LSNs); @@ -705,10 +707,6 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp tx = Transaction{}; } break; case WalTagsMatcher: { - std::lock_guard lck(syncMtx_); - Transaction &tx = transactions_[slaveNs.get()]; - if (tx.IsFree()) return Error(errLogic, "[repl:%s]:%d Transaction was not initiated.", nsName, config_.serverId); - TagsMatcher tm; Serializer ser(rec.data.data(), rec.data.size()); const auto version = ser.GetVarint(); @@ -716,15 +714,26 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp tm.deserialize(ser, version, stateToken); logPrintf(LogInfo, "[repl:%s]:%d Got new tagsmatcher replicated via tx: { state_token: %08X, version: %d }", nsName, config_.serverId, stateToken, version); + + std::lock_guard lck(syncMtx_); + Transaction &tx = transactions_[slaveNs.get()]; + if (tx.IsFree()) return Error(errLogic, "[repl:%s]:%d Transaction was not initiated.", nsName, config_.serverId); + tx.SetTagsMatcher(std::move(tm)); } break; + case WalPutMeta: { + std::lock_guard lck(syncMtx_); + Transaction &tx = transactions_[slaveNs.get()]; + if (tx.IsFree()) return Error(errLogic, "[repl:%s]:%d Transaction was not initiated.", nsName, config_.serverId); + + tx.PutMeta(std::string(rec.putMeta.key), rec.putMeta.value); + } break; case WalEmpty: case WalReplState: case WalItemUpdate: case WalIndexAdd: case WalIndexDrop: case WalIndexUpdate: - case WalPutMeta: case WalNamespaceAdd: case WalNamespaceDrop: case WalNamespaceRename: @@ -740,11 +749,11 @@ Error Replicator::applyTxWALRecord(LSNPair LSNs, std::string_view nsName, Namesp } void Replicator::checkNoOpenedTransaction(std::string_view nsName, Namespace::Ptr &slaveNs) { - std::lock_guard lck(syncMtx_); - Transaction &tx = transactions_[slaveNs.get()]; - if (!tx.IsFree()) { + std::lock_guard lck(syncMtx_); + auto txIt = transactions_.find(slaveNs.get()); + if (txIt != transactions_.end() && !txIt->second.IsFree()) { logPrintf(LogError, "[repl:%s]:%d Transaction started but not commited", nsName, config_.serverId); - tx = Transaction{}; + throw Error(errLogic, "Transaction for '%s' was started but was not commited", nsName); } } @@ -760,25 +769,24 @@ Error Replicator::applyWALRecord(LSNPair LSNs, std::string_view nsName, Namespac if (rec.inTransaction) { return applyTxWALRecord(LSNs, nsName, slaveNs, rec); } - RdxContext rdxContext(true, LSNs); - - auto sendSyncAsync = [this](const WALRecord &rec, bool forced) { - NamespaceDef nsDef; - nsDef.FromJSON(giftStr(rec.data)); - syncQueue_.Push(nsDef.name, std::move(nsDef), forced); - walSyncAsync_.send(); - }; if (firstRec && rec.type != WalIndexAdd) { // compatibility with old version err = syncIndexesForced(slaveNs, *firstRec); logPrintf(LogInfo, "[repl:%s]:%d Sync indexes error '%s'", nsName, config_.serverId, err.what()); return err; } + checkNoOpenedTransaction(nsName, slaveNs); + auto sendSyncAsync = [this](const WALRecord &rec, bool forced) { + NamespaceDef nsDef; + nsDef.FromJSON(giftStr(rec.data)); + syncQueue_.Push(nsDef.name, std::move(nsDef), forced); + walSyncAsync_.send(); + }; + RdxContext rdxContext(true, LSNs); switch (rec.type) { // Modify item case WalItemModify: - checkNoOpenedTransaction(nsName, slaveNs); err = modifyItem(LSNs, slaveNs, rec.itemModify.itemCJson, rec.itemModify.modifyMode, master_->NewItem(nsName).impl_->tagsMatcher(), stat); break; @@ -808,7 +816,6 @@ Error Replicator::applyWALRecord(LSNPair LSNs, std::string_view nsName, Namespac // Update query case WalUpdateQuery: { logPrintf(LogTrace, "[repl:%s]:%d WalUpdateQuery", nsName, config_.serverId); - checkNoOpenedTransaction(nsName, slaveNs); QueryResults result; Query q = Query::FromSQL(rec.data); switch (q.type_) { @@ -866,7 +873,6 @@ Error Replicator::applyWALRecord(LSNPair LSNs, std::string_view nsName, Namespac stat.schemasSet++; break; case WalTagsMatcher: { - checkNoOpenedTransaction(nsName, slaveNs); TagsMatcher tm; Serializer ser(rec.data.data(), rec.data.size()); const auto version = ser.GetVarint(); @@ -944,6 +950,36 @@ Error Replicator::modifyItem(LSNPair LSNs, Namespace::Ptr &slaveNs, std::string_ return err; } +Error Replicator::modifyItemTx(LSNPair LSNs, Transaction &tx, std::string_view cjson, int modifyMode, const TagsMatcher &tm, + SyncStat &stat) { + Item item = tx.NewItem(); + Error err = unpackItem(item, LSNs.upstreamLSN_, cjson, tm); + + if (err.ok()) { + switch (modifyMode) { + case ModeDelete: + tx.Delete(std::move(item)); + stat.deleted++; + break; + case ModeInsert: + tx.Insert(std::move(item)); + stat.updated++; + break; + case ModeUpsert: + tx.Upsert(std::move(item)); + stat.updated++; + break; + case ModeUpdate: + tx.Update(std::move(item)); + stat.updated++; + break; + default: + return Error(errNotValid, "Unknown modify mode %d of tx item with lsn %ul", modifyMode, int64_t(LSNs.upstreamLSN_)); + } + } + return err; +} + WrSerializer &Replicator::SyncStat::Dump(WrSerializer &ser) { if (updated) ser << updated << " items updated; "; if (deleted) ser << deleted << " items deleted; "; @@ -1106,7 +1142,7 @@ void Replicator::onWALUpdateImpl(LSNPair LSNs, std::string_view nsName, const WA } void Replicator::OnUpdatesLost(std::string_view nsName) { - std::unique_lock lck(syncMtx_); + std::unique_lock lck(syncMtx_); auto updatesIt = pendedUpdates_.find(nsName); if (updatesIt == pendedUpdates_.end()) { UpdatesData updates; @@ -1128,7 +1164,7 @@ void Replicator::OnConnectionState(const Error &err) { } else { logPrintf(LogInfo, "[repl:] OnConnectionState closed, reason: %s", err.what()); } - std::unique_lock lck(syncMtx_); + std::unique_lock lck(syncMtx_); state_.store(StateInit, std::memory_order_release); resync_.send(); } diff --git a/cpp_src/replicator/replicator.h b/cpp_src/replicator/replicator.h index 5a3949a9d..21665bd0e 100644 --- a/cpp_src/replicator/replicator.h +++ b/cpp_src/replicator/replicator.h @@ -1,16 +1,7 @@ #pragma once -#include -#include -#include "core/dbconfig.h" #include "core/namespace/namespace.h" -#include "core/namespace/namespacestat.h" -#include "estl/atomic_unique_ptr.h" -#include "estl/fast_hash_map.h" #include "net/ev/ev.h" -#include "tools/errors.h" -#include "updatesobserver.h" -#include "vendor/hopscotch/hopscotch_set.h" namespace reindexer { namespace client { @@ -90,6 +81,8 @@ class Replicator : public IUpdatesObserver { void checkNoOpenedTransaction(std::string_view nsName, Namespace::Ptr &slaveNs); // Apply single cjson item Error modifyItem(LSNPair LSNs, Namespace::Ptr &ns, std::string_view cjson, int modifyMode, const TagsMatcher &tm, SyncStat &stat); + // Add single cjson item into tx + Error modifyItemTx(LSNPair LSNs, Transaction &tx, std::string_view cjson, int modifyMode, const TagsMatcher &tm, SyncStat &stat); static Error unpackItem(Item &, lsn_t, std::string_view cjson, const TagsMatcher &tm); // Push update to the queue to apply it later void pushPendingUpdate(LSNPair LSNs, std::string_view nsName, const WALRecord &wrec); diff --git a/cpp_src/replicator/waltracker.h b/cpp_src/replicator/waltracker.h index ebb40118b..381fd0636 100644 --- a/cpp_src/replicator/waltracker.h +++ b/cpp_src/replicator/waltracker.h @@ -46,16 +46,19 @@ class WALTracker { /// Iterator for WAL records class iterator { public: - iterator &operator++() { return idx_++, *this; } - bool operator!=(const iterator &other) const { return idx_ != other.idx_; } + iterator &operator++() noexcept { + ++idx_; + return *this; + } + bool operator!=(const iterator &other) const noexcept { return idx_ != other.idx_; } WALRecord operator*() const { assertf(idx_ % wt_->walSize_ < int(wt_->records_.size()), "idx=%d,wt_->records_.size()=%d,lsnCounter=%d", idx_, wt_->records_.size(), wt_->lsnCounter_); return WALRecord(span(wt_->records_[idx_ % wt_->walSize_])); } - span GetRaw() const { return wt_->records_[idx_ % wt_->walSize_]; } - int64_t GetLSN() const { return idx_; } + span GetRaw() const noexcept { return wt_->records_[idx_ % wt_->walSize_]; } + int64_t GetLSN() const noexcept { return idx_; } int64_t idx_; const WALTracker *wt_; }; diff --git a/cpp_src/server/CMakeLists.txt b/cpp_src/server/CMakeLists.txt index f605f12ef..86b1c6839 100644 --- a/cpp_src/server/CMakeLists.txt +++ b/cpp_src/server/CMakeLists.txt @@ -4,14 +4,14 @@ cmake_minimum_required(VERSION 3.0) project(reindexer_server_library) set (SWAGGER_VERSION "2.x") -set (GH_FACE_VERSION "3.22.0") +set (GH_FACE_VERSION "3.22.1") set (GH_FACE_TAG "v${GH_FACE_VERSION}") set (TARGET reindexer_server_library) set (SERVER_LIB_DIR ${PROJECT_BINARY_DIR} PARENT_SCOPE) file(WRITE ${PROJECT_BINARY_DIR}/swagger_replace.cmake "file(READ ${PROJECT_BINARY_DIR}/swagger/index.html indexhtml) - string(REPLACE \"http://petstore.swagger.io/v2/swagger.json\" \"swagger.yml\" indexhtml \"\${indexhtml}\") + STRING(REPLACE \"http://petstore.swagger.io/v2/swagger.json\" \"swagger.yml\" indexhtml \"\${indexhtml}\") file(WRITE ${PROJECT_BINARY_DIR}/swagger/index.html \"\${indexhtml}\")" ) diff --git a/cpp_src/server/contrib/server.md b/cpp_src/server/contrib/server.md index 5639c8960..fc74a514a 100644 --- a/cpp_src/server/contrib/server.md +++ b/cpp_src/server/contrib/server.md @@ -128,12 +128,11 @@ ## Overview **Reindexer** is an embeddable, in-memory, document-oriented database with a high-level Query builder interface. Reindexer's goal is to provide fast search with complex queries. -The Reindexer is compact and fast. It has not heavy dependencies. Complete reindexer docker image with all libraries and web interface size is just 15MB. -Reindexer is fast. +Reindexer is compact, fast and it does not have heavy dependencies. ### Version information -*Version* : 3.22.0 +*Version* : 3.22.1 ### License information diff --git a/cpp_src/server/contrib/server.yml b/cpp_src/server/contrib/server.yml index af3bd4abb..d3c6dc474 100644 --- a/cpp_src/server/contrib/server.yml +++ b/cpp_src/server/contrib/server.yml @@ -3,9 +3,8 @@ info: description: | **Reindexer** is an embeddable, in-memory, document-oriented database with a high-level Query builder interface. Reindexer's goal is to provide fast search with complex queries. - The Reindexer is compact and fast. It has not heavy dependencies. Complete reindexer docker image with all libraries and web interface size is just 15MB. - Reindexer is fast. - version: "3.22.0" + Reindexer is compact, fast and it does not have heavy dependencies. + version: "3.22.1" title: "Reindexer REST API" license: name: "Apache 2.0" diff --git a/cpp_src/tools/errors.h b/cpp_src/tools/errors.h index a33e3f351..d42a58585 100644 --- a/cpp_src/tools/errors.h +++ b/cpp_src/tools/errors.h @@ -14,6 +14,38 @@ namespace reindexer { +#if defined(REINDEX_CORE_BUILD) +template +void assertf_fmt(const char *fmt, const Args &...args) { + fmt::fprintf(std::cerr, fmt, args...); +} +#if defined(NDEBUG) +#define assertf(...) ((void)0) +#else +// Using (void)fmt here to force ';' usage after the macro +#define assertf(e, fmt, ...) \ + if rx_unlikely (!(e)) { \ + reindexer::assertf_fmt("%s:%d: failed assertion '%s':\n" fmt, __FILE__, __LINE__, #e, __VA_ARGS__); \ + reindexer::debug::print_crash_query(std::cerr); \ + abort(); \ + } \ + (void)fmt +#endif // NDEBUG + +#ifdef RX_WITH_STDLIB_DEBUG +#define assertf_dbg(e, fmt, ...) \ + if rx_unlikely (!(e)) { \ + reindexer::assertf_fmt("%s:%d: failed assertion '%s':\n" fmt, __FILE__, __LINE__, #e, __VA_ARGS__); \ + reindexer::debug::print_crash_query(std::cerr); \ + abort(); \ + } \ + (void)fmt +#else // RX_WITH_STDLIB_DEBUG +#define assertf_dbg(...) ((void)0) +#endif // RX_WITH_STDLIB_DEBUG + +#endif // REINDEX_CORE_BUILD + class Error { using WhatT = intrusive_atomic_rc_wrapper; using WhatPtr = intrusive_ptr; @@ -69,6 +101,7 @@ class Error { try { what_ = make_intrusive(fmt::sprintf(fmt, args...)); } catch (const fmt::FormatError &) { + assertf_dbg(false, "Incorrect error format: '%s'", fmt); what_ = make_intrusive(fmt); } } catch (...) { @@ -78,11 +111,11 @@ class Error { } #endif // REINDEX_CORE_BUILD - [[nodiscard]] const std::string &what() const &noexcept { + [[nodiscard]] const std::string &what() const & noexcept { static const std::string noerr; return what_ ? *what_ : noerr; } - [[nodiscard]] std::string what() &&noexcept { + [[nodiscard]] std::string what() && noexcept { if (what_) { return std::move(*what_); } else { @@ -101,22 +134,4 @@ class Error { ErrorCode code_{errOK}; }; -#if defined(REINDEX_CORE_BUILD) -#if defined(NDEBUG) -#define assertf(...) ((void)0) -#else -template -void assertf_fmt(const char *fmt, const Args &...args) { - fmt::fprintf(std::cerr, fmt, args...); -} - -#define assertf(e, fmt, ...) \ - if rx_unlikely (!(e)) { \ - reindexer::assertf_fmt("%s:%d: failed assertion '%s':\n" fmt, __FILE__, __LINE__, #e, __VA_ARGS__); \ - reindexer::debug::print_crash_query(std::cerr); \ - abort(); \ - } -#endif // NDEBUG -#endif // REINDEX_CORE_BUILD - } // namespace reindexer diff --git a/fulltext.md b/fulltext.md index 1d13e8b53..80b0c8305 100644 --- a/fulltext.md +++ b/fulltext.md @@ -27,6 +27,7 @@ Reindexer has builtin full text search engine. This document describes usage of - [Stopwords details](#stopwords-details) - [Detailed typos config](#detailed-typos-config) - [Base ranking config](#base-ranking-config) + - [Basic document ranking algorithms](#basic-document-ranking-algorithms) - [Limitations and know issues](#limitations-and-know-issues) @@ -378,11 +379,7 @@ Several parameters of full text search engine can be configured from application | | MaxTotalAreasToCache | int | Max total number of highlighted areas in ft result, when result still remains cacheable. '-1' means unlimited | -1 | | | Optimization | string | Optimize the index by 'memory' or by 'cpu' | "memory" | | | FtBaseRanking | struct | Relevance of the word in different forms | | -| | Bm25Config | struct | Document ranking function parameters | | -| | Bm25Config.Bm25k1 | float | Coefficient k1 in the formula for calculating bm25 (used only for rx_bm25, bm25). Сoefficient that sets the saturation threshold for the frequency of the term. The higher the coefficient, the higher the threshold and the lower the saturation rate. | 2.0 | -| | Bm25Config.Bm25b | float | Coefficient b in the formula for calculating bm25 (used only for rx_bm25, bm25). If b is bigger, the effects of the length of the document compared to the average length are more amplified. | 0.75 | -| | Bm25Config.Bm25Type | string | Formula for calculating document relevance (rx_bm25, bm25, word_count) | "rx_bm25" | - +| | Bm25Config | struct | Document ranking function parameters [More...](#basic-document-ranking-algorithms) | | ### Stopwords details The list item can be either a string or a structure containing a string (the stopword) and a bool attribute (`is_morpheme`) indicating whether the stopword can be part of a word that can be shown in query-results. @@ -446,6 +443,47 @@ FtBaseRanking: config for the base relevancy of the word in different forms. | | Translit | int | Relevancy of the match in translit | 90 | | | Synonyms | int | Relevancy of synonyms match | 95 | +### Basic document ranking algorithms + +For basic document ranking, one of the following algorithms can be used: + +- `bm25` +- `bm25_rx` +- `word_count` + +Calculation formula for `bm25`: + +``` +R = (log(totalDocCount / (matchedDocCount + 1)) + 1) * termCountInDoc / wordsInDoc * (k1 + 1.0) / (termCountInDoc / wordsInDoc + k1 * (1.0 - b_ + b_ * wordsInDoc / avgDocLen)) +``` + +Calculation formula for `bm25_rx`: + +``` +R = (log(totalDocCount / (matchedDocCount + 1)) + 1) * termCountInDoc * (k1 + 1.0) / (termCountInDoc + k1 * (1.0 - b_ + b_ * wordsInDoc / avgDocLen)) +``` + +Calculation formula for `word_count`: + +``` +R = termCountInDoc +``` + +- `totalDocCount` - total number of documents +- `matchedDocCount` - number of documents in which a subform of the word was found +- `termCountInDoc` - number of words found in the document for the query word subform +- `wordsInDoc` - number of words in document +- `k1` - free coefficient (Сoefficient that sets the saturation threshold for the frequency of the term. The higher the coefficient, the higher the threshold and the lower the saturation rate.) +- `b` - free coefficient (If b is bigger, the effects of the length of the document compared to the average length are more amplified.) + +| | Parameter name | Type | Description | Default value | +|---|:---------------------:|:--------:|:---------------------------------------------------------------------------------------:|:-------------:| +| | Bm25k1 | float | Coefficient k1 in the formula for calculating bm25 (used only for rx_bm25, bm25). | 2.0 | +| | Bm25b | float | Coefficient b in the formula for calculating bm25 (used only for rx_bm25, bm25). | 0.75 | +| | Bm25Type | string | Formula for calculating document relevance (rx_bm25, bm25, word_count) | "rx_bm25" | + + + ### Limitations and know issues - Results of full text search is always sorted by relevancy.