Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disable tx gossip option #13

Merged
merged 3 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ type Pool interface {
AddNewGoodPeer(peerID types.PeerID)
}

var _ Pool = (*TxPool)(nil) // compile-time interface check
var _ Pool = (*TxPool)(nil) // compile-time interface check
var _ Pool = (*TxPoolDropRemote)(nil) // compile-time interface check

// SubPoolMarker ordered bitset responsible to sort transactions by sub-pools. Bits meaning:
// 1. Minimum fee requirement. Set to 1 if feeCap of the transaction is no less than in-protocol parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means this transaction will never be included into this particular chain.
Expand Down Expand Up @@ -305,6 +306,11 @@ type TxPool struct {
l1Cost L1CostFn
}

// disables adding remote transactions
type TxPoolDropRemote struct {
*TxPool
}

func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, cache kvcache.Cache, chainID uint256.Int, shanghaiTime *big.Int) (*TxPool, error) {
var err error
localsHistory, err := simplelru.NewLRU[string, struct{}](10_000, nil)
Expand Down Expand Up @@ -347,6 +353,10 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
}, nil
}

func NewTxPoolDropRemote(txPool *TxPool) *TxPoolDropRemote {
return &TxPoolDropRemote{TxPool: txPool}
}

func RawRLPTxToOptimismL1CostFn(payload []byte) (L1CostFn, error) {
// skip prefix byte
if len(payload) == 0 {
Expand Down Expand Up @@ -1497,7 +1507,8 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
if !p.Started() {
continue
}

// when transaction gossip disabled, we still need to call processRemoteTxs
// just in case to make unprocessedRemoteTxs to be cleared
if err := p.processRemoteTxs(ctx); err != nil {
if grpcutil.IsRetryLater(err) || grpcutil.IsEndOfStream(err) {
time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -1537,6 +1548,14 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs

notifyMiningAboutNewSlots()

if p.cfg.NoTxGossip {
// drain newTxs for emptying newTx channel
// newTx channel will be filled only with local transactions
// early return to avoid outbound transaction propagation
log.Debug("[txpool] tx gossip disabled", "state", "drain new transactions")
return
}

var localTxTypes []byte
var localTxSizes []uint32
var localTxHashes types.Hashes
Expand Down Expand Up @@ -1596,6 +1615,11 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
if len(newPeers) == 0 {
continue
}
if p.cfg.NoTxGossip {
// avoid transaction gossiping for new peers
log.Debug("[txpool] tx gossip disabled", "state", "sync new peers")
continue
}
t := time.Now()
var hashes types.Hashes
var types []byte
Expand Down Expand Up @@ -1908,6 +1932,11 @@ func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp []byte, sender
})
}

func (p *TxPoolDropRemote) AddRemoteTxs(ctx context.Context, newTxs types.TxSlots) {
// disable adding remote transactions
// consume remote tx from fetch
}

// CalcIntrinsicGas computes the 'intrinsic gas' for a message with the given data.
func CalcIntrinsicGas(dataLen, dataNonZeroLen uint64, accessList types.AccessList, isContractCreation, isHomestead, isEIP2028, isShanghai bool) (uint64, DiscardReason) {
// Set the starting gas for the raw transaction
Expand Down
103 changes: 103 additions & 0 deletions txpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,3 +712,106 @@ func TestShanghaiValidateTx(t *testing.T) {
})
}
}

