Skip to content

Commit

Permalink
EVM-687 Dial queue slots (#1601)
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar committed Jul 7, 2023
1 parent 4a083ca commit 6e5cd75
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 43 deletions.
69 changes: 26 additions & 43 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,34 +354,19 @@ func (s *Server) keepAliveMinimumPeerConnections() {
// Essentially, the networking server monitors for any open connection slots
// and attempts to fill them as soon as they open up
func (s *Server) runDial() {
// The notification channel needs to be buffered to avoid
// having events go missing, as they're crucial to the functioning
// of the runDial mechanism
notifyCh := make(chan struct{}, 1)

slots := NewSlots(s.connectionCounts.maxOutboundConnectionCount)
ctx, cancel := context.WithCancel(context.Background())

// cancel context first
defer close(notifyCh)
defer cancel()

if err := s.Subscribe(ctx, func(event *peerEvent.PeerEvent) {
// Only concerned about the listed event types
// PeerConnected, PeerDialCompleted will not change HasFreeOutboundConn from false to true
// Return back slot on PeerFailedToConnect or PeerDisconnected
switch event.Type {
case
peerEvent.PeerFailedToConnect,
peerEvent.PeerDisconnected,
peerEvent.PeerAddedToDialQueue:
default:
return
}

select {
case <-ctx.Done():
return
case notifyCh <- struct{}{}:
default:
peerEvent.PeerDisconnected:
slots.Release()
s.logger.Debug("slot released", "event", event.Type)
}
}); err != nil {
s.logger.Error(
Expand All @@ -400,35 +385,33 @@ func (s *Server) runDial() {
// TODO: Right now the dial tasks are done sequentially because Connect
// is a blocking request. In the future we should try to make up to
// maxDials requests concurrently (to be fixed in EVM-543)
for s.connectionCounts.HasFreeOutboundConn() {
tt := s.dialQueue.PopTask()
if tt == nil {
// The dial queue is closed,
// no further dial tasks are incoming
return
}
tt := s.dialQueue.PopTask()
if tt == nil {
// The dial queue is closed,
// no further dial tasks are incoming
return
}

peerInfo := tt.GetAddrInfo()
peerInfo := tt.GetAddrInfo()

s.logger.Debug("Dialing peer", "addr", peerInfo, "local", s.host.ID())
if s.IsConnected(peerInfo.ID) {
continue
}

if !s.IsConnected(peerInfo.ID) {
// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
if err := s.host.Connect(ctx, *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())
s.logger.Debug("Waiting for a dialing slot", "addr", peerInfo, "local", s.host.ID())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
}
}
if closed := slots.Take(ctx); closed {
return
}

// wait until there is a change in the state of a peer that
// might involve a new dial slot available
select {
case <-notifyCh:
case <-s.closeCh:
return
s.logger.Debug("Dialing peer", "addr", peerInfo, "local", s.host.ID())

// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
if err := s.host.Connect(ctx, *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
}
}
}
Expand Down
39 changes: 39 additions & 0 deletions network/slots.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package network

import (
"context"
)

// Slots is a synchronization structure that hands out a bounded number of slots.
// A routine can invoke the Take method, which will block until at least one slot becomes available.
// The Release method can be called by other routines to increase the number of available slots by one.
// Every element buffered in the channel represents one free slot.
type Slots chan struct{}

// NewSlots creates a Slots object with maximal slots, all of them initially available.
// A negative maximal is treated as zero, which yields a Slots without any buffered slot.
func NewSlots(maximal int64) Slots {
	// make panics on a negative channel size, so clamp instead of crashing the caller
	if maximal < 0 {
		maximal = 0
	}

	slots := make(Slots, maximal)
	// fill the buffer: each buffered element is one free slot
	for i := int64(0); i < maximal; i++ {
		slots <- struct{}{}
	}

	return slots
}

// Take takes a slot if one is available, otherwise it blocks until a slot is released
// or the context is done.
// It returns true if the context is done (no slot has been taken) and false if a slot has been taken.
func (s Slots) Take(ctx context.Context) bool {
	// select chooses randomly among ready cases, so an already cancelled context
	// must be checked first; otherwise a slot could be consumed after shutdown
	if ctx.Err() != nil {
		return true
	}

	select {
	case <-ctx.Done():
		return true
	case <-s:
		return false
	}
}

// Release returns back one slot. If all slots are already released, nothing will happen.
func (s Slots) Release() {
	select {
	case s <- struct{}{}:
	default: // every slot is already free, there is nothing to give back
	}
}
41 changes: 41 additions & 0 deletions network/slots_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package network

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestSlots checks that releasing already free slots is a no-op, that exactly the
// configured number of slots can be taken without blocking, and that Take blocks
// until another routine releases a slot.
func TestSlots(t *testing.T) {
	t.Parallel()

	const (
		slotCount    = 4
		releaseDelay = 500 * time.Millisecond
	)

	slots := NewSlots(slotCount)

	// every slot is already free, so these releases should do nothing
	for i := 0; i < slotCount; i++ {
		slots.Release()
	}

	// drain all slots; none of these calls may report a closed context
	for i := 0; i < slotCount; i++ {
		closed := slots.Take(context.Background())
		assert.False(t, closed)
	}

	// Take the reference time BEFORE starting the releasing routine, otherwise its
	// first sleep may begin earlier than the reference and the lower bounds below
	// could fail spuriously. time.Since uses the monotonic clock reading, so the
	// measurement is immune to wall clock adjustments.
	start := time.Now()

	go func() {
		time.Sleep(releaseDelay)
		slots.Release() // return one slot after 500 ms
		time.Sleep(releaseDelay)
		slots.Release() // return another slot after 1 second
	}()

	closed1 := slots.Take(context.Background())
	elapsed1 := time.Since(start)

	closed2 := slots.Take(context.Background())
	elapsed2 := time.Since(start)

	assert.False(t, closed1)
	assert.GreaterOrEqual(t, elapsed1.Milliseconds(), releaseDelay.Milliseconds())
	assert.False(t, closed2)
	assert.GreaterOrEqual(t, elapsed2.Milliseconds(), (2 * releaseDelay).Milliseconds())
}

0 comments on commit 6e5cd75

Please sign in to comment.