Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (dot/telemetry): implement telemetry system.interval message #1528

Merged
merged 22 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d68f7f2
implement network data system interval telemetry message
edwardmack Apr 15, 2021
cfa4302
add lock to telemetry struct to fix concurrent websocket writes
edwardmack Apr 15, 2021
67448d7
implement block data system.interval telemetry message
edwardmack Apr 15, 2021
ac28587
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 23, 2021
1c68d46
address comments
edwardmack Apr 23, 2021
b35bb3a
fix race condition
edwardmack Apr 26, 2021
cc07fdf
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 26, 2021
922ac7c
fix lint
edwardmack Apr 26, 2021
3ab3bc7
update tests
edwardmack Apr 27, 2021
4b1ee15
refactor tests
edwardmack Apr 27, 2021
a9ddf04
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 27, 2021
1cbed8d
use interface{} for channel, add recover
edwardmack Apr 28, 2021
6eab866
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 28, 2021
1292bfa
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 29, 2021
25d35e4
rename channel doneNetworkTelemetry to closeCh
edwardmack Apr 29, 2021
3caeb27
fix check for closed channel
edwardmack Apr 29, 2021
517a46c
Merge branch 'development' into ed/tel_system_interval
edwardmack Apr 29, 2021
9f60995
fix error checking
edwardmack May 3, 2021
54b3233
Merge branch 'development' into ed/tel_system_interval
edwardmack May 3, 2021
8735fed
Merge branch 'development' into ed/tel_system_interval
edwardmack May 4, 2021
66e85c7
Merge branch 'development' into ed/tel_system_interval
edwardmack May 4, 2021
121c7fb
Merge branch 'development' into ed/tel_system_interval
edwardmack May 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p"
libp2phost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
Expand Down Expand Up @@ -59,6 +60,7 @@ type host struct {
cm *ConnManager
ds *badger.Datastore
messageCache *messageCache
bwc *metrics.BandwidthCounter
}

// newHost creates a host wrapper with a new libp2p host instance
Expand Down Expand Up @@ -167,6 +169,8 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

bwc := metrics.NewBandwidthCounter()

host := &host{
ctx: ctx,
h: h,
Expand All @@ -177,6 +181,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
ds: ds,
persistentPeers: pps,
messageCache: msgCache,
bwc: bwc,
}

cm.host = host
Expand Down Expand Up @@ -311,7 +316,11 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error {
lenBytes := uint64ToLEB128(msgLen)
encMsg = append(lenBytes, encMsg...)

_, err = s.Write(encMsg)
sent, err := s.Write(encMsg)
if err == nil {
// todo (ed) determine if there are other places to capture data sent
h.bwc.LogSentMessage(int64(sent))
}
return err
Copy link
Contributor

@noot noot Apr 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change to if err != nil { return err }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, this is the only function where outgoing messages are sent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Ok, thanks.

}

Expand Down
21 changes: 19 additions & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"time"

gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/services"
"github.com/ethereum/go-ethereum/metrics"

log "github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/metrics"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -246,6 +246,8 @@ func (s *Service) Start() error {
}

go s.logPeerCount()
go s.publishNetworkTelemetry()

return nil
}

Expand Down Expand Up @@ -279,6 +281,18 @@ func (s *Service) logPeerCount() {
}
}

