From 85b8f842da871122a6268e83646301f63e78c053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Mon, 15 Apr 2024 19:39:44 +0800 Subject: [PATCH 1/7] disable stream sync for partner network and other networks --- cmd/harmony/default.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 22b964b997..217c7306f1 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ -241,9 +241,9 @@ var ( } defaultPartnerSyncConfig = harmonyconfig.SyncConfig{ - Enabled: true, + Enabled: false, SyncMode: 0, - Downloader: true, + Downloader: false, StagedSync: false, StagedSyncCfg: defaultStagedSyncConfig, Concurrency: 2, @@ -257,9 +257,9 @@ var ( } defaultElseSyncConfig = harmonyconfig.SyncConfig{ - Enabled: true, + Enabled: false, SyncMode: 0, - Downloader: true, + Downloader: false, StagedSync: false, StagedSyncCfg: defaultStagedSyncConfig, Concurrency: 4, From b98464b89aef70b92a5a9a0fc4fec38247cc79d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Mon, 15 Apr 2024 19:40:33 +0800 Subject: [PATCH 2/7] remove unused tx in block manager --- api/service/stagedstreamsync/block_manager.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/api/service/stagedstreamsync/block_manager.go b/api/service/stagedstreamsync/block_manager.go index d614d24205..d651dcab0e 100644 --- a/api/service/stagedstreamsync/block_manager.go +++ b/api/service/stagedstreamsync/block_manager.go @@ -6,7 +6,6 @@ import ( "github.com/ethereum/go-ethereum/common" sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/ledgerwatch/erigon-lib/kv" "github.com/rs/zerolog" ) @@ -19,7 +18,6 @@ type BlockDownloadDetails struct { // blockDownloadManager is the helper structure for get blocks request management type blockDownloadManager struct { chain blockChain - tx kv.RwTx targetBN uint64 requesting map[uint64]struct{} // block numbers that have been assigned to workers but not received @@ -32,10 +30,9 @@ type blockDownloadManager struct { lock sync.Mutex } -func newBlockDownloadManager(tx kv.RwTx, chain blockChain, targetBN uint64, logger zerolog.Logger) *blockDownloadManager { +func newBlockDownloadManager(chain blockChain, targetBN uint64, logger zerolog.Logger) *blockDownloadManager { return &blockDownloadManager{ chain: chain, - tx: tx, targetBN: targetBN, requesting: make(map[uint64]struct{}), processing: make(map[uint64]struct{}), From 3a04dc9dba1370c4c71d795cc05870f8ec218733 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Mon, 15 Apr 2024 19:41:32 +0800 Subject: [PATCH 3/7] add log for staged sync creation --- api/service/stagedsync/syncing.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/api/service/stagedsync/syncing.go b/api/service/stagedsync/syncing.go index 623ea476d9..980269d8c8 100644 --- a/api/service/stagedsync/syncing.go +++ b/api/service/stagedsync/syncing.go @@ -112,6 +112,24 @@ func CreateStagedSync( finishCfg, ) + utils.Logger().Info(). + Str("ip", ip). + Str("port", port). + Uint32("shard", bc.ShardID()). + Bool("isExplorer", isExplorer). + Bool("TurboMode", TurboMode). + Bool("memdb", UseMemDB). + Bool("doubleCheckBlockHashes", doubleCheckBlockHashes). + Uint64("maxBlocksPerCycle", maxBlocksPerCycle). + Uint64("maxBackgroundBlocks", maxBackgroundBlocks). + Uint64("maxMemSyncCycleSize", maxMemSyncCycleSize). + Bool("verifyAllSig", verifyAllSig). + Uint64("verifyHeaderBatchSize", verifyHeaderBatchSize). + Int("insertChainBatchSize", insertChainBatchSize). + Bool("debugMode", debugMode). + Str("dbDir", dbDir). + Msg("[STAGED_SYNC] staged sync created successfully") + return New(ctx, ip, port, From b18f63c68918e83c9cc65f1e053b142be34db3ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Mon, 15 Apr 2024 19:45:18 +0800 Subject: [PATCH 4/7] change bucket for stroring progress in staged stream sync, fix saveBlocks tx issue --- api/service/stagedstreamsync/stage_bodies.go | 52 ++++++++++---------- api/service/stagedstreamsync/stages.go | 8 +-- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/api/service/stagedstreamsync/stage_bodies.go b/api/service/stagedstreamsync/stage_bodies.go index 9fdf4681a1..18cbb1722a 100644 --- a/api/service/stagedstreamsync/stage_bodies.go +++ b/api/service/stagedstreamsync/stage_bodies.go @@ -77,6 +77,15 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev currProgress := uint64(0) targetHeight := s.state.currentCycle.TargetHeight + if useInternalTx { + var err error + tx, err = b.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + if errV := CreateView(ctx, b.configs.db, tx, func(etx kv.Tx) error { if currProgress, err = s.CurrentStageProgress(etx); err != nil { return err @@ -97,24 +106,14 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev return nil } - // size := uint64(0) startTime := time.Now() // startBlock := currProgress if b.configs.logProgress { fmt.Print("\033[s") // save the cursor position } - if useInternalTx { - var err error - tx, err = b.configs.db.BeginRw(ctx) - if err != nil { - return err - } - defer tx.Rollback() - } - // Fetch blocks from neighbors - s.state.gbm = newBlockDownloadManager(tx, b.configs.bc, targetHeight, s.state.logger) + s.state.gbm = newBlockDownloadManager(b.configs.bc, targetHeight, s.state.logger) // Setup workers to fetch blocks from remote node var wg sync.WaitGroup @@ -188,7 +187,7 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload gbm.HandleRequestError(batch, err, stid) b.configs.protocol.RemoveStream(stid) } else { - if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil { + if err = b.saveBlocks(ctx, nil, batch, blockBytes, sigBytes, loopID, stid); err != nil { panic(ErrSaveBlocksToDbFailed) } gbm.HandleRequestResult(batch, blockBytes, sigBytes, loopID, stid) @@ -239,8 +238,9 @@ func (b *StageBodies) verifyBlockAndExtractReceiptsData(batchBlockBytes [][]byte // redownloadBadBlock tries to redownload the bad block from other streams func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) error { - batch := make([]uint64, 1) - batch = append(batch, s.state.invalidBlock.Number) + batch := []uint64{s.state.invalidBlock.Number} + +badBlockDownloadLoop: for { if b.configs.protocol.NumStreams() == 0 { @@ -253,21 +253,20 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err } continue } - isOneOfTheBadStreams := false for _, id := range s.state.invalidBlock.StreamID { if id == stid { + // TODO: if block is invalid then call StreamFailed b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed") - isOneOfTheBadStreams = true - break + continue badBlockDownloadLoop } } - if isOneOfTheBadStreams { - continue - } s.state.gbm.SetDownloadDetails(batch, 0, stid) if errU := b.configs.blockDBs[0].Update(ctx, func(tx kv.RwTx) error { if err = b.saveBlocks(ctx, tx, batch, blockBytes, sigBytes, 0, stid); err != nil { - return errors.Errorf("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed.") + utils.Logger().Error(). + Err(err). + Msg("[STAGED_STREAM_SYNC] saving re-downloaded bad block to db failed") + return errors.Errorf("%s: %s", ErrSaveBlocksToDbFailed.Error(), err.Error()) } return nil }); errU != nil { @@ -314,11 +313,14 @@ func validateGetBlocksResult(requested []uint64, result []*types.Block) error { // saveBlocks saves the blocks into db func (b *StageBodies) saveBlocks(ctx context.Context, tx kv.RwTx, bns []uint64, blockBytes [][]byte, sigBytes [][]byte, loopID int, stid sttypes.StreamID) error { - tx, err := b.configs.blockDBs[loopID].BeginRw(ctx) - if err != nil { - return err + if tx == nil { + var err error + tx, err = b.configs.blockDBs[loopID].BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() } - defer tx.Rollback() for i := uint64(0); i < uint64(len(blockBytes)); i++ { block := blockBytes[i] diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index 33f3b293b0..03dcafea3c 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -40,7 +40,7 @@ func GetStageID(stage SyncStageID, isBeacon bool, prune bool) []byte { // GetStageProgress retrieves saved progress of a given sync stage from the database func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) { stgID := GetStageID(stage, isBeacon, false) - v, err := db.GetOne(kv.SyncStageProgress, stgID) + v, err := db.GetOne(StageProgressBucket, stgID) if err != nil { return 0, err } @@ -50,13 +50,13 @@ func GetStageProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, e // SaveStageProgress saves progress of given sync stage func SaveStageProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error { stgID := GetStageID(stage, isBeacon, false) - return db.Put(kv.SyncStageProgress, stgID, marshalData(progress)) + return db.Put(StageProgressBucket, stgID, marshalData(progress)) } // GetStageCleanUpProgress retrieves saved progress of given sync stage from the database func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (uint64, error) { stgID := GetStageID(stage, isBeacon, true) - v, err := db.GetOne(kv.SyncStageProgress, stgID) + v, err := db.GetOne(StageProgressBucket, stgID) if err != nil { return 0, err } @@ -66,5 +66,5 @@ func GetStageCleanUpProgress(db kv.Getter, stage SyncStageID, isBeacon bool) (ui // SaveStageCleanUpProgress stores the progress of the clean up for a given sync stage to the database func SaveStageCleanUpProgress(db kv.Putter, stage SyncStageID, isBeacon bool, progress uint64) error { stgID := GetStageID(stage, isBeacon, true) - return db.Put(kv.SyncStageProgress, stgID, marshalData(progress)) + return db.Put(StageProgressBucket, stgID, marshalData(progress)) } From 79ed09bedc02bdfa1654968f296d7b8f5590033e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Mon, 15 Apr 2024 21:35:40 +0800 Subject: [PATCH 5/7] refactor stream sync db initialization --- api/service/stagedstreamsync/syncing.go | 61 ++++++++++++++----------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index 0db0dd4e2d..b996b516a8 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -17,7 +17,6 @@ import ( sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/log/v3" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -62,23 +61,34 @@ func CreateStagedSync(ctx context.Context, var mainDB kv.RwDB dbs := make([]kv.RwDB, config.Concurrency) if config.UseMemDB { - mainDB = memdb.New(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)) + mdbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir) + logger.Info(). + Str("path", mdbPath). + Msg(WrapStagedSyncMsg("creating main db in memory")) + mainDB = mdbx.NewMDBX(log.New()).InMem(mdbPath).MustOpen() for i := 0; i < config.Concurrency; i++ { dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir) - dbs[i] = memdb.New(dbPath) + logger.Info(). + Str("path", dbPath). + Msg(WrapStagedSyncMsg("creating blocks db in memory")) + dbs[i] = mdbx.NewMDBX(log.New()).InMem(dbPath).MustOpen() } } else { + mdbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir) logger.Info(). - Str("path", getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)). - Msg(WrapStagedSyncMsg("creating main db")) - mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).MustOpen() + Str("path", mdbPath). + Msg(WrapStagedSyncMsg("creating main db in disk")) + mainDB = mdbx.NewMDBX(log.New()).Path(mdbPath).MustOpen() for i := 0; i < config.Concurrency; i++ { dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir) + logger.Info(). + Str("path", dbPath). + Msg(WrapStagedSyncMsg("creating blocks db in disk")) dbs[i] = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen() } } - if errInitDB := initDB(ctx, mainDB, dbs, config.Concurrency); errInitDB != nil { + if errInitDB := initDB(ctx, mainDB, dbs); errInitDB != nil { logger.Error().Err(errInitDB).Msg("create staged sync instance failed") return nil, errInitDB } @@ -118,7 +128,8 @@ func CreateStagedSync(ctx context.Context, Str("dbDir", dbDir). Bool("serverOnly", config.ServerOnly). Int("minStreams", config.MinStreams). - Msg(WrapStagedSyncMsg("staged sync created successfully")) + Str("dbDir", dbDir). + Msg(WrapStagedSyncMsg("staged stream sync created successfully")) return New( bc, @@ -134,7 +145,7 @@ func CreateStagedSync(ctx context.Context, } // initDB inits the sync loop main database and create buckets -func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int) error { +func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB) error { // create buckets for mainDB tx, errRW := mainDB.BeginRw(ctx) @@ -143,8 +154,8 @@ func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int) } defer tx.Rollback() - for _, name := range Buckets { - if err := tx.CreateBucket(GetStageName(name, false, false)); err != nil { + for _, bucketName := range Buckets { + if err := tx.CreateBucket(bucketName); err != nil { return err } } @@ -152,34 +163,32 @@ func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB, concurrency int) return err } - // create buckets for block cache DBs - for _, db := range dbs { - tx, errRW := db.BeginRw(ctx) - if errRW != nil { - return errRW + createBlockBuckets := func(db kv.RwDB) error { + tx, err := db.BeginRw(ctx) + if err != nil { + return err } - + defer tx.Rollback() if err := tx.CreateBucket(BlocksBucket); err != nil { return err } if err := tx.CreateBucket(BlockSignaturesBucket); err != nil { return err } - if err := tx.Commit(); err != nil { return err } + return nil } - return nil -} - -// getMemDbTempPath returns the path of the temporary cache database for memdb -func getMemDbTempPath(dbDir string, dbIndex int) string { - if dbIndex >= 0 { - return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/memdb/db"), dbIndex) + // create buckets for block cache DBs + for _, db := range dbs { + if err := createBlockBuckets(db); err != nil { + return err + } } - return filepath.Join(dbDir, "cache/memdb/db_main") + + return nil } // getBlockDbPath returns the path of the cache database which stores blocks From 156ea518af4654258b9ecaab98b8a79034555b44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 30 Apr 2024 04:11:28 -0400 Subject: [PATCH 6/7] set SnapshotLimit to zero for localnet --- cmd/harmony/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/harmony/config.go b/cmd/harmony/config.go index 0b798551ae..0d2632d8aa 100644 --- a/cmd/harmony/config.go +++ b/cmd/harmony/config.go @@ -173,6 +173,7 @@ func getDefaultCacheConfig(nt nodeconfig.NetworkType) harmonyconfig.CacheConfig case nodeconfig.Localnet: cacheConfig.Disabled = false cacheConfig.Preimages = false + cacheConfig.SnapshotLimit = 0 default: cacheConfig.Disabled = false cacheConfig.Preimages = true From be600ee5cbcd9626534b3fc99211140767aa2a13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGheisMohammadi=E2=80=9D?= <36589218+GheisMohammadi@users.noreply.github.com> Date: Tue, 30 Apr 2024 04:20:55 -0400 Subject: [PATCH 7/7] add new logs for stream request manager --- p2p/stream/common/requestmanager/requestmanager.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 37a810dcc3..84d7210c7a 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -154,6 +154,8 @@ func (rm *requestManager) loop() { if req == nil { break loop } + rm.logger.Debug().Str("request", req.String()). + Msg("add new incoming request to pending queue") rm.addPendingRequest(req, st) b, err := req.Encode() if err != nil { @@ -202,7 +204,8 @@ func (rm *requestManager) loop() { func (rm *requestManager) handleNewRequest(req *request) bool { rm.lock.Lock() defer rm.lock.Unlock() - + rm.logger.Debug().Str("request", req.String()). + Msg("add new outgoing request to waiting queue") err := rm.addRequestToWaitings(req, reqPriorityLow) if err != nil { rm.logger.Warn().Err(err).Msg("failed to add new request to waitings")