func TestDropRemote(t *testing.T) {
assert, require := assert.New(t), require.New(t)
ch := make(chan types.Announcements, 100)
db, coreDB := memdb.NewTestPoolDB(t), memdb.NewTestDB(t)

cfg := txpoolcfg.DefaultConfig
cfg.NoTxGossip = true

sendersCache := kvcache.New(kvcache.DefaultCoherentConfig)
txPool, err := New(ch, coreDB, cfg, sendersCache, *u256.N1, nil)
assert.NoError(err)
require.True(txPool != nil)
txPoolDropRemote := NewTxPoolDropRemote(txPool)

ctx := context.Background()
var stateVersionID uint64 = 0
pendingBaseFee := uint64(1_000_000)
// start blocks from 0, set empty hash - then kvcache will also work on this
h1 := gointerfaces.ConvertHashToH256([32]byte{})
change := &remote.StateChangeBatch{
StateVersionId: stateVersionID,
PendingBlockBaseFee: pendingBaseFee,
BlockGasLimit: 1000000,
ChangeBatch: []*remote.StateChange{
{BlockHeight: 0, BlockHash: h1},
},
}
var addr [20]byte
addr[0] = 1
v := make([]byte, types.EncodeSenderLengthForStorage(2, *uint256.NewInt(1 * common.Ether)))
types.EncodeSender(2, *uint256.NewInt(1 * common.Ether), v)
change.ChangeBatch[0].Changes = append(change.ChangeBatch[0].Changes, &remote.AccountChange{
Action: remote.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(addr),
Data: v,
})
tx, err := db.BeginRw(ctx)
require.NoError(err)
defer tx.Rollback()
err = txPoolDropRemote.OnNewBlock(ctx, change, types.TxSlots{}, types.TxSlots{}, tx)
assert.NoError(err)
// 1. Try Local Tx
{
var txSlots types.TxSlots
txSlot := &types.TxSlot{
Tip: *uint256.NewInt(500_000),
FeeCap: *uint256.NewInt(3_000_000),
Gas: 100000,
Nonce: 3,
}
txSlot.IDHash[0] = 1
txSlots.Append(txSlot, addr[:], true)

reasons, err := txPoolDropRemote.AddLocalTxs(ctx, txSlots, tx)
assert.NoError(err)
for _, reason := range reasons {
assert.Equal(Success, reason, reason.String())
}
}
select {
case annoucements := <-ch:
for i := 0; i < annoucements.Len(); i++ {
_, _, hash := annoucements.At(i)
fmt.Printf("propagated hash %x\n", hash)
}
default:

}

// 2. Try same Tx, but as remote; tx must be dropped
{
var txSlots types.TxSlots
txSlot := &types.TxSlot{
Tip: *uint256.NewInt(500_000),
FeeCap: *uint256.NewInt(3_000_000),
Gas: 100000,
Nonce: 3,
}
txSlot.IDHash[0] = 1
txSlots.Append(txSlot, addr[:], true)

txPoolDropRemote.AddRemoteTxs(ctx, txSlots)
}

// empty because AddRemoteTxs logic is intentionally empty
assert.Equal(0, len(txPoolDropRemote.unprocessedRemoteByHash))
assert.Equal(0, len(txPoolDropRemote.unprocessedRemoteTxs.Txs))

assert.NoError(txPoolDropRemote.processRemoteTxs(ctx))

checkAnnouncementEmpty := func() bool {
select {
case <-ch:
return false
default:
return true
}
}
// no announcement because unprocessedRemoteTxs is already empty
assert.True(checkAnnouncementEmpty())

}
6 changes: 4 additions & 2 deletions txpool/txpoolcfg/txpoolcfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type Config struct {
PriceBump uint64 // Price bump percentage to replace an already existing transaction
OverrideShanghaiTime *big.Int

Optimism bool
Optimism bool
NoTxGossip bool
}

var DefaultConfig = Config{
Expand All @@ -38,5 +39,6 @@ var DefaultConfig = Config{
PriceBump: 10, // Price bump percentage to replace an already existing transaction
OverrideShanghaiTime: nil,

Optimism: false,
Optimism: false,
NoTxGossip: false,
}
9 changes: 7 additions & 2 deletions txpool/txpooluitl/all_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,21 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach
shanghaiTime = cfg.OverrideShanghaiTime
}

var pool txpool.Pool
txPool, err := txpool.New(newTxs, chainDB, cfg, cache, *chainID, shanghaiTime)
if err != nil {
return nil, nil, nil, nil, nil, err
}
pool = txpool.Pool(txPool)
if cfg.NoTxGossip {
pool = txpool.Pool(txpool.NewTxPoolDropRemote(txPool))
}

fetch := txpool.NewFetch(ctx, sentryClients, txPool, stateChangesClient, chainDB, txPoolDB, *chainID)
fetch := txpool.NewFetch(ctx, sentryClients, pool, stateChangesClient, chainDB, txPoolDB, *chainID)
//fetch.ConnectCore()
//fetch.ConnectSentries()

send := txpool.NewSend(ctx, sentryClients, txPool)
send := txpool.NewSend(ctx, sentryClients, pool)
txpoolGrpcServer := txpool.NewGrpcServer(ctx, txPool, txPoolDB, *chainID)
return txPoolDB, txPool, fetch, send, txpoolGrpcServer, nil
}