func (s *Service) publishNetworkTelemetry() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (s *Service) publishNetworkTelemetry() {
// publishNetworkTelemetry does its periodic work every time a ticker created
// with s.PublishNetworkTelemetryDuration fires. Pass in done to shut this
// goroutine down gracefully: the function returns (and stops the ticker) as
// soon as a receive from done succeeds.
func (s *Service) publishNetworkTelemetry(done chan interface{}) {
	interval := time.NewTicker(s.PublishNetworkTelemetryDuration)
	defer interval.Stop()

	for {
		// blocking on channels here lets the runtime work on other goroutines
		select {
		case <-interval.C:
			fmt.Println("do stuff")
		case <-done:
			return
		}
	}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified publishNetworkTelemetry as noted above; however, when I added s.doneNetworkTelemetry <- struct{}{} to line 398 in network/service.go, the node seems to hang when shutting down and never closes. I see this when I run TestStartService (in service_test.go). It seems that stop gets called twice, so the channel send seems to block there. I may not have set this up correctly, so let me know if I'm missing something.

for {
o := s.host.bwc.GetBandwidthTotals()

telemetry.GetInstance().SendNetworkData(&telemetry.NetworkData{
Peers: s.host.peerCount(),
RateIn: o.RateIn,
RateOut: o.RateOut,
})
time.Sleep(time.Second * 5)
Copy link
Contributor

@timwu20 timwu20 Apr 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make the duration configurable on Service? You'll be able to modify this in the tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added telemetryInterval time.Duration to the service config so that it can be modified as needed now.

}
}
func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// give new peers a slight weight
// TODO: do this once handshake is received
Expand Down Expand Up @@ -521,6 +535,9 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
_ = stream.Close()
return
}

// todo (ed) determine if there are other places to capture data received
s.host.bwc.LogRecvMessage(int64(tot))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All messages received by the network go through this function and nowhere else, so having this just here is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent, thanks.

}
}

Expand Down
24 changes: 24 additions & 0 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/common/optional"
Expand Down Expand Up @@ -170,6 +171,7 @@ func (q *syncQueue) start() {

go q.benchmark()
go q.prunePeers()
go q.sentBlockIntervalTelemetry()
}

func (q *syncQueue) syncAtHead() {
Expand Down Expand Up @@ -356,6 +358,28 @@ func (q *syncQueue) benchmark() {
}
}

func (q *syncQueue) sentBlockIntervalTelemetry() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function could be just part of the Service, since it doesn't use any syncQueue functionality

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I've moved this.

for {
best, err := q.s.blockState.BestBlockHeader()
if err != nil {
continue
}
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0) //nolint
if err != nil {
continue
}

telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{
BestHash: best.Hash(),
BestHeight: best.Number,
FinalizedHash: finalized.Hash(),
FinalizedHeight: finalized.Number,
TXCount: 0, // todo (ed) determine where to get tx count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the count of txs in a block, or in the queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what they want reported.

UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size
})
time.Sleep(time.Second * 5)
}
}
func (q *syncQueue) stringifyResponseQueue() string {
if len(q.responses) == 0 {
return "[empty]"
Expand Down
45 changes: 42 additions & 3 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
Expand All @@ -33,6 +34,7 @@ type Handler struct {
buf bytes.Buffer
wsConn []*websocket.Conn
telemetryLogger *log.Entry
sync.RWMutex
}

// MyJSONFormatter struct for defining JSON Formatter
Expand Down Expand Up @@ -100,18 +102,55 @@ func (h *Handler) SendConnection(data *ConnectionData) {
"version": data.SystemVersion}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
h.sendTelemetry()
}

// SendBlockImport sends block imported message to telemetry connection
func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {
payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"}
h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
h.telemetryLogger.Print()
h.sendTelemtry()
h.sendTelemetry()
}

func (h *Handler) sendTelemtry() {
// NetworkData struct to hold network data telemetry information
type NetworkData struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Struct and all its fields are exported. Is it required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I've updated this.

Peers int
RateIn float64
RateOut float64
}

// SendNetworkData sends the network-related system.interval message to the
// telemetry connection. The payload carries the download and upload bandwidth
// rates and the peer count taken from data; it is wrapped in an entry with
// "id", "payload" and "ts" fields, printed through the telemetry logger and
// then handed to sendTelemetry.
func (h *Handler) SendNetworkData(data *NetworkData) {
	networkInfo := log.Fields{
		"bandwidth_download": data.RateIn,
		"bandwidth_upload":   data.RateOut,
		"msg":                "system.interval",
		"peers":              data.Peers,
	}
	entryFields := log.Fields{
		"id":      1,
		"payload": networkInfo,
		"ts":      time.Now(),
	}

	h.telemetryLogger = log.WithFields(entryFields)
	h.telemetryLogger.Print()
	h.sendTelemetry()
}

// BlockIntervalData holds the values reported in the block-related
// system.interval telemetry message.
type BlockIntervalData struct {
// BestHash is reported under the "best" key, rendered with String().
BestHash common.Hash
// BestHeight is reported under the "height" key.
BestHeight *big.Int
// FinalizedHash is reported under the "finalized_hash" key, rendered with String().
FinalizedHash common.Hash
// FinalizedHeight is reported under the "finalized_height" key.
FinalizedHeight *big.Int
// TXCount is reported under the "txcount" key.
TXCount int
// UsedStateCacheSize is reported under the "used_state_cache_size" key.
UsedStateCacheSize int
}

// SendBlockIntervalData sends the block-related system.interval message to the
// telemetry connection. The payload carries the best and finalized block
// hashes (as strings) and heights, the transaction count and the used state
// cache size taken from data; it is wrapped in an entry with "id", "payload"
// and "ts" fields, printed through the telemetry logger and then handed to
// sendTelemetry.
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) {
	blockInfo := log.Fields{
		"best":                  data.BestHash.String(),
		"finalized_hash":        data.FinalizedHash.String(),
		"finalized_height":      data.FinalizedHeight,
		"height":                data.BestHeight,
		"msg":                   "system.interval",
		"txcount":               data.TXCount,
		"used_state_cache_size": data.UsedStateCacheSize,
	}
	entryFields := log.Fields{
		"id":      1,
		"payload": blockInfo,
		"ts":      time.Now(),
	}

	h.telemetryLogger = log.WithFields(entryFields)
	h.telemetryLogger.Print()
	h.sendTelemetry()
}

