Skip to content

Commit

Permalink
[ref] Use pdqsort in few more places
Browse files Browse the repository at this point in the history
  • Loading branch information
reindexer-bot committed Nov 14, 2023
1 parent 8fe18b8 commit 78bd37f
Show file tree
Hide file tree
Showing 39 changed files with 476 additions and 227 deletions.
14 changes: 5 additions & 9 deletions cpp_src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,12 @@ include (TargetArch)
target_architecture(COMPILER_TARGET_ARCH)

# Configure compile options
# Remove -DNDEBUG from the RelWithDebInfo C++ flags and raise -O2 to -O3 in the
# generic C and C++ flags.
string(REPLACE "-DNDEBUG" "" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
string(REPLACE "-O2" "-O3" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
string(REPLACE "-O2" "-O3" CMAKE_C_FLAGS "${CMAKE_C_FLAGS}")
# COMPILER_TARGET_ARCH is quoted so that an empty value still produces a
# well-formed condition instead of `if (NOT STREQUAL "e2k")`.
if (NOT "${COMPILER_TARGET_ARCH}" STREQUAL "e2k")
  string(REPLACE "-g" "-g1" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
else()
  string(REPLACE "-g" "-g0" CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO}")
endif()

# Explicit per-configuration flags.
# NOTE(review): these set() calls replace whatever the string(REPLACE) calls
# above produced for CMAKE_CXX_FLAGS_RELWITHDEBINFO - confirm both are intended.
set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g1")
set (CMAKE_C_FLAGS_RELWITHDEBINFO "-O3 -g1")
set (CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")
set (CMAKE_C_FLAGS_RELEASE "-O3 -DNDEBUG")
if ("${COMPILER_TARGET_ARCH}" STREQUAL "e2k")
  # NOTE(review): only the C++ RelWithDebInfo flags are switched to -g0 here;
  # the C flags keep -g1 - confirm this asymmetry is intended.
  set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g0")
  add_definitions(-D__E2K__)
  add_definitions(-D__LCC__)
endif()
Expand Down
195 changes: 195 additions & 0 deletions cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/bin/bash
# Task: https://github.com/restream/reindexer/-/issues/1188
#
# Storage compatibility test for reindexer_server: a database created by one
# server version (latest release / current build) is reopened by the other one,
# and the namespaces list and #memstats data hashes are compared.

# Abort the whole script as soon as any command fails.
set -e

# Stop the running server and uninstall every reindexer package.
#
# Arguments:
#   $1 - PID of the background reindexer_server process.
#
# The function waits for the process to terminate before removing the
# packages. Note that under `set -e` a non-zero status from `wait`
# aborts the script before `yum remove` runs.
function KillAndRemoveServer {
    local pid=$1
    # Quoted so that an empty or malformed PID is passed as a single argument.
    kill "$pid"
    wait "$pid"
    yum remove -y 'reindexer*' > /dev/null
}

# Block until the server at $ADDRESS reports the test database as loaded.
#
# Polls `reindexer_tool --command '\databases list'` every 2 seconds until its
# output equals $DB_NAME ("test" when DB_NAME is unset).
#
# Environment:
#   ADDRESS              - DSN prefix of the server.
#   DB_NAME              - expected database name (default: test).
#   WAIT_FOR_DB_TIMEOUT  - maximum number of seconds to wait (default: 600).
#
# Returns 0 once the database is listed and 1 when the timeout expires, so a
# server that never loads the storage fails the run instead of hanging it.
function WaitForDB {
    local expected_db="${DB_NAME:-test}"
    local timeout_sec="${WAIT_FOR_DB_TIMEOUT:-600}"
    local waited=0
    local is_connected

    # `|| true`: the tool fails while the DB is not loaded yet; that must not
    # trip "exit on error".
    is_connected=$(reindexer_tool --dsn "$ADDRESS" --command '\databases list') || true
    while [[ $is_connected != "$expected_db" ]]; do
        if (( waited >= timeout_sec )); then
            echo "##### FAIL: database '$expected_db' is not loaded after ${timeout_sec}s"
            return 1
        fi
        sleep 2
        waited=$(( waited + 2 ))
        is_connected=$(reindexer_tool --dsn "$ADDRESS" --command '\databases list') || true
    done
}

# Compare two namespace lists regardless of order.
#
# Arguments:
#   $1 - actual list (whitespace-separated names).
#   $2 - expected list (whitespace-separated names).
#   $3 - PID of the running server, used for cleanup on failure.
#
# Prints a PASS line when both lists contain the same names the same number of
# times. Otherwise prints both lists, stops and removes the server and exits
# the script with status 1.
function CompareNamespacesLists {
    local ns_list_actual=$1
    local ns_list_expected=$2
    local pid=$3
    local sorted_actual sorted_expected

    # Unquoted on purpose: word splitting puts every name on its own line.
    # Each list is sorted separately and the results are compared as a whole,
    # so duplicated entries cannot cancel each other out.
    sorted_actual=$(printf '%s\n' $ns_list_actual | sort)
    sorted_expected=$(printf '%s\n' $ns_list_expected | sort)

    if [ "$sorted_actual" == "$sorted_expected" ]; then
        echo "## PASS: namespaces list not changed"
    else
        echo "##### FAIL: namespaces list was changed"
        echo "expected: $ns_list_expected"
        echo "actual: $ns_list_actual"
        KillAndRemoveServer "$pid"
        exit 1
    fi
}

# Compare two #memstats query outputs regardless of row order.
#
# Arguments:
#   $1 - actual query output.
#   $2 - expected query output.
#   $3 - PID of the running server, used for cleanup on failure.
#
# Both outputs are split into whitespace-separated tokens and a trailing comma
# is stripped from each token, so a row matches whether or not it is the last
# one in the JSON array. Prints a PASS line when the token multisets are equal.
# Otherwise prints both outputs, stops and removes the server and exits the
# script with status 1.
function CompareMemstats {
    local actual=$1
    local expected=$2
    local pid=$3
    local sorted_actual sorted_expected

    # Unquoted on purpose: word splitting puts every token on its own line.
    # Rows repeat (e.g. empty namespaces), so the outputs are sorted separately
    # and compared as a whole instead of looking for tokens that occur once.
    sorted_actual=$(printf '%s\n' $actual | sed 's/\(.*\),$/\1/' | sort)
    sorted_expected=$(printf '%s\n' $expected | sed 's/\(.*\),$/\1/' | sort)

    if [ "$sorted_actual" == "$sorted_expected" ]; then
        echo "## PASS: memstats not changed"
    else
        echo "##### FAIL: memstats was changed"
        echo "expected: $expected"
        echo "actual: $actual"
        KillAndRemoveServer "$pid"
        exit 1
    fi
}


# Derive the major version of the freshly built server from its rpm file name.
RX_SERVER_CURRENT_VERSION_RPM="$(basename build/reindexer-*server*.rpm)"
# `|| true`: with no match grep exits with 1, which under `set -e` would abort
# the script silently before "Unknown version" can be reported below.
VERSION_FROM_RPM=$(echo "$RX_SERVER_CURRENT_VERSION_RPM" | grep -o '.*server-..' || true)
VERSION="${VERSION_FROM_RPM: -2:1}" # one-digit version

echo "## choose latest release rpm file"
# Pick the list of namespaces expected for this major version.
case "$VERSION" in
    3)
        namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg'
        ;;
    4)
        # replicationstats ns added for v4
        namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\n#replicationstats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg'
        ;;
    *)
        echo "Unknown version"
        exit 1
        ;;
esac
# Ask the helper script for the rpm name of the latest release of this version.
LATEST_RELEASE=$(python3 cpp_src/cmd/reindexer_server/test/get_last_rx_version.py -v "$VERSION")

# Fetch the latest release rpm and the example database dump.
# --fail makes curl exit with an error on an HTTP error status, so an error
# page is never saved under the rpm/zip name and processed by later steps.
echo "## downloading latest release rpm file: $LATEST_RELEASE"
curl --fail "http://repo.itv.restr.im/itv-api-ng/7/x86_64/$LATEST_RELEASE" --output "$LATEST_RELEASE"
echo "## downloading example DB"
curl --fail "https://git.restream.ru/MaksimKravchuk/reindexer_testdata/-/raw/master/big.zip" --output big.zip
unzip -o big.zip # unzips into mydb_big.rxdump;

# DSN prefix of the server under test; the database name is appended to it.
ADDRESS="cproto://127.0.0.1:6534/"
# Name of the database created from the example dump.
DB_NAME="test"

# Expected output of
#   select replication.data_hash, replication.data_count from #memstats
# right after the example dump has been loaded. Row order does not matter:
# CompareMemstats compares the outputs in any order.
memstats_expected=$'[
{"replication":{"data_hash":24651210926,"data_count":3}},
{"replication":{"data_hash":6252344969,"data_count":1}},
{"replication":{"data_hash":37734732881,"data_count":28}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":1024095024522,"data_count":1145}},
{"replication":{"data_hash":8373644068,"data_count":1315}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":7404222244,"data_count":97}},
{"replication":{"data_hash":94132837196,"data_count":4}},
{"replication":{"data_hash":1896088071,"data_count":2}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":-672103903,"data_count":33538}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":6833710705,"data_count":1}},
{"replication":{"data_hash":5858155773472,"data_count":4500}},
{"replication":{"data_hash":-473221280268823592,"data_count":65448}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":8288213744,"data_count":3}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":354171024786967,"data_count":3941}},
{"replication":{"data_hash":-6520334670,"data_count":35886}},
{"replication":{"data_hash":112772074632,"data_count":281}},
{"replication":{"data_hash":-12679568198538,"data_count":1623116}}
]
Returned 27 rows'

