diff --git a/.mergify.yml b/.mergify.yml new file mode 100644 index 0000000000..865a31d66a --- /dev/null +++ b/.mergify.yml @@ -0,0 +1,59 @@ +pull_request_rules: + - name: Squash merge rule to dev + conditions: + # applied for merge to the dev branch + - base=dev + # no unresolved threads + - "#review-threads-unresolved=0" + # Approved by two reviewers + - "#approved-reviews-by>=2" + # no unverified commit + - "#commits-unverified=0" + # Travis ci succeeded + - "check-success=Travis CI - Pull Request" + # git guardian succeeded + - "check-success=GitGuardian Security Checks" + # PR is not a draft + - -draft + # PR is not conflicting with the base branch + - -conflict + # conditions to avoid auto merge mistakes + # PR title doesn't have wip (not case sensitive) + - -title~=(?i)wip + # PR doesn't have WIP label (not case sensitive) + - label!=(?i)wip + # ready-to-merge is required to trigger the merge + - label=ready-to-merge + actions: + merge: + method: squash + - name: merge rule to main + conditions: + # from the dev branch : no direct PR to main + - head=dev + # applied for merge to the dev branch + - base=main + # no unresolved threads + - "#review-threads-unresolved=0" + # Approved by two reviewers + - "#approved-reviews-by>=2" + # no unverified commit + - "#commits-unverified=0" + # Travis ci succeeded + - "check-success=Travis CI - Pull Request" + # git guardian succeeded + - "check-success=GitGuardian Security Checks" + # PR is not a draft + - -draft + # PR is not conflicting with the base branch + - -conflict + # conditions to avoid auto merge mistakes + # PR title doesn't have wip (not case sensitive) + - -title~=(?i)wip + # PR doesn't have WIP label (not case sensitive) + - label!=(?i)wip + # ready-to-merge is required to trigger the merge + - label=ready-to-merge + actions: + merge: + method: merge \ No newline at end of file diff --git a/api/service/legacysync/syncing_test.go b/api/service/legacysync/syncing_test.go index bc17aeaec6..7d99051c76 100644 --- a/api/service/legacysync/syncing_test.go +++ b/api/service/legacysync/syncing_test.go @@ -12,7 +12,7 @@ import ( "time" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - peer "github.com/libp2p/go-libp2p-core/peer" + peer "github.com/libp2p/go-libp2p/core/peer" "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/api/service/legacysync/downloader" diff --git a/api/service/stagedstreamsync/const.go b/api/service/stagedstreamsync/const.go index 0e6bc6e2cf..5c735d764b 100644 --- a/api/service/stagedstreamsync/const.go +++ b/api/service/stagedstreamsync/const.go @@ -14,7 +14,10 @@ const ( BlockByHashesUpperCap int = 10 // number of get blocks by hashes upper cap BlockByHashesLowerCap int = 3 // number of get blocks by hashes lower cap - LastMileBlocksThreshold int = 10 + LastMileBlocksThreshold int = 10 + SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes + VerifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now) + LastMileBlocksSize = 50 // SoftQueueCap is the soft cap of size in resultQueue. When the queue size is larger than this limit, // no more request will be assigned to workers to wait for InsertChain to finish. @@ -50,8 +53,15 @@ type ( // config for beacon config BHConfig *BeaconHelperConfig + // use memory db + UseMemDB bool + // log the stage progress LogProgress bool + + // logs every single process and error to help debugging stream sync + // DebugMode is not accessible to the end user and is only an aid for development + DebugMode bool } // BeaconHelperConfig is the extra config used for beaconHelper which uses diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index 6e4808738f..55986ff6e8 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -15,11 +15,13 @@ var DefaultForwardOrder = ForwardOrder{ BlockBodies, // Stages below don't use Internet States, + LastMile, Finish, } var DefaultRevertOrder = RevertOrder{ Finish, + LastMile, States, BlockBodies, ShortRange, @@ -29,6 +31,7 @@ var DefaultRevertOrder = RevertOrder{ var DefaultCleanUpOrder = CleanUpOrder{ Finish, + LastMile, States, BlockBodies, ShortRange, @@ -42,6 +45,7 @@ func DefaultStages(ctx context.Context, srCfg StageShortRangeCfg, bodiesCfg StageBodiesCfg, statesCfg StageStatesCfg, + lastMileCfg StageLastMileCfg, finishCfg StageFinishCfg, ) []*Stage { @@ -50,6 +54,7 @@ func DefaultStages(ctx context.Context, handlerStageEpochSync := NewStageEpoch(seCfg) handlerStageBodies := NewStageBodies(bodiesCfg) handlerStageStates := NewStageStates(statesCfg) + handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageFinish := NewStageFinish(finishCfg) return []*Stage{ @@ -78,6 +83,11 @@ func DefaultStages(ctx context.Context, Description: "Update Blockchain State", Handler: handlerStageStates, }, + { + ID: LastMile, + Description: "update status for blocks after sync and update last mile blocks as well", + Handler: handlerStageLastMile, + }, { ID: Finish, Description: "Finalize Changes", diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go index 668698f107..96add97fd3 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/rs/zerolog" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" @@ -37,7 +38,7 @@ type ( ) // NewDownloader creates a new downloader -func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode bool, config Config) *Downloader { +func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader { config.fixValues() sp := sync.NewProtocol(sync.Config{ @@ -67,8 +68,8 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode ctx, cancel := context.WithCancel(context.Background()) - //TODO: use mem db should be in config file - stagedSyncInstance, err := CreateStagedSync(ctx, bc, dbDir, false, isBeaconNode, sp, config, logger, config.LogProgress) + // create an instance of staged sync for the downloader + stagedSyncInstance, err := CreateStagedSync(ctx, bc, consensus, dbDir, isBeaconNode, sp, config, logger) if err != nil { cancel() return nil @@ -189,6 +190,7 @@ func (d *Downloader) waitForBootFinish() { func (d *Downloader) loop() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() + // for shard chain and beacon chain node, first we start with initSync=true to // make sure it goes through the long range sync first. // for epoch chain we do only need to go through epoch sync process @@ -208,7 +210,8 @@ func (d *Downloader) loop() { go trigger() case <-d.downloadC: - addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync) + bnBeforeSync := d.bc.CurrentBlock().NumberU64() + estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync) if err != nil { //TODO: if there is a bad block which can't be resolved if d.stagedSyncInstance.invalidBlock.Active { @@ -216,13 +219,14 @@ func (d *Downloader) loop() { // if many streams couldn't solve it, then that's an unresolvable bad block if numTriedStreams >= d.config.InitStreams { if !d.stagedSyncInstance.invalidBlock.IsLogged { - fmt.Println("unresolvable bad block:", d.stagedSyncInstance.invalidBlock.Number) + d.logger.Error(). + Uint64("bad block number", d.stagedSyncInstance.invalidBlock.Number). + Msg(WrapStagedSyncMsg("unresolvable bad block")) d.stagedSyncInstance.invalidBlock.IsLogged = true } //TODO: if we don't have any new or untried stream in the list, sleep or panic } } - // If any error happens, sleep 5 seconds and retry d.logger.Error(). Err(err). @@ -242,16 +246,27 @@ func (d *Downloader) loop() { Uint32("shard", d.bc.ShardID()). Msg(WrapStagedSyncMsg("sync finished")) } - + // If block number has been changed, trigger another sync if addedBN != 0 { - // If block number has been changed, trigger another sync go trigger() + // try to add last mile from pub-sub (blocking) + if d.bh != nil { + d.bh.insertSync() + } } - // try to add last mile from pub-sub (blocking) - if d.bh != nil { - d.bh.insertSync() + // if last doSync needed only to add a few blocks less than LastMileBlocksThreshold and + // the node is fully synced now, then switch to short range + // the reason why we need to check distanceBeforeSync is because, if it was long distance, + // very likely, there are a couple of new blocks have been added to the other nodes which + // we should still stay in long range and check them. + bnAfterSync := d.bc.CurrentBlock().NumberU64() + distanceBeforeSync := estimatedHeight - bnBeforeSync + distanceAfterSync := estimatedHeight - bnAfterSync + if estimatedHeight > 0 && addedBN > 0 && + distanceBeforeSync <= uint64(LastMileBlocksThreshold) && + distanceAfterSync <= uint64(LastMileBlocksThreshold) { + initSync = false } - initSync = false case <-d.closeC: return diff --git a/api/service/stagedstreamsync/downloaders.go b/api/service/stagedstreamsync/downloaders.go index 2df8b74aab..583f3e1523 100644 --- a/api/service/stagedstreamsync/downloaders.go +++ b/api/service/stagedstreamsync/downloaders.go @@ -2,6 +2,7 @@ package stagedstreamsync import ( "github.com/harmony-one/abool" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/p2p" ) @@ -15,7 +16,7 @@ type Downloaders struct { } // NewDownloaders creates Downloaders for sync of multiple blockchains -func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config Config) *Downloaders { +func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders { ds := make(map[uint32]*Downloader) isBeaconNode := len(bcs) == 1 for _, bc := range bcs { @@ -25,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config C if _, ok := ds[bc.ShardID()]; ok { continue } - ds[bc.ShardID()] = NewDownloader(host, bc, dbDir, isBeaconNode, config) + ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config) } return &Downloaders{ ds: ds, diff --git a/api/service/stagedstreamsync/service.go b/api/service/stagedstreamsync/service.go index 40fbf7097c..f7ffd7f2d9 100644 --- a/api/service/stagedstreamsync/service.go +++ b/api/service/stagedstreamsync/service.go @@ -1,6 +1,7 @@ package stagedstreamsync import ( + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/p2p" ) @@ -11,9 +12,9 @@ type StagedStreamSyncService struct { } // NewService creates a new downloader service -func NewService(host p2p.Host, bcs []core.BlockChain, config Config, dbDir string) *StagedStreamSyncService { +func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService { return &StagedStreamSyncService{ - Downloaders: NewDownloaders(host, bcs, dbDir, config), + Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config), } } diff --git a/api/service/stagedstreamsync/short_range_helper.go b/api/service/stagedstreamsync/short_range_helper.go index e43b3c6916..42327c78df 100644 --- a/api/service/stagedstreamsync/short_range_helper.go +++ b/api/service/stagedstreamsync/short_range_helper.go @@ -207,6 +207,12 @@ func (sh *srHelper) removeStreams(sts []sttypes.StreamID) { } } +func (sh *srHelper) streamsFailed(sts []sttypes.StreamID, reason string) { + for _, st := range sts { + sh.syncProtocol.StreamFailed(st, reason) + } +} + // blameAllStreams only not to blame all whitelisted streams when the it's not the last block signature verification failed. func (sh *srHelper) blameAllStreams(blocks types.Blocks, errIndex int, err error) bool { if errors.As(err, &emptySigVerifyErr) && errIndex == len(blocks)-1 { diff --git a/api/service/stagedstreamsync/stage_bodies.go b/api/service/stagedstreamsync/stage_bodies.go index 62309b76df..4996ea78b7 100644 --- a/api/service/stagedstreamsync/stage_bodies.go +++ b/api/service/stagedstreamsync/stage_bodies.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" ) @@ -60,6 +61,11 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev return nil } + // shouldn't execute for epoch chain + if b.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + maxHeight := s.state.status.targetBN currentHead := b.configs.bc.CurrentBlock().NumberU64() if currentHead >= maxHeight { @@ -77,7 +83,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev return errV } - if currProgress == 0 { + if currProgress <= currentHead { if err := b.cleanAllBlockDBs(ctx); err != nil { return err } @@ -209,7 +215,7 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err isOneOfTheBadStreams := false for _, id := range s.state.invalidBlock.StreamID { if id == stid { - b.configs.protocol.RemoveStream(stid) + b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed") isOneOfTheBadStreams = true break } diff --git a/api/service/stagedstreamsync/stage_epoch.go b/api/service/stagedstreamsync/stage_epoch.go index 394a1d5d69..2c51aa1f94 100644 --- a/api/service/stagedstreamsync/stage_epoch.go +++ b/api/service/stagedstreamsync/stage_epoch.go @@ -5,6 +5,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" + sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" @@ -51,9 +52,12 @@ func (sr *StageEpoch) Exec(ctx context.Context, firstCycle bool, invalidBlockRev n, err := sr.doShortRangeSyncForEpochSync(ctx, s) s.state.inserted = n if err != nil { + utils.Logger().Info().Err(err).Msg("short range for epoch sync failed") return err } - + if n > 0 { + utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("epoch sync short range blocks inserted successfully") + } useInternalTx := tx == nil if useInternalTx { var err error @@ -108,30 +112,13 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage return 0, nil } - //////////////////////////////////////////////////////// - hashChain, whitelist, err := sh.getHashChain(ctx, bns) + blocks, streamID, err := sh.getBlocksChain(ctx, bns) if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return 0, nil + } return 0, errors.Wrap(err, "getHashChain") } - if len(hashChain) == 0 { - // short circuit for no sync is needed - return 0, nil - } - blocks, streamID, err := sh.getBlocksByHashes(ctx, hashChain, whitelist) - if err != nil { - utils.Logger().Warn().Err(err).Msg("epoch sync getBlocksByHashes failed") - if !errors.Is(err, context.Canceled) { - sh.removeStreams(whitelist) // Remote nodes cannot provide blocks with target hashes - } - return 0, errors.Wrap(err, "epoch sync getBlocksByHashes") - } - /////////////////////////////////////////////////////// - // TODO: check this - // blocks, streamID, err := sh.getBlocksChain(bns) - // if err != nil { - // return 0, errors.Wrap(err, "getHashChain") - // } - /////////////////////////////////////////////////////// if len(blocks) == 0 { // short circuit for no sync is needed return 0, nil @@ -141,12 +128,9 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n)) if err != nil { utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("Insert block failed") - sh.removeStreams(streamID) // Data provided by remote nodes is corrupted + sh.streamsFailed([]sttypes.StreamID{streamID}, "corrupted data") return n, err } - if n > 0 { - utils.Logger().Info().Int("blocks inserted", n).Msg("Insert block success") - } return n, nil } diff --git a/api/service/stagedstreamsync/stage_lastmile.go b/api/service/stagedstreamsync/stage_lastmile.go new file mode 100644 index 0000000000..157dcc3680 --- /dev/null +++ b/api/service/stagedstreamsync/stage_lastmile.go @@ -0,0 +1,109 @@ +package stagedstreamsync + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/shard" + "github.com/ledgerwatch/erigon-lib/kv" +) + +type StageLastMile struct { + configs StageLastMileCfg +} + +type StageLastMileCfg struct { + ctx context.Context + bc core.BlockChain + db kv.RwDB +} + +func NewStageLastMile(cfg StageLastMileCfg) *StageLastMile { + return &StageLastMile{ + configs: cfg, + } +} + +func NewStageLastMileCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageLastMileCfg { + return StageLastMileCfg{ + ctx: ctx, + bc: bc, + db: db, + } +} + +func (lm *StageLastMile) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { + + // no need to download the last mile blocks if we are redoing the stages because of bad block + if invalidBlockRevert { + return nil + } + + // shouldn't execute for epoch chain + if lm.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + + bc := lm.configs.bc + + // update last mile blocks if any + parentHash := bc.CurrentBlock().Hash() + var hashes []common.Hash + for { + block := s.state.getBlockFromLastMileBlocksByParentHash(parentHash) + if block == nil { + break + } + err = s.state.UpdateBlockAndStatus(block, bc, false) + if err != nil { + s.state.RollbackLastMileBlocks(ctx, hashes) + return err + } + hashes = append(hashes, block.Hash()) + parentHash = block.Hash() + } + s.state.purgeLastMileBlocksFromCache() + + return nil +} + +func (lm *StageLastMile) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { + useInternalTx := tx == nil + if useInternalTx { + tx, err = lm.configs.db.BeginRw(lm.configs.ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = u.Done(tx); err != nil { + return err + } + + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + +func (lm *StageLastMile) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { + useInternalTx := tx == nil + if useInternalTx { + tx, err = lm.configs.db.BeginRw(lm.configs.ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index 8fb2f3059e..54534bfbb1 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -44,6 +44,7 @@ func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlo return nil } + // shouldn't execute for epoch chain if sr.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { return nil } @@ -52,8 +53,12 @@ func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlo n, err := sr.doShortRangeSync(ctx, s) s.state.inserted = n if err != nil { + utils.Logger().Info().Err(err).Msg("short range sync failed") return err } + if n > 0 { + utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("short range blocks inserted successfully") + } useInternalTx := tx == nil if useInternalTx { @@ -98,6 +103,9 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) blkNums := sh.prepareBlockHashNumbers(curBN) hashChain, whitelist, err := sh.getHashChain(ctx, blkNums) if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return 0, nil + } return 0, errors.Wrap(err, "getHashChain") } @@ -114,37 +122,34 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) s.state.status.setTargetBN(expEndBN) - s.state.status.startSyncing() - defer func() { - utils.Logger().Info().Msg("short range finished syncing") - s.state.status.finishSyncing() - }() - blocks, stids, err := sh.getBlocksByHashes(ctx, hashChain, whitelist) if err != nil { utils.Logger().Warn().Err(err).Msg("getBlocksByHashes failed") - if !errors.Is(err, context.Canceled) { - sh.removeStreams(whitelist) // Remote nodes cannot provide blocks with target hashes + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return 0, errors.Wrap(err, "getBlocksByHashes") } - return 0, errors.Wrap(err, "getBlocksByHashes") + sh.streamsFailed(whitelist, "remote nodes cannot provide blocks with target hashes") } - utils.Logger().Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result") - n, err := verifyAndInsertBlocks(sr.configs.bc, blocks) numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n)) if err != nil { utils.Logger().Warn().Err(err).Int("blocks inserted", n).Msg("Insert block failed") + // rollback all added new blocks + if rbErr := sr.configs.bc.Rollback(hashChain); rbErr != nil { + utils.Logger().Error().Err(rbErr).Msg("short range failed to rollback") + return 0, rbErr + } + // fail streams if sh.blameAllStreams(blocks, n, err) { - sh.removeStreams(whitelist) // Data provided by remote nodes is corrupted + sh.streamsFailed(whitelist, "data provided by remote nodes is corrupted") } else { // It is the last block gives a wrong commit sig. Blame the provider of the last block. st2Blame := stids[len(stids)-1] - sh.removeStreams([]sttypes.StreamID{st2Blame}) + sh.streamsFailed([]sttypes.StreamID{st2Blame}, "the last block provided by stream gives a wrong commit sig") } - return n, err + return 0, err } - utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("Insert block success") return n, nil } diff --git a/api/service/stagedstreamsync/stage_state.go b/api/service/stagedstreamsync/stage_state.go index 4b237c2916..b8dfb18288 100644 --- a/api/service/stagedstreamsync/stage_state.go +++ b/api/service/stagedstreamsync/stage_state.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/rs/zerolog" ) @@ -57,6 +58,11 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR return nil } + // shouldn't execute for epoch chain + if stg.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + maxHeight := s.state.status.targetBN currentHead := stg.configs.bc.CurrentBlock().NumberU64() if currentHead >= maxHeight { @@ -144,8 +150,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR if block.NumberU64() != i { s.state.protocol.StreamFailed(streamID, "invalid block with unmatched number is received from stream") - invalidBlockHash := block.Hash() - reverter.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), i, invalidBlockHash, streamID) + if !invalidBlockRevert { + invalidBlockHash := block.Hash() + reverter.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), i, invalidBlockHash, streamID) + } return ErrInvalidBlockNumber } diff --git a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 6c1eec4c37..3cd8756604 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -8,11 +8,16 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/utils" syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" ) @@ -54,19 +59,22 @@ func (ib *InvalidBlock) addBadStream(bsID sttypes.StreamID) { } type StagedStreamSync struct { - bc core.BlockChain - isBeacon bool - isExplorer bool - db kv.RwDB - protocol syncProtocol - isBeaconNode bool - gbm *blockDownloadManager // initialized when finished get block number - inserted int - config Config - logger zerolog.Logger - status *status //TODO: merge this with currentSyncCycle - initSync bool // if sets to true, node start long range syncing - UseMemDB bool + bc core.BlockChain + consensus *consensus.Consensus + isBeacon bool + isExplorer bool + db kv.RwDB + protocol syncProtocol + isBeaconNode bool + gbm *blockDownloadManager // initialized when finished get block number + lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus + lastMileMux sync.Mutex + inserted int + config Config + logger zerolog.Logger + status *status //TODO: merge this with currentSyncCycle + initSync bool // if sets to true, node start long range syncing + UseMemDB bool revertPoint *uint64 // used to run stages prevRevertPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon) @@ -249,12 +257,12 @@ func (s *StagedStreamSync) cleanUp(ctx context.Context, fromStage int, db kv.RwD // New creates a new StagedStreamSync instance func New( bc core.BlockChain, + consensus *consensus.Consensus, db kv.RwDB, stagesList []*Stage, isBeacon bool, protocol syncProtocol, isBeaconNode bool, - useMemDB bool, config Config, logger zerolog.Logger, ) *StagedStreamSync { @@ -286,22 +294,24 @@ func New( status := newStatus() return &StagedStreamSync{ - bc: bc, - isBeacon: isBeacon, - db: db, - protocol: protocol, - isBeaconNode: isBeaconNode, - gbm: nil, - status: &status, - inserted: 0, - config: config, - logger: logger, - stages: stagesList, - currentStage: 0, - revertOrder: revertStages, - pruningOrder: pruneStages, - logPrefixes: logPrefixes, - UseMemDB: useMemDB, + bc: bc, + consensus: consensus, + isBeacon: isBeacon, + db: db, + protocol: protocol, + isBeaconNode: isBeaconNode, + lastMileBlocks: []*types.Block{}, + gbm: nil, + status: &status, + inserted: 0, + config: config, + logger: logger, + stages: stagesList, + currentStage: 0, + revertOrder: revertStages, + pruningOrder: pruneStages, + logPrefixes: logPrefixes, + UseMemDB: config.UseMemDB, } } @@ -583,3 +593,133 @@ func (s *StagedStreamSync) EnableStages(ids ...SyncStageID) { } } } + +func (ss *StagedStreamSync) purgeLastMileBlocksFromCache() { + ss.lastMileMux.Lock() + ss.lastMileBlocks = nil + ss.lastMileMux.Unlock() +} + +// AddLastMileBlock adds the latest a few block into queue for syncing +// only keep the latest blocks with size capped by LastMileBlocksSize +func (ss *StagedStreamSync) AddLastMileBlock(block *types.Block) { + ss.lastMileMux.Lock() + defer ss.lastMileMux.Unlock() + if ss.lastMileBlocks != nil { + if len(ss.lastMileBlocks) >= LastMileBlocksSize { + ss.lastMileBlocks = ss.lastMileBlocks[1:] + } + ss.lastMileBlocks = append(ss.lastMileBlocks, block) + } +} + +func (ss *StagedStreamSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block { + ss.lastMileMux.Lock() + defer ss.lastMileMux.Unlock() + for _, block := range ss.lastMileBlocks { + ph := block.ParentHash() + if ph == parentHash { + return block + } + } + return nil +} + +func (ss *StagedStreamSync) addConsensusLastMile(bc core.BlockChain, cs *consensus.Consensus) ([]common.Hash, error) { + curNumber := bc.CurrentBlock().NumberU64() + var hashes []common.Hash + + err := cs.GetLastMileBlockIter(curNumber+1, func(blockIter *consensus.LastMileBlockIter) error { + for { + block := blockIter.Next() + if block == nil { + break + } + if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil { + return errors.Wrap(err, "failed to InsertChain") + } + hashes = append(hashes, block.Header().Hash()) + } + return nil + }) + return hashes, err +} + +func (ss *StagedStreamSync) RollbackLastMileBlocks(ctx context.Context, hashes []common.Hash) error { + if len(hashes) == 0 { + return nil + } + utils.Logger().Info(). + Interface("block", ss.bc.CurrentBlock()). + Msg("[STAGED_STREAM_SYNC] Rolling back last mile blocks") + if err := ss.bc.Rollback(hashes); err != nil { + utils.Logger().Error().Err(err). + Msg("[STAGED_STREAM_SYNC] failed to rollback last mile blocks") + return err + } + return nil +} + +// UpdateBlockAndStatus updates block and its status in db +func (ss *StagedStreamSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error { + if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 { + utils.Logger().Debug(). + Uint64("curBlockNum", bc.CurrentBlock().NumberU64()). + Uint64("receivedBlockNum", block.NumberU64()). + Msg("[STAGED_STREAM_SYNC] Inappropriate block number, ignore!") + return nil + } + + haveCurrentSig := len(block.GetCurrentCommitSig()) != 0 + // Verify block signatures + if block.NumberU64() > 1 { + // Verify signature every N blocks (which N is verifyHeaderBatchSize and can be adjusted in configs) + verifySeal := block.NumberU64()%VerifyHeaderBatchSize == 0 || verifyAllSig + verifyCurrentSig := verifyAllSig && haveCurrentSig + if verifyCurrentSig { + sig, bitmap, err := chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig()) + if err != nil { + return errors.Wrap(err, "parse commitSigAndBitmap") + } + + startTime := time.Now() + if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sig, bitmap); err != nil { + return errors.Wrapf(err, "verify header signature %v", block.Hash().String()) + } + utils.Logger().Debug(). + Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()). + Msg("[STAGED_STREAM_SYNC] VerifyHeaderSignature") + } + err := bc.Engine().VerifyHeader(bc, block.Header(), verifySeal) + if err == engine.ErrUnknownAncestor { + return nil + } else if err != nil { + utils.Logger().Error(). + Err(err). + Uint64("block number", block.NumberU64()). + Msgf("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: failed verifying signatures for new block") + return err + } + } + + _, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */) + if err != nil { + utils.Logger().Error(). + Err(err). + Uint64("block number", block.NumberU64()). + Uint32("shard", block.ShardID()). + Msgf("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: Error adding new block to blockchain") + return err + } + utils.Logger().Info(). + Uint64("blockHeight", block.NumberU64()). + Uint64("blockEpoch", block.Epoch().Uint64()). + Str("blockHex", block.Hash().Hex()). + Uint32("ShardID", block.ShardID()). + Msg("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: New Block Added to Blockchain") + + for i, tx := range block.StakingTransactions() { + utils.Logger().Info().Msgf("StakingTxn %d: %s, %v", i, tx.StakingType().String(), tx.StakingMessage()) + } + return nil +} diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index 55681d68f5..6a21fe7071 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -13,6 +13,7 @@ const ( SyncEpoch SyncStageID = "SyncEpoch" // epoch sync BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified States SyncStageID = "States" // will construct most recent state from downloaded blocks + LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well Finish SyncStageID = "Finish" // Nominal stage after all other stages ) diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index ebef810b19..738f2f9203 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -4,13 +4,15 @@ import ( "context" "fmt" "path/filepath" + "runtime" + "strings" "sync" "time" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/memdb" @@ -38,41 +40,53 @@ var Buckets = []string{ // CreateStagedSync creates an instance of staged sync func CreateStagedSync(ctx context.Context, bc core.BlockChain, + consensus *consensus.Consensus, dbDir string, - UseMemDB bool, isBeaconNode bool, protocol syncProtocol, config Config, logger zerolog.Logger, - logProgress bool, ) (*StagedStreamSync, error) { - isBeacon := bc.ShardID() == shard.BeaconChainShardID + logger.Info(). + Uint32("shard", bc.ShardID()). + Bool("beaconNode", isBeaconNode). + Bool("memdb", config.UseMemDB). + Str("dbDir", dbDir). + Bool("serverOnly", config.ServerOnly). + Int("minStreams", config.MinStreams). + Msg(WrapStagedSyncMsg("creating staged sync")) var mainDB kv.RwDB dbs := make([]kv.RwDB, config.Concurrency) - if UseMemDB { - mainDB = memdb.New(getMemDbTempPath(dbDir, -1)) + if config.UseMemDB { + mainDB = memdb.New(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)) for i := 0; i < config.Concurrency; i++ { - dbs[i] = memdb.New(getMemDbTempPath(dbDir, i)) + dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir) + dbs[i] = memdb.New(dbPath) } } else { - mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(isBeacon, -1, dbDir)).MustOpen() + logger.Info(). + Str("path", getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)). + Msg(WrapStagedSyncMsg("creating main db")) + mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).MustOpen() for i := 0; i < config.Concurrency; i++ { - dbPath := getBlockDbPath(isBeacon, i, dbDir) + dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir) dbs[i] = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen() } } if errInitDB := initDB(ctx, mainDB, dbs, config.Concurrency); errInitDB != nil { + logger.Error().Err(errInitDB).Msg("create staged sync instance failed") return nil, errInitDB } stageHeadsCfg := NewStageHeadersCfg(bc, mainDB) stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB) stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB) - stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeacon, logProgress) - stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, logProgress) + stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) + stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, config.LogProgress) + lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB) stageFinishCfg := NewStageFinishCfg(mainDB) stages := DefaultStages(ctx, @@ -81,17 +95,27 @@ func CreateStagedSync(ctx context.Context, stageShortRangeCfg, stageBodiesCfg, stageStatesCfg, + lastMileCfg, stageFinishCfg, ) + logger.Info(). + Uint32("shard", bc.ShardID()). + Bool("beaconNode", isBeaconNode). + Bool("memdb", config.UseMemDB). + Str("dbDir", dbDir). + Bool("serverOnly", config.ServerOnly). + Int("minStreams", config.MinStreams). + Msg(WrapStagedSyncMsg("staged sync created successfully")) + return New( bc, + consensus, mainDB, stages, - isBeacon, + isBeaconNode, protocol, isBeaconNode, - UseMemDB, config, logger, ), nil @@ -147,7 +171,7 @@ func getMemDbTempPath(dbDir string, dbIndex int) string { } // getBlockDbPath returns the path of the cache database which stores blocks -func getBlockDbPath(beacon bool, loopID int, dbDir string) string { +func getBlockDbPath(shardID uint32, beacon bool, loopID int, dbDir string) string { if beacon { if loopID >= 0 { return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/beacon_blocks_db"), loopID) @@ -156,30 +180,57 @@ func getBlockDbPath(beacon bool, loopID int, dbDir string) string { } } else { if loopID >= 0 { - return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/blocks_db"), loopID) + return fmt.Sprintf("%s_%d_%d", filepath.Join(dbDir, "cache/blocks_db"), shardID, loopID) } else { - return filepath.Join(dbDir, "cache/blocks_db_main") + return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/blocks_db_main"), shardID) } } } +func (s *StagedStreamSync) Debug(source string, msg interface{}) { + // only log the msg in debug mode + if !s.config.DebugMode { + return + } + pc, _, _, _ := runtime.Caller(1) + caller := runtime.FuncForPC(pc).Name() + callerParts := strings.Split(caller, "/") + if len(callerParts) > 0 { + caller = callerParts[len(callerParts)-1] + } + src := source + if src == "" { + src = "message" + } + // SSSD: STAGED STREAM SYNC DEBUG + if msg == nil { + fmt.Printf("[SSSD:%s] %s: nil or no error\n", caller, src) + } else if err, ok := msg.(error); ok { + fmt.Printf("[SSSD:%s] %s: %s\n", caller, src, err.Error()) + } else if str, ok := msg.(string); ok { + fmt.Printf("[SSSD:%s] %s: %s\n", caller, src, str) + } else { + fmt.Printf("[SSSD:%s] %s: %v\n", caller, src, msg) + } +} + // doSync does the long range sync. // One LongRangeSync consists of several iterations. // For each iteration, estimate the current block number, then fetch block & insert to blockchain -func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (int, error) { +func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (uint64, int, error) { var totalInserted int s.initSync = initSync if err := s.checkPrerequisites(); err != nil { - return 0, err + return 0, 0, err } var estimatedHeight uint64 if initSync { if h, err := s.estimateCurrentNumber(downloaderContext); err != nil { - return 0, err + return 0, 0, err } else { estimatedHeight = h //TODO: use directly currentCycle var @@ -187,36 +238,70 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo } if curBN := s.bc.CurrentBlock().NumberU64(); estimatedHeight <= curBN { s.logger.Info().Uint64("current number", curBN).Uint64("target number", estimatedHeight). - Msg(WrapStagedSyncMsg("early return of long range sync")) - return 0, nil + Msg(WrapStagedSyncMsg("early return of long range sync (chain is already ahead of target height)")) + return estimatedHeight, 0, nil } - - s.startSyncing() - defer s.finishSyncing() } + s.startSyncing() + defer s.finishSyncing() + for { ctx, cancel := context.WithCancel(downloaderContext) n, err := s.doSyncCycle(ctx, initSync) if err != nil { + utils.Logger().Error(). + Err(err). + Bool("initSync", s.initSync). + Bool("isBeacon", s.isBeacon). + Uint32("shard", s.bc.ShardID()). + Msg(WrapStagedSyncMsg("sync cycle failed")) + pl := s.promLabels() pl["error"] = err.Error() numFailedDownloadCounterVec.With(pl).Inc() cancel() - return totalInserted + n, err + return estimatedHeight, totalInserted + n, err } cancel() totalInserted += n // if it's not long range sync, skip loop - if n < LastMileBlocksThreshold || !initSync { - return totalInserted, nil + if n == 0 || !initSync { + break + } + } + + if totalInserted > 0 { + utils.Logger().Info(). + Bool("initSync", s.initSync). + Bool("isBeacon", s.isBeacon). + Uint32("shard", s.bc.ShardID()). + Int("blocks", totalInserted). + Msg(WrapStagedSyncMsg("sync cycle blocks inserted successfully")) + } + + // add consensus last mile blocks + if s.consensus != nil { + if hashes, err := s.addConsensusLastMile(s.Blockchain(), s.consensus); err != nil { + utils.Logger().Error().Err(err). + Msg("[STAGED_STREAM_SYNC] Add consensus last mile failed") + s.RollbackLastMileBlocks(downloaderContext, hashes) + return estimatedHeight, totalInserted, err + } else { + totalInserted += len(hashes) + } + // TODO: move this to explorer handler code. + if s.isExplorer { + s.consensus.UpdateConsensusInformation() } } + s.purgeLastMileBlocksFromCache() + return estimatedHeight, totalInserted, nil } func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int, error) { diff --git a/api/service/stagedsync/stagedsync.go b/api/service/stagedsync/stagedsync.go index 83af6abf9f..f1de66f9fc 100644 --- a/api/service/stagedsync/stagedsync.go +++ b/api/service/stagedsync/stagedsync.go @@ -83,6 +83,9 @@ type StagedSync struct { StagedSyncTurboMode bool // log the full sync progress in console LogProgress bool + // log every single process and error to help to debug the syncing + // DebugMode is not accessible to the end user and is only an aid for development + DebugMode bool } // BlockWithSig the serialization structure for request DownloaderRequest_BLOCKWITHSIG @@ -258,7 +261,8 @@ func New(ctx context.Context, verifyAllSig bool, verifyHeaderBatchSize uint64, insertChainBatchSize int, - logProgress bool) *StagedSync { + logProgress bool, + debugMode bool) *StagedSync { revertStages := make([]*Stage, len(stagesList)) for i, stageIndex := range revertOrder { @@ -312,6 +316,7 @@ func New(ctx context.Context, VerifyHeaderBatchSize: verifyHeaderBatchSize, InsertChainBatchSize: insertChainBatchSize, LogProgress: logProgress, + DebugMode: debugMode, } } diff --git a/api/service/stagedsync/syncing.go b/api/service/stagedsync/syncing.go index 11147f6a62..a22a4e9253 100644 --- a/api/service/stagedsync/syncing.go +++ b/api/service/stagedsync/syncing.go @@ -64,6 +64,7 @@ func CreateStagedSync( verifyHeaderBatchSize uint64, insertChainBatchSize int, logProgress bool, + debugMode bool, ) (*StagedSync, error) { ctx := context.Background() @@ -134,6 +135,7 @@ func CreateStagedSync( verifyHeaderBatchSize, insertChainBatchSize, logProgress, + debugMode, ), nil } diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 38ae3dacbb..4cc20cfdf4 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -178,6 +178,7 @@ var defaultStagedSyncConfig = harmonyconfig.StagedSyncConfig{ MaxMemSyncCycleSize: 1024, // max number of blocks to use a single transaction for staged sync UseMemDB: true, // it uses memory by default. set it to false to use disk LogProgress: false, // log the full sync progress in console + DebugMode: false, // log every single process and error to help to debug the syncing (DebugMode is not accessible to the end user and is only an aid for development) } var ( diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index e66d3c9c73..16f985beac 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -606,6 +606,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, nodeConfig.VerifyHeaderBatchSize = hc.Sync.StagedSyncCfg.VerifyHeaderBatchSize nodeConfig.InsertChainBatchSize = hc.Sync.StagedSyncCfg.InsertChainBatchSize nodeConfig.LogProgress = hc.Sync.StagedSyncCfg.LogProgress + nodeConfig.DebugMode = hc.Sync.StagedSyncCfg.DebugMode // P2P private key is used for secure message transfer between p2p nodes. nodeConfig.P2PPriKey, _, err = utils.LoadKeyFromFile(hc.P2P.KeyFile) if err != nil { @@ -942,7 +943,9 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har SmHardLowCap: hc.Sync.DiscHardLowCap, SmHiCap: hc.Sync.DiscHighCap, SmDiscBatch: hc.Sync.DiscBatch, - LogProgress: node.NodeConfig.LogProgress, + UseMemDB: hc.Sync.StagedSyncCfg.UseMemDB, + LogProgress: hc.Sync.StagedSyncCfg.LogProgress, + DebugMode: hc.Sync.StagedSyncCfg.DebugMode, } // If we are running side chain, we will need to do some extra works for beacon @@ -954,7 +957,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har } } //Setup stream sync service - s := stagedstreamsync.NewService(host, blockchains, sConfig, hc.General.DataDir) + s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir) node.RegisterService(service.StagedStreamSync, s) diff --git a/consensus/validator.go b/consensus/validator.go index ee30d8c006..0506f4359d 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -132,7 +132,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block if err := consensus.verifyBlock(&blockObj); err != nil { consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed") - return nil, errors.New("Block verification failed") + return nil, errors.Errorf("Block verification failed: %s", err.Error()) } return &blockObj, nil } diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index f4f2ddb96f..5aca663a89 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -343,6 +343,7 @@ type StagedSyncConfig struct { VerifyHeaderBatchSize uint64 // batch size to verify header before insert to chain UseMemDB bool // it uses memory by default. set it to false to use disk LogProgress bool // log the full sync progress in console + DebugMode bool // log every single process and error to help to debug syncing issues (DebugMode is not accessible to the end user and is only an aid for development) } type PriceLimit int64 diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index 9f681fca9f..20e001e201 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -93,6 +93,7 @@ type ConfigType struct { VerifyAllSig bool // verify signatures for all blocks regardless of height and batch size VerifyHeaderBatchSize uint64 // batch size to verify header before insert to chain LogProgress bool // log the full sync progress in console + DebugMode bool // log every single process and error to help to debug the syncing issues NtpServer string StringRole string P2PPriKey p2p_crypto.PrivKey `json:"-"` diff --git a/node/node_syncing.go b/node/node_syncing.go index 84eb1256ff..68c3338362 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -122,7 +122,8 @@ func (node *Node) createStagedSync(bc core.BlockChain) *stagedsync.StagedSync { node.NodeConfig.VerifyAllSig, node.NodeConfig.VerifyHeaderBatchSize, node.NodeConfig.InsertChainBatchSize, - node.NodeConfig.LogProgress); err != nil { + node.NodeConfig.LogProgress, + node.NodeConfig.DebugMode); err != nil { return nil } else { return s @@ -352,7 +353,7 @@ func (node *Node) NodeSyncing() { if node.HarmonyConfig.TiKV.Role == tikv.RoleWriter { node.supportSyncing() // the writer needs to be in sync with it's other peers } - } else if !node.HarmonyConfig.General.IsOffline && node.HarmonyConfig.DNSSync.Client { + } else if !node.HarmonyConfig.General.IsOffline && (node.HarmonyConfig.DNSSync.Client || node.HarmonyConfig.Sync.Downloader) { node.supportSyncing() // for non-writer-reader mode a.k.a tikv nodes } } @@ -372,6 +373,11 @@ func (node *Node) supportSyncing() { go node.SendNewBlockToUnsync() } + // if stream sync client is running, don't create other sync client instances + if node.HarmonyConfig.Sync.Downloader { + return + } + if !node.NodeConfig.StagedSync && node.stateSync == nil { node.stateSync = node.createStateSync(node.Blockchain()) utils.Logger().Debug().Msg("[SYNC] initialized state sync") diff --git a/p2p/host.go b/p2p/host.go index 9395e1df45..246b598b22 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -12,17 +12,17 @@ import ( "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" libp2p_config "github.com/libp2p/go-libp2p/config" libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" libp2p_host "github.com/libp2p/go-libp2p/core/host" libp2p_network "github.com/libp2p/go-libp2p/core/network" libp2p_peer "github.com/libp2p/go-libp2p/core/peer" libp2p_peerstore "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/security/noise" diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index 3dcbd6f063..ca4590c972 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -2,7 +2,6 @@ package sync import ( "context" - "fmt" "strconv" "time" @@ -180,7 +179,8 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) { Msg("failed to add new stream") return } - fmt.Println("Connected to", raw.Conn().RemotePeer().String(), "(", st.ProtoID(), ")", "my ID: ", raw.Conn().LocalPeer().String()) + //to get my ID use raw.Conn().LocalPeer().String() + p.logger.Info().Msgf("Connected to %s (%s)", raw.Conn().RemotePeer().String(), st.ProtoID()) st.run() } diff --git a/p2p/stream/types/utils.go b/p2p/stream/types/utils.go index c27d95d60f..72838222b4 100644 --- a/p2p/stream/types/utils.go +++ b/p2p/stream/types/utils.go @@ -11,7 +11,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/hashicorp/go-version" - libp2p_proto "github.com/libp2p/go-libp2p-core/protocol" + libp2p_proto "github.com/libp2p/go-libp2p/core/protocol" "github.com/pkg/errors" ) diff --git a/scripts/travis_go_checker.sh b/scripts/travis_go_checker.sh index 70638377a5..c6c7640caf 100755 --- a/scripts/travis_go_checker.sh +++ b/scripts/travis_go_checker.sh @@ -84,7 +84,7 @@ fi echo "Running go test..." # Fix https://github.com/golang/go/issues/44129#issuecomment-788351567 go get -t ./... -if go test -v -count=1 -vet=all -race ./... +if go test -count=1 -vet=all -race ./... then echo "go test succeeded." else