Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les, les/lespay: implement new server pool #20758

Merged
merged 57 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
d6d27cd
les: new server pool
zsfelfoldi Feb 26, 2020
ce4930c
les/utils: NodeStateMachine simplifications
zsfelfoldi Apr 24, 2020
71d1497
les/utils: handle enode.Node inside NodeStateMachine
zsfelfoldi Apr 24, 2020
c73cac6
les/utils: fixed NodeStateMachine tests
zsfelfoldi Apr 24, 2020
aef76b6
les/lespay/client: fixed tests
zsfelfoldi Apr 24, 2020
e769402
les: fixed serverpool test
zsfelfoldi Apr 24, 2020
07bc71a
les/utils: add comments
zsfelfoldi Apr 25, 2020
a5bec6c
les/utils: fix linter warnings
zsfelfoldi Apr 25, 2020
136b341
les: add node state logger, fix bug
zsfelfoldi Apr 25, 2020
0065fe1
les: add persistent redialWait timeouts
zsfelfoldi Apr 26, 2020
1d37de2
les: fixed test and removed debug prints
zsfelfoldi Apr 26, 2020
3604369
les: add metrics
zsfelfoldi Apr 26, 2020
185f59f
p2p/nodestate: moved NodeStateMachine to its own package
zsfelfoldi Apr 28, 2020
1fa4919
p2p/nodestate, les: changed flag/field definition
zsfelfoldi Apr 28, 2020
b032eab
les, p2p/nodestate: fixed tests
zsfelfoldi Apr 28, 2020
44942ee
p2p/nodestate: removed unnecessary errors and added comments
zsfelfoldi Apr 28, 2020
6948793
p2p/nodestate: fix after rebase
zsfelfoldi Apr 28, 2020
8028528
cmd/utils: use les DNS list for --syncmode=light
fjl Apr 29, 2020
e7d7a4f
les: remove redundant ENR filter on DNS output
fjl Apr 29, 2020
095bc6e
les/lespay/client: use sync.Cond in QueueIterator
fjl Apr 29, 2020
52fbd74
les/lespay/client: use sync.Cond in WrsIterator
fjl Apr 29, 2020
a543d42
cmd/utils: revert default DNS setup condition
fjl Apr 29, 2020
c0a1ca3
les/lespay/client: use enode.SignNull in test
fjl Apr 29, 2020
df77440
les/lespay/client: fix wrs iterator panic
fjl Apr 29, 2020
c65ba19
les: remove goroutine in weight callback
fjl Apr 29, 2020
2eb1018
les/lespay/client: fixed race condition in WrsIterator test
zsfelfoldi Apr 29, 2020
3452414
les: move NodeStateMachine construction inside serverPool
zsfelfoldi Apr 29, 2020
12320a5
les/lespay/client: add PreNegFilter
zsfelfoldi May 1, 2020
2d4e7e5
les: nodeWeights field
zsfelfoldi May 2, 2020
4f94866
les: sp test
zsfelfoldi May 4, 2020
477b5cd
les: sp test fix
zsfelfoldi May 4, 2020
95fdee9
les: fixed sp test
zsfelfoldi May 4, 2020
e37b225
les/lespay/client: fixed tests
zsfelfoldi May 4, 2020
2fba0ce
p2p/nodestate: fixed minor issues
zsfelfoldi May 4, 2020
a5c13c5
les: removed dummyQuery
zsfelfoldi May 4, 2020
1562211
les/lespay/client: TestPreNegFilter
zsfelfoldi May 4, 2020
92e5552
les: avoid persisting clock
rjl493456442 May 6, 2020
904b835
les: polish
rjl493456442 May 7, 2020
9a1f898
les: use real-time clock for redialWait
zsfelfoldi May 8, 2020
d1ffbf8
les: refactored node stats, weights and wait time calculation
zsfelfoldi May 8, 2020
8810945
p2p/nodestate: use uint operands for bit shift
zsfelfoldi May 8, 2020
bc628b9
les: remove unnecessary connectedStats flag reset
zsfelfoldi May 10, 2020
b4aed55
les, les/lespay/client: more elegant pre-neg filter
zsfelfoldi May 14, 2020
94315f7
les: add recovery mechanism for UDP not working
zsfelfoldi May 14, 2020
0d34da1
les: removed mclock rtc
zsfelfoldi May 15, 2020
1cc3502
p2p/nodestate: removed mapping conversion, added simple version checking
zsfelfoldi May 15, 2020
478f506
les: drop known node if redialWait becomes extremely long
zsfelfoldi May 15, 2020
33928fe
eth: switched ethEntry back to lower case
zsfelfoldi May 15, 2020
ee8eb42
les: start serverPool first
zsfelfoldi May 15, 2020
816b90b
les: fixed redialWait calculation
zsfelfoldi May 15, 2020
4da949e
les: add mixer timeout
zsfelfoldi May 18, 2020
6041177
p2p/nodestate: fix duplicated flags
rjl493456442 May 20, 2020
0106fae
les/utils: fixed ExpirationFactor.Value overflow error
zsfelfoldi May 21, 2020
5749d01
les: made serverpool test safer
zsfelfoldi May 21, 2020
ab6c8bc
les: make dial timeout safe
zsfelfoldi May 21, 2020
85a22fb
les: more readable redialWait logic
zsfelfoldi May 21, 2020
d038392
les: changed flag names
zsfelfoldi May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,19 +1563,19 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.NetworkId = 3
}
cfg.Genesis = core.DefaultRopstenGenesisBlock()
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.RopstenGenesisHash])
setDNSDiscoveryDefaults(cfg, params.RopstenGenesisHash)
case ctx.GlobalBool(RinkebyFlag.Name):
if !ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkId = 4
}
cfg.Genesis = core.DefaultRinkebyGenesisBlock()
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.RinkebyGenesisHash])
setDNSDiscoveryDefaults(cfg, params.RinkebyGenesisHash)
case ctx.GlobalBool(GoerliFlag.Name):
if !ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkId = 5
}
cfg.Genesis = core.DefaultGoerliGenesisBlock()
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.GoerliGenesisHash])
setDNSDiscoveryDefaults(cfg, params.GoerliGenesisHash)
case ctx.GlobalBool(DeveloperFlag.Name):
if !ctx.GlobalIsSet(NetworkIdFlag.Name) {
cfg.NetworkId = 1337
Expand Down Expand Up @@ -1604,18 +1604,25 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
}
default:
if cfg.NetworkId == 1 {
setDNSDiscoveryDefaults(cfg, params.KnownDNSNetworks[params.MainnetGenesisHash])
setDNSDiscoveryDefaults(cfg, params.MainnetGenesisHash)
}
}
}

