Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): Implement time based handle transaction #1942

Merged
merged 18 commits into from
Nov 26, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"path"
"time"

"github.com/ChainSafe/gossamer/internal/log"
"github.com/libp2p/go-libp2p-core/crypto"

"github.com/ChainSafe/gossamer/internal/log"
)

const (
Expand Down Expand Up @@ -39,6 +40,8 @@ const (

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Minute * 5

defaultTxnBatchSize = 100
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
)

// DefaultBootnodes the default value for Config.Bootnodes
Expand Down Expand Up @@ -93,6 +96,11 @@ type Config struct {
telemetryInterval time.Duration

noPreAllocate bool // internal option

batchSize int // internal option

// SlotDuration is the slot duration to produce a block
SlotDuration time.Duration
}

// build checks the configuration, sets up the private key for the network service,
Expand Down
4 changes: 2 additions & 2 deletions dot/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/utils"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/lib/utils"
)

func newTestDiscovery(t *testing.T, num int) []*discovery {
Expand Down
15 changes: 15 additions & 0 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,18 @@ func (h *host) protocols() []string {
func (h *host) closePeer(peer peer.ID) error {
return h.h.Network().ClosePeer(peer)
}

func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
connToPeer := h.h.Network().ConnsToPeer(p)
for _, c := range connToPeer {
for _, st := range c.GetStreams() {
if st.Protocol() != pID {
continue
}
err := st.Close()
if err != nil {
logger.Tracef("Failed to close stream for protocol %s: %s", pID, err)
}
}
}
}
14 changes: 7 additions & 7 deletions dot/network/mock_transaction_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 18 additions & 37 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/peerset"

"github.com/libp2p/go-libp2p-core/mux"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/ChainSafe/gossamer/dot/peerset"
)

const handshakeTimeout = time.Second * 10
Expand All @@ -42,10 +42,8 @@ type (
// NotificationsMessageHandler is called when a (non-handshake) message is received over a notifications stream.
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake)
// message is received over a notifications stream in batch processing mode.
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (
batchMsgs []*BatchMessage, err error)
// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for me to gain understanding, but what is the distinction here between a handshake and a non-handshake message when in batch processing mode, and why do they have to be handled differently?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when a notifications stream is opened (see docs on the different stream types here https://docs.rs/sc-network/0.9.0/sc_network/) the first message each side sends to each other over it is a "handshake" message, which each side validates and if they don't like it they close the stream, otherwise it stays open and the rest of the messages that are sent over the stream (by the opening side only) are the normal messages for that stream protocol, so in this case TransactionMessages

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay that makes sense. I wasn't sure if handshake meant something different in this context, but this sounds like what I would think of as a typical handshake for establishing connection between peers (correct me if I'm misunderstanding though). And to clarify, the streams are one sided in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, After handshake the steam is unidirectional.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's not exactly the same as a handshake for estabilishing connection/public keys, but it's a more high-level stream level handshake

NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage)
)

// BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go
Expand Down Expand Up @@ -223,47 +221,30 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
logger.Tracef("received message on notifications sub-protocol %s from peer %s, message is: %s",
info.protocolID, stream.Conn().RemotePeer(), msg)

var (
propagate bool
err error
msgs []*BatchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
batchHandler(peer, msg)
return nil
}

msgs = append(msgs, &BatchMessage{
msg: msg,
peer: peer,
})
propagate, err := messageHandler(peer, msg)
if err != nil {
return err
}

if !propagate || s.noGossip {
return nil
}

for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
if !s.gossip.hasSeen(msg) {
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}
}
Expand Down
69 changes: 36 additions & 33 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"time"
"unsafe"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
)

func TestCreateDecoder_BlockAnnounce(t *testing.T) {
Expand Down Expand Up @@ -302,23 +302,17 @@ func Test_HandshakeTimeout(t *testing.T) {
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
const batchSize = 5
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage",
mock.AnythingOfType("peer.ID"),
mock.AnythingOfType("*network.TransactionMessage")).
Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
batchSize: batchSize,
}

s := createTestService(t, config)
s.batchSize = 5
srvc1 := createTestService(t, config)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Expand All @@ -327,42 +321,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
NoMDNS: true,
}

b := createTestService(t, configB)
srvc2 := createTestService(t, configB)

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()
txnBatch := make(chan *BatchMessage, batchSize)
txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch)

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
addrInfoB := srvc2.host.addrInfo()
err := srvc1.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
err = srvc1.host.connect(addrInfoB)
require.NoError(t, err)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
txnProtocolID := srvc1.host.protocolID + transactionsID
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
protocolID: txnProtocolID,
getHandshake: srvc1.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)
handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, &handshakeData{
info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{
received: true,
validated: true,
})

msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
Expand Down Expand Up @@ -396,11 +389,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
// wait for transaction batch channel to process.
time.Sleep(1300 * time.Millisecond)
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
Expand Down
17 changes: 9 additions & 8 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

const (
Expand Down Expand Up @@ -89,8 +90,6 @@ type Service struct {

blockResponseBuf []byte
blockResponseBufMu sync.Mutex

batchSize int
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -125,6 +124,9 @@ func NewService(cfg *Config) (*Service, error) {
connectToPeersTimeout = cfg.DiscoveryInterval
}

if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}
// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -162,7 +164,6 @@ func NewService(cfg *Config) (*Service, error) {
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
batchSize: 100,
}

return network, err
Expand Down Expand Up @@ -211,7 +212,7 @@ func (s *Service) Start() error {
blockAnnounceID, err)
}

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatch := make(chan *BatchMessage, s.cfg.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
Expand Down
9 changes: 5 additions & 4 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func createServiceHelper(t *testing.T, num int) []*Service {

// helper method to create and start a new network service
func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
t.Helper()

if cfg == nil {
basePath := utils.NewTestBasePath(t, "node")

Expand All @@ -71,14 +73,13 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {

if cfg.TransactionHandler == nil {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage",
mock.AnythingOfType("peer.ID"),
mock.AnythingOfType("*TransactionMessage")).
Return(nil)
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}

cfg.SlotDuration = time.Second
qdm12 marked this conversation as resolved.
Show resolved Hide resolved

cfg.ProtocolID = TestProtocolID // default "/gossamer/gssmr/0"

if cfg.LogLvl == 0 {
Expand Down
Loading