Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

td update from node + bugfix #692

Merged
merged 1 commit into from
Apr 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 85 additions & 104 deletions blockpool/blockpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ type BlockPool struct {
// alloc-easy pool of hash slices
hashSlicePool chan []common.Hash

nodeCache map[common.Hash]*node
nodeCacheLock sync.RWMutex

// waitgroup is used in tests to wait for result-critical routines
// as well as in determining idle / syncing status
wg sync.WaitGroup //
Expand Down Expand Up @@ -210,6 +213,7 @@ func (self *BlockPool) Start() {
self.Config.init()

self.hashSlicePool = make(chan []common.Hash, 150)
self.nodeCache = make(map[common.Hash]*node)
self.status = newStatus()
self.quit = make(chan bool)
self.pool = make(map[common.Hash]*entry)
Expand Down Expand Up @@ -615,127 +619,104 @@ LOOP:
If the block received is the head block of the current best peer, signal it to the head section process
*/
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
hash := block.Hash()

sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}

self.status.lock.Lock()
self.status.activePeers[peerId]++
self.status.lock.Unlock()

entry := self.get(hash)
blockIsCurrentHead := false
sender.lock.RLock()
currentBlockHash := sender.currentBlockHash
currentBlock := sender.currentBlock
currentBlockC := sender.currentBlockC
switchC := sender.switchC
sender.lock.RUnlock()

// a peer's current head block is appearing the first time
if hash == currentBlockHash {
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
blockIsCurrentHead = true
if currentBlock == nil {
sender.lock.Lock()
sender.setChainInfoFromBlock(block)
sender.lock.Unlock()

self.status.lock.Lock()
self.status.values.BlockHashes++
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
// signal to head section process
select {
case currentBlockC <- block:
case <-switchC:
}
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), peerId, hex(currentBlockHash))
}
} else {

plog.DebugDetailf("AddBlock: block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(currentBlockHash))

/* @zelig !!!
requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
delayed B sends you block ... UNREQUESTED. Blocked
if entry == nil {
plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash)

self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
*/
}
hash := block.Hash()

if entry == nil {
// FIXME: here check the cache find or create node -
// put peer as blockBy!
// check if block is already inserted in the blockchain
if self.hasBlock(hash) {
return
}

node := entry.node
node.lock.Lock()
defer node.lock.Unlock()

// register peer on node as source
if node.peers == nil {
node.peers = make(map[string]bool)
}
FoundBlockCurrentHead, found := node.peers[sender.id]
if !found || FoundBlockCurrentHead {
// if found but not FoundBlockCurrentHead, then no update
// necessary (||)
node.peers[sender.id] = blockIsCurrentHead
// for those that are false, TD will update their head
// for those that are true, TD is checked !
// this is checked at the time of TD calculation in checkTD
}
// check if block already received
if node.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), node.blockBy)
}

// check if block is already inserted in the blockchain
if self.hasBlock(hash) {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already in the blockchain", hex(hash), peerId, hex(sender.currentBlockHash))
sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}
tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block)

/*
@zelig needs discussing
Viktor: pow check can be delayed in a go routine and therefore cache
creation is not blocking
// validate block for PoW
if !self.verifyPoW(block) {
plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrInvalidPoW, "%x", hash)
entry := self.get(hash)

self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
/* @zelig !!!
requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
delayed B sends you block ... UNREQUESTED. Blocked
if entry == nil {
plog.DebugDetailf("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash)

self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
*/

return
var bnode *node
if entry == nil {
self.nodeCacheLock.Lock()
bnode, _ = self.nodeCache[hash]
if bnode == nil {
bnode = &node{
hash: currentBlockHash,
block: block,
hashBy: peerId,
blockBy: peerId,
td: tdFromCurrentHead,
}
*/
self.nodeCache[hash] = bnode
}
self.nodeCacheLock.Unlock()
} else {
bnode = entry.node
}

node.block = block
node.blockBy = peerId
bnode.lock.Lock()
defer bnode.lock.Unlock()

self.status.lock.Lock()
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
// check if block already received
if bnode.block != nil {
plog.DebugDetailf("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy)
// register peer on node as source
if bnode.peers == nil {
bnode.peers = make(map[string]bool)
}
foundBlockCurrentHead, found := bnode.peers[sender.id]
if !found || foundBlockCurrentHead {
// if found but not FoundBlockCurrentHead, then no update
// necessary (||)
bnode.peers[sender.id] = (currentBlockHash == hash)
// for those that are false, TD will update their head
// for those that are true, TD is checked !
// this is checked at the time of TD calculation in checkTD
}
sender.setChainInfoFromNode(bnode)
} else {
/*
@zelig needs discussing
Viktor: pow check can be delayed in a go routine and therefore cache
creation is not blocking
// validate block for PoW
if !self.verifyPoW(block) {
plog.Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrInvalidPoW, "%x", hash)

self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()

return
}
*/
bnode.block = block
bnode.blockBy = peerId
bnode.td = tdFromCurrentHead
self.status.lock.Lock()
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
}

}

Expand Down
2 changes: 1 addition & 1 deletion blockpool/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestErrInsufficientChainInfo(t *testing.T) {
}

func TestIncorrectTD(t *testing.T) {
t.Skip() // td not tested atm
t.Skip("skipping TD check until network is healthy")
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
Expand Down
85 changes: 58 additions & 27 deletions blockpool/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type peer struct {

// last known blockchain status
td *big.Int
tdAdvertised bool
currentBlockHash common.Hash
currentBlock *types.Block
parentHash common.Hash
Expand Down Expand Up @@ -135,21 +136,52 @@ func (self *peer) addError(code int, format string, params ...interface{}) {
}

// caller must hold peer lock
func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
self.td = td
self.currentBlockHash = c
self.currentBlock = nil
self.parentHash = common.Hash{}
self.headSection = nil
func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
if self.currentBlockHash != currentBlockHash {
previousBlockHash := self.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", self.id, td, hex(currentBlockHash), hex(previousBlockHash))
self.td = td
self.currentBlockHash = currentBlockHash
self.currentBlock = nil
self.parentHash = common.Hash{}
self.headSection = nil
}
self.tdAdvertised = true
}

// caller must hold peer lock
func (self *peer) setChainInfoFromBlock(block *types.Block) {
// use the optional TD to update peer td, this helps second best peer selection
func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
hash := block.Hash()
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
currentBlockHash = self.currentBlockHash
if currentBlockHash == hash && self.currentBlock == nil {
// signal to head section process
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
select {
case self.currentBlockC <- block:
case <-self.switchC:
}
return self.td, currentBlockHash
} else {
plog.DebugDetailf("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
return nil, currentBlockHash
}
}

// this will use the TD given by the first peer to update peer td, this helps second best peer selection
// :FIXME: node
func (self *peer) setChainInfoFromNode(n *node) {
// in case best peer is lost
if block.Td != nil && block.Td.Cmp(self.td) > 0 {
plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td)
self.td = block.Td
block := n.block
hash := block.Hash()
if n.td != nil && n.td.Cmp(self.td) > 0 {
plog.DebugDetailf("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
self.td = n.td
self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash()
self.currentBlock = block
Expand Down Expand Up @@ -218,17 +250,11 @@ func (self *peers) addPeer(
if found {
// when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated
p.lock.Lock()
if p.currentBlockHash != currentBlockHash {
previousBlockHash = p.currentBlockHash
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
p.setChainInfo(td, currentBlockHash)

self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
}
p.lock.Unlock()
p.setChainInfo(td, currentBlockHash)
// FIXME: only count the same block once
self.status.lock.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a non-blocking suggestion for this PR

You should consider wrapping these things up in accessors (or consider using atomic.AddInt32)

self.status.values.NewBlocks++
self.status.lock.Unlock()
} else {
p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)

Expand Down Expand Up @@ -333,8 +359,8 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
close(oldp.switchC)
}
if newp != nil {
newp.idleC = make(chan bool)
newp.switchC = make(chan bool)
// newp.idleC = make(chan bool)
// newp.switchC = make(chan bool)
// if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections
if newp.headSection == nil {
Expand All @@ -354,6 +380,9 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
}
}()

} else {
newp.idleC = make(chan bool)
newp.switchC = make(chan bool)
}

var connected = make(map[common.Hash]*section)
Expand Down Expand Up @@ -528,10 +557,12 @@ func (self *peer) getBlockHashes() bool {
// main loop for head section process
func (self *peer) run() {

self.lock.RLock()
self.lock.Lock()
self.switchC = make(chan bool)
self.idleC = make(chan bool)
switchC := self.switchC
plog.Debugf("HeadSection: <%s> section process for head %s started", self.id, hex(self.currentBlockHash))
self.lock.RUnlock()
self.lock.Unlock()

self.blockHashesRequestTimer = nil

Expand Down
Loading