Skip to content

Commit

Permalink
multi: add specialized rebroadcast handling for stake txs.
Browse files Browse the repository at this point in the history
  • Loading branch information
dnldd committed Jan 22, 2018
1 parent fc102fa commit fe5b50a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
16 changes: 15 additions & 1 deletion blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
b.chain.CalcNextRequiredStakeDifficulty()
if errSDiff != nil {
bmgrLog.Warnf("Failed to get next stake difficulty "+
"calculation: %v", err)
"calculation: %v", errSDiff)
}
if r != nil && errSDiff == nil {
// Update registered websocket clients on the
Expand Down Expand Up @@ -2077,6 +2077,13 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}

for _, stx := range block.STransactions()[0:] {
// Remove rebroadcast inventory for revocations
if txType := stake.DetermineTxType(stx.MsgTx()); txType == stake.TxTypeSSRtx {
iv := wire.NewInvVect(wire.InvTypeTx, stx.Hash())
b.server.RemoveRebroadcastInventory(iv)
continue
}

b.server.txMemPool.RemoveTransaction(stx, false)
b.server.txMemPool.RemoveDoubleSpends(stx)
b.server.txMemPool.RemoveOrphan(stx.Hash())
Expand Down Expand Up @@ -2173,6 +2180,13 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}

for _, tx := range block.STransactions()[0:] {
// Remove rebroadcast inventory for revocations
if txType := stake.DetermineTxType(tx.MsgTx()); txType == stake.TxTypeSSRtx {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
b.server.RemoveRebroadcastInventory(iv)
continue
}

_, err := b.server.txMemPool.MaybeAcceptTransaction(tx, false, true)
if err != nil {
// Remove the transaction and all transactions
Expand Down
2 changes: 1 addition & 1 deletion rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5023,7 +5023,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
// there is no clean infrastructure in place currently to handle those
// removals and perpetually broadcasting transactions which are no longer
// valid is not desirable.
if txType := stake.DetermineTxType(msgtx); txType == stake.TxTypeRegular {
if txType := stake.DetermineTxType(msgtx); txType != stake.TxTypeSSGen {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
s.server.AddRebroadcastInventory(iv, tx)
}
Expand Down
45 changes: 40 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/decred/dcrd/addrmgr"
"github.com/decred/dcrd/blockchain"
"github.com/decred/dcrd/blockchain/indexers"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/bloom"
"github.com/decred/dcrd/chaincfg"
"github.com/decred/dcrd/chaincfg/chainhash"
Expand Down Expand Up @@ -179,6 +180,8 @@ type server struct {
txIndex *indexers.TxIndex
addrIndex *indexers.AddrIndex
existsAddrIndex *indexers.ExistsAddrIndex

pendingInvs map[wire.InvVect]interface{}
}

// serverPeer extends the peer to maintain state shared by the server and
Expand Down Expand Up @@ -1945,13 +1948,44 @@ func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight i
}
}

// PruneInvalidatedSStxRebroadcastInventory removes expired – either by stake difficulty mismatch or exceeded SStx ticket pruning threshold.
func (s *server) PruneInvalidatedSStxRebroadcastInventory() {
// heightDiffToPruneTicket is the number of blocks to pass by in terms
// of height before old tickets are pruned.
heightDiffToPruneTicket := 288
best := s.blockManager.chain.BestSnapshot()
nextStakeDiff, errSDiff :=
s.blockManager.chain.CalcNextRequiredStakeDifficulty()
if errSDiff != nil {
srvrLog.Warnf("Failed to get next stake difficulty "+
"calculation: %v", errSDiff)
}

for iv, data := range s.pendingInvs {
tx, ok := data.(dcrutil.Tx)
if !ok {
continue
}

txType := stake.DetermineTxType(tx.MsgTx())
// remove sstx rebroadcast if the amount is less than the stake difficulty
if txType == stake.TxTypeSStx &&
tx.MsgTx().TxOut[0].Value < nextStakeDiff {
delete(s.pendingInvs, iv)
}
if txType == stake.TxTypeSStx &&
int64(tx.MsgTx().Expiry)+int64(heightDiffToPruneTicket) < best.Height {
delete(s.pendingInvs, iv)
}
}
}

// rebroadcastHandler keeps track of user submitted inventories that we have
// sent out but have not yet made it into a block. We periodically rebroadcast
// them in case our peers restarted or otherwise lost track of them.
func (s *server) rebroadcastHandler() {
// Wait 5 min before first tx rebroadcast.
timer := time.NewTimer(5 * time.Minute)
pendingInvs := make(map[wire.InvVect]interface{})

out:
for {
Expand All @@ -1960,20 +1994,20 @@ out:
switch msg := riv.(type) {
// Incoming InvVects are added to our map of RPC txs.
case broadcastInventoryAdd:
pendingInvs[*msg.invVect] = msg.data
s.pendingInvs[*msg.invVect] = msg.data

// When an InvVect has been added to a block, we can
// now remove it, if it was present.
case broadcastInventoryDel:
if _, ok := pendingInvs[*msg]; ok {
delete(pendingInvs, *msg)
if _, ok := s.pendingInvs[*msg]; ok {
delete(s.pendingInvs, *msg)
}
}

case <-timer.C:
// Any inventory we have has not made it into a block
// yet. We periodically resubmit them until they have.
for iv, data := range pendingInvs {
for iv, data := range s.pendingInvs {
ivCopy := iv
s.RelayInventory(&ivCopy, data)
}
Expand Down Expand Up @@ -2387,6 +2421,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
timeSource: blockchain.NewMedianTime(),
services: services,
sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
pendingInvs: make(map[wire.InvVect]interface{}),
}

// Create the transaction and address indexes if needed.
Expand Down

0 comments on commit fe5b50a

Please sign in to comment.