From e44dd934c00ba11b7841d250c5931c128ed4d45e Mon Sep 17 00:00:00 2001 From: weiihann Date: Fri, 13 Dec 2024 10:05:56 +0800 Subject: [PATCH] Squashed commit of the following: commit 4ff174d8cfdfbacfc119430ae56d289fa305f3d6 Author: IronGauntlets Date: Fri Apr 26 22:10:44 2024 +0100 Move pending to sync package In order to moved handling of pending to synchroniser the following changes needed to be made: - Add database to synchroniser, so that pending state can be served - Blockchain and Events Filter have a pendingBlockFn() which returns the pending block. Due to import cycle pending struct could not be referenced, therefore, the anonymous function is passed. - Add PendingBlock() to return just the pending block, this was mainly added to support the pendingBlockFn(). - In rpc package the pending block and state is retrieved through synchroniser. Therefore, receipt and transaction handler now check the pending block for the requested transaction/receipt. commit fb75cd65e05f1ebcecdd525e540fd58153cf6a11 Author: IronGauntlets Date: Fri Apr 26 15:18:33 2024 +0100 Rename cachedPending to pending commit 3ffc458fe597853734fc6a3c81c13b6bad36ee48 Author: IronGauntlets Date: Fri Apr 26 01:27:53 2024 +0100 Check pending block protocol version before storing commit 317ca386122a04622571034a582f92e22b8619db Author: IronGauntlets Date: Fri Apr 26 00:52:59 2024 +0100 Refactor blockchain.Pending to return a reference commit 03f4bfabc20512e63cba2c4c7b9f8f2c78fe5ea4 Author: IronGauntlets Date: Mon Apr 22 21:16:22 2024 +0100 Remove pending and empty pending from DB Pending Block is now only managed in memory this is to make sure that pending block in the DB and in memory do not become out of sync. Before the pending block was managed in memory as a cache, however, since there is only one pending block at a given time it doesn't make sense to keep track of pending block in both memory and DB. To reduce the number of block not found errors while simulating transactions it was decided to store empty pending block, using the latest header to fill in fields such as block number, parent block hash, etc. This meant that any time we didn't have a pending block this empty pending block would be served along with empty state diff and classes. Every time a new block was added to the blockchain a new empty pending block was also added to the DB. The unforeseen side effect of this change was when the --poll-pending-interval flag was disabled the rpc would still serve a pending block. This is incorrect behaviour. As the blocks changed per new versions of starknet the empty block also needed to be changed and a storage diff with a special contract "0x1" needed to be updated in the state diff. This overhead is unnecessary and incorrectly informs the user that there is a pending block when there isn't one. --- cmd/juno/dbcmd.go | 2 +- rpc/subscriptions_test.go | 8 +++- sync/sync.go | 91 +++++++++++++++++++++++++++++++++++++++ sync/sync_test.go | 30 +++++++++++-- 4 files changed, 124 insertions(+), 7 deletions(-) diff --git a/cmd/juno/dbcmd.go b/cmd/juno/dbcmd.go index 6f6afbbdda..d9892b6be1 100644 --- a/cmd/juno/dbcmd.go +++ b/cmd/juno/dbcmd.go @@ -85,7 +85,7 @@ func dbInfo(cmd *cobra.Command, args []string) error { } defer database.Close() - chain := blockchain.New(database, nil) + chain := blockchain.New(database, nil, nil) var info DBInfo // Get the latest block information diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go index e524302924..a58b68bd72 100644 --- a/rpc/subscriptions_test.go +++ b/rpc/subscriptions_test.go @@ -364,6 +364,10 @@ func (fs *fakeSyncer) HighestBlockHeader() *core.Header { return nil } +func (fs *fakeSyncer) Pending() (*sync.Pending, error) { return nil, nil } +func (fs *fakeSyncer) PendingBlock() *core.Block { return nil } +func (fs *fakeSyncer) PendingState() (core.StateReader, func() error, error) { return nil, nil, nil } + func TestSubscribeNewHeads(t *testing.T) { log := utils.NewNopZapLogger() @@ -471,10 +475,10 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) { require.NoError(t, err) testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, &utils.Mainnet) + chain := blockchain.New(testDB, &utils.Mainnet, nil) assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil)) - chain = blockchain.New(testDB, &utils.Mainnet) + chain = blockchain.New(testDB, &utils.Mainnet, nil) syncer := newFakeSyncer() ctx, cancel := context.WithCancel(context.Background()) diff --git a/sync/sync.go b/sync/sync.go index c45e1f55bb..ae9617429d 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -55,6 +55,10 @@ type Reader interface { SubscribeNewHeads() HeaderSubscription SubscribeReorg() ReorgSubscription SubscribePendingTxs() PendingTxSubscription + + Pending() (*Pending, error) + PendingBlock() *core.Block + PendingState() (core.StateReader, func() error, error) } // This is temporary and will be removed once the p2p synchronizer implements this interface. @@ -92,6 +96,18 @@ type ReorgData struct { EndBlockNum uint64 `json:"ending_block_number"` } +func (n *NoopSynchronizer) PendingBlock() *core.Block { + return nil +} + +func (n *NoopSynchronizer) Pending() (*Pending, error) { + return nil, errors.New("Pending() is not implemented") +} + +func (n *NoopSynchronizer) PendingState() (core.StateReader, func() error, error) { + return nil, nil, errors.New("PendingState() not implemented") +} + // Synchronizer manages a list of StarknetData to fetch the latest blockchain updates type Synchronizer struct { blockchain *blockchain.Blockchain @@ -571,3 +587,78 @@ func (s *Synchronizer) SubscribePendingTxs() PendingTxSubscription { Subscription: s.pendingTxsFeed.Subscribe(), } } + +// StorePending stores a pending block given that it is for the next height +func (s *Synchronizer) StorePending(p *Pending) error { + err := blockchain.CheckBlockVersion(p.Block.ProtocolVersion) + if err != nil { + return err + } + + expectedParentHash := new(felt.Felt) + h, err := s.blockchain.HeadsHeader() + if err != nil && !errors.Is(err, db.ErrKeyNotFound) { + return err + } else if err == nil { + expectedParentHash = h.Hash + } + + if !expectedParentHash.Equal(p.Block.ParentHash) { + return fmt.Errorf("store pending: %w", blockchain.ErrParentDoesNotMatchHead) + } + + if existingPending, err := s.Pending(); err == nil { + if existingPending.Block.TransactionCount >= p.Block.TransactionCount { + // ignore the incoming pending if it has fewer transactions than the one we already have + return nil + } + } else if !errors.Is(err, ErrPendingBlockNotFound) { + return err + } + s.pending.Store(p) + + return nil +} + +func (s *Synchronizer) Pending() (*Pending, error) { + p := s.pending.Load() + if p == nil { + return nil, ErrPendingBlockNotFound + } + + expectedParentHash := &felt.Zero + if head, err := s.blockchain.HeadsHeader(); err == nil { + expectedParentHash = head.Hash + } + if p.Block.ParentHash.Equal(expectedParentHash) { + return p, nil + } + + // Since the pending block in the cache is outdated remove it + s.pending.Store(nil) + + return nil, ErrPendingBlockNotFound +} + +func (s *Synchronizer) PendingBlock() *core.Block { + pending, err := s.Pending() + if err != nil { + return nil + } + return pending.Block +} + +// PendingState returns the state resulting from execution of the pending block +func (s *Synchronizer) PendingState() (core.StateReader, func() error, error) { + txn, err := s.db.NewTransaction(false) + if err != nil { + return nil, nil, err + } + + pending, err := s.Pending() + if err != nil { + return nil, nil, utils.RunAndWrapOnError(txn.Discard, err) + } + + return NewPendingState(pending.StateUpdate.StateDiff, pending.NewClasses, core.NewState(txn)), txn.Discard, nil +} diff --git a/sync/sync_test.go b/sync/sync_test.go index 3839154322..38ec896c4a 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -164,7 +164,7 @@ func TestReorg(t *testing.T) { integStart, err := bc.BlockHeaderByNumber(0) require.NoError(t, err) - synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false) + synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false, testDB) sub := synchronizer.SubscribeReorg() ctx, cancel = context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -185,6 +185,28 @@ func TestReorg(t *testing.T) { }) } +func TestPending(t *testing.T) { + t.Parallel() + + client := feeder.NewTestClient(t, &utils.Mainnet) + gw := adaptfeeder.New(client) + + testDB := pebble.NewMemTest(t) + log := utils.NewNopZapLogger() + bc := blockchain.New(testDB, &utils.Mainnet, nil) + synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false, testDB) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + + require.NoError(t, synchronizer.Run(ctx)) + cancel() + + head, err := bc.HeadsHeader() + require.NoError(t, err) + pending, err := synchronizer.Pending() + require.NoError(t, err) + assert.Equal(t, head.Hash, pending.Block.ParentHash) +} + func TestSubscribeNewHeads(t *testing.T) { t.Parallel() testDB := pebble.NewMemTest(t) @@ -217,8 +239,8 @@ func TestSubscribePendingTxs(t *testing.T) { testDB := pebble.NewMemTest(t) log := utils.NewNopZapLogger() - bc := blockchain.New(testDB, &utils.Mainnet) - synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false) + bc := blockchain.New(testDB, &utils.Mainnet, nil) + synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false, testDB) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) sub := synchronizer.SubscribePendingTxs() @@ -226,7 +248,7 @@ func TestSubscribePendingTxs(t *testing.T) { require.NoError(t, synchronizer.Run(ctx)) cancel() - pending, err := bc.Pending() + pending, err := synchronizer.Pending() require.NoError(t, err) pendingTxs, ok := <-sub.Recv() require.True(t, ok)