Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: move server pool to les/vflux/client #22377

Merged
merged 6 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var Modules = map[string]string{
"swarmfs": SwarmfsJs,
"txpool": TxpoolJs,
"les": LESJs,
"lespay": LESPayJs,
"vflux": VfluxJs,
}

const ChequebookJs = `
Expand Down Expand Up @@ -877,32 +877,32 @@ web3._extend({
});
`

const LESPayJs = `
const VfluxJs = `
web3._extend({
property: 'lespay',
property: 'vflux',
methods:
[
new web3._extend.Method({
name: 'distribution',
call: 'lespay_distribution',
call: 'vflux_distribution',
params: 2
}),
new web3._extend.Method({
name: 'timeout',
call: 'lespay_timeout',
call: 'vflux_timeout',
params: 2
}),
new web3._extend.Method({
name: 'value',
call: 'lespay_value',
call: 'vflux_value',
params: 2
}),
],
properties:
[
new web3._extend.Property({
name: 'requestStats',
getter: 'lespay_requestStats'
getter: 'vflux_requestStats'
}),
]
});
Expand Down
38 changes: 8 additions & 30 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ type LightEthereum struct {
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *serverPool
valueTracker *vfc.ValueTracker
serverPool *vfc.ServerPool
dialCandidates enode.Iterator
pruner *pruner

Expand Down Expand Up @@ -109,17 +108,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb),
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
valueTracker: vfc.NewValueTracker(lesDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
p2pServer: stack.Server(),
p2pConfig: &stack.Config().P2P,
}
peers.subscribe((*vtSubscription)(leth.valueTracker))

leth.serverPool = newServerPool(lesDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
peers.subscribe(leth.serverPool)
leth.dialCandidates = leth.serverPool.dialIterator
leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)

leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
leth.relay = newLesTxRelay(peers, leth.retriever)

leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
Expand Down Expand Up @@ -193,23 +189,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
return leth, nil
}

// vtSubscription implements serverPeerSubscriber
type vtSubscription vfc.ValueTracker

// registerPeer implements serverPeerSubscriber
func (v *vtSubscription) registerPeer(p *serverPeer) {
vt := (*vfc.ValueTracker)(v)
p.setValueTracker(vt, vt.Register(p.ID()))
p.updateVtParams()
}

// unregisterPeer implements serverPeerSubscriber
func (v *vtSubscription) unregisterPeer(p *serverPeer) {
vt := (*vfc.ValueTracker)(v)
vt.Unregister(p.ID())
p.setValueTracker(nil, nil)
}

type LightDummyAPI struct{}

