diff --git a/addrmgr/addrmanager.go b/addrmgr/addrmanager.go index 4dc8bdbd75..fc6930688b 100644 --- a/addrmgr/addrmanager.go +++ b/addrmgr/addrmanager.go @@ -1,4 +1,5 @@ // Copyright (c) 2013-2016 The btcsuite developers +// Copyright (c) 2015-2018 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -937,6 +938,25 @@ func (a *AddrManager) Good(addr *wire.NetAddress) { a.addrNew[newBucket][rmkey] = rmka } +// SetServices sets the services for the giiven address to the provided value. +func (a *AddrManager) SetServices(addr *wire.NetAddress, services wire.ServiceFlag) { + a.mtx.Lock() + defer a.mtx.Unlock() + + ka := a.find(addr) + if ka == nil { + return + } + + // Update the services if needed. + if ka.na.Services != services { + // ka.na is immutable, so replace it. + naCopy := *ka.na + naCopy.Services = services + ka.na = &naCopy + } +} + // AddLocalAddress adds na to the list of known local addresses to advertise // with the given priority. func (a *AddrManager) AddLocalAddress(na *wire.NetAddress, priority AddressPriority) error { diff --git a/peer/example_test.go b/peer/example_test.go index a549fe21ec..cb67683bae 100644 --- a/peer/example_test.go +++ b/peer/example_test.go @@ -1,4 +1,5 @@ -// Copyright (c) 2015-2016 The btcsuite developers +// Copyright (c) 2015-2018 The btcsuite developers +// Copyright (c) 2016-2018 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -72,8 +73,9 @@ func Example_newOutboundPeer() { Services: 0, TrickleInterval: time.Second * 10, Listeners: peer.MessageListeners{ - OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) { + OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject { fmt.Println("outbound: received version") + return nil }, OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { verack <- struct{}{} diff --git a/peer/peer.go b/peer/peer.go index 4fa77cf741..61bc619ffb 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -34,9 +34,9 @@ const ( // inv message to a peer. DefaultTrickleInterval = 10 * time.Second - // minAcceptableProtocolVersion is the lowest protocol version that a + // MinAcceptableProtocolVersion is the lowest protocol version that a // connected peer may support. - minAcceptableProtocolVersion = wire.MultipleAddressVersion + MinAcceptableProtocolVersion = wire.MultipleAddressVersion // outputBufferSize is the number of elements the output channels use. outputBufferSize = 50 @@ -187,7 +187,9 @@ type MessageListeners struct { OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock) // OnVersion is invoked when a peer receives a version bitcoin message. - OnVersion func(p *Peer, msg *wire.MsgVersion) + // The caller may return a reject message in which case the message will + // be sent to the peer and the peer will be disconnected. + OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject // OnVerAck is invoked when a peer receives a verack bitcoin message. OnVerAck func(p *Peer, msg *wire.MsgVerAck) @@ -1369,7 +1371,7 @@ out: p.stallControl <- stallControlMsg{sccHandlerStart, rmsg} switch msg := rmsg.(type) { case *wire.MsgVersion: - + // Limit to one version message per peer. p.PushRejectMsg(msg.Command(), wire.RejectDuplicate, "duplicate version message", nil, true) break out @@ -1875,26 +1877,42 @@ func (p *Peer) Disconnect() { close(p.quit) } -// handleRemoteVersionMsg is invoked when a version bitcoin message is received -// from the remote peer. It will return an error if the remote peer's version -// is not compatible with ours. -func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error { +// readRemoteVersionMsg waits for the next message to arrive from the remote +// peer. If the next message is not a version message or the version is not +// acceptable then return an error. +func (p *Peer) readRemoteVersionMsg() error { + // Read their version message. + remoteMsg, _, err := p.readMessage(wire.LatestEncoding) + if err != nil { + return err + } + + // Notify and disconnect clients if the first message is not a version + // message. + msg, ok := remoteMsg.(*wire.MsgVersion) + if !ok { + reason := "a version message must precede all others" + rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed, + reason) + _ = p.writeMessage(rejectMsg, wire.LatestEncoding) + return errors.New(reason) + } + // Detect self connections. if !allowSelfConns && sentNonces.Exists(msg.Nonce) { return errors.New("disconnecting peer connected to self") } - // Notify and disconnect clients that have a protocol version that is - // too old. - // - // NOTE: If minAcceptableProtocolVersion is raised to be higher than - // wire.RejectVersion, this should send a reject packet before - // disconnecting. - if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion { - reason := fmt.Sprintf("protocol version must be %d or greater", - minAcceptableProtocolVersion) - return errors.New(reason) - } + // Negotiate the protocol version and set the services to what the remote + // peer advertised. + p.flagsMtx.Lock() + p.advertisedProtoVer = uint32(msg.ProtocolVersion) + p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer) + p.versionKnown = true + p.services = msg.Services + p.flagsMtx.Unlock() + log.Debugf("Negotiated protocol version %d for peer %s", + p.protocolVersion, p) // Updating a bunch of stats including block based stats, and the // peer's time offset. @@ -1904,22 +1922,10 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error { p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix() p.statsMtx.Unlock() - // Negotiate the protocol version. + // Set the peer's ID, user agent, and potentially the flag which + // specifies the witness support is enabled. p.flagsMtx.Lock() - p.advertisedProtoVer = uint32(msg.ProtocolVersion) - p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer) - p.versionKnown = true - log.Debugf("Negotiated protocol version %d for peer %s", - p.protocolVersion, p) - - // Set the peer's ID. p.id = atomic.AddInt32(&nodeCount, 1) - - // Set the supported services for the peer to what the remote peer - // advertised. - p.services = msg.Services - - // Set the remote peer's user agent. p.userAgent = msg.UserAgent // Determine if the peer would like to receive witness data with @@ -1938,36 +1944,33 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error { p.wireEncoding = wire.WitnessEncoding } - return nil -} - -// readRemoteVersionMsg waits for the next message to arrive from the remote -// peer. If the next message is not a version message or the version is not -// acceptable then return an error. -func (p *Peer) readRemoteVersionMsg() error { - // Read their version message. - msg, _, err := p.readMessage(wire.LatestEncoding) - if err != nil { - return err - } - - remoteVerMsg, ok := msg.(*wire.MsgVersion) - if !ok { - errStr := "A version message must precede all others" - log.Errorf(errStr) - - rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed, - errStr) - return p.writeMessage(rejectMsg, wire.LatestEncoding) + // Invoke the callback if specified. + if p.cfg.Listeners.OnVersion != nil { + rejectMsg := p.cfg.Listeners.OnVersion(p, msg) + if rejectMsg != nil { + _ = p.writeMessage(rejectMsg, wire.LatestEncoding) + return errors.New(rejectMsg.Reason) + } } - if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil { - return err + // Notify and disconnect clients that have a protocol version that is + // too old. + // + // NOTE: If minAcceptableProtocolVersion is raised to be higher than + // wire.RejectVersion, this should send a reject packet before + // disconnecting. + if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion { + // Send a reject message indicating the protocol version is + // obsolete and wait for the message to be sent before + // disconnecting. + reason := fmt.Sprintf("protocol version must be %d or greater", + MinAcceptableProtocolVersion) + rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete, + reason) + _ = p.writeMessage(rejectMsg, wire.LatestEncoding) + return errors.New(reason) } - if p.cfg.Listeners.OnVersion != nil { - p.cfg.Listeners.OnVersion(p, remoteVerMsg) - } return nil } @@ -1992,7 +1995,8 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy) // invalid proxy means poorly configured, be on the safe side. if err != nil || p.na.IP.String() == proxyaddress { - theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0) + theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, + theirNA.Services) } } @@ -2020,25 +2024,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion, p.cfg.UserAgentComments...) - // XXX: bitcoind appears to always enable the full node services flag - // of the remote peer netaddress field in the version message regardless - // of whether it knows it supports it or not. Also, bitcoind sets - // the services field of the local peer to 0 regardless of support. - // - // Realistically, this should be set as follows: - // - For outgoing connections: - // - Set the local netaddress services to what the local peer - // actually supports - // - Set the remote netaddress services to 0 to indicate no services - // as they are still unknown - // - For incoming connections: - // - Set the local netaddress services to what the local peer - // actually supports - // - Set the remote netaddress services to the what was advertised by - // by the remote peer in its version message - msg.AddrYou.Services = wire.SFNodeNetwork - - // Advertise the services flag + // Advertise local services. msg.Services = p.cfg.Services // Advertise our max supported protocol version. @@ -2099,9 +2085,11 @@ func (p *Peer) start() error { select { case err := <-negotiateErr: if err != nil { + p.Disconnect() return err } case <-time.After(negotiateTimeout): + p.Disconnect() return errors.New("protocol negotiation timeout") } log.Debugf("Connected to %s", p.Addr()) @@ -2224,14 +2212,13 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) { } if cfg.HostToNetAddress != nil { - na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services) + na, err := cfg.HostToNetAddress(host, uint16(port), 0) if err != nil { return nil, err } p.na = na } else { - p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), - cfg.Services) + p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0) } return p, nil diff --git a/peer/peer_test.go b/peer/peer_test.go index daf87ab810..fff0ce3fd8 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -1,4 +1,5 @@ // Copyright (c) 2015-2016 The btcsuite developers +// Copyright (c) 2016-2018 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -431,8 +432,9 @@ func TestPeerListeners(t *testing.T) { OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) { ok <- msg }, - OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) { + OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject { ok <- msg + return nil }, OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { verack <- struct{}{} @@ -856,6 +858,65 @@ func TestUnsupportedVersionPeer(t *testing.T) { } } +// TestDuplicateVersionMsg ensures that receiving a version message after one +// has already been received results in the peer being disconnected. +func TestDuplicateVersionMsg(t *testing.T) { + // Create a pair of peers that are connected to each other using a fake + // connection. + verack := make(chan struct{}) + peerCfg := &peer.Config{ + Listeners: peer.MessageListeners{ + OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { + verack <- struct{}{} + }, + }, + UserAgentName: "peer", + UserAgentVersion: "1.0", + ChainParams: &chaincfg.MainNetParams, + Services: 0, + } + inConn, outConn := pipe( + &conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"}, + &conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"}, + ) + outPeer, err := peer.NewOutboundPeer(peerCfg, inConn.laddr) + if err != nil { + t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err) + } + outPeer.AssociateConnection(outConn) + inPeer := peer.NewInboundPeer(peerCfg) + inPeer.AssociateConnection(inConn) + // Wait for the veracks from the initial protocol version negotiation. + for i := 0; i < 2; i++ { + select { + case <-verack: + case <-time.After(time.Second): + t.Fatal("verack timeout") + } + } + // Queue a duplicate version message from the outbound peer and wait until + // it is sent. + done := make(chan struct{}) + outPeer.QueueMessage(&wire.MsgVersion{}, done) + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("send duplicate version timeout") + } + // Ensure the peer that is the recipient of the duplicate version closes the + // connection. + disconnected := make(chan struct{}, 1) + go func() { + inPeer.WaitForDisconnect() + disconnected <- struct{}{} + }() + select { + case <-disconnected: + case <-time.After(time.Second): + t.Fatal("peer did not disconnect") + } +} + func init() { // Allow self connection when running the tests. peer.TstAllowSelfConns() diff --git a/server.go b/server.go index 2e91eee3b8..42f34da5b2 100644 --- a/server.go +++ b/server.go @@ -1,5 +1,5 @@ // Copyright (c) 2013-2017 The btcsuite developers -// Copyright (c) 2015-2017 The Decred developers +// Copyright (c) 2015-2018 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -385,78 +385,113 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) { } } +// hasServices returns whether or not the provided advertised service flags have +// all of the provided desired service flags set. +func hasServices(advertised, desired wire.ServiceFlag) bool { + return advertised&desired == desired +} + // OnVersion is invoked when a peer receives a version bitcoin message // and is used to negotiate the protocol version details as well as kick start // the communications. -func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { - // Add the remote peer time as a sample for creating an offset against - // the local clock to keep the network time in sync. - sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp) - - // Signal the sync manager this peer is a new sync candidate. - sp.server.syncManager.NewPeer(sp.Peer) +func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject { + // Update the address manager with the advertised services for outbound + // connections in case they have changed. This is not done for inbound + // connections to help prevent malicious behavior and is skipped when + // running on the simulation test network since it is only intended to + // connect to specified peers and actively avoids advertising and + // connecting to discovered peers. + // + // NOTE: This is done before rejecting peers that are too old to ensure + // it is updated regardless in the case a new minimum protocol version is + // enforced and the remote node has not upgraded yet. + isInbound := sp.Inbound() + remoteAddr := sp.NA() + addrManager := sp.server.addrManager + if !cfg.SimNet && !isInbound { + addrManager.SetServices(remoteAddr, msg.Services) + } + + // Ignore peers that have a protcol version that is too old. The peer + // negotiation logic will disconnect it after this callback returns. + if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) { + return nil + } - // Choose whether or not to relay transactions before a filter command - // is received. - sp.setDisableRelayTx(msg.DisableRelayTx) + // Reject outbound peers that are not full nodes. + wantServices := wire.SFNodeNetwork + if !isInbound && !hasServices(msg.Services, wantServices) { + missingServices := wantServices & ^msg.Services + srvrLog.Debugf("Rejecting peer %s with services %v due to not "+ + "providing desired services %v", sp.Peer, msg.Services, + missingServices) + reason := fmt.Sprintf("required services %#x not offered", + uint64(missingServices)) + return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason) + } // Update the address manager and request known addresses from the // remote peer for outbound connections. This is skipped when running // on the simulation test network since it is only intended to connect // to specified peers and actively avoids advertising and connecting to // discovered peers. - if !cfg.SimNet { - addrManager := sp.server.addrManager - - // Outbound connections. - if !sp.Inbound() { - // After soft-fork activation, only make outbound - // connection to peers if they flag that they're segwit - // enabled. - chain := sp.server.chain - segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit) - if err != nil { - peerLog.Errorf("Unable to query for segwit "+ - "soft-fork state: %v", err) - return - } - - if segwitActive && !sp.IsWitnessEnabled() { - peerLog.Infof("Disconnecting non-segwit "+ - "peer %v, isn't segwit enabled and "+ - "we need more segwit enabled peers", sp) - sp.Disconnect() - return - } - - // TODO(davec): Only do this if not doing the initial block - // download and the local address is routable. - if !cfg.DisableListen /* && isCurrent? */ { - // Get address that best matches. - lna := addrManager.GetBestLocalAddress(sp.NA()) - if addrmgr.IsRoutable(lna) { - // Filter addresses the peer already knows about. - addresses := []*wire.NetAddress{lna} - sp.pushAddrMsg(addresses) - } - } + if !cfg.SimNet && !isInbound { + // After soft-fork activation, only make outbound + // connection to peers if they flag that they're segwit + // enabled. + chain := sp.server.chain + segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit) + if err != nil { + peerLog.Errorf("Unable to query for segwit soft-fork state: %v", + err) + return nil + } - // Request known addresses if the server address manager needs - // more and the peer has a protocol version new enough to - // include a timestamp with addresses. - hasTimestamp := sp.ProtocolVersion() >= - wire.NetAddressTimeVersion - if addrManager.NeedMoreAddresses() && hasTimestamp { - sp.QueueMessage(wire.NewMsgGetAddr(), nil) + if segwitActive && !sp.IsWitnessEnabled() { + peerLog.Infof("Disconnecting non-segwit peer %v, isn't segwit "+ + "enabled and we need more segwit enabled peers", sp) + sp.Disconnect() + return nil + } + + // Advertise the local address when the server accepts incoming + // connections and it believes itself to be close to the best known tip. + if !cfg.DisableListen && sp.server.syncManager.IsCurrent() { + // Get address that best matches. + lna := addrManager.GetBestLocalAddress(remoteAddr) + if addrmgr.IsRoutable(lna) { + // Filter addresses the peer already knows about. + addresses := []*wire.NetAddress{lna} + sp.pushAddrMsg(addresses) } + } - // Mark the address as a known good address. - addrManager.Good(sp.NA()) + // Request known addresses if the server address manager needs + // more and the peer has a protocol version new enough to + // include a timestamp with addresses. + hasTimestamp := sp.ProtocolVersion() >= wire.NetAddressTimeVersion + if addrManager.NeedMoreAddresses() && hasTimestamp { + sp.QueueMessage(wire.NewMsgGetAddr(), nil) } + + // Mark the address as a known good address. + addrManager.Good(remoteAddr) } + // Add the remote peer time as a sample for creating an offset against + // the local clock to keep the network time in sync. + sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp) + + // Signal the sync manager this peer is a new sync candidate. + sp.server.syncManager.NewPeer(sp.Peer) + + // Choose whether or not to relay transactions before a filter command + // is received. + sp.setDisableRelayTx(msg.DisableRelayTx) + // Add valid peer to the server. sp.server.AddPeer(sp) + return nil } // OnMemPool is invoked when a peer receives a mempool bitcoin message.