diff --git a/chain/dev/defaults.go b/chain/dev/defaults.go index 5999c216fe..ecc3765e24 100644 --- a/chain/dev/defaults.go +++ b/chain/dev/defaults.go @@ -17,9 +17,10 @@ package dev import ( + log "github.com/ChainSafe/log15" + "github.com/ChainSafe/gossamer/lib/genesis" "github.com/ChainSafe/gossamer/lib/runtime/wasmer" - log "github.com/ChainSafe/log15" ) var ( @@ -76,7 +77,7 @@ var ( // NetworkConfig // DefaultNetworkPort network port - DefaultNetworkPort = uint32(7001) + DefaultNetworkPort = uint16(7001) // DefaultNetworkBootnodes network bootnodes DefaultNetworkBootnodes = []string(nil) // DefaultNoBootstrap disables bootstrap diff --git a/chain/gssmr/defaults.go b/chain/gssmr/defaults.go index f672ecf754..762c30e331 100644 --- a/chain/gssmr/defaults.go +++ b/chain/gssmr/defaults.go @@ -19,9 +19,10 @@ package gssmr import ( "time" + log "github.com/ChainSafe/log15" + "github.com/ChainSafe/gossamer/lib/genesis" "github.com/ChainSafe/gossamer/lib/runtime/wasmer" - log "github.com/ChainSafe/log15" ) var ( @@ -78,7 +79,7 @@ var ( // NetworkConfig // DefaultNetworkPort network port - DefaultNetworkPort = uint32(7001) + DefaultNetworkPort = uint16(7001) // DefaultNetworkBootnodes network bootnodes DefaultNetworkBootnodes = []string(nil) // DefaultNoBootstrap disables bootstrap diff --git a/chain/kusama/defaults.go b/chain/kusama/defaults.go index a656f896bb..0c1f2cf6f1 100644 --- a/chain/kusama/defaults.go +++ b/chain/kusama/defaults.go @@ -17,9 +17,10 @@ package kusama import ( + log "github.com/ChainSafe/log15" + "github.com/ChainSafe/gossamer/lib/genesis" "github.com/ChainSafe/gossamer/lib/runtime/wasmer" - log "github.com/ChainSafe/log15" ) var ( @@ -72,7 +73,7 @@ var ( // NetworkConfig // DefaultNetworkPort network port - DefaultNetworkPort = uint32(7001) + DefaultNetworkPort = uint16(7001) // DefaultNetworkBootnodes network bootnodes DefaultNetworkBootnodes = []string(nil) // DefaultNoBootstrap disables bootstrap diff --git a/chain/polkadot/defaults.go b/chain/polkadot/defaults.go index d49ace2cac..b449b24fcd 100644 --- a/chain/polkadot/defaults.go +++ b/chain/polkadot/defaults.go @@ -17,9 +17,10 @@ package polkadot import ( + log "github.com/ChainSafe/log15" + "github.com/ChainSafe/gossamer/lib/genesis" "github.com/ChainSafe/gossamer/lib/runtime/wasmer" - log "github.com/ChainSafe/log15" ) var ( @@ -73,7 +74,7 @@ var ( // NetworkConfig // DefaultNetworkPort network port - DefaultNetworkPort = uint32(7001) + DefaultNetworkPort = uint16(7001) // DefaultNetworkBootnodes network bootnodes DefaultNetworkBootnodes = []string(nil) // DefaultNoBootstrap disables bootstrap diff --git a/cmd/gossamer/config.go b/cmd/gossamer/config.go index 9b8bee0506..1d083972f7 100644 --- a/cmd/gossamer/config.go +++ b/cmd/gossamer/config.go @@ -652,7 +652,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot // check --port flag and update node configuration if port := ctx.GlobalUint(PortFlag.Name); port != 0 { - cfg.Port = uint32(port) + cfg.Port = uint16(port) } // check --bootnodes flag and update node configuration diff --git a/dot/config.go b/dot/config.go index 07c5258e52..59b37a3b49 100644 --- a/dot/config.go +++ b/dot/config.go @@ -20,6 +20,8 @@ import ( "encoding/json" "time" + log "github.com/ChainSafe/log15" + "github.com/ChainSafe/gossamer/chain/dev" "github.com/ChainSafe/gossamer/chain/gssmr" "github.com/ChainSafe/gossamer/chain/kusama" @@ -27,7 +29,6 @@ import ( "github.com/ChainSafe/gossamer/dot/state/pruner" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/genesis" - log "github.com/ChainSafe/log15" ) // TODO: update config to have toml rules and perhaps un-export some fields, since we don't want to expose all @@ -85,7 +86,7 @@ type AccountConfig struct { // NetworkConfig is to marshal/unmarshal toml network config vars type NetworkConfig struct { - Port uint32 + Port uint16 Bootnodes []string ProtocolID string NoBootstrap bool diff --git a/dot/config/toml/config.go b/dot/config/toml/config.go index b6c090fa2b..6f16514b40 100644 --- a/dot/config/toml/config.go +++ b/dot/config/toml/config.go @@ -63,7 +63,7 @@ type AccountConfig struct { // NetworkConfig is to marshal/unmarshal toml network config vars type NetworkConfig struct { - Port uint32 `toml:"port,omitempty"` + Port uint16 `toml:"port,omitempty"` Bootnodes []string `toml:"bootnodes,omitempty"` ProtocolID string `toml:"protocol,omitempty"` NoBootstrap bool `toml:"nobootstrap,omitempty"` diff --git a/dot/core/interface.go b/dot/core/interface.go index 6336b50820..28caf8accc 100644 --- a/dot/core/interface.go +++ b/dot/core/interface.go @@ -20,7 +20,10 @@ import ( "math/big" "sync" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/ChainSafe/gossamer/dot/network" + "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/runtime" @@ -86,6 +89,7 @@ type TransactionState interface { type Network interface { GossipMessage(network.NotificationsMessage) IsSynced() bool + ReportPeer(change peerset.ReputationChange, p peer.ID) } // EpochState is the interface for state.EpochState diff --git a/dot/core/messages.go b/dot/core/messages.go index 1c7572e188..c0a4a54b2b 100644 --- a/dot/core/messages.go +++ b/dot/core/messages.go @@ -17,15 +17,21 @@ package core import ( + "errors" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/ChainSafe/gossamer/dot/network" + "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/runtime" "github.com/ChainSafe/gossamer/lib/transaction" ) // HandleTransactionMessage validates each transaction in the message and // adds valid transactions to the transaction queue of the BABE session // returns boolean for transaction propagation, true - transactions should be propagated -func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (bool, error) { +func (s *Service) HandleTransactionMessage(peerID peer.ID, msg *network.TransactionMessage) (bool, error) { logger.Debug("received TransactionMessage") if !s.net.IsSynced() { @@ -64,6 +70,12 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (boo externalExt := types.Extrinsic(append([]byte{byte(types.TxnExternal)}, tx...)) val, err := rt.ValidateTransaction(externalExt) if err != nil { + if errors.Is(err, runtime.ErrInvalidTransaction) { + s.net.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadTransactionValue, + Reason: peerset.BadTransactionReason, + }, peerID) + } logger.Debug("failed to validate transaction", "err", err) return nil } @@ -88,6 +100,11 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) (boo } } + s.net.ReportPeer(peerset.ReputationChange{ + Value: peerset.GoodTransactionValue, + Reason: peerset.GoodTransactionReason, + }, peerID) + msg.Extrinsics = toPropagate return len(msg.Extrinsics) > 0, nil } diff --git a/dot/core/messages_test.go b/dot/core/messages_test.go index 8fd3e054ec..48c1d1a62c 100644 --- a/dot/core/messages_test.go +++ b/dot/core/messages_test.go @@ -21,6 +21,10 @@ import ( "testing" "time" + "github.com/centrifuge/go-substrate-rpc-client/v3/signature" + ctypes "github.com/centrifuge/go-substrate-rpc-client/v3/types" + "github.com/stretchr/testify/require" + "github.com/ChainSafe/gossamer/dot/core/mocks" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/state" @@ -31,11 +35,6 @@ import ( "github.com/ChainSafe/gossamer/lib/keystore" "github.com/ChainSafe/gossamer/lib/runtime" "github.com/ChainSafe/gossamer/pkg/scale" - - "github.com/centrifuge/go-substrate-rpc-client/v3/signature" - ctypes "github.com/centrifuge/go-substrate-rpc-client/v3/types" - - "github.com/stretchr/testify/require" ) func createExtrinsic(t *testing.T, rt runtime.Instance, genHash common.Hash, nonce uint64) types.Extrinsic { @@ -128,6 +127,10 @@ func TestService_HandleBlockProduced(t *testing.T) { } func TestService_HandleTransactionMessage(t *testing.T) { + t.Parallel() + + const peer1 = "testPeer1" + kp, err := sr25519.GenerateKeypair() require.NoError(t, err) @@ -158,7 +161,7 @@ func TestService_HandleTransactionMessage(t *testing.T) { extBytes := createExtrinsic(t, rt, genHash, 0) msg := &network.TransactionMessage{Extrinsics: []types.Extrinsic{extBytes}} - b, err := s.HandleTransactionMessage(msg) + b, err := s.HandleTransactionMessage(peer1, msg) require.NoError(t, err) require.True(t, b) @@ -168,7 +171,7 @@ func TestService_HandleTransactionMessage(t *testing.T) { extBytes = []byte(`bogus extrinsic`) msg = &network.TransactionMessage{Extrinsics: []types.Extrinsic{extBytes}} - b, err = s.HandleTransactionMessage(msg) + b, err = s.HandleTransactionMessage(peer1, msg) require.NoError(t, err) require.False(t, b) } diff --git a/dot/core/mocks/Network.go b/dot/core/mocks/Network.go index f7df49c1f6..37cd0c072f 100644 --- a/dot/core/mocks/Network.go +++ b/dot/core/mocks/Network.go @@ -5,6 +5,10 @@ package mocks import ( network "github.com/ChainSafe/gossamer/dot/network" mock "github.com/stretchr/testify/mock" + + peer "github.com/libp2p/go-libp2p-core/peer" + + peerset "github.com/ChainSafe/gossamer/dot/peerset" ) // Network is an autogenerated mock type for the Network type @@ -30,3 +34,8 @@ func (_m *Network) IsSynced() bool { return r0 } + +// ReportPeer provides a mock function with given fields: change, p +func (_m *Network) ReportPeer(change peerset.ReputationChange, p peer.ID) { + _m.Called(change, p) +} diff --git a/dot/core/test_helpers.go b/dot/core/test_helpers.go index f17f65e113..d3f80f3e7e 100644 --- a/dot/core/test_helpers.go +++ b/dot/core/test_helpers.go @@ -21,8 +21,11 @@ import ( "path/filepath" "testing" + log "github.com/ChainSafe/log15" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + coremocks "github.com/ChainSafe/gossamer/dot/core/mocks" - "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/state" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/crypto/sr25519" @@ -32,9 +35,6 @@ import ( rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage" "github.com/ChainSafe/gossamer/lib/runtime/wasmer" "github.com/ChainSafe/gossamer/lib/utils" - log "github.com/ChainSafe/log15" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) // NewTestService creates a new test core service @@ -127,6 +127,7 @@ func NewTestService(t *testing.T, cfg *Config) *Service { net := new(coremocks.Network) net.On("GossipMessage", mock.AnythingOfType("*network.TransactionMessage")) net.On("IsSynced").Return(true) + net.On("ReportPeer", mock.AnythingOfType("peerset.ReputationChange"), mock.AnythingOfType("peer.ID")) cfg.Network = net } @@ -148,10 +149,5 @@ func NewTestService(t *testing.T, cfg *Config) *Service { s, err := NewService(cfg) require.NoError(t, err) - if net, ok := cfg.Network.(*network.Service); ok { - net.SetTransactionHandler(s) - _ = net.Stop() - } - return s } diff --git a/dot/network/config.go b/dot/network/config.go index 8946a740d7..a8162e1ad5 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -33,7 +33,7 @@ const ( DefaultBasePath = "~/.gossamer/gssmr" // DefaultPort the default value for Config.Port - DefaultPort = uint32(7000) + DefaultPort = uint16(7000) // DefaultRandSeed the default value for Config.RandSeed (0 = non-deterministic) DefaultRandSeed = int64(0) @@ -74,7 +74,7 @@ type Config struct { TransactionHandler TransactionHandler // Port the network port used for listening - Port uint32 + Port uint16 // RandSeed the seed used to generate the network p2p identity (0 = non-deterministic random seed) RandSeed int64 // Bootnodes the peer addresses used for bootstrapping diff --git a/dot/network/connmgr.go b/dot/network/connmgr.go index bb1806154f..d4b747b525 100644 --- a/dot/network/connmgr.go +++ b/dot/network/connmgr.go @@ -21,18 +21,14 @@ import ( "crypto/rand" "math/big" "sync" - "time" "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" - ma "github.com/multiformats/go-multiaddr" -) -var ( - maxRetries = 12 + "github.com/ChainSafe/gossamer/dot/peerset" ) // ConnManager implements connmgr.ConnManager @@ -51,16 +47,24 @@ type ConnManager struct { // persistentPeers contains peers we should remain connected to. persistentPeers *sync.Map // map[peer.ID]struct{} + + peerSetHandler PeerSetHandler } -func newConnManager(min, max int) *ConnManager { +func newConnManager(min, max int, peerSetCfg *peerset.ConfigSet) (*ConnManager, error) { + psh, err := peerset.NewPeerSetHandler(peerSetCfg) + if err != nil { + return nil, err + } + return &ConnManager{ min: min, max: max, closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)), protectedPeers: new(sync.Map), persistentPeers: new(sync.Map), - } + peerSetHandler: psh, + }, nil } // Notifee is used to monitor changes to a connection @@ -90,18 +94,18 @@ func (*ConnManager) UpsertTag(peer.ID, string, func(int) int) {} func (*ConnManager) GetTagInfo(peer.ID) *connmgr.TagInfo { return &connmgr.TagInfo{} } // TrimOpenConns peer -func (*ConnManager) TrimOpenConns(ctx context.Context) {} +func (*ConnManager) TrimOpenConns(context.Context) {} // Protect peer will add the given peer to the protectedPeerMap which will // protect the peer from pruning. -func (cm *ConnManager) Protect(id peer.ID, tag string) { +func (cm *ConnManager) Protect(id peer.ID, _ string) { cm.protectedPeers.Store(id, struct{}{}) } // Unprotect peer will remove the given peer from prune protection. // returns true if we have successfully removed the peer from the // protectedPeerMap. False otherwise. -func (cm *ConnManager) Unprotect(id peer.ID, tag string) bool { +func (cm *ConnManager) Unprotect(id peer.ID, _ string) bool { _, wasDeleted := cm.protectedPeers.LoadAndDelete(id) return wasDeleted } @@ -110,7 +114,7 @@ func (cm *ConnManager) Unprotect(id peer.ID, tag string) bool { func (*ConnManager) Close() error { return nil } // IsProtected returns whether the given peer is protected from pruning or not. -func (cm *ConnManager) IsProtected(id peer.ID, tag string) (protected bool) { +func (cm *ConnManager) IsProtected(id peer.ID, _ string) (protected bool) { _, ok := cm.protectedPeers.Load(id) return ok } @@ -184,7 +188,7 @@ func (cm *ConnManager) Connected(n network.Network, c network.Conn) { } // Disconnected is called when a connection closed -func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) { +func (cm *ConnManager) Disconnected(_ network.Network, c network.Conn) { logger.Trace( "Disconnected from peer", "host", c.LocalPeer(), @@ -195,57 +199,10 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) { if cm.disconnectHandler != nil { cm.disconnectHandler(c.RemotePeer()) } - - if !cm.isPersistent(c.RemotePeer()) { - return - } - - addrs := cm.host.h.Peerstore().Addrs(c.RemotePeer()) - info := peer.AddrInfo{ - ID: c.RemotePeer(), - Addrs: addrs, - } - - count := 0 - retry := func() bool { - err := cm.host.connect(info) - if err != nil { - logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err) - return false - } - - count++ - if count > maxRetries { - return true - } - return true - } - - go func() { - if retry() { - return - } - - retryTimer := time.NewTicker(time.Minute) - defer retryTimer.Stop() - for { - select { - case <-cm.host.ctx.Done(): - return - case <-retryTimer.C: - if retry() { - return - } - } - } - }() - - // TODO: if number of peers falls below the min desired peer count, - // we should try to connect to previously discovered peers (#1852) } // OpenedStream is called when a stream opened -func (cm *ConnManager) OpenedStream(n network.Network, s network.Stream) { +func (cm *ConnManager) OpenedStream(_ network.Network, s network.Stream) { logger.Trace( "Opened stream", "peer", s.Conn().RemotePeer(), @@ -258,7 +215,7 @@ func (cm *ConnManager) registerCloseHandler(protocolID protocol.ID, cb func(id p } // ClosedStream is called when a stream closed -func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) { +func (cm *ConnManager) ClosedStream(_ network.Network, s network.Stream) { logger.Trace( "Closed stream", "peer", s.Conn().RemotePeer(), diff --git a/dot/network/connmgr_test.go b/dot/network/connmgr_test.go index 50e0661b80..b1986055c4 100644 --- a/dot/network/connmgr_test.go +++ b/dot/network/connmgr_test.go @@ -21,18 +21,57 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/utils" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/dot/peerset" + "github.com/ChainSafe/gossamer/lib/utils" ) +func TestMinPeers(t *testing.T) { + const min = 1 + + nodes := make([]*Service, 2) + for i := range nodes { + config := &Config{ + BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)), + Port: 7000 + uint16(i), + NoBootstrap: true, + NoMDNS: true, + } + node := createTestService(t, config) + nodes[i] = node + } + + addrs := nodes[0].host.multiaddrs()[0] + addrs1 := nodes[1].host.multiaddrs()[0] + + configB := &Config{ + BasePath: utils.NewTestBasePath(t, "nodeB"), + Port: 7002, + Bootnodes: []string{addrs.String(), addrs1.String()}, + NoMDNS: true, + MinPeers: min, + } + + nodeB := createTestService(t, configB) + + require.Equal(t, min, nodeB.host.peerCount()) + + nodeB.host.cm.peerSetHandler.DisconnectPeer(0, nodes[0].host.id()) + time.Sleep(200 * time.Millisecond) + + require.Equal(t, min, nodeB.host.peerCount()) +} + func TestMaxPeers(t *testing.T) { - max := 3 + const max = 3 nodes := make([]*Service, max+2) for i := range nodes { config := &Config{ BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)), - Port: 7000 + uint32(i), + Port: 7000 + uint16(i), NoBootstrap: true, NoMDNS: true, MaxPeers: max, @@ -51,19 +90,25 @@ func TestMaxPeers(t *testing.T) { continue } - err = n.host.connect(*ainfo) - if err != nil { - err = n.host.connect(*ainfo) - } - require.NoError(t, err, i) + n.host.h.Peerstore().AddAddrs(ainfo.ID, ainfo.Addrs, peerstore.PermanentAddrTTL) + n.host.cm.peerSetHandler.AddPeer(0, ainfo.ID) } + time.Sleep(200 * time.Millisecond) p := nodes[0].host.h.Peerstore().Peers() require.LessOrEqual(t, max, len(p)) } func TestProtectUnprotectPeer(t *testing.T) { - cm := newConnManager(1, 4) + const ( + min = 1 + max = 4 + slotAllocationTime = time.Second * 2 + ) + + peerCfgSet := peerset.NewConfigSet(uint32(max-min), uint32(max), false, slotAllocationTime) + cm, err := newConnManager(min, max, peerCfgSet) + require.NoError(t, err) p1 := peer.ID("a") p2 := peer.ID("b") @@ -104,13 +149,89 @@ func TestPersistentPeers(t *testing.T) { } nodeB := createTestService(t, configB) + time.Sleep(time.Millisecond * 600) // B should have connected to A during bootstrap conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) require.NotEqual(t, 0, len(conns)) // if A disconnects from B, B should reconnect - nodeA.host.h.Network().ClosePeer(nodeB.host.id()) - time.Sleep(time.Millisecond * 500) + nodeA.host.cm.peerSetHandler.DisconnectPeer(0, nodeB.host.id()) + + time.Sleep(time.Millisecond * 100) + conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) require.NotEqual(t, 0, len(conns)) } + +func TestRemovePeer(t *testing.T) { + basePathA := utils.NewTestBasePath(t, "nodeA") + configA := &Config{ + BasePath: basePathA, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + } + + nodeA := createTestService(t, configA) + nodeA.noGossip = true + + addrA := nodeA.host.multiaddrs()[0] + + basePathB := utils.NewTestBasePath(t, "nodeB") + configB := &Config{ + BasePath: basePathB, + Port: 7002, + Bootnodes: []string{addrA.String()}, + NoMDNS: true, + } + + nodeB := createTestService(t, configB) + nodeB.noGossip = true + + // nodeB will be connected to nodeA through bootnodes. + require.Equal(t, 1, nodeB.host.peerCount()) + + nodeB.host.cm.peerSetHandler.RemovePeer(0, nodeA.host.id()) + time.Sleep(time.Millisecond * 200) + + require.Equal(t, 0, nodeB.host.peerCount()) +} + +func TestSetReservedPeer(t *testing.T) { + nodes := make([]*Service, 3) + for i := range nodes { + config := &Config{ + BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)), + Port: 7000 + uint16(i), + NoBootstrap: true, + NoMDNS: true, + } + node := createTestService(t, config) + nodes[i] = node + } + + addrA := nodes[0].host.multiaddrs()[0] + addrB := nodes[1].host.multiaddrs()[0] + addrC := nodes[2].host.addrInfo() + + basePathD := utils.NewTestBasePath(t, "node3") + config := &Config{ + BasePath: basePathD, + Port: 7004, + NoMDNS: true, + PersistentPeers: []string{addrA.String(), addrB.String()}, + } + + node3 := createTestService(t, config) + node3.noGossip = true + + require.Equal(t, 2, node3.host.peerCount()) + + node3.host.h.Peerstore().AddAddrs(addrC.ID, addrC.Addrs, peerstore.PermanentAddrTTL) + node3.host.cm.peerSetHandler.SetReservedPeer(0, addrC.ID) + time.Sleep(200 * time.Millisecond) + + // reservedOnly mode is not yet implemented, so nodeA and nodeB won't be disconnected (#1888). + // TODO: once reservedOnly mode is implemented and reservedOnly is set to true, change expected value to 1 (nodeC) + require.Equal(t, 3, node3.host.peerCount()) +} diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 6c3c6b1a54..d6ff4b0cf8 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -55,9 +55,10 @@ type discovery struct { ds *badger.Datastore pid protocol.ID minPeers, maxPeers int + handler PeerSetHandler } -func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min, max int) *discovery { +func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min, max int, handler PeerSetHandler) *discovery { return &discovery{ ctx: ctx, h: h, @@ -66,6 +67,7 @@ func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrI pid: pid, minPeers: min, maxPeers: max, + handler: handler, } } @@ -208,16 +210,12 @@ func (d *discovery) findPeers(ctx context.Context) { logger.Trace("found new peer via DHT", "peer", peer.ID) - // found a peer, try to connect if we need more peers - if len(d.h.Network().Peers()) < d.maxPeers { - err = d.h.Connect(d.ctx, peer) - if err != nil { - logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err) - } - } else { - d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) - return - } + d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) + d.handler.AddPeer(0, peer.ID) } } } + +func (d *discovery) findPeer(peerID peer.ID) (peer.AddrInfo, error) { + return d.dht.FindPeer(d.ctx, peerID) +} diff --git a/dot/network/discovery_test.go b/dot/network/discovery_test.go index 1a3ce9ef51..6ea389f80a 100644 --- a/dot/network/discovery_test.go +++ b/dot/network/discovery_test.go @@ -36,7 +36,7 @@ func newTestDiscovery(t *testing.T, num int) []*discovery { for i := 0; i < num; i++ { config := &Config{ BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)), - Port: uint32(7001 + i), + Port: uint16(7001 + i), NoBootstrap: true, NoMDNS: true, } diff --git a/dot/network/host.go b/dot/network/host.go index dcca60e3e2..c3fcb562ed 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/chyeh/pubip" "github.com/dgraph-io/ristretto" badger "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p" @@ -36,7 +37,7 @@ import ( "github.com/libp2p/go-libp2p-peerstore/pstoreds" ma "github.com/multiformats/go-multiaddr" - "github.com/chyeh/pubip" + "github.com/ChainSafe/gossamer/dot/peerset" ) var privateCIDRs = []string{ @@ -48,7 +49,10 @@ var privateCIDRs = []string{ "169.254.0.0/16", } -var connectTimeout = time.Second * 5 +const ( + peerSetSlotAllocTime = time.Second * 2 + connectTimeout = time.Second * 5 +) // host wraps libp2p host with network host configuration and services type host struct { @@ -85,9 +89,6 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { } } - // create connection manager - cm := newConnManager(cfg.MinPeers, cfg.MaxPeers) - // format bootnodes bns, err := stringsToAddrInfos(cfg.Bootnodes) if err != nil { @@ -100,6 +101,13 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { return nil, err } + peerCfgSet := peerset.NewConfigSet(uint32(cfg.MaxPeers-cfg.MinPeers), uint32(cfg.MinPeers), false, peerSetSlotAllocTime) + // create connection manager + cm, err := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet) + if err != nil { + return nil, err + } + for _, pp := range pps { cm.persistentPeers.Store(pp.ID, struct{}{}) } @@ -171,7 +179,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { } bwc := metrics.NewBandwidthCounter() - discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers) + discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers, cm.peerSetHandler) host := &host{ ctx: ctx, @@ -238,20 +246,15 @@ func (h *host) connect(p peer.AddrInfo) (err error) { // bootstrap connects the host to the configured bootnodes func (h *host) bootstrap() { - failed := 0 - var allNodes []peer.AddrInfo - allNodes = append(allNodes, h.bootnodes...) - allNodes = append(allNodes, h.persistentPeers...) - for _, addrInfo := range allNodes { - logger.Debug("bootstrapping to peer", "peer", addrInfo.ID) - err := h.connect(addrInfo) - if err != nil { - logger.Debug("failed to bootstrap to peer", "error", err) - failed++ - } + for _, info := range h.persistentPeers { + h.h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + h.cm.peerSetHandler.AddReservedPeer(0, info.ID) } - if failed == len(allNodes) && len(allNodes) != 0 { - logger.Error("failed to bootstrap to any bootnode") + + for _, addrInfo := range h.bootnodes { + logger.Debug("bootstrapping to peer", "peer", addrInfo.ID) + h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL) + h.cm.peerSetHandler.AddPeer(0, addrInfo.ID) } } @@ -321,20 +324,17 @@ func (h *host) peers() []peer.ID { // addReservedPeers adds the peers `addrs` to the protected peers list and connects to them func (h *host) addReservedPeers(addrs ...string) error { for _, addr := range addrs { - maddr, err := ma.NewMultiaddr(addr) + mAddr, err := ma.NewMultiaddr(addr) if err != nil { return err } - addinfo, err := peer.AddrInfoFromP2pAddr(maddr) + addrInfo, err := peer.AddrInfoFromP2pAddr(mAddr) if err != nil { return err } - - h.h.ConnManager().Protect(addinfo.ID, "") - if err := h.connect(*addinfo); err != nil { - return err - } + h.h.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, peerstore.PermanentAddrTTL) + h.cm.peerSetHandler.AddReservedPeer(0, addrInfo.ID) } return nil @@ -347,7 +347,7 @@ func (h *host) removeReservedPeers(ids ...string) error { if err != nil { return err } - + h.cm.peerSetHandler.RemoveReservedPeer(0, peerID) h.h.ConnManager().Unprotect(peerID, "") } @@ -396,3 +396,8 @@ func (h *host) multiaddrs() (multiaddrs []ma.Multiaddr) { func (h *host) protocols() []string { return h.h.Mux().Protocols() } + +// closePeer closes connection with peer. +func (h *host) closePeer(peer peer.ID) error { + return h.h.Network().ClosePeer(peer) +} diff --git a/dot/network/host_test.go b/dot/network/host_test.go index c3e6f0cd36..800e58b65f 100644 --- a/dot/network/host_test.go +++ b/dot/network/host_test.go @@ -21,11 +21,14 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/utils" + "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/protocol" ma "github.com/multiformats/go-multiaddr" + "github.com/ChainSafe/gossamer/dot/peerset" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/utils" + "github.com/stretchr/testify/require" ) @@ -434,8 +437,9 @@ func Test_AddReservedPeers(t *testing.T) { err := nodeA.host.addReservedPeers(nodeBPeerAddr) require.NoError(t, err) - isProtected := nodeA.host.h.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "") - require.True(t, isProtected) + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 1, nodeA.host.peerCount()) } func Test_RemoveReservedPeers(t *testing.T) { @@ -465,11 +469,17 @@ func Test_RemoveReservedPeers(t *testing.T) { err := nodeA.host.addReservedPeers(nodeBPeerAddr) require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 1, nodeA.host.peerCount()) pID := nodeB.host.addrInfo().ID.String() err = nodeA.host.removeReservedPeers(pID) require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 1, nodeA.host.peerCount()) isProtected := nodeA.host.h.ConnManager().IsProtected(nodeB.host.addrInfo().ID, "") require.False(t, isProtected) @@ -524,3 +534,148 @@ func TestStreamCloseEOF(t *testing.T) { require.True(t, handler.exit) } + +// Test to check the nodes connection by peer set manager +func TestPeerConnect(t *testing.T) { + basePathA := utils.NewTestBasePath(t, "nodeA") + configA := &Config{ + BasePath: basePathA, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + MinPeers: 1, + MaxPeers: 2, + } + + nodeA := createTestService(t, configA) + nodeA.noGossip = true + + basePathB := utils.NewTestBasePath(t, "nodeB") + + configB := &Config{ + BasePath: basePathB, + Port: 7002, + NoBootstrap: true, + NoMDNS: true, + MinPeers: 1, + MaxPeers: 3, + } + + nodeB := createTestService(t, configB) + nodeB.noGossip = true + + addrInfoB := nodeB.host.addrInfo() + nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) + nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID) + + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 1, nodeA.host.peerCount()) + require.Equal(t, 1, nodeB.host.peerCount()) +} + +// Test to check banned peer disconnection by peer set manager +func TestBannedPeer(t *testing.T) { + basePathA := utils.NewTestBasePath(t, "nodeA") + + configA := &Config{ + BasePath: basePathA, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + MinPeers: 1, + MaxPeers: 3, + } + + nodeA := createTestService(t, configA) + nodeA.noGossip = true + + basePathB := utils.NewTestBasePath(t, "nodeB") + + configB := &Config{ + BasePath: basePathB, + Port: 7002, + NoBootstrap: true, + NoMDNS: true, + MinPeers: 1, + MaxPeers: 2, + } + + nodeB := createTestService(t, configB) + nodeB.noGossip = true + + addrInfoB := nodeB.host.addrInfo() + nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) + nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID) + + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 1, nodeA.host.peerCount()) + require.Equal(t, 1, nodeB.host.peerCount()) + + nodeA.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.BannedThresholdValue - 1, + Reason: peerset.BannedReason, + }, addrInfoB.ID) + + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 0, nodeA.host.peerCount()) + require.Equal(t, 0, nodeB.host.peerCount()) + + time.Sleep(3 * time.Second) + + require.Equal(t, 1, nodeA.host.peerCount()) + require.Equal(t, 1, nodeB.host.peerCount()) +} + +// Test to check reputation updated by peer set manager +func TestPeerReputation(t *testing.T) { + basePathA := utils.NewTestBasePath(t, "nodeA") + + configA := &Config{ + BasePath: basePathA, + Port: 7001, + NoBootstrap: true, + NoMDNS: true, + MinPeers: 1, + MaxPeers: 3, + } + + nodeA := createTestService(t, configA) + nodeA.noGossip = true + + basePathB := utils.NewTestBasePath(t, "nodeB") + + configB := &Config{ + BasePath: basePathB, + Port: 7002, + NoBootstrap: true, + NoMDNS: true, + MinPeers: 1, + MaxPeers: 3, + } + + nodeB := createTestService(t, configB) + nodeB.noGossip = true + + addrInfoB := nodeB.host.addrInfo() + nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) + nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID) + + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 1, nodeA.host.peerCount()) + require.Equal(t, 1, nodeB.host.peerCount()) + + nodeA.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.GoodTransactionValue, + Reason: peerset.GoodTransactionReason, + }, addrInfoB.ID) + + time.Sleep(100 * time.Millisecond) + + rep, err := nodeA.host.cm.peerSetHandler.PeerReputation(addrInfoB.ID) + require.NoError(t, err) + require.Greater(t, rep, int32(0)) +} diff --git a/dot/network/mdns.go b/dot/network/mdns.go index 4a353f7e6f..5c5f837d7d 100644 --- a/dot/network/mdns.go +++ b/dot/network/mdns.go @@ -22,6 +22,7 @@ import ( log "github.com/ChainSafe/log15" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" libp2pdiscovery "github.com/libp2p/go-libp2p/p2p/discovery/mdns_legacy" ) @@ -106,9 +107,7 @@ func (n Notifee) HandlePeerFound(p peer.AddrInfo) { "peer", p.ID, ) + n.host.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) // connect to found peer - err := n.host.connect(p) - if err != nil { - n.logger.Error("Failed to connect to peer using mDNS discovery", "error", err) - } + n.host.cm.peerSetHandler.AddPeer(0, p.ID) } diff --git a/dot/network/mock_transaction_handler.go b/dot/network/mock_transaction_handler.go index 1d7461418a..3c67662777 100644 --- a/dot/network/mock_transaction_handler.go +++ b/dot/network/mock_transaction_handler.go @@ -2,7 +2,10 @@ package network -import mock "github.com/stretchr/testify/mock" +import ( + "github.com/libp2p/go-libp2p-core/peer" + mock "github.com/stretchr/testify/mock" +) // MockTransactionHandler is an autogenerated mock type for the TransactionHandler type type MockTransactionHandler struct { @@ -10,7 +13,7 @@ type MockTransactionHandler struct { } // HandleTransactionMessage provides a mock function with given fields: _a0 -func (_m *MockTransactionHandler) HandleTransactionMessage(_a0 *TransactionMessage) (bool, error) { +func (_m *MockTransactionHandler) HandleTransactionMessage(_ peer.ID, _a0 *TransactionMessage) (bool, error) { ret := _m.Called(_a0) var r0 bool diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 7ba62745e7..c80ed3d23a 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/ChainSafe/gossamer/dot/peerset" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -253,6 +254,12 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, if !seen { s.broadcastExcluding(info, data.peer, data.msg) } + + // report peer if we get duplicate gossip message. + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.DuplicateGossipValue, + Reason: peerset.DuplicateGossipReason, + }, peer) } return nil @@ -261,6 +268,11 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtocol, msg NotificationsMessage) { if support, err := s.host.supportsProtocol(peer, info.protocolID); err != nil || !support { + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadProtocolValue, + Reason: peerset.BadProtocolReason, + }, peer) + return } @@ -297,6 +309,11 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc var hs Handshake select { case <-hsTimer.C: + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.TimeOutValue, + Reason: peerset.TimeOutReason, + }, peer) + logger.Trace("handshake timeout reached", "protocol", info.protocolID, "peer", peer) _ = stream.Close() info.outboundHandshakeData.Delete(peer) @@ -351,6 +368,11 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc if err != nil { logger.Debug("failed to send message to peer", "peer", peer, "error", err) } + + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.GossipSuccessValue, + Reason: peerset.GossipSuccessReason, + }, peer) } // broadcastExcluding sends a message to each connected peer except the given peer, @@ -396,6 +418,11 @@ func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDe hs, err := decoder(msgBytes[:tot]) if err != nil { + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadMessageValue, + Reason: peerset.BadMessageReason, + }, stream.Conn().RemotePeer()) + hsC <- &handshakeReader{hs: nil, err: err} return } diff --git a/dot/network/service.go b/dot/network/service.go index c192486859..bca1ddb84f 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -19,21 +19,24 @@ package network import ( "context" "errors" + "fmt" "io" "math/big" "os" "sync" "time" - gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" - "github.com/ChainSafe/gossamer/dot/telemetry" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/services" log "github.com/ChainSafe/log15" "github.com/ethereum/go-ethereum/metrics" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + + gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" + "github.com/ChainSafe/gossamer/dot/peerset" + "github.com/ChainSafe/gossamer/dot/telemetry" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/services" ) const ( @@ -96,7 +99,7 @@ type Service struct { // telemetry telemetryInterval time.Duration - closeCh chan interface{} + closeCh chan struct{} blockResponseBuf []byte blockResponseBufMu sync.Mutex @@ -172,7 +175,7 @@ func NewService(cfg *Config) (*Service, error) { notificationsProtocols: make(map[byte]*notificationsProtocol), lightRequest: make(map[peer.ID]struct{}), telemetryInterval: cfg.telemetryInterval, - closeCh: make(chan interface{}), + closeCh: make(chan struct{}), bufPool: bufPool, streamManager: newStreamManager(ctx), blockResponseBuf: make([]byte, maxBlockResponseSize), @@ -239,7 +242,7 @@ func (s *Service) Start() error { txnBatchHandler, ) if err != nil { - logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err) + logger.Warn("failed to register notifications protocol", "sub-protocol", transactionsID, "error", err) } // since this opens block announce streams, it should happen after the protocol is registered @@ -250,9 +253,7 @@ func (s *Service) Start() error { logger.Info("Started listening", "address", addr) } - if !s.noBootstrap { - s.host.bootstrap() - } + s.startPeerSetHandler() if !s.noMDNS { s.mdns.start() @@ -268,7 +269,6 @@ func (s *Service) Start() error { } time.Sleep(time.Millisecond * 500) - logger.Info("started network service", "supported protocols", s.host.protocols()) if s.cfg.PublishMetrics { @@ -319,7 +319,7 @@ func (s *Service) logPeerCount() { } } -func (s *Service) publishNetworkTelemetry(done chan interface{}) { +func (s *Service) publishNetworkTelemetry(done <-chan struct{}) { ticker := time.NewTicker(s.telemetryInterval) defer ticker.Stop() @@ -373,8 +373,9 @@ func (s *Service) sentBlockIntervalTelemetry() { } } -func (*Service) handleConn(conn libp2pnetwork.Conn) { - // TODO: update this for scoring (#1399) +func (s *Service) handleConn(conn libp2pnetwork.Conn) { + // TODO: currently we only have one set so setID is 0, change this once we have more set in peerSet. + s.host.cm.peerSetHandler.Incoming(0, conn.RemotePeer()) } // Stop closes running instances of the host and network services as well as @@ -395,6 +396,8 @@ func (s *Service) Stop() error { logger.Error("Failed to close host", "error", err) } + s.host.cm.peerSetHandler.Stop() + // check if closeCh is closed, if not, close it. mainloop: for { @@ -741,3 +744,65 @@ func (*Service) StartingBlock() int64 { func (s *Service) IsSynced() bool { return s.syncer.IsSynced() } + +// ReportPeer reports ReputationChange according to the peer behaviour. +func (s *Service) ReportPeer(change peerset.ReputationChange, p peer.ID) { + s.host.cm.peerSetHandler.ReportPeer(change, p) +} + +func (s *Service) startPeerSetHandler() { + s.host.cm.peerSetHandler.Start() + // wait for peerSetHandler to start. + if !s.noBootstrap { + s.host.bootstrap() + } + + go s.startProcessingMsg() +} + +func (s *Service) processMessage(msg peerset.Message) { + peerID := msg.PeerID + switch msg.Status { + case peerset.Connect: + addrInfo := s.host.h.Peerstore().PeerInfo(peerID) + if len(addrInfo.Addrs) == 0 { + var err error + addrInfo, err = s.host.discovery.findPeer(peerID) + if err != nil { + logger.Debug("failed to find peer", "peer", peerID, "error", err) + return + } + } + + err := s.host.connect(addrInfo) + if err != nil { + logger.Debug("failed to open connection", "peer", peerID, "error", err) + return + } + logger.Debug("connection successful ", "peer", peerID) + case peerset.Drop, peerset.Reject: + err := s.host.closePeer(peerID) + if err != nil { + logger.Debug("failed to close connection", "peer", peerID, "error", err) + return + } + logger.Debug("connection dropped successfully ", "peer", peerID) + } +} + +func (s *Service) startProcessingMsg() { + msgCh := s.host.cm.peerSetHandler.Messages() + for { + select { + case <-s.ctx.Done(): + return + case m := <-msgCh: + msg, ok := m.(peerset.Message) + if !ok { + logger.Error(fmt.Sprintf("failed to get message from peerSet: type is %T instead of peerset.Message", m)) + continue + } + s.processMessage(msg) + } + } +} diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 6074605009..56a5a85ee4 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -24,9 +24,10 @@ import ( "testing" "time" - "github.com/ChainSafe/gossamer/lib/utils" mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/ChainSafe/gossamer/lib/utils" ) var TestProtocolID = "/gossamer/test/0" @@ -48,7 +49,7 @@ func createServiceHelper(t *testing.T, num int) []*Service { for i := 0; i < num; i++ { config := &Config{ BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)), - Port: uint32(7001 + i), + Port: uint16(7001 + i), NoBootstrap: true, NoMDNS: true, } diff --git a/dot/network/state.go b/dot/network/state.go index 0a343d2eb1..8940f8213f 100644 --- a/dot/network/state.go +++ b/dot/network/state.go @@ -19,10 +19,11 @@ package network import ( "math/big" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" - - "github.com/libp2p/go-libp2p-core/peer" ) //go:generate mockery --name BlockState --structname MockBlockState --case underscore --inpackage @@ -58,6 +59,38 @@ type Syncer interface { // TransactionHandler is the interface used by the transactions sub-protocol type TransactionHandler interface { - HandleTransactionMessage(*TransactionMessage) (bool, error) + HandleTransactionMessage(peer.ID, *TransactionMessage) (bool, error) TransactionsCount() int } + +// PeerSetHandler is the interface used by the connection manager to handle peerset. +type PeerSetHandler interface { + Start() + Stop() + ReportPeer(peerset.ReputationChange, ...peer.ID) + PeerAdd + PeerRemove + Peer +} + +// PeerAdd is the interface used by the PeerSetHandler to add peers in peerSet. +type PeerAdd interface { + Incoming(int, ...peer.ID) + AddReservedPeer(int, ...peer.ID) + AddPeer(int, ...peer.ID) + SetReservedPeer(int, ...peer.ID) +} + +// PeerRemove is the interface used by the PeerSetHandler to remove peers from peerSet. +type PeerRemove interface { + DisconnectPeer(int, ...peer.ID) + RemoveReservedPeer(int, ...peer.ID) + RemovePeer(int, ...peer.ID) +} + +// Peer is the interface used by the PeerSetHandler to get the peer data from peerSet. +type Peer interface { + PeerReputation(peer.ID) (peerset.Reputation, error) + SortedPeers(idx int) chan peer.IDSlice + Messages() chan interface{} +} diff --git a/dot/network/sync.go b/dot/network/sync.go index 9a0e677c61..c96949f20c 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/ChainSafe/gossamer/dot/peerset" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ) @@ -80,6 +81,10 @@ func (s *Service) receiveBlockResponse(stream libp2pnetwork.Stream) (*BlockRespo msg := new(BlockResponseMessage) err = msg.Decode(buf[:n]) if err != nil { + s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadMessageValue, + Reason: peerset.BadMessageReason, + }, stream.Conn().RemotePeer()) return nil, fmt.Errorf("failed to decode block response: %w", err) } diff --git a/dot/network/transaction.go b/dot/network/transaction.go index 0e4cead9fc..f82aa10efb 100644 --- a/dot/network/transaction.go +++ b/dot/network/transaction.go @@ -162,11 +162,11 @@ func decodeTransactionMessage(in []byte) (NotificationsMessage, error) { return msg, err } -func (s *Service) handleTransactionMessage(_ peer.ID, msg NotificationsMessage) (bool, error) { +func (s *Service) handleTransactionMessage(peerID peer.ID, msg NotificationsMessage) (bool, error) { txMsg, ok := msg.(*TransactionMessage) if !ok { return false, errors.New("invalid transaction type") } - return s.transactionHandler.HandleTransactionMessage(txMsg) + return s.transactionHandler.HandleTransactionMessage(peerID, txMsg) } diff --git a/dot/peerset/constants.go b/dot/peerset/constants.go new file mode 100644 index 0000000000..3e900da533 --- /dev/null +++ b/dot/peerset/constants.go @@ -0,0 +1,62 @@ +package peerset + +import "math" + +// ReputationChange value and reason +const ( + + // BadMessageValue used when fail to decode message. + BadMessageValue Reputation = -(1 << 12) + // BadMessageReason used when fail to decode message. + BadMessageReason = "Bad message" + + // BadProtocolValue used when a peer is on unsupported protocol version. + BadProtocolValue Reputation = math.MinInt32 + // BadProtocolReason used when a peer is on unsupported protocol version. + BadProtocolReason = "Unsupported protocol" + + // TimeOutValue used when a peer doesn't respond in time to our messages. + TimeOutValue Reputation = -(1 << 10) + // TimeOutReason used when a peer doesn't respond in time to our messages. + TimeOutReason = "Request timeout" + + // GossipSuccessValue used when a peer successfully sends a gossip messages. + GossipSuccessValue Reputation = 1 << 4 + // GossipSuccessReason used when a peer successfully sends a gossip messages. + GossipSuccessReason = "Successful gossip" + + // DuplicateGossipValue used when a peer sends us a gossip message that we already knew about. + DuplicateGossipValue Reputation = -(1 << 2) + // DuplicateGossipReason used when a peer send duplicate gossip message. + DuplicateGossipReason = "Duplicate gossip" + + // GoodTransactionValue is the used for good transaction. + GoodTransactionValue Reputation = 1 << 7 + // GoodTransactionReason is the reason for used for good transaction. + GoodTransactionReason = "Good Transaction" + + // BadTransactionValue used when transaction import was not performed. + BadTransactionValue Reputation = -(1 << 12) + // BadTransactionReason when transaction import was not performed. + BadTransactionReason = "Bad Transaction" + + // BadBlockAnnouncementValue is used when peer announces invalid block. + BadBlockAnnouncementValue Reputation = -(1 << 12) + // BadBlockAnnouncementReason is used when peer announces invalid block. + BadBlockAnnouncementReason = "Bad block announcement" + + // IncompleteHeaderValue is used when peer sends block with invalid header. + IncompleteHeaderValue Reputation = -(1 << 20) + // IncompleteHeaderReason is used when peer sends block with invalid header. + IncompleteHeaderReason = "Incomplete header" + + // BannedThresholdValue used when we need to ban peer. + BannedThresholdValue Reputation = 82 * (math.MinInt32 / 100) + // BannedReason used when we need to ban peer. + BannedReason = "Banned" + + // BadJustificationValue is used when peer send invalid justification. + BadJustificationValue Reputation = -(1 << 16) + // BadJustificationReason is used when peer send invalid justification. + BadJustificationReason = "Bad justification" +) diff --git a/dot/peerset/errors.go b/dot/peerset/errors.go new file mode 100644 index 0000000000..7a8d8a329d --- /dev/null +++ b/dot/peerset/errors.go @@ -0,0 +1,17 @@ +package peerset + +import "errors" + +var ( + ErrDisconnectReceivedForNonConnectedPeer = errors.New("received disconnect for non-connected node") + + ErrConfigSetIsEmpty = errors.New("config set is empty") + + ErrPeerDoesNotExist = errors.New("peer doesn't exist") + + ErrPeerDisconnected = errors.New("node is already disconnected") + + ErrOutgoingSlotsUnavailable = errors.New("not enough outgoing slots") + + ErrIncomingSlotsUnavailable = errors.New("not enough incoming slots") +) diff --git a/dot/peerset/handler.go b/dot/peerset/handler.go new file mode 100644 index 0000000000..6567fe89c8 --- /dev/null +++ b/dot/peerset/handler.go @@ -0,0 +1,139 @@ +package peerset + +import "github.com/libp2p/go-libp2p-core/peer" + +// Handler manages peerSet. +type Handler struct { + actionQueue chan<- action + peerSet *PeerSet + closeCh chan struct{} +} + +// NewPeerSetHandler creates a new *peerset.Handler. +func NewPeerSetHandler(cfg *ConfigSet) (*Handler, error) { + ps, err := newPeerSet(cfg) + if err != nil { + return nil, err + } + + return &Handler{ + peerSet: ps, + }, nil +} + +// AddReservedPeer adds reserved peer into peerSet. +func (h *Handler) AddReservedPeer(setID int, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: addReservedPeer, + setID: setID, + peers: peers, + } +} + +// RemoveReservedPeer remove reserved peer from peerSet. +func (h *Handler) RemoveReservedPeer(setID int, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: removeReservedPeer, + setID: setID, + peers: peers, + } +} + +// SetReservedPeer set the reserve peer into peerSet +func (h *Handler) SetReservedPeer(setID int, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: setReservedPeers, + setID: setID, + peers: peers, + } +} + +// AddPeer adds peer to peerSet. +func (h *Handler) AddPeer(setID int, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: addToPeerSet, + setID: setID, + peers: peers, + } +} + +// RemovePeer removes peer from peerSet. +func (h *Handler) RemovePeer(setID int, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: removeFromPeerSet, + setID: setID, + peers: peers, + } +} + +// ReportPeer reports ReputationChange according to the peer behaviour. +func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: reportPeer, + reputation: rep, + peers: peers, + } +} + +// Incoming calls when we have an incoming connection from peer. +func (h *Handler) Incoming(setID int, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: incoming, + peers: peers, + setID: setID, + } +} + +// Messages return result message chan. +func (h *Handler) Messages() chan interface{} { + return h.peerSet.resultMsgCh +} + +// DisconnectPeer calls for disconnecting a connection from peer. +func (h *Handler) DisconnectPeer(setID int, peers ...peer.ID) { + h.actionQueue <- action{ + actionCall: disconnect, + setID: setID, + peers: peers, + } +} + +// PeerReputation returns the reputation of the peer. +func (h *Handler) PeerReputation(peerID peer.ID) (Reputation, error) { + n, err := h.peerSet.peerState.getNode(peerID) + if err != nil { + return 0, err + } + return n.getReputation(), nil +} + +// Start starts peerSet processing +func (h *Handler) Start() { + actionCh := make(chan action, msgChanSize) + h.closeCh = make(chan struct{}) + h.actionQueue = actionCh + h.peerSet.start(actionCh) +} + +// SortedPeers return chan for sorted connected peer in the peerSet. +func (h *Handler) SortedPeers(setIdx int) chan peer.IDSlice { + resultPeersCh := make(chan peer.IDSlice) + h.actionQueue <- action{ + actionCall: sortedPeers, + resultPeersCh: resultPeersCh, + setID: setIdx, + } + + return resultPeersCh +} + +// Stop closes the actionQueue and result message chan. +func (h *Handler) Stop() { + select { + case <-h.closeCh: + default: + close(h.closeCh) + close(h.actionQueue) + h.peerSet.stop() + } +} diff --git a/dot/peerset/peerset.go b/dot/peerset/peerset.go new file mode 100644 index 0000000000..71ac202ed3 --- /dev/null +++ b/dot/peerset/peerset.go @@ -0,0 +1,656 @@ +package peerset + +import ( + "fmt" + "math" + "time" + + log "github.com/ChainSafe/log15" + "github.com/libp2p/go-libp2p-core/peer" +) + +var ( + logger = log.New("pkg", "peerset") +) + +const ( + // disconnectReputationChange Reputation change value for a node when we get disconnected from it. + disconnectReputationChange Reputation = -256 + // forgetAfterTime amount of time between the moment we disconnect from a node and the moment we remove it from the list. + forgetAfterTime = time.Second * 3600 // one hour + // default channel size for peerSet. + msgChanSize = 100 +) + +// ActionReceiver represents the enum value for action to be performed on peerSet +type ActionReceiver uint8 + +const ( + // addReservedPeer is for adding reserved peers + addReservedPeer ActionReceiver = iota + // removeReservedPeer is for removing reserved peers + removeReservedPeer + // setReservedPeers is for setting peerList in peerSet reserved peers + setReservedPeers + // setReservedOnly is for setting peerList in peerSet reserved peers only + setReservedOnly + // reportPeer is for reporting peers if it misbehaves + reportPeer + // addToPeerSet is for adding peer in the peerSet + addToPeerSet + // removeFromPeerSet is for removing peer in the peerSet + removeFromPeerSet + // incoming is for inbound request + incoming + // sortedPeers is for sorted connected peers + sortedPeers + // disconnect peer + disconnect +) + +// action struct stores the action type and required parameters to perform action +type action struct { + actionCall ActionReceiver + setID int + reputation ReputationChange + peers peer.IDSlice + resultPeersCh chan peer.IDSlice +} + +// Status represents the enum value for Message +type Status uint8 + +const ( + // Connect is request to open a connection to the given peer. + Connect Status = iota + // Drop the connection to the given peer, or cancel the connection attempt after a Connect. + Drop + // Accept incoming connect request. + Accept + // Reject incoming connect request. + Reject +) + +// Message that will be sent by the peerSet. +type Message struct { + // Status of the peer in current set. + Status Status + setID uint64 + // PeerID peer in message. + PeerID peer.ID +} + +// Reputation represents reputation value of the node +type Reputation int32 + +// add handles overflow and underflow condition while adding two Reputation values. +func (r Reputation) add(num Reputation) Reputation { + if num > 0 { + if r > math.MaxInt32-num { + return math.MaxInt32 + } + } else if r < math.MinInt32-num { + return math.MinInt32 + } + return r + num +} + +// sub handles underflow condition while subtracting two Reputation values. +func (r Reputation) sub(num Reputation) Reputation { + if num < 0 { + if r > math.MaxInt32+num { + return math.MaxInt32 + } + } else if r < math.MinInt32+num { + return math.MinInt32 + } + return r - num +} + +// ReputationChange is description of a reputation adjustment for a node +type ReputationChange struct { + // PeerReputation value + Value Reputation + // Reason for reputation change + Reason string +} + +func newReputationChange(value Reputation, reason string) ReputationChange { + return ReputationChange{value, reason} +} + +// PeerSet is a container for all the components of a peerSet. +type PeerSet struct { + peerState *PeersState + + reservedNode map[peer.ID]struct{} + // TODO: this will be useful for reserved only mode + // this is for future purpose if reserved-only flag is enabled (#1888). + isReservedOnly bool + resultMsgCh chan interface{} + // time when the PeerSet was created. + created time.Time + // last time when we updated the reputations of connected nodes. + latestTimeUpdate time.Time + // next time to do a periodic call to allocSlots with all Set. This is done once two + // second, to match the period of the Reputation updates. + nextPeriodicAllocSlots time.Duration + // chan for receiving action request. + actionQueue <-chan action +} + +// config is configuration of a single set. +type config struct { + // maximum number of slot occupying nodes for incoming connections. + inPeers uint32 + // maximum number of slot occupying nodes for outgoing connections. + outPeers uint32 + + // TODO Use in future for reserved only peers + // if true, we only accept reservedNodes (#1888). + reservedOnly bool + + // time duration for a peerSet to periodically call allocSlots. + periodicAllocTime time.Duration +} + +// ConfigSet set of peerSet config. +type ConfigSet struct { + Set []*config +} + +// NewConfigSet creates a new config set for the peerSet +func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *ConfigSet { + set := &config{ + inPeers: in, + outPeers: out, + reservedOnly: reservedOnly, + periodicAllocTime: allocTime, + } + + return &ConfigSet{ + Set: []*config{set}, + } +} + +func newPeerSet(cfg *ConfigSet) (*PeerSet, error) { + if len(cfg.Set) == 0 { + return nil, ErrConfigSetIsEmpty + } + + peerState, err := NewPeerState(cfg.Set) + if err != nil { + return nil, err + } + + // TODO: currently we only have one set, change this once we have more (#1886). + cfgSet := cfg.Set[0] + now := time.Now() + ps := &PeerSet{ + peerState: peerState, + reservedNode: make(map[peer.ID]struct{}), + isReservedOnly: cfgSet.reservedOnly, + created: now, + latestTimeUpdate: now, + nextPeriodicAllocSlots: cfgSet.periodicAllocTime, + } + + return ps, nil +} + +// If we multiply each second the reputation by `k` (where `k` is between 0 and 1), it +// takes `ln(0.5) / ln(k)` seconds to reduce the reputation by half. Use this formula to +// empirically determine a value of `k` that looks correct. +// we use `k = 0.98`, so we divide by `50`. With that value, it takes 34.3 seconds +// to reduce the reputation by half. +func reputationTick(reput Reputation) Reputation { + diff := reput / 50 + if diff == 0 && reput < 0 { + diff = -1 + } else if diff == 0 && reput > 0 { + diff = 1 + } + + return reput.sub(diff) +} + +// updateTime updates the value of latestTimeUpdate and performs all the updates that happen +// over time, such as Reputation increases for staying connected. +func (ps *PeerSet) updateTime() error { + currTime := time.Now() + // identify the time difference between current time and last update time for peer reputation in seconds. + // update the latestTimeUpdate to currTime. + elapsedLatest := ps.latestTimeUpdate.Sub(ps.created) + elapsedNow := currTime.Sub(ps.created) + ps.latestTimeUpdate = currTime + secDiff := int64(elapsedNow.Seconds() - elapsedLatest.Seconds()) + + // This will give for how many seconds decaying is required for each peer. + // For each elapsed second, move the node reputation towards zero. + for i := int64(0); i < secDiff; i++ { + for _, peerID := range ps.peerState.peers() { + n, err := ps.peerState.getNode(peerID) + if err != nil { + return err + } + + before := n.getReputation() + after := reputationTick(before) + n.setReputation(after) + ps.peerState.nodes[peerID] = n + + if after != 0 { + continue + } + + // if the peer reaches reputation 0, and there is no connection to it, forget it. + length := ps.peerState.getSetLength() + for set := 0; set < length; set++ { + if ps.peerState.peerStatus(set, peerID) != notConnectedPeer { + continue + } + + lastDiscoveredTime := ps.peerState.lastConnectedAndDiscovered(set, peerID) + if lastDiscoveredTime.Add(forgetAfterTime).Second() >= currTime.Second() { + continue + } + + // forget peer removes the peer from the list of members of the set. + err = ps.peerState.forgetPeer(set, peerID) + if err != nil { + return err + } + } + } + } + + return nil +} + +// reportPeer on report ReputationChange of the peer based on its behaviour, +// if the updated Reputation is below BannedThresholdValue then, this node need to be disconnected +// and a drop message for the peer is sent in order to disconnect. +func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error { + // we want reputations to be up-to-date before adjusting them. + if err := ps.updateTime(); err != nil { + return err + } + + for _, pid := range peers { + n, err := ps.peerState.getNode(pid) + if err != nil { + return err + } + + rep := n.addReputation(change.Value) + ps.peerState.nodes[pid] = n + if rep >= BannedThresholdValue { + return nil + } + + setLen := ps.peerState.getSetLength() + for i := 0; i < setLen; i++ { + if ps.peerState.peerStatus(i, pid) == connectedPeer { + // disconnect peer + err = ps.peerState.disconnect(i, pid) + if err != nil { + return err + } + + ps.resultMsgCh <- Message{ + Status: Drop, + setID: uint64(i), + PeerID: pid, + } + if err = ps.allocSlots(i); err != nil { + return err + } + } + } + } + return nil +} + +// allocSlots tries to fill available outgoing slots of nodes for the given set. +func (ps *PeerSet) allocSlots(setIdx int) error { + err := ps.updateTime() + if err != nil { + return err + } + + peerState := ps.peerState + for reservePeer := range ps.reservedNode { + status := peerState.peerStatus(setIdx, reservePeer) + switch status { + case connectedPeer: + continue + case unknownPeer: + peerState.discover(setIdx, reservePeer) + } + + var n *node + n, err = ps.peerState.getNode(reservePeer) + if err != nil { + return err + } + + if n.getReputation() < BannedThresholdValue { + break + } + + if err = peerState.tryOutgoing(setIdx, reservePeer); err != nil { + return err + } + + ps.resultMsgCh <- Message{ + Status: Connect, + setID: uint64(setIdx), + PeerID: reservePeer, + } + } + // nothing more to do if we're in reserved mode. + if ps.isReservedOnly { + return nil + } + + for peerState.hasFreeOutgoingSlot(setIdx) { + peerID := peerState.highestNotConnectedPeer(setIdx) + if peerID == "" { + break + } + + n := peerState.nodes[peerID] + if n.getReputation() < BannedThresholdValue { + logger.Crit("highest rated peer is below bannedThresholdValue") + break + } + + if err = peerState.tryOutgoing(setIdx, peerID); err != nil { + break + } + + ps.resultMsgCh <- Message{ + Status: Connect, + setID: uint64(setIdx), + PeerID: peerID, + } + + logger.Debug("Sent connect message", "peer", peerID) + } + return nil +} + +func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error { + for _, peerID := range peers { + if _, ok := ps.reservedNode[peerID]; ok { + logger.Debug("peer already exists in peerSet", "peer", peerID) + return nil + } + + ps.reservedNode[peerID] = struct{}{} + ps.peerState.addNoSlotNode(setID, peerID) + if err := ps.allocSlots(setID); err != nil { + return err + } + } + return nil +} + +func (ps *PeerSet) removeReservedPeers(setID int, peers ...peer.ID) error { + for _, peerID := range peers { + if _, ok := ps.reservedNode[peerID]; !ok { + logger.Debug("peer doesn't exists in the peerSet", "peerID", peerID) + return nil + } + + delete(ps.reservedNode, peerID) + ps.peerState.removeNoSlotNode(setID, peerID) + + // nothing more to do if not in reservedOnly mode. + if !ps.isReservedOnly { + return nil + } + + // reservedOnly mode is not yet implemented for future this code will help. + // If however the peerSet is in reserved-only mode, then non-reserved node peers needs to be + // disconnected. + if ps.peerState.peerStatus(setID, peerID) == connectedPeer { + err := ps.peerState.disconnect(setID, peerID) + if err != nil { + return err + } + + ps.resultMsgCh <- Message{ + Status: Drop, + setID: uint64(setID), + PeerID: peerID, + } + } + } + + return nil +} + +func (ps *PeerSet) setReservedPeer(setID int, peers ...peer.ID) error { + toInsert, toRemove := make([]peer.ID, 0, len(peers)), make([]peer.ID, 0, len(peers)) + peerIDMap := make(map[peer.ID]struct{}, len(peers)) + for _, pid := range peers { + peerIDMap[pid] = struct{}{} + if _, ok := ps.reservedNode[pid]; ok { + continue + } + toInsert = append(toInsert, pid) + } + + for pid := range ps.reservedNode { + if _, ok := peerIDMap[pid]; ok { + continue + } + toRemove = append(toRemove, pid) + } + + if err := ps.addReservedPeers(setID, toInsert...); err != nil { + return err + } + + return ps.removeReservedPeers(setID, toRemove...) +} + +func (ps *PeerSet) addPeer(setID int, peers peer.IDSlice) error { + for _, pid := range peers { + if ps.peerState.peerStatus(setID, pid) != unknownPeer { + return nil + } + + ps.peerState.discover(setID, pid) + if err := ps.allocSlots(setID); err != nil { + return err + } + } + return nil +} + +func (ps *PeerSet) removePeer(setID int, peers ...peer.ID) error { + for _, pid := range peers { + if _, ok := ps.reservedNode[pid]; ok { + logger.Debug("peer is reserved and cannot be removed", "peer", pid) + return nil + } + + if status := ps.peerState.peerStatus(setID, pid); status == connectedPeer { + ps.resultMsgCh <- Message{ + Status: Drop, + setID: uint64(setID), + PeerID: pid, + } + + // disconnect and forget + err := ps.peerState.disconnect(setID, pid) + if err != nil { + return err + } + + if err = ps.peerState.forgetPeer(setID, pid); err != nil { + return err + } + } else if status == notConnectedPeer { + if err := ps.peerState.forgetPeer(setID, pid); err != nil { + return err + } + } + } + return nil +} + +// incoming indicates that we have received an incoming connection. Must be answered either with +// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. +func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error { + if err := ps.updateTime(); err != nil { + return err + } + + // This is for reserved only mode. + for _, pid := range peers { + if ps.isReservedOnly { + if _, ok := ps.reservedNode[pid]; !ok { + ps.resultMsgCh <- Message{Status: Reject} + continue + } + } + + status := ps.peerState.peerStatus(setID, pid) + switch status { + case connectedPeer: + continue + case notConnectedPeer: + ps.peerState.nodes[pid].lastConnected[setID] = time.Now() + case unknownPeer: + ps.peerState.discover(setID, pid) + } + + state := ps.peerState + p := state.nodes[pid] + switch { + case p.getReputation() < BannedThresholdValue: + ps.resultMsgCh <- Message{Status: Reject} + case state.tryAcceptIncoming(setID, pid) != nil: + ps.resultMsgCh <- Message{Status: Reject} + default: + ps.resultMsgCh <- Message{Status: Accept} + } + } + + return nil +} + +// DropReason represents reason for disconnection of the peer +type DropReason int + +const ( + // UnknownDrop is used when substream or connection has been closed for an unknown reason + UnknownDrop DropReason = iota + // RefusedDrop is used when sub-stream or connection has been explicitly refused by the target. + // In other words, the peer doesn't actually belong to this set. + RefusedDrop +) + +// disconnect indicate that we disconnect an active connection with a peer, or that we failed to connect. +// Must only be called after the peerSet has either generated a Connect message with this +// peer, or accepted an incoming connection with this peer. +func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) error { + err := ps.updateTime() + if err != nil { + return err + } + + state := ps.peerState + for _, pid := range peers { + connectionStatus := state.peerStatus(setIdx, pid) + if connectionStatus != connectedPeer { + return ErrDisconnectReceivedForNonConnectedPeer + } + + n := state.nodes[pid] + n.addReputation(disconnectReputationChange) + state.nodes[pid] = n + + if err = state.disconnect(setIdx, pid); err != nil { + return err + } + ps.resultMsgCh <- Message{ + Status: Drop, + PeerID: pid, + } + + // TODO: figure out the condition of connection refuse. + if reason == RefusedDrop { + if err = ps.removePeer(setIdx, pid); err != nil { + return err + } + } + } + + return ps.allocSlots(setIdx) +} + +// start handles all the action for the peerSet. +func (ps *PeerSet) start(aq chan action) { + ps.actionQueue = aq + ps.resultMsgCh = make(chan interface{}, msgChanSize) + go ps.doWork() +} + +func (ps *PeerSet) doWork() { + ticker := time.NewTicker(ps.nextPeriodicAllocSlots) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + l := ps.peerState.getSetLength() + for i := 0; i < l; i++ { + if err := ps.allocSlots(i); err != nil { + logger.Debug("failed to do action on peerSet ", "error", err) + } + } + case act, ok := <-ps.actionQueue: + if !ok { + return + } + + var err error + switch act.actionCall { + case addReservedPeer: + err = ps.addReservedPeers(act.setID, act.peers...) + case removeReservedPeer: + err = ps.removeReservedPeers(act.setID, act.peers...) + case setReservedPeers: + // TODO: this is not used yet, might required to implement RPC Call for this. + err = ps.setReservedPeer(act.setID, act.peers...) + case setReservedOnly: + // TODO: not yet implemented (#1888) + err = fmt.Errorf("not implemented yet") + case reportPeer: + err = ps.reportPeer(act.reputation, act.peers...) + case addToPeerSet: + err = ps.addPeer(act.setID, act.peers) + case removeFromPeerSet: + err = ps.removePeer(act.setID, act.peers...) + case incoming: + err = ps.incoming(act.setID, act.peers...) + case sortedPeers: + act.resultPeersCh <- ps.peerState.sortedPeers(act.setID) + case disconnect: + err = ps.disconnect(act.setID, UnknownDrop, act.peers...) + } + + if err != nil { + logger.Error("failed to do action on peerSet", "action", act, "error", err) + } + } + } +} + +func (ps *PeerSet) stop() { + close(ps.resultMsgCh) +} diff --git a/dot/peerset/peerset_test.go b/dot/peerset/peerset_test.go new file mode 100644 index 0000000000..fcd6fb1d71 --- /dev/null +++ b/dot/peerset/peerset_test.go @@ -0,0 +1,184 @@ +package peerset + +import ( + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" +) + +func TestPeerSetBanned(t *testing.T) { + t.Parallel() + + handler := newTestPeerSet(t, 25, 25, nil, nil, false) + + ps := handler.peerSet + require.Equal(t, unknownPeer, ps.peerState.peerStatus(0, peer1)) + ps.peerState.discover(0, peer1) + // adding peer1 with incoming slot. + err := ps.peerState.tryAcceptIncoming(0, peer1) + require.NoError(t, err) + + // we ban a node by setting its reputation under the threshold. + rpc := newReputationChange(BannedThresholdValue-1, "") + // we need one for the message to be processed. + handler.ReportPeer(rpc, peer1) + time.Sleep(time.Millisecond * 100) + + checkMessageStatus(t, <-ps.resultMsgCh, Drop) + + // check that an incoming connection from that node gets refused. + handler.Incoming(0, peer1) + checkMessageStatus(t, <-ps.resultMsgCh, Reject) + + // wait a bit for the node's reputation to go above the threshold. + time.Sleep(time.Millisecond * 1200) + + // try again. This time the node should be accepted. + handler.Incoming(0, peer1) + require.NoError(t, err) + checkMessageStatus(t, <-ps.resultMsgCh, Accept) +} + +func TestAddReservedPeers(t *testing.T) { + t.Parallel() + + handler := newTestPeerSet(t, 0, 2, []peer.ID{bootNode}, []peer.ID{}, false) + ps := handler.peerSet + + handler.AddReservedPeer(0, reservedPeer) + handler.AddReservedPeer(0, reservedPeer2) + + time.Sleep(time.Millisecond * 200) + + expectedMsgs := []Message{ + {Status: Connect, setID: 0, PeerID: bootNode}, + {Status: Connect, setID: 0, PeerID: reservedPeer}, + {Status: Connect, setID: 0, PeerID: reservedPeer2}, + } + + require.Equal(t, uint32(1), ps.peerState.sets[0].numOut) + require.Equal(t, 3, len(ps.resultMsgCh)) + + for i := 0; ; i++ { + if len(ps.resultMsgCh) == 0 { + break + } + m := <-ps.resultMsgCh + msg, ok := m.(Message) + require.True(t, ok) + require.Equal(t, expectedMsgs[i], msg) + } +} + +func TestPeerSetIncoming(t *testing.T) { + t.Parallel() + + handler := newTestPeerSet(t, 2, 1, []peer.ID{bootNode}, []peer.ID{}, false) + ps := handler.peerSet + + // connect message will be added ingoing queue for bootnode. + checkMessageStatus(t, <-ps.resultMsgCh, Connect) + + handler.Incoming(0, incomingPeer) + checkMessageStatus(t, <-ps.resultMsgCh, Accept) + + handler.Incoming(0, incoming2) + checkMessageStatus(t, <-ps.resultMsgCh, Accept) + + handler.Incoming(0, incoming3) + checkMessageStatus(t, <-ps.resultMsgCh, Reject) +} + +func TestPeerSetDiscovered(t *testing.T) { + t.Parallel() + + handler := newTestPeerSet(t, 0, 2, []peer.ID{}, []peer.ID{reservedPeer}, false) + + ps := handler.peerSet + + handler.AddPeer(0, discovered1) + handler.AddPeer(0, discovered1) + handler.AddPeer(0, discovered2) + + time.Sleep(200 * time.Millisecond) + + require.Equal(t, 3, len(ps.resultMsgCh)) + for len(ps.resultMsgCh) == 0 { + checkMessageStatus(t, <-ps.resultMsgCh, Connect) + } +} + +func TestReAllocAfterBanned(t *testing.T) { + t.Parallel() + + handler := newTestPeerSet(t, 25, 25, []peer.ID{}, []peer.ID{}, false) + + ps := handler.peerSet + // adding peer1 with incoming slot. + if ps.peerState.peerStatus(0, peer1) == unknownPeer { + ps.peerState.discover(0, peer1) + err := ps.peerState.tryAcceptIncoming(0, peer1) + require.NoError(t, err) + } + + // We ban a node by setting its reputation under the threshold. + rep := newReputationChange(BannedThresholdValue-1, "") + // we need one for the message to be processed. + handler.ReportPeer(rep, peer1) + time.Sleep(time.Millisecond * 100) + + checkMessageStatus(t, <-ps.resultMsgCh, Drop) + + // Check that an incoming connection from that node gets refused. + + handler.Incoming(0, peer1) + checkMessageStatus(t, <-ps.resultMsgCh, Reject) + + time.Sleep(time.Millisecond * 100) + checkMessageStatus(t, <-ps.resultMsgCh, Connect) +} + +func TestRemovePeer(t *testing.T) { + t.Parallel() + + handler := newTestPeerSet(t, 0, 2, []peer.ID{discovered1, discovered2}, nil, false) + ps := handler.peerSet + + require.Equal(t, 2, len(ps.resultMsgCh)) + for len(ps.resultMsgCh) != 0 { + checkMessageStatus(t, <-ps.resultMsgCh, Connect) + } + + handler.RemovePeer(0, discovered1, discovered2) + time.Sleep(200 * time.Millisecond) + + require.Equal(t, 2, len(ps.resultMsgCh)) + for len(ps.resultMsgCh) != 0 { + checkMessageStatus(t, <-ps.resultMsgCh, Drop) + } + + require.Equal(t, 0, len(ps.peerState.nodes)) +} + +func TestSetReservePeer(t *testing.T) { + t.Parallel() + + handler := newTestPeerSet(t, 0, 2, nil, []peer.ID{reservedPeer, reservedPeer2}, true) + ps := handler.peerSet + + require.Equal(t, 2, len(ps.resultMsgCh)) + for len(ps.resultMsgCh) != 0 { + checkMessageStatus(t, <-ps.resultMsgCh, Connect) + } + + newRsrPeerSet := peer.IDSlice{reservedPeer, peer.ID("newRsrPeer")} + handler.SetReservedPeer(0, newRsrPeerSet...) + time.Sleep(200 * time.Millisecond) + + require.Equal(t, len(newRsrPeerSet), len(ps.reservedNode)) + for _, p := range newRsrPeerSet { + require.Contains(t, ps.reservedNode, p) + } +} diff --git a/dot/peerset/peerstate.go b/dot/peerset/peerstate.go new file mode 100644 index 0000000000..9844ddc355 --- /dev/null +++ b/dot/peerset/peerstate.go @@ -0,0 +1,413 @@ +package peerset + +import ( + "math" + "sort" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +const ( + // connectedPeer peerStatus is ingoing connected state. + connectedPeer = "connectedPeer" + // notConnectedPeer peerStatus is ingoing not connected state. + notConnectedPeer = "notConnectedPeer" + // unknownPeer peerStatus is unknown + unknownPeer = "unknownPeer" +) + +// MembershipState represent the state of node ingoing the set. +type MembershipState int + +const ( + // notMember node isn't part of that set. + notMember MembershipState = iota + // ingoing node is connected through an ingoing connection. + ingoing + // outgoing node is connected through an outgoing connection. + outgoing + // notConnected node is part of that set, but we are not connected to it. + notConnected +) + +// Info is state of a single set. +type Info struct { + // number of slot occupying nodes for which the MembershipState is ingoing. + numIn uint32 + + // number of slot occupying nodes for which the MembershipState is ingoing. + numOut uint32 + + // maximum allowed number of slot occupying nodes for which the MembershipState is ingoing. + maxIn uint32 + + // maximum allowed number of slot occupying nodes for which the MembershipState is outgoing. + maxOut uint32 + + // list of node identities (discovered or not) that don't occupy slots. + // Note for future readers: this module is purely dedicated to managing slots. + // If you are considering adding more features, please consider doing so outside this module rather + // than inside. + noSlotNodes map[peer.ID]struct{} +} + +// node represents state of a single node that we know about +type node struct { + // list of Set the node belongs to. + // always has a fixed size equal to the one of PeersState Set. The various possible Set + // are indices into this Set. + state []MembershipState + + // when we were last connected to the node, or if we were never connected when we + // discovered it. + lastConnected []time.Time + + // Reputation of the node, between int32 MIN and int32 MAX. + rep Reputation +} + +// newNode method to create a node with 0 Reputation at starting. +func newNode(n int) *node { + now := time.Now() + sets := make([]MembershipState, n) + lastConnected := make([]time.Time, n) + for i := 0; i < n; i++ { + sets[i] = notMember + lastConnected[i] = now + } + + return &node{ + state: sets, + lastConnected: lastConnected, + } +} + +func (n *node) getReputation() Reputation { + return n.rep +} + +func (n *node) addReputation(modifier Reputation) Reputation { + n.rep = n.rep.add(modifier) + return n.rep +} + +func (n *node) setReputation(modifier Reputation) { + n.rep = modifier +} + +// PeersState struct contains a list of nodes, where each node +// has a reputation and is either connected to us or not +type PeersState struct { + // list of nodes that we know about. + nodes map[peer.ID]*node + // configuration of each set. The size of this Info is never modified. + // since, single Info can also manage the flow. + sets []Info +} + +func (ps *PeersState) getNode(p peer.ID) (*node, error) { + if n, ok := ps.nodes[p]; ok { + return n, nil + } + + return nil, ErrPeerDoesNotExist +} + +// NewPeerState initiates a new PeersState +func NewPeerState(cfgs []*config) (*PeersState, error) { + if len(cfgs) == 0 { + return nil, ErrConfigSetIsEmpty + } + infoSet := make([]Info, 0, len(cfgs)) + for _, cfg := range cfgs { + info := Info{ + numIn: 0, + numOut: 0, + maxIn: cfg.inPeers, + maxOut: cfg.outPeers, + noSlotNodes: make(map[peer.ID]struct{}), + } + + infoSet = append(infoSet, info) + } + + peerState := &PeersState{ + nodes: make(map[peer.ID]*node), + sets: infoSet, + } + + return peerState, nil +} + +func (ps *PeersState) getSetLength() int { + return len(ps.sets) +} + +// peerStatus returns the status of peer based on its connection state, i.e. connectedPeer, notConnectedPeer or unknownPeer. +func (ps *PeersState) peerStatus(set int, peerID peer.ID) string { + n, err := ps.getNode(peerID) + if err != nil { + return unknownPeer + } + + switch n.state[set] { + case ingoing, outgoing: + return connectedPeer + case notConnected: + return notConnectedPeer + } + + return unknownPeer +} + +// peers return the list of all the peers we know of. +func (ps *PeersState) peers() []peer.ID { + peerIDs := make([]peer.ID, 0, len(ps.nodes)) + for k := range ps.nodes { + peerIDs = append(peerIDs, k) + } + return peerIDs +} + +// sortedPeers returns the list of peers we are connected to of a specific set. +func (ps *PeersState) sortedPeers(idx int) peer.IDSlice { + if len(ps.sets) < idx { + logger.Debug("peer state doesn't have info for the provided index") + return nil + } + + type kv struct { + peerID peer.ID + Node *node + } + + var ss []kv + for k, v := range ps.nodes { + state := v.state[idx] + if isPeerConnected(state) { + ss = append(ss, kv{k, v}) + } + } + + sort.Slice(ss, func(i, j int) bool { + return ss[i].Node.rep > ss[j].Node.rep + }) + + peerIDs := make(peer.IDSlice, len(ss)) + for i, kv := range ss { + peerIDs[i] = kv.peerID + } + + return peerIDs +} + +// highestNotConnectedPeer returns the peer with the highest Reputation and that we are not connected to. +func (ps *PeersState) highestNotConnectedPeer(set int) peer.ID { + var maxRep = math.MinInt32 + var peerID peer.ID + for id, n := range ps.nodes { + if n.state[set] != notConnected { + continue + } + + val := int(n.rep) + if val >= maxRep { + maxRep = val + peerID = id + } + } + + return peerID +} + +func (ps *PeersState) hasFreeOutgoingSlot(set int) bool { + return ps.sets[set].numOut < ps.sets[set].maxOut +} + +// Note: that it is possible for numIn to be strictly superior to the max, in case we were +// connected to reserved node then marked them as not reserved. +func (ps *PeersState) hasFreeIncomingSlot(set int) bool { + return ps.sets[set].numIn >= ps.sets[set].maxIn +} + +// addNoSlotNode adds a node to the list of nodes that don't occupy slots. +// has no effect if the node was already in the group. +func (ps *PeersState) addNoSlotNode(idx int, peerID peer.ID) { + if _, ok := ps.sets[idx].noSlotNodes[peerID]; ok { + logger.Debug("peer is already exists in no slot node", "peer", peerID) + return + } + + // Insert peerStatus + ps.sets[idx].noSlotNodes[peerID] = struct{}{} + n, err := ps.getNode(peerID) + if err != nil { + return + } + + switch n.state[idx] { + case ingoing: + ps.sets[idx].numIn-- + case outgoing: + ps.sets[idx].numOut-- + } + + ps.nodes[peerID] = n +} + +func (ps *PeersState) removeNoSlotNode(idx int, peerID peer.ID) { + if _, ok := ps.sets[idx].noSlotNodes[peerID]; !ok { + return + } + + delete(ps.sets[idx].noSlotNodes, peerID) + n, err := ps.getNode(peerID) + if err != nil { + return + } + + switch n.state[idx] { + case ingoing: + ps.sets[idx].numIn++ + case outgoing: + ps.sets[idx].numOut++ + } +} + +// disconnect method updates the node status to notConnected state it should be called only when the node is in connected state. +func (ps *PeersState) disconnect(idx int, peerID peer.ID) error { + info := ps.sets[idx] + n, err := ps.getNode(peerID) + if err != nil { + return err + } + + if _, ok := info.noSlotNodes[peerID]; !ok { + switch n.state[idx] { + case ingoing: + info.numIn-- + case outgoing: + info.numOut-- + default: + return ErrPeerDisconnected + } + } + + // set node state to notConnected. + n.state[idx] = notConnected + n.lastConnected[idx] = time.Now() + ps.sets[idx] = info + return nil +} + +// discover takes input for set id and create a node and insert in the list. +// the initial Reputation of the peer will be 0 and ingoing notMember state. +func (ps *PeersState) discover(set int, peerID peer.ID) { + numSet := len(ps.sets) + if _, err := ps.getNode(peerID); err != nil { + n := newNode(numSet) + n.state[set] = notConnected + ps.nodes[peerID] = n + } +} + +func (ps *PeersState) lastConnectedAndDiscovered(set int, peerID peer.ID) time.Time { + node, err := ps.getNode(peerID) + if err != nil && node.state[set] == notConnected { + return node.lastConnected[set] + } + return time.Now() +} + +// forgetPeer removes the peer with reputation 0 from the peerSet. +func (ps *PeersState) forgetPeer(set int, peerID peer.ID) error { + n, err := ps.getNode(peerID) + if err != nil { + return err + } + + if n.state[set] != notMember { + n.state[set] = notMember + } + + if n.getReputation() != 0 { + return nil + } + // remove the peer from peerSet nodes entirely if it isn't a member of any set. + remove := true + for _, state := range n.state { + if state != notMember { + remove = false + break + } + } + + if remove { + delete(ps.nodes, peerID) + } + + return nil +} + +// tryOutgoing tries to set the peer as connected as an outgoing connection. +// If there are enough slots available, switches the node to Connected and returns nil error. If +// the slots are full, the node stays "not connected" and we return error. +// non slot occupying nodes don't count towards the number of slots. +func (ps *PeersState) tryOutgoing(setID int, peerID peer.ID) error { + var isNoSlotOccupied bool + if _, ok := ps.sets[setID].noSlotNodes[peerID]; ok { + isNoSlotOccupied = true + } + + if !ps.hasFreeOutgoingSlot(setID) && !isNoSlotOccupied { + return ErrOutgoingSlotsUnavailable + } + + n, err := ps.getNode(peerID) + if err != nil { + return err + } + + n.state[setID] = outgoing + if !isNoSlotOccupied { + ps.sets[setID].numOut++ + } + + return nil +} + +// tryAcceptIncoming tries to accept the peer as an incoming connection. +// if there are enough slots available, switches the node to Connected and returns nil. +// If the slots are full, the node stays "not connected" and we return Err. +// non slot occupying nodes don't count towards the number of slots. +func (ps *PeersState) tryAcceptIncoming(setID int, peerID peer.ID) error { + var isNoSlotOccupied bool + if _, ok := ps.sets[setID].noSlotNodes[peerID]; ok { + isNoSlotOccupied = true + } + + // if slot is not available and the node is not a reserved node then error + if ps.hasFreeIncomingSlot(setID) && !isNoSlotOccupied { + return ErrIncomingSlotsUnavailable + } + + n, err := ps.getNode(peerID) + if err != nil { + // state inconsistency tryOutgoing on an unknown node + return err + } + + n.state[setID] = ingoing + if !isNoSlotOccupied { + // this need to be added as incoming connection allocate slot. + ps.sets[setID].numIn++ + } + + return nil +} + +// isPeerConnected returns true if peer is connected else false +func isPeerConnected(state MembershipState) bool { + return state == ingoing || state == outgoing +} diff --git a/dot/peerset/peerstate_test.go b/dot/peerset/peerstate_test.go new file mode 100644 index 0000000000..69bcf946f9 --- /dev/null +++ b/dot/peerset/peerstate_test.go @@ -0,0 +1,216 @@ +package peerset + +import ( + "testing" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" +) + +func TestFullSlotIn(t *testing.T) { + t.Parallel() + + state := newTestPeerState(t, 1, 1) + + // initially peer1 state will be unknownPeer. + require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) + // discover peer1 + state.discover(0, peer1) + // peer1 state will change from unknownPeer to notConnectedPeer, once we tried to discover it. + require.Equal(t, notConnectedPeer, state.peerStatus(0, peer1)) + // try to make peer1 as an incoming connection. + err := state.tryAcceptIncoming(0, peer1) + require.NoError(t, err) + + // peer1 is connected + require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) + + // initially peer2 state will be unknownPeer. + require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) + // discover peer2 + state.discover(0, peer2) + // try to make peer2 as an incoming connection. + err = state.tryAcceptIncoming(0, peer2) + // peer2 will not be accepted as incoming connection, as we only have one incoming connection slot ingoing peerState. + require.Error(t, err) +} + +func TestNoSlotNodeDoesntOccupySlot(t *testing.T) { + t.Parallel() + + state := newTestPeerState(t, 1, 1) + + // peer1 will not occupy any slot. + state.addNoSlotNode(0, peer1) + // initially peer1 state will be unknownPeer. + require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) + // discover peer1 + state.discover(0, peer1) + // peer1 will become an incoming connection. + err := state.tryAcceptIncoming(0, peer1) + require.NoError(t, err) + // peer1 is connected + require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) + + // peer1 is connected, but the slot is still not occupied. + require.Equal(t, uint32(0), state.sets[0].numIn) + + // initially peer2 state will be unknownPeer. + require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) + // discover peer2 + state.discover(0, peer2) + // peer2 state will change from unknownPeer to notConnectedPeer, once we tried to discover it. + require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) + + // try to accept peer2 as an incoming connection. + err = state.tryAcceptIncoming(0, peer2) + require.NoError(t, err) + + // peer2 is connected + require.Equal(t, connectedPeer, state.peerStatus(0, peer2)) + + // peer2 is connected, but the slot is still not occupied. + require.Equal(t, uint32(1), state.sets[0].numIn) +} + +func TestDisconnectingFreeSlot(t *testing.T) { + t.Parallel() + + state := newTestPeerState(t, 1, 1) + + // initially peer1 state will be unknownPeer. + require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) + // discover peer1 + state.discover(0, peer1) + err := state.tryAcceptIncoming(0, peer1) // try to make peer1 as an incoming connection. + require.NoError(t, err) + // peer1 is connected + require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) + + // initially peer2 state will be unknownPeer. + require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) + // discover peer2 + state.discover(0, peer2) + // peer2 state will change from unknownPeer to notConnectedPeer, once we tried to discover it. + require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) + // try to make peer2 as an incoming connection. + err = state.tryAcceptIncoming(0, peer2) + require.Error(t, err) // peer2 will not be accepted as incoming connection, as we only have one incoming connection slot ingoing peerState. + + // disconnect peer1 + err = state.disconnect(0, peer1) + require.NoError(t, err) + + // peer2 will be accepted as incoming connection, as peer1 is disconnected. + err = state.tryAcceptIncoming(0, peer2) + require.NoError(t, err) +} + +func TestDisconnectNoSlotDoesntPanic(t *testing.T) { + t.Parallel() + + state := newTestPeerState(t, 1, 1) + + state.addNoSlotNode(0, peer1) + + require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) + + state.discover(0, peer1) + err := state.tryOutgoing(0, peer1) + require.NoError(t, err) + + require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) + + err = state.disconnect(0, peer1) + require.NoError(t, err) + + require.Equal(t, notConnectedPeer, state.peerStatus(0, peer1)) +} + +func TestHighestNotConnectedPeer(t *testing.T) { + t.Parallel() + + state := newTestPeerState(t, 25, 25) + emptyPeerID := peer.ID("") + + require.Equal(t, emptyPeerID, state.highestNotConnectedPeer(0)) + + require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) + + state.discover(0, peer1) + n, err := state.getNode(peer1) + require.NoError(t, err) + n.setReputation(50) + state.nodes[peer1] = n + + require.Equal(t, Reputation(50), state.nodes[peer1].getReputation()) + + require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) + + state.discover(0, peer2) + n, err = state.getNode(peer2) + require.NoError(t, err) + n.setReputation(25) + state.nodes[peer2] = n + + // peer1 still has the highest reputation + require.Equal(t, peer1, state.highestNotConnectedPeer(0)) + require.Equal(t, Reputation(25), state.nodes[peer2].getReputation()) + + require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) + + n, err = state.getNode(peer2) + require.NoError(t, err) + + n.setReputation(75) + state.nodes[peer2] = n + + require.Equal(t, peer2, state.highestNotConnectedPeer(0)) + require.Equal(t, Reputation(75), state.nodes[peer2].getReputation()) + + require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) + err = state.tryAcceptIncoming(0, peer2) + require.NoError(t, err) + + require.Equal(t, peer1, state.highestNotConnectedPeer(0)) + + require.Equal(t, connectedPeer, state.peerStatus(0, peer2)) + err = state.disconnect(0, peer2) + require.NoError(t, err) + + require.Equal(t, notConnectedPeer, state.peerStatus(0, peer1)) + n, err = state.getNode(peer1) + require.NoError(t, err) + n.setReputation(100) + state.nodes[peer1] = n + + require.Equal(t, peer1, state.highestNotConnectedPeer(0)) +} + +func TestSortedPeers(t *testing.T) { + t.Parallel() + + const msgChanSize = 1 + state := newTestPeerState(t, 2, 1) + + state.addNoSlotNode(0, peer1) + + state.discover(0, peer1) + err := state.tryAcceptIncoming(0, peer1) + require.NoError(t, err) + + require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) + + // discover peer2 + state.discover(0, peer2) + // try to make peer2 as an incoming connection. + err = state.tryAcceptIncoming(0, peer2) + require.NoError(t, err) + + require.Equal(t, connectedPeer, state.peerStatus(0, peer1)) + + peerCh := make(chan peer.IDSlice, msgChanSize) + peerCh <- state.sortedPeers(0) + peers := <-peerCh + require.Equal(t, 2, len(peers)) +} diff --git a/dot/peerset/test_helpers.go b/dot/peerset/test_helpers.go new file mode 100644 index 0000000000..7dc1cdfe73 --- /dev/null +++ b/dot/peerset/test_helpers.go @@ -0,0 +1,67 @@ +package peerset + +import ( + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" +) + +const ( + bootNode = peer.ID("testBootNode") + reservedPeer = peer.ID("testReservedPeer") + reservedPeer2 = peer.ID("testReservedPeer2") + discovered1 = peer.ID("testDiscovered1") + discovered2 = peer.ID("testDiscovered2") + incomingPeer = peer.ID("testIncoming") + incoming2 = peer.ID("testIncoming2") + incoming3 = peer.ID("testIncoming3") + peer1 = peer.ID("testPeer1") + peer2 = peer.ID("testPeer2") +) + +func newTestPeerSet(t *testing.T, in, out uint32, bootNodes, reservedPeers []peer.ID, reservedOnly bool) *Handler { + t.Helper() + con := &ConfigSet{ + Set: []*config{ + { + inPeers: in, + outPeers: out, + reservedOnly: reservedOnly, + periodicAllocTime: time.Second * 2, + }, + }, + } + + handler, err := NewPeerSetHandler(con) + require.NoError(t, err) + + handler.Start() + + handler.AddPeer(0, bootNodes...) + handler.AddReservedPeer(0, reservedPeers...) + time.Sleep(time.Millisecond * 100) + + return handler +} + +func newTestPeerState(t *testing.T, maxIn, maxOut uint32) *PeersState { //nolint + t.Helper() + state, err := NewPeerState([]*config{ + { + inPeers: maxIn, + outPeers: maxOut, + }, + }) + require.NoError(t, err) + + return state +} + +func checkMessageStatus(t *testing.T, m interface{}, expectedStatus Status) { + t.Helper() + msg, ok := m.(Message) + require.True(t, ok) + require.Equal(t, expectedStatus, msg.Status) +} diff --git a/dot/rpc/modules/mocks/network_api.go b/dot/rpc/modules/mocks/network_api.go index d9dd66275a..753703cb7a 100644 --- a/dot/rpc/modules/mocks/network_api.go +++ b/dot/rpc/modules/mocks/network_api.go @@ -3,7 +3,10 @@ package mocks import ( + "github.com/ChainSafe/gossamer/dot/network" + "github.com/ChainSafe/gossamer/dot/peerset" common "github.com/ChainSafe/gossamer/lib/common" + "github.com/libp2p/go-libp2p-core/peer" mock "github.com/stretchr/testify/mock" ) @@ -12,6 +15,14 @@ type NetworkAPI struct { mock.Mock } +func (_m *NetworkAPI) GossipMessage(message network.NotificationsMessage) { + _m.Called(message) +} + +func (_m *NetworkAPI) ReportPeer(change peerset.ReputationChange, p peer.ID) { + _m.Called(change, p) +} + // AddReservedPeers provides a mock function with given fields: addrs func (_m *NetworkAPI) AddReservedPeers(addrs ...string) error { _va := make([]interface{}, len(addrs)) diff --git a/dot/rpc/modules/system_test.go b/dot/rpc/modules/system_test.go index 2561cbc8c8..013675da80 100644 --- a/dot/rpc/modules/system_test.go +++ b/dot/rpc/modules/system_test.go @@ -25,7 +25,14 @@ import ( "testing" "time" + log "github.com/ChainSafe/log15" + "github.com/btcsuite/btcutil/base58" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/ChainSafe/gossamer/dot/core" + coremocks "github.com/ChainSafe/gossamer/dot/core/mocks" "github.com/ChainSafe/gossamer/dot/network" "github.com/ChainSafe/gossamer/dot/rpc/modules/mocks" "github.com/ChainSafe/gossamer/dot/state" @@ -38,13 +45,6 @@ import ( "github.com/ChainSafe/gossamer/lib/transaction" "github.com/ChainSafe/gossamer/lib/trie" "github.com/ChainSafe/gossamer/pkg/scale" - log "github.com/ChainSafe/log15" - "github.com/btcsuite/btcutil/base58" - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - coremocks "github.com/ChainSafe/gossamer/dot/core/mocks" ) var ( @@ -53,7 +53,7 @@ var ( IsSyncing: true, ShouldHavePeers: true, } - testPeers = []common.PeerInfo{} + testPeers []common.PeerInfo ) func newNetworkService(t *testing.T) *network.Service { @@ -88,17 +88,15 @@ func newNetworkService(t *testing.T) *network.Service { // Test RPC's System.Health() response func TestSystemModule_Health(t *testing.T) { - net := newNetworkService(t) - net.Stop() - sys := NewSystemModule(net, nil, nil, nil, nil, nil) + networkMock := new(mocks.NetworkAPI) + networkMock.On("Health").Return(testHealth) + + sys := NewSystemModule(networkMock, nil, nil, nil, nil, nil) res := &SystemHealthResponse{} err := sys.Health(nil, nil, res) require.NoError(t, err) - - if *res != SystemHealthResponse(testHealth) { - t.Errorf("System.Health.: expected: %+v got: %+v\n", testHealth, *res) - } + require.Equal(t, SystemHealthResponse(testHealth), *res) } // Test RPC's System.NetworkState() response diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go index e7d4d5c9b4..4305d05e03 100644 --- a/dot/sync/chain_sync.go +++ b/dot/sync/chain_sync.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/ChainSafe/gossamer/dot/peerset" "github.com/libp2p/go-libp2p-core/peer" "github.com/ChainSafe/gossamer/dot/network" @@ -271,6 +272,10 @@ func (cs *chainSync) setPeerHead(p peer.ID, hash common.Hash, number *big.Int) e if fin.Number.Cmp(ps.number) >= 0 { // TODO: downscore this peer, or temporarily don't sync from them? (#1399) // perhaps we need another field in `peerState` to mark whether the state is valid or not + cs.network.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadBlockAnnouncementValue, + Reason: peerset.BadBlockAnnouncementReason, + }, p) return errPeerOnInvalidFork } @@ -400,11 +405,19 @@ func (cs *chainSync) sync() { case errors.Is(res.err.err, context.Canceled): return case errors.Is(res.err.err, context.DeadlineExceeded): + cs.network.ReportPeer(peerset.ReputationChange{ + Value: peerset.TimeOutValue, + Reason: peerset.TimeOutReason, + }, res.err.who) cs.ignorePeer(res.err.who) case strings.Contains(res.err.err.Error(), "dial backoff"): cs.ignorePeer(res.err.who) continue case res.err.err.Error() == "protocol not supported": + cs.network.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadProtocolValue, + Reason: peerset.BadProtocolReason, + }, res.err.who) cs.ignorePeer(res.err.who) continue default: @@ -650,7 +663,7 @@ func (cs *chainSync) doSync(req *network.BlockRequestMessage) *workerError { } // perform some pre-validation of response, error if failure - if err := cs.validateResponse(req, resp); err != nil { + if err := cs.validateResponse(req, resp, who); err != nil { return &workerError{ err: err, who: who, @@ -714,7 +727,7 @@ func (cs *chainSync) determineSyncPeers(_ *network.BlockRequestMessage) []peer.I // - the response is not empty // - the response contains all the expected fields // - each block has the correct parent, ie. the response constitutes a valid chain -func (cs *chainSync) validateResponse(req *network.BlockRequestMessage, resp *network.BlockResponseMessage) error { +func (cs *chainSync) validateResponse(req *network.BlockRequestMessage, resp *network.BlockResponseMessage, p peer.ID) error { if resp == nil || len(resp.BlockData) == 0 { return errEmptyBlockData } @@ -728,7 +741,7 @@ func (cs *chainSync) validateResponse(req *network.BlockRequestMessage, resp *ne headerRequested := (req.RequestedData & network.RequestedDataHeader) == 1 for i, bd := range resp.BlockData { - if err = validateBlockData(req, bd); err != nil { + if err = cs.validateBlockData(req, bd, p); err != nil { return err } @@ -737,6 +750,10 @@ func (cs *chainSync) validateResponse(req *network.BlockRequestMessage, resp *ne } else { // if this is a justification-only request, make sure we have the block for the justification if err = cs.validateJustification(bd); err != nil { + cs.network.ReportPeer(peerset.ReputationChange{ + Value: peerset.BadJustificationValue, + Reason: peerset.BadJustificationReason, + }, p) return err } continue @@ -801,7 +818,7 @@ func (cs *chainSync) validateResponse(req *network.BlockRequestMessage, resp *ne } // validateBlockData checks that the expected fields are in the block data -func validateBlockData(req *network.BlockRequestMessage, bd *types.BlockData) error { +func (cs *chainSync) validateBlockData(req *network.BlockRequestMessage, bd *types.BlockData, p peer.ID) error { if bd == nil { return errNilBlockData } @@ -809,6 +826,10 @@ func validateBlockData(req *network.BlockRequestMessage, bd *types.BlockData) er requestedData := req.RequestedData if (requestedData&network.RequestedDataHeader) == 1 && bd.Header == nil { + cs.network.ReportPeer(peerset.ReputationChange{ + Value: peerset.IncompleteHeaderValue, + Reason: peerset.IncompleteHeaderReason, + }, p) return errNilHeaderInResponse } diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go index aab2ffdfd1..5e2ba43fea 100644 --- a/dot/sync/chain_sync_test.go +++ b/dot/sync/chain_sync_test.go @@ -53,6 +53,7 @@ func newTestChainSync(t *testing.T) (*chainSync, *blockQueue) { net := new(syncmocks.Network) net.On("DoBlockRequest", mock.AnythingOfType("peer.ID"), mock.AnythingOfType("*network.BlockRequestMessage")).Return(nil, nil) + net.On("ReportPeer", mock.AnythingOfType("peerset.ReputationChange"), mock.AnythingOfType("peer.ID")) readyBlocks := newBlockQueue(maxResponseSize) cs := newChainSync(bs, net, readyBlocks, newDisjointBlockSet(pendingBlocksLimit), defaultMinPeers, defaultSlotDuration) @@ -440,31 +441,32 @@ func TestWorkerToRequests(t *testing.T) { } func TestValidateBlockData(t *testing.T) { + cs, _ := newTestChainSync(t) req := &network.BlockRequestMessage{ RequestedData: bootstrapRequestData, } - err := validateBlockData(req, nil) + err := cs.validateBlockData(req, nil, "") require.Equal(t, errNilBlockData, err) - err = validateBlockData(req, &types.BlockData{}) + err = cs.validateBlockData(req, &types.BlockData{}, "") require.Equal(t, errNilHeaderInResponse, err) - err = validateBlockData(req, &types.BlockData{ + err = cs.validateBlockData(req, &types.BlockData{ Header: &types.Header{}, - }) + }, "") require.Equal(t, errNilBodyInResponse, err) - err = validateBlockData(req, &types.BlockData{ + err = cs.validateBlockData(req, &types.BlockData{ Header: &types.Header{}, Body: &types.Body{}, - }) + }, "") require.NoError(t, err) } func TestChainSync_validateResponse(t *testing.T) { cs, _ := newTestChainSync(t) - err := cs.validateResponse(nil, nil) + err := cs.validateResponse(nil, nil, "") require.Equal(t, errEmptyBlockData, err) req := &network.BlockRequestMessage{ @@ -491,7 +493,7 @@ func TestChainSync_validateResponse(t *testing.T) { hash := (&types.Header{ Number: big.NewInt(2), }).Hash() - err = cs.validateResponse(req, resp) + err = cs.validateResponse(req, resp, "") require.Equal(t, errResponseIsNotChain, err) require.True(t, cs.pendingBlocks.hasBlock(hash)) cs.pendingBlocks.removeBlock(hash) @@ -524,7 +526,7 @@ func TestChainSync_validateResponse(t *testing.T) { ParentHash: parent, Number: big.NewInt(3), }).Hash() - err = cs.validateResponse(req, resp) + err = cs.validateResponse(req, resp, "") require.Equal(t, errResponseIsNotChain, err) require.True(t, cs.pendingBlocks.hasBlock(hash)) bd := cs.pendingBlocks.getBlock(hash) @@ -552,7 +554,7 @@ func TestChainSync_validateResponse(t *testing.T) { }, } - err = cs.validateResponse(req, resp) + err = cs.validateResponse(req, resp, "") require.NoError(t, err) require.False(t, cs.pendingBlocks.hasBlock(hash)) @@ -567,7 +569,7 @@ func TestChainSync_validateResponse(t *testing.T) { }, } - err = cs.validateResponse(req, resp) + err = cs.validateResponse(req, resp, "") require.NoError(t, err) require.False(t, cs.pendingBlocks.hasBlock(hash)) } @@ -599,7 +601,7 @@ func TestChainSync_validateResponse_firstBlock(t *testing.T) { }, } - err := cs.validateResponse(req, resp) + err := cs.validateResponse(req, resp, "") require.True(t, errors.Is(err, errUnknownParent)) require.True(t, cs.pendingBlocks.hasBlock(header.Hash())) bd := cs.pendingBlocks.getBlock(header.Hash()) diff --git a/dot/sync/interface.go b/dot/sync/interface.go index a9746523c3..8d75922f47 100644 --- a/dot/sync/interface.go +++ b/dot/sync/interface.go @@ -21,11 +21,11 @@ import ( "sync" "github.com/ChainSafe/gossamer/dot/network" + "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/runtime" rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage" - "github.com/libp2p/go-libp2p-core/peer" ) @@ -111,4 +111,7 @@ type Network interface { // Peers returns a list of currently connected peers Peers() []common.PeerInfo + + // ReportPeer reports peer based on the peer behaviour. + ReportPeer(change peerset.ReputationChange, p peer.ID) } diff --git a/dot/sync/mocks/Network.go b/dot/sync/mocks/Network.go index 85560e977a..e4b4abd567 100644 --- a/dot/sync/mocks/Network.go +++ b/dot/sync/mocks/Network.go @@ -3,6 +3,7 @@ package mocks import ( + "github.com/ChainSafe/gossamer/dot/peerset" common "github.com/ChainSafe/gossamer/lib/common" mock "github.com/stretchr/testify/mock" @@ -16,6 +17,10 @@ type Network struct { mock.Mock } +func (_m *Network) ReportPeer(change peerset.ReputationChange, peer peer.ID) { + _m.Called(change, peer) +} + // DoBlockRequest provides a mock function with given fields: to, req func (_m *Network) DoBlockRequest(to peer.ID, req *network.BlockRequestMessage) (*network.BlockResponseMessage, error) { ret := _m.Called(to, req) diff --git a/dot/sync/syncer.go b/dot/sync/syncer.go index 9df49c15b5..3cfa2670ee 100644 --- a/dot/sync/syncer.go +++ b/dot/sync/syncer.go @@ -35,6 +35,7 @@ type Service struct { blockState BlockState chainSync ChainSync chainProcessor ChainProcessor + network Network } // Config is the configuration for the sync Service. @@ -94,6 +95,7 @@ func NewService(cfg *Config) (*Service, error) { blockState: cfg.BlockState, chainSync: chainSync, chainProcessor: chainProcessor, + network: cfg.Network, }, nil } diff --git a/lib/trie/proof_test.go b/lib/trie/proof_test.go index 72a42ad81b..d32cdbab47 100644 --- a/lib/trie/proof_test.go +++ b/lib/trie/proof_test.go @@ -26,6 +26,7 @@ import ( func TestProofGeneration(t *testing.T) { t.Parallel() + tmp, err := ioutil.TempDir("", "*-test-trie") require.NoError(t, err)