From fe5b50ae5fb46238c5820ff24a93948e7fae7b50 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Mon, 22 Jan 2018 02:57:14 +0000 Subject: [PATCH] multi: add specialized rebroadcast handling for stake txs. --- blockmanager.go | 16 +++++++++++++++- rpcserver.go | 2 +- server.go | 45 ++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 56 insertions(+), 7 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 9938e72b15..f1e892ee62 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1153,7 +1153,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { b.chain.CalcNextRequiredStakeDifficulty() if errSDiff != nil { bmgrLog.Warnf("Failed to get next stake difficulty "+ - "calculation: %v", err) + "calculation: %v", errSDiff) } if r != nil && errSDiff == nil { // Update registered websocket clients on the @@ -2077,6 +2077,13 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { } for _, stx := range block.STransactions()[0:] { + // Remove rebroadcast inventory for revocations + if txType := stake.DetermineTxType(stx.MsgTx()); txType == stake.TxTypeSSRtx { + iv := wire.NewInvVect(wire.InvTypeTx, stx.Hash()) + b.server.RemoveRebroadcastInventory(iv) + continue + } + b.server.txMemPool.RemoveTransaction(stx, false) b.server.txMemPool.RemoveDoubleSpends(stx) b.server.txMemPool.RemoveOrphan(stx.Hash()) @@ -2173,6 +2180,13 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { } for _, tx := range block.STransactions()[0:] { + // Remove rebroadcast inventory for revocations + if txType := stake.DetermineTxType(tx.MsgTx()); txType == stake.TxTypeSSRtx { + iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) + b.server.RemoveRebroadcastInventory(iv) + continue + } + _, err := b.server.txMemPool.MaybeAcceptTransaction(tx, false, true) if err != nil { // Remove the transaction and all transactions diff --git a/rpcserver.go b/rpcserver.go index a77296c40a..981955745f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -5023,7 +5023,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st // there is no clean infrastructure in place currently to handle those // removals and perpetually broadcasting transactions which are no longer // valid is not desirable. - if txType := stake.DetermineTxType(msgtx); txType == stake.TxTypeRegular { + if txType := stake.DetermineTxType(msgtx); txType != stake.TxTypeSSGen { iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) s.server.AddRebroadcastInventory(iv, tx) } diff --git a/server.go b/server.go index 62d03b9e45..6fe7d337ef 100644 --- a/server.go +++ b/server.go @@ -23,6 +23,7 @@ import ( "github.com/decred/dcrd/addrmgr" "github.com/decred/dcrd/blockchain" "github.com/decred/dcrd/blockchain/indexers" + "github.com/decred/dcrd/blockchain/stake" "github.com/decred/dcrd/bloom" "github.com/decred/dcrd/chaincfg" "github.com/decred/dcrd/chaincfg/chainhash" @@ -179,6 +180,8 @@ type server struct { txIndex *indexers.TxIndex addrIndex *indexers.AddrIndex existsAddrIndex *indexers.ExistsAddrIndex + + pendingInvs map[wire.InvVect]interface{} } // serverPeer extends the peer to maintain state shared by the server and @@ -1945,13 +1948,44 @@ func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight i } } +// PruneInvalidatedSStxRebroadcastInventory removes expired – either by stake difficulty mismatch or exceeded SStx ticket pruning threshold. +func (s *server) PruneInvalidatedSStxRebroadcastInventory() { + // heightDiffToPruneTicket is the number of blocks to pass by in terms + // of height before old tickets are pruned. + heightDiffToPruneTicket := 288 + best := s.blockManager.chain.BestSnapshot() + nextStakeDiff, errSDiff := + s.blockManager.chain.CalcNextRequiredStakeDifficulty() + if errSDiff != nil { + srvrLog.Warnf("Failed to get next stake difficulty "+ + "calculation: %v", errSDiff) + } + + for iv, data := range s.pendingInvs { + tx, ok := data.(dcrutil.Tx) + if !ok { + continue + } + + txType := stake.DetermineTxType(tx.MsgTx()) + // remove sstx rebroadcast if the amount is less than the stake difficulty + if txType == stake.TxTypeSStx && + tx.MsgTx().TxOut[0].Value < nextStakeDiff { + delete(s.pendingInvs, iv) + } + if txType == stake.TxTypeSStx && + int64(tx.MsgTx().Expiry)+int64(heightDiffToPruneTicket) < best.Height { + delete(s.pendingInvs, iv) + } + } +} + // rebroadcastHandler keeps track of user submitted inventories that we have // sent out but have not yet made it into a block. We periodically rebroadcast // them in case our peers restarted or otherwise lost track of them. func (s *server) rebroadcastHandler() { // Wait 5 min before first tx rebroadcast. timer := time.NewTimer(5 * time.Minute) - pendingInvs := make(map[wire.InvVect]interface{}) out: for { @@ -1960,20 +1994,20 @@ out: switch msg := riv.(type) { // Incoming InvVects are added to our map of RPC txs. case broadcastInventoryAdd: - pendingInvs[*msg.invVect] = msg.data + s.pendingInvs[*msg.invVect] = msg.data // When an InvVect has been added to a block, we can // now remove it, if it was present. case broadcastInventoryDel: - if _, ok := pendingInvs[*msg]; ok { - delete(pendingInvs, *msg) + if _, ok := s.pendingInvs[*msg]; ok { + delete(s.pendingInvs, *msg) } } case <-timer.C: // Any inventory we have has not made it into a block // yet. We periodically resubmit them until they have. - for iv, data := range pendingInvs { + for iv, data := range s.pendingInvs { ivCopy := iv s.RelayInventory(&ivCopy, data) } @@ -2387,6 +2421,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param timeSource: blockchain.NewMedianTime(), services: services, sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize), + pendingInvs: make(map[wire.InvVect]interface{}), } // Create the transaction and address indexes if needed.