# Forward compatibility: the storage is created by the latest release and then
# opened by the current build. Paths, DSNs and PIDs are quoted so that a
# working directory containing spaces cannot split them into several words
# (this matters most for `rm -rf`).
echo "##### Forward compatibility test #####"

DB_PATH="$(pwd)/rx_db"

echo "Database: $DB_PATH"

echo "## installing latest release: $LATEST_RELEASE"
yum install -y "$LATEST_RELEASE" > /dev/null
# run RX server with disabled logging
reindexer_server -l warning --httplog=none --rpclog=none --db "$DB_PATH" &
server_pid=$!
sleep 2

# Load the example dump into a new database.
reindexer_tool --dsn "$ADDRESS$DB_NAME" -f mydb_big.rxdump --createdb
sleep 1

# State written by the latest release must match the reference values.
namespaces_1=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command '\namespaces list')
# Left unquoted so that the list is printed on a single line.
echo $namespaces_1
CompareNamespacesLists "$namespaces_1" "$namespaces_list_expected" "$server_pid"

memstats_1=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command 'select replication.data_hash, replication.data_count from #memstats')
CompareMemstats "$memstats_1" "$memstats_expected" "$server_pid"

KillAndRemoveServer "$server_pid"

# Reopen the same storage with the current build.
echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM"
yum install -y build/*.rpm > /dev/null
reindexer_server -l0 --corelog=none --httplog=none --rpclog=none --db "$DB_PATH" &
server_pid=$!
sleep 2

WaitForDB

# State read by the current build must match what the latest release wrote.
namespaces_2=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command '\namespaces list')
echo $namespaces_2
CompareNamespacesLists "$namespaces_2" "$namespaces_1" "$server_pid"

memstats_2=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command 'select replication.data_hash, replication.data_count from #memstats')
CompareMemstats "$memstats_2" "$memstats_1" "$server_pid"

KillAndRemoveServer "$server_pid"
rm -rf "$DB_PATH"
sleep 1

# Backward compatibility: the storage is created by the current build and then
# opened by the latest release. Paths, DSNs and PIDs are quoted so that a
# working directory containing spaces cannot split them into several words
# (this matters most for `rm -rf`).
echo "##### Backward compatibility test #####"

echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM"
yum install -y build/*.rpm > /dev/null
reindexer_server -l warning --httplog=none --rpclog=none --db "$DB_PATH" &
server_pid=$!
sleep 2

# Load the example dump into a new database.
reindexer_tool --dsn "$ADDRESS$DB_NAME" -f mydb_big.rxdump --createdb
sleep 1

# State written by the current build must match the reference values.
namespaces_3=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command '\namespaces list')
# Left unquoted so that the list is printed on a single line.
echo $namespaces_3
CompareNamespacesLists "$namespaces_3" "$namespaces_list_expected" "$server_pid"

memstats_3=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command 'select replication.data_hash, replication.data_count from #memstats')
CompareMemstats "$memstats_3" "$memstats_expected" "$server_pid"

KillAndRemoveServer "$server_pid"

# Reopen the same storage with the latest release.
echo "## installing latest release: $LATEST_RELEASE"
yum install -y "$LATEST_RELEASE" > /dev/null
reindexer_server -l warning --httplog=none --rpclog=none --db "$DB_PATH" &
server_pid=$!
sleep 2

WaitForDB

# State read by the latest release must match what the current build wrote.
namespaces_4=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command '\namespaces list')
echo $namespaces_4
CompareNamespacesLists "$namespaces_4" "$namespaces_3" "$server_pid"

memstats_4=$(reindexer_tool --dsn "$ADDRESS$DB_NAME" --command 'select replication.data_hash, replication.data_count from #memstats')
CompareMemstats "$memstats_4" "$memstats_3" "$server_pid"

KillAndRemoveServer "$server_pid"
rm -rf "$DB_PATH"
1 change: 1 addition & 0 deletions cpp_src/core/defnsconfigs.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const std::vector<std::string> kDefDBConfig = {
"replication":{
"role":"none",
"master_dsn":"cproto://127.0.0.1:6534/db",
"server_id":0,
"cluster_id":2,
"force_sync_on_logic_error": false,
"force_sync_on_wrong_data_hash": false,
Expand Down
3 changes: 2 additions & 1 deletion cpp_src/core/ft/areaholder.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class AreaBuffer {
[[nodiscard]] bool Empty() const noexcept { return data_.empty(); }
void Commit() {
if (!data_.empty()) {
boost::sort::pdqsort(data_.begin(), data_.end(), [](const Area &rhs, const Area &lhs) { return rhs.start < lhs.start; });
boost::sort::pdqsort_branchless(data_.begin(), data_.end(),
[](const Area &rhs, const Area &lhs) noexcept { return rhs.start < lhs.start; });
for (auto vit = data_.begin() + 1; vit != data_.end(); ++vit) {
auto prev = vit - 1;
if (vit->Concat(*prev)) {
Expand Down
31 changes: 16 additions & 15 deletions cpp_src/core/ft/ft_fast/selecter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ void Selecter<IdCont>::processLowRelVariants(FtSelectContext& ctx, const FtMerge
return false;
});
} else {
boost::sort::pdqsort(ctx.lowRelVariants.begin(), ctx.lowRelVariants.end(),
[](FtBoundVariantEntry& l, FtBoundVariantEntry& r) noexcept { return l.proc > r.proc; });
boost::sort::pdqsort_branchless(ctx.lowRelVariants.begin(), ctx.lowRelVariants.end(),
[](FtBoundVariantEntry& l, FtBoundVariantEntry& r) noexcept { return l.proc > r.proc; });
}

auto lastVariantLen = ctx.lowRelVariants.size() ? ctx.lowRelVariants[0].GetLenCached() : -1;
Expand Down Expand Up @@ -790,7 +790,7 @@ std::pair<double, int> Selecter<IdCont>::calcTermRank(const TextSearchResults& r
if (!termRank) return std::make_pair(termRank, field);

if (holder_.cfg_->summationRanksByFieldsRatio > 0) {
std::sort(ranksInFields.begin(), ranksInFields.end());
boost::sort::pdqsort_branchless(ranksInFields.begin(), ranksInFields.end());
double k = holder_.cfg_->summationRanksByFieldsRatio;
for (auto rank : ranksInFields) {
termRank += (k * rank);
Expand Down Expand Up @@ -921,9 +921,10 @@ void Selecter<IdCont>::mergeIterationGroup(TextSearchResults& rawRes, index_t ra
mergedPosInfo.rank = 0;
} else {
auto& posTmp = mergedPosInfo.posTmp;
boost::sort::pdqsort(
posTmp.begin(), posTmp.end(),
[](const std::pair<IdRelType::PosType, int>& l, const std::pair<IdRelType::PosType, int>& r) { return l.first < r.first; });
boost::sort::pdqsort_branchless(posTmp.begin(), posTmp.end(),
[](const std::pair<IdRelType::PosType, int>& l,
const std::pair<IdRelType::PosType, int>& r) noexcept { return l.first < r.first; });

auto last = std::unique(posTmp.begin(), posTmp.end());
posTmp.resize(last - posTmp.begin());

Expand Down Expand Up @@ -984,9 +985,9 @@ void Selecter<IdCont>::mergeResultsPart(std::vector<TextSearchResults>& rawResul
merged.maxRank = m.proc;
}
}

boost::sort::pdqsort(merged.begin(), merged.end(),
[](const IDataHolder::MergeInfo& lhs, const IDataHolder::MergeInfo& rhs) { return lhs.proc > rhs.proc; });
boost::sort::pdqsort_branchless(
merged.begin(), merged.end(),
[](const IDataHolder::MergeInfo& lhs, const IDataHolder::MergeInfo& rhs) noexcept { return lhs.proc > rhs.proc; });
}

template <typename IdCont>
Expand Down Expand Up @@ -1244,12 +1245,11 @@ typename IDataHolder::MergeData Selecter<IdCont>::mergeResults(std::vector<TextS
std::vector<IDataHolder::MergedIdRel> merged_rd;

std::vector<IDataHolder::MergedOffsetT> idoffsets;

for (auto& rawRes : rawResults) {
boost::sort::pdqsort(rawRes.begin(), rawRes.end(),
[](const TextSearchResult& lhs, const TextSearchResult& rhs) { return lhs.proc_ > rhs.proc_; });
boost::sort::pdqsort_branchless(
rawRes.begin(), rawRes.end(),
[](const TextSearchResult& lhs, const TextSearchResult& rhs) noexcept { return lhs.proc_ > rhs.proc_; });
}

const auto maxMergedSize = std::min(size_t(holder_.cfg_->mergeLimit), totalORVids);
merged.reserve(maxMergedSize);

Expand Down Expand Up @@ -1332,8 +1332,9 @@ typename IDataHolder::MergeData Selecter<IdCont>::mergeResults(std::vector<TextS
}
}

boost::sort::pdqsort(merged.begin(), merged.end(),
[](const IDataHolder::MergeInfo& lhs, const IDataHolder::MergeInfo& rhs) { return lhs.proc > rhs.proc; });
boost::sort::pdqsort_branchless(
merged.begin(), merged.end(),
[](const IDataHolder::MergeInfo& lhs, const IDataHolder::MergeInfo& rhs) noexcept { return lhs.proc > rhs.proc; });
return merged;
}

Expand Down
2 changes: 1 addition & 1 deletion cpp_src/core/ft/ft_fuzzy/merger/basemerger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ SearchResult BaseMerger::Merge(MergeCtx& ctx, bool inTransaction, const reindexe
data_set.AddData(it->Id(), id_ctx);
}
}
boost::sort::pdqsort(data_set.data_->begin(), data_set.data_->end(), [](const MergedData& lhs, const MergedData& rhs) {
boost::sort::pdqsort(data_set.data_->begin(), data_set.data_->end(), [](const MergedData& lhs, const MergedData& rhs) noexcept {
if (lhs.proc_ == rhs.proc_) {
return lhs.id_ < rhs.id_;
}
Expand Down
6 changes: 0 additions & 6 deletions cpp_src/core/ft/idrelset.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include "idrelset.h"
#include <algorithm>
#include "estl/h_vector.h"
Expand Down Expand Up @@ -84,9 +83,4 @@ int IdRelSet::Add(VDocIdType id, int pos, int field) {
return back().Size();
}

// Sorts the stored positions (pos_) in ascending order of PosType::pos().
void IdRelType::SimpleCommit() {
boost::sort::pdqsort(pos_.begin(), pos_.end(),
[](const IdRelType::PosType& lhs, const IdRelType::PosType& rhs) { return lhs.pos() < rhs.pos(); });
}

} // namespace reindexer
12 changes: 8 additions & 4 deletions cpp_src/core/ft/idrelset.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ class IdRelType {
addField(field);
}
// Sorts pos_ with the default ordering of its elements and then drops
// adjacent duplicates, shrinking pos_ to the remaining elements.
// NOTE(review): pos_ is sorted twice in a row (pdqsort, then
// pdqsort_branchless); this looks like both sides of a diff - confirm that
// only one of the two calls is meant to stay.
void SortAndUnique() {
boost::sort::pdqsort(pos_.begin(), pos_.end());
boost::sort::pdqsort_branchless(pos_.begin(), pos_.end());
auto last = std::unique(pos_.begin(), pos_.end());
pos_.resize(last - pos_.begin());
}
void Clear() {
void Clear() noexcept {
usedFieldsMask_ = 0;
#ifdef REINDEXER_FT_EXTRA_DEBUG
pos_.clear<false>();
Expand All @@ -116,7 +116,11 @@ class IdRelType {
}
size_t Size() const noexcept { return pos_.size(); }
size_t size() const noexcept { return pos_.size(); }
void SimpleCommit();
// Sorts the stored positions (pos_) in ascending order of PosType::pos().
// Declared noexcept, as is the comparator passed to the sort.
void SimpleCommit() noexcept {
boost::sort::pdqsort_branchless(
pos_.begin(), pos_.end(),
[](const IdRelType::PosType& lhs, const IdRelType::PosType& rhs) noexcept { return lhs.pos() < rhs.pos(); });
}
const RVector<PosType, 3>& Pos() const noexcept { return pos_; }
uint64_t UsedFieldsMask() const noexcept { return usedFieldsMask_; }
size_t HeapSize() const noexcept { return heapSize(pos_); }
Expand All @@ -141,7 +145,7 @@ class IdRelType {
class IdRelSet : public std::vector<IdRelType> {
public:
int Add(VDocIdType id, int pos, int field);
void SimpleCommit() {
void SimpleCommit() noexcept {
for (auto& val : *this) val.SimpleCommit();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp_src/core/idset.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class IdSet : public IdSetPlain {
return *this;
}
// Builds an IdSet from an unsorted container of ids: the ids are sorted,
// adjacent duplicates are erased, and the container is moved into a newly
// created reference-counted IdSet.
// NOTE(review): ids is sorted twice in a row (pdqsort, then
// pdqsort_branchless); this looks like both sides of a diff - confirm that
// only one of the two calls is meant to stay.
static Ptr BuildFromUnsorted(base_idset &&ids) {
boost::sort::pdqsort(ids.begin(), ids.end());
boost::sort::pdqsort_branchless(ids.begin(), ids.end());
ids.erase(std::unique(ids.begin(), ids.end()), ids.end()); // TODO: It would be better to integrate unique into sort
return make_intrusive<intrusive_atomic_rc_wrapper<IdSet>>(std::move(ids));
}
Expand Down
2 changes: 1 addition & 1 deletion cpp_src/core/index/indextext/fastindextext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ template <typename T>
IndexMemStat FastIndexText<T>::GetMemStat(const RdxContext &ctx) {
auto ret = IndexUnordered<T>::GetMemStat(ctx);

contexted_shared_lock lck(this->mtx_, &ctx);
contexted_shared_lock lck(this->mtx_, ctx);
ret.fulltextSize = this->holder_->GetMemStat();
ret.idsetCache = this->cache_ft_ ? this->cache_ft_->GetMemStat() : LRUCacheMemStat();
return ret;
Expand Down
Loading

0 comments on commit 78bd37f

Please sign in to comment.