// setDNSDiscoveryDefaults configures DNS discovery with the given URL if
// no URLs are set.
func setDNSDiscoveryDefaults(cfg *eth.Config, url string) {
func setDNSDiscoveryDefaults(cfg *eth.Config, genesis common.Hash) {
if cfg.DiscoveryURLs != nil {
return
return // already set through flags/config
}

protocol := "eth"
if cfg.SyncMode == downloader.LightSync {
protocol = "les"
}
if url := params.KnownDNSNetwork(genesis, protocol); url != "" {
cfg.DiscoveryURLs = []string{url}
}
cfg.DiscoveryURLs = []string{url}
}

// RegisterEthService adds an Ethereum client to the stack.
Expand Down
18 changes: 15 additions & 3 deletions core/forkid/forkid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)
Expand All @@ -44,6 +44,18 @@ var (
ErrLocalIncompatibleOrStale = errors.New("local incompatible or needs update")
)

// Blockchain defines all necessary method to build a forkID.
type Blockchain interface {
// Config retrieves the chain's fork configuration.
Config() *params.ChainConfig

// Genesis retrieves the chain's genesis block.
Genesis() *types.Block

// CurrentHeader retrieves the current head header of the canonical chain.
CurrentHeader() *types.Header
}

// ID is a fork identifier as defined by EIP-2124.
type ID struct {
Hash [4]byte // CRC32 checksum of the genesis block and passed fork block numbers
Expand All @@ -54,7 +66,7 @@ type ID struct {
type Filter func(id ID) error

// NewID calculates the Ethereum fork ID from the chain config and head.
func NewID(chain *core.BlockChain) ID {
func NewID(chain Blockchain) ID {
return newID(
chain.Config(),
chain.Genesis().Hash(),
Expand Down Expand Up @@ -85,7 +97,7 @@ func newID(config *params.ChainConfig, genesis common.Hash, head uint64) ID {

// NewFilter creates a filter that returns if a fork ID should be rejected or not
// based on the local chain's status.
func NewFilter(chain *core.BlockChain) Filter {
func NewFilter(chain Blockchain) Filter {
return newFilter(
chain.Config(),
chain.Genesis().Hash(),
Expand Down
6 changes: 3 additions & 3 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Ethereum struct {
blockchain *core.BlockChain
protocolManager *ProtocolManager
lesServer LesServer
dialCandiates enode.Iterator
dialCandidates enode.Iterator

// DB interfaces
chainDb ethdb.Database // Block chain database
Expand Down Expand Up @@ -226,7 +226,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

eth.dialCandiates, err = eth.setupDiscovery(&ctx.Config.P2P)
eth.dialCandidates, err = eth.setupDiscovery(&ctx.Config.P2P)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -523,7 +523,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
for i, vsn := range ProtocolVersions {
protos[i] = s.protocolManager.makeProtocol(vsn)
protos[i].Attributes = []enr.Entry{s.currentEthEntry()}
protos[i].DialCandidates = s.dialCandiates
protos[i].DialCandidates = s.dialCandidates
}
if s.lesServer != nil {
protos = append(protos, s.lesServer.Protocols()...)
Expand Down
54 changes: 30 additions & 24 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ import (
type LightEthereum struct {
lesCommons

peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *lpc.ValueTracker
peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *lpc.ValueTracker
dialCandidates enode.Iterator

bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
Expand Down Expand Up @@ -104,11 +105,19 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, false, chainDb),
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
serverPool: newServerPool(chainDb, config.UltraLightServers),
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
}
peers.subscribe((*vtSubscription)(leth.valueTracker))
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)

dnsdisc, err := leth.setupDiscovery(&ctx.Config.P2P)
if err != nil {
return nil, err
}
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, dnsdisc, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator

leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout)
leth.relay = newLesTxRelay(peers, leth.retriever)

leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
Expand Down Expand Up @@ -140,11 +149,6 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
leth.chtIndexer.Start(leth.blockchain)
leth.bloomIndexer.Start(leth.blockchain)

leth.handler = newClientHandler(config.UltraLightServers, config.UltraLightFraction, checkpoint, leth)
if leth.handler.ulc != nil {
log.Warn("Ultra light client is enabled", "trustedNodes", len(leth.handler.ulc.keys), "minTrustedFraction", leth.handler.ulc.fraction)
leth.blockchain.DisableCheckFreq()
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
Expand All @@ -159,6 +163,11 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}
leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams)

leth.handler = newClientHandler(config.UltraLightServers, config.UltraLightFraction, checkpoint, leth)
if leth.handler.ulc != nil {
log.Warn("Ultra light client is enabled", "trustedNodes", len(leth.handler.ulc.keys), "minTrustedFraction", leth.handler.ulc.fraction)
leth.blockchain.DisableCheckFreq()
}
return leth, nil
}

Expand Down Expand Up @@ -260,30 +269,29 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
return p.Info()
}
return nil
})
}, s.dialCandidates)
}

// Start implements node.Service, starting all internal goroutines needed by the
// light ethereum protocol implementation.
func (s *LightEthereum) Start(srvr *p2p.Server) error {
log.Warn("Light client mode is an experimental feature")

s.serverPool.start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)
s.startBloomHandlers(params.BloomBitsBlocksClient)

s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.config.NetworkId)

