Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

WIP - Download snapshot through Parity's warp protocol #4227

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions eth/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ void help()
<< " --from <n> Export only from block n; n may be a decimal, a '0x' prefixed hash, or 'latest'.\n"
<< " --to <n> Export only to block n (inclusive); n may be a decimal, a '0x' prefixed hash, or 'latest'.\n"
<< " --only <n> Equivalent to --export-from n --export-to n.\n"
<< " --dont-check Prevent checking some block aspects. Faster importing, but to apply only when the data is known to be valid.\n\n"
<< " --dont-check Prevent checking some block aspects. Faster importing, but to apply only when the data is known to be valid.\n"
<< " --download-snapshot <path> Download snapshot data to specified path.\n\n"
<< "General Options:\n"
<< " -d,--db-path,--datadir <path> Load database from path (default: " << getDataDir() << ").\n"
#if ETH_EVMJIT
Expand Down Expand Up @@ -402,6 +403,7 @@ int main(int argc, char** argv)
bool listenSet = false;
string configJSON;
string genesisJSON;
string snapshotPath;
for (int i = 1; i < argc; ++i)
{
string arg = argv[i];
Expand Down Expand Up @@ -781,6 +783,8 @@ int main(int argc, char** argv)
noPinning = true;
bootstrap = false;
}
else if (arg == "--download-snapshot" && i + 1 < argc)
snapshotPath = argv[++i];
else
{
cerr << "Invalid argument: " << arg << "\n";
Expand Down Expand Up @@ -908,6 +912,7 @@ int main(int argc, char** argv)
dev::WebThreeDirect web3(
WebThreeDirect::composeClientVersion("eth"),
getDataDir(),
snapshotPath,
chainParams,
withExisting,
nodeMode == NodeMode::Full ? caps : set<string>(),
Expand Down Expand Up @@ -1071,7 +1076,7 @@ int main(int argc, char** argv)
if (author)
cout << "Mining Beneficiary: " << renderFullAddress(author) << "\n";

if (bootstrap || !remoteHost.empty() || enableDiscovery || listenSet)
if (bootstrap || !remoteHost.empty() || enableDiscovery || listenSet || !preferredNodes.empty())
{
web3.startNetwork();
cout << "Node ID: " << web3.enode() << "\n";
Expand Down
3 changes: 2 additions & 1 deletion libethashseal/EthashClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ EthashClient::EthashClient(
p2p::Host* _host,
std::shared_ptr<GasPricer> _gpForAdoption,
fs::path const& _dbPath,
fs::path const& _snapshotPath,
WithExisting _forceAction,
TransactionQueue::Limits const& _limits
):
Client(_params, _networkID, _host, _gpForAdoption, _dbPath, _forceAction, _limits)
Client(_params, _networkID, _host, _gpForAdoption, _dbPath, _snapshotPath, _forceAction, _limits)
{
// will throw if we're not an Ethash seal engine.
asEthashClient(*this);
Expand Down
1 change: 1 addition & 0 deletions libethashseal/EthashClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class EthashClient: public Client
p2p::Host* _host,
std::shared_ptr<GasPricer> _gpForAdoption,
boost::filesystem::path const& _dbPath = boost::filesystem::path(),
boost::filesystem::path const& _snapshotPath = boost::filesystem::path(),
WithExisting _forceAction = WithExisting::Trust,
TransactionQueue::Limits const& _l = TransactionQueue::Limits{1024, 1024}
);
Expand Down
23 changes: 15 additions & 8 deletions libethereum/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Client::Client(
p2p::Host* _host,
std::shared_ptr<GasPricer> _gpForAdoption,
fs::path const& _dbPath,
fs::path const& _snapshotPath,
WithExisting _forceAction,
TransactionQueue::Limits const& _l
):
Expand All @@ -75,7 +76,7 @@ Client::Client(
m_postSeal(chainParams().accountStartNonce),
m_working(chainParams().accountStartNonce)
{
init(_host, _dbPath, _forceAction, _networkID);
init(_host, _dbPath, _snapshotPath, _forceAction, _networkID);
}

Client::~Client()
Expand All @@ -84,7 +85,7 @@ Client::~Client()
terminate();
}

void Client::init(p2p::Host* _extNet, fs::path const& _dbPath, WithExisting _forceAction, u256 _networkId)
void Client::init(p2p::Host* _extNet, fs::path const& _dbPath, fs::path const& _snapshotPath, WithExisting _forceAction, u256 _networkId)
{
DEV_TIMED_FUNCTION_ABOVE(500);

Expand All @@ -99,12 +100,12 @@ void Client::init(p2p::Host* _extNet, fs::path const& _dbPath, WithExisting _for
m_bq.setChain(bc());

m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30);
m_tqReady = m_tq.onReady([=](){ this->onTransactionQueueReady(); }); // TODO: should read m_tq->onReady(thisThread, syncTransactionQueue);
m_tqReplaced = m_tq.onReplaced([=](h256 const&){ m_needStateReset = true; });
m_bqReady = m_bq.onReady([=](){ this->onBlockQueueReady(); }); // TODO: should read m_bq->onReady(thisThread, syncBlockQueue);
m_bq.setOnBad([=](Exception& ex){ this->onBadBlock(ex); });
bc().setOnBad([=](Exception& ex){ this->onBadBlock(ex); });
bc().setOnBlockImport([=](BlockHeader const& _info){
m_tqReady = m_tq.onReady([=]() { this->onTransactionQueueReady(); }); // TODO: should read m_tq->onReady(thisThread, syncTransactionQueue);
m_tqReplaced = m_tq.onReplaced([=](h256 const&) { m_needStateReset = true; });
m_bqReady = m_bq.onReady([=]() { this->onBlockQueueReady(); }); // TODO: should read m_bq->onReady(thisThread, syncBlockQueue);
m_bq.setOnBad([=](Exception& ex) { this->onBadBlock(ex); });
bc().setOnBad([=](Exception& ex) { this->onBadBlock(ex); });
bc().setOnBlockImport([=](BlockHeader const& _info) {
if (auto h = m_host.lock())
h->onBlockImported(_info);
});
Expand All @@ -114,6 +115,12 @@ void Client::init(p2p::Host* _extNet, fs::path const& _dbPath, WithExisting _for

m_gp->update(bc());

if (!_snapshotPath.empty())
{
m_warpHost = _extNet->registerCapability(make_shared<WarpHostCapability>(bc(), _networkId, _snapshotPath));
return;
}

auto host = _extNet->registerCapability(make_shared<EthereumHost>(bc(), m_stateDB, m_tq, m_bq, _networkId));
m_host = host;

Expand Down
5 changes: 4 additions & 1 deletion libethereum/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "Block.h"
#include "CommonNet.h"
#include "ClientBase.h"
#include "WarpHostCapability.h"

#include <boost/filesystem/path.hpp>

Expand Down Expand Up @@ -82,6 +83,7 @@ class Client: public ClientBase, protected Worker
p2p::Host* _host,
std::shared_ptr<GasPricer> _gpForAdoption,
boost::filesystem::path const& _dbPath = boost::filesystem::path(),
boost::filesystem::path const& _snapshotPath = boost::filesystem::path(),
WithExisting _forceAction = WithExisting::Trust,
TransactionQueue::Limits const& _l = TransactionQueue::Limits{1024, 1024}
);
Expand Down Expand Up @@ -196,7 +198,7 @@ class Client: public ClientBase, protected Worker
protected:
/// Perform critical setup functions.
/// Must be called in the constructor of the finally derived class.
void init(p2p::Host* _extNet, boost::filesystem::path const& _dbPath, WithExisting _forceAction, u256 _networkId);
void init(p2p::Host* _extNet, boost::filesystem::path const& _dbPath, boost::filesystem::path const& _snapshotPath, WithExisting _forceAction, u256 _networkId);

/// InterfaceStub methods
BlockChain& bc() override { return m_bc; }
Expand Down Expand Up @@ -300,6 +302,7 @@ class Client: public ClientBase, protected Worker
std::chrono::system_clock::time_point m_lastGetWork; ///< Is there an active and valid remote worker?

std::weak_ptr<EthereumHost> m_host; ///< Our Ethereum Host. Don't do anything if we can't lock.
std::weak_ptr<WarpHostCapability> m_warpHost;

Handler<> m_tqReady;
Handler<h256 const&> m_tqReplaced;
Expand Down
2 changes: 1 addition & 1 deletion libethereum/ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ClientTest::ClientTest(
WithExisting _forceAction,
TransactionQueue::Limits const& _limits
):
Client(_params, _networkID, _host, _gpForAdoption, _dbPath, _forceAction, _limits)
Client(_params, _networkID, _host, _gpForAdoption, _dbPath, std::string(), _forceAction, _limits)
{}

void ClientTest::setChainParams(string const& _genesis)
Expand Down
2 changes: 2 additions & 0 deletions libethereum/CommonNet.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ enum class Asking
BlockBodies,
NodeData,
Receipts,
WarpManifest,
WarpData,
Nothing
};

Expand Down
2 changes: 2 additions & 0 deletions libethereum/EthereumPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ static string toString(Asking _a)
case Asking::Receipts: return "Receipts";
case Asking::Nothing: return "Nothing";
case Asking::State: return "State";
case Asking::WarpManifest: return "WarpManifest";
case Asking::WarpData: return "WarpData";
}
return "?";
}
Expand Down
142 changes: 142 additions & 0 deletions libethereum/SnapshotDownloader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
This file is part of cpp-ethereum.

cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file SnapshotDownloader.cpp
*/

#include "BlockChain.h"
#include "SnapshotDownloader.h"
#include "WarpHostCapability.h"
#include "WarpPeerCapability.h"

#include <libdevcore/CommonIO.h>
#include <libp2p/Session.h>

#include <boost/filesystem.hpp>

using namespace std;
using namespace dev;
using namespace eth;
using namespace p2p;

struct SnapshotLog: public LogChannel
{
static char const* name() { return "SNAP"; }
static int const verbosity = 9;
static const bool debug = false;
};

SnapshotDownloader::SnapshotDownloader(WarpHostCapability& _host, BlockChain const& _blockChain, boost::filesystem::path const& _snapshotPath):
m_host(_host), m_blockChain(_blockChain), m_snapshotPath(_snapshotPath)
{
}

void SnapshotDownloader::onPeerStatus(shared_ptr<WarpPeerCapability> _peer)
{
if (_peer->validateStatus(m_blockChain.genesisHash(), { m_host.protocolVersion() }, m_host.networkId()))
startDownload(_peer);
}

void SnapshotDownloader::onPeerManifest(shared_ptr<WarpPeerCapability> _peer, RLP const& _r)
{
if (m_downloading)
// TODO if this peer has the same snapshot, download from it, too
return;

if (!_r.isList() || _r.itemCount() != 1)
return;

RLP const manifest = _r[0];

u256 version = manifest[0].toInt<u256>();
if (version != 2)
return;

u256 const blockNumber = manifest[4].toInt<u256>();
if (blockNumber != m_syncingSnapshotNumber)
return;

h256s const stateHashes = manifest[1].toVector<h256>();
h256s const blockHashes = manifest[2].toVector<h256>();

h256 const stateRoot = manifest[3].toHash<h256>();
h256 const blockHash = manifest[5].toHash<h256>();

clog(SnapshotLog) << "MANIFEST: "
<< "version " << version
<< "state root " << stateRoot
<< "block number " << blockNumber
<< "block hash " << blockHash;

writeFile((m_snapshotPath / "MANIFEST").string(), manifest.data());

m_stateChunksQueue.assign(stateHashes.begin(), stateHashes.end());
m_blockChunksQueue.assign(blockHashes.begin(), blockHashes.end());
m_downloading = true;
downloadStateChunks(_peer);
}

void SnapshotDownloader::onPeerData(shared_ptr<WarpPeerCapability> _peer, RLP const& _r)
{
if (!_r.isList() || _r.itemCount() != 1)
return;

// TODO handle timeouts

RLP const data = _r[0];

h256 const hash = sha3(data.toBytesConstRef());

h256 const askedHash = m_stateChunksQueue.empty() ? m_blockChunksQueue.front() : m_stateChunksQueue.front();
if (hash == askedHash)
{

// TODO handle writeFile failure
writeFile((m_snapshotPath / toHex(hash)).string(), data.toBytesConstRef());

clog(SnapshotLog) << "Saved chunk" << hash;

if (m_stateChunksQueue.empty())
m_blockChunksQueue.pop_front();
else
m_stateChunksQueue.pop_front();
clog(SnapshotLog) << "State chunks left: " << m_stateChunksQueue.size() << " Block chunks left: " << m_blockChunksQueue.size();
}

downloadStateChunks(_peer);
}

void SnapshotDownloader::startDownload(shared_ptr<WarpPeerCapability> _peer)
{
m_syncingSnapshotNumber = _peer->snapshotNumber();
_peer->requestManifest();
}

void SnapshotDownloader::downloadStateChunks(shared_ptr<WarpPeerCapability> _peer)
{
if (!m_stateChunksQueue.empty())
_peer->requestData(m_stateChunksQueue.front());
else
downloadBlockChunks(_peer);
}

void SnapshotDownloader::downloadBlockChunks(shared_ptr<WarpPeerCapability> _peer)
{
if (!m_blockChunksQueue.empty())
_peer->requestData(m_blockChunksQueue.front());
else
clog(SnapshotLog) << "Snapshot download complete!";
}
68 changes: 68 additions & 0 deletions libethereum/SnapshotDownloader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
This file is part of cpp-ethereum.

cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file SnapshotDownloader.h
*/

#pragma once

#include "CommonNet.h"

#include <memory>
#include <deque>

namespace dev
{

namespace eth
{

class BlockChain;
class WarpHostCapability;
class WarpPeerCapability;

class SnapshotDownloader
{
public:
SnapshotDownloader(WarpHostCapability& _host, BlockChain const& _blockChain, boost::filesystem::path const& _snapshotPath);

void onPeerStatus(std::shared_ptr<WarpPeerCapability> _peer);

void onPeerManifest(std::shared_ptr<WarpPeerCapability> _peer, RLP const& _r);

void onPeerData(std::shared_ptr<WarpPeerCapability> _peer, RLP const& _r);

void onPeerRequestTimeout(std::shared_ptr<WarpPeerCapability> /*_peer*/, Asking /*_asking*/) {}

private:
void startDownload(std::shared_ptr<WarpPeerCapability> _peer);
void downloadStateChunks(std::shared_ptr<WarpPeerCapability> _peer);
void downloadBlockChunks(std::shared_ptr<WarpPeerCapability> _peer);

WarpHostCapability& m_host;
BlockChain const& m_blockChain;

u256 m_syncingSnapshotNumber;
bool m_downloading = false;

std::deque<h256> m_stateChunksQueue;
std::deque<h256> m_blockChunksQueue;

boost::filesystem::path const m_snapshotPath;
};

}
}
Loading