Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: multi: automate block template regeneration #1173

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions blockchain/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func (b *BlockChain) processOrphans(hash *chainhash.Hash, flags BehaviorFlags) e
return nil
}

// WaitForChain blocks if the chain is processing a block or handling a reorg.
func (b *BlockChain) WaitForChain() {
b.chainLock.RLock()
b.chainLock.RUnlock()
}

// ProcessBlock is the main workhorse for handling insertion of new blocks into
// the block chain. It includes functionality such as rejecting duplicate
// blocks, ensuring blocks follow all rules, orphan handling, and insertion into
Expand Down
244 changes: 152 additions & 92 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ const (
// otherwise arise from sending old orphan blocks and forcing nodes to
// do expensive lottery data calculations for them.
maxReorgDepthNotify = 6

// templateRegenSeconds is the required number of seconds elapsed,
// with incoming non vote transactions, before template
// regeneration is required.
templateRegenSeconds = 30

// maxTemplateRegenSeconds is the maximum number of seconds elapsed
// without incoming transactions before template regeneration is required.
maxTemplateRegenSeconds = 60
)

// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
Expand Down Expand Up @@ -235,60 +244,20 @@ type processTransactionMsg struct {
reply chan processTransactionResponse
}

// regenMsg handles a request for block template regeneration.
type regenMsg struct{}

// nonVoteMsg handles a request signalling the receipt of a non vote
// transaction.
type nonVoteMsg struct{}

// isCurrentMsg is a message type to be sent across the message channel for
// requesting whether or not the block manager believes it is synced with
// the currently connected peers.
type isCurrentMsg struct {
reply chan bool
}

// getCurrentTemplateMsg handles a request for the current mining block template.
type getCurrentTemplateMsg struct {
reply chan getCurrentTemplateResponse
}

// getCurrentTemplateResponse is a response sent to the reply channel of a
// getCurrentTemplateMsg.
type getCurrentTemplateResponse struct {
Template *BlockTemplate
}

// setCurrentTemplateMsg handles a request to change the current mining block
// template.
type setCurrentTemplateMsg struct {
Template *BlockTemplate
reply chan setCurrentTemplateResponse
}

// setCurrentTemplateResponse is a response sent to the reply channel of a
// setCurrentTemplateMsg.
type setCurrentTemplateResponse struct {
}

// getParentTemplateMsg handles a request for the current parent mining block
// template.
type getParentTemplateMsg struct {
reply chan getParentTemplateResponse
}

// getParentTemplateResponse is a response sent to the reply channel of a
// getParentTemplateMsg.
type getParentTemplateResponse struct {
Template *BlockTemplate
}

// setParentTemplateMsg handles a request to change the parent mining block
// template.
type setParentTemplateMsg struct {
Template *BlockTemplate
reply chan setParentTemplateResponse
}

// setParentTemplateResponse is a response sent to the reply channel of a
// setParentTemplateMsg.
type setParentTemplateResponse struct {
}

