Skip to content

Commit

Permalink
Merge branch 'feature/lsn-refactoring-v3' into 'develop'
Browse files Browse the repository at this point in the history
LSN refactoring v3

See merge request itv-backend/reindexer!1443
  • Loading branch information
reindexer-bot committed Oct 31, 2023
1 parent 8fe18b8 commit 180ed49
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 43 deletions.
195 changes: 195 additions & 0 deletions cpp_src/cmd/reindexer_server/test/test_storage_compatibility.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/bin/bash
# Task: https://github.com/restream/reindexer/-/issues/1188
set -e

function KillAndRemoveServer {
local pid=$1
kill $pid
wait $pid
yum remove -y 'reindexer*' > /dev/null
}

function WaitForDB {
# wait until DB is loaded
set +e # disable "exit on error" so the script won't stop when DB's not loaded yet
is_connected=$(reindexer_tool --dsn $ADDRESS --command '\databases list');
while [[ $is_connected != "test" ]]
do
sleep 2
is_connected=$(reindexer_tool --dsn $ADDRESS --command '\databases list');
done
set -e
}

function CompareNamespacesLists {
local ns_list_actual=$1
local ns_list_expected=$2
local pid=$3

diff=$(echo ${ns_list_actual[@]} ${ns_list_expected[@]} | tr ' ' '\n' | sort | uniq -u) # compare in any order
if [ "$diff" == "" ]; then
echo "## PASS: namespaces list not changed"
else
echo "##### FAIL: namespaces list was changed"
echo "expected: $ns_list_expected"
echo "actual: $ns_list_actual"
KillAndRemoveServer $pid;
exit 1
fi
}

function CompareMemstats {
local actual=$1
local expected=$2
local pid=$3
diff=$(echo ${actual[@]} ${expected[@]} | tr ' ' '\n' | sed 's/\(.*\),$/\1/' | sort | uniq -u) # compare in any order
if [ "$diff" == "" ]; then
echo "## PASS: memstats not changed"
else
echo "##### FAIL: memstats was changed"
echo "expected: $expected"
echo "actual: $actual"
KillAndRemoveServer $pid;
exit 1
fi
}


RX_SERVER_CURRENT_VERSION_RPM="$(basename build/reindexer-*server*.rpm)"
VERSION_FROM_RPM=$(echo "$RX_SERVER_CURRENT_VERSION_RPM" | grep -o '.*server-..')
VERSION=$(echo ${VERSION_FROM_RPM: -2:1}) # one-digit version

echo "## choose latest release rpm file"
if [ $VERSION == 3 ]; then
LATEST_RELEASE=$(python3 cpp_src/cmd/reindexer_server/test/get_last_rx_version.py -v 3)
namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg'
elif [ $VERSION == 4 ]; then
LATEST_RELEASE=$(python3 cpp_src/cmd/reindexer_server/test/get_last_rx_version.py -v 4)
# replicationstats ns added for v4
namespaces_list_expected=$'purchase_options_ext_dict\nchild_account_recommendations\n#config\n#activitystats\n#replicationstats\nradio_channels\ncollections\n#namespaces\nwp_imports_tasks\nepg_genres\nrecom_media_items_personal\nrecom_epg_archive_default\n#perfstats\nrecom_epg_live_default\nmedia_view_templates\nasset_video_servers\nwp_tasks_schedule\nadmin_roles\n#clientsstats\nrecom_epg_archive_personal\nrecom_media_items_similars\nmenu_items\naccount_recommendations\nkaraoke_items\nmedia_items\nbanners\n#queriesperfstats\nrecom_media_items_default\nrecom_epg_live_personal\nservices\n#memstats\nchannels\nmedia_item_recommendations\nwp_tasks_tasks\nepg'
else
echo "Unknown version"
exit 1
fi

echo "## downloading latest release rpm file: $LATEST_RELEASE"
curl "http://repo.itv.restr.im/itv-api-ng/7/x86_64/$LATEST_RELEASE" --output $LATEST_RELEASE;
echo "## downloading example DB"
curl "https://git.restream.ru/MaksimKravchuk/reindexer_testdata/-/raw/master/big.zip" --output big.zip;
unzip -o big.zip # unzips into mydb_big.rxdump;

ADDRESS="cproto://127.0.0.1:6534/"
DB_NAME="test"

memstats_expected=$'[
{"replication":{"data_hash":24651210926,"data_count":3}},
{"replication":{"data_hash":6252344969,"data_count":1}},
{"replication":{"data_hash":37734732881,"data_count":28}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":1024095024522,"data_count":1145}},
{"replication":{"data_hash":8373644068,"data_count":1315}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":7404222244,"data_count":97}},
{"replication":{"data_hash":94132837196,"data_count":4}},
{"replication":{"data_hash":1896088071,"data_count":2}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":-672103903,"data_count":33538}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":6833710705,"data_count":1}},
{"replication":{"data_hash":5858155773472,"data_count":4500}},
{"replication":{"data_hash":-473221280268823592,"data_count":65448}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":8288213744,"data_count":3}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":0,"data_count":0}},
{"replication":{"data_hash":354171024786967,"data_count":3941}},
{"replication":{"data_hash":-6520334670,"data_count":35886}},
{"replication":{"data_hash":112772074632,"data_count":281}},
{"replication":{"data_hash":-12679568198538,"data_count":1623116}}
]
Returned 27 rows'

echo "##### Forward compatibility test #####"

DB_PATH=$(pwd)"/rx_db"

echo "Database: "$DB_PATH

echo "## installing latest release: $LATEST_RELEASE"
yum install -y $LATEST_RELEASE > /dev/null;
# run RX server with disabled logging
reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH &
server_pid=$!
sleep 2;

reindexer_tool --dsn $ADDRESS$DB_NAME -f mydb_big.rxdump --createdb;
sleep 1;

namespaces_1=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list');
echo $namespaces_1;
CompareNamespacesLists "${namespaces_1[@]}" "${namespaces_list_expected[@]}" $server_pid;

memstats_1=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats');
CompareMemstats "${memstats_1[@]}" "${memstats_expected[@]}" $server_pid;

KillAndRemoveServer $server_pid;

echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM"
yum install -y build/*.rpm > /dev/null;
reindexer_server -l0 --corelog=none --httplog=none --rpclog=none --db $DB_PATH &
server_pid=$!
sleep 2;

WaitForDB

namespaces_2=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list');
echo $namespaces_2;
CompareNamespacesLists "${namespaces_2[@]}" "${namespaces_1[@]}" $server_pid;

memstats_2=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats');
CompareMemstats "${memstats_2[@]}" "${memstats_1[@]}" $server_pid;

KillAndRemoveServer $server_pid;
rm -rf $DB_PATH;
sleep 1;

echo "##### Backward compatibility test #####"

echo "## installing current version: $RX_SERVER_CURRENT_VERSION_RPM"
yum install -y build/*.rpm > /dev/null;
reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH &
server_pid=$!
sleep 2;

reindexer_tool --dsn $ADDRESS$DB_NAME -f mydb_big.rxdump --createdb;
sleep 1;

namespaces_3=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list');
echo $namespaces_3;
CompareNamespacesLists "${namespaces_3[@]}" "${namespaces_list_expected[@]}" $server_pid;

memstats_3=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats');
CompareMemstats "${memstats_3[@]}" "${memstats_expected[@]}" $server_pid;

KillAndRemoveServer $server_pid;

echo "## installing latest release: $LATEST_RELEASE"
yum install -y $LATEST_RELEASE > /dev/null;
reindexer_server -l warning --httplog=none --rpclog=none --db $DB_PATH &
server_pid=$!
sleep 2;

WaitForDB

namespaces_4=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command '\namespaces list');
echo $namespaces_4;
CompareNamespacesLists "${namespaces_4[@]}" "${namespaces_3[@]}" $server_pid;

memstats_4=$(reindexer_tool --dsn $ADDRESS$DB_NAME --command 'select replication.data_hash, replication.data_count from #memstats');
CompareMemstats "${memstats_4[@]}" "${memstats_3[@]}" $server_pid;

KillAndRemoveServer $server_pid;
rm -rf $DB_PATH;
6 changes: 4 additions & 2 deletions cpp_src/core/lsn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

namespace reindexer {

void lsn_t::GetJSON(JsonBuilder &builder) const {
void lsn_t::GetJSON(JsonBuilder& builder) const {
builder.Put("server_id", Server());
builder.Put("counter", Counter());
}

} // namespace reindexer
[[noreturn]] void lsn_t::throwValidation(ErrorCode code, const char* fmt, int64_t value) { throw Error(code, fmt, value); }

} // namespace reindexer
92 changes: 51 additions & 41 deletions cpp_src/core/lsn.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,53 @@ class JsonBuilder;
// SSS NNN NNN NNN NNN NNN (18 decimal digits)

struct lsn_t {
static const int64_t digitCountLSNMult = 1000000000000000ll;
private:
static constexpr int16_t kMinServerIDValue = 0;
static constexpr int16_t kMaxServerIDValue = 999;

static const int64_t kCounterbitCount = 48;
static const int64_t kCounterMask = (1ull << kCounterbitCount) - 1ull;
static constexpr int64_t kMaxCounter = 1000000000000000ll;
static constexpr int64_t kDefaultCounter = kMaxCounter - 1;

public:
void GetJSON(JsonBuilder &builder) const;

void FromJSON(const gason::JsonNode &root) {
int server = root["server_id"].As<int>(0);
int64_t counter = root["counter"].As<int64_t>(digitCountLSNMult - 1ll);
const int server = root["server_id"].As<int>(0);
const int64_t counter = root["counter"].As<int64_t>(kDefaultCounter);
payload_ = int64_t(lsn_t(counter, server));
}

lsn_t() {}
explicit lsn_t(int64_t v) {
if ((v & kCounterMask) == kCounterMask) // init -1
payload_ = digitCountLSNMult - 1ll;
else {
payload_ = v;
}
}
lsn_t(int64_t counter, uint8_t server) {
if ((counter & kCounterMask) == kCounterMask) counter = digitCountLSNMult - 1ll;
int64_t s = server * digitCountLSNMult;
payload_ = s + counter;
lsn_t() noexcept = default;
lsn_t(const lsn_t &) noexcept = default;
lsn_t(lsn_t &&) noexcept = default;
lsn_t &operator=(const lsn_t &) noexcept = default;
lsn_t &operator=(lsn_t &&) noexcept = default;
explicit lsn_t(int64_t v) : lsn_t(v % kMaxCounter, v / kMaxCounter) {}
lsn_t(int64_t counter, int16_t server) {
validateCounter(counter);
validateServerId(server);
payload_ = server * kMaxCounter + counter;
}
explicit operator int64_t() const { return payload_; }

bool operator==(lsn_t o) { return payload_ == o.payload_; }
bool operator!=(lsn_t o) { return payload_ != o.payload_; }
bool operator==(lsn_t o) const noexcept { return payload_ == o.payload_; }
bool operator!=(lsn_t o) const noexcept { return payload_ != o.payload_; }

int64_t SetServer(short s) {
if (s > 999) throw Error(errLogic, "Server id > 999");
int64_t server = s * digitCountLSNMult;
int64_t serverOld = payload_ / digitCountLSNMult;
payload_ = payload_ - serverOld * digitCountLSNMult + server;
int64_t SetServer(short server) {
validateServerId(server);
payload_ = server * kMaxCounter + Counter();
return payload_;
}
int64_t SetCounter(int64_t c) {
if (c >= digitCountLSNMult) throw Error(errLogic, "LSN Counter > digitCountLSNMult");
int64_t server = payload_ / digitCountLSNMult;
payload_ = server * digitCountLSNMult + c;
int64_t SetCounter(int64_t counter) {
validateCounter(counter);
payload_ = Server() * kMaxCounter + counter;
return payload_;
}
int64_t Counter() const {
int64_t server = payload_ / digitCountLSNMult;
return payload_ - server * digitCountLSNMult;
}
short Server() const { return payload_ / digitCountLSNMult; }
bool isEmpty() const { return Counter() == digitCountLSNMult - 1ll; }
int64_t Counter() const noexcept { return payload_ % kMaxCounter; }
int16_t Server() const noexcept { return payload_ / kMaxCounter; }
bool isEmpty() const noexcept { return Counter() == kDefaultCounter; }

int compare(lsn_t o) {
int compare(lsn_t o) const {
if (Server() != o.Server()) throw Error(errLogic, "Compare lsn from different server");
if (Counter() < o.Counter())
return -1;
Expand All @@ -73,13 +68,28 @@ struct lsn_t {
return 0;
}

bool operator<(lsn_t o) { return compare(o) == -1; }
bool operator<=(lsn_t o) { return compare(o) <= 0; }
bool operator>(lsn_t o) { return compare(o) == 1; }
bool operator>=(lsn_t o) { return compare(o) >= 0; }
bool operator<(lsn_t o) const { return compare(o) == -1; }
bool operator<=(lsn_t o) const { return compare(o) <= 0; }
bool operator>(lsn_t o) const { return compare(o) == 1; }
bool operator>=(lsn_t o) const { return compare(o) >= 0; }

private:
int64_t payload_ = kDefaultCounter;
static void validateServerId(int16_t server) {
if (server < kMinServerIDValue) {
throwValidation(errLogic, "Server id < %d", kMinServerIDValue);
}
if (server > kMaxServerIDValue) {
throwValidation(errLogic, "Server id > %d", kMaxServerIDValue);
}
}
static void validateCounter(int64_t counter) {
if (counter > kDefaultCounter) {
throwValidation(errLogic, "LSN Counter > Default LSN (%d)", kMaxCounter);
}
}

protected:
int64_t payload_ = digitCountLSNMult - 1ll;
[[noreturn]] static void throwValidation(ErrorCode, const char *, int64_t);
};

struct LSNPair {
Expand Down

0 comments on commit 180ed49

Please sign in to comment.