Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make txArriveTimeout Configurable w/ CLI Flag #734

Merged
merged 14 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,13 @@ var (
Usage: "Gas price below which gpo will ignore transactions",
Value: ethconfig.Defaults.GPO.IgnorePrice.Int64(),
}
// flag to set the transaction fetcher's txArrivalWait value, which is the maximum waiting
// period the fetcher will wait to receive an announced tx before explicitly requesting it
TxArrivalWaitFlag = cli.DurationFlag{
Name: "txarrivalwait",
Usage: "Maximum duration to wait for a transaction before requesting it (defaults to 500ms)",
Value: node.DefaultConfig.P2P.TxArrivalWait,
}

// Metrics flags
MetricsEnabledFlag = cli.BoolFlag{
Expand Down Expand Up @@ -1288,6 +1295,10 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.NoDiscovery = true
cfg.DiscoveryV5 = false
}

if ctx.GlobalIsSet(TxArrivalWaitFlag.Name) {
cfg.TxArrivalWait = TxArrivalWaitFlag.Value
}
}

// SetNodeConfig applies node-related command line flags to the config.
Expand Down
13 changes: 7 additions & 6 deletions docs/cli/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ devfakeauthor = false # Run miner without validator set authorization
debug = true # Prepends log messages with call-site location (file and line number) - {requires some effort}

[p2p]
maxpeers = 50 # Maximum number of network peers (network disabled if set to 0)
maxpendpeers = 50 # Maximum number of pending connection attempts
bind = "0.0.0.0" # Network binding address
port = 30303 # Network listening port
nodiscover = false # Disables the peer discovery mechanism (manual peer addition)
nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:<IP>)
maxpeers = 50 # Maximum number of network peers (network disabled if set to 0)
maxpendpeers = 50 # Maximum number of pending connection attempts
bind = "0.0.0.0" # Network binding address
port = 30303 # Network listening port
nodiscover = false # Disables the peer discovery mechanism (manual peer addition)
nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:<IP>)
netrestrict = "" # Restricts network communication to the given IP networks (CIDR masks)
nodekey = "" # P2P node key file
nodekeyhex = "" # P2P node key as hex
txarrivalwait = "500ms" # Maximum duration to wait before requesting an announced transaction
[p2p.discovery]
v5disc = false # Enables the experimental RLPx V5 (Topic Discovery) mechanism
bootnodes = [] # Comma separated enode URLs for P2P discovery bootstrap
Expand Down
2 changes: 2 additions & 0 deletions docs/cli/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ The ```bor server``` command runs the Bor client.

- ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false)

- ```txarrivalwait```: Maximum duration to wait before requesting an announced transaction (default: 500ms)

### Sealer Options

- ```mine```: Enable mining (default: false)
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EthAPI: ethAPI,
PeerRequiredBlocks: config.PeerRequiredBlocks,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
}); err != nil {
return nil, err
}
Expand Down
78 changes: 48 additions & 30 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ const (
// re-request them.
maxTxUnderpricedSetSize = 32768

// txArriveTimeout is the time allowance before an announced transaction is
// explicitly requested.
txArriveTimeout = 500 * time.Millisecond

// txGatherSlack is the interval used to collate almost-expired announces
// with network fetches.
txGatherSlack = 100 * time.Millisecond

// maxTxArrivalWait is the longest acceptable duration for the txArrivalWait
// configuration value. Longer config values will default to this.
maxTxArrivalWait = 500 * time.Millisecond
)

var (
Expand Down Expand Up @@ -176,38 +176,41 @@ type TxFetcher struct {
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
rand *mrand.Rand // Randomizer to use in tests instead of map range loops (soft-random)

txArrivalWait time.Duration // txArrivalWait is the time allowance before an announced transaction is explicitly requested.
}

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, txArrivalWait time.Duration) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil, txArrivalWait)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
clock mclock.Clock, rand *mrand.Rand, txArrivalWait time.Duration) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
txArrivalWait: txArrivalWait,
}
}

Expand Down Expand Up @@ -333,6 +336,16 @@ func (f *TxFetcher) Drop(peer string) error {
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *TxFetcher) Start() {
// the txArrivalWait duration should not be less than the txGatherSlack duration
if f.txArrivalWait < txGatherSlack {
f.txArrivalWait = txGatherSlack
}

// the txArrivalWait duration should not be greater than the maxTxArrivalWait duration
if f.txArrivalWait > maxTxArrivalWait {
f.txArrivalWait = maxTxArrivalWait
}

go f.loop()
}

Expand All @@ -350,6 +363,9 @@ func (f *TxFetcher) loop() {
waitTrigger = make(chan struct{}, 1)
timeoutTrigger = make(chan struct{}, 1)
)

log.Info("TxFetcher", "txArrivalWait", f.txArrivalWait.String())

for {
select {
case ann := <-f.notify:
Expand Down Expand Up @@ -441,7 +457,7 @@ func (f *TxFetcher) loop() {
// ones into the retrieval queues
actives := make(map[string]struct{})
for hash, instance := range f.waittime {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > f.txArrivalWait {
// Transaction expired without propagation, schedule for retrieval
if f.announced[hash] != nil {
panic("announce tracker already contains waitlist item")
Expand Down Expand Up @@ -698,14 +714,16 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
for _, instance := range f.waittime {
if earliest > instance {
earliest = instance
if txArriveTimeout-time.Duration(now-earliest) < gatherSlack {
if f.txArrivalWait-time.Duration(now-earliest) < gatherSlack {
break
}
}
}
*timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
trigger <- struct{}{}
})

*timer = f.clock.AfterFunc(
f.txArrivalWait-time.Duration(now-earliest),
func() { trigger <- struct{}{} },
)
}

// rescheduleTimeout iterates over all the transactions currently in flight and
Expand Down
Loading