Improve staged stream sync #4660

Merged: 7 commits, May 2, 2024
Changes from all commits
5 changes: 1 addition & 4 deletions api/service/stagedstreamsync/block_manager.go
@@ -6,7 +6,6 @@ import (

 	"github.com/ethereum/go-ethereum/common"
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
-	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/rs/zerolog"
 )

@@ -19,7 +18,6 @@ type BlockDownloadDetails struct {
 // blockDownloadManager is the helper structure for get blocks request management
 type blockDownloadManager struct {
 	chain blockChain
-	tx    kv.RwTx

 	targetBN   uint64
 	requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received
@@ -32,10 +30,9 @@ type blockDownloadManager struct {
 	lock sync.Mutex
 }

-func newBlockDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logger zerolog.Logger) *blockDownloadManager {
+func newBlockDownloadManager(chain blockChain, targetBN uint64, logger zerolog.Logger) *blockDownloadManager {
 	return &blockDownloadManager{
 		chain:      chain,
-		tx:         tx,
 		targetBN:   targetBN,
 		requesting: make(map[uint64]struct{}),
 		processing: make(map[uint64]struct{}),
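A note on this file: the hunks above invert transaction ownership. blockDownloadManager previously carried an open kv.RwTx for its whole lifetime; after the change it is storage-agnostic, and writers open short-lived transactions only while persisting (see stage_bodies.go below). A minimal, self-contained sketch of why that matters, assuming MDBX-style single-writer semantics and using stand-in types rather than the Harmony ones:

package main

import (
	"fmt"
	"sync"
)

type fakeDB struct{ writer sync.Mutex }

type fakeTx struct{ db *fakeDB }

// BeginRw blocks until the single writer slot is free, like an MDBX write txn.
func (db *fakeDB) BeginRw() *fakeTx { db.writer.Lock(); return &fakeTx{db} }
func (tx *fakeTx) Commit()          { tx.db.writer.Unlock() }

func main() {
	db := &fakeDB{}
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			// Short-lived, per-batch transactions (the post-PR shape): each
			// worker holds the writer slot only while persisting its batch.
			tx := db.BeginRw()
			fmt.Printf("worker %d persisted its batch\n", w)
			tx.Commit()
		}(w)
	}
	wg.Wait()
}

A transaction held in the manager struct for the whole sync, by contrast, would occupy that writer slot and serialize every other writer behind it.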
52 changes: 27 additions & 25 deletions api/service/stagedstreamsync/stage_bodies.go
@@ -77,6 +77,15 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
 	currProgress := uint64(0)
 	targetHeight := s.state.currentCycle.TargetHeight

+	if useInternalTx {
+		var err error
+		tx, err = b.configs.db.BeginRw(ctx)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+	}
+
 	if errV := CreateView(ctx, b.configs.db, tx, func(etx kv.Tx) error {
 		if currProgress, err = s.CurrentStageProgress(etx); err != nil {
 			return err
@@ -97,24 +106,14 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
 		return nil
 	}

-	// size := uint64(0)
 	startTime := time.Now()
-	// startBlock := currProgress
 	if b.configs.logProgress {
 		fmt.Print("\033[s") // save the cursor position
 	}

-	if useInternalTx {
-		var err error
-		tx, err = b.configs.db.BeginRw(ctx)
-		if err != nil {
-			return err
-		}
-		defer tx.Rollback()
-	}
-
 	// Fetch blocks from neighbors
-	s.state.gbm = newBlockDownloadManager(tx, b.configs.bc, targetHeight, s.state.logger)
+	s.state.gbm = newBlockDownloadManager(b.configs.bc, targetHeight, s.state.logger)

 	// Setup workers to fetch blocks from remote node
 	var wg sync.WaitGroup
@@ -188,7 +187,7 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
 			gbm.HandleRequestError(batch, err, stid)
 			b.configs.protocol.RemoveStream(stid)
 		} else {
-			if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
+			if err = b.saveBlocks(ctx, nil, batch, blockBytes, sigBytes, loopID, stid); err != nil {
 				panic(ErrSaveBlocksToDbFailed)
 			}
 			gbm.HandleRequestResult(batch, blockBytes, sigBytes, loopID, stid)
@@ -239,8 +238,9 @@ func (b *StageBodies) verifyBlockAndExtractReceiptsData(batchBlockBytes [][]byte
 // redownloadBadBlock tries to redownload the bad block from other streams
 func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) error {

-	batch := make([]uint64, 1)
-	batch = append(batch, s.state.invalidBlock.Number)
+	batch := []uint64{s.state.invalidBlock.Number}
+
+badBlockDownloadLoop:

 	for {
 		if b.configs.protocol.NumStreams() == 0 {
@@ -253,21 +253,20 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err
 			}
 			continue
 		}
-		isOneOfTheBadStreams := false
 		for _, id := range s.state.invalidBlock.StreamID {
 			if id == stid {
 				// TODO: if block is invalid then call StreamFailed
 				b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed")
-				isOneOfTheBadStreams = true
-				break
+				continue badBlockDownloadLoop
 			}
 		}
-		if isOneOfTheBadStreams {
-			continue
-		}
 		s.state.gbm.SetDownloadDetails(batch, 0, stid)
 		if errU := b.configs.blockDBs[0].Update(ctx, func(tx kv.RwTx) error {
 			if err = b.saveBlocks(ctx, tx, batch, blockBytes, sigBytes, 0, stid); err != nil {
-				return errors.Errorf("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed.")
+				utils.Logger().Error().
+					Err(err).
+					Msg("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed")
+				return errors.Errorf("%s: %s", ErrSaveBlocksToDbFailed.Error(), err.Error())
 			}
 			return nil
 		}); errU != nil {
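Two fixes worth noting in the hunk above. First, the old batch construction was buggy: make([]uint64, 1) followed by append produced [0, blockNumber], so a zero block number was requested alongside the real one; the slice literal fixes that. Second, the isOneOfTheBadStreams flag plus break-then-check is replaced with a labeled continue, dropping the bookkeeping variable. A runnable sketch of that control flow (names are illustrative, not from the PR):

package main

import "fmt"

func main() {
	badStreams := []string{"s1", "s3"}
	candidates := []string{"s1", "s2", "s3"}

retryLoop:
	for _, stid := range candidates {
		for _, bad := range badStreams {
			if stid == bad {
				// skip streams that already served the invalid block,
				// jumping straight to the next candidate
				continue retryLoop
			}
		}
		fmt.Println("re-downloading from", stid) // only s2 reaches here
	}
}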
@@ -314,11 +313,14 @@ func validateGetBlocksResult(requested []uint64, result []*types.Block) error {
 // saveBlocks saves the blocks into db
 func (b *StageBodies) saveBlocks(ctx context.Context, tx kv.RwTx, bns []uint64, blockBytes [][]byte, sigBytes [][]byte, loopID int, stid sttypes.StreamID) error {

-	tx, err := b.configs.blockDBs[loopID].BeginRw(ctx)
-	if err != nil {
-		return err
+	if tx == nil {
+		var err error
+		tx, err = b.configs.blockDBs[loopID].BeginRw(ctx)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
 	}
-	defer tx.Rollback()

 	for i := uint64(0); i < uint64(len(blockBytes)); i++ {
 		block := blockBytes[i]
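With this change saveBlocks takes a caller-supplied-or-self-managed transaction: a nil tx (the worker-loop path above) makes it open its own transaction against the per-loop block DB, while a non-nil tx (the blockDBs[0].Update closure in redownloadBadBlock) stays owned by the caller. The Exec hunk also moves the useInternalTx BeginRw ahead of CreateView, presumably so the progress read and later writes can share one transaction. A self-contained sketch of the saveBlocks shape with stand-in types; the commit handling is illustrative, since the visible hunk only shows the open/rollback half:

package main

import (
	"errors"
	"fmt"
)

type rwTx struct {
	store     map[uint64][]byte
	committed bool
}

func (t *rwTx) put(bn uint64, blob []byte) error {
	if len(blob) == 0 {
		return errors.New("empty block body")
	}
	t.store[bn] = blob
	return nil
}

func (t *rwTx) commit() error { t.committed = true; return nil }
func (t *rwTx) rollback()     {} // no-op if commit already succeeded

func beginRw() (*rwTx, error) { return &rwTx{store: map[uint64][]byte{}}, nil }

func saveBlocks(tx *rwTx, bns []uint64, blockBytes [][]byte) error {
	ownTx := tx == nil
	if ownTx {
		var err error
		if tx, err = beginRw(); err != nil {
			return err
		}
		defer tx.rollback()
	}
	for i, bn := range bns {
		if err := tx.put(bn, blockBytes[i]); err != nil {
			return err
		}
	}
	if ownTx {
		return tx.commit() // commit only what we opened ourselves
	}
	return nil // caller owns tx and decides when to commit
}

func main() {
	fmt.Println(saveBlocks(nil, []uint64{7}, [][]byte{[]byte("body")})) // <nil>
}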
8 changes: 4 additions & 4 deletions api/service/stagedstreamsync/stages.go
@@ -40,7 +40,7 @@ func GetStageID(stage SyncStageID, isBeacon bool, prune bool) []byte {
 // GetStageProgress retrieves saved progress of a given sync stage from the database
 func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) {
 	stgID := GetStageID(stage, isBeacon, false)
-	v, err := db.GetOne(kv.SyncStageProgress, stgID)
+	v, err := db.GetOne(StageProgressBucket, stgID)
 	if err != nil {
 		return 0, err
 	}
@@ -50,13 +50,13 @@ func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, e
 // SaveStageProgress saves progress of given sync stage
 func SaveStageProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error {
 	stgID := GetStageID(stage, isBeacon, false)
-	return db.Put(kv.SyncStageProgress, stgID, marshalData(progress))
+	return db.Put(StageProgressBucket, stgID, marshalData(progress))
 }

 // GetStageCleanUpProgress retrieves saved progress of given sync stage from the database
 func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) {
 	stgID := GetStageID(stage, isBeacon, true)
-	v, err := db.GetOne(kv.SyncStageProgress, stgID)
+	v, err := db.GetOne(StageProgressBucket, stgID)
 	if err != nil {
 		return 0, err
 	}
@@ -66,5 +66,5 @@ func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (ui
 // SaveStageCleanUpProgress stores the progress of the clean up for a given sync stage to the database
 func SaveStageCleanUpProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error {
 	stgID := GetStageID(stage, isBeacon, true)
-	return db.Put(kv.SyncStageProgress, stgID, marshalData(progress))
+	return db.Put(StageProgressBucket, stgID, marshalData(progress))
 }
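All four helpers now read and write a bucket owned by this package (StageProgressBucket) instead of erigon's kv.SyncStageProgress table, keeping stage bookkeeping inside the buckets that initDB creates. A self-contained sketch of the save/get round-trip these functions implement, assuming marshalData is a fixed-width big-endian uint64 encoding (the diff does not show it):

package main

import (
	"encoding/binary"
	"fmt"
)

const stageProgressBucket = "StageProgress" // stand-in for StageProgressBucket

var db = map[string]map[string][]byte{stageProgressBucket: {}}

func marshalData(n uint64) []byte {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], n)
	return b[:]
}

func unmarshalData(v []byte) uint64 {
	if len(v) != 8 {
		return 0 // missing or malformed entry reads as zero progress
	}
	return binary.BigEndian.Uint64(v)
}

func saveStageProgress(stageID string, progress uint64) {
	db[stageProgressBucket][stageID] = marshalData(progress)
}

func getStageProgress(stageID string) uint64 {
	return unmarshalData(db[stageProgressBucket][stageID])
}

func main() {
	saveStageProgress("bodies", 12345)
	fmt.Println(getStageProgress("bodies")) // 12345
}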
61 changes: 35 additions & 26 deletions api/service/stagedstreamsync/syncing.go
@@ -17,7 +17,6 @@ import (
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon-lib/kv/mdbx"
-	"github.com/ledgerwatch/erigon-lib/kv/memdb"
 	"github.com/ledgerwatch/log/v3"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
@@ -62,23 +61,34 @@ func CreateStagedSync(ctx context.Context,
 	var mainDB kv.RwDB
 	dbs := make([]kv.RwDB, config.Concurrency)
 	if config.UseMemDB {
-		mainDB = memdb.New(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir))
+		mdbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)
+		logger.Info().
+			Str("path", mdbPath).
+			Msg(WrapStagedSyncMsg("creating main db in memory"))
+		mainDB = mdbx.NewMDBX(log.New()).InMem(mdbPath).MustOpen()
 		for i := 0; i < config.Concurrency; i++ {
 			dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir)
-			dbs[i] = memdb.New(dbPath)
+			logger.Info().
+				Str("path", dbPath).
+				Msg(WrapStagedSyncMsg("creating blocks db in memory"))
+			dbs[i] = mdbx.NewMDBX(log.New()).InMem(dbPath).MustOpen()
 		}
 	} else {
+		mdbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)
 		logger.Info().
-			Str("path", getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).
-			Msg(WrapStagedSyncMsg("creating main db"))
-		mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).MustOpen()
+			Str("path", mdbPath).
+			Msg(WrapStagedSyncMsg("creating main db in disk"))
+		mainDB = mdbx.NewMDBX(log.New()).Path(mdbPath).MustOpen()
 		for i := 0; i < config.Concurrency; i++ {
 			dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir)
+			logger.Info().
+				Str("path", dbPath).
+				Msg(WrapStagedSyncMsg("creating blocks db in disk"))
 			dbs[i] = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen()
 		}
 	}

-	if errInitDB := initDB(ctx, mainDB, dbs, config.Concurrency); errInitDB != nil {
+	if errInitDB := initDB(ctx, mainDB, dbs); errInitDB != nil {
 		logger.Error().Err(errInitDB).Msg("create staged sync instance failed")
 		return nil, errInitDB
 	}
@@ -118,7 +128,8 @@ func CreateStagedSync(ctx context.Context,
 		Str("dbDir", dbDir).
 		Bool("serverOnly", config.ServerOnly).
 		Int("minStreams", config.MinStreams).
-		Msg(WrapStagedSyncMsg("staged sync created successfully"))
+		Str("dbDir", dbDir).
+		Msg(WrapStagedSyncMsg("staged stream sync created successfully"))

 	return New(
 		bc,
@@ -134,7 +145,7 @@ func CreateStagedSync(ctx context.Context,
 }

 // initDB inits the sync loop main database and create buckets
-func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int) error {
+func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB) error {

 	// create buckets for mainDB
 	tx, errRW := mainDB.BeginRw(ctx)
@@ -143,43 +154,41 @@ func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int)
 	}
 	defer tx.Rollback()

-	for _, name := range Buckets {
-		if err := tx.CreateBucket(GetStageName(name, false, false)); err != nil {
+	for _, bucketName := range Buckets {
+		if err := tx.CreateBucket(bucketName); err != nil {
 			return err
 		}
 	}
 	if err := tx.Commit(); err != nil {
 		return err
 	}

-	// create buckets for block cache DBs
-	for _, db := range dbs {
-		tx, errRW := db.BeginRw(ctx)
-		if errRW != nil {
-			return errRW
+	createBlockBuckets := func(db kv.RwDB) error {
+		tx, err := db.BeginRw(ctx)
+		if err != nil {
+			return err
 		}
-
+		defer tx.Rollback()
 		if err := tx.CreateBucket(BlocksBucket); err != nil {
 			return err
 		}
 		if err := tx.CreateBucket(BlockSignaturesBucket); err != nil {
 			return err
 		}
-
 		if err := tx.Commit(); err != nil {
 			return err
 		}
+		return nil
 	}

-	return nil
-}
-
-// getMemDbTempPath returns the path of the temporary cache database for memdb
-func getMemDbTempPath(dbDir string, dbIndex int) string {
-	if dbIndex >= 0 {
-		return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/memdb/db"), dbIndex)
+	// create buckets for block cache DBs
+	for _, db := range dbs {
+		if err := createBlockBuckets(db); err != nil {
+			return err
+		}
	}
-	return filepath.Join(dbDir, "cache/memdb/db_main")
+
+	return nil
 }

 // getBlockDbPath returns the path of the cache database which stores blocks
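Two things happen in this file. The UseMemDB path now goes through mdbx.NewMDBX(...).InMem(...) instead of memdb.New, so in-memory and on-disk modes share one MDBX driver and the same bucket setup. And the per-DB bucket creation moves into a createBlockBuckets closure, which is what makes defer tx.Rollback() safe: defer runs when the enclosing function returns, not at the end of a loop iteration, so a defer written directly in the for loop would keep every transaction alive until initDB returned (the old loop had no rollback path at all). A runnable sketch of the difference:

package main

import "fmt"

type tx struct{ id int }

func begin(id int) *tx { fmt.Println("begin", id); return &tx{id} }

func (t *tx) rollback() { fmt.Println("rollback", t.id) }

func deferInLoop() {
	for i := 0; i < 2; i++ {
		t := begin(i)
		defer t.rollback() // runs only when deferInLoop returns, not per iteration
	}
	fmt.Println("loop done; nothing rolled back yet")
}

func deferInClosure() {
	setup := func(i int) {
		t := begin(i)
		defer t.rollback() // scoped: runs as soon as setup returns
	}
	for i := 0; i < 2; i++ {
		setup(i)
	}
}

func main() {
	deferInLoop()
	deferInClosure()
}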
18 changes: 18 additions & 0 deletions api/service/stagedsync/syncing.go
@@ -112,6 +112,24 @@ func CreateStagedSync(
 		finishCfg,
 	)

+	utils.Logger().Info().
+		Str("ip", ip).
+		Str("port", port).
+		Uint32("shard", bc.ShardID()).
+		Bool("isExplorer", isExplorer).
+		Bool("TurboMode", TurboMode).
+		Bool("memdb", UseMemDB).
+		Bool("doubleCheckBlockHashes", doubleCheckBlockHashes).
+		Uint64("maxBlocksPerCycle", maxBlocksPerCycle).
+		Uint64("maxBackgroundBlocks", maxBackgroundBlocks).
+		Uint64("maxMemSyncCycleSize", maxMemSyncCycleSize).
+		Bool("verifyAllSig", verifyAllSig).
+		Uint64("verifyHeaderBatchSize", verifyHeaderBatchSize).
+		Int("insertChainBatchSize", insertChainBatchSize).
+		Bool("debugMode", debugMode).
+		Str("dbDir", dbDir).
+		Msg("[STAGED_SYNC] staged sync created successfully")
+
 	return New(ctx,
 		ip,
 		port,
1 change: 1 addition & 0 deletions cmd/harmony/config.go
@@ -173,6 +173,7 @@ func getDefaultCacheConfig(nt nodeconfig.NetworkType) harmonyconfig.CacheConfig
 	case nodeconfig.Localnet:
 		cacheConfig.Disabled = false
 		cacheConfig.Preimages = false
+		cacheConfig.SnapshotLimit = 0
 	default:
 		cacheConfig.Disabled = false
 		cacheConfig.Preimages = true
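For context, and this is an assumption based on the go-ethereum CacheConfig that Harmony's cache config mirrors: SnapshotLimit is the memory allowance for the state snapshot cache in MB, and 0 disables snapshot generation entirely, which suits a short-lived localnet. A minimal sketch of that zero-disables convention:

package main

import "fmt"

type cacheConfig struct {
	SnapshotLimit int // MB allowed for snapshot caching; 0 turns snapshots off
}

func snapshotEnabled(c cacheConfig) bool { return c.SnapshotLimit > 0 }

func main() {
	localnet := cacheConfig{SnapshotLimit: 0}
	mainnet := cacheConfig{SnapshotLimit: 256}
	fmt.Println(snapshotEnabled(localnet), snapshotEnabled(mainnet)) // false true
}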
8 changes: 4 additions & 4 deletions cmd/harmony/default.go
@@ -241,9 +241,9 @@ var (
 	}

 	defaultPartnerSyncConfig = harmonyconfig.SyncConfig{
-		Enabled: true,
+		Enabled: false,
 		SyncMode: 0,
-		Downloader: true,
+		Downloader: false,
 		StagedSync: false,
 		StagedSyncCfg: defaultStagedSyncConfig,
 		Concurrency: 2,
@@ -257,9 +257,9 @@ var (
 	}

 	defaultElseSyncConfig = harmonyconfig.SyncConfig{
-		Enabled: true,
+		Enabled: false,
 		SyncMode: 0,
-		Downloader: true,
+		Downloader: false,
 		StagedSync: false,
 		StagedSyncCfg: defaultStagedSyncConfig,
 		Concurrency: 4,
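These defaults turn the sync service and stream downloader off for the partner and catch-all network types, so operators on those networks must now opt in explicitly. A self-contained sketch of overriding the new defaults; the struct mirrors only the SyncConfig fields visible in this diff, and field types are assumed:

package main

import "fmt"

// syncConfig mirrors the harmonyconfig.SyncConfig fields shown above.
type syncConfig struct {
	Enabled     bool
	SyncMode    int
	Downloader  bool
	StagedSync  bool
	Concurrency int
}

// The new partner-network defaults: sync and the downloader are off.
var defaultPartnerSyncConfig = syncConfig{
	Enabled:     false,
	SyncMode:    0,
	Downloader:  false,
	StagedSync:  false,
	Concurrency: 2,
}

func main() {
	cfg := defaultPartnerSyncConfig
	cfg.Enabled = true // operator opts back in
	cfg.Downloader = true
	fmt.Printf("%+v\n", cfg)
}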
5 changes: 4 additions & 1 deletion p2p/stream/common/requestmanager/requestmanager.go
@@ -154,6 +154,8 @@ func (rm *requestManager) loop() {
 			if req == nil {
 				break loop
 			}
+			rm.logger.Debug().Str("request", req.String()).
+				Msg("add new incoming request to pending queue")
 			rm.addPendingRequest(req, st)
 			b, err := req.Encode()
 			if err != nil {
@@ -202,7 +204,8 @@ func (rm *requestManager) loop() {
 func (rm *requestManager) handleNewRequest(req *request) bool {
 	rm.lock.Lock()
 	defer rm.lock.Unlock()
-
+	rm.logger.Debug().Str("request", req.String()).
+		Msg("add new outgoing request to waiting queue")
 	err := rm.addRequestToWaitings(req, reqPriorityLow)
 	if err != nil {
 		rm.logger.Warn().Err(err).Msg("failed to add new request to waitings")