Skip to content

Commit

Permalink
Clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Frozen committed Oct 25, 2023
1 parent 394de85 commit e686ee1
Show file tree
Hide file tree
Showing 25 changed files with 114 additions and 161 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,7 @@ debug_external: clean
bash test/debug-external.sh

build_localnet_validator:
bash test/build-localnet-validator.sh
bash test/build-localnet-validator.sh

tt:
go test -v -test.run OnDisconnectCheck ./p2p/security
5 changes: 3 additions & 2 deletions api/service/stagedstreamsync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/ethereum/go-ethereum/event"
"github.com/rs/zerolog"

"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
Expand All @@ -14,7 +16,6 @@ import (
"github.com/harmony-one/harmony/p2p/stream/common/streammanager"
"github.com/harmony-one/harmony/p2p/stream/protocols/sync"
"github.com/harmony-one/harmony/shard"
"github.com/rs/zerolog"
)

type (
Expand All @@ -37,7 +38,7 @@ type (
)

// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config, c *consensus.Consensus) *Downloader {
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues()

sp := sync.NewProtocol(sync.Config{
Expand Down
4 changes: 2 additions & 2 deletions api/service/stagedstreamsync/downloaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Downloaders struct {
}

// NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config, c *consensus.Consensus) *Downloaders {
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
Expand All @@ -26,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.C
if _, ok := ds[bc.ShardID()]; ok {
continue
}
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config, c)
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
}
return &Downloaders{
ds: ds,
Expand Down
4 changes: 2 additions & 2 deletions api/service/stagedstreamsync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type StagedStreamSyncService struct {
}

// NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string, c *consensus.Consensus) *StagedStreamSyncService {
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config, c),
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
}
}

Expand Down
2 changes: 1 addition & 1 deletion api/service/stagedstreamsync/stage_heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (heads *StageHeads) Exec(ctx context.Context, firstCycle bool, invalidBlock

maxHeight := s.state.status.targetBN
maxBlocksPerSyncCycle := uint64(1024) // TODO: should be in config -> s.state.MaxBlocksPerSyncCycle
currentHeight := heads.configs.bc.CurrentHeader().NumberU64()
currentHeight := heads.configs.bc.CurrentBlock().NumberU64()
s.state.currentCycle.TargetHeight = maxHeight
targetHeight := uint64(0)
if errV := CreateView(ctx, heads.configs.db, tx, func(etx kv.Tx) (err error) {
Expand Down
34 changes: 4 additions & 30 deletions api/service/stagedstreamsync/stage_short_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package stagedstreamsync
import (
"context"

"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
Expand All @@ -20,7 +18,6 @@ type StageShortRange struct {
type StageShortRangeCfg struct {
bc core.BlockChain
db kv.RwDB
c *consensus.Consensus
}

func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
Expand All @@ -29,11 +26,10 @@ func NewStageShortRange(cfg StageShortRangeCfg) *StageShortRange {
}
}

func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB, c *consensus.Consensus) StageShortRangeCfg {
func NewStageShortRangeCfg(bc core.BlockChain, db kv.RwDB) StageShortRangeCfg {
return StageShortRangeCfg{
bc: bc,
db: db,
c: c,
}
}

Expand Down Expand Up @@ -108,12 +104,9 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
return 0, errors.Wrap(err, "prerequisite")
}
}
var (
bc = sr.configs.bc
curBN = bc.CurrentHeader().NumberU64()
blkNums = sh.prepareBlockHashNumbers(curBN)
hashChain, whitelist, err = sh.getHashChain(ctx, blkNums)
)
curBN := sr.configs.bc.CurrentBlock().NumberU64()
blkNums := sh.prepareBlockHashNumbers(curBN)
hashChain, whitelist, err := sh.getHashChain(ctx, blkNums)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return 0, nil
Expand Down Expand Up @@ -163,25 +156,6 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
return 0, err
}

numInserted := 0
err = sr.configs.c.GetLastMileBlockIter(sr.configs.bc.CurrentHeader().NumberU64()+1, func(blockIter *consensus.LastMileBlockIter) error {
for {
block := blockIter.Next()
if block == nil {
break
}
if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil {
return errors.Wrap(err, "failed to InsertChain")
}
numInserted++
}
return nil
})
if err != nil {
return 0, errors.WithMessage(err, "failed to InsertChain for last mile blocks")
}
utils.Logger().Info().Int("last mile blocks inserted", numInserted).Msg("Insert last mile blocks success")

return n, nil
}

Expand Down
25 changes: 7 additions & 18 deletions api/service/stagedstreamsync/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func CreateStagedSync(ctx context.Context,
config Config,
logger zerolog.Logger,
) (*StagedStreamSync, error) {

logger.Info().
Uint32("shard", bc.ShardID()).
Bool("beaconNode", isBeaconNode).
Expand All @@ -55,6 +56,7 @@ func CreateStagedSync(ctx context.Context,
Bool("serverOnly", config.ServerOnly).
Int("minStreams", config.MinStreams).
Msg(WrapStagedSyncMsg("creating staged sync"))

var mainDB kv.RwDB
dbs := make([]kv.RwDB, config.Concurrency)
if config.UseMemDB {
Expand All @@ -80,7 +82,7 @@ func CreateStagedSync(ctx context.Context,
}

stageHeadsCfg := NewStageHeadersCfg(bc, mainDB)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB, consensus)
stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB)
stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB)
stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress)
stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress)
Expand Down Expand Up @@ -225,9 +227,6 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
return 0, 0, err
}

s.startSyncing()
defer s.finishSyncing()

