Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Use local addr var in version handler. #1256

Merged
20 changes: 20 additions & 0 deletions addrmgr/addrmanager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -937,6 +938,25 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
a.addrNew[newBucket][rmkey] = rmka
}

// SetServices sets the services for the giiven address to the provided value.
func (a *AddrManager) SetServices(addr *wire.NetAddress, services wire.ServiceFlag) {
a.mtx.Lock()
defer a.mtx.Unlock()

ka := a.find(addr)
if ka == nil {
return
}

// Update the services if needed.
if ka.na.Services != services {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, this isn't yet useful as we don't currently store the services on disk within the addrmgr. However, it does pave the way for us properly updating the services for a peer as they change overtime once we start to store the services on disk properly.

// ka.na is immutable, so replace it.
naCopy := *ka.na
naCopy.Services = services
ka.na = &naCopy
}
}

// AddLocalAddress adds na to the list of known local addresses to advertise
// with the given priority.
func (a *AddrManager) AddLocalAddress(na *wire.NetAddress, priority AddressPriority) error {
Expand Down
6 changes: 4 additions & 2 deletions peer/example_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2015-2018 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -72,8 +73,9 @@ func Example_newOutboundPeer() {
Services: 0,
TrickleInterval: time.Second * 10,
Listeners: peer.MessageListeners{
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
fmt.Println("outbound: received version")
return nil
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
Expand Down
151 changes: 69 additions & 82 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ const (
// inv message to a peer.
DefaultTrickleInterval = 10 * time.Second

// minAcceptableProtocolVersion is the lowest protocol version that a
// MinAcceptableProtocolVersion is the lowest protocol version that a
// connected peer may support.
minAcceptableProtocolVersion = wire.MultipleAddressVersion
MinAcceptableProtocolVersion = wire.MultipleAddressVersion

// outputBufferSize is the number of elements the output channels use.
outputBufferSize = 50
Expand Down Expand Up @@ -187,7 +187,9 @@ type MessageListeners struct {
OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)

// OnVersion is invoked when a peer receives a version bitcoin message.
OnVersion func(p *Peer, msg *wire.MsgVersion)
// The caller may return a reject message in which case the message will
// be sent to the peer and the peer will be disconnected.
OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject

// OnVerAck is invoked when a peer receives a verack bitcoin message.
OnVerAck func(p *Peer, msg *wire.MsgVerAck)
Expand Down Expand Up @@ -1369,7 +1371,7 @@ out:
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
switch msg := rmsg.(type) {
case *wire.MsgVersion:

// Limit to one version message per peer.
p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
"duplicate version message", nil, true)
break out
Expand Down Expand Up @@ -1875,26 +1877,42 @@ func (p *Peer) Disconnect() {
close(p.quit)
}

// handleRemoteVersionMsg is invoked when a version bitcoin message is received
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}

// Notify and disconnect clients if the first message is not a version
// message.
msg, ok := remoteMsg.(*wire.MsgVersion)
if !ok {
reason := "a version message must precede all others"
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}

// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}

// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)
}
// Negotiate the protocol version and set the services to what the remote
// peer advertised.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
p.services = msg.Services
p.flagsMtx.Unlock()
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)

// Updating a bunch of stats including block based stats, and the
// peer's time offset.
Expand All @@ -1904,22 +1922,10 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()

// Negotiate the protocol version.
// Set the peer's ID, user agent, and potentially the flag which
// specifies the witness support is enabled.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)

// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)

// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services

// Set the remote peer's user agent.
p.userAgent = msg.UserAgent

// Determine if the peer would like to receive witness data with
Expand All @@ -1938,36 +1944,33 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
p.wireEncoding = wire.WitnessEncoding
}

return nil
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}

remoteVerMsg, ok := msg.(*wire.MsgVersion)
if !ok {
errStr := "A version message must precede all others"
log.Errorf(errStr)

rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
// Invoke the callback if specified.
if p.cfg.Listeners.OnVersion != nil {
rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
if rejectMsg != nil {
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(rejectMsg.Reason)
}
}

if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
return err
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
// Send a reject message indicating the protocol version is
// obsolete and wait for the message to be sent before
// disconnecting.
reason := fmt.Sprintf("protocol version must be %d or greater",
MinAcceptableProtocolVersion)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}

if p.cfg.Listeners.OnVersion != nil {
p.cfg.Listeners.OnVersion(p, remoteVerMsg)
}
return nil
}

Expand All @@ -1992,7 +1995,8 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0,
theirNA.Services)
}
}

Expand Down Expand Up @@ -2020,25 +2024,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)

// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = wire.SFNodeNetwork

// Advertise the services flag
// Advertise local services.
msg.Services = p.cfg.Services

// Advertise our max supported protocol version.
Expand Down Expand Up @@ -2099,9 +2085,11 @@ func (p *Peer) start() error {
select {
case err := <-negotiateErr:
if err != nil {
p.Disconnect()
return err
}
case <-time.After(negotiateTimeout):
p.Disconnect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I think this might explain some issues I've seen where the connection count keeps rising (towards max connected), yet the number of stable peers queryable is much lower.

return errors.New("protocol negotiation timeout")
}
log.Debugf("Connected to %s", p.Addr())
Expand Down Expand Up @@ -2224,14 +2212,13 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
}

if cfg.HostToNetAddress != nil {
na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services)
na, err := cfg.HostToNetAddress(host, uint16(port), 0)
if err != nil {
return nil, err
}
p.na = na
} else {
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port),
cfg.Services)
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
}

return p, nil
Expand Down
63 changes: 62 additions & 1 deletion peer/peer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -431,8 +432,9 @@ func TestPeerListeners(t *testing.T) {
OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
ok <- msg
},
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
ok <- msg
return nil
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
Expand Down Expand Up @@ -856,6 +858,65 @@ func TestUnsupportedVersionPeer(t *testing.T) {
}
}

// TestDuplicateVersionMsg ensures that receiving a version message after one
// has already been received results in the peer being disconnected.
func TestDuplicateVersionMsg(t *testing.T) {
// Create a pair of peers that are connected to each other using a fake
// connection.
verack := make(chan struct{})
peerCfg := &peer.Config{
Listeners: peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
ChainParams: &chaincfg.MainNetParams,
Services: 0,
}
inConn, outConn := pipe(
&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
)
outPeer, err := peer.NewOutboundPeer(peerCfg, inConn.laddr)
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err)
}
outPeer.AssociateConnection(outConn)
inPeer := peer.NewInboundPeer(peerCfg)
inPeer.AssociateConnection(inConn)
// Wait for the veracks from the initial protocol version negotiation.
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second):
t.Fatal("verack timeout")
}
}
// Queue a duplicate version message from the outbound peer and wait until
// it is sent.
done := make(chan struct{})
outPeer.QueueMessage(&wire.MsgVersion{}, done)
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("send duplicate version timeout")
}
// Ensure the peer that is the recipient of the duplicate version closes the
// connection.
disconnected := make(chan struct{}, 1)
go func() {
inPeer.WaitForDisconnect()
disconnected <- struct{}{}
}()
select {
case <-disconnected:
case <-time.After(time.Second):
t.Fatal("peer did not disconnect")
}
}

func init() {
// Allow self connection when running the tests.
peer.TstAllowSelfConns()
Expand Down
Loading