Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 4ff174d
Author: IronGauntlets <aneequesafdar@gmail.com>
Date:   Fri Apr 26 22:10:44 2024 +0100

    Move pending to sync package

    In order to moved handling of pending to synchroniser the following
    changes needed to be made:
    - Add database to synchroniser, so that pending state can be served
    - Blockchain and Events Filter have a pendingBlockFn() which returns the
      pending block. Due to import cycle pending struct could not be
      referenced, therefore, the anonymous function is passed.
    - Add PendingBlock() to return just the pending block, this was mainly
      added to support the pendingBlockFn().
    - In rpc package the pending block and state is retrieved through
      synchroniser. Therefore, receipt and transaction handler now check the
      pending block for the requested transaction/receipt.

commit fb75cd6
Author: IronGauntlets <aneequesafdar@gmail.com>
Date:   Fri Apr 26 15:18:33 2024 +0100

    Rename cachedPending to pending

commit 3ffc458
Author: IronGauntlets <aneequesafdar@gmail.com>
Date:   Fri Apr 26 01:27:53 2024 +0100

    Check pending block protocol version before storing

commit 317ca38
Author: IronGauntlets <aneequesafdar@gmail.com>
Date:   Fri Apr 26 00:52:59 2024 +0100

    Refactor blockchain.Pending to return a reference

commit 03f4bfa
Author: IronGauntlets <aneequesafdar@gmail.com>
Date:   Mon Apr 22 21:16:22 2024 +0100

    Remove pending and empty pending from DB

    Pending Block is now only managed in memory this is to make sure that
    pending block in the DB and in memory do not become out of sync. Before
    the pending block was managed in memory as a cache, however, since there
    is only one pending block at a given time it doesn't make sense to keep
    track of pending block in both memory and DB.

    To reduce the number of block not found errors while simulating
    transactions it was decided to store empty pending block, using the
    latest header to fill in fields such as block number, parent block hash,
    etc. This meant that any time we didn't have a pending block this
    empty pending block would be served along with empty state diff and
    classes. Every time a new block was added to the blockchain a new empty
    pending block was also added to the DB.

    The unforeseen side effect of this change was when the
    --poll-pending-interval flag was disabled the rpc would still serve a
    pending block. This is incorrect behaviour.

    As the blocks changed per new versions of starknet the empty block also
    needed to be changed and a storage diff with a special contract "0x1"
    needed to be updated in the state diff. This overhead is unnecessary and
    incorrectly informs the user that there is a pending block when
    there isn't one.
  • Loading branch information
weiihann committed Dec 13, 2024
1 parent 105255a commit e44dd93
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/juno/dbcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func dbInfo(cmd *cobra.Command, args []string) error {
}
defer database.Close()

chain := blockchain.New(database, nil)
chain := blockchain.New(database, nil, nil)
var info DBInfo

// Get the latest block information
Expand Down
8 changes: 6 additions & 2 deletions rpc/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ func (fs *fakeSyncer) HighestBlockHeader() *core.Header {
return nil
}

func (fs *fakeSyncer) Pending() (*sync.Pending, error) { return nil, nil }
func (fs *fakeSyncer) PendingBlock() *core.Block { return nil }
func (fs *fakeSyncer) PendingState() (core.StateReader, func() error, error) { return nil, nil, nil }

func TestSubscribeNewHeads(t *testing.T) {
log := utils.NewNopZapLogger()

Expand Down Expand Up @@ -471,10 +475,10 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
require.NoError(t, err)

testDB := pebble.NewMemTest(t)
chain := blockchain.New(testDB, &utils.Mainnet)
chain := blockchain.New(testDB, &utils.Mainnet, nil)
assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))

chain = blockchain.New(testDB, &utils.Mainnet)
chain = blockchain.New(testDB, &utils.Mainnet, nil)
syncer := newFakeSyncer()

ctx, cancel := context.WithCancel(context.Background())
Expand Down
91 changes: 91 additions & 0 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type Reader interface {
SubscribeNewHeads() HeaderSubscription
SubscribeReorg() ReorgSubscription
SubscribePendingTxs() PendingTxSubscription

Pending() (*Pending, error)
PendingBlock() *core.Block
PendingState() (core.StateReader, func() error, error)
}

// This is temporary and will be removed once the p2p synchronizer implements this interface.
Expand Down Expand Up @@ -92,6 +96,18 @@ type ReorgData struct {
EndBlockNum uint64 `json:"ending_block_number"`
}

func (n *NoopSynchronizer) PendingBlock() *core.Block {
return nil
}

func (n *NoopSynchronizer) Pending() (*Pending, error) {
return nil, errors.New("Pending() is not implemented")
}

func (n *NoopSynchronizer) PendingState() (core.StateReader, func() error, error) {
return nil, nil, errors.New("PendingState() not implemented")
}

// Synchronizer manages a list of StarknetData to fetch the latest blockchain updates
type Synchronizer struct {
blockchain *blockchain.Blockchain
Expand Down Expand Up @@ -571,3 +587,78 @@ func (s *Synchronizer) SubscribePendingTxs() PendingTxSubscription {
Subscription: s.pendingTxsFeed.Subscribe(),
}
}

// StorePending stores a pending block given that it is for the next height
func (s *Synchronizer) StorePending(p *Pending) error {
err := blockchain.CheckBlockVersion(p.Block.ProtocolVersion)
if err != nil {
return err
}

expectedParentHash := new(felt.Felt)
h, err := s.blockchain.HeadsHeader()
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return err
} else if err == nil {
expectedParentHash = h.Hash
}

if !expectedParentHash.Equal(p.Block.ParentHash) {
return fmt.Errorf("store pending: %w", blockchain.ErrParentDoesNotMatchHead)
}

if existingPending, err := s.Pending(); err == nil {
if existingPending.Block.TransactionCount >= p.Block.TransactionCount {
// ignore the incoming pending if it has fewer transactions than the one we already have
return nil
}
} else if !errors.Is(err, ErrPendingBlockNotFound) {
return err
}
s.pending.Store(p)

return nil
}

func (s *Synchronizer) Pending() (*Pending, error) {
p := s.pending.Load()
if p == nil {
return nil, ErrPendingBlockNotFound
}

expectedParentHash := &felt.Zero
if head, err := s.blockchain.HeadsHeader(); err == nil {
expectedParentHash = head.Hash
}
if p.Block.ParentHash.Equal(expectedParentHash) {
return p, nil
}

// Since the pending block in the cache is outdated remove it
s.pending.Store(nil)

return nil, ErrPendingBlockNotFound
}

func (s *Synchronizer) PendingBlock() *core.Block {
pending, err := s.Pending()
if err != nil {
return nil
}
return pending.Block
}

// PendingState returns the state resulting from execution of the pending block
func (s *Synchronizer) PendingState() (core.StateReader, func() error, error) {
txn, err := s.db.NewTransaction(false)
if err != nil {
return nil, nil, err
}

pending, err := s.Pending()
if err != nil {
return nil, nil, utils.RunAndWrapOnError(txn.Discard, err)
}

return NewPendingState(pending.StateUpdate.StateDiff, pending.NewClasses, core.NewState(txn)), txn.Discard, nil
}
30 changes: 26 additions & 4 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestReorg(t *testing.T) {
integStart, err := bc.BlockHeaderByNumber(0)
require.NoError(t, err)

synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false)
synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), 0, false, testDB)
sub := synchronizer.SubscribeReorg()
ctx, cancel = context.WithTimeout(context.Background(), timeout)
require.NoError(t, synchronizer.Run(ctx))
Expand All @@ -185,6 +185,28 @@ func TestReorg(t *testing.T) {
})
}

func TestPending(t *testing.T) {
t.Parallel()

client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)

testDB := pebble.NewMemTest(t)
log := utils.NewNopZapLogger()
bc := blockchain.New(testDB, &utils.Mainnet, nil)
synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false, testDB)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

require.NoError(t, synchronizer.Run(ctx))
cancel()

head, err := bc.HeadsHeader()
require.NoError(t, err)
pending, err := synchronizer.Pending()
require.NoError(t, err)
assert.Equal(t, head.Hash, pending.Block.ParentHash)
}

func TestSubscribeNewHeads(t *testing.T) {
t.Parallel()
testDB := pebble.NewMemTest(t)
Expand Down Expand Up @@ -217,16 +239,16 @@ func TestSubscribePendingTxs(t *testing.T) {

testDB := pebble.NewMemTest(t)
log := utils.NewNopZapLogger()
bc := blockchain.New(testDB, &utils.Mainnet)
synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false)
bc := blockchain.New(testDB, &utils.Mainnet, nil)
synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false, testDB)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

sub := synchronizer.SubscribePendingTxs()

require.NoError(t, synchronizer.Run(ctx))
cancel()

pending, err := bc.Pending()
pending, err := synchronizer.Pending()
require.NoError(t, err)
pendingTxs, ok := <-sub.Recv()
require.True(t, ok)
Expand Down

0 comments on commit e44dd93

Please sign in to comment.