// Etherbase is the address that mining rewards will be send to
Expand Down Expand Up @@ -266,7 +245,7 @@ func (s *LightEthereum) APIs() []rpc.API {
}, {
Namespace: "vflux",
Version: "1.0",
Service: vfc.NewPrivateClientAPI(s.valueTracker),
Service: s.serverPool.API(),
Public: false,
},
}...)
Expand Down Expand Up @@ -302,8 +281,8 @@ func (s *LightEthereum) Start() error {
if err != nil {
return err
}
s.serverPool.addSource(discovery)
s.serverPool.start()
s.serverPool.AddSource(discovery)
s.serverPool.Start()
// Start bloom request workers.
s.wg.Add(bloomServiceThreads)
s.startBloomHandlers(params.BloomBitsBlocksClient)
Expand All @@ -316,8 +295,7 @@ func (s *LightEthereum) Start() error {
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
close(s.closeCh)
s.serverPool.stop()
s.valueTracker.Stop()
s.serverPool.Stop()
s.peers.close()
s.reqDist.close()
s.odr.Stop()
Expand Down
14 changes: 14 additions & 0 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,25 @@ func (h *clientHandler) handle(p *serverPeer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}
// Register peer with the server pool
if h.backend.serverPool != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility that the serverpool can be nil? Perhaps in the testing?

if nvt, err := h.backend.serverPool.RegisterNode(p.Node()); err == nil {
p.setValueTracker(nvt)
Copy link
Member

@rjl493456442 rjl493456442 Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will you consider that we can move the "valueTracker" notion into the vflux/client totally?

IIUC, we just need to feed some statistics for the valueTracker and all the magics can be hidden in the vflux/client. The benefit is we can totally get rid of the nodeValueTracker *vfc.NodeValueTracker in the peer.

Copy link
Member

@rjl493456442 rjl493456442 Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can simplify the logic like

// Register peer with the server pool
if err := h.backend.serverPool.RegisterNode(p.Node()); err != nil {
	return err
}
defer h.backend.serverPool.UnregisterNode(p.Node())

p.updateVtParams()
defer func() {
p.setValueTracker(nil)
h.backend.serverPool.UnregisterNode(p.Node())
}()
} else {
return err
}
}
// Register the peer locally
if err := h.backend.peers.register(p); err != nil {
p.Log().Error("Light Ethereum peer registration failed", "err", err)
return err
}

serverConnectionGauge.Update(int64(h.backend.peers.len()))

connectedAt := mclock.Now()
Expand Down
9 changes: 3 additions & 6 deletions les/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ type serverPeer struct {

fcServer *flowcontrol.ServerNode // Client side mirror token bucket.
vtLock sync.Mutex
valueTracker *vfc.ValueTracker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also remove this nodeValueTracker out of the serverPeer.

nodeValueTracker *vfc.NodeValueTracker
sentReqs map[uint64]sentReqEntry

Expand Down Expand Up @@ -676,9 +675,8 @@ func (p *serverPeer) Handshake(genesis common.Hash, forkid forkid.ID, forkFilter

// setValueTracker sets the value tracker references for connected servers. Note that the
// references should be removed upon disconnection by setValueTracker(nil, nil).
func (p *serverPeer) setValueTracker(vt *vfc.ValueTracker, nvt *vfc.NodeValueTracker) {
func (p *serverPeer) setValueTracker(nvt *vfc.NodeValueTracker) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get rid of this function.

p.vtLock.Lock()
p.valueTracker = vt
p.nodeValueTracker = nvt
if nvt != nil {
p.sentReqs = make(map[uint64]sentReqEntry)
Expand All @@ -705,7 +703,7 @@ func (p *serverPeer) updateVtParams() {
}
}
}
p.valueTracker.UpdateCosts(p.nodeValueTracker, reqCosts)
p.nodeValueTracker.UpdateCosts(reqCosts)
}

// sentReqEntry remembers sent requests and their sending times
Expand All @@ -732,7 +730,6 @@ func (p *serverPeer) answeredRequest(id uint64) {
}
e, ok := p.sentReqs[id]
delete(p.sentReqs, id)
vt := p.valueTracker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function can be modified a bit, which only returns the serving time. All the value calculation work can be thrown to vflux/client/serverpool.go

nvt := p.nodeValueTracker
p.vtLock.Unlock()
if !ok {
Expand All @@ -752,7 +749,7 @@ func (p *serverPeer) answeredRequest(id uint64) {
vtReqs[1] = vfc.ServedRequest{ReqType: uint32(m.rest), Amount: e.amount - 1}
}
dt := time.Duration(mclock.Now() - e.at)
vt.Served(nvt, vtReqs[:reqCount], dt)
nvt.Served(vtReqs[:reqCount], dt)
}

// clientPeer represents each node to which the les server is connected.
Expand Down
11 changes: 0 additions & 11 deletions les/vflux/client/queueiterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/nodestate"
)

func testNodeID(i int) enode.ID {
return enode.ID{42, byte(i % 256), byte(i / 256)}
}

func testNodeIndex(id enode.ID) int {
if id[0] != 42 {
return -1
}
return int(id[1]) + int(id[2])*256
}

func testNode(i int) *enode.Node {
return enode.SignNull(new(enr.Record), testNodeID(i))
}
Expand Down
Loading