Skip to content

Commit

Permalink
EVM-543 Enable concurrent run of multiple dial tasks (#1707)
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar committed Jul 12, 2023
1 parent f4efe97 commit f17b6ee
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 45 deletions.
34 changes: 17 additions & 17 deletions network/dial/dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package dial

import (
"container/heap"
"context"
"sync"

"github.com/0xPolygon/polygon-edge/network/common"

"github.com/libp2p/go-libp2p/core/peer"
)

const updateChBufferSize = 20

// DialQueue is a queue that holds dials tasks for potential peers, implemented as a min-heap
type DialQueue struct {
sync.Mutex
Expand All @@ -27,7 +26,7 @@ func NewDialQueue() *DialQueue {
return &DialQueue{
heap: dialQueueImpl{},
tasks: map[peer.ID]*DialTask{},
updateCh: make(chan struct{}, updateChBufferSize),
updateCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
}
}
Expand All @@ -37,22 +36,21 @@ func (d *DialQueue) Close() {
close(d.closeCh)
}

// PopTask is a loop that handles update and close events [BLOCKING]
func (d *DialQueue) PopTask() *DialTask {
for {
select {
case <-d.updateCh: // blocks until AddTask is called...
if task := d.popTaskImpl(); task != nil {
return task
}
case <-d.closeCh: // ... or dial queue is closed
return nil
}
// Wait waits for closing or updating event or end of context.
// Returns true for closing event or end of the context [BLOCKING].
func (d *DialQueue) Wait(ctx context.Context) bool {
select {
case <-ctx.Done(): // blocks until context is done ...
return true
case <-d.updateCh: // ... or AddTask is called ...
return false
case <-d.closeCh: // ... or dial queue is closed
return true
}
}

// popTaskImpl is the implementation for task popping from the min-heap
func (d *DialQueue) popTaskImpl() *DialTask {
// PopTask is the implementation for task popping from the min-heap
func (d *DialQueue) PopTask() *DialTask {
d.Lock()
defer d.Unlock()

Expand Down Expand Up @@ -87,8 +85,8 @@ func (d *DialQueue) DeleteTask(peer peer.ID) {
func (d *DialQueue) AddTask(addrInfo *peer.AddrInfo, priority common.DialPriority) {
if d.addTaskImpl(addrInfo, priority) {
select {
case <-d.closeCh:
case d.updateCh <- struct{}{}:
default:
}
}
}
Expand All @@ -104,6 +102,8 @@ func (d *DialQueue) addTaskImpl(addrInfo *peer.AddrInfo, priority common.DialPri
item.addrInfo = addrInfo
item.priority = uint64(priority)
heap.Fix(&d.heap, item.index)

return true
}

return false
Expand Down
15 changes: 10 additions & 5 deletions network/dial/dial_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dial

import (
"context"
"testing"
"time"

Expand All @@ -11,6 +12,9 @@ import (
func TestDialQueue(t *testing.T) {
q := NewDialQueue()
infos := [3]*peer.AddrInfo{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

defer cancel()

for i, x := range []string{"a", "b", "c"} {
infos[i] = &peer.AddrInfo{
Expand All @@ -32,17 +36,18 @@ func TestDialQueue(t *testing.T) {
q.AddTask(infos[2], 1) // existing task, more priority
assert.Equal(t, 3, q.heap.Len())

assert.Equal(t, peer.ID("b"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("c"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("a"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("b"), q.PopTask().addrInfo.ID)
assert.Equal(t, peer.ID("c"), q.PopTask().addrInfo.ID)
assert.Equal(t, peer.ID("a"), q.PopTask().addrInfo.ID)
assert.Equal(t, 0, q.heap.Len())

assert.Nil(t, q.popTaskImpl())
assert.Nil(t, q.PopTask())

done := make(chan struct{})

go func() {
q.PopTask()
q.Wait(ctx) // wait for first update
q.Wait(ctx) // wait for second update (line 61)
done <- struct{}{}
}()

Expand Down
49 changes: 26 additions & 23 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (s *Server) runDial() {
peerEvent.PeerFailedToConnect,
peerEvent.PeerDisconnected:
slots.Release()
s.logger.Debug("slot released", "event", event.Type)
s.logger.Debug("slot released", "event", event.Type, "peerID", event.PeerID)
}
}); err != nil {
s.logger.Error(
Expand All @@ -381,37 +381,40 @@ func (s *Server) runDial() {
}

for {
//nolint:godox
// TODO: Right now the dial task are done sequentially because Connect
// is a blocking request. In the future we should try to make up to
// maxDials requests concurrently (to be fixed in EVM-543)
tt := s.dialQueue.PopTask()
if tt == nil {
// The dial queue is closed,
// no further dial tasks are incoming
if closed := s.dialQueue.Wait(ctx); closed {
// The dial queue is closed, no further dial tasks are incoming
return
}

peerInfo := tt.GetAddrInfo()
for {
tt := s.dialQueue.PopTask()
if tt == nil {
break
}

if s.IsConnected(peerInfo.ID) {
continue
}
peerInfo := tt.GetAddrInfo()

s.logger.Debug("Waiting for a dialing slot", "addr", peerInfo, "local", s.host.ID())
if s.IsConnected(peerInfo.ID) {
continue
}

if closed := slots.Take(ctx); closed {
return
}
s.logger.Debug("Waiting for a dialing slot", "addr", peerInfo, "local", s.host.ID())

s.logger.Debug("Dialing peer", "addr", peerInfo, "local", s.host.ID())
if closed := slots.Take(ctx); closed {
return
}

// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
if err := s.host.Connect(ctx, *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())
// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
go func() {
s.logger.Debug("Dialing peer", "addr", peerInfo, "local", s.host.ID())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
if err := s.host.Connect(ctx, *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
}
}()
}
}
}
Expand Down

0 comments on commit f17b6ee

Please sign in to comment.