Skip to content

Commit

Permalink
les: remove LesVersion, add safety checks
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Feb 27, 2021
1 parent c18c8d0 commit 5550b81
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 17 deletions.
21 changes: 11 additions & 10 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,30 +203,31 @@ func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.R
return replies
}

// enrVersion returns the les and vflux UDP versions advertised in the ENR record
func (s *LightEthereum) enrVersion(n *enode.Node) (lesVersion, vfxVersion uint) {
// vfxVersion returns the version number of the "les" service subdomain of the vflux UDP
// service, as advertised in the ENR record
func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
if n.Seq() == 0 {
var err error
if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
s.serverPool.Persist(n)
} else {
return 0, 0
return 0
}
}

var les []rlp.RawValue
if err := n.Load(enr.WithEntry("les", &les)); err != nil || len(les) < 2 {
return 0, 0
if err := n.Load(enr.WithEntry("les", &les)); err != nil || len(les) < 1 {
return 0
}
rlp.DecodeBytes(les[0], &lesVersion)
rlp.DecodeBytes(les[1], &vfxVersion)
return
var version uint
rlp.DecodeBytes(les[0], &version) // Ignore additional fields (for forward compatibility).
return version
}

// prenegQuery sends a capacity query to the given server node to determine whether
// a connection slot is immediately available
func (s *LightEthereum) prenegQuery(n *enode.Node) int {
if _, ver := s.enrVersion(n); ver < 1 {
if s.vfxVersion(n) < 1 {
// UDP query not supported, always try TCP connection
return 1
}
Expand All @@ -238,7 +239,7 @@ func (s *LightEthereum) prenegQuery(n *enode.Node) int {
})
replies := s.VfluxRequest(n, requests)
var cqr vflux.CapacityQueryReply
if replies.Get(0, &cqr) != nil || len(cqr) != 1 {
if replies.Get(0, &cqr) != nil || len(cqr) != 1 { // Note: Get returns an error if replies is nil
return -1
}
if cqr[0] > 0 {
Expand Down
4 changes: 2 additions & 2 deletions les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
// lesEntry is the "les" ENR entry. This is set for LES servers only.
type lesEntry struct {
// Ignore additional fields (for forward compatibility).
LesVersion, VfxVersion uint
Rest []rlp.RawValue `rlp:"tail"`
VfxVersion uint
Rest []rlp.RawValue `rlp:"tail"`
}

func (lesEntry) ENRKey() string { return "les" }
Expand Down
1 change: 0 additions & 1 deletion les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func (s *LesServer) Protocols() []p2p.Protocol {
// Add "les" ENR entries.
for i := range ps {
ps[i].Attributes = []enr.Entry{&lesEntry{
LesVersion: ServerProtocolVersions[len(ServerProtocolVersions)-1],
VfxVersion: 1,
}}
}
Expand Down
7 changes: 5 additions & 2 deletions les/vflux/server/prioritypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,11 @@ func (pp *PriorityPool) GetCapacityCurve() *CapacityCurve {
// reduce node capacities or remove nodes until nothing is left in the queue;
// record the available capacity and the necessary priority after each step
for pp.activeCap > 0 {
cp := curvePoint{
freeCap: pp.maxCap - pp.activeCap,
cp := curvePoint{}
if pp.activeCap > pp.maxCap {
log.Error("Active capacity is greater than allowed maximum", "active", pp.activeCap, "maximum", pp.maxCap)
} else {
cp.freeCap = pp.maxCap - pp.activeCap
}
// temporarily increase activeCap to enforce reducing or removing a node capacity
tempCap := cp.freeCap + 1
Expand Down
10 changes: 8 additions & 2 deletions les/vflux/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package server
import (
"net"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/les/utils"
Expand All @@ -32,6 +33,7 @@ type (
// Server serves vflux requests
Server struct {
limiter *utils.Limiter
lock sync.Mutex
services map[string]*serviceEntry
delayPerRequest time.Duration
}
Expand Down Expand Up @@ -66,7 +68,9 @@ func (s *Server) Register(b Service) {
log.Error("Service ID contains ':'", "id", srv.id)
return
}
s.lock.Lock()
s.services[srv.id] = srv
s.lock.Unlock()
}

// Serve serves a vflux request batch
Expand All @@ -83,15 +87,17 @@ func (s *Server) Serve(id enode.ID, address string, requests vflux.Requests) vfl
if ch == nil {
return nil
}
// Note: the following section is protected from concurrency by the limiter
// Note: the limiter ensures that the following section is not running concurrently,
// the lock only protects against contention caused by new service registration
s.lock.Lock()
results := make(vflux.Replies, len(requests))
for i, req := range requests {
if service := s.services[req.Service]; service != nil {
results[i] = service.backend.Handle(id, address, req.Name, req.Params)
}
}
s.lock.Unlock()
time.Sleep(s.delayPerRequest * time.Duration(reqLen))
// The protected section ends by closing the channel and thereby allowing the limiter to start the next request
close(ch)
return results
}
Expand Down

0 comments on commit 5550b81

Please sign in to comment.