var estimatedHeight uint64
if initSync {
if h, err := s.estimateCurrentNumber(downloaderContext); err != nil {
Expand All @@ -244,20 +243,13 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
}
}

i := 0
s.startSyncing()
defer s.finishSyncing()

for {
i++
ctx, cancel := context.WithCancel(downloaderContext)
started := s.bc.CurrentHeader().NumberU64()

n, err := s.doSyncCycle(ctx, initSync)
finished := s.bc.CurrentHeader().NumberU64()
utils.Logger().Info().
Uint64("from", started).
Int("returned", n).
Uint64("to", finished).
Bool("initSync", initSync).
Int("cycle", i).
Msg(WrapStagedSyncMsg("synced blocks"))
if err != nil {
utils.Logger().Error().
Err(err).
Expand Down Expand Up @@ -377,9 +369,6 @@ func (s *StagedStreamSync) finishSyncing() {
if s.evtDownloadFinishedSubscribed {
s.evtDownloadFinished.Send(struct{}{})
}
utils.Logger().Info().
Bool("evtDownloadFinishedSubscribed", s.evtDownloadFinishedSubscribed).
Msg(WrapStagedSyncMsg("finished syncing"))
}

func (s *StagedStreamSync) checkPrerequisites() error {
Expand Down
5 changes: 2 additions & 3 deletions cmd/harmony/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"math/big"
"math/rand"
Expand Down Expand Up @@ -522,7 +521,7 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) {
Msg("Start p2p host failed")
}

if err := node.BootstrapConsensus(context.TODO(), currentNode.Consensus, currentNode.Host()); err != nil {
if err := currentNode.BootstrapConsensus(); err != nil {
fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error())
if !currentNode.NodeConfig.IsOffline {
os.Exit(-1)
Expand Down Expand Up @@ -1033,7 +1032,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har
}
}
//Setup stream sync service
s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir, node.Consensus)
s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir)

node.RegisterService(service.StagedStreamSync, s)

Expand Down
21 changes: 14 additions & 7 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type Consensus struct {
// The post-consensus job func passed from Node object
// Called when consensus on a new block is done
PostConsensusJob func(*types.Block) error
// The verifier func passed from Node object
BlockVerifier VerifyBlockFunc
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
// will trigger state syncing when blockNum is low
Expand Down Expand Up @@ -169,12 +171,12 @@ func (consensus *Consensus) Beaconchain() core.BlockChain {
}

// VerifyBlock is a function used to verify the block and keep trace of verified blocks.
func (FBFTLog *FBFTLog) verifyBlock(block *types.Block) error {
if !FBFTLog.IsBlockVerified(block.Hash()) {
if err := FBFTLog.BlockVerify(block); err != nil {
func (consensus *Consensus) verifyBlock(block *types.Block) error {
if !consensus.fBFTLog.IsBlockVerified(block.Hash()) {
if err := consensus.BlockVerifier(block); err != nil {
return errors.Errorf("Block verification failed: %s", err)
}
FBFTLog.MarkBlockVerified(block)
consensus.fBFTLog.MarkBlockVerified(block)
}
return nil
}
Expand Down Expand Up @@ -268,7 +270,7 @@ func New(
consensus := Consensus{
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(VerifyNewBlock(registry.GetWebHooks(), registry.GetBlockchain(), registry.GetBeaconchain())),
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
Decider: Decider,
Expand All @@ -294,15 +296,20 @@ func New(
// viewID has to be initialized as the height of
// the blockchain during initialization as it was
// displayed on explorer as Height right now
consensus.SetCurBlockViewID(0)
consensus.setCurBlockViewID(0)
consensus.SlashChan = make(chan slash.Record)
consensus.readySignal = make(chan ProposalType)
consensus.commitSigChannel = make(chan []byte)
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
consensus.IgnoreViewIDCheck = abool.NewBool(false)
// Make Sure Verifier is not null
consensus.vc = newViewChange(consensus.fBFTLog.BlockVerify)
consensus.vc = newViewChange()
// TODO: reference to blockchain/beaconchain should be removed.
verifier := VerifyNewBlock(registry.GetWebHooks(), consensus.Blockchain(), consensus.Beaconchain())
consensus.BlockVerifier = verifier
consensus.vc.verifyBlock = consensus.verifyBlock

// init prometheus metrics
initMetrics()
consensus.AddPubkeyMetrics()
Expand Down
8 changes: 5 additions & 3 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,14 @@ func (consensus *Consensus) setViewIDs(height uint64) {

// SetCurBlockViewID set the current view ID
func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.setCurBlockViewID(viewID)
}

// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) {
consensus.current.SetCurBlockViewID(viewID)
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}

// SetViewChangingID set the current view change ID
Expand Down
5 changes: 0 additions & 5 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.NoError(t, err)

messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
fbtLog := NewFBFTLog(nil)
state := State{mode: Normal}

timeouts := createTimeout()
Expand All @@ -37,10 +36,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan)

// FBFTLog
assert.Equal(t, fbtLog.blocks, consensus.fBFTLog.blocks)
assert.Equal(t, fbtLog.messages, consensus.fBFTLog.messages)
assert.Equal(t, len(fbtLog.verifiedBlocks), 0)
assert.Equal(t, fbtLog.verifiedBlocks, consensus.fBFTLog.verifiedBlocks)
assert.NotNil(t, consensus.FBFTLog())

assert.Equal(t, FBFTAnnounce, consensus.phase)
Expand Down
Loading

0 comments on commit e686ee1

Please sign in to comment.