From ac7b1a366ac57b74273fe2210c574a025a1f8c49 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 21 Jun 2021 11:04:51 -0400 Subject: [PATCH 1/6] replace time.After with time.NewTimer --- dot/network/sync_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dot/network/sync_test.go b/dot/network/sync_test.go index 4529db0e7e..da3f70c2b5 100644 --- a/dot/network/sync_test.go +++ b/dot/network/sync_test.go @@ -470,12 +470,15 @@ func TestSyncQueue_SyncAtHead(t *testing.T) { go q.syncAtHead() time.Sleep(q.slotDuration * 3) + timeout := time.NewTimer(TestMessageTimeout) select { case req := <-q.requestCh: require.Equal(t, uint64(2), req.req.StartingBlock.Uint64()) - case <-time.After(TestMessageTimeout): + case <-timeout.C: t.Fatal("did not queue request") + timeout.Reset(TestMessageTimeout) // todo (ed): do we need to reset timer here since we're not using it after timeout? } + timeout.Stop() } func TestSyncQueue_PushRequest_NearHead(t *testing.T) { From 5ae25a57f3d85bfc996e495041d559be8f00a901 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 21 Jun 2021 16:54:06 -0400 Subject: [PATCH 2/6] replace time.Affer with time.NewTicker --- dot/network/discovery.go | 12 +++++++++--- dot/network/sync.go | 12 +++++++++--- dot/network/sync_test.go | 5 +---- dot/telemetry/telemetry.go | 4 +++- lib/babe/babe.go | 4 +++- lib/grandpa/network.go | 4 +++- 6 files changed, 28 insertions(+), 13 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index d4773c9c5e..e3fb828c61 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -69,13 +69,15 @@ func (d *discovery) start() error { // get all currently connected peers and use them to bootstrap the DHT peers := d.h.Network().Peers() + t := time.NewTicker(startDHTTimeout) + defer t.Stop() for { if len(peers) > 0 { break } select { - case <-time.After(startDHTTimeout): + case <-t.C: logger.Debug("no peers yet, waiting to start DHT...") // wait for peers to connect before starting DHT, otherwise DHT bootstrap nodes // will be empty and we will fail to fill the routing table @@ -138,9 +140,11 @@ func (d *discovery) discoverAndAdvertise() error { func (d *discovery) advertise() { ttl := initialAdvertisementTimeout + t := time.NewTicker(ttl) + defer t.Stop() for { select { - case <-time.After(ttl): + case <-t.C: logger.Debug("advertising ourselves in the DHT...") err := d.dht.Bootstrap(d.ctx) if err != nil { @@ -160,11 +164,13 @@ func (d *discovery) advertise() { } func (d *discovery) checkPeerCount() { + t := time.NewTicker(connectToPeersTimeout) + defer t.Stop() for { select { case <-d.ctx.Done(): return - case <-time.After(connectToPeersTimeout): + case <-t.C: if len(d.h.Network().Peers()) > d.minPeers { continue } diff --git a/dot/network/sync.go b/dot/network/sync.go index 22e1993ab9..3f2e044ffa 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -176,10 +176,12 @@ func (q *syncQueue) syncAtHead() { q.s.syncer.SetSyncing(true) q.s.noGossip = true // don't gossip messages until we're at the head + t := time.NewTicker(q.slotDuration * 2) + defer t.Stop() for { select { // sleep for average block time TODO: make this configurable from slot duration - case <-time.After(q.slotDuration * 2): + case <-t.C: case <-q.ctx.Done(): return } @@ -214,9 +216,11 @@ func (q *syncQueue) syncAtHead() { } func (q *syncQueue) handleResponseQueue() { + t := time.NewTicker(time.Second) + defer t.Stop() for { select { - case <-time.After(time.Second): + case <-t.C: case <-q.ctx.Done(): return } @@ -260,9 +264,11 @@ func (q *syncQueue) handleResponseQueue() { // prune peers with low score and connect to new peers func (q *syncQueue) prunePeers() { + t := time.NewTicker(time.Second * 30) + defer t.Stop() for { select { - case <-time.After(time.Second * 30): + case <-t.C: case <-q.ctx.Done(): return } diff --git a/dot/network/sync_test.go b/dot/network/sync_test.go index da3f70c2b5..4529db0e7e 100644 --- a/dot/network/sync_test.go +++ b/dot/network/sync_test.go @@ -470,15 +470,12 @@ func TestSyncQueue_SyncAtHead(t *testing.T) { go q.syncAtHead() time.Sleep(q.slotDuration * 3) - timeout := time.NewTimer(TestMessageTimeout) select { case req := <-q.requestCh: require.Equal(t, uint64(2), req.req.StartingBlock.Uint64()) - case <-timeout.C: + case <-time.After(TestMessageTimeout): t.Fatal("did not queue request") - timeout.Reset(TestMessageTimeout) // todo (ed): do we need to reset timer here since we're not using it after timeout? } - timeout.Stop() } func TestSyncQueue_PushRequest_NearHead(t *testing.T) { diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index cd11d65207..8e956e9435 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -109,10 +109,12 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { // SendMessage sends Message to connected telemetry listeners func (h *Handler) SendMessage(msg *Message) error { + t := time.NewTicker(time.Second * 1) + defer t.Stop() select { case h.msg <- *msg: - case <-time.After(time.Second * 1): + case <-t.C: return errors.New("timeout sending message") } return nil diff --git a/lib/babe/babe.go b/lib/babe/babe.go index 43a798f135..04246309ca 100644 --- a/lib/babe/babe.go +++ b/lib/babe/babe.go @@ -379,8 +379,10 @@ func (b *Service) invokeBlockAuthoring(epoch uint64) error { // check if it's time to start the epoch yet. if not, wait until it is if time.Since(epochStartTime) < 0 { logger.Debug("waiting for epoch to start") + t := time.NewTicker(time.Until(epochStartTime)) + defer t.Stop() select { - case <-time.After(time.Until(epochStartTime)): + case <-t.C: case <-b.ctx.Done(): return nil case <-b.pause: diff --git a/lib/grandpa/network.go b/lib/grandpa/network.go index a5c6c90556..e9f8d441e4 100644 --- a/lib/grandpa/network.go +++ b/lib/grandpa/network.go @@ -177,11 +177,13 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) ( } func (s *Service) sendNeighbourMessage() { + t := time.NewTicker(neighbourMessageInterval) + defer t.Stop() for { select { case <-s.ctx.Done(): return - case <-time.After(neighbourMessageInterval): + case <-t.C: if s.neighbourMessage == nil { continue } From d7963c651a035275772ff9ae1cef24d43bb36c5a Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 21 Jun 2021 17:02:01 -0400 Subject: [PATCH 3/6] lint --- dot/network/discovery.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index e3fb828c61..a130217fc7 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -140,6 +140,8 @@ func (d *discovery) discoverAndAdvertise() error { func (d *discovery) advertise() { ttl := initialAdvertisementTimeout + // todo (ed): should this be inside the for loop so that ttl is updated as needed? + // or keep as time.After t := time.NewTicker(ttl) defer t.Stop() for { @@ -152,10 +154,10 @@ func (d *discovery) advertise() { continue } - ttl, err = d.rd.Advertise(d.ctx, string(d.pid)) + ttl, err = d.rd.Advertise(d.ctx, string(d.pid)) // nolint if err != nil { logger.Debug("failed to advertise in the DHT", "error", err) - ttl = tryAdvertiseTimeout + ttl = tryAdvertiseTimeout // nolint } case <-d.ctx.Done(): return From b4c8fc56f9de18611bf7b78aa40aa39f0ea9dd83 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Mon, 21 Jun 2021 17:11:20 -0400 Subject: [PATCH 4/6] replace time.After is discovery so ttl var is used --- dot/network/discovery.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/dot/network/discovery.go b/dot/network/discovery.go index a130217fc7..498d461ca2 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -140,13 +140,9 @@ func (d *discovery) discoverAndAdvertise() error { func (d *discovery) advertise() { ttl := initialAdvertisementTimeout - // todo (ed): should this be inside the for loop so that ttl is updated as needed? - // or keep as time.After - t := time.NewTicker(ttl) - defer t.Stop() for { select { - case <-t.C: + case <-time.After(ttl): logger.Debug("advertising ourselves in the DHT...") err := d.dht.Bootstrap(d.ctx) if err != nil { @@ -154,10 +150,10 @@ func (d *discovery) advertise() { continue } - ttl, err = d.rd.Advertise(d.ctx, string(d.pid)) // nolint + ttl, err = d.rd.Advertise(d.ctx, string(d.pid)) if err != nil { logger.Debug("failed to advertise in the DHT", "error", err) - ttl = tryAdvertiseTimeout // nolint + ttl = tryAdvertiseTimeout } case <-d.ctx.Done(): return From 64823d19e9638e8c2e2f748d2eece37e26524705 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 23 Jun 2021 10:05:38 -0400 Subject: [PATCH 5/6] replace time.After in if statement --- lib/babe/babe.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/babe/babe.go b/lib/babe/babe.go index 04246309ca..43a798f135 100644 --- a/lib/babe/babe.go +++ b/lib/babe/babe.go @@ -379,10 +379,8 @@ func (b *Service) invokeBlockAuthoring(epoch uint64) error { // check if it's time to start the epoch yet. if not, wait until it is if time.Since(epochStartTime) < 0 { logger.Debug("waiting for epoch to start") - t := time.NewTicker(time.Until(epochStartTime)) - defer t.Stop() select { - case <-t.C: + case <-time.After(time.Until(epochStartTime)): case <-b.ctx.Done(): return nil case <-b.pause: From db8618a428b3cbe073d03a9573923da3c141e5b3 Mon Sep 17 00:00:00 2001 From: Edward Mack Date: Wed, 23 Jun 2021 15:22:57 -0400 Subject: [PATCH 6/6] add configuration variables for time duration functions --- dot/network/sync.go | 40 ++++++++++++++++++++++---------------- dot/telemetry/telemetry.go | 16 +++++++++------ go.mod | 2 +- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/dot/network/sync.go b/dot/network/sync.go index 3f2e044ffa..b2ec909951 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -88,7 +88,9 @@ const ( badPeerThreshold int = -2 protectedPeerThreshold int = 7 - defaultSlotDuration = time.Second * 6 + defaultSlotDuration = time.Second * 6 + defaultHandleResponseQueueDuration = time.Second + defaultPrunePeersDuration = time.Second * 30 ) var ( @@ -132,26 +134,30 @@ type syncQueue struct { goal int64 // goal block number we are trying to sync to currStart, currEnd int64 // the start and end of the BlockResponse we are currently handling; 0 and 0 if we are not currently handling any - benchmarker *syncBenchmarker + benchmarker *syncBenchmarker + handleResponseQueueDuration time.Duration + prunePeersDuration time.Duration } func newSyncQueue(s *Service) *syncQueue { ctx, cancel := context.WithCancel(s.ctx) return &syncQueue{ - s: s, - slotDuration: defaultSlotDuration, - ctx: ctx, - cancel: cancel, - peerScore: new(sync.Map), - requestData: new(sync.Map), - requestDataByHash: new(sync.Map), - justificationRequestData: new(sync.Map), - requestCh: make(chan *syncRequest, blockRequestBufferSize), - responses: []*types.BlockData{}, - responseCh: make(chan []*types.BlockData, blockResponseBufferSize), - benchmarker: newSyncBenchmarker(), - buf: make([]byte, maxBlockResponseSize), + s: s, + slotDuration: defaultSlotDuration, + ctx: ctx, + cancel: cancel, + peerScore: new(sync.Map), + requestData: new(sync.Map), + requestDataByHash: new(sync.Map), + justificationRequestData: new(sync.Map), + requestCh: make(chan *syncRequest, blockRequestBufferSize), + responses: []*types.BlockData{}, + responseCh: make(chan []*types.BlockData, blockResponseBufferSize), + benchmarker: newSyncBenchmarker(), + buf: make([]byte, maxBlockResponseSize), + handleResponseQueueDuration: defaultHandleResponseQueueDuration, + prunePeersDuration: defaultPrunePeersDuration, } } @@ -216,7 +222,7 @@ func (q *syncQueue) syncAtHead() { } func (q *syncQueue) handleResponseQueue() { - t := time.NewTicker(time.Second) + t := time.NewTicker(q.handleResponseQueueDuration) defer t.Stop() for { select { @@ -264,7 +270,7 @@ func (q *syncQueue) handleResponseQueue() { // prune peers with low score and connect to new peers func (q *syncQueue) prunePeers() { - t := time.NewTicker(time.Second * 30) + t := time.NewTicker(q.prunePeersDuration) defer t.Stop() for { select { diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 8e956e9435..22ba7286ef 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -40,9 +40,10 @@ type Message struct { // Handler struct for holding telemetry related things type Handler struct { - msg chan Message - connections []*telemetryConnection - log log.Logger + msg chan Message + connections []*telemetryConnection + log log.Logger + sendMessageTimeout time.Duration } // KeyValue object to hold key value pairs used in telemetry messages @@ -56,14 +57,17 @@ var ( handlerInstance *Handler ) +const defaultMessageTimeout = time.Second + // GetInstance singleton pattern to for accessing TelemetryHandler func GetInstance() *Handler { //nolint if handlerInstance == nil { once.Do( func() { handlerInstance = &Handler{ - msg: make(chan Message, 256), - log: log.New("pkg", "telemetry"), + msg: make(chan Message, 256), + log: log.New("pkg", "telemetry"), + sendMessageTimeout: defaultMessageTimeout, } go handlerInstance.startListening() }) @@ -109,7 +113,7 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { // SendMessage sends Message to connected telemetry listeners func (h *Handler) SendMessage(msg *Message) error { - t := time.NewTicker(time.Second * 1) + t := time.NewTicker(h.sendMessageTimeout) defer t.Stop() select { case h.msg <- *msg: diff --git a/go.mod b/go.mod index 519258043d..1410d97cf1 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 // indirect github.com/golang/protobuf v1.4.3 github.com/golang/snappy v0.0.3-0.20201103224600-674baa8c7fc3 // indirect - github.com/google/go-cmp v0.5.6 // indirect + github.com/google/go-cmp v0.5.6 github.com/google/uuid v1.1.5 // indirect github.com/gorilla/mux v1.7.4 github.com/gorilla/rpc v1.2.0