
fix(dot/network): Implement time based handle transaction #1942

Merged · 18 commits · Nov 26, 2021
5 changes: 5 additions & 0 deletions dot/network/config.go
@@ -52,6 +52,9 @@ const (

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Minute * 5

// defaultTxnBatchSize is the default size for the transaction batch
defaultTxnBatchSize = 100
)

// DefaultBootnodes the default value for Config.Bootnodes
@@ -104,6 +107,8 @@ type Config struct {
telemetryInterval time.Duration

noPreAllocate bool // internal option

batchSize int // internal option
}
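
Like noPreAllocate above it, batchSize is unexported and can only be set from inside the network package; the updated TestCreateNotificationsMessageHandler_HandleTransaction below uses it to shrink the batch from the default of 100 down to 5.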

// build checks the configuration, sets up the private key for the network service,
14 changes: 14 additions & 0 deletions dot/network/host.go
@@ -401,3 +401,17 @@ func (h *host) protocols() []string {
func (h *host) closePeer(peer peer.ID) error {
return h.h.Network().ClosePeer(peer)
}

func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
connToPeer := h.h.Network().ConnsToPeer(p)
for _, c := range connToPeer {
for _, st := range c.GetStreams() {
if st.Protocol() == pID {
err := st.Close()
if err != nil {
logger.Trace("Failed to close stream", "protocol", pID, "error", err)
}
}
}
}
}
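
For context, the batch goroutine added to dot/network/transaction.go below calls this helper when handling a transaction message fails; only the matching transactions substream to that peer is torn down, leaving the connection and any other protocol streams intact. The call site, excerpted from that diff:

protocolID := s.host.protocolID + transactionsID
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
	s.host.closeProtocolStream(protocolID, txnMsg.peer)
	continue
}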
47 changes: 16 additions & 31 deletions dot/network/notifications.go
@@ -58,7 +58,7 @@ type (
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
Contributor: This is just for me to gain understanding, but what is the distinction here between a handshake and a non-handshake message when in batch processing mode, and why do they have to be handled differently?

Contributor: When a notifications stream is opened (see the docs on the different stream types at https://docs.rs/sc-network/0.9.0/sc_network/), the first message each side sends over it is a "handshake" message, which each side validates. If a side doesn't like the handshake it closes the stream; otherwise the stream stays open, and the rest of the messages sent over it (by the opening side only) are the normal messages for that stream protocol, in this case TransactionMessages.

Contributor: Ah okay, that makes sense. I wasn't sure if "handshake" meant something different in this context, but this sounds like what I would think of as a typical handshake for establishing a connection between peers (correct me if I'm misunderstanding, though). And to clarify, the streams are one-sided in this case?

Contributor (author): Yes, after the handshake the stream is unidirectional.

Contributor: Yeah, it's not exactly the same as a handshake for establishing a connection/public keys, but it's a more high-level, stream-level handshake.

NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*BatchMessage, err error)
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage)
)

// BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go
@@ -222,46 +222,31 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
"peer", stream.Conn().RemotePeer(),
)

var (
propagate bool
err error
msgs []*BatchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}
batchHandler(peer, msg)
return nil
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
msgs = append(msgs, &BatchMessage{
msg: msg,
peer: peer,
})
propagate, err := messageHandler(peer, msg)
if err != nil {
return err
}

if !propagate || s.noGossip {
return nil
}

for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
seen := s.gossip.hasSeen(msg)
if !seen {
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}
}
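
Net effect of this hunk: when a batchHandler is registered (currently only for transactions), the message is handed off and the handler returns immediately; propagation, gossip de-duplication, and duplicate-gossip reputation reporting for those messages move to the batch goroutine in dot/network/transaction.go below. Protocols without a batch handler keep the inline path shown here: handle the message, broadcast it if unseen, otherwise report the peer for duplicate gossip.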
66 changes: 36 additions & 30 deletions dot/network/notifications_test.go
@@ -25,13 +25,13 @@ import (
"time"
"unsafe"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
)

func TestHandshake_SizeOf(t *testing.T) {
@@ -320,20 +320,17 @@ func Test_HandshakeTimeout(t *testing.T) {
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
const batchSize = 5
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
batchSize: batchSize,
}

s := createTestService(t, config)
s.batchSize = 5
srvc1 := createTestService(t, config)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
@@ -342,42 +339,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
NoMDNS: true,
}

b := createTestService(t, configB)
srvc2 := createTestService(t, configB)

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()
txnBatch := make(chan *BatchMessage, batchSize)
txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch)

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
addrInfoB := srvc2.host.addrInfo()
err := srvc1.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
err = srvc1.host.connect(addrInfoB)
require.NoError(t, err)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
txnProtocolID := srvc1.host.protocolID + transactionsID
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
protocolID: txnProtocolID,
getHandshake: srvc1.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)
handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{
received: true,
validated: true,
})

msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
@@ -411,11 +407,21 @@ }
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
// wait for the batch goroutine's next 1s tick plus its 300ms drain window to empty the channel.
time.Sleep(1300 * time.Millisecond)
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
8 changes: 4 additions & 4 deletions dot/network/service.go
@@ -103,8 +103,6 @@ type Service struct {

blockResponseBuf []byte
blockResponseBufMu sync.Mutex

batchSize int
}

// NewService creates a new network service from the configuration and message channels
@@ -141,6 +139,9 @@ func NewService(cfg *Config) (*Service, error) {
connectToPeersTimeout = cfg.DiscoveryInterval
}

if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}
// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
@@ -179,7 +180,6 @@ func NewService(cfg *Config) (*Service, error) {
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
batchSize: 100,
}

return network, err
@@ -227,7 +227,7 @@ func (s *Service) Start() error {
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
}

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatch := make(chan *BatchMessage, s.cfg.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
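
Note that the channel capacity is the batch size itself, which is what actually bounds a batch: once cfg.batchSize messages are queued, further sends fall into the 200ms timeout branch of createBatchMessageHandler (see dot/network/transaction.go below) and are dropped with a debug log until the goroutine drains the channel.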
4 changes: 3 additions & 1 deletion dot/network/service_test.go
@@ -66,6 +66,8 @@ func createServiceHelper(t *testing.T, num int) []*Service {

// helper method to create and start a new network service
func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
t.Helper()

if cfg == nil {
basePath := utils.NewTestBasePath(t, "node")

@@ -84,7 +86,7 @@

if cfg.TransactionHandler == nil {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil)
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}
71 changes: 45 additions & 26 deletions dot/network/transaction.go
@@ -19,12 +19,13 @@ package network
import (
"errors"
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/peer"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"

"github.com/libp2p/go-libp2p-core/peer"
)

var (
@@ -119,36 +120,54 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) {
return &transactionHandshake{}, nil
}

func (s *Service) createBatchMessageHandler(txnBatch chan *BatchMessage) NotificationsMessageBatchHandler {
return func(peer peer.ID, msg NotificationsMessage) (msgs []*BatchMessage, err error) {
func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler {
go func() {
protocolID := s.host.protocolID + transactionsID
ticker := time.NewTicker(1 * time.Second)
Contributor: It would be nice to configure this using the slot duration; I think 1/3 of a slot duration is reasonable. That way the node has 1/3 of a slot to process the transaction messages and put them in the pool if it wants. What do you think?

Contributor (author): Done.


for {
out:
select {
case <-s.ctx.Done():
return
case <-ticker.C:
innerTicker := time.NewTicker(300 * time.Millisecond)
Contributor: Where did this time come from, and what's its purpose exactly?

Contributor (author): The external tick triggers at a regular interval of slotDuration; the inner tick was to trigger a timeout so that draining doesn't take the entire slotDuration. I have updated the code to use a timer.

for {
select {
case <-innerTicker.C:
break out
case txnMsg := <-txnBatchCh:
Member: Is it a problem if this case condition blocks the thread for more than the ticker duration? If so, we might need to spawn a goroutine every tick to avoid the blocking.

Contributor (author): We are processing as many txns as possible within the timeout. If there is no data in the channel, then the timeout will be triggered.

Member: But I'm asking what happens if processing a txnMsg takes more than a ticker duration, for example if handleTransactionMessage blocks.

Contributor (author): Then we don't process it; we process it in the next block.

propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
s.host.closeProtocolStream(protocolID, txnMsg.peer)
continue
}

if s.noGossip || !propagate {
continue
}

if !s.gossip.hasSeen(txnMsg.msg) {
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
}
}
}
}
}
}()

return func(peer peer.ID, msg NotificationsMessage) {
data := &BatchMessage{
msg: msg,
peer: peer,
}
txnBatch <- data

if len(txnBatch) < s.batchSize {
return nil, nil
}

var propagateMsgs []*BatchMessage
for txnData := range txnBatch {
propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg)
if err != nil {
continue
}
if propagate {
propagateMsgs = append(propagateMsgs, &BatchMessage{
msg: txnData.msg,
peer: txnData.peer,
})
}
if len(txnBatch) == 0 {
break
}
select {
case txnBatchCh <- data:
case <-time.After(time.Millisecond * 200):
logger.Debug("transaction message not included into batch")
return
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
// Maybe use the error to compute the peer score.
return propagateMsgs, nil
}
}

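The review threads above settle on two follow-ups to this commit: drive the outer loop at a third of the slot duration instead of a hard-coded second, and bound each drain with a one-shot timer instead of the inner 300ms ticker. Below is a minimal, self-contained sketch of that shape; the 6-second slot and the stubbed-out handler are assumptions for illustration, not gossamer's actual values or API.

package main

import (
	"fmt"
	"time"
)

// Assumption: a 6s slot; gossamer would read the real slot duration
// from the runtime rather than hard-coding it.
const slotDuration = 6 * time.Second

type batchMessage struct{ payload string }

func processBatch(stop <-chan struct{}, batch <-chan batchMessage) {
	ticker := time.NewTicker(slotDuration / 3) // 1/3 of a slot, per the review suggestion
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			// Drain for a bounded window: a one-shot timer replaces
			// the inner ticker discussed in the threads above.
			timeout := time.NewTimer(300 * time.Millisecond)
		drain:
			for {
				select {
				case msg := <-batch:
					fmt.Println("handled", msg.payload) // stand-in for handleTransactionMessage
				case <-timeout.C:
					break drain
				}
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	batch := make(chan batchMessage, 4)
	batch <- batchMessage{payload: "extrinsic-1"}
	go processBatch(stop, batch)
	time.Sleep(slotDuration/3 + 400*time.Millisecond) // one tick plus the drain window
	close(stop)
}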