Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

587 read node forward #629

Merged
merged 7 commits into from
Nov 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 59 additions & 22 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,17 @@ namespace steemit { namespace app {

bool network_broadcast_api::check_max_block_age( int32_t max_block_age )
{
if( max_block_age < 0 )
return false;
return _app.chain_database()->with_read_lock( [&]()
{
if( max_block_age < 0 )
return false;

fc::time_point_sec now = graphene::time::now();
std::shared_ptr< database > db = _app.chain_database();
const dynamic_global_property_object& dgpo = db->get_dynamic_global_properties();
fc::time_point_sec now = graphene::time::now();
std::shared_ptr< database > db = _app.chain_database();
const dynamic_global_property_object& dgpo = db->get_dynamic_global_properties();

return ( dgpo.time < now - fc::seconds( max_block_age ) );
return ( dgpo.time < now - fc::seconds( max_block_age ) );
});
}

void network_broadcast_api::set_max_block_age( int32_t max_block_age )
Expand Down Expand Up @@ -204,34 +207,68 @@ namespace steemit { namespace app {
void network_broadcast_api::broadcast_transaction(const signed_transaction& trx)
{
trx.validate();
FC_ASSERT( !check_max_block_age( _max_block_age ) );
_app.chain_database()->push_transaction(trx);
_app.p2p_node()->broadcast_transaction(trx);

if( _app._read_only )
{
FC_ASSERT( _app._remote_net_api, "Write node RPC not configured properly or non connected." );
(*_app._remote_net_api)->broadcast_transaction( trx );
}
else
{
FC_ASSERT( !check_max_block_age( _max_block_age ) );
_app.chain_database()->push_transaction(trx);
_app.p2p_node()->broadcast_transaction(trx);
}
}

fc::variant network_broadcast_api::broadcast_transaction_synchronous(const signed_transaction& trx)
{
promise<fc::variant>::ptr prom( new fc::promise<fc::variant>() );
broadcast_transaction_with_callback( [=]( const fc::variant& v ){
prom->set_value(v);
}, trx );
return future<fc::variant>(prom).wait();
if( _app._read_only )
{
FC_ASSERT( _app._remote_net_api, "Write node RPC not configured properly or non connected." );
return (*_app._remote_net_api)->broadcast_transaction_synchronous( trx );
}
else
{
promise<fc::variant>::ptr prom( new fc::promise<fc::variant>() );
broadcast_transaction_with_callback( [=]( const fc::variant& v ){
prom->set_value(v);
}, trx );
return future<fc::variant>(prom).wait();
}
}

void network_broadcast_api::broadcast_block( const signed_block& b )
{
_app.chain_database()->push_block(b);
_app.p2p_node()->broadcast( graphene::net::block_message( b ));
if( _app._read_only )
{
FC_ASSERT( _app._remote_net_api, "Write node RPC not configured properly or non connected." );
(*_app._remote_net_api)->broadcast_block( b );
}
else
{
_app.chain_database()->push_block(b);
_app.p2p_node()->broadcast( graphene::net::block_message( b ));
}
}

void network_broadcast_api::broadcast_transaction_with_callback(confirmation_callback cb, const signed_transaction& trx)
{
FC_ASSERT( !check_max_block_age( _max_block_age ) );
trx.validate();
_callbacks[trx.id()] = cb;
_callbacks_expirations[trx.expiration].push_back(trx.id());
if( _app._read_only )
{
FC_ASSERT( _app._remote_net_api, "Write node RPC not configured properly or non connected." );
(*_app._remote_net_api)->broadcast_transaction_with_callback( cb, trx );
}
else
{
FC_ASSERT( !check_max_block_age( _max_block_age ) );
trx.validate();
_callbacks[trx.id()] = cb;
_callbacks_expirations[trx.expiration].push_back(trx.id());

_app.chain_database()->push_transaction(trx);
_app.p2p_node()->broadcast_transaction(trx);
_app.chain_database()->push_transaction(trx);
_app.p2p_node()->broadcast_transaction(trx);
}
}

network_node_api::network_node_api( const api_context& a ) : _app( a.app )
Expand Down
107 changes: 70 additions & 37 deletions libraries/app/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,69 +236,96 @@ namespace detail {

void startup()
{ try {
_max_block_age =_options->at("max-block-age").as<int32_t>();
_shared_file_size = fc::parse_size( _options->at( "shared-file-size" ).as< string >() );
ilog( "shared_file_size is ${n} bytes", ("n", _shared_file_size) );
bool read_only = _options->count( "read-only" );
register_builtin_apis();

if( _options->count("shared-file-dir") )
_shared_dir = fc::path( _options->at("shared-file-dir").as<string>() );
else
_shared_dir = _data_dir / "blockchain";

if( !read_only )
{
_self->_read_only = false;
ilog( "Starting Steem node in write mode." );
_max_block_age =_options->at("max-block-age").as<int32_t>();

if( _options->count("resync-blockchain") )
_chain_db->wipe(_data_dir / "blockchain", _shared_dir, true);
if( _options->count("resync-blockchain") )
_chain_db->wipe(_data_dir / "blockchain", _shared_dir, true);

_chain_db->set_flush_interval( _options->at("flush").as<uint32_t>() );
_chain_db->set_flush_interval( _options->at("flush").as<uint32_t>() );

flat_map<uint32_t,block_id_type> loaded_checkpoints;
if( _options->count("checkpoint") )
{
auto cps = _options->at("checkpoint").as<vector<string>>();
loaded_checkpoints.reserve( cps.size() );
for( auto cp : cps )
flat_map<uint32_t,block_id_type> loaded_checkpoints;
if( _options->count("checkpoint") )
{
auto item = fc::json::from_string(cp).as<std::pair<uint32_t,block_id_type> >();
loaded_checkpoints[item.first] = item.second;
auto cps = _options->at("checkpoint").as<vector<string>>();
loaded_checkpoints.reserve( cps.size() );
for( auto cp : cps )
{
auto item = fc::json::from_string(cp).as<std::pair<uint32_t,block_id_type> >();
loaded_checkpoints[item.first] = item.second;
}
}
}
_chain_db->add_checkpoints( loaded_checkpoints );
_chain_db->add_checkpoints( loaded_checkpoints );

if( _options->count("replay-blockchain") )
{
ilog("Replaying blockchain on user request.");
_chain_db->reindex( _data_dir / "blockchain", _shared_dir, _shared_file_size );
}
else
{
try
if( _options->count("replay-blockchain") )
{
_chain_db->open(_data_dir / "blockchain", _shared_dir, 0, _shared_file_size );
ilog("Replaying blockchain on user request.");
_chain_db->reindex( _data_dir / "blockchain", _shared_dir, _shared_file_size );
}
catch( fc::assert_exception& )
else
{
wlog( "Error when opening database. Attempting reindex..." );

try
{
_chain_db->reindex( _data_dir / "blockchain", _shared_dir, _shared_file_size );
_chain_db->open(_data_dir / "blockchain", _shared_dir, 0, _shared_file_size, chainbase::database::read_write );\
}
catch( chain::block_log_exception& )
catch( fc::assert_exception& )
{
wlog( "Error opening block log. Having to resync from network..." );
_chain_db->open( _data_dir / "blockchain", _shared_dir, 0, _shared_file_size );
wlog( "Error when opening database. Attempting reindex..." );

try
{
_chain_db->reindex( _data_dir / "blockchain", _shared_dir, _shared_file_size );
}
catch( chain::block_log_exception& )
{
wlog( "Error opening block log. Having to resync from network..." );
_chain_db->open( _data_dir / "blockchain", _shared_dir, 0, _shared_file_size, chainbase::database::read_write );
}
}
}
}

if( _options->count("force-validate") )
{
ilog( "All transaction signatures will be validated" );
_force_validate = true;
if( _options->count("force-validate") )
{
ilog( "All transaction signatures will be validated" );
_force_validate = true;
}

graphene::time::now();
}
else
{
ilog( "Starting Steem node in read mode." );
_chain_db->open( _data_dir / "blockchain", 0, _shared_file_size, chainbase::database::read_only );

graphene::time::now();
if( _options->count( "read-forward-rpc" ) )
{
try
{
auto ws_ptr = _self->_client.connect( _options->at( "read-forward-rpc" ).as< string >() );
auto apic = std::make_shared< fc::rpc::websocket_api_connection >( *ws_ptr );
auto login = apic->get_remote_api< login_api >( 1 );
FC_ASSERT( login->login( "", "" ) );
_self->_remote_net_api = login->get_api_by_name( "network_broadcast_api" )->as< network_broadcast_api >();
}
catch( fc::exception& e )
{
wlog( "Error conencting to remote RPC, network api forwarding disabled.", ("e", e.to_detail_string()) );
}
}
}

if( _options->count("api-user") )
{
Expand Down Expand Up @@ -336,7 +363,11 @@ namespace detail {
}
_running = true;

reset_p2p_node(_data_dir);
if( !read_only )
{
reset_p2p_node(_data_dir);
}

reset_websocket_server();
reset_websocket_tls_server();
} FC_LOG_AND_RETHROW() }
Expand Down Expand Up @@ -886,6 +917,7 @@ void application::set_program_options(boost::program_options::options_descriptio
("shared-file-size", bpo::value<string>()->default_value("32G"), "Size of the shared memory file. Default: 32G")
("rpc-endpoint", bpo::value<string>()->implicit_value("127.0.0.1:8090"), "Endpoint for websocket RPC to listen on")
("rpc-tls-endpoint", bpo::value<string>()->implicit_value("127.0.0.1:8089"), "Endpoint for TLS websocket RPC to listen on")
("read-forward-rpc", bpo::value<string>(), "Endpoint to forward write API calls to for a read node" )
("server-pem,p", bpo::value<string>()->implicit_value("server.pem"), "The TLS certificate file for this server")
("server-pem-password,P", bpo::value<string>()->implicit_value(""), "Password for this certificate")
("api-user", bpo::value< vector<string> >()->composing(), "API user specification, may be specified multiple times")
Expand All @@ -899,6 +931,7 @@ void application::set_program_options(boost::program_options::options_descriptio
("replay-blockchain", "Rebuild object graph by replaying all blocks")
("resync-blockchain", "Delete all blocks and re-sync with network from scratch")
("force-validate", "Force validation of all transactions")
("read-only", "Node will not connect to p2p network and can only read from the chain state" )
;
command_line_options.add(_cli_options);
configuration_file_options.add(_cfg_options);
Expand Down
Loading