Skip to content

Commit

Permalink
Merge pull request #734 from thogard785/fetch-timeout-config
Browse files Browse the repository at this point in the history
Make txArriveTimeout Configurable w/ CLI Flag
  • Loading branch information
temaniarpit27 authored May 2, 2023
2 parents acff69a + 62d8cc0 commit 5fba098
Show file tree
Hide file tree
Showing 22 changed files with 176 additions and 87 deletions.
11 changes: 11 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,13 @@ var (
Usage: "Gas price below which gpo will ignore transactions",
Value: ethconfig.Defaults.GPO.IgnorePrice.Int64(),
}
// flag to set the transaction fetcher's txArrivalWait value, which is the maximum waiting
// period the fetcher will wait to receive an announced tx before explicitly requesting it
TxArrivalWaitFlag = cli.DurationFlag{
Name: "txarrivalwait",
Usage: "Maximum duration to wait for a transaction before requesting it (defaults to 500ms)",
Value: node.DefaultConfig.P2P.TxArrivalWait,
}

// Metrics flags
MetricsEnabledFlag = cli.BoolFlag{
Expand Down Expand Up @@ -1288,6 +1295,10 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.NoDiscovery = true
cfg.DiscoveryV5 = false
}

if ctx.GlobalIsSet(TxArrivalWaitFlag.Name) {
cfg.TxArrivalWait = TxArrivalWaitFlag.Value
}
}

// SetNodeConfig applies node-related command line flags to the config.
Expand Down
13 changes: 7 additions & 6 deletions docs/cli/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ devfakeauthor = false # Run miner without validator set authorization
debug = true # Prepends log messages with call-site location (file and line number) - {requires some effort}

[p2p]
maxpeers = 50 # Maximum number of network peers (network disabled if set to 0)
maxpendpeers = 50 # Maximum number of pending connection attempts
bind = "0.0.0.0" # Network binding address
port = 30303 # Network listening port
nodiscover = false # Disables the peer discovery mechanism (manual peer addition)
nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:<IP>)
maxpeers = 50 # Maximum number of network peers (network disabled if set to 0)
maxpendpeers = 50 # Maximum number of pending connection attempts
bind = "0.0.0.0" # Network binding address
port = 30303 # Network listening port
nodiscover = false # Disables the peer discovery mechanism (manual peer addition)
nat = "any" # NAT port mapping mechanism (any|none|upnp|pmp|extip:<IP>)
netrestrict = "" # Restricts network communication to the given IP networks (CIDR masks)
nodekey = "" # P2P node key file
nodekeyhex = "" # P2P node key as hex
txarrivalwait = "500ms" # Maximum duration to wait before requesting an announced transaction
[p2p.discovery]
v5disc = false # Enables the experimental RLPx V5 (Topic Discovery) mechanism
bootnodes = [] # Comma separated enode URLs for P2P discovery bootstrap
Expand Down
2 changes: 2 additions & 0 deletions docs/cli/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ The ```bor server``` command runs the Bor client.

- ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false)

- ```txarrivalwait```: Maximum duration to wait before requesting an announced transaction (default: 500ms)

### Sealer Options

- ```mine```: Enable mining (default: false)
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EthAPI: ethAPI,
PeerRequiredBlocks: config.PeerRequiredBlocks,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
}); err != nil {
return nil, err
}
Expand Down
78 changes: 48 additions & 30 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ const (
// re-request them.
maxTxUnderpricedSetSize = 32768

// txArriveTimeout is the time allowance before an announced transaction is
// explicitly requested.
txArriveTimeout = 500 * time.Millisecond

// txGatherSlack is the interval used to collate almost-expired announces
// with network fetches.
txGatherSlack = 100 * time.Millisecond

// maxTxArrivalWait is the longest acceptable duration for the txArrivalWait
// configuration value. Longer config values will default to this.
maxTxArrivalWait = 500 * time.Millisecond
)

var (
Expand Down Expand Up @@ -176,38 +176,41 @@ type TxFetcher struct {
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
rand *mrand.Rand // Randomizer to use in tests instead of map range loops (soft-random)

txArrivalWait time.Duration // txArrivalWait is the time allowance before an announced transaction is explicitly requested.
}

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, txArrivalWait time.Duration) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil, txArrivalWait)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
clock mclock.Clock, rand *mrand.Rand, txArrivalWait time.Duration) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
txArrivalWait: txArrivalWait,
}
}

Expand Down Expand Up @@ -333,6 +336,16 @@ func (f *TxFetcher) Drop(peer string) error {
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *TxFetcher) Start() {
// the txArrivalWait duration should not be less than the txGatherSlack duration
if f.txArrivalWait < txGatherSlack {
f.txArrivalWait = txGatherSlack
}

// the txArrivalWait duration should not be greater than the maxTxArrivalWait duration
if f.txArrivalWait > maxTxArrivalWait {
f.txArrivalWait = maxTxArrivalWait
}

go f.loop()
}

Expand All @@ -350,6 +363,9 @@ func (f *TxFetcher) loop() {
waitTrigger = make(chan struct{}, 1)
timeoutTrigger = make(chan struct{}, 1)
)

log.Info("TxFetcher", "txArrivalWait", f.txArrivalWait.String())

for {
select {
case ann := <-f.notify:
Expand Down Expand Up @@ -441,7 +457,7 @@ func (f *TxFetcher) loop() {
// ones into the retrieval queues
actives := make(map[string]struct{})
for hash, instance := range f.waittime {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > f.txArrivalWait {
// Transaction expired without propagation, schedule for retrieval
if f.announced[hash] != nil {
panic("announce tracker already contains waitlist item")
Expand Down Expand Up @@ -698,14 +714,16 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
for _, instance := range f.waittime {
if earliest > instance {
earliest = instance
if txArriveTimeout-time.Duration(now-earliest) < gatherSlack {
if f.txArrivalWait-time.Duration(now-earliest) < gatherSlack {
break
}
}
}
*timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
trigger <- struct{}{}
})

*timer = f.clock.AfterFunc(
f.txArrivalWait-time.Duration(now-earliest),
func() { trigger <- struct{}{} },
)
}

// rescheduleTimeout iterates over all the transactions currently in flight and
Expand Down
Loading

0 comments on commit 5fba098

Please sign in to comment.