Skip to content

Commit

Permalink
[release-v2.0] Attempt cxns to >=3 mixing-capable peers
Browse files Browse the repository at this point in the history
Similar to dcrd, we want SPV wallets to maintain a minimum number of peers
that support the mixing protocol version.  This will be especially important
after dcrwallet is updated to a mixclient branch that submits PR messages over
the entire duration before the epoch occurs, as any errors for failed message
sends due to not having a single mixing-capable peer connected would be
delayed far into the future.

Backport of 62c10d5.
  • Loading branch information
jrick committed Jun 19, 2024
1 parent fee6056 commit 7830dd6
Showing 1 changed file with 43 additions and 5 deletions.
48 changes: 43 additions & 5 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ import (
// they do not provide each of these services.
const reqSvcs = wire.SFNodeNetwork

const (
targetPeerCount = 8

// Require a subset of all connected outbounded peers to meet some
// minimum protocol version.
minVersion = wire.MixVersion
minVersionTarget = 3
)

// Syncer implements wallet synchronization services by over the Decred wire
// protocol using Simplified Payment Verification (SPV) with compact filters.
type Syncer struct {
Expand Down Expand Up @@ -451,9 +460,12 @@ func (s *Syncer) peerCandidate(svcs wire.ServiceFlag) (*addrmgr.NetAddress, erro
return nil, errors.New("no addresses")
}

var errBreaksMinVersionTarget = errors.New("peer uses too low version to satisify " +
"minimum target version count")

// connectAndRunPeer connects to and runs the syncing process with the specified
// peer. It blocks until the peer disconnects and logs any errors.
func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string) {
func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string, persistent bool) {
// Attempt connection to peer.
rp, err := s.lp.ConnectOutbound(ctx, raddr, reqSvcs)
if err != nil {
Expand All @@ -467,18 +479,24 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string) {
}
return
}
log.Infof("New peer %v %v %v", raddr, rp.UA(), rp.Services())

// Track peer as running as opposed to attempting connection.
s.remotesMu.Lock()
delete(s.connectingRemotes, raddr)
if !persistent && s.breaksMinVersionTarget(rp) {
s.remotesMu.Unlock()
log.Debugf("Disconnecting %v: %v", raddr, errBreaksMinVersionTarget)
rp.Disconnect(errBreaksMinVersionTarget)
return
}
s.remotes[raddr] = rp
n := len(s.remotes)
if s.remoteAvailable != nil {
close(s.remoteAvailable)
s.remoteAvailable = nil
}
s.remotesMu.Unlock()
log.Infof("New peer %v %v version=%d %v", raddr, rp.UA(), rp.Pver(), rp.Services())
s.peerConnected(n, raddr)

// Alert disconnection once this peer is done.
Expand Down Expand Up @@ -514,9 +532,29 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string) {
}
}

func (s *Syncer) breaksMinVersionTarget(rp *p2p.RemotePeer) bool {
if rp.Pver() >= minVersion {
return false
}
n := len(s.remotes)
if n < targetPeerCount-minVersionTarget {
return false
}
var meetsMin int
for _, rp := range s.remotes {
if rp.Pver() >= minVersion {
meetsMin++
if meetsMin == minVersionTarget {
return false
}
}
}
return true
}

func (s *Syncer) connectToPersistent(ctx context.Context, raddr string) error {
for {
s.connectAndRunPeer(ctx, raddr)
s.connectAndRunPeer(ctx, raddr, true)

// Retry persistent peer after 5 seconds.
select {
Expand All @@ -531,7 +569,7 @@ func (s *Syncer) connectToCandidates(ctx context.Context) error {
var wg sync.WaitGroup
defer wg.Wait()

sem := make(chan struct{}, 8)
sem := make(chan struct{}, targetPeerCount)
for {
if ctx.Err() != nil {
return ctx.Err()
Expand All @@ -555,7 +593,7 @@ func (s *Syncer) connectToCandidates(ctx context.Context) error {
wg.Add(1)
go func() {
raddr := na.String()
s.connectAndRunPeer(ctx, raddr)
s.connectAndRunPeer(ctx, raddr, false)
wg.Done()
<-sem
}()
Expand Down

0 comments on commit 7830dd6

Please sign in to comment.