Skip to content

Commit

Permalink
core/txpool, eth/catalyst: fix racy simulator due to txpool backgroun…
Browse files Browse the repository at this point in the history
…d reset (#28837)

This PR fixes an issues in the new simulated backend. The root cause is the fact that the transaction pool has an internal reset operation that runs on a background thread.

When a new transaction is added to the pool via the RPC, the transaction is added to a non-executable queue and will be moved to its final location on a background thread. If the machine is overloaded (or simply due to timing issues), it can happen that the simulated backend will try to produce the next block, whilst the pool has not yet marked the newly added transaction executable. This will cause the block to not contain the transaction. This is an issue because we want determinism from the simulator: add a tx, mine a block. It should be in there.

The PR fixes it by adding a Sync function to the txpool, which waits for the current reset operation (if any) to finish, and then runs an entire round of reset on top. The new round is needed because resets are only triggered by new head events, so newly added transactions will not trigger the outer resets that we can wait on. The transaction pool would eventually internally do a reset even on transaction addition, but there's no easy way to wait on that and there's no meaningful reason to bubble that across everything. A clean outer reset will at worse be a small noop goroutine.
  • Loading branch information
karalabe committed Jan 23, 2024
1 parent 98eaa57 commit 542c861
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 10 deletions.
66 changes: 64 additions & 2 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type TxPool struct {

subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
term chan struct{} // Termination channel to detect a closed pool

sync chan chan error // Testing / simulator channel to block until internal reset is done
}

// New creates a new transaction pool to gather, sort and filter inbound
Expand All @@ -86,6 +89,8 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error)
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
}
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
Expand Down Expand Up @@ -174,6 +179,9 @@ func (p *TxPool) Close() error {
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (p *TxPool) loop(head *types.Header, chain BlockChain) {
// Close the termination marker when the pool stops
defer close(p.term)

// Subscribe to chain head events to trigger subpool resets
var (
newHeadCh = make(chan core.ChainHeadEvent)
Expand All @@ -190,13 +198,23 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
var (
resetBusy = make(chan struct{}, 1) // Allow 1 reset to run concurrently
resetDone = make(chan *types.Header)

resetForced bool // Whether a forced reset was requested, only used in simulator mode
resetWaiter chan error // Channel waiting on a forced reset, only used in simulator mode
)
// Notify the live reset waiter to not block if the txpool is closed.
defer func() {
if resetWaiter != nil {
resetWaiter <- errors.New("pool already terminated")
resetWaiter = nil
}
}()
var errc chan error
for errc == nil {
// Something interesting might have happened, run a reset if there is
// one needed but none is running. The resetter will run on its own
// goroutine to allow chain head events to be consumed contiguously.
if newHead != oldHead {
if newHead != oldHead || resetForced {
// Try to inject a busy marker and start a reset if successful
select {
case resetBusy <- struct{}{}:
Expand All @@ -208,8 +226,17 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
resetDone <- newHead
}(oldHead, newHead)

// If the reset operation was explicitly requested, consider it
// being fulfilled and drop the request marker. If it was not,
// this is a noop.
resetForced = false

default:
// Reset already running, wait until it finishes
// Reset already running, wait until it finishes.
//
// Note, this will not drop any forced reset request. If a forced
// reset was requested, but we were busy, then when the currently
// running reset finishes, a new one will be spun up.
}
}
// Wait for the next chain head event or a previous reset finish
Expand All @@ -223,8 +250,26 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
oldHead = head
<-resetBusy

// If someone is waiting for a reset to finish, notify them, unless
// the forced op is still pending. In that case, wait another round
// of resets.
if resetWaiter != nil && !resetForced {
resetWaiter <- nil
resetWaiter = nil
}

case errc = <-p.quit:
// Termination requested, break out on the next loop round

case syncc := <-p.sync:
// Transaction pool is running inside a simulator, and we are about
// to create a new block. Request a forced sync operation to ensure
// that any running reset operation finishes to make block imports
// deterministic. On top of that, run a new reset operation to make
// transaction insertions deterministic instead of being stuck in a
// queue waiting for a reset.
resetForced = true
resetWaiter = syncc
}
}
// Notify the closer of termination (no error possible for now)
Expand Down Expand Up @@ -415,3 +460,20 @@ func (p *TxPool) Status(hash common.Hash) TxStatus {
}
return TxStatusUnknown
}

// Sync is a helper method for unit tests or simulator runs where the chain events
// are arriving in quick succession, without any time in between them to run the
// internal background reset operations. This method will run an explicit reset
// operation to ensure the pool stabilises, thus avoiding flakey behavior.
//
// Note, do not use this in production / live code. In live code, the pool is
// meant to reset on a separate thread to avoid DoS vectors.
func (p *TxPool) Sync() error {
sync := make(chan error)
select {
case p.sync <- sync:
return <-sync
case <-p.term:
return errors.New("pool already terminated")
}
}
25 changes: 19 additions & 6 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai"))
}
}
return api.forkchoiceUpdated(update, payloadAttributes)
return api.forkchoiceUpdated(update, payloadAttributes, false)
}

// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload attributes.
Expand All @@ -196,7 +196,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called for shanghai payloads"))
}
}
return api.forkchoiceUpdated(update, params)
return api.forkchoiceUpdated(update, params, false)
}

// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root in the payload attributes.
Expand All @@ -220,10 +220,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params)
return api.forkchoiceUpdated(update, params, false)
}

func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, simulatorMode bool) (engine.ForkChoiceResponse, error) {
api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()

Expand Down Expand Up @@ -330,7 +330,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if merger := api.eth.Merger(); !merger.PoSFinalized() {
merger.FinalizePoS()
}
// If the finalized block is not in our canonical tree, somethings wrong
// If the finalized block is not in our canonical tree, something is wrong
finalBlock := api.eth.BlockChain().GetBlockByHash(update.FinalizedBlockHash)
if finalBlock == nil {
log.Warn("Final block not available in database", "hash", update.FinalizedBlockHash)
Expand All @@ -342,7 +342,7 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
// Set the finalized block
api.eth.BlockChain().SetFinalized(finalBlock.Header())
}
// Check if the safe block hash is in our canonical tree, if not somethings wrong
// Check if the safe block hash is in our canonical tree, if not something is wrong
if update.SafeBlockHash != (common.Hash{}) {
safeBlock := api.eth.BlockChain().GetBlockByHash(update.SafeBlockHash)
if safeBlock == nil {
Expand Down Expand Up @@ -374,6 +374,19 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if api.localBlocks.has(id) {
return valid(&id), nil
}
// If the beacon chain is ran by a simulator, then transaction insertion,
// block insertion and block production will happen without any timing
// delay between them. This will cause flaky simulator executions due to
// the transaction pool running its internal reset operation on a back-
// ground thread. To avoid the racey behavior - in simulator mode - the
// pool will be explicitly blocked on its reset before continuing to the
// block production below.
if simulatorMode {
if err := api.eth.TxPool().Sync(); err != nil {
log.Error("Failed to sync transaction pool", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
}
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
log.Error("Failed to build payload", "err", err)
Expand Down
4 changes: 2 additions & 2 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u

var random [32]byte
rand.Read(random[:])
fcResponse, err := c.engineAPI.ForkchoiceUpdatedV2(c.curForkchoiceState, &engine.PayloadAttributes{
fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{
Timestamp: timestamp,
SuggestedFeeRecipient: feeRecipient,
Withdrawals: withdrawals,
Random: random,
})
}, true)
if err != nil {
return err
}
Expand Down

0 comments on commit 542c861

Please sign in to comment.