Skip to content

Commit

Permalink
multi: automate block template regeneration.
Browse files Browse the repository at this point in the history
  • Loading branch information
dnldd committed Aug 16, 2018
1 parent 9536f0c commit 792fd3f
Show file tree
Hide file tree
Showing 8 changed files with 572 additions and 526 deletions.
269 changes: 177 additions & 92 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ const (
// otherwise arise from sending old orphan blocks and forcing nodes to
// do expensive lottery data calculations for them.
maxReorgDepthNotify = 6

// templateRegenSeconds is the required number of seconds elapsed,
// with incoming non vote transactions, before template
// regeneration is required.
templateRegenSeconds = 30

// maxTemplateRegenSeconds is the maximum number of seconds elapsed
// without incoming transactions before template regeneration is required.
maxTemplateRegenSeconds = 60
)

// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
Expand Down Expand Up @@ -235,60 +244,20 @@ type processTransactionMsg struct {
reply chan processTransactionResponse
}

// regenMsg handles a request for block template regeneration.
type regenMsg struct{}

// nonVoteMsg handles a request signalling the receipt of a non vote
// transaction.
type nonVoteMsg struct{}

// isCurrentMsg is a message type to be sent across the message channel for
// requesting whether or not the block manager believes it is synced with
// the currently connected peers.
type isCurrentMsg struct {
reply chan bool
}

// getCurrentTemplateMsg handles a request for the current mining block template.
type getCurrentTemplateMsg struct {
reply chan getCurrentTemplateResponse
}

// getCurrentTemplateResponse is a response sent to the reply channel of a
// getCurrentTemplateMsg.
type getCurrentTemplateResponse struct {
Template *BlockTemplate
}

// setCurrentTemplateMsg handles a request to change the current mining block
// template.
type setCurrentTemplateMsg struct {
Template *BlockTemplate
reply chan setCurrentTemplateResponse
}

// setCurrentTemplateResponse is a response sent to the reply channel of a
// setCurrentTemplateMsg.
type setCurrentTemplateResponse struct {
}

// getParentTemplateMsg handles a request for the current parent mining block
// template.
type getParentTemplateMsg struct {
reply chan getParentTemplateResponse
}

// getParentTemplateResponse is a response sent to the reply channel of a
// getParentTemplateMsg.
type getParentTemplateResponse struct {
Template *BlockTemplate
}

// setParentTemplateMsg handles a request to change the parent mining block
// template.
type setParentTemplateMsg struct {
Template *BlockTemplate
reply chan setParentTemplateResponse
}

// setParentTemplateResponse is a response sent to the reply channel of a
// setParentTemplateMsg.
type setParentTemplateResponse struct {
}

// headerNode is used as a node in a list of headers that are linked together
// between checkpoints.
type headerNode struct {
Expand Down Expand Up @@ -407,6 +376,18 @@ type blockManager struct {
lotteryDataBroadcast map[chainhash.Hash]struct{}
lotteryDataBroadcastMutex sync.RWMutex

// the following fields handle block template regeneration and updating
// registered clients.
mining bool
lastRegen time.Time
nonVoteTxCountLastInterval uint64
nonVoteTxCount uint64
registeredClients map[chan *BlockTemplate]struct{}
regenMtx sync.Mutex
templateMtx sync.RWMutex
nonVoteTxCountMtx sync.RWMutex
registerMtx sync.RWMutex

cachedCurrentTemplate *BlockTemplate
cachedParentTemplate *BlockTemplate
AggressiveMining bool
Expand Down Expand Up @@ -764,12 +745,41 @@ func (b *blockManager) current() bool {
return true
}

// NotifyRegisteredTemplateClients broadcasts the provided block template
// to all registered clients.
func (b *blockManager) NotifyRegisteredTemplateClients(t *BlockTemplate) {
b.registerMtx.Lock()
for c := range b.registeredClients {
c <- t
// Delete the client after updating it.
delete(b.registeredClients, c)
}
b.registerMtx.Unlock()
}

// RequestTemplateUpdate registers a client for block template updates.
func (b *blockManager) RequestTemplateUpdates() chan *BlockTemplate {
updateChan := make(chan *BlockTemplate)
b.registerMtx.Lock()
b.registeredClients[updateChan] = struct{}{}
b.registerMtx.Unlock()
return updateChan
}

func (b *blockManager) LockTemplate() {
b.templateMtx.Lock()
}

func (b *blockManager) UnlockTemplate() {
b.templateMtx.Unlock()
}

// checkBlockForHiddenVotes checks to see if a newly added block contains
// any votes that were previously unknown to our daemon. If it does, it
// adds these votes to the cached parent block template.
//
// This is UNSAFE for concurrent access. It must be called in single threaded
// access through the block mananger. All template access must also be routed
// access through the block manager. All template access must also be routed
// through the block manager.
func (b *blockManager) checkBlockForHiddenVotes(block *dcrutil.Block) {
var votesFromBlock []*dcrutil.Tx
Expand All @@ -783,6 +793,7 @@ func (b *blockManager) checkBlockForHiddenVotes(block *dcrutil.Block) {
// the parent template hasn't yet been updated, so we may
// need to use the current template.
var template *BlockTemplate
b.templateMtx.RLock()
if b.cachedCurrentTemplate != nil {
if b.cachedCurrentTemplate.Height ==
block.Height() {
Expand All @@ -796,6 +807,7 @@ func (b *blockManager) checkBlockForHiddenVotes(block *dcrutil.Block) {
template = b.cachedParentTemplate
}
}
b.templateMtx.RUnlock()

// No template to alter.
if template == nil {
Expand Down Expand Up @@ -1592,6 +1604,15 @@ func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
// the fetching should proceed.
func (b *blockManager) blockHandler() {
candidatePeers := list.New()
b.lastRegen = b.server.txMemPool.LastUpdated()
updateNonVoteCounters := func() {
b.nonVoteTxCountMtx.Lock()
b.nonVoteTxCountLastInterval, b.nonVoteTxCount = b.nonVoteTxCount, 0
b.nonVoteTxCountMtx.Unlock()
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

out:
for {
select {
Expand Down Expand Up @@ -1805,31 +1826,107 @@ out:
case isCurrentMsg:
msg.reply <- b.current()

case getCurrentTemplateMsg:
cur := deepCopyBlockTemplate(b.cachedCurrentTemplate)
msg.reply <- getCurrentTemplateResponse{
Template: cur,
case nonVoteMsg:
if b.mining {
// Increment the non vote transaction counter.
b.nonVoteTxCountMtx.Lock()
b.nonVoteTxCount++
b.nonVoteTxCountMtx.Unlock()
}

case setCurrentTemplateMsg:
b.cachedCurrentTemplate = deepCopyBlockTemplate(msg.Template)
msg.reply <- setCurrentTemplateResponse{}

case getParentTemplateMsg:
par := deepCopyBlockTemplate(b.cachedParentTemplate)
msg.reply <- getParentTemplateResponse{
Template: par,
}

case setParentTemplateMsg:
b.cachedParentTemplate = deepCopyBlockTemplate(msg.Template)
msg.reply <- setParentTemplateResponse{}
case regenMsg:
go func() {
// Regenerate a template the chain is either current or
// is due its first block (height = 0) and there is at
// least one registered subscriber for block template
// updates.

b.registerMtx.Lock()
hasRegisteredClients := len(b.registeredClients) > 0
b.registerMtx.Unlock()

if b.mining && (b.IsCurrent() ||
b.chainState.newestHeight == 0) &&
hasRegisteredClients {

// Pick a mining address at random.
rand.Seed(time.Now().UnixNano())
payToAddr :=
cfg.miningAddrs[rand.Intn(len(cfg.miningAddrs))]

// Regenerate the block template.
b.templateMtx.Lock()
template, err :=
NewBlockTemplate(b.server.rpcServer.policy, b.server,
payToAddr)
if err != nil {
bmgrLog.Infof(
"block template generation failed: %v", err)
}

if template != nil {
// Update the current and parent templates and
// notify all registered template clients.
b.cachedParentTemplate, b.cachedCurrentTemplate =
b.cachedCurrentTemplate, template
b.NotifyRegisteredTemplateClients(b.cachedCurrentTemplate)
bmgrLog.Debugf("block template generated "+
"(height: %v, hash: %v)",
template.Block.Header.Height,
template.Block.BlockHash())

// Update the last regen time.
b.regenMtx.Lock()
b.lastRegen = time.Now()
b.regenMtx.Unlock()
}
b.templateMtx.Unlock()
}
}()
default:
bmgrLog.Warnf("Invalid message type in block "+
"handler: %T", msg)
}

case <-ticker.C:
go func() {
if b.mining && b.IsCurrent() {
b.registerMtx.RLock()
hasRegisteredClients := len(b.registeredClients) > 0
b.registerMtx.RUnlock()

b.nonVoteTxCountMtx.RLock()
hasNewNonVoteTxs := b.nonVoteTxCount > 0
b.nonVoteTxCountMtx.RUnlock()

b.regenMtx.Lock()
timeDiff := time.Since(b.lastRegen)
regen := b.server.txMemPool.LastUpdated().After(b.lastRegen.
Add(templateRegenSeconds * time.Second))
b.regenMtx.Unlock()

// Signal template regeneration if a minute has elapsed
// since the last regeneration and there is at least one
// registered subscriber for block template updates.
if (timeDiff.Seconds() > maxTemplateRegenSeconds) &&
hasRegisteredClients {
b.RegenTemplate()
updateNonVoteCounters()
return
}

// Signal template regeneration if thirty seconds have
// elapsed and at least a new non vote transaction has
// been accepted by the mempool nd there is at least one
// registered subscriber for block template updates.
if regen && hasNewNonVoteTxs && hasRegisteredClients {
b.RegenTemplate()
}

updateNonVoteCounters()
}
}()

case <-b.quit:
break out
}
Expand Down Expand Up @@ -2119,7 +2216,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {

// Drop the associated mining template from the old chain, since it
// will be no longer valid.
b.templateMtx.Lock()
b.cachedCurrentTemplate = nil
b.templateMtx.Unlock()
}
}

Expand Down Expand Up @@ -2413,39 +2512,19 @@ func (b *blockManager) TicketPoolValue() (dcrutil.Amount, error) {
return b.chain.TicketPoolValue()
}

// GetCurrentTemplate gets the current block template for mining.
func (b *blockManager) GetCurrentTemplate() *BlockTemplate {
reply := make(chan getCurrentTemplateResponse)
b.msgChan <- getCurrentTemplateMsg{reply: reply}
response := <-reply
return response.Template
}

// SetCurrentTemplate sets the current block template for mining.
func (b *blockManager) SetCurrentTemplate(bt *BlockTemplate) {
reply := make(chan setCurrentTemplateResponse)
b.msgChan <- setCurrentTemplateMsg{Template: bt, reply: reply}
<-reply
}

// GetParentTemplate gets the current parent block template for mining.
func (b *blockManager) GetParentTemplate() *BlockTemplate {
reply := make(chan getParentTemplateResponse)
b.msgChan <- getParentTemplateMsg{reply: reply}
response := <-reply
return response.Template
// RegenTemplate regenerates the block template.
func (b *blockManager) RegenTemplate() {
b.msgChan <- regenMsg{}
}

// SetParentTemplate sets the current parent block template for mining.
func (b *blockManager) SetParentTemplate(bt *BlockTemplate) {
reply := make(chan setParentTemplateResponse)
b.msgChan <- setParentTemplateMsg{Template: bt, reply: reply}
<-reply
// ReceiveNonVoteTx signals the receipt of a non vote transaction.
func (b *blockManager) ReceiveNonVoteTx() {
b.msgChan <- nonVoteMsg{}
}

// newBlockManager returns a new Decred block manager.
// Use Start to begin processing asynchronous block and inv updates.
func newBlockManager(s *server, indexManager blockchain.IndexManager, interrupt <-chan struct{}) (*blockManager, error) {
func newBlockManager(s *server, indexManager blockchain.IndexManager, mining bool, interrupt <-chan struct{}) (*blockManager, error) {
bm := blockManager{
server: s,
rejectedTxns: make(map[chainhash.Hash]struct{}),
Expand All @@ -2456,6 +2535,12 @@ func newBlockManager(s *server, indexManager blockchain.IndexManager, interrupt
progressLogger: newBlockProgressLogger("Processed", bmgrLog),
msgChan: make(chan interface{}, cfg.MaxPeers*3),
headerList: list.New(),
mining: mining,
regenMtx: sync.Mutex{},
templateMtx: sync.RWMutex{},
nonVoteTxCountMtx: sync.RWMutex{},
registerMtx: sync.RWMutex{},
registeredClients: make(map[chan *BlockTemplate]struct{}),
AggressiveMining: !cfg.NonAggressive,
quit: make(chan struct{}),
}
Expand Down
Loading

0 comments on commit 792fd3f

Please sign in to comment.