Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make txArriveTimeout Configurable w/ CLI Flag #734

Merged
merged 14 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,13 @@ var (
Usage: "Gas price below which gpo will ignore transactions",
Value: ethconfig.Defaults.GPO.IgnorePrice.Int64(),
}
// flag to set the transaction fetcher's txArrivalWait value, which is the maximum waiting
// period the fetcher will wait to receive an announced tx before explicitly requesting it
TxArrivalWaitFlag = cli.DurationFlag{
Name: "txarrivalwait",
Usage: "Maximum duration to wait for a transaction before requesting it (defaults to 500ms)",
Value: node.DefaultConfig.P2P.TxArrivalWait,
}

// Metrics flags
MetricsEnabledFlag = cli.BoolFlag{
Expand Down Expand Up @@ -1288,6 +1295,10 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.NoDiscovery = true
cfg.DiscoveryV5 = false
}

if ctx.GlobalIsSet(TxArrivalWaitFlag.Name) {
cfg.TxArrivalWait = TxArrivalWaitFlag.Value
}
}

// SetNodeConfig applies node-related command line flags to the config.
Expand Down
2 changes: 2 additions & 0 deletions docs/cli/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ The ```bor server``` command runs the Bor client.

- ```v5disc```: Enables the experimental RLPx V5 (Topic Discovery) mechanism (default: false)

- ```txarrivalwait```: Maximum duration to wait before requesting an announced transaction (default: 500ms)

### Sealer Options

- ```mine```: Enable mining (default: false)
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EthAPI: ethAPI,
PeerRequiredBlocks: config.PeerRequiredBlocks,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
}); err != nil {
return nil, err
}
Expand Down
63 changes: 35 additions & 28 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ const (
// re-request them.
maxTxUnderpricedSetSize = 32768

// txArriveTimeout is the time allowance before an announced transaction is
// explicitly requested.
txArriveTimeout = 100 * time.Millisecond

// txGatherSlack is the interval used to collate almost-expired announces
// with network fetches.
txGatherSlack = 20 * time.Millisecond
Expand Down Expand Up @@ -176,38 +172,41 @@ type TxFetcher struct {
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
rand *mrand.Rand // Randomizer to use in tests instead of map range loops (soft-random)

txArrivalWait time.Duration // txArrivalWait is the time allowance before an announced transaction is explicitly requested.
}

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, txArrivalWait time.Duration) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil, txArrivalWait)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
clock mclock.Clock, rand *mrand.Rand, txArrivalWait time.Duration) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet(),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
clock: clock,
rand: rand,
txArrivalWait: txArrivalWait,
}
}

Expand Down Expand Up @@ -333,6 +332,11 @@ func (f *TxFetcher) Drop(peer string) error {
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
func (f *TxFetcher) Start() {
// the txArrivalWait duration should not be less than the txGatherSlack duration
if f.txArrivalWait < txGatherSlack {
f.txArrivalWait = txGatherSlack
}

go f.loop()
}

Expand All @@ -350,6 +354,9 @@ func (f *TxFetcher) loop() {
waitTrigger = make(chan struct{}, 1)
timeoutTrigger = make(chan struct{}, 1)
)

log.Info("TxFetcher", "txArrivalWait", f.txArrivalWait.String())

for {
select {
case ann := <-f.notify:
Expand Down Expand Up @@ -441,7 +448,7 @@ func (f *TxFetcher) loop() {
// ones into the retrieval queues
actives := make(map[string]struct{})
for hash, instance := range f.waittime {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > f.txArrivalWait {
// Transaction expired without propagation, schedule for retrieval
if f.announced[hash] != nil {
panic("announce tracker already contains waitlist item")
Expand Down Expand Up @@ -698,12 +705,12 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) {
for _, instance := range f.waittime {
if earliest > instance {
earliest = instance
if txArriveTimeout-time.Duration(now-earliest) < gatherSlack {
if f.txArrivalWait-time.Duration(now-earliest) < gatherSlack {
break
}
}
}
*timer = f.clock.AfterFunc(txArriveTimeout-time.Duration(now-earliest), func() {
*timer = f.clock.AfterFunc(f.txArrivalWait-time.Duration(now-earliest), func() {
trigger <- struct{}{}
})
}
Expand Down
Loading