Skip to content

Commit

Permalink
multi: add specialized rebroadcast handling for stake txs.
Browse files Browse the repository at this point in the history
  • Loading branch information
dnldd authored and davecgh committed Jul 6, 2018
1 parent d3199d2 commit 9b08b58
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
12 changes: 9 additions & 3 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,13 +1101,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Retrieve the current previous block hash.
curPrevHash := b.chain.BestPrevHash()

nextStakeDiff, errSDiff :=
nextStakeDiff, err :=
b.chain.CalcNextRequiredStakeDifficulty()
if errSDiff != nil {
if err != nil {
bmgrLog.Warnf("Failed to get next stake difficulty "+
"calculation: %v", err)
}
if r != nil && errSDiff == nil {
if r != nil && err == nil {
// Update registered websocket clients on the
// current stake difficulty.
r.ntfnMgr.NotifyStakeDifficulty(
Expand Down Expand Up @@ -2020,6 +2020,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.server.RemoveRebroadcastInventory(iv)
}

// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()

// Notify registered websocket clients of incoming block.
r.ntfnMgr.NotifyBlockConnected(block)
}
Expand Down Expand Up @@ -2103,6 +2106,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}
}

// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()

// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
r.ntfnMgr.NotifyBlockDisconnected(block)
Expand Down
11 changes: 1 addition & 10 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5081,16 +5081,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
//
// Note that votes are only valid for a specific block and are time
// sensitive, so they should not be added to the rebroadcast logic.
//
// TODO: Ideally ticket purchases and revocations could be added to the
// rebroadcast logic as well, however, they would need to be removed under
// certain circumstances such as when the stake difficulty interval changes
// and if a revocation is for a ticket that was missed, but then becomes
// live again due to a reorg. All stake transactions are ignored here since
// there is no clean infrastructure in place currently to handle those
// removals and perpetually broadcasting transactions which are no longer
// valid is not desirable.
if txType := stake.DetermineTxType(msgtx); txType == stake.TxTypeRegular {
if txType := stake.DetermineTxType(msgtx); txType != stake.TxTypeSSGen {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
s.server.AddRebroadcastInventory(iv, tx)
}
Expand Down
71 changes: 71 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/decred/dcrd/addrmgr"
"github.com/decred/dcrd/blockchain"
"github.com/decred/dcrd/blockchain/indexers"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/chaincfg"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/connmgr"
Expand Down Expand Up @@ -83,6 +84,10 @@ type broadcastInventoryAdd relayMsg
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *wire.InvVect

// broadcastPruneInventory is a type used to declare that rebroadcast
// inventory entries need to be filtered and removed where necessary
type broadcastPruneInventory struct{}

// relayMsg packages an inventory vector along with the newly discovered
// inventory so the relay has access to that information.
type relayMsg struct {
Expand Down Expand Up @@ -1095,6 +1100,17 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
}

// PruneRebroadcastInventory filters and removes rebroadcast inventory entries
// where necessary.
func (s *server) PruneRebroadcastInventory() {
// Ignore if shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
return
}

s.modifyRebroadcastInv <- broadcastPruneInventory{}
}

// AnnounceNewTransactions generates and relays inventory vectors and notifies
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
Expand Down Expand Up @@ -1910,8 +1926,10 @@ func (s *server) rebroadcastHandler() {
out:
for {
select {

case riv := <-s.modifyRebroadcastInv:
switch msg := riv.(type) {

// Incoming InvVects are added to our map of RPC txs.
case broadcastInventoryAdd:
pendingInvs[*msg.invVect] = msg.data
Expand All @@ -1922,6 +1940,59 @@ out:
if _, ok := pendingInvs[*msg]; ok {
delete(pendingInvs, *msg)
}

case broadcastPruneInventory:
best := s.blockManager.chain.BestSnapshot()
nextStakeDiff, err :=
s.blockManager.chain.CalcNextRequiredStakeDifficulty()
if err != nil {
srvrLog.Errorf("Failed to get next stake difficulty: %v",
err)
break
}

for iv, data := range pendingInvs {
tx, ok := data.(*dcrutil.Tx)
if !ok {
continue
}

txType := stake.DetermineTxType(tx.MsgTx())

// Remove the ticket rebroadcast if the amount not equal to
// the current stake difficulty.
if txType == stake.TxTypeSStx &&
tx.MsgTx().TxOut[0].Value != nextStakeDiff {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending ticket purchase broadcast "+
"inventory for tx %v removed. Ticket value not "+
"equal to stake difficulty.", tx.Hash())
continue
}

// Remove the ticket rebroadcast if it has already expired.
if txType == stake.TxTypeSStx &&
blockchain.IsExpired(tx, best.Height) {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending ticket purchase broadcast "+
"inventory for tx %v removed. Transaction "+
"expired.", tx.Hash())
continue
}

// Remove the revocation rebroadcast if the associated
// ticket has been revived.
if txType == stake.TxTypeSSRtx {
refSStxHash := tx.MsgTx().TxIn[0].PreviousOutPoint.Hash
if !s.blockManager.chain.CheckLiveTicket(refSStxHash) {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending revocation broadcast "+
"inventory for tx %v removed. "+
"Associated ticket was revived.", tx.Hash())
continue
}
}
}
}

case <-timer.C:
Expand Down

0 comments on commit 9b08b58

Please sign in to comment.