func (h *Handler) sendTelemetry() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add lock in AddConnections also.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the lock should be acquired in the functions calling sendTelemetry because Handler.buf is written before calling sendTelemetry but it is reset inside sendTelemetry

// SendBlockIntervalData sends the block-related system.interval message to the
// telemetry connection while holding the Handler's write lock for the whole
// call, so building the log entry, printing it and sending it are not
// interleaved with other callers that take the same lock.
//
// NOTE(review): the sendTelemetry shown in this diff also calls h.Lock(), and
// sync.RWMutex is not reentrant, so the lock must be taken in exactly one of
// the two places — confirm before adopting this variant.
func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) {
	// hold the lock until the message has been handed to sendTelemetry
	h.Lock()
	defer h.Unlock()
	payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(),
		"finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount,
		"used_state_cache_size": data.UsedStateCacheSize}
	h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()})
	h.telemetryLogger.Print()
	h.sendTelemetry()
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this to add locks and created sender which reads from buffer and sends messages.

h.Lock()
defer h.Unlock()
for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -61,6 +62,32 @@ func TestHandler_SendBlockImport(t *testing.T) {
require.Equal(t, expected, lastMessage[:101])
}

func TestHandler_SendNetworkData(t *testing.T) {
expected := []byte(`{"id":1,"payload":{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1},"ts":`)
GetInstance().SendNetworkData(&NetworkData{
Peers: 1,
RateIn: 2,
RateOut: 3,
})
time.Sleep(time.Millisecond)
// note, we only check the first 103 bytes because the remaining bytes are the timestamp, which we can't estimate
require.Equal(t, expected, lastMessage[:103])
}

func TestHandler_SendBlockIntervalData(t *testing.T) {
expected := []byte(`{"id":1,"payload":{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","txcount":2,"used_state_cache_size":1886357},"ts":`)
GetInstance().SendBlockIntervalData(&BlockIntervalData{
BestHash: common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"),
BestHeight: big.NewInt(32375),
FinalizedHash: common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"),
FinalizedHeight: big.NewInt(32256),
TXCount: 2,
UsedStateCacheSize: 1886357,
})
time.Sleep(time.Millisecond)
require.Equal(t, expected, lastMessage[:295])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use require.Contains in all test cases.

require.Contains(t, lastMessage, expected)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

}

func listen(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
Expand Down