// clients are searching for the first advertised protocol in the list
protocolVersion := AdvertiseProtocolVersions[0]
s.serverPool.start(srvr, lesTopic(s.blockchain.Genesis().Hash(), protocolVersion))
return nil
}

// Stop implements node.Service, terminating all internal goroutines used by the
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
close(s.closeCh)
s.serverPool.stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it will break something for changing the order of stop.

s.valueTracker.Stop()
s.peers.close()
s.reqDist.close()
s.odr.Stop()
Expand All @@ -295,8 +303,6 @@ func (s *LightEthereum) Stop() error {
s.txPool.Stop()
s.engine.Close()
s.eventMux.Stop()
s.serverPool.stop()
s.valueTracker.Stop()
s.chainDb.Close()
s.wg.Wait()
log.Info("Light ethereum stopped")
Expand Down
11 changes: 1 addition & 10 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T
if checkpoint != nil {
height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
}
handler.fetcher = newLightFetcher(handler)
handler.fetcher = newLightFetcher(handler, backend.serverPool.getTimeout)
handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.backend.peers.subscribe((*downloaderPeerNotify)(handler))
return handler
Expand All @@ -85,14 +85,9 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter)
}
peer := newServerPeer(int(version), h.backend.config.NetworkId, trusted, p, newMeteredMsgWriter(rw, int(version)))
defer peer.close()
peer.poolEntry = h.backend.serverPool.connect(peer, peer.Node())
if peer.poolEntry == nil {
return p2p.DiscRequested
}
h.wg.Add(1)
defer h.wg.Done()
err := h.handle(peer)
h.backend.serverPool.disconnect(peer.poolEntry)
return err
}

