Skip to content

Commit

Permalink
Merge pull request #3 from sophoah/premain_aka_releasebranch
Browse files Browse the repository at this point in the history
dev to premain squash merge test with PR (#2)
  • Loading branch information
sophoah authored Aug 10, 2023
2 parents 53c2e66 + 6fbdae8 commit cb20b53
Show file tree
Hide file tree
Showing 28 changed files with 591 additions and 132 deletions.
59 changes: 59 additions & 0 deletions .mergify.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
pull_request_rules:
- name: Squash merge rule to dev
conditions:
# applied for merge to the dev branch
- base=dev
# no unresolved threads
- "#review-threads-unresolved=0"
# Approved by two reviewers
- "#approved-reviews-by>=2"
# no unverified commit
- "#commits-unverified=0"
# Travis ci succeeded
- "check-success=Travis CI - Pull Request"
# git guardian succeeded
- "check-success=GitGuardian Security Checks"
# PR is not a draft
- -draft
# PR is not conflicting with the base branch
- -conflict
# conditions to avoid auto merge mistakes
# PR title doesn't have wip (not case sensitive)
- -title~=(?i)wip
# PR doesn't have WIP label (not case sensitive)
- label!=(?i)wip
# ready-to-merge is required to trigger the merge
- label=ready-to-merge
actions:
merge:
method: squash
- name: merge rule to main
conditions:
# from the dev branch : no direct PR to main
- head=dev
# applied for merge to the dev branch
- base=main
# no unresolved threads
- "#review-threads-unresolved=0"
# Approved by two reviewers
- "#approved-reviews-by>=2"
# no unverified commit
- "#commits-unverified=0"
# Travis ci succeeded
- "check-success=Travis CI - Pull Request"
# git guardian succeeded
- "check-success=GitGuardian Security Checks"
# PR is not a draft
- -draft
# PR is not conflicting with the base branch
- -conflict
# conditions to avoid auto merge mistakes
# PR title doesn't have wip (not case sensitive)
- -title~=(?i)wip
# PR doesn't have WIP label (not case sensitive)
- label!=(?i)wip
# ready-to-merge is required to trigger the merge
- label=ready-to-merge
actions:
merge:
method: merge
2 changes: 1 addition & 1 deletion api/service/legacysync/syncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"time"

nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
peer "github.com/libp2p/go-libp2p-core/peer"
peer "github.com/libp2p/go-libp2p/core/peer"

"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/api/service/legacysync/downloader"
Expand Down
12 changes: 11 additions & 1 deletion api/service/stagedstreamsync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ const (
BlockByHashesUpperCap int = 10 // number of get blocks by hashes upper cap
BlockByHashesLowerCap int = 3 // number of get blocks by hashes lower cap

LastMileBlocksThreshold int = 10
LastMileBlocksThreshold int = 10
SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes
VerifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now)
LastMileBlocksSize = 50

// SoftQueueCap is the soft cap of size in resultQueue. When the queue size is larger than this limit,
// no more request will be assigned to workers to wait for InsertChain to finish.
Expand Down Expand Up @@ -50,8 +53,15 @@ type (
// config for beacon config
BHConfig *BeaconHelperConfig

// use memory db
UseMemDB bool

// log the stage progress
LogProgress bool

// logs every single process and error to help debugging stream sync
// DebugMode is not accessible to the end user and is only an aid for development
DebugMode bool
}

// BeaconHelperConfig is the extra config used for beaconHelper which uses
Expand Down
10 changes: 10 additions & 0 deletions api/service/stagedstreamsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ var DefaultForwardOrder = ForwardOrder{
BlockBodies,
// Stages below don't use Internet
States,
LastMile,
Finish,
}

var DefaultRevertOrder = RevertOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
Expand All @@ -29,6 +31,7 @@ var DefaultRevertOrder = RevertOrder{

var DefaultCleanUpOrder = CleanUpOrder{
Finish,
LastMile,
States,
BlockBodies,
ShortRange,
Expand All @@ -42,6 +45,7 @@ func DefaultStages(ctx context.Context,
srCfg StageShortRangeCfg,
bodiesCfg StageBodiesCfg,
statesCfg StageStatesCfg,
lastMileCfg StageLastMileCfg,
finishCfg StageFinishCfg,
) []*Stage {

Expand All @@ -50,6 +54,7 @@ func DefaultStages(ctx context.Context,
handlerStageEpochSync := NewStageEpoch(seCfg)
handlerStageBodies := NewStageBodies(bodiesCfg)
handlerStageStates := NewStageStates(statesCfg)
handlerStageLastMile := NewStageLastMile(lastMileCfg)
handlerStageFinish := NewStageFinish(finishCfg)

return []*Stage{
Expand Down Expand Up @@ -78,6 +83,11 @@ func DefaultStages(ctx context.Context,
Description: "Update Blockchain State",
Handler: handlerStageStates,
},
{
ID: LastMile,
Description: "update status for blocks after sync and update last mile blocks as well",
Handler: handlerStageLastMile,
},
{
ID: Finish,
Description: "Finalize Changes",
Expand Down
39 changes: 27 additions & 12 deletions api/service/stagedstreamsync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/rs/zerolog"

"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
Expand Down Expand Up @@ -37,7 +38,7 @@ type (
)

// NewDownloader creates a new downloader
func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode bool, config Config) *Downloader {
func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader {
config.fixValues()

sp := sync.NewProtocol(sync.Config{
Expand Down Expand Up @@ -67,8 +68,8 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode

ctx, cancel := context.WithCancel(context.Background())

//TODO: use mem db should be in config file
stagedSyncInstance, err := CreateStagedSync(ctx, bc, dbDir, false, isBeaconNode, sp, config, logger, config.LogProgress)
// create an instance of staged sync for the downloader
stagedSyncInstance, err := CreateStagedSync(ctx, bc, consensus, dbDir, isBeaconNode, sp, config, logger)
if err != nil {
cancel()
return nil
Expand Down Expand Up @@ -189,6 +190,7 @@ func (d *Downloader) waitForBootFinish() {
func (d *Downloader) loop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

// for shard chain and beacon chain node, first we start with initSync=true to
// make sure it goes through the long range sync first.
// for epoch chain we do only need to go through epoch sync process
Expand All @@ -208,21 +210,23 @@ func (d *Downloader) loop() {
go trigger()

case <-d.downloadC:
addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
bnBeforeSync := d.bc.CurrentBlock().NumberU64()
estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
if err != nil {
//TODO: if there is a bad block which can't be resolved
if d.stagedSyncInstance.invalidBlock.Active {
numTriedStreams := len(d.stagedSyncInstance.invalidBlock.StreamID)
// if many streams couldn't solve it, then that's an unresolvable bad block
if numTriedStreams >= d.config.InitStreams {
if !d.stagedSyncInstance.invalidBlock.IsLogged {
fmt.Println("unresolvable bad block:", d.stagedSyncInstance.invalidBlock.Number)
d.logger.Error().
Uint64("bad block number", d.stagedSyncInstance.invalidBlock.Number).
Msg(WrapStagedSyncMsg("unresolvable bad block"))
d.stagedSyncInstance.invalidBlock.IsLogged = true
}
//TODO: if we don't have any new or untried stream in the list, sleep or panic
}
}

// If any error happens, sleep 5 seconds and retry
d.logger.Error().
Err(err).
Expand All @@ -242,16 +246,27 @@ func (d *Downloader) loop() {
Uint32("shard", d.bc.ShardID()).
Msg(WrapStagedSyncMsg("sync finished"))
}

// If block number has been changed, trigger another sync
if addedBN != 0 {
// If block number has been changed, trigger another sync
go trigger()
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
}
}
// try to add last mile from pub-sub (blocking)
if d.bh != nil {
d.bh.insertSync()
// if last doSync needed only to add a few blocks less than LastMileBlocksThreshold and
// the node is fully synced now, then switch to short range
// the reason why we need to check distanceBeforeSync is because, if it was long distance,
// very likely, there are a couple of new blocks have been added to the other nodes which
// we should still stay in long range and check them.
bnAfterSync := d.bc.CurrentBlock().NumberU64()
distanceBeforeSync := estimatedHeight - bnBeforeSync
distanceAfterSync := estimatedHeight - bnAfterSync
if estimatedHeight > 0 && addedBN > 0 &&
distanceBeforeSync <= uint64(LastMileBlocksThreshold) &&
distanceAfterSync <= uint64(LastMileBlocksThreshold) {
initSync = false
}
initSync = false

case <-d.closeC:
return
Expand Down
5 changes: 3 additions & 2 deletions api/service/stagedstreamsync/downloaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stagedstreamsync

import (
"github.com/harmony-one/abool"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/p2p"
)
Expand All @@ -15,7 +16,7 @@ type Downloaders struct {
}

// NewDownloaders creates Downloaders for sync of multiple blockchains
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config Config) *Downloaders {
func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders {
ds := make(map[uint32]*Downloader)
isBeaconNode := len(bcs) == 1
for _, bc := range bcs {
Expand All @@ -25,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config C
if _, ok := ds[bc.ShardID()]; ok {
continue
}
ds[bc.ShardID()] = NewDownloader(host, bc, dbDir, isBeaconNode, config)
ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config)
}
return &Downloaders{
ds: ds,
Expand Down
5 changes: 3 additions & 2 deletions api/service/stagedstreamsync/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stagedstreamsync

import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/p2p"
)
Expand All @@ -11,9 +12,9 @@ type StagedStreamSyncService struct {
}

// NewService creates a new downloader service
func NewService(host p2p.Host, bcs []core.BlockChain, config Config, dbDir string) *StagedStreamSyncService {
func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService {
return &StagedStreamSyncService{
Downloaders: NewDownloaders(host, bcs, dbDir, config),
Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config),
}
}

Expand Down
6 changes: 6 additions & 0 deletions api/service/stagedstreamsync/short_range_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ func (sh *srHelper) removeStreams(sts []sttypes.StreamID) {
}
}

func (sh *srHelper) streamsFailed(sts []sttypes.StreamID, reason string) {
for _, st := range sts {
sh.syncProtocol.StreamFailed(st, reason)
}
}

// blameAllStreams only not to blame all whitelisted streams when the it's not the last block signature verification failed.
func (sh *srHelper) blameAllStreams(blocks types.Blocks, errIndex int, err error) bool {
if errors.As(err, &emptySigVerifyErr) && errIndex == len(blocks)-1 {
Expand Down
10 changes: 8 additions & 2 deletions api/service/stagedstreamsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/harmony-one/harmony/shard"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -60,6 +61,11 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return nil
}

// shouldn't execute for epoch chain
if b.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode {
return nil
}

maxHeight := s.state.status.targetBN
currentHead := b.configs.bc.CurrentBlock().NumberU64()
if currentHead >= maxHeight {
Expand All @@ -77,7 +83,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return errV
}

if currProgress == 0 {
if currProgress <= currentHead {
if err := b.cleanAllBlockDBs(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -209,7 +215,7 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err
isOneOfTheBadStreams := false
for _, id := range s.state.invalidBlock.StreamID {
if id == stid {
b.configs.protocol.RemoveStream(stid)
b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed")
isOneOfTheBadStreams = true
break
}
Expand Down
Loading

0 comments on commit cb20b53

Please sign in to comment.