Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit max tx count perpeer #3267

Merged
merged 8 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ type OecConfig struct {
commitGapOffset int64

maxSubscriptionClients int

maxTxLimitPerPeer uint64
}

const (
Expand Down Expand Up @@ -161,8 +163,8 @@ const (
FlagEnableHasBlockPartMsg = "enable-blockpart-ack"
FlagDebugGcInterval = "debug.gc-interval"
FlagCommitGapOffset = "commit-gap-offset"

FlagMaxSubscriptionClients = "max-subscription-clients"
FlagMaxSubscriptionClients = "max-subscription-clients"
FlagMaxTxLimitPerPeer = "mempool.max_tx_limit_per_peer"
)

var (
Expand Down Expand Up @@ -278,6 +280,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetMempoolFlush(viper.GetBool(FlagMempoolFlush))
c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost))
c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock))
c.SetMaxTxLimitPerPeer(int64(viper.GetUint64(FlagMaxTxLimitPerPeer)))
c.SetEnableDeleteMinGPTx(viper.GetBool(FlagMempoolEnableDeleteMinGPTx))
c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock))
c.SetEnablePGU(viper.GetBool(FlagEnablePGU))
Expand Down Expand Up @@ -463,6 +466,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetMaxTxNumPerBlock(r)
case FlagMaxTxLimitPerPeer:
r, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return
}
c.SetMaxTxLimitPerPeer(r)
case FlagMempoolEnableDeleteMinGPTx:
r, err := strconv.ParseBool(v)
if err != nil {
Expand Down Expand Up @@ -1099,3 +1108,14 @@ func (c *OecConfig) SetMaxSubscriptionClients(v int) {
func (c *OecConfig) GetMaxSubscriptionClients() int {
return c.maxSubscriptionClients
}

func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) {
if maxTxLimitPerPeer < 0 {
return
}
c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer)
}

func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 {
return c.maxTxLimitPerPeer
}
3 changes: 2 additions & 1 deletion dev/testnet/testnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ run() {
--enable-wtx=${WRAPPEDTX} \
--mempool.node_key_whitelist ${WHITE_LIST} \
--p2p.pex=false \
--mempool.max_tx_limit_per_peer=1 \
--p2p.addr_book_strict=false \
$p2p_seed_opt $p2p_seed_arg \
--p2p.laddr tcp://${IP}:${p2pport} \
Expand All @@ -158,7 +159,7 @@ run() {
--chain-id ${CHAIN_ID} \
--upload-delta=false \
--enable-gid \
--consensus.timeout_commit 3800ms \
--consensus.timeout_commit 10000ms \
--enable-blockpart-ack=false \
--append-pid=true \
${LOG_SERVER} \
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/cmd/tendermint/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ func AddNodeFlags(cmd *cobra.Command) {
config.Mempool.PendingRemoveEvent,
"Push event when remove a pending tx",
)
cmd.Flags().Uint64(
"mempool.max_tx_limit_per_peer",
config.Mempool.MaxTxLimitPerPeer,
BananaLF marked this conversation as resolved.
Show resolved Hide resolved
"Max tx limit per peer. If set 0 ,this flag disable",
)

cmd.Flags().String(
"mempool.node_key_whitelist",
Expand Down
10 changes: 7 additions & 3 deletions libs/tendermint/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ type MempoolConfig struct {
PendingPoolMaxTxPerAddress int `mapstructure:"pending_pool_max_tx_per_address"`
NodeKeyWhitelist []string `mapstructure:"node_key_whitelist"`
PendingRemoveEvent bool `mapstructure:"pending_remove_event"`
MaxTxLimitPerPeer uint64 `mapstructure:"max_tx_limit_per_peer"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
Expand All @@ -715,6 +716,7 @@ func DefaultMempoolConfig() *MempoolConfig {
PendingPoolMaxTxPerAddress: 100,
NodeKeyWhitelist: []string{},
PendingRemoveEvent: false,
MaxTxLimitPerPeer: 100,
}
}

Expand Down Expand Up @@ -953,12 +955,14 @@ func (cfg *ConsensusConfig) ValidateBasic() error {
return nil
}

//-----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// TxIndexConfig
// Remember that Event has the following structure:
// type: [
// key: value,
// ...
//
// key: value,
// ...
//
// ]
//
// CompositeKeys are constructed by `type.key`
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/config/dynamic_config_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type IDynamicConfig interface {
GetDynamicGpMaxTxNum() int64
GetDynamicGpMaxGasUsed() int64
GetMaxSubscriptionClients() int
GetMaxTxLimitPerPeer() uint64
}

var DynamicConfig IDynamicConfig = MockDynamicConfig{}
Expand Down Expand Up @@ -198,3 +199,7 @@ func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) {
}
d.maxSubscriptionClients = value
}

func (c MockDynamicConfig) GetMaxTxLimitPerPeer() uint64 {
return DefaultMempoolConfig().MaxTxLimitPerPeer
}
40 changes: 40 additions & 0 deletions libs/tendermint/mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ type CListMempool struct {
rmPendingTxChan chan types.EventDataRmPendingTx

gpo *Oracle

peersTxCountMtx sync.RWMutex
peersTxCount map[string]uint64
}

func (mem *CListMempool) filterCMTx(tx abci.TxEssentials) bool {
Expand Down Expand Up @@ -166,6 +169,7 @@ func NewCListMempool(
simQueue: make(chan *mempoolTx, 100000),
gasCache: gasCache,
gpo: gpo,
peersTxCount: make(map[string]uint64, 0),
}

if config.PendingRemoveEvent {
Expand Down Expand Up @@ -296,6 +300,38 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
return mem.txs.TxsWaitChan()
}

func (mem *CListMempool) validatePeerCount(txInfo TxInfo) error {
if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 {
return nil
}
mem.peersTxCountMtx.Lock()
BananaLF marked this conversation as resolved.
Show resolved Hide resolved
defer mem.peersTxCountMtx.Unlock()
if len(txInfo.SenderP2PID) != 0 {
peerTxCount, ok := mem.peersTxCount[string(txInfo.SenderP2PID)]
if !ok {
peerTxCount = 0
}
if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() {
BananaLF marked this conversation as resolved.
Show resolved Hide resolved
mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()))
return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())
}
peerTxCount++
mem.peersTxCount[string(txInfo.SenderP2PID)] = peerTxCount
}
return nil
}

func (mem *CListMempool) resetPeerCount() {
if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 {
return
}
mem.peersTxCountMtx.Lock()
defer mem.peersTxCountMtx.Unlock()
for key := range mem.peersTxCount {
delete(mem.peersTxCount, key)
}
}

// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
//
Expand All @@ -305,6 +341,9 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {
if err := mem.validatePeerCount(txInfo); err != nil {
return err
}
timeStart := int64(0)
if cfg.DynamicConfig.GetMempoolCheckTxCost() {
timeStart = time.Now().UnixMicro()
Expand Down Expand Up @@ -1010,6 +1049,7 @@ func (mem *CListMempool) Update(
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
mem.resetPeerCount()
// no need to update when mempool is unavailable
if mem.config.Sealed {
return mem.updateSealed(height, txs, deliverTxResponses)
Expand Down