Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mechanism for peers blocking #4455

Merged
merged 14 commits into from
Oct 26, 2023
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,7 @@ debug_external: clean
bash test/debug-external.sh

build_localnet_validator:
bash test/build-localnet-validator.sh
bash test/build-localnet-validator.sh

tt:
go test -v -test.run OnDisconnectCheck ./p2p/security
3 changes: 2 additions & 1 deletion consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog"

msg_pb "github.com/harmony-one/harmony/api/proto/message"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (consensus *Consensus) isViewChangingMode() bool {
}

// HandleMessageUpdate will update the consensus state according to received message
func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb.Message, senderKey *bls.SerializedPublicKey) error {
func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, peer libp2p_peer.ID, msg *msg_pb.Message, senderKey *bls.SerializedPublicKey) error {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
// when node is in ViewChanging mode, it still accepts normal messages into FBFTLog
Expand Down
2 changes: 2 additions & 0 deletions core/blockchain_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,8 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i
if len(chain) == 0 {
return 0, nil, nil, ErrEmptyChain
}
first := chain[0]
fmt.Println("insertChain", utils.GetPort(), first.ShardID(), first.Epoch().Uint64(), first.NumberU64())
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
Expand Down
43 changes: 43 additions & 0 deletions internal/utils/blockedpeers/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package blockedpeers

import (
"time"

"github.com/harmony-one/harmony/internal/utils/lrucache"
"github.com/libp2p/go-libp2p/core/peer"
)

type Manager struct {
internal *lrucache.Cache[peer.ID, time.Time]
}

func NewManager(size int) *Manager {
return &Manager{
internal: lrucache.NewCache[peer.ID, time.Time](size),
}
}

func (m *Manager) IsBanned(key peer.ID, now time.Time) bool {
future, ok := m.internal.Get(key)

if ok {
return future.After(now) // future > now
}
return ok
}

func (m *Manager) Ban(key peer.ID, future time.Time) {
m.internal.Set(key, future)
}

func (m *Manager) Contains(key peer.ID) bool {
return m.internal.Contains(key)
}

func (m *Manager) Len() int {
return m.internal.Len()
}

func (m *Manager) Keys() []peer.ID {
return m.internal.Keys()
}
27 changes: 27 additions & 0 deletions internal/utils/blockedpeers/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package blockedpeers

import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

func TestNewManager(t *testing.T) {
var (
peer1 peer.ID = "peer1"
now = time.Now()
m = NewManager(4)
)

t.Run("check_empty", func(t *testing.T) {
require.False(t, m.IsBanned(peer1, now), "peer1 should not be banned")
})
t.Run("ban_peer1", func(t *testing.T) {
m.Ban(peer1, now.Add(2*time.Second))
require.True(t, m.IsBanned(peer1, now), "peer1 should be banned")
require.False(t, m.IsBanned(peer1, now.Add(3*time.Second)), "peer1 should not be banned after 3 seconds")
})

}
18 changes: 18 additions & 0 deletions internal/utils/lrucache/lrucache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,21 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
func (c *Cache[K, V]) Set(key K, value V) {
c.cache.Add(key, value)
}

// Contains checks if a key is in the cache, without updating the
// recent-ness or deleting it for being stale.
func (c *Cache[K, V]) Contains(key K) bool {
return c.cache.Contains(key)
}

func (c *Cache[K, V]) Len() int {
return c.cache.Len()
}

func (c *Cache[K, V]) Keys() []K {
out := make([]K, 0, c.cache.Len())
for _, v := range c.cache.Keys() {
out = append(out, v.(K))
}
return out
}
28 changes: 28 additions & 0 deletions internal/utils/lrucache/lrucache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package lrucache

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestKeys(t *testing.T) {
c := NewCache[int, int](10)

for i := 0; i < 3; i++ {
c.Set(i, i)
}
m := map[int]int{
0: 0,
1: 1,
2: 2,
}
keys := c.Keys()

m2 := map[int]int{}
for _, k := range keys {
m2[k] = k
}

require.Equal(t, m, m2)
}
10 changes: 7 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
// validate shardID
// validate public key size
// verify message signature
func validateShardBoundMessage(consensus *consensus.Consensus, nodeConfig *nodeconfig.ConfigType, payload []byte,
func validateShardBoundMessage(consensus *consensus.Consensus, peer libp2p_peer.ID, nodeConfig *nodeconfig.ConfigType, payload []byte,
) (*msg_pb.Message, *bls.SerializedPublicKey, bool, error) {
var (
m msg_pb.Message
Expand Down Expand Up @@ -736,6 +736,7 @@ func (node *Node) StartPubSub() error {
// p2p consensus message handler function
type p2pHandlerConsensus func(
ctx context.Context,
peer libp2p_peer.ID,
msg *msg_pb.Message,
key *bls.SerializedPublicKey,
) error
Expand All @@ -749,6 +750,7 @@ func (node *Node) StartPubSub() error {

// interface pass to p2p message validator
type validated struct {
peerID libp2p_peer.ID
consensusBound bool
handleC p2pHandlerConsensus
handleCArg *msg_pb.Message
Expand Down Expand Up @@ -806,7 +808,7 @@ func (node *Node) StartPubSub() error {

// validate consensus message
validMsg, senderPubKey, ignore, err := validateShardBoundMessage(
node.Consensus, node.NodeConfig, openBox[proto.MessageCategoryBytes:],
node.Consensus, peer, node.NodeConfig, openBox[proto.MessageCategoryBytes:],
)

if err != nil {
Expand All @@ -820,6 +822,7 @@ func (node *Node) StartPubSub() error {
}

msg.ValidatorData = validated{
peerID: peer,
consensusBound: true,
handleC: node.Consensus.HandleMessageUpdate,
handleCArg: validMsg,
Expand Down Expand Up @@ -850,6 +853,7 @@ func (node *Node) StartPubSub() error {
}
}
msg.ValidatorData = validated{
peerID: peer,
consensusBound: false,
handleE: node.HandleNodeMessage,
handleEArg: validMsg,
Expand Down Expand Up @@ -901,7 +905,7 @@ func (node *Node) StartPubSub() error {
errChan <- withError{err, nil}
}
} else {
if err := msg.handleC(ctx, msg.handleCArg, msg.senderPubKey); err != nil {
if err := msg.handleC(ctx, msg.peerID, msg.handleCArg, msg.senderPubKey); err != nil {
errChan <- withError{err, msg.senderPubKey}
}
}
Expand Down
24 changes: 12 additions & 12 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"sync"
"time"

"github.com/harmony-one/bls/ffi/go/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/blockedpeers"
"github.com/harmony-one/harmony/p2p/discovery"
"github.com/harmony-one/harmony/p2p/security"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -24,19 +31,11 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"

"github.com/libp2p/go-libp2p/p2p/security/noise"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/rs/zerolog"

"github.com/harmony-one/bls/ffi/go/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p/discovery"
"github.com/harmony-one/harmony/p2p/security"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
)

type ConnectCallback func(net libp2p_network.Network, conn libp2p_network.Conn) error
Expand Down Expand Up @@ -254,7 +253,8 @@ func NewHost(cfg HostConfig) (Host, error) {
self.PeerID = p2pHost.ID()
subLogger := utils.Logger().With().Str("hostID", p2pHost.ID().Pretty()).Logger()

security := security.NewManager(cfg.MaxConnPerIP, int(cfg.MaxPeers))
banned := blockedpeers.NewManager(1024)
security := security.NewManager(cfg.MaxConnPerIP, int(cfg.MaxPeers), banned)
// has to save the private key for host
h := &HostV2{
h: p2pHost,
Expand All @@ -269,6 +269,7 @@ func NewHost(cfg HostConfig) (Host, error) {
logger: &subLogger,
ctx: ctx,
cancel: cancel,
banned: banned,
}

utils.Logger().Info().
Expand Down Expand Up @@ -323,6 +324,7 @@ type HostV2 struct {
onDisconnects DisconnectCallbacks
ctx context.Context
cancel func()
banned *blockedpeers.Manager
}

// PubSub ..
Expand Down Expand Up @@ -492,9 +494,7 @@ func (host *HostV2) ListPeer(topic string) []libp2p_peer.ID {

// ListBlockedPeer returns list of blocked peer
func (host *HostV2) ListBlockedPeer() []libp2p_peer.ID {
// TODO: this is a place holder for now
peers := make([]libp2p_peer.ID, 0)
return peers
return host.banned.Keys()
}

// GetPeerCount ...
Expand Down
29 changes: 20 additions & 9 deletions p2p/security/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package security
import (
"fmt"
"sync"
"time"

"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/blockedpeers"
libp2p_network "github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
Expand All @@ -15,14 +17,6 @@ type Security interface {
OnDisconnectCheck(conn libp2p_network.Conn) error
}

type Manager struct {
maxConnPerIP int
maxPeers int

mutex sync.Mutex
peers *peerMap // All the connected nodes, key is the Peer's IP, value is the peer's ID array
}

type peerMap struct {
peers map[string][]string
}
Expand Down Expand Up @@ -63,7 +57,16 @@ func (peerMap *peerMap) Range(f func(key string, value []string) bool) {
}
}

func NewManager(maxConnPerIP int, maxPeers int) *Manager {
type Manager struct {
maxConnPerIP int
maxPeers int

mutex sync.Mutex
peers *peerMap // All the connected nodes, key is the Peer's IP, value is the peer's ID array
banned *blockedpeers.Manager
}

func NewManager(maxConnPerIP int, maxPeers int, banned *blockedpeers.Manager) *Manager {
if maxConnPerIP < 0 {
panic("maximum connections per IP must not be negative")
}
Expand All @@ -74,6 +77,7 @@ func NewManager(maxConnPerIP int, maxPeers int) *Manager {
maxConnPerIP: maxConnPerIP,
maxPeers: maxPeers,
peers: newPeersMap(),
banned: banned,
}
}

Expand Down Expand Up @@ -118,6 +122,13 @@ func (m *Manager) OnConnectCheck(net libp2p_network.Network, conn libp2p_network
Msg("too many peers, closing")
return net.ClosePeer(conn.RemotePeer())
}
if m.banned.IsBanned(conn.RemotePeer(), time.Now()) {
utils.Logger().Warn().
Str("new peer", remoteIp).
Msg("peer is banned, closing")
return net.ClosePeer(conn.RemotePeer())
}

m.peers.Store(remoteIp, peers)
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions p2p/security/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/harmony-one/harmony/internal/utils/blockedpeers"
"github.com/libp2p/go-libp2p"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestManager_OnConnectCheck(t *testing.T) {
defer h1.Close()

fakeHost := &fakeHost{}
security := NewManager(2, 1)
security := NewManager(2, 1, blockedpeers.NewManager(4))
h1.Network().Notify(fakeHost)
fakeHost.SetConnectCallback(security.OnConnectCheck)
fakeHost.SetDisconnectCallback(security.OnDisconnectCheck)
Expand Down Expand Up @@ -100,7 +101,7 @@ func TestManager_OnDisconnectCheck(t *testing.T) {
defer h1.Close()

fakeHost := &fakeHost{}
security := NewManager(2, 0)
security := NewManager(2, 0, blockedpeers.NewManager(4))
h1.Network().Notify(fakeHost)
fakeHost.SetConnectCallback(security.OnConnectCheck)
fakeHost.SetDisconnectCallback(security.OnDisconnectCheck)
Expand Down