Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: add specialized rebroadcast handling for stake txs. #979

Merged
merged 1 commit into from
Jul 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,13 +1101,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Retrieve the current previous block hash.
curPrevHash := b.chain.BestPrevHash()

nextStakeDiff, errSDiff :=
nextStakeDiff, err :=
b.chain.CalcNextRequiredStakeDifficulty()
if errSDiff != nil {
if err != nil {
bmgrLog.Warnf("Failed to get next stake difficulty "+
"calculation: %v", err)
}
if r != nil && errSDiff == nil {
if r != nil && err == nil {
// Update registered websocket clients on the
// current stake difficulty.
r.ntfnMgr.NotifyStakeDifficulty(
Expand Down Expand Up @@ -2020,6 +2020,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.server.RemoveRebroadcastInventory(iv)
}

// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()

// Notify registered websocket clients of incoming block.
r.ntfnMgr.NotifyBlockConnected(block)
}
Expand Down Expand Up @@ -2103,6 +2106,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}
}

// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()

// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
r.ntfnMgr.NotifyBlockDisconnected(block)
Expand Down
11 changes: 1 addition & 10 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5081,16 +5081,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
//
// Note that votes are only valid for a specific block and are time
// sensitive, so they should not be added to the rebroadcast logic.
//
// TODO: Ideally ticket purchases and revocations could be added to the
// rebroadcast logic as well, however, they would need to be removed under
// certain circumstances such as when the stake difficulty interval changes
// and if a revocation is for a ticket that was missed, but then becomes
// live again due to a reorg. All stake transactions are ignored here since
// there is no clean infrastructure in place currently to handle those
// removals and perpetually broadcasting transactions which are no longer
// valid is not desirable.
if txType := stake.DetermineTxType(msgtx); txType == stake.TxTypeRegular {
if txType := stake.DetermineTxType(msgtx); txType != stake.TxTypeSSGen {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
s.server.AddRebroadcastInventory(iv, tx)
}
Expand Down
71 changes: 71 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/decred/dcrd/addrmgr"
"github.com/decred/dcrd/blockchain"
"github.com/decred/dcrd/blockchain/indexers"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/chaincfg"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/connmgr"
Expand Down Expand Up @@ -83,6 +84,10 @@ type broadcastInventoryAdd relayMsg
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *wire.InvVect

// broadcastPruneInventory is a type used to declare that rebroadcast
// inventory entries need to be filtered and removed where necessary
type broadcastPruneInventory struct{}

// relayMsg packages an inventory vector along with the newly discovered
// inventory so the relay has access to that information.
type relayMsg struct {
Expand Down Expand Up @@ -1095,6 +1100,17 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
}

// PruneRebroadcastInventory filters and removes rebroadcast inventory entries
// where necessary.
func (s *server) PruneRebroadcastInventory() {
// Ignore if shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
return
}

s.modifyRebroadcastInv <- broadcastPruneInventory{}
}

// AnnounceNewTransactions generates and relays inventory vectors and notifies
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
Expand Down Expand Up @@ -1910,8 +1926,10 @@ func (s *server) rebroadcastHandler() {
out:
for {
select {

case riv := <-s.modifyRebroadcastInv:
switch msg := riv.(type) {

// Incoming InvVects are added to our map of RPC txs.
case broadcastInventoryAdd:
pendingInvs[*msg.invVect] = msg.data
Expand All @@ -1922,6 +1940,59 @@ out:
if _, ok := pendingInvs[*msg]; ok {
delete(pendingInvs, *msg)
}

case broadcastPruneInventory:
best := s.blockManager.chain.BestSnapshot()
nextStakeDiff, err :=
s.blockManager.chain.CalcNextRequiredStakeDifficulty()
if err != nil {
srvrLog.Errorf("Failed to get next stake difficulty: %v",
err)
break
}

for iv, data := range pendingInvs {
tx, ok := data.(*dcrutil.Tx)
if !ok {
continue
}

txType := stake.DetermineTxType(tx.MsgTx())

// Remove the ticket rebroadcast if the amount not equal to
// the current stake difficulty.
if txType == stake.TxTypeSStx &&
tx.MsgTx().TxOut[0].Value != nextStakeDiff {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending ticket purchase broadcast "+
"inventory for tx %v removed. Ticket value not "+
"equal to stake difficulty.", tx.Hash())
continue
}

// Remove the ticket rebroadcast if it has already expired.
if txType == stake.TxTypeSStx &&
blockchain.IsExpired(tx, best.Height) {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending ticket purchase broadcast "+
"inventory for tx %v removed. Transaction "+
"expired.", tx.Hash())
continue
}

// Remove the revocation rebroadcast if the associated
// ticket has been revived.
if txType == stake.TxTypeSSRtx {
refSStxHash := tx.MsgTx().TxIn[0].PreviousOutPoint.Hash
if !s.blockManager.chain.CheckLiveTicket(refSStxHash) {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending revocation broadcast "+
"inventory for tx %v removed. "+
"Associated ticket was revived.", tx.Hash())
continue
}
}
}
}

case <-timer.C:
Expand Down