Expand Down Expand Up @@ -129,10 +124,6 @@ func (h *clientHandler) handle(p *serverPeer) error {

h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td})

// pool entry can be nil during the unit test.
if p.poolEntry != nil {
h.backend.serverPool.registered(p.poolEntry)
}
// Mark the peer starts to be served.
atomic.StoreUint32(&p.serving, 1)
defer atomic.StoreUint32(&p.serving, 0)
Expand Down
5 changes: 3 additions & 2 deletions les/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type NodeInfo struct {
}

// makeProtocols creates protocol descriptors for the given LES versions.
func (c *lesCommons) makeProtocols(versions []uint, runPeer func(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error, peerInfo func(id enode.ID) interface{}) []p2p.Protocol {
func (c *lesCommons) makeProtocols(versions []uint, runPeer func(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error, peerInfo func(id enode.ID) interface{}, dialCandidates enode.Iterator) []p2p.Protocol {
protos := make([]p2p.Protocol, len(versions))
for i, version := range versions {
version := version
Expand All @@ -93,7 +93,8 @@ func (c *lesCommons) makeProtocols(versions []uint, runPeer func(version uint, p
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
return runPeer(version, peer, rw)
},
PeerInfo: peerInfo,
PeerInfo: peerInfo,
DialCandidates: dialCandidates,
}
}
return protos
Expand Down
11 changes: 5 additions & 6 deletions les/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,11 @@ func (d *requestDistributor) loop() {
type selectPeerItem struct {
peer distPeer
req *distReq
weight int64
weight uint64
}

// Weight implements wrsItem interface
func (sp selectPeerItem) Weight() int64 {
return sp.weight
func selectPeerWeight(i interface{}) uint64 {
return i.(selectPeerItem).weight
}

// nextRequest returns the next possible request from any peer, along with the
Expand Down Expand Up @@ -220,9 +219,9 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
wait, bufRemain := peer.waitBefore(cost)
if wait == 0 {
if sel == nil {
sel = utils.NewWeightedRandomSelect()
sel = utils.NewWeightedRandomSelect(selectPeerWeight)
}
sel.Update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
sel.Update(selectPeerItem{peer: peer, req: req, weight: uint64(bufRemain*1000000) + 1})
} else {
if bestWait == 0 || wait < bestWait {
bestWait = wait
Expand Down
12 changes: 12 additions & 0 deletions les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package les

import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)

Expand All @@ -30,3 +33,12 @@ type lesEntry struct {
func (e lesEntry) ENRKey() string {
return "les"
}

// setupDiscovery creates the node discovery source for the eth protocol.
func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
if /*cfg.NoDiscovery || */ len(eth.config.DiscoveryURLs) == 0 {
return nil, nil
}
client := dnsdisc.NewClient(dnsdisc.Config{})
return client.NewIterator(eth.config.DiscoveryURLs...)
}
Loading