Skip to content

Commit

Permalink
Merge pull request #103 from SparqNet/state_dump
Browse files Browse the repository at this point in the history
State dump
  • Loading branch information
itamarcps authored Apr 25, 2024
2 parents a2b507b + 96a6ed5 commit d2521c8
Show file tree
Hide file tree
Showing 41 changed files with 1,943 additions and 1,524 deletions.
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") # Always look for static libraries - "ZLIB
set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # For clang-tidy

# Set project version inside the code (forcefully so changes in the .in file are always reflected correctly to the compiler)
if (EXISTS ${CMAKE_SOURCE_DIR}/src/utils/options.h)
file(REMOVE ${CMAKE_SOURCE_DIR}/src/utils/options.h)
endif()
# if (EXISTS ${CMAKE_SOURCE_DIR}/src/utils/options.h)
# file(REMOVE ${CMAKE_SOURCE_DIR}/src/utils/options.h)
# endif()
configure_file(
${CMAKE_SOURCE_DIR}/src/utils/options.h.in
${CMAKE_SOURCE_DIR}/src/utils/options.h
Expand Down
2 changes: 1 addition & 1 deletion scripts/AIO-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ if [ "$ONLY_DEPLOY" = false ]; then
## Build the project
cd build_local_testnet
cmake -DDEBUG=$DEBUG ..
cmake --build . --target orbitersdkd orbitersdkd-discovery -- -j${CORES}
make -j${CORES}
fi

if [ "$DEPLOY" = true ]; then
Expand Down
6 changes: 5 additions & 1 deletion src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ if(BUILD_AVALANCHEGO)
set(CORE_HEADERS
${CMAKE_SOURCE_DIR}/src/core/blockchain.h
# ${CMAKE_SOURCE_DIR}/src/core/snowmanVM.h
${CMAKE_SOURCE_DIR}/src/core/state.h
${CMAKE_SOURCE_DIR}/src/core/dump.h
${CMAKE_SOURCE_DIR}/src/core/storage.h
${CMAKE_SOURCE_DIR}/src/core/state.h
${CMAKE_SOURCE_DIR}/src/core/rdpos.h
PARENT_SCOPE
)

set(CORE_SOURCES
${CMAKE_SOURCE_DIR}/src/core/blockchain.cpp
# ${CMAKE_SOURCE_DIR}/src/core/snowmanVM.cpp
${CMAKE_SOURCE_DIR}/src/core/dump.cpp
${CMAKE_SOURCE_DIR}/src/core/state.cpp
${CMAKE_SOURCE_DIR}/src/core/storage.cpp
${CMAKE_SOURCE_DIR}/src/core/rdpos.cpp
Expand All @@ -19,6 +21,7 @@ if(BUILD_AVALANCHEGO)
else()
set(CORE_HEADERS
${CMAKE_SOURCE_DIR}/src/core/blockchain.h
${CMAKE_SOURCE_DIR}/src/core/dump.h
${CMAKE_SOURCE_DIR}/src/core/state.h
${CMAKE_SOURCE_DIR}/src/core/storage.h
${CMAKE_SOURCE_DIR}/src/core/rdpos.h
Expand All @@ -27,6 +30,7 @@ else()

set(CORE_SOURCES
${CMAKE_SOURCE_DIR}/src/core/blockchain.cpp
${CMAKE_SOURCE_DIR}/src/core/dump.cpp
${CMAKE_SOURCE_DIR}/src/core/state.cpp
${CMAKE_SOURCE_DIR}/src/core/storage.cpp
${CMAKE_SOURCE_DIR}/src/core/rdpos.cpp
Expand Down
16 changes: 9 additions & 7 deletions src/core/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ void Syncer::doValidatorBlock() {
if (this->stopSyncer_) return;

// Create the block and append to all chains, we can use any storage for latest block.
const std::shared_ptr<const Block> latestBlock = this->blockchain_.storage_.latest();
Block block(latestBlock->hash(), latestBlock->getTimestamp(), latestBlock->getNHeight() + 1);
const std::shared_ptr<const FinalizedBlock> latestBlock = this->blockchain_.storage_.latest();
MutableBlock block(latestBlock->getHash(), latestBlock->getTimestamp(), latestBlock->getNHeight() + 1);

// Append transactions towards block.
for (const auto& tx: randomHashTxs) block.appendTxValidator(tx);
Expand All @@ -172,15 +172,15 @@ void Syncer::doValidatorBlock() {

// Add transactions from state, sign, validate and process the block.
this->blockchain_.state_.fillBlockWithTransactions(block);
this->blockchain_.state_.rdposSignBlock(block);
if (!this->blockchain_.state_.validateNextBlock(block)) {
FinalizedBlock finalizedBlock = this->blockchain_.state_.rdposSignBlock(block);
if (!this->blockchain_.state_.validateNextBlock(finalizedBlock)) {
Logger::logToDebug(LogType::ERROR, Log::syncer, __func__, "Block is not valid!");
throw DynamicException("Block is not valid!");
}
if (this->stopSyncer_) return;
Hash latestBlockHash = block.hash();
this->blockchain_.state_.processNextBlock(std::move(block));
if (this->blockchain_.storage_.latest()->hash() != latestBlockHash) {
Hash latestBlockHash = finalizedBlock.getHash();
this->blockchain_.state_.processNextBlock(std::move(finalizedBlock));
if (this->blockchain_.storage_.latest()->getHash() != latestBlockHash) {
Logger::logToDebug(LogType::ERROR, Log::syncer, __func__, "Block is not valid!");
throw DynamicException("Block is not valid!");
}
Expand Down Expand Up @@ -242,6 +242,7 @@ bool Syncer::syncerLoop() {

// Sync the node with the network.
this->doSync();
this->blockchain_.state_.dumpStartWorker(); // Start the dump worker.
if (this->stopSyncer_) return false;
Utils::safePrint("Synced with the network, starting the node.");
if (this->blockchain_.options_.getIsValidator()) {
Expand All @@ -261,6 +262,7 @@ void Syncer::start() {
void Syncer::stop() {
this->stopSyncer_ = true;
this->blockchain_.state_.rdposStopWorker(); // Stop the rdPoS worker.
this->blockchain_.state_.dumpStopWorker(); // Stop the dump worker.
if (this->syncerLoopFuture_.valid()) this->syncerLoopFuture_.wait();
}

160 changes: 80 additions & 80 deletions src/core/blockchain.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ See the LICENSE.txt file in the project root for more information.
#include "state.h"
#include "../net/p2p/managerbase.h"
#include "../net/http/httpserver.h"
#include "../utils/options.h"
#include "../utils/db.h"
#include "../utils/options.h"

// Forward declaration for Syncer.
class Blockchain;
Expand All @@ -28,52 +28,52 @@ class Blockchain;
class Syncer {
// TODO: Maybe this class could also be responsible for slashing rdPoS if they are not behaving correctly
// TODO: Maybe it is better to move rdPoSWorker to Syncer
private:
Blockchain& blockchain_; ///< Reference to the parent blockchain.
std::unordered_map<P2P::NodeID, P2P::NodeInfo, SafeHash> currentlyConnectedNodes_; ///< List of currently connected nodes and their info.
std::shared_ptr<const Block> latestBlock_; ///< Pointer to the blockchain's latest block.
std::future<bool> syncerLoopFuture_; ///< Future object holding the thread for the syncer loop.
std::atomic<bool> stopSyncer_ = false; ///< Flag for stopping the syncer.
std::atomic<bool> synced_ = false; ///< Indicates whether or not the syncer is synced.

void updateCurrentlyConnectedNodes(); ///< Update the list of currently connected nodes.
bool checkLatestBlock(); ///< Check latest block (used by validatorLoop()).
void doSync(); ///< Do the syncing.
void validatorLoop(); ///< Routine loop for when the node is a Validator.
void nonValidatorLoop() const; ///< Routine loop for when the node is NOT a Validator.
bool syncerLoop(); ///< Routine loop for the syncer worker.

/**
* Create and broadcast a Validator block (called by validatorLoop()).
* If the node is a Validator and it has to create a new block,
* this function will be called, the new block will be created based on the
* current State and rdPoS objects, and then it will be broadcast.
* @throw DynamicException if block is invalid.
*/
void doValidatorBlock();

/**
* Wait for a new block (called by validatorLoop()).
* If the node is a Validator, this function will be called to make the
* node wait until it receives a new block.
*/
void doValidatorTx() const;

public:
/**
* Constructor.
* @param blockchain Reference to the parent blockchain.
*/
explicit Syncer(Blockchain& blockchain) : blockchain_(blockchain) {}

/// Destructor. Automatically stops the syncer.
~Syncer() { this->stop(); }

/// Getter for `synced_`.
const std::atomic<bool>& isSynced() const { return this->synced_; }

void start(); ///< Start the syncer routine loop.
void stop(); ///< Stop the syncer routine loop.
private:
Blockchain& blockchain_; ///< Reference to the parent blockchain.
std::unordered_map<P2P::NodeID, P2P::NodeInfo, SafeHash> currentlyConnectedNodes_; ///< List of currently connected nodes and their info.
std::shared_ptr<const FinalizedBlock> latestBlock_; ///< Pointer to the blockchain's latest block.
std::future<bool> syncerLoopFuture_; ///< Future object holding the thread for the syncer loop.
std::atomic<bool> stopSyncer_ = false; ///< Flag for stopping the syncer.
std::atomic<bool> synced_ = false; ///< Indicates whether or not the syncer is synced.

void updateCurrentlyConnectedNodes(); ///< Update the list of currently connected nodes.
bool checkLatestBlock(); ///< Check latest block (used by validatorLoop()).
void doSync(); ///< Do the syncing.
void validatorLoop(); ///< Routine loop for when the node is a Validator.
void nonValidatorLoop() const; ///< Routine loop for when the node is NOT a Validator.
bool syncerLoop(); ///< Routine loop for the syncer worker.

/**
* Create and broadcast a Validator block (called by validatorLoop()).
* If the node is a Validator and it has to create a new block,
* this function will be called, the new block will be created based on the
* current State and rdPoS objects, and then it will be broadcast.
* @throw DynamicException if block is invalid.
*/
void doValidatorBlock();

/**
* Wait for a new block (called by validatorLoop()).
* If the node is a Validator, this function will be called to make the
* node wait until it receives a new block.
*/
void doValidatorTx() const;

public:
/**
* Constructor.
* @param blockchain Reference to the parent blockchain.
*/
explicit Syncer(Blockchain& blockchain) : blockchain_(blockchain) {}

/// Destructor. Automatically stops the syncer.
~Syncer() { this->stop(); }

/// Getter for `synced_`.
const std::atomic<bool>& isSynced() const { return this->synced_; }

void start(); ///< Start the syncer routine loop.
void stop(); ///< Stop the syncer routine loop.
};

/**
Expand All @@ -82,39 +82,39 @@ class Syncer {
* Those parts interact with one another by communicating through this class.
*/
class Blockchain {
private:
Options options_; ///< Options singleton.
DB db_; ///< Database.
Storage storage_; ///< Blockchain storage.
State state_; ///< Blockchain state.
P2P::ManagerNormal p2p_; ///< P2P connection manager.
HTTPServer http_; ///< HTTP server.
Syncer syncer_; ///< Blockchain syncer.

public:
/**
* Constructor.
* @param blockchainPath Root path of the blockchain.
*/
explicit Blockchain(const std::string& blockchainPath);
~Blockchain() = default; ///< Default destructor.
void start(); ///< Start the blockchain. Initializes P2P, HTTP and Syncer, in this order.
void stop(); ///< Stop/shutdown the blockchain. Stops Syncer, HTTP and P2P, in this order (reverse order of start()).

///@{
/** Getter. */
Options& getOptions() { return this->options_; };
DB& getDB() { return this->db_; };
Storage& getStorage() { return this->storage_; };
State& getState() { return this->state_; };
P2P::ManagerNormal& getP2P() { return this->p2p_; };
HTTPServer& getHTTP() { return this->http_; };
Syncer& getSyncer() { return this->syncer_; };
///@}

const std::atomic<bool>& isSynced() const; ///< Check if the blockchain syncer is synced.

friend class Syncer; ///< Friend class.
private:
Options options_; ///< Options singleton.
DB db_; ///< Database.
Storage storage_; ///< Blockchain storage.
State state_; ///< Blockchain state.
P2P::ManagerNormal p2p_; ///< P2P connection manager.
HTTPServer http_; ///< HTTP server.
Syncer syncer_; ///< Blockchain syncer.

public:
/**
* Constructor.
* @param blockchainPath Root path of the blockchain.
*/
explicit Blockchain(const std::string& blockchainPath);
~Blockchain() = default; ///< Default destructor.
void start(); ///< Start the blockchain. Initializes P2P, HTTP and Syncer, in this order.
void stop(); ///< Stop/shutdown the blockchain. Stops Syncer, HTTP and P2P, in this order (reverse order of start()).

///@{
/** Getter. */
Options& getOptions() { return this->options_; };
DB& getDB() { return this->db_; };
Storage& getStorage() { return this->storage_; };
State& getState() { return this->state_; };
P2P::ManagerNormal& getP2P() { return this->p2p_; };
HTTPServer& getHTTP() { return this->http_; };
Syncer& getSyncer() { return this->syncer_; };
///@}

const std::atomic<bool>& isSynced() const; ///< Check if the blockchain syncer is synced.

friend class Syncer; ///< Friend class.
};

#endif // BLOCKCHAIN_H
103 changes: 103 additions & 0 deletions src/core/dump.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright (c) [2023-2024] [Sparq Network]
This software is distributed under the MIT License.
See the LICENSE.txt file in the project root for more information.
*/

#include "dump.h"

DumpManager::DumpManager(const Storage& storage, const Options& options, std::shared_mutex& stateMutex)
: storage_(storage),
options_(options),
stateMutex_(stateMutex)
{
}

void DumpManager::pushBack(Dumpable* dumpable)
{
// Check if latest Dumpable* is the same as the one we trying to append
if (this->dumpables_.size() > 0 &&
this->dumpables_.back() == dumpable) {
return;
}
dumpables_.push_back(dumpable);
}

void DumpManager::dumpAll()
{
std::vector<DBBatch> batches;
{
// state mutex lock
std::unique_lock lock(stateMutex_);
// Emplace DBBatch operations
Logger::logToDebug(LogType::INFO,
Log::dump,
__func__,
"Emplace DBBatch operations");
for (const auto dumpable: dumpables_) {
// call dump functions and put the operations ate the database
batches.emplace_back(dumpable->dump());
}
}
// Logs
Logger::logToDebug(LogType::INFO,
Log::dump,
__func__,
"Write to state database.");
// Write to the database
std::string dbName = options_.getRootPath() + "/stateDb/" + std::to_string(this->storage_.latest()->getNHeight());
DB stateDb(dbName);
for (const auto& batch: batches) {
stateDb.putBatch(batch);
}
}

DumpWorker::DumpWorker(const Options& options, const Storage& storage,
DumpManager& dumpManager)
: options_(options),
storage_(storage),
dumpManager_(dumpManager)
{
Logger::logToDebug(LogType::INFO, Log::dump, __func__, "DumpWorker Started.");
}

DumpWorker::~DumpWorker()
{
Logger::logToDebug(LogType::INFO, Log::dump, __func__, "DumpWorker Stopped.");
}

bool DumpWorker::workerLoop()
{
uint64_t latestBlock = 0;
while (!this->stopWorker_) {
if (latestBlock + 100 < this->storage_.currentChainSize()) {
Logger::logToDebug(LogType::INFO,
Log::dump,
__func__,
"Current size >= 100");
latestBlock = this->storage_.currentChainSize();
dumpManager_.dumpAll();
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return true;
}

void DumpWorker::startWorker()
{
if (!this->workerFuture_.valid()) {
this->workerFuture_ = std::async(std::launch::async,
&DumpWorker::workerLoop,
this);
}
}

void DumpWorker::stopWorker()
{
if (this->workerFuture_.valid()) {
this->stopWorker_ = true;
this->workerFuture_.wait();
this->workerFuture_.get();
}
}
Loading

0 comments on commit d2521c8

Please sign in to comment.