diff --git a/eth/backend.go b/eth/backend.go index 52e67683958..c9a943941f6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -34,10 +34,6 @@ import ( "sync/atomic" "time" - "github.com/ledgerwatch/erigon-lib/common/dir" - "github.com/ledgerwatch/erigon-lib/common/disk" - "github.com/ledgerwatch/erigon-lib/common/mem" - "github.com/erigontech/mdbx-go/mdbx" lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/holiman/uint256" @@ -54,6 +50,9 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/common/dir" + "github.com/ledgerwatch/erigon-lib/common/disk" + "github.com/ledgerwatch/erigon-lib/common/mem" "github.com/ledgerwatch/erigon-lib/config3" "github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon-lib/downloader" @@ -609,7 +608,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } } - sentryMcDisableBlockDownload := config.PolygonSync + sentryMcDisableBlockDownload := config.PolygonSync || config.PolygonSyncStage backend.sentriesClient, err = sentry_multi_client.NewMultiClient( backend.chainDB, chainConfig, @@ -847,6 +846,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger if config.PolygonSyncStage { backend.syncStages = stages2.NewPolygonSyncStages( backend.sentryCtx, + logger, backend.chainDB, config, backend.chainConfig, @@ -859,6 +859,10 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.silkworm, backend.forkValidator, heimdallClient, + polygonSyncSentry(sentries), + p2pConfig.MaxPeers, + statusDataProvider, + backend.stopNode, ) backend.syncUnwindOrder = stagedsync.PolygonSyncUnwindOrder backend.syncPruneOrder = stagedsync.PolygonSyncPruneOrder @@ -966,27 +970,12 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } if config.PolygonSync { - // TODO - pending sentry multi client refactor - // - sentry multi client should conform to the SentryClient interface and internally - // multiplex - // - for now we just use 1 sentry - var sentryClient direct.SentryClient - for _, client := range sentries { - if client.Protocol() == direct.ETH68 { - sentryClient = client - break - } - } - if sentryClient == nil { - return nil, errors.New("nil sentryClient for polygon sync") - } - backend.polygonSyncService = polygonsync.NewService( logger, chainConfig, dirs.DataDir, tmpdir, - sentryClient, + polygonSyncSentry(sentries), p2pConfig.MaxPeers, statusDataProvider, config.HeimdallURL, @@ -1741,3 +1730,23 @@ func setBorDefaultTxPoolPriceLimit(chainConfig *chain.Config, config txpoolcfg.C config.MinFeeCap = txpoolcfg.BorDefaultTxPoolPriceLimit } } + +func polygonSyncSentry(sentries []direct.SentryClient) direct.SentryClient { + // TODO - pending sentry multi client refactor + // - sentry multi client should conform to the SentryClient interface and internally + // multiplex + // - for now we just use 1 sentry + var sentryClient direct.SentryClient + for _, client := range sentries { + if client.Protocol() == direct.ETH68 { + sentryClient = client + break + } + } + + if sentryClient == nil { + panic("nil sentryClient for polygon sync") + } + + return sentryClient +} diff --git a/eth/stagedsync/bor_heimdall_shared.go b/eth/stagedsync/bor_heimdall_shared.go index e6d5aa1c6ac..7dbdf3463a6 100644 --- a/eth/stagedsync/bor_heimdall_shared.go +++ b/eth/stagedsync/bor_heimdall_shared.go @@ -13,7 +13,9 @@ import ( "github.com/ledgerwatch/log/v3" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/accounts/abi" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/polygon/bor/borcfg" "github.com/ledgerwatch/erigon/polygon/heimdall" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/services" @@ -299,7 +301,7 @@ func fetchAndWriteHeimdallMilestonesIfNeeded( } for milestoneId := lastId + 1; milestoneId <= uint64(count) && (lastMilestone == nil || lastMilestone.EndBlock().Uint64() < toBlockNum); milestoneId++ { - if _, lastMilestone, err = fetchAndWriteHeimdallMilestone(ctx, milestoneId, uint64(count), tx, cfg.heimdallClient, logPrefix, logger); err != nil { + if _, lastMilestone, err = fetchAndWriteHeimdallMilestone(ctx, milestoneId, tx, cfg.heimdallClient, logPrefix, logger); err != nil { if !errors.Is(err, heimdall.ErrNotInMilestoneList) { return 0, err } @@ -318,7 +320,6 @@ var activeMilestones uint64 = 100 func fetchAndWriteHeimdallMilestone( ctx context.Context, milestoneId uint64, - count uint64, tx kv.RwTx, heimdallClient heimdall.HeimdallClient, logPrefix string, @@ -368,7 +369,19 @@ func fetchRequiredHeimdallStateSyncEventsIfNeeded( return lastStateSyncEventID, 0, 0, nil } - return fetchAndWriteHeimdallStateSyncEvents(ctx, header, lastStateSyncEventID, tx, cfg, logPrefix, logger) + return fetchAndWriteHeimdallStateSyncEvents( + ctx, + header, + lastStateSyncEventID, + tx, + cfg.borConfig, + cfg.blockReader, + cfg.heimdallClient, + cfg.chainConfig.ChainID.String(), + cfg.stateReceiverABI, + logPrefix, + logger, + ) } func fetchAndWriteHeimdallStateSyncEvents( @@ -376,16 +389,15 @@ func fetchAndWriteHeimdallStateSyncEvents( header *types.Header, lastStateSyncEventID uint64, tx kv.RwTx, - cfg BorHeimdallCfg, + config *borcfg.BorConfig, + blockReader services.FullBlockReader, + heimdallClient heimdall.HeimdallClient, + chainID string, + stateReceiverABI abi.ABI, logPrefix string, logger log.Logger, ) (uint64, int, time.Duration, error) { fetchStart := time.Now() - config := cfg.borConfig - blockReader := cfg.blockReader - heimdallClient := cfg.heimdallClient - chainID := cfg.chainConfig.ChainID.String() - stateReceiverABI := cfg.stateReceiverABI // Find out the latest eventId var ( from uint64 diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index 9db7778f312..5faf692aa90 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -739,6 +739,7 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc func PolygonSyncStages( ctx context.Context, snapshots SnapshotsCfg, + polygonSyncStageCfg PolygonSyncStageCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, exec ExecuteBlockCfg, @@ -766,7 +767,7 @@ func PolygonSyncStages( ID: stages.PolygonSync, Description: "Use polygon sync component to sync headers, bodies and heimdall data", Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, unwinder Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnPolygonSyncStage() + return SpawnPolygonSyncStage(ctx, txc.Tx, s, polygonSyncStageCfg) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { return UnwindPolygonSyncStage() diff --git a/eth/stagedsync/stage_bor_heimdall.go b/eth/stagedsync/stage_bor_heimdall.go index 625af847971..69d4eff0d2a 100644 --- a/eth/stagedsync/stage_bor_heimdall.go +++ b/eth/stagedsync/stage_bor_heimdall.go @@ -32,12 +32,12 @@ import ( "github.com/ledgerwatch/erigon/polygon/bor/finality/whitelist" "github.com/ledgerwatch/erigon/polygon/bor/valset" "github.com/ledgerwatch/erigon/polygon/heimdall" + "github.com/ledgerwatch/erigon/polygon/sync" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" ) const ( - InMemorySignatures = 4096 // Number of recent block signatures to keep in memory inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory snapshotPersistInterval = 1024 // Number of blocks after which to persist the vote snapshot to the database ) @@ -172,7 +172,7 @@ func BorHeimdallForward( return err } - signatures, err := lru.NewARC[libcommon.Hash, libcommon.Address](InMemorySignatures) + signatures, err := lru.NewARC[libcommon.Hash, libcommon.Address](sync.InMemorySignatures) if err != nil { return err } diff --git a/eth/stagedsync/stage_polygon_sync.go b/eth/stagedsync/stage_polygon_sync.go index d606b1a9fd4..5c421a2eb4c 100644 --- a/eth/stagedsync/stage_polygon_sync.go +++ b/eth/stagedsync/stage_polygon_sync.go @@ -1,6 +1,130 @@ package stagedsync -func SpawnPolygonSyncStage() error { +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ledgerwatch/log/v3" + "golang.org/x/sync/errgroup" + + "github.com/ledgerwatch/erigon-lib/chain" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/metrics" + "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/accounts/abi" + "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/p2p/sentry" + "github.com/ledgerwatch/erigon/polygon/bor/borcfg" + "github.com/ledgerwatch/erigon/polygon/heimdall" + "github.com/ledgerwatch/erigon/polygon/p2p" + polygonsync "github.com/ledgerwatch/erigon/polygon/sync" + "github.com/ledgerwatch/erigon/turbo/services" +) + +const hashProgressKeySuffix = "_hash" + +func NewPolygonSyncStageCfg( + logger log.Logger, + chainConfig *chain.Config, + db kv.RwDB, + heimdallClient heimdall.HeimdallClient, + sentry direct.SentryClient, + maxPeers int, + statusDataProvider *sentry.StatusDataProvider, + blockReader services.FullBlockReader, + stopNode func() error, + stateReceiverABI abi.ABI, +) PolygonSyncStageCfg { + dataStream := make(chan polygonSyncStageDataItem) + storage := &polygonSyncStageStorage{ + db: db, + blockReader: blockReader, + dataStream: dataStream, + } + executionEngine := &polygonSyncStageExecutionEngine{ + db: db, + blockReader: blockReader, + dataStream: dataStream, + } + p2pService := p2p.NewService(maxPeers, logger, sentry, statusDataProvider.GetStatusData) + headersVerifier := polygonsync.VerifyAccumulatedHeaders + blocksVerifier := polygonsync.VerifyBlocks + heimdallService := heimdall.NewHeimdall(heimdallClient, logger, heimdall.WithStore(storage)) + blockDownloader := polygonsync.NewBlockDownloader( + logger, + p2pService, + heimdallService, + headersVerifier, + blocksVerifier, + storage, + ) + spansCache := polygonsync.NewSpansCache() + events := polygonsync.NewTipEvents(logger, p2pService, heimdallService) + borConfig := chainConfig.Bor.(*borcfg.BorConfig) + sync := polygonsync.NewSync( + storage, + executionEngine, + headersVerifier, + blocksVerifier, + p2pService, + blockDownloader, + polygonsync.NewCanonicalChainBuilderFactory(chainConfig, borConfig, spansCache), + spansCache, + heimdallService.FetchLatestSpan, + events.Events(), + logger, + ) + syncService := &polygonSyncStageService{ + logger: logger, + chainConfig: chainConfig, + blockReader: blockReader, + sync: sync, + events: events, + p2p: p2pService, + heimdallClient: heimdallClient, + stateReceiverABI: stateReceiverABI, + dataStream: dataStream, + stopNode: stopNode, + } + return PolygonSyncStageCfg{ + db: db, + service: syncService, + } +} + +type PolygonSyncStageCfg struct { + db kv.RwDB + service *polygonSyncStageService +} + +func SpawnPolygonSyncStage(ctx context.Context, tx kv.RwTx, stageState *StageState, cfg PolygonSyncStageCfg) error { + useExternalTx := tx != nil + if !useExternalTx { + var err error + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err := cfg.service.Run(ctx, tx, stageState); err != nil { + return err + } + + if useExternalTx { + return nil + } + + if err := tx.Commit(); err != nil { + return err + } + return nil } @@ -11,3 +135,408 @@ func UnwindPolygonSyncStage() error { func PrunePolygonSyncStage() error { return nil } + +type polygonSyncStageDataItem struct { + updateForkChoice *types.Header + insertBlocks []*types.Block + span *heimdall.Span + milestone *heimdall.Milestone + checkpoint *heimdall.Checkpoint +} + +type polygonSyncStageService struct { + logger log.Logger + chainConfig *chain.Config + blockReader services.FullBlockReader + sync *polygonsync.Sync + events *polygonsync.TipEvents + p2p p2p.Service + heimdallClient heimdall.HeimdallClient + stateReceiverABI abi.ABI + dataStream <-chan polygonSyncStageDataItem + stopNode func() error + // internal + appendLogPrefix func(string) string + stageState *StageState + lastStateSyncEventId uint64 + lastStateSyncEventIdInit bool + bgComponentsRun bool + bgComponentsErr chan error +} + +func (s *polygonSyncStageService) Run(ctx context.Context, tx kv.RwTx, stageState *StageState) error { + s.appendLogPrefix = newAppendLogPrefix(stageState.LogPrefix()) + s.stageState = stageState + s.logger.Info(s.appendLogPrefix("begin..."), "progress", stageState.BlockNumber) + + if !s.bgComponentsRun { + s.runBgComponents(ctx) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-s.bgComponentsErr: + s.logger.Error(s.appendLogPrefix("stopping node"), "err", err) + stopErr := s.stopNode() + if stopErr != nil { + return fmt.Errorf("%w: %w", stopErr, err) + } + return err + case data := <-s.dataStream: + var err error + if data.updateForkChoice != nil { + // exit stage upon update fork choice + return s.handleUpdateForkChoice(ctx, tx, data.updateForkChoice) + } else if len(data.insertBlocks) > 0 { + err = s.handleInsertBlocks(tx, stageState, data.insertBlocks) + } else if data.span != nil { + err = s.handleSpan(ctx, tx, data.span) + } else if data.checkpoint != nil { + err = s.handleCheckpoint(ctx, tx, data.checkpoint) + } else if data.milestone != nil { + err = s.handleMilestone(ctx, tx, data.milestone) + } else { + err = errors.New("unrecognized data") + } + if err != nil { + return err + } + } + } +} + +func (s *polygonSyncStageService) runBgComponents(ctx context.Context) { + s.logger.Info(s.appendLogPrefix("running background components")) + s.bgComponentsRun = true + + go func() { + eg := errgroup.Group{} + + eg.Go(func() error { + return s.events.Run(ctx) + }) + + eg.Go(func() error { + s.p2p.Run(ctx) + select { + case <-ctx.Done(): + return nil + default: + return errors.New("p2p service stopped") + } + }) + + eg.Go(func() error { + return s.sync.Run(ctx) + }) + + if err := eg.Wait(); err != nil { + s.bgComponentsErr <- err + } + }() +} + +func (s *polygonSyncStageService) handleInsertBlocks(tx kv.RwTx, stageState *StageState, blocks []*types.Block) error { + for _, block := range blocks { + height := block.NumberU64() + header := block.Header() + body := block.Body() + + metrics.UpdateBlockConsumerHeaderDownloadDelay(header.Time, height-1, s.logger) + metrics.UpdateBlockConsumerBodyDownloadDelay(header.Time, height-1, s.logger) + + parentTd := common.Big0 + if height > 0 { + // Parent's total difficulty + parentTd, err := rawdb.ReadTd(tx, header.ParentHash, height-1) + if err != nil || parentTd == nil { + return fmt.Errorf( + "parent's total difficulty not found with hash %x and height %d: %v", + header.ParentHash, + height-1, + err, + ) + } + } + + td := parentTd.Add(parentTd, header.Difficulty) + if err := rawdb.WriteHeader(tx, header); err != nil { + return fmt.Errorf("InsertHeaders: writeHeader: %s", err) + } + + if err := rawdb.WriteTd(tx, header.Hash(), height, td); err != nil { + return fmt.Errorf("InsertHeaders: writeTd: %s", err) + } + + if _, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), height, body.RawBody()); err != nil { + return fmt.Errorf("InsertBlocks: writeBody: %s", err) + } + } + + if len(blocks) == 0 { + return nil + } + + // update stage progress + tip := blocks[len(blocks)-1] + tipBlockNum := tip.NumberU64() + tipBlockHash := tip.Hash() + + if err := stageState.Update(tx, tipBlockNum); err != nil { + return err + } + + if err := saveStageHashProgress(tx, stageState.ID, tipBlockHash); err != nil { + return err + } + + return nil +} + +func (s *polygonSyncStageService) handleUpdateForkChoice(ctx context.Context, tx kv.RwTx, tip *types.Header) error { + s.logger.Info(s.appendLogPrefix("handle update fork choice"), "block", tip.Number.Uint64()) + + // make sure all state sync events for the given tip are downloaded to mdbx + // NOTE: remove this once we integrate the bridge component in sync.Run + if err := s.downloadStateSyncEvents(ctx, tx, tip); err != nil { + return err + } + + return nil +} + +func (s *polygonSyncStageService) downloadStateSyncEvents(ctx context.Context, tx kv.RwTx, tip *types.Header) (err error) { + if !s.lastStateSyncEventIdInit { + s.lastStateSyncEventId, _, err = s.blockReader.LastEventId(ctx, tx) + } + if err != nil { + return err + } + + borConfig := s.chainConfig.Bor.(*borcfg.BorConfig) + // need to use latest sprint start block num + tipBlockNum := tip.Number.Uint64() + sprintLen := borConfig.CalculateSprintLength(tipBlockNum) + sprintRemainder := tipBlockNum % sprintLen + if tipBlockNum > sprintLen && sprintRemainder > 0 { + tipBlockNum -= sprintRemainder + tip = rawdb.ReadHeaderByNumber(tx, tipBlockNum) + } + + s.logger.Info( + s.appendLogPrefix("downloading state sync events"), + "sprintStartBlockNum", tip.Number.Uint64(), + "lastStateSyncEventId", s.lastStateSyncEventId, + ) + + var records int + var duration time.Duration + s.lastStateSyncEventId, records, duration, err = fetchAndWriteHeimdallStateSyncEvents( + ctx, + tip, + s.lastStateSyncEventId, + tx, + borConfig, + s.blockReader, + s.heimdallClient, + s.chainConfig.ChainID.String(), + s.stateReceiverABI, + s.stageState.LogPrefix(), + s.logger, + ) + if err != nil { + return err + } + + s.logger.Info( + s.appendLogPrefix("finished downloading state sync events"), + "records", records, + "duration", duration, + ) + + return nil +} + +func (s *polygonSyncStageService) handleSpan(ctx context.Context, tx kv.RwTx, sp *heimdall.Span) error { + return heimdall.NewTxStore(s.blockReader, tx).PutSpan(ctx, sp) +} + +func (s *polygonSyncStageService) handleCheckpoint(ctx context.Context, tx kv.RwTx, cp *heimdall.Checkpoint) error { + return heimdall.NewTxStore(s.blockReader, tx).PutCheckpoint(ctx, cp.Id, cp) +} + +func (s *polygonSyncStageService) handleMilestone(ctx context.Context, tx kv.RwTx, ms *heimdall.Milestone) error { + return heimdall.NewTxStore(s.blockReader, tx).PutMilestone(ctx, ms.Id, ms) +} + +type polygonSyncStageStorage struct { + db kv.RoDB + blockReader services.FullBlockReader + dataStream chan<- polygonSyncStageDataItem +} + +func (s *polygonSyncStageStorage) LastSpanId(ctx context.Context) (id heimdall.SpanId, ok bool, err error) { + err = s.db.View(ctx, func(tx kv.Tx) error { + id, ok, err = heimdall.NewTxReadStore(s.blockReader, tx).LastSpanId(ctx) + return err + }) + return +} + +func (s *polygonSyncStageStorage) GetSpan(ctx context.Context, id heimdall.SpanId) (sp *heimdall.Span, err error) { + err = s.db.View(ctx, func(tx kv.Tx) error { + sp, err = heimdall.NewTxReadStore(s.blockReader, tx).GetSpan(ctx, id) + return err + }) + return +} + +func (s *polygonSyncStageStorage) PutSpan(_ context.Context, span *heimdall.Span) error { + s.dataStream <- polygonSyncStageDataItem{ + span: span, + } + + return nil +} + +func (s *polygonSyncStageStorage) LastMilestoneId(ctx context.Context) (id heimdall.MilestoneId, ok bool, err error) { + err = s.db.View(ctx, func(tx kv.Tx) error { + id, ok, err = heimdall.NewTxReadStore(s.blockReader, tx).LastMilestoneId(ctx) + return err + }) + return +} + +func (s *polygonSyncStageStorage) GetMilestone(ctx context.Context, id heimdall.MilestoneId) (ms *heimdall.Milestone, err error) { + err = s.db.View(ctx, func(tx kv.Tx) error { + ms, err = heimdall.NewTxReadStore(s.blockReader, tx).GetMilestone(ctx, id) + return err + }) + return +} + +func (s *polygonSyncStageStorage) PutMilestone(_ context.Context, _ heimdall.MilestoneId, ms *heimdall.Milestone) error { + s.dataStream <- polygonSyncStageDataItem{ + milestone: ms, + } + + return nil +} + +func (s *polygonSyncStageStorage) LastCheckpointId(ctx context.Context) (id heimdall.CheckpointId, ok bool, err error) { + err = s.db.View(ctx, func(tx kv.Tx) error { + id, ok, err = heimdall.NewTxReadStore(s.blockReader, tx).LastCheckpointId(ctx) + return err + }) + return +} + +func (s *polygonSyncStageStorage) GetCheckpoint(ctx context.Context, id heimdall.CheckpointId) (cp *heimdall.Checkpoint, err error) { + err = s.db.View(ctx, func(tx kv.Tx) error { + cp, err = heimdall.NewTxReadStore(s.blockReader, tx).GetCheckpoint(ctx, id) + return err + }) + return +} + +func (s *polygonSyncStageStorage) PutCheckpoint(_ context.Context, _ heimdall.CheckpointId, cp *heimdall.Checkpoint) error { + s.dataStream <- polygonSyncStageDataItem{ + checkpoint: cp, + } + + return nil +} + +func (s *polygonSyncStageStorage) InsertBlocks(_ context.Context, blocks []*types.Block) error { + s.dataStream <- polygonSyncStageDataItem{ + insertBlocks: blocks, + } + + return nil +} + +func (s *polygonSyncStageStorage) Flush(context.Context) error { + return nil +} + +func (s *polygonSyncStageStorage) Run(context.Context) error { + return nil +} + +type polygonSyncStageExecutionEngine struct { + db kv.RoDB + blockReader services.FullBlockReader + dataStream chan<- polygonSyncStageDataItem +} + +func (e *polygonSyncStageExecutionEngine) InsertBlocks(_ context.Context, blocks []*types.Block) error { + e.dataStream <- polygonSyncStageDataItem{ + insertBlocks: blocks, + } + + return nil +} + +func (e *polygonSyncStageExecutionEngine) UpdateForkChoice(_ context.Context, tip *types.Header, _ *types.Header) error { + e.dataStream <- polygonSyncStageDataItem{ + updateForkChoice: tip, + } + + return nil +} + +func (e *polygonSyncStageExecutionEngine) CurrentHeader(ctx context.Context) (*types.Header, error) { + tx, err := e.db.BeginRo(ctx) + if err != nil { + return nil, err + } + + defer tx.Rollback() + + stageBlockNum, err := stages.GetStageProgress(tx, stages.PolygonSync) + if err != nil { + return nil, err + } + + snapshotBlockNum := e.blockReader.FrozenBlocks() + if stageBlockNum < snapshotBlockNum { + return e.blockReader.HeaderByNumber(ctx, tx, snapshotBlockNum) + } + + stageHash, err := readStageHashProgress(tx, stages.PolygonSync) + if err != nil { + return nil, err + } + + header := rawdb.ReadHeader(tx, stageHash, stageBlockNum) + if header == nil { + return nil, errors.New("header not found") + } + + return header, nil +} + +func saveStageHashProgress(db kv.Putter, stageId stages.SyncStage, progress common.Hash) error { + return db.Put(kv.SyncStageProgress, hashProgressKey(stageId), progress[:]) +} + +func readStageHashProgress(db kv.Getter, stageId stages.SyncStage) (common.Hash, error) { + hashBytes, err := db.GetOne(kv.SyncStageProgress, hashProgressKey(stageId)) + if err != nil { + return common.Hash{}, err + } + + return common.BytesToHash(hashBytes), nil +} + +func hashProgressKey(stageId stages.SyncStage) []byte { + return []byte(stageId + hashProgressKeySuffix) +} + +func newAppendLogPrefix(logPrefix string) func(msg string) string { + return func(msg string) string { + return fmt.Sprintf("[%s] %s", logPrefix, msg) + } +} diff --git a/polygon/bor/bor.go b/polygon/bor/bor.go index 3464bfb3876..7c6ef7a4e8c 100644 --- a/polygon/bor/bor.go +++ b/polygon/bor/bor.go @@ -807,11 +807,7 @@ func (c *Bor) snapshot(chain consensus.ChainHeaderReader, number uint64, hash li // VerifyUncles implements consensus.Engine, always returning an error for any // uncles as this consensus mechanism doesn't permit uncles. func (c *Bor) VerifyUncles(_ consensus.ChainReader, _ *types.Header, uncles []*types.Header) error { - if len(uncles) > 0 { - return errUncleDetected - } - - return nil + return VerifyUncles(uncles) } // VerifySeal implements consensus.Engine, checking whether the signature contained @@ -1669,3 +1665,11 @@ func GetValidatorBytes(h *types.Header, config *borcfg.BorConfig) []byte { return blockExtraData.ValidatorBytes } + +func VerifyUncles(uncles []*types.Header) error { + if len(uncles) > 0 { + return errUncleDetected + } + + return nil +} diff --git a/polygon/heimdall/checkpoint.go b/polygon/heimdall/checkpoint.go index 35feb699219..ea968471764 100644 --- a/polygon/heimdall/checkpoint.go +++ b/polygon/heimdall/checkpoint.go @@ -74,7 +74,7 @@ func (c *Checkpoint) MarshalJSON() ([]byte, error) { Proposer libcommon.Address `json:"proposer"` StartBlock *big.Int `json:"start_block"` EndBlock *big.Int `json:"end_block"` - RootHash libcommon.Hash `json:"hash"` + RootHash libcommon.Hash `json:"root_hash"` ChainID string `json:"bor_chain_id"` Timestamp uint64 `json:"timestamp"` }{ @@ -91,7 +91,7 @@ func (c *Checkpoint) MarshalJSON() ([]byte, error) { func (c *Checkpoint) UnmarshalJSON(b []byte) error { dto := struct { WaypointFields - RootHash libcommon.Hash `json:"hash"` + RootHash libcommon.Hash `json:"root_hash"` Id CheckpointId `json:"id"` }{} diff --git a/polygon/sync/blocks_verifier.go b/polygon/sync/blocks_verifier.go index 69aed9fdfa0..5648445500c 100644 --- a/polygon/sync/blocks_verifier.go +++ b/polygon/sync/blocks_verifier.go @@ -1,6 +1,9 @@ package sync -import "github.com/ledgerwatch/erigon/core/types" +import ( + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/polygon/bor" +) type BlocksVerifier func(blocks []*types.Block) error @@ -13,6 +16,10 @@ func VerifyBlocks(blocks []*types.Block) error { if err := block.HashCheck(); err != nil { return err } + + if err := bor.VerifyUncles(block.Uncles()); err != nil { + return err + } } return nil diff --git a/polygon/sync/canonical_chain_builder_factory.go b/polygon/sync/canonical_chain_builder_factory.go new file mode 100644 index 00000000000..eab4a0b9fea --- /dev/null +++ b/polygon/sync/canonical_chain_builder_factory.go @@ -0,0 +1,45 @@ +package sync + +import ( + lru "github.com/hashicorp/golang-lru/arc/v2" + + "github.com/ledgerwatch/erigon-lib/chain" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/polygon/bor/borcfg" + "github.com/ledgerwatch/erigon/polygon/heimdall" +) + +const InMemorySignatures = 4096 // Number of recent block signatures to keep in memory + +type CanonicalChainBuilderFactory func(root *types.Header, span *heimdall.Span) CanonicalChainBuilder + +func NewCanonicalChainBuilderFactory( + chainConfig *chain.Config, + borConfig *borcfg.BorConfig, + spansCache *SpansCache, +) CanonicalChainBuilderFactory { + signaturesCache, err := lru.NewARC[common.Hash, common.Address](InMemorySignatures) + if err != nil { + panic(err) + } + + difficultyCalculator := NewDifficultyCalculator(borConfig, spansCache, nil, signaturesCache) + headerTimeValidator := NewHeaderTimeValidator(borConfig, spansCache, nil, signaturesCache) + headerValidator := NewHeaderValidator(chainConfig, borConfig, headerTimeValidator) + + return func(root *types.Header, span *heimdall.Span) CanonicalChainBuilder { + if span == nil { + panic("sync.Service: ccBuilderFactory - span is nil") + } + if spansCache.IsEmpty() { + panic("sync.Service: ccBuilderFactory - spansCache is empty") + } + return NewCanonicalChainBuilder( + root, + difficultyCalculator, + headerValidator, + spansCache, + ) + } +} diff --git a/polygon/sync/difficulty.go b/polygon/sync/difficulty.go index fee15ff5179..c9a2c4e829b 100644 --- a/polygon/sync/difficulty.go +++ b/polygon/sync/difficulty.go @@ -5,8 +5,6 @@ import ( lru "github.com/hashicorp/golang-lru/arc/v2" - "github.com/ledgerwatch/erigon/eth/stagedsync" - libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/polygon/bor" @@ -33,7 +31,7 @@ func NewDifficultyCalculator( ) DifficultyCalculator { if signaturesCache == nil { var err error - signaturesCache, err = lru.NewARC[libcommon.Hash, libcommon.Address](stagedsync.InMemorySignatures) + signaturesCache, err = lru.NewARC[libcommon.Hash, libcommon.Address](InMemorySignatures) if err != nil { panic(err) } diff --git a/polygon/sync/header_time_validator.go b/polygon/sync/header_time_validator.go index e504cd3b2f4..a7a7b6dcfc9 100644 --- a/polygon/sync/header_time_validator.go +++ b/polygon/sync/header_time_validator.go @@ -8,7 +8,6 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/polygon/bor" "github.com/ledgerwatch/erigon/polygon/bor/borcfg" "github.com/ledgerwatch/erigon/polygon/bor/valset" @@ -33,7 +32,7 @@ func NewHeaderTimeValidator( ) HeaderTimeValidator { if signaturesCache == nil { var err error - signaturesCache, err = lru.NewARC[libcommon.Hash, libcommon.Address](stagedsync.InMemorySignatures) + signaturesCache, err = lru.NewARC[libcommon.Hash, libcommon.Address](InMemorySignatures) if err != nil { panic(err) } diff --git a/polygon/sync/service.go b/polygon/sync/service.go index 0fa767c69c1..5b6e9daf28b 100644 --- a/polygon/sync/service.go +++ b/polygon/sync/service.go @@ -3,16 +3,12 @@ package sync import ( "context" - lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/errgroup" "github.com/ledgerwatch/erigon-lib/chain" - "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon-lib/gointerfaces/executionproto" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/p2p/sentry" "github.com/ledgerwatch/erigon/polygon/bor/borcfg" "github.com/ledgerwatch/erigon/polygon/heimdall" @@ -67,26 +63,7 @@ func NewService( store, ) spansCache := NewSpansCache() - signaturesCache, err := lru.NewARC[common.Hash, common.Address](stagedsync.InMemorySignatures) - if err != nil { - panic(err) - } - difficultyCalculator := NewDifficultyCalculator(borConfig, spansCache, nil, signaturesCache) - headerTimeValidator := NewHeaderTimeValidator(borConfig, spansCache, nil, signaturesCache) - headerValidator := NewHeaderValidator(chainConfig, borConfig, headerTimeValidator) - ccBuilderFactory := func(root *types.Header, span *heimdall.Span) CanonicalChainBuilder { - if span == nil { - panic("sync.Service: ccBuilderFactory - span is nil") - } - if spansCache.IsEmpty() { - panic("sync.Service: ccBuilderFactory - spansCache is empty") - } - return NewCanonicalChainBuilder( - root, - difficultyCalculator, - headerValidator, - spansCache) - } + ccBuilderFactory := NewCanonicalChainBuilderFactory(chainConfig, borConfig, spansCache) events := NewTipEvents(logger, p2pService, heimdallService) sync := NewSync( store, diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index 589bac5a34c..d96947eff81 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -32,7 +32,7 @@ func NewSync( blocksVerifier BlocksVerifier, p2pService p2p.Service, blockDownloader BlockDownloader, - ccBuilderFactory func(root *types.Header, span *heimdall.Span) CanonicalChainBuilder, + ccBuilderFactory CanonicalChainBuilderFactory, spansCache *SpansCache, fetchLatestSpan func(ctx context.Context) (*heimdall.Span, error), events <-chan Event, @@ -260,8 +260,8 @@ func (s *Sync) Run(ctx context.Context) error { if err != nil { return err } - s.spansCache.Add(latestSpan) + s.spansCache.Add(latestSpan) ccBuilder := s.ccBuilderFactory(tip, latestSpan) for { diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 162bae95cfd..d260f2db249 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -158,6 +158,7 @@ var DefaultFlags = []cli.Flag{ &utils.WithHeimdallMilestones, &utils.WithHeimdallWaypoints, &utils.PolygonSyncFlag, + &utils.PolygonSyncStageFlag, &utils.EthStatsURLFlag, &utils.OverridePragueFlag, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index b4295faf6cd..019fabc5181 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -14,6 +14,7 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/datadir" "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/direct" proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloaderproto" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/membatchwithdb" @@ -30,6 +31,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/p2p" + "github.com/ledgerwatch/erigon/p2p/sentry" "github.com/ledgerwatch/erigon/p2p/sentry/sentry_multi_client" "github.com/ledgerwatch/erigon/polygon/bor" "github.com/ledgerwatch/erigon/polygon/bor/finality" @@ -784,6 +786,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config func NewPolygonSyncStages( ctx context.Context, + logger log.Logger, db kv.RwDB, config *ethconfig.Config, chainConfig *chain.Config, @@ -796,6 +799,10 @@ func NewPolygonSyncStages( silkworm *silkworm.Silkworm, forkValidator *engine_helpers.ForkValidator, heimdallClient heimdall.HeimdallClient, + sentry direct.SentryClient, + maxPeers int, + statusDataProvider *sentry.StatusDataProvider, + stopNode func() error, ) []*stagedsync.Stage { loopBreakCheck := NewLoopBreakCheck(config, heimdallClient) return stagedsync.PolygonSyncStages( @@ -815,6 +822,18 @@ func NewPolygonSyncStages( silkworm, config.Prune, ), + stagedsync.NewPolygonSyncStageCfg( + logger, + chainConfig, + db, + heimdallClient, + sentry, + maxPeers, + statusDataProvider, + blockReader, + stopNode, + bor.GenesisContractStateReceiverABI(), + ), stagedsync.StageBlockHashesCfg( db, config.Dirs.Tmp,