Skip to content

Commit

Permalink
Move pending to sync package
Browse files Browse the repository at this point in the history
In order to moved handling of pending to synchroniser the following
changes needed to be made:
- Add database to synchroniser, so that pending state can be served
- Blockchain and Events Filter have a pendingBlockFn() which returns the
  pending block. Due to import cycle pending struct could not be
  referenced, therefore, the anonymous function is passed.
- Add PendingBlock() to return just the pending block, this was mainly
  added to support the pendingBlockFn().
- In rpc package the pending block and state is retrieved through
  synchroniser. Therefore, receipt and transaction handler now check the
  pending block for the requested transaction/receipt.
  • Loading branch information
IronGauntlets committed Dec 12, 2024
1 parent b92faef commit a562e91
Show file tree
Hide file tree
Showing 27 changed files with 503 additions and 469 deletions.
154 changes: 12 additions & 142 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync/atomic"

"github.com/Masterminds/semver/v3"
"github.com/NethermindEth/juno/core"
Expand Down Expand Up @@ -39,14 +38,11 @@ type Reader interface {
HeadState() (core.StateReader, StateCloser, error)
StateAtBlockHash(blockHash *felt.Felt) (core.StateReader, StateCloser, error)
StateAtBlockNumber(blockNumber uint64) (core.StateReader, StateCloser, error)
PendingState() (core.StateReader, StateCloser, error)

BlockCommitmentsByNumber(blockNumber uint64) (*core.BlockCommitments, error)

EventFilter(from *felt.Felt, keys [][]felt.Felt) (EventFilterer, error)

Pending() (*Pending, error)

Network() *utils.Network
}

Expand All @@ -56,7 +52,7 @@ var (
SupportedStarknetVersion = semver.MustParse("0.13.3")
)

func checkBlockVersion(protocolVersion string) error {
func CheckBlockVersion(protocolVersion string) error {
blockVer, err := core.ParseBlockVersion(protocolVersion)
if err != nil {
return err
Expand All @@ -81,22 +77,21 @@ func copyWithoutPatch(v *semver.Version) *semver.Version {

var _ Reader = (*Blockchain)(nil)

// Todo: Remove after pending is moved to sychcroniser
var pending atomic.Pointer[Pending]

// Blockchain is responsible for keeping track of all things related to the Starknet blockchain
type Blockchain struct {
network *utils.Network
database db.DB
listener EventListener
network *utils.Network
database db.DB
listener EventListener
pendingBlockFn func() *core.Block
}

func New(database db.DB, network *utils.Network) *Blockchain {
func New(database db.DB, network *utils.Network, pendingBlockFn func() *core.Block) *Blockchain {
RegisterCoreTypesToEncoder()
return &Blockchain{
database: database,
network: network,
listener: &SelectiveListener{},
database: database,
network: network,
listener: &SelectiveListener{},
pendingBlockFn: pendingBlockFn,
}
}

Expand Down Expand Up @@ -266,24 +261,6 @@ func (b *Blockchain) TransactionByHash(hash *felt.Felt) (core.Transaction, error
return transaction, b.database.View(func(txn db.Transaction) error {
var err error
transaction, err = transactionByHash(txn, hash)

// not found in the canonical blocks, try pending
if errors.Is(err, db.ErrKeyNotFound) {
var pending *Pending
pending, err = pendingBlock(txn)
if err != nil {
return err
}

for _, t := range pending.Block.Transactions {
if hash.Equal(t.Hash()) {
transaction = t
return nil
}
}
return db.ErrKeyNotFound
}

return err
})
}
Expand All @@ -299,29 +276,6 @@ func (b *Blockchain) Receipt(hash *felt.Felt) (*core.TransactionReceipt, *felt.F
return receipt, blockHash, blockNumber, b.database.View(func(txn db.Transaction) error {
var err error
receipt, blockHash, blockNumber, err = receiptByHash(txn, hash)

// not found in the canonical blocks, try pending
if errors.Is(err, db.ErrKeyNotFound) {
var pending *Pending
pending, err = pendingBlock(txn)
if err != nil {
if !errors.Is(err, ErrPendingBlockNotFound) {
return err
}
return db.ErrKeyNotFound
}

for i, t := range pending.Block.Transactions {
if hash.Equal(t.Hash()) {
receipt = pending.Block.Receipts[i]
blockHash = nil
blockNumber = 0
return nil
}
}
return db.ErrKeyNotFound
}

return err
})
}
Expand Down Expand Up @@ -407,7 +361,7 @@ func (b *Blockchain) VerifyBlock(block *core.Block) error {
}

func verifyBlock(txn db.Transaction, block *core.Block) error {
if err := checkBlockVersion(block.ProtocolVersion); err != nil {
if err := CheckBlockVersion(block.ProtocolVersion); err != nil {
return err
}

Expand Down Expand Up @@ -865,7 +819,7 @@ func (b *Blockchain) EventFilter(from *felt.Felt, keys [][]felt.Felt) (EventFilt
return nil, err
}

return newEventFilter(txn, from, keys, 0, latest, &pending), nil
return newEventFilter(txn, from, keys, 0, latest, b.pendingBlockFn), nil
}

// RevertHead reverts the head block
Expand Down Expand Up @@ -975,87 +929,3 @@ func removeTxsAndReceipts(txn db.Transaction, blockNumber, numTxs uint64) error

return nil
}

// StorePending stores a pending block given that it is for the next height
func (b *Blockchain) StorePending(p *Pending) error {
err := checkBlockVersion(p.Block.ProtocolVersion)
if err != nil {
return err
}
return b.database.View(func(txn db.Transaction) error {
expectedParentHash := new(felt.Felt)
h, err := headsHeader(txn)
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return err
} else if err == nil {
expectedParentHash = h.Hash
}

if !expectedParentHash.Equal(p.Block.ParentHash) {
return ErrParentDoesNotMatchHead
}

if existingPending, err := pendingBlock(txn); err == nil {
if existingPending.Block.TransactionCount >= p.Block.TransactionCount {
// ignore the incoming pending if it has fewer transactions than the one we already have
return nil
}
} else if !errors.Is(err, ErrPendingBlockNotFound) {
return err
}

if h != nil {
p.Block.Number = h.Number + 1
}
pending.Store(p)

return nil
})
}

func pendingBlock(txn db.Transaction) (*Pending, error) {
p := pending.Load()
if p == nil {
return nil, ErrPendingBlockNotFound
}

expectedParentHash := &felt.Zero
if head, err := headsHeader(txn); err == nil {
expectedParentHash = head.Hash
}
if p.Block.ParentHash.Equal(expectedParentHash) {
return p, nil
}

// Since the pending block in the cache is outdated remove it
pending.Store(nil)

return nil, ErrPendingBlockNotFound
}

// Pending returns the pending block from the database
func (b *Blockchain) Pending() (*Pending, error) {
b.listener.OnRead("Pending")
var pending *Pending
return pending, b.database.View(func(txn db.Transaction) error {
var err error
pending, err = pendingBlock(txn)
return err
})
}

// PendingState returns the state resulting from execution of the pending block
func (b *Blockchain) PendingState() (core.StateReader, StateCloser, error) {
b.listener.OnRead("PendingState")
txn, err := b.database.NewTransaction(false)
if err != nil {
return nil, nil, err
}

pending, err := pendingBlock(txn)
if err != nil {
return nil, nil, utils.RunAndWrapOnError(txn.Discard, err)
}

return NewPendingState(pending.StateUpdate.StateDiff, pending.NewClasses, core.NewState(txn)), txn.Discard, nil
}
Loading

0 comments on commit a562e91

Please sign in to comment.