// headerNode is used as a node in a list of headers that are linked together
// between checkpoints.
type headerNode struct {
Expand Down Expand Up @@ -407,6 +376,18 @@ type blockManager struct {
lotteryDataBroadcast map[chainhash.Hash]struct{}
lotteryDataBroadcastMutex sync.RWMutex

// the following fields handle block template regeneration and updating
// registered clients.
mining bool
lastRegen time.Time
nonVoteTxCountLastInterval uint64
nonVoteTxCount uint64
registeredClients map[chan *BlockTemplate]struct{}
regenMtx sync.Mutex
templateMtx sync.RWMutex
nonVoteTxCountMtx sync.RWMutex
registerMtx sync.RWMutex

cachedCurrentTemplate *BlockTemplate
cachedParentTemplate *BlockTemplate
AggressiveMining bool
Expand Down Expand Up @@ -764,12 +745,33 @@ func (b *blockManager) current() bool {
return true
}

// NotifyRegisteredTemplateClients broadcasts the provided block template
// to all registered clients.
func (b *blockManager) NotifyRegisteredTemplateClients(t *BlockTemplate) {
b.registerMtx.Lock()
for c := range b.registeredClients {
c <- t
// Delete the client after updating it.
delete(b.registeredClients, c)
}
b.registerMtx.Unlock()
}

// RequestTemplateUpdate registers a client for block template updates.
func (b *blockManager) RequestTemplateUpdate() chan *BlockTemplate {
updateChan := make(chan *BlockTemplate)
b.registerMtx.Lock()
b.registeredClients[updateChan] = struct{}{}
b.registerMtx.Unlock()
return updateChan
}

// checkBlockForHiddenVotes checks to see if a newly added block contains
// any votes that were previously unknown to our daemon. If it does, it
// adds these votes to the cached parent block template.
//
// This is UNSAFE for concurrent access. It must be called in single threaded
// access through the block mananger. All template access must also be routed
// access through the block manager. All template access must also be routed
// through the block manager.
func (b *blockManager) checkBlockForHiddenVotes(block *dcrutil.Block) {
var votesFromBlock []*dcrutil.Tx
Expand Down Expand Up @@ -1592,6 +1594,15 @@ func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
// the fetching should proceed.
func (b *blockManager) blockHandler() {
candidatePeers := list.New()
b.lastRegen = b.server.txMemPool.LastUpdated()
updateNonVoteCounters := func() {
b.nonVoteTxCountMtx.Lock()
b.nonVoteTxCountLastInterval, b.nonVoteTxCount = b.nonVoteTxCount, 0
b.nonVoteTxCountMtx.Unlock()
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

out:
for {
select {
Expand Down Expand Up @@ -1805,31 +1816,92 @@ out:
case isCurrentMsg:
msg.reply <- b.current()

case getCurrentTemplateMsg:
cur := deepCopyBlockTemplate(b.cachedCurrentTemplate)
msg.reply <- getCurrentTemplateResponse{
Template: cur,
}

case setCurrentTemplateMsg:
b.cachedCurrentTemplate = deepCopyBlockTemplate(msg.Template)
msg.reply <- setCurrentTemplateResponse{}

case getParentTemplateMsg:
par := deepCopyBlockTemplate(b.cachedParentTemplate)
msg.reply <- getParentTemplateResponse{
Template: par,
case nonVoteMsg:
if b.mining {
// Increment the non vote transaction counter.
b.nonVoteTxCountMtx.Lock()
b.nonVoteTxCount++
b.nonVoteTxCountMtx.Unlock()
}

case setParentTemplateMsg:
b.cachedParentTemplate = deepCopyBlockTemplate(msg.Template)
msg.reply <- setParentTemplateResponse{}
case regenMsg:
b.chain.WaitForChain()
go func() {
if b.mining {
// Pick a mining address at random.
rand.Seed(time.Now().UnixNano())
payToAddr :=
cfg.miningAddrs[rand.Intn(len(cfg.miningAddrs))]

// Regenerate the block template.
b.templateMtx.Lock()
template, err :=
NewBlockTemplate(b.server.rpcServer.policy, b.server,
payToAddr)
if err != nil {
bmgrLog.Infof(
"block template generation failed: %v", err)
}

if template != nil {
// Update the current and parent templates and
// notify all registered template clients.
b.cachedParentTemplate, b.cachedCurrentTemplate =
b.cachedCurrentTemplate, template

b.NotifyRegisteredTemplateClients(b.cachedCurrentTemplate)
bmgrLog.Debugf("block template generated "+
"(height: %v, hash: %v)",
template.Block.Header.Height,
template.Block.BlockHash())

// Update the last regen time.
b.regenMtx.Lock()
b.lastRegen = time.Now()
b.regenMtx.Unlock()
}
b.templateMtx.Unlock()
}
}()
default:
bmgrLog.Warnf("Invalid message type in block "+
"handler: %T", msg)
}

case <-ticker.C:
go func() {
if b.mining {
b.nonVoteTxCountMtx.RLock()
hasNewNonVoteTxs := b.nonVoteTxCount > 0
b.nonVoteTxCountMtx.RUnlock()

b.regenMtx.Lock()
timeDiff := time.Since(b.lastRegen)
regen := b.server.txMemPool.LastUpdated().After(b.lastRegen.
Add(templateRegenSeconds * time.Second))
b.regenMtx.Unlock()

// Signal template regeneration if a minute has elapsed
// since the last regeneration and there is at least one
// registered subscriber for block template updates.
if timeDiff.Seconds() > maxTemplateRegenSeconds {
b.RegenTemplate()
updateNonVoteCounters()
return
}

// Signal template regeneration if thirty seconds have
// elapsed and at least a new non vote transaction has
// been accepted by the mempool nd there is at least one
// registered subscriber for block template updates.
if regen && hasNewNonVoteTxs {
b.RegenTemplate()
}

updateNonVoteCounters()
}
}()

case <-b.quit:
break out
}
Expand Down Expand Up @@ -2119,7 +2191,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {

// Drop the associated mining template from the old chain, since it
// will be no longer valid.
b.templateMtx.Lock()
b.cachedCurrentTemplate = nil
b.templateMtx.Unlock()
}
}

Expand Down Expand Up @@ -2413,39 +2487,19 @@ func (b *blockManager) TicketPoolValue() (dcrutil.Amount, error) {
return b.chain.TicketPoolValue()
}

// GetCurrentTemplate gets the current block template for mining.
func (b *blockManager) GetCurrentTemplate() *BlockTemplate {
reply := make(chan getCurrentTemplateResponse)
b.msgChan <- getCurrentTemplateMsg{reply: reply}
response := <-reply
return response.Template
}

// SetCurrentTemplate sets the current block template for mining.
func (b *blockManager) SetCurrentTemplate(bt *BlockTemplate) {
reply := make(chan setCurrentTemplateResponse)
b.msgChan <- setCurrentTemplateMsg{Template: bt, reply: reply}
<-reply
}

// GetParentTemplate gets the current parent block template for mining.
func (b *blockManager) GetParentTemplate() *BlockTemplate {
reply := make(chan getParentTemplateResponse)
b.msgChan <- getParentTemplateMsg{reply: reply}
response := <-reply
return response.Template
// RegenTemplate regenerates the block template.
func (b *blockManager) RegenTemplate() {
b.msgChan <- regenMsg{}
}

// SetParentTemplate sets the current parent block template for mining.
func (b *blockManager) SetParentTemplate(bt *BlockTemplate) {
reply := make(chan setParentTemplateResponse)
b.msgChan <- setParentTemplateMsg{Template: bt, reply: reply}
<-reply
// ReceiveNonVoteTx signals the receipt of a non vote transaction.
func (b *blockManager) ReceiveNonVoteTx() {
b.msgChan <- nonVoteMsg{}
}

// newBlockManager returns a new Decred block manager.
// Use Start to begin processing asynchronous block and inv updates.
func newBlockManager(s *server, indexManager blockchain.IndexManager, interrupt <-chan struct{}) (*blockManager, error) {
func newBlockManager(s *server, indexManager blockchain.IndexManager, mining bool, interrupt <-chan struct{}) (*blockManager, error) {
bm := blockManager{
server: s,
rejectedTxns: make(map[chainhash.Hash]struct{}),
Expand All @@ -2456,6 +2510,12 @@ func newBlockManager(s *server, indexManager blockchain.IndexManager, interrupt
progressLogger: newBlockProgressLogger("Processed", bmgrLog),
msgChan: make(chan interface{}, cfg.MaxPeers*3),
headerList: list.New(),
mining: mining,
regenMtx: sync.Mutex{},
templateMtx: sync.RWMutex{},
nonVoteTxCountMtx: sync.RWMutex{},
registerMtx: sync.RWMutex{},
registeredClients: make(map[chan *BlockTemplate]struct{}),
AggressiveMining: !cfg.NonAggressive,
quit: make(chan struct{}),
}
Expand Down
Loading