Skip to content

Commit

Permalink
Merge pull request #89 from irisnet/develop
Browse files Browse the repository at this point in the history
Release v0.31.0
  • Loading branch information
zhangyelong committed Jul 5, 2019
2 parents 45e4967 + 46f4700 commit 6258e4d
Show file tree
Hide file tree
Showing 64 changed files with 1,450 additions and 516 deletions.
48 changes: 40 additions & 8 deletions blockchain/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type BlockPool struct {
height int64 // the lowest key in requesters.
// peers
peers map[p2p.ID]*bpPeer
maxPeerHeight int64
maxPeerHeight int64 // the biggest reported height

// atomic
numPending int32 // number of requests pending assignment or block response
Expand All @@ -78,6 +78,8 @@ type BlockPool struct {
errorsCh chan<- peerError
}

// NewBlockPool returns a new BlockPool with the height equal to start. Block
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer),
Expand All @@ -93,15 +95,15 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
return bp
}

// OnStart implements cmn.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
}

func (pool *BlockPool) OnStop() {}

// Run spawns requesters as needed.
// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine() {
for {
if !pool.IsRunning() {
Expand Down Expand Up @@ -150,13 +152,16 @@ func (pool *BlockPool) removeTimedoutPeers() {
}
}

// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}

// IsCaughtUp returns true if this node is caught up, false - otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
Expand All @@ -170,8 +175,9 @@ func (pool *BlockPool) IsCaughtUp() bool {

// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height. Note we use maxPeerHeight - 1
// because to sync block H requires block H+1 to verify the LastCommit.
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
Expand Down Expand Up @@ -260,7 +266,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
}
}

// MaxPeerHeight returns the highest height reported by a peer.
// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
Expand All @@ -286,6 +292,8 @@ func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
}
}

// RemovePeer removes the peer with peerID from the pool. If there's no peer
// with peerID, function is a no-op.
func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
Expand All @@ -299,7 +307,31 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) {
requester.redo(peerID)
}
}
delete(pool.peers, peerID)
peer, ok := pool.peers[peerID]
if ok {
if peer.timeout != nil {
peer.timeout.Stop()
}

delete(pool.peers, peerID)

// Find a new peer with the biggest height and update maxPeerHeight if the
// peer's height was the biggest.
if peer.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
}

// If no peers are left, maxPeerHeight is set to 0.
func (pool *BlockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height > max {
max = peer.height
}
}
pool.maxPeerHeight = max
}

// Pick an available peer with at least the given minHeight.
Expand Down
45 changes: 42 additions & 3 deletions blockchain/pool_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package blockchain

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"

"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -66,7 +68,7 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
return peers
}

func TestBasic(t *testing.T) {
func TestBlockPoolBasic(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
Expand Down Expand Up @@ -122,7 +124,7 @@ func TestBasic(t *testing.T) {
}
}

func TestTimeout(t *testing.T) {
func TestBlockPoolTimeout(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
Expand Down Expand Up @@ -180,3 +182,40 @@ func TestTimeout(t *testing.T) {
}
}
}

func TestBlockPoolRemovePeer(t *testing.T) {
peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := p2p.ID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, height, make(chan inputData)}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)

pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
require.NoError(t, err)
defer pool.Stop()

// add peers
for peerID, peer := range peers {
pool.SetPeerHeight(peerID, peer.height)
}
assert.EqualValues(t, 10, pool.MaxPeerHeight())

// remove not-existing peer
assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) })

// remove peer with biggest height
pool.RemovePeer(p2p.ID("10"))
assert.EqualValues(t, 9, pool.MaxPeerHeight())

// remove all peers
for peerID := range peers {
pool.RemovePeer(peerID)
}

assert.EqualValues(t, 0, pool.MaxPeerHeight())
}
23 changes: 14 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,13 @@ func DefaultFuzzConnConfig() *FuzzConnConfig {

// MempoolConfig defines the configuration options for the Tendermint mempool
type MempoolConfig struct {
RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"`
Broadcast bool `mapstructure:"broadcast"`
WalPath string `mapstructure:"wal_dir"`
Size int `mapstructure:"size"`
CacheSize int `mapstructure:"cache_size"`
RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"`
Broadcast bool `mapstructure:"broadcast"`
WalPath string `mapstructure:"wal_dir"`
Size int `mapstructure:"size"`
MaxTxsBytes int64 `mapstructure:"max_txs_bytes"`
CacheSize int `mapstructure:"cache_size"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
Expand All @@ -529,10 +530,11 @@ func DefaultMempoolConfig() *MempoolConfig {
Recheck: true,
Broadcast: true,
WalPath: "",
// Each signature verification takes .5ms, size reduced until we implement
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
CacheSize: 10000,
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
}
}

Expand All @@ -559,6 +561,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.Size < 0 {
return errors.New("size can't be negative")
}
if cfg.MaxTxsBytes < 0 {
return errors.New("max_txs_bytes can't be negative")
}
if cfg.CacheSize < 0 {
return errors.New("cache_size can't be negative")
}
Expand Down
9 changes: 7 additions & 2 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,15 @@ recheck = {{ .Mempool.Recheck }}
broadcast = {{ .Mempool.Broadcast }}
wal_dir = "{{ js .Mempool.WalPath }}"
# size of the mempool
# Maximum number of transactions in the mempool
size = {{ .Mempool.Size }}
# size of the cache (used to filter transactions we saw earlier)
# Limit the total size of all txs in the mempool.
# This only accounts for raw transactions (e.g. given 1MB transactions and
# max_txs_bytes=5MB, mempool will only accept 5 transactions).
max_txs_bytes = {{ .Mempool.MaxTxsBytes }}
# Size of the cache (used to filter transactions we saw earlier) in transactions
cache_size = {{ .Mempool.CacheSize }}
##### consensus configuration options #####
Expand Down
10 changes: 8 additions & 2 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,21 @@ func startTestRound(cs *ConsensusState, height int64, round int) {

// Create proposal block from cs1 but sign it with vs
func decideProposal(cs1 *ConsensusState, vs *validatorStub, height int64, round int) (proposal *types.Proposal, block *types.Block) {
cs1.mtx.Lock()
block, blockParts := cs1.createProposalBlock()
cs1.mtx.Unlock()
if block == nil { // on error
panic("error creating proposal block")
}

// Make proposal
polRound, propBlockID := cs1.ValidRound, types.BlockID{block.Hash(), blockParts.Header()}
cs1.mtx.RLock()
validRound := cs1.ValidRound
chainID := cs1.state.ChainID
cs1.mtx.RUnlock()
polRound, propBlockID := validRound, types.BlockID{block.Hash(), blockParts.Header()}
proposal = types.NewProposal(height, round, polRound, propBlockID)
if err := vs.SignProposal(cs1.state.ChainID, proposal); err != nil {
if err := vs.SignProposal(chainID, proposal); err != nil {
panic(err)
}
return
Expand Down
Loading

0 comments on commit 6258e4d

Please sign in to comment.