bnet_plugin.cpp
/**
* The purpose of this protocol is to synchronize (and keep synchronized) two
* blockchains using a very simple algorithm:
*
* 1. find the last block id on our local chain that the remote peer knows about
* 2. if we have the next block send it to them
 * 3. if we don't have the next block, send them the oldest unexpired transaction
*
* There are several input events:
*
* 1. new block accepted by local chain
* 2. block deemed irreversible by local chain
* 3. new block header accepted by local chain
* 4. transaction accepted by local chain
* 5. block received from remote peer
* 6. transaction received from remote peer
* 7. socket ready for next write
*
* Each session is responsible for maintaining the following
*
* 1. the most recent block on our current best chain which we know
* with certainty that the remote peer has.
 * - this could be the peer's last irreversible block
* - a block ID after the LIB that the peer has notified us of
* - a block which we have sent to the remote peer
* - a block which the peer has sent us
* 2. the block IDs we have received from the remote peer so that
 * we can disconnect the peer if one of those blocks is deemed invalid
 * - we can clear these IDs once the block becomes irreversible
* 3. the transactions we have received from the remote peer so that
* we do not send them something that they already know.
* - this includes transactions sent as part of blocks
* - we clear this cache after we have applied a block that
* includes the transactions because we know the controller
 * should not notify us again (they would be dupes)
*
* Assumptions:
* 1. all blocks we send the peer are valid and will be held in the
 * peer's fork database until they become irreversible or are replaced
* by an irreversible alternative.
* 2. we don't care what fork the peer is on, so long as we know they have
* the block prior to the one we want to send. The peer will sort it out
 * with its fork database and hopefully come to our conclusion.
* 3. the peer will send us blocks on the same basis
*
*/
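/**
 * Illustrative walk-through of the algorithm above: suppose our best chain is
 * A <- B <- C <- D and the peer's hello reported B as its last irreversible
 * block. Step 1 identifies B as the last block the peer is known to have, so we
 * send C, then D (step 2). Once caught up, we forward the oldest unexpired
 * transactions (step 3) until a new block arrives and the cycle repeats.
 */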
#include <eosio/bnet_plugin/bnet_plugin.hpp>
#include <eosio/chain/controller.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain_plugin/chain_plugin.hpp>
#include <fc/io/json.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <eosio/chain/plugin_interface.hpp>
using tcp = boost::asio::ip::tcp;
namespace ws = boost::beast::websocket;
namespace eosio {
using namespace chain;
static appbase::abstract_plugin& _bnet_plugin = app().register_plugin<bnet_plugin>();
} /// namespace eosio
namespace fc {
extern std::unordered_map<std::string,logger>& get_logger_map();
}
const fc::string logger_name("bnet_plugin");
fc::logger plugin_logger;
std::string peer_log_format;
#define peer_dlog( PEER, FORMAT, ... ) \
FC_MULTILINE_MACRO_BEGIN \
if( plugin_logger.is_enabled( fc::log_level::debug ) ) \
plugin_logger.log( FC_LOG_MESSAGE( debug, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
FC_MULTILINE_MACRO_END
#define peer_ilog( PEER, FORMAT, ... ) \
FC_MULTILINE_MACRO_BEGIN \
if( plugin_logger.is_enabled( fc::log_level::info ) ) \
plugin_logger.log( FC_LOG_MESSAGE( info, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
FC_MULTILINE_MACRO_END
#define peer_wlog( PEER, FORMAT, ... ) \
FC_MULTILINE_MACRO_BEGIN \
if( plugin_logger.is_enabled( fc::log_level::warn ) ) \
plugin_logger.log( FC_LOG_MESSAGE( warn, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
FC_MULTILINE_MACRO_END
#define peer_elog( PEER, FORMAT, ... ) \
FC_MULTILINE_MACRO_BEGIN \
if( plugin_logger.is_enabled( fc::log_level::error ) ) \
plugin_logger.log( FC_LOG_MESSAGE( error, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant())) ); \
FC_MULTILINE_MACRO_END
using eosio::public_key_type;
using eosio::chain_id_type;
using eosio::block_id_type;
using eosio::block_timestamp_type;
using std::string;
using eosio::sha256;
using eosio::signed_block_ptr;
using eosio::packed_transaction_ptr;
using std::vector;
struct hello {
public_key_type peer_id;
string network_version;
string agent;
string protocol_version = "1.0.1";
string user;
string password;
chain_id_type chain_id;
bool request_transactions = false;
uint32_t last_irr_block_num = 0;
vector<block_id_type> pending_block_ids;
};
FC_REFLECT( hello, (peer_id)(network_version)(user)(password)(agent)(protocol_version)(chain_id)(request_transactions)(last_irr_block_num)(pending_block_ids) )
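// NOTE: the FC_REFLECT member order (not the struct declaration order) defines
// the fc::raw wire format; the two orders differ here, so "fixing" either list
// to match the other would change the protocol's wire format.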
struct hello_extension_irreversible_only {};
FC_REFLECT( hello_extension_irreversible_only, BOOST_PP_SEQ_NIL )
using hello_extension = fc::static_variant<hello_extension_irreversible_only>;
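// Extensions are not reflected fields of `hello`; they travel as extra bytes
// appended after the packed message (see the extension-aware send() overload
// below), which is why the hello handler is handed the datastream in
// on( const hello&, fc::datastream<const char*>& ).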
/**
* This message is sent upon successful speculative application of a transaction
 * and informs the peer that there is no need to send us this transaction.
*/
struct trx_notice {
vector<sha256> signed_trx_id; ///< hash of trx + sigs
};
FC_REFLECT( trx_notice, (signed_trx_id) )
/**
 * This message is sent upon successfully adding a block to the fork database
 * and informs the remote peer that there is no need to send these blocks.
*/
struct block_notice {
vector<block_id_type> block_ids;
};
FC_REFLECT( block_notice, (block_ids) )
struct ping {
fc::time_point sent;
fc::sha256 code;
uint32_t lib; ///< the last irreversible block
};
FC_REFLECT( ping, (sent)(code)(lib) )
struct pong {
fc::time_point sent;
fc::sha256 code;
};
FC_REFLECT( pong, (sent)(code) )
using bnet_message = fc::static_variant<hello,
trx_notice,
block_notice,
signed_block_ptr,
packed_transaction_ptr,
ping, pong
>;
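// fc::raw serializes a static_variant as a varint type tag (its which() value)
// followed by the packed alternative, so every message on the wire starts with
// a small tag that on_message() later dispatches on.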
struct by_id;
struct by_num;
struct by_received;
struct by_expired;
namespace eosio {
using namespace chain::plugin_interface;
class bnet_plugin_impl;
template <typename Strand>
void verify_strand_in_this_thread(const Strand& strand, const char* func, int line) {
if( !strand.running_in_this_thread() ) {
elog( "wrong strand: ${f} : line ${n}, exiting", ("f", func)("n", line) );
app().quit();
}
}
/**
* Each session is presumed to operate in its own strand so that
* operations can execute in parallel.
*/
class session : public std::enable_shared_from_this<session>
{
public:
enum session_state {
hello_state,
sending_state,
idle_state
};
struct block_status {
block_status( block_id_type i, bool kby_peer, bool rfrom_peer)
{
known_by_peer = kby_peer;
received_from_peer = rfrom_peer;
id = i;
}
bool known_by_peer = false; ///< we sent block to peer or peer sent us notice
bool received_from_peer = false; ///< peer sent us this block and considers full block valid
block_id_type id; ///< the block id
// block_id_type prev; ///< the prev block id
// shared_ptr< vector<char> > block_msg; ///< packed bnet_message for this block
uint32_t block_num()const { return block_header::num_from_id(id); }
};
typedef boost::multi_index_container<block_status,
indexed_by<
ordered_unique< tag<by_id>, member<block_status,block_id_type,&block_status::id> >,
ordered_non_unique< tag<by_num>, const_mem_fun<block_status,uint32_t,&block_status::block_num> >
>
> block_status_index;
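// Typical queries against this container: _block_status.find(id) looks a block
// up through the default by_id index, while _block_status.get<by_num>() yields
// ascending block-number order so on_new_lib() can purge from the front.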
struct transaction_status {
time_point received;
time_point expired; /// 5 seconds from last accepted
transaction_id_type id;
transaction_metadata_ptr trx;
void mark_known_by_peer() { received = fc::time_point::maximum(); trx.reset(); }
bool known_by_peer()const { return received == fc::time_point::maximum(); }
};
typedef boost::multi_index_container<transaction_status,
indexed_by<
ordered_unique< tag<by_id>, member<transaction_status,transaction_id_type,&transaction_status::id> >,
ordered_non_unique< tag<by_received>, member<transaction_status,time_point,&transaction_status::received> >,
ordered_non_unique< tag<by_expired>, member<transaction_status,time_point,&transaction_status::expired> >
>
> transaction_status_index;
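// send_next_trx() walks the by_received index oldest-first; mark_known_by_peer()
// sets `received` to time_point::maximum(), so sent transactions sort to the
// back and are never picked again, while the by_expired index lets
// clear_expired_trx() and purge_transaction_cache() erase stale entries.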
block_status_index _block_status;
transaction_status_index _transaction_status;
const uint32_t _max_block_status_range = 2048; // limit tracked block_status known_by_peer
public_key_type _local_peer_id;
uint32_t _local_lib = 0;
block_id_type _local_lib_id;
uint32_t _local_head_block_num = 0;
block_id_type _local_head_block_id; /// the last block id received on local channel
public_key_type _remote_peer_id;
uint32_t _remote_lib = 0;
block_id_type _remote_lib_id;
bool _remote_request_trx = false;
bool _remote_request_irreversible_only = false;
uint32_t _last_sent_block_num = 0;
block_id_type _last_sent_block_id; /// the id of the last block sent
bool _recv_remote_hello = false;
bool _sent_remote_hello = false;
fc::sha256 _current_code;
fc::time_point _last_recv_ping_time = fc::time_point::now();
ping _last_recv_ping;
ping _last_sent_ping;
int _session_num = 0;
session_state _state = hello_state;
tcp::resolver _resolver;
bnet_ptr _net_plugin;
boost::asio::io_service& _ios;
unique_ptr<ws::stream<tcp::socket>> _ws;
boost::asio::strand< boost::asio::io_context::executor_type> _strand;
boost::asio::io_service& _app_ios;
methods::get_block_by_number::method_type& _get_block_by_number;
string _peer;
string _remote_host;
string _remote_port;
vector<char> _out_buffer;
//boost::beast::multi_buffer _in_buffer;
boost::beast::flat_buffer _in_buffer;
flat_set<block_id_type> _block_header_notices;
fc::optional<fc::variant_object> _logger_variant;
int next_session_id()const {
static int session_count = 0;
return ++session_count;
}
/**
* Creating session from server socket acceptance
*/
explicit session( tcp::socket socket, bnet_ptr net_plug )
:_resolver(socket.get_io_service()),
_net_plugin( std::move(net_plug) ),
_ios(socket.get_io_service()),
_ws( new ws::stream<tcp::socket>(move(socket)) ),
_strand(_ws->get_executor() ),
_app_ios( app().get_io_service() ),
_get_block_by_number( app().get_method<methods::get_block_by_number>() )
{
_session_num = next_session_id();
set_socket_options();
_ws->binary(true);
wlog( "open session ${n}",("n",_session_num) );
}
/**
* Creating outgoing session
*/
explicit session( boost::asio::io_context& ioc, bnet_ptr net_plug )
:_resolver(ioc),
_net_plugin( std::move(net_plug) ),
_ios(ioc),
_ws( new ws::stream<tcp::socket>(ioc) ),
_strand( _ws->get_executor() ),
_app_ios( app().get_io_service() ),
_get_block_by_number( app().get_method<methods::get_block_by_number>() )
{
_session_num = next_session_id();
_ws->binary(true);
wlog( "open session ${n}",("n",_session_num) );
}
~session();
void set_socket_options() {
try {
/** to minimize latency when sending short messages */
_ws->next_layer().set_option( boost::asio::ip::tcp::no_delay(true) );
/** to minimize latency when sending large 1MB blocks, the send buffer should not have to
* wait for an "ack"; making this larger could result in higher latency for smaller urgent
* messages.
*/
_ws->next_layer().set_option( boost::asio::socket_base::send_buffer_size( 1024*1024 ) );
_ws->next_layer().set_option( boost::asio::socket_base::receive_buffer_size( 1024*1024 ) );
} catch ( ... ) {
elog( "uncaught exception on set socket options" );
}
}
void run() {
_ws->async_accept( boost::asio::bind_executor(
_strand,
std::bind( &session::on_accept,
shared_from_this(),
std::placeholders::_1) ) );
}
void run( const string& peer ) {
auto c = peer.find(':');
auto host = peer.substr( 0, c );
auto port = peer.substr( c+1, peer.size() );
_peer = peer;
_remote_host = host;
_remote_port = port;
_resolver.async_resolve( _remote_host, _remote_port,
boost::asio::bind_executor( _strand,
std::bind( &session::on_resolve,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2 ) ) );
}
void on_resolve( boost::system::error_code ec,
tcp::resolver::results_type results ) {
if( ec ) return on_fail( ec, "resolve" );
boost::asio::async_connect( _ws->next_layer(),
results.begin(), results.end(),
boost::asio::bind_executor( _strand,
std::bind( &session::on_connect,
shared_from_this(),
std::placeholders::_1 ) ) );
}
void on_connect( boost::system::error_code ec ) {
if( ec ) return on_fail( ec, "connect" );
set_socket_options();
_ws->async_handshake( _remote_host, "/",
boost::asio::bind_executor( _strand,
std::bind( &session::on_handshake,
shared_from_this(),
std::placeholders::_1 ) ) );
}
void on_handshake( boost::system::error_code ec ) {
if( ec ) return on_fail( ec, "handshake" );
do_hello();
do_read();
}
/**
* This will be called "every time" the transaction is accepted, which happens
* on each speculative block (potentially several such blocks), when a block
* that contains the transaction is applied, and/or when switching forks.
*
* We will add it to the transaction status table as "received now" to be the
* basis of sending it to the peer. When we send it to the peer "received now"
* will be set to the infinite future to mark it as sent so we don't resend it
* when it is accepted again.
*
* Each time the transaction is "accepted" we extend the time we cache it by
* 5 seconds from now. Every time a block is applied we purge all accepted
* transactions that have reached 5 seconds without a new "acceptance".
*/
void on_accepted_transaction( transaction_metadata_ptr t ) {
//ilog( "accepted ${t}", ("t",t->id) );
auto itr = _transaction_status.find( t->id );
if( itr != _transaction_status.end() ) {
if( !itr->known_by_peer() ) {
_transaction_status.modify( itr, [&]( auto& stat ) {
stat.expired = std::min<fc::time_point>( fc::time_point::now() + fc::seconds(5), t->trx.expiration );
});
}
return;
}
transaction_status stat;
stat.received = fc::time_point::now();
stat.expired = stat.received + fc::seconds(5);
stat.id = t->id;
stat.trx = t;
_transaction_status.insert( stat );
maybe_send_next_message();
}
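// Illustrative timeline: a transaction first accepted at t=0 is cached with
// expired = t+5s; if it is accepted again at t=3 (e.g. on a fork switch) before
// being sent, `expired` is extended to min(t+8s, the trx's own expiration).
// Once sent, mark_known_by_peer() pins `received` at time_point::maximum() so
// later acceptances do not trigger a resend.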
/**
* Remove all transactions that expired from cache prior to now
*/
void purge_transaction_cache() {
auto& idx = _transaction_status.get<by_expired>();
auto itr = idx.begin();
auto now = fc::time_point::now();
while( itr != idx.end() && itr->expired < now ) {
idx.erase(itr);
itr = idx.begin();
}
}
/**
* When our local LIB advances we can purge our known history up to
* the LIB or up to the last block known by the remote peer.
*/
void on_new_lib( block_state_ptr s ) {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
_local_lib = s->block_num;
_local_lib_id = s->id;
auto purge_to = std::min( _local_lib, _last_sent_block_num );
auto& idx = _block_status.get<by_num>();
auto itr = idx.begin();
while( itr != idx.end() && itr->block_num() < purge_to ) {
idx.erase(itr);
itr = idx.begin();
}
if( _remote_request_irreversible_only ) {
auto bitr = _block_status.find(s->id);
if ( bitr == _block_status.end() || !bitr->received_from_peer ) {
_block_header_notices.insert(s->id);
}
}
maybe_send_next_message();
}
void on_bad_block( signed_block_ptr b ) {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
try {
auto id = b->id();
auto itr = _block_status.find( id );
if( itr == _block_status.end() ) return;
if( itr->received_from_peer ) {
peer_elog(this, "bad signed_block_ptr : unknown" );
elog( "peer sent bad block #${b} ${i}, disconnect", ("b", b->block_num())("i",b->id()) );
_ws->next_layer().close();
}
} catch ( ... ) {
elog( "uncaught exception" );
}
}
void on_accepted_block_header( const block_state_ptr& s ) {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
// ilog( "accepted block header ${n}", ("n",s->block_num) );
const auto& id = s->id;
if( fc::time_point::now() - s->block->timestamp < fc::seconds(6) ) {
// ilog( "queue notice to peer that we have this block so hopefully they don't send it to us" );
auto itr = _block_status.find( id );
if( !_remote_request_irreversible_only && ( itr == _block_status.end() || !itr->received_from_peer ) ) {
_block_header_notices.insert( id );
}
if( itr == _block_status.end() ) {
_block_status.insert( block_status(id, false, false) );
}
}
}
void on_accepted_block( const block_state_ptr& s ) {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
//idump((_block_status.size())(_transaction_status.size()));
//ilog( "accepted block ${n}", ("n",s->block_num) );
const auto& id = s->id;
_local_head_block_id = id;
_local_head_block_num = block_header::num_from_id(id);
if( _local_head_block_num < _last_sent_block_num ) {
_last_sent_block_num = _local_lib;
_last_sent_block_id = _local_lib_id;
}
purge_transaction_cache();
/** purge this block's transactions from the cache; the peer will receive
* them as part of the block itself unless it tells us it already has the block.
*/
for( const auto& receipt : s->block->transactions ) {
if( receipt.trx.which() == 1 ) {
const auto& pt = receipt.trx.get<packed_transaction>();
// get id via get_uncached_id() as packed_transaction.id() mutates internal transaction state
const auto& tid = pt.get_uncached_id();
auto itr = _transaction_status.find( tid );
if( itr != _transaction_status.end() )
_transaction_status.erase(itr);
}
}
maybe_send_next_message(); /// attempt to send if we are idle
}
template<typename L>
void async_get_pending_block_ids( L&& callback ) {
/// send peer my head block status which is read from chain plugin
_app_ios.post( [self = shared_from_this(),callback]{
auto& control = app().get_plugin<chain_plugin>().chain();
auto lib = control.last_irreversible_block_num();
auto head = control.fork_db_head_block_id();
auto head_num = block_header::num_from_id(head);
std::vector<block_id_type> ids;
if( lib > 0 ) {
ids.reserve((head_num-lib)+1);
for( auto i = lib; i <= head_num; ++i ) {
ids.emplace_back(control.get_block_id_for_num(i));
}
}
self->_ios.post( boost::asio::bind_executor(
self->_strand,
[callback,ids,lib](){
callback(ids,lib);
}
));
});
}
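// Threading pattern shared by both async_* helpers: hop onto the application
// io_service to read chain state from the main thread, then post the result
// back through this session's strand so the callback may safely touch session
// state (cf. verify_strand_in_this_thread above).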
template<typename L>
void async_get_block_num( uint32_t blocknum, L&& callback ) {
_app_ios.post( [self = shared_from_this(), blocknum, callback]{
auto& control = app().get_plugin<chain_plugin>().chain();
signed_block_ptr sblockptr;
try {
//ilog( "fetch block ${n}", ("n",blocknum) );
sblockptr = control.fetch_block_by_number( blocknum );
} catch ( const fc::exception& e ) {
edump((e.to_detail_string()));
}
self->_ios.post( boost::asio::bind_executor(
self->_strand,
[callback,sblockptr](){
callback(sblockptr);
}
));
});
}
void do_hello();
void send( const bnet_message& msg ) { try {
auto ps = fc::raw::pack_size(msg);
_out_buffer.resize(ps);
fc::datastream<char*> ds(_out_buffer.data(), ps);
fc::raw::pack(ds, msg);
send();
} FC_LOG_AND_RETHROW() }
template<class T>
void send( const bnet_message& msg, const T& ex ) { try {
auto ex_size = fc::raw::pack_size(ex);
auto ps = fc::raw::pack_size(msg) + fc::raw::pack_size(unsigned_int(ex_size)) + ex_size;
_out_buffer.resize(ps);
fc::datastream<char*> ds(_out_buffer.data(), ps);
fc::raw::pack( ds, msg );
fc::raw::pack( ds, unsigned_int(ex_size) );
fc::raw::pack( ds, ex );
send();
} FC_LOG_AND_RETHROW() }
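// Resulting wire layout for an extended message, as packed above:
//   [packed bnet_message][varint ext_size][packed ext]
// A receiver unpacks the bnet_message first; any bytes left in the datastream
// are extension data (the hello handler is passed the datastream for this).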
void send() { try {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
_state = sending_state;
_ws->async_write( boost::asio::buffer(_out_buffer),
boost::asio::bind_executor(
_strand,
std::bind( &session::on_write,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2 ) ) );
} FC_LOG_AND_RETHROW() }
void mark_block_status( const block_id_type& id, bool known_by_peer, bool recv_from_peer ) {
auto itr = _block_status.find(id);
if( itr == _block_status.end() ) {
// optimization to avoid sending blocks to nodes that already know about them
// to avoid unbounded memory growth limit number tracked
const auto min_block_num = std::min( _local_lib, _last_sent_block_num );
const auto max_block_num = min_block_num + _max_block_status_range;
const auto block_num = block_header::num_from_id( id );
if( block_num > min_block_num && block_num < max_block_num && _block_status.size() < _max_block_status_range )
_block_status.insert( block_status( id, known_by_peer, recv_from_peer ) );
} else {
_block_status.modify( itr, [&]( auto& item ) {
item.known_by_peer = known_by_peer;
if (recv_from_peer) item.received_from_peer = true;
});
}
}
/**
 * If a message is already being sent (or queued in the out buffer) this
 * method returns immediately. Otherwise it picks the best message to send,
 * in priority order: block notices, pong, ping, next block, next transaction.
 */
void maybe_send_next_message() {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
if( _state == sending_state ) return; /// in process of sending
if( _out_buffer.size() ) return; /// in process of sending
if( !_recv_remote_hello || !_sent_remote_hello ) return;
clear_expired_trx();
if( send_block_notice() ) return;
if( send_pong() ) return;
if( send_ping() ) return;
/// we don't know where we are yet (still waiting on the first accepted block from the local chain)
if( _local_head_block_id == block_id_type() ) return;
if( send_next_block() ) return;
if( send_next_trx() ) return;
}
bool send_block_notice() {
if( _block_header_notices.size() == 0 )
return false;
block_notice notice;
notice.block_ids.reserve( _block_header_notices.size() );
for( auto& id : _block_header_notices )
notice.block_ids.emplace_back(id);
send(notice);
_block_header_notices.clear();
return true;
}
bool send_pong() {
if( _last_recv_ping.code == fc::sha256() )
return false;
send( pong{ fc::time_point::now(), _last_recv_ping.code } );
_last_recv_ping.code = fc::sha256();
return true;
}
bool send_ping() {
auto delta_t = fc::time_point::now() - _last_sent_ping.sent;
if( delta_t < fc::seconds(3) ) return false;
if( _last_sent_ping.code == fc::sha256() ) {
_last_sent_ping.sent = fc::time_point::now();
_last_sent_ping.code = fc::sha256::hash(_last_sent_ping.sent); /// TODO: make this more random
_last_sent_ping.lib = _local_lib;
send( _last_sent_ping );
}
/// we expect the peer to send us a ping every 3 seconds, so if we haven't gotten one
/// in the past 6 seconds then the connection is likely hung. Unfortunately, we cannot
/// use the round-trip time of ping/pong to measure latency because during syncing the
/// remote peer can be stuck doing CPU intensive tasks that block its reading of the
/// buffer. This buffer gets filled with perhaps 100 blocks taking .1 seconds each for
/// a total processing time of 10+ seconds. That said, the peer should come up for air
/// every .1 seconds so should still be able to send out a ping every 3 seconds.
//
// We don't want to wait a RTT for each block because that could also slow syncing for
// empty blocks...
//
//if( fc::time_point::now() - _last_recv_ping_time > fc::seconds(6) ) {
// do_goodbye( "no ping from peer in last 6 seconds...." );
//}
return true;
}
bool is_known_by_peer( block_id_type id ) {
auto itr = _block_status.find(id);
if( itr == _block_status.end() ) return false;
return itr->known_by_peer;
}
void clear_expired_trx() {
auto& idx = _transaction_status.get<by_expired>();
auto itr = idx.begin();
while( itr != idx.end() && itr->expired < fc::time_point::now() ) {
idx.erase(itr);
itr = idx.begin();
}
}
bool send_next_trx() { try {
if( !_remote_request_trx ) return false;
auto& idx = _transaction_status.get<by_received>();
auto start = idx.begin();
if( start == idx.end() || start->known_by_peer() )
return false;
auto ptrx_ptr = std::make_shared<packed_transaction>( start->trx->packed_trx );
idx.modify( start, [&]( auto& stat ) {
stat.mark_known_by_peer();
});
// wlog("sending trx ${id}", ("id",start->id) );
send(ptrx_ptr);
return true;
} FC_LOG_AND_RETHROW() }
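// Only one message is in flight at a time: maybe_send_next_message() bails out
// while _state == sending_state or _out_buffer is non-empty, so the next
// unsent transaction is picked up on a later pass once the write completes.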
void on_async_get_block( const signed_block_ptr& nextblock ) {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
if( !nextblock) {
_state = idle_state;
maybe_send_next_message();
return;
}
/// if the next block doesn't link to the last block we sent, the
/// local chain must have switched forks
if( nextblock->previous != _last_sent_block_id ) {
if( !is_known_by_peer( nextblock->previous ) ) {
_last_sent_block_id = _local_lib_id;
_last_sent_block_num = _local_lib;
_state = idle_state;
maybe_send_next_message();
return;
}
}
/// at this point we know the peer can link this block
auto next_id = nextblock->id();
/// if the peer already knows about this block there is no need to
/// send it; mark it as 'sent' and move on.
if( is_known_by_peer( next_id ) ) {
_last_sent_block_id = next_id;
_last_sent_block_num = nextblock->block_num();
_state = idle_state;
maybe_send_next_message();
return;
}
mark_block_status( next_id, true, false );
_last_sent_block_id = next_id;
_last_sent_block_num = nextblock->block_num();
send( nextblock );
status( "sending block " + std::to_string( block_header::num_from_id(next_id) ) );
if( nextblock->timestamp > (fc::time_point::now() - fc::seconds(5)) ) {
mark_block_transactions_known_by_peer( nextblock );
}
}
/**
* Send the next block after the last block in our current fork that
* we know the remote peer knows.
*/
bool send_next_block() {
if ( _remote_request_irreversible_only && _last_sent_block_id == _local_lib_id ) {
return false;
}
if( _last_sent_block_id == _local_head_block_id ) /// we are caught up
return false;
///< set sending state because this callback may result in sending a message
_state = sending_state;
async_get_block_num( _last_sent_block_num + 1,
[self=shared_from_this()]( auto sblockptr ) {
self->on_async_get_block( sblockptr );
});
return true;
}
void on_fail( boost::system::error_code ec, const char* what ) {
try {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
elog( "${w}: ${m}", ("w", what)("m", ec.message() ) );
_ws->next_layer().close();
} catch ( ... ) {
elog( "uncaught exception on close" );
}
}
void on_accept( boost::system::error_code ec ) {
if( ec ) {
return on_fail( ec, "accept" );
}
do_hello();
do_read();
}
void do_read() {
_ws->async_read( _in_buffer,
boost::asio::bind_executor(
_strand,
std::bind( &session::on_read,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
void on_read( boost::system::error_code ec, std::size_t bytes_transferred ) {
boost::ignore_unused(bytes_transferred);
if( ec == ws::error::closed )
return on_fail( ec, "close on read" );
if( ec ) {
return on_fail( ec, "read" );
}
try {
auto d = boost::asio::buffer_cast<char const*>(boost::beast::buffers_front(_in_buffer.data()));
auto s = boost::asio::buffer_size(_in_buffer.data());
fc::datastream<const char*> ds(d,s);
bnet_message msg;
fc::raw::unpack( ds, msg );
on_message( msg, ds );
_in_buffer.consume( ds.tellp() );
wait_on_app();
return;
} catch ( ... ) {
wlog( "close bad payload" );
}
try {
_ws->close( boost::beast::websocket::close_code::bad_payload );
} catch ( ... ) {
elog( "uncaught exception on close" );
}
}
/** if we just call do_read here then this thread might run ahead of
* the main thread, instead we post an event to main which will then
* post a new read event when ready.
*
* This also keeps the "shared pointer" alive in the callback preventing
* the connection from being closed.
*/
void wait_on_app() {
app().get_io_service().post(
boost::asio::bind_executor( _strand, [self=shared_from_this()]{ self->do_read(); } )
);
}
void on_message( const bnet_message& msg, fc::datastream<const char*>& ds ) {
try {
switch( msg.which() ) {
case bnet_message::tag<hello>::value:
on( msg.get<hello>(), ds );
break;
case bnet_message::tag<block_notice>::value:
on( msg.get<block_notice>() );
break;
case bnet_message::tag<signed_block_ptr>::value:
on( msg.get<signed_block_ptr>() );
break;
case bnet_message::tag<packed_transaction_ptr>::value:
on( msg.get<packed_transaction_ptr>() );
break;
case bnet_message::tag<ping>::value:
on( msg.get<ping>() );
break;
case bnet_message::tag<pong>::value:
on( msg.get<pong>() );
break;
default:
wlog( "bad message received" );
_ws->close( boost::beast::websocket::close_code::bad_payload );
return;
}
maybe_send_next_message();
} catch( const fc::exception& e ) {
elog( "${e}", ("e",e.to_detail_string()));
_ws->close( boost::beast::websocket::close_code::bad_payload );
}
}
void on( const block_notice& notice ) {
peer_ilog(this, "received block_notice");
for( const auto& id : notice.block_ids ) {
status( "received notice " + std::to_string( block_header::num_from_id(id) ) );
mark_block_status( id, true, false );
}
}
void on( const hello& hi, fc::datastream<const char*>& ds );
void on( const ping& p ) {
peer_ilog(this, "received ping");
_last_recv_ping = p;
_remote_lib = p.lib;
_last_recv_ping_time = fc::time_point::now();
}
void on( const pong& p ) {
peer_ilog(this, "received pong");
if( p.code != _last_sent_ping.code ) {
peer_elog(this, "bad ping : invalid pong code");
return do_goodbye( "invalid pong code" );
}
_last_sent_ping.code = fc::sha256();
}
void do_goodbye( const string& reason ) {
try {
status( "goodbye - " + reason );
_ws->next_layer().close();
} catch ( ... ) {
elog( "uncaught exception on close" );
}
}
void check_for_redundant_connection();
void on( const signed_block_ptr& b ) {
peer_ilog(this, "received signed_block_ptr");
if (!b) {
peer_elog(this, "bad signed_block_ptr : null pointer");