Skip to content

Commit

Permalink
Implement time based batch processing of transcation message.
Browse files Browse the repository at this point in the history
  • Loading branch information
arijitAD committed Nov 8, 2021
1 parent 1c989ad commit 9384f18
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 92 deletions.
5 changes: 5 additions & 0 deletions dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Minute * 5

// defaultTxnBatchSize is the default size for the transaction batch
defaultTxnBatchSize = 100
)

// DefaultBootnodes the default value for Config.Bootnodes
Expand Down Expand Up @@ -104,6 +107,8 @@ type Config struct {
telemetryInterval time.Duration

noPreAllocate bool // internal option

batchSize int // internal option
}

// build checks the configuration, sets up the private key for the network service,
Expand Down
14 changes: 14 additions & 0 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,17 @@ func (h *host) protocols() []string {
func (h *host) closePeer(peer peer.ID) error {
return h.h.Network().ClosePeer(peer)
}

func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
connToPeer := h.h.Network().ConnsToPeer(p)
for _, c := range connToPeer {
for _, st := range c.GetStreams() {
if st.Protocol() == pID {
err := st.Close()
if err != nil {
logger.Trace("Failed to close stream", "protocol", pID, "error", err)
}
}
}
}
}
47 changes: 16 additions & 31 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type (
NotificationsMessageHandler = func(peer peer.ID, msg NotificationsMessage) (propagate bool, err error)

// NotificationsMessageBatchHandler is called when a (non-handshake) message is received over a notifications stream in batch processing mode.
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage) (batchMsgs []*BatchMessage, err error)
NotificationsMessageBatchHandler = func(peer peer.ID, msg NotificationsMessage)
)

// BatchMessage is exported for the mocks of lib/grandpa/mocks/network.go
Expand Down Expand Up @@ -222,46 +222,31 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
"peer", stream.Conn().RemotePeer(),
)

var (
propagate bool
err error
msgs []*BatchMessage
)
if batchHandler != nil {
msgs, err = batchHandler(peer, msg)
if err != nil {
return err
}
batchHandler(peer, msg)
return nil
}

propagate = len(msgs) > 0
} else {
propagate, err = messageHandler(peer, msg)
if err != nil {
return err
}
msgs = append(msgs, &BatchMessage{
msg: msg,
peer: peer,
})
propagate, err := messageHandler(peer, msg)
if err != nil {
return err
}

if !propagate || s.noGossip {
return nil
}

for _, data := range msgs {
seen := s.gossip.hasSeen(data.msg)
if !seen {
s.broadcastExcluding(info, data.peer, data.msg)
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
seen := s.gossip.hasSeen(msg)
if !seen {
s.broadcastExcluding(info, peer, msg)
return nil
}

// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Reason: peerset.DuplicateGossipReason,
}, peer)
return nil
}
}
Expand Down
66 changes: 36 additions & 30 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"time"
"unsafe"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
)

func TestHandshake_SizeOf(t *testing.T) {
Expand Down Expand Up @@ -320,20 +320,17 @@ func Test_HandshakeTimeout(t *testing.T) {
}

func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
const batchSize = 5
basePath := utils.NewTestBasePath(t, "nodeA")
mockhandler := &MockTransactionHandler{}
mockhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mockhandler.On("TransactionsCount").Return(0)
config := &Config{
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
TransactionHandler: mockhandler,
BasePath: basePath,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
batchSize: batchSize,
}

s := createTestService(t, config)
s.batchSize = 5
srvc1 := createTestService(t, config)

configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Expand All @@ -342,42 +339,41 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
NoMDNS: true,
}

b := createTestService(t, configB)
srvc2 := createTestService(t, configB)

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// don't set handshake data ie. this stream has just been opened
testPeerID := b.host.id()
txnBatch := make(chan *BatchMessage, batchSize)
txnBatchHandler := srvc1.createBatchMessageHandler(txnBatch)

// connect nodes
addrInfoB := b.host.addrInfo()
err := s.host.connect(addrInfoB)
addrInfoB := srvc2.host.addrInfo()
err := srvc1.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = s.host.connect(addrInfoB)
err = srvc1.host.connect(addrInfoB)
require.NoError(t, err)
}
require.NoError(t, err)

stream, err := s.host.h.NewStream(s.ctx, b.host.id(), s.host.protocolID+transactionsID)
txnProtocolID := srvc1.host.protocolID + transactionsID
stream, err := srvc1.host.h.NewStream(srvc1.ctx, srvc2.host.id(), txnProtocolID)
require.NoError(t, err)
require.Len(t, txnBatch, 0)

// create info and handler
info := &notificationsProtocol{
protocolID: s.host.protocolID + transactionsID,
getHandshake: s.getTransactionHandshake,
protocolID: txnProtocolID,
getHandshake: srvc1.getTransactionHandshake,
handshakeValidator: validateTransactionHandshake,
inboundHandshakeData: new(sync.Map),
outboundHandshakeData: new(sync.Map),
}
handler := s.createNotificationsMessageHandler(info, s.handleTransactionMessage, txnBatchHandler)
handler := srvc1.createNotificationsMessageHandler(info, srvc1.handleTransactionMessage, txnBatchHandler)

// set handshake data to received
info.inboundHandshakeData.Store(testPeerID, handshakeData{
info.inboundHandshakeData.Store(srvc2.host.id(), handshakeData{
received: true,
validated: true,
})

msg := &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
Expand Down Expand Up @@ -411,11 +407,21 @@ func TestCreateNotificationsMessageHandler_HandleTransaction(t *testing.T) {
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 0)
require.Len(t, txnBatch, 5)

// reached batch size limit, below transaction will not be included in batch.
msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 5)

msg = &TransactionMessage{
Extrinsics: []types.Extrinsic{{1, 1}, {2, 2}},
}
// wait for transaction batch channel to process.
time.Sleep(1300 * time.Millisecond)
err = handler(stream, msg)
require.NoError(t, err)
require.Len(t, txnBatch, 1)
Expand Down
8 changes: 4 additions & 4 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ type Service struct {

blockResponseBuf []byte
blockResponseBufMu sync.Mutex

batchSize int
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -141,6 +139,9 @@ func NewService(cfg *Config) (*Service, error) {
connectToPeersTimeout = cfg.DiscoveryInterval
}

if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}
// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -179,7 +180,6 @@ func NewService(cfg *Config) (*Service, error) {
bufPool: bufPool,
streamManager: newStreamManager(ctx),
blockResponseBuf: make([]byte, maxBlockResponseSize),
batchSize: 100,
}

return network, err
Expand Down Expand Up @@ -227,7 +227,7 @@ func (s *Service) Start() error {
logger.Warn("failed to register notifications protocol", "sub-protocol", blockAnnounceID, "error", err)
}

txnBatch := make(chan *BatchMessage, s.batchSize)
txnBatch := make(chan *BatchMessage, s.cfg.batchSize)
txnBatchHandler := s.createBatchMessageHandler(txnBatch)

// register transactions protocol
Expand Down
4 changes: 3 additions & 1 deletion dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func createServiceHelper(t *testing.T, num int) []*Service {

// helper method to create and start a new network service
func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
t.Helper()

if cfg == nil {
basePath := utils.NewTestBasePath(t, "node")

Expand All @@ -84,7 +86,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {

if cfg.TransactionHandler == nil {
mocktxhandler := &MockTransactionHandler{}
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*TransactionMessage")).Return(nil)
mocktxhandler.On("HandleTransactionMessage", mock.AnythingOfType("*network.TransactionMessage")).Return(true, nil)
mocktxhandler.On("TransactionsCount").Return(0)
cfg.TransactionHandler = mocktxhandler
}
Expand Down
71 changes: 45 additions & 26 deletions dot/network/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package network
import (
"errors"
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/peer"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"

"github.com/libp2p/go-libp2p-core/peer"
)

var (
Expand Down Expand Up @@ -119,36 +120,54 @@ func decodeTransactionHandshake(_ []byte) (Handshake, error) {
return &transactionHandshake{}, nil
}

func (s *Service) createBatchMessageHandler(txnBatch chan *BatchMessage) NotificationsMessageBatchHandler {
return func(peer peer.ID, msg NotificationsMessage) (msgs []*BatchMessage, err error) {
func (s *Service) createBatchMessageHandler(txnBatchCh chan *BatchMessage) NotificationsMessageBatchHandler {
go func() {
protocolID := s.host.protocolID + transactionsID
ticker := time.NewTicker(1 * time.Second)

for {
out:
select {
case <-s.ctx.Done():
return
case <-ticker.C:
innerTicker := time.NewTicker(300 * time.Millisecond)
for {
select {
case <-innerTicker.C:
break out
case txnMsg := <-txnBatchCh:
propagate, err := s.handleTransactionMessage(txnMsg.peer, txnMsg.msg)
if err != nil {
s.host.closeProtocolStream(protocolID, txnMsg.peer)
continue
}

if s.noGossip || !propagate {
continue
}

if !s.gossip.hasSeen(txnMsg.msg) {
s.broadcastExcluding(s.notificationsProtocols[TransactionMsgType], txnMsg.peer, txnMsg.msg)
}
}
}
}
}
}()

return func(peer peer.ID, msg NotificationsMessage) {
data := &BatchMessage{
msg: msg,
peer: peer,
}
txnBatch <- data

if len(txnBatch) < s.batchSize {
return nil, nil
}

var propagateMsgs []*BatchMessage
for txnData := range txnBatch {
propagate, err := s.handleTransactionMessage(txnData.peer, txnData.msg)
if err != nil {
continue
}
if propagate {
propagateMsgs = append(propagateMsgs, &BatchMessage{
msg: txnData.msg,
peer: txnData.peer,
})
}
if len(txnBatch) == 0 {
break
}
select {
case txnBatchCh <- data:
case <-time.After(time.Millisecond * 200):
logger.Debug("transaction message not included into batch")
return
}
// May be use error to compute peer score.
return propagateMsgs, nil
}
}

Expand Down

0 comments on commit 9384f18

Please sign in to comment.