Skip to content

Commit

Permalink
EVM-680 Dial queue issues fixes (#1593)
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar committed Jul 7, 2023
1 parent ac9b449 commit 4a083ca
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 106 deletions.
59 changes: 35 additions & 24 deletions network/dial/dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

const updateChBufferSize = 20

// DialQueue is a queue that holds dials tasks for potential peers, implemented as a min-heap
type DialQueue struct {
sync.Mutex
Expand All @@ -25,7 +27,7 @@ func NewDialQueue() *DialQueue {
return &DialQueue{
heap: dialQueueImpl{},
tasks: map[peer.ID]*DialTask{},
updateCh: make(chan struct{}),
updateCh: make(chan struct{}, updateChBufferSize),
closeCh: make(chan struct{}),
}
}
Expand All @@ -38,14 +40,12 @@ func (d *DialQueue) Close() {
// PopTask is a loop that handles update and close events [BLOCKING]
func (d *DialQueue) PopTask() *DialTask {
for {
task := d.popTaskImpl() // Blocking pop
if task != nil {
return task
}

select {
case <-d.updateCh:
case <-d.closeCh:
case <-d.updateCh: // blocks until AddTask is called...
if task := d.popTaskImpl(); task != nil {
return task
}
case <-d.closeCh: // ... or dial queue is closed
return nil
}
}
Expand All @@ -58,13 +58,13 @@ func (d *DialQueue) popTaskImpl() *DialTask {

if len(d.heap) != 0 {
// pop the first value and remove it from the heap
tt := heap.Pop(&d.heap)

task, ok := tt.(*DialTask)
task, ok := heap.Pop(&d.heap).(*DialTask)
if !ok {
return nil
}

delete(d.tasks, task.addrInfo.ID)

return task
}

Expand All @@ -78,32 +78,43 @@ func (d *DialQueue) DeleteTask(peer peer.ID) {

item, ok := d.tasks[peer]
if ok {
// negative index for popped element
if item.index >= 0 {
heap.Remove(&d.heap, item.index)
}

heap.Remove(&d.heap, item.index)
delete(d.tasks, peer)
}
}

// AddTask adds a new task to the dial queue
func (d *DialQueue) AddTask(
addrInfo *peer.AddrInfo,
priority common.DialPriority,
) {
func (d *DialQueue) AddTask(addrInfo *peer.AddrInfo, priority common.DialPriority) {
if d.addTaskImpl(addrInfo, priority) {
select {
case <-d.closeCh:
case d.updateCh <- struct{}{}:
}
}
}

func (d *DialQueue) addTaskImpl(addrInfo *peer.AddrInfo, priority common.DialPriority) bool {
d.Lock()
defer d.Unlock()

// do not spam queue with same addresses
if item, ok := d.tasks[addrInfo.ID]; ok {
// if existing priority greater than new one, replace item
if item.priority > uint64(priority) {
item.addrInfo = addrInfo
item.priority = uint64(priority)
heap.Fix(&d.heap, item.index)
}

return false
}

task := &DialTask{
addrInfo: addrInfo,
priority: uint64(priority),
}
d.tasks[addrInfo.ID] = task
heap.Push(&d.heap, task)

select {
case d.updateCh <- struct{}{}:
default:
}
return true
}
34 changes: 22 additions & 12 deletions network/dial/dial_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,31 @@ import (

func TestDialQueue(t *testing.T) {
q := NewDialQueue()
infos := [3]*peer.AddrInfo{}

info0 := &peer.AddrInfo{
ID: peer.ID("a"),
}
q.AddTask(info0, 1)
assert.Equal(t, 1, q.heap.Len())
for i, x := range []string{"a", "b", "c"} {
infos[i] = &peer.AddrInfo{
ID: peer.ID(x),
}

if i != 1 {
q.AddTask(infos[i], 8)
} else {
q.AddTask(infos[i], 1)
}

info1 := &peer.AddrInfo{
ID: peer.ID("b"),
assert.Equal(t, i+1, q.heap.Len())
}
q.AddTask(info1, 1)
assert.Equal(t, 2, q.heap.Len())

assert.Equal(t, q.popTaskImpl().addrInfo.ID, peer.ID("a"))
assert.Equal(t, q.popTaskImpl().addrInfo.ID, peer.ID("b"))
q.AddTask(infos[0], 8) // existing task, same priority
assert.Equal(t, 3, q.heap.Len())

q.AddTask(infos[2], 1) // existing task, more priority
assert.Equal(t, 3, q.heap.Len())

assert.Equal(t, peer.ID("b"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("c"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, peer.ID("a"), q.popTaskImpl().addrInfo.ID)
assert.Equal(t, 0, q.heap.Len())

assert.Nil(t, q.popTaskImpl())
Expand All @@ -43,7 +53,7 @@ func TestDialQueue(t *testing.T) {
case <-time.After(1 * time.Second):
}

q.AddTask(info0, 1)
q.AddTask(infos[0], 1)

select {
case <-done:
Expand Down
80 changes: 18 additions & 62 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,13 @@ func (s *Server) runDial() {
defer close(notifyCh)
defer cancel()

if err := s.SubscribeFn(ctx, func(event *peerEvent.PeerEvent) {
if err := s.Subscribe(ctx, func(event *peerEvent.PeerEvent) {
// Only concerned about the listed event types
// PeerConnected, PeerDialCompleted will not change HasFreeOutboundConn from false to true
switch event.Type {
case
peerEvent.PeerConnected,
peerEvent.PeerFailedToConnect,
peerEvent.PeerDisconnected,
peerEvent.PeerDialCompleted,
peerEvent.PeerAddedToDialQueue:
default:
return
Expand Down Expand Up @@ -411,13 +410,13 @@ func (s *Server) runDial() {

peerInfo := tt.GetAddrInfo()

s.logger.Debug(fmt.Sprintf("Dialing peer [%s] as local [%s]", peerInfo.String(), s.host.ID()))
s.logger.Debug("Dialing peer", "addr", peerInfo, "local", s.host.ID())

if !s.IsConnected(peerInfo.ID) {
// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
if err := s.host.Connect(context.Background(), *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err.Error())
if err := s.host.Connect(ctx, *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
}
Expand Down Expand Up @@ -485,7 +484,7 @@ func (s *Server) GetProtocols(peerID peer.ID) ([]string, error) {
// and updates relevant counters and metrics. It is called from the
// disconnection callback of the libp2p network bundle (when the connection is closed)
func (s *Server) removePeer(peerID peer.ID) {
s.logger.Info("Peer disconnected", "id", peerID.String())
s.logger.Info("Peer disconnected", "id", peerID)

// Remove the peer from the peers map
connectionInfo := s.removePeerInfo(peerID)
Expand Down Expand Up @@ -549,10 +548,10 @@ func (s *Server) updateBootnodeConnCount(peerID peer.ID, delta int64) {
// DisconnectFromPeer disconnects the networking server from the specified peer
func (s *Server) DisconnectFromPeer(peer peer.ID, reason string) {
if s.host.Network().Connectedness(peer) == network.Connected {
s.logger.Info(fmt.Sprintf("Closing connection to peer [%s] for reason [%s]", peer.String(), reason))
s.logger.Info("Closing connection", "id", peer, "reason", reason)

if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil {
s.logger.Error(fmt.Sprintf("Unable to gracefully close peer connection, %v", closeErr))
if err := s.host.Network().ClosePeer(peer); err != nil {
s.logger.Error("Unable to gracefully close connection", "id", peer, "err", err)
}
}
}
Expand Down Expand Up @@ -585,7 +584,7 @@ func (s *Server) JoinPeer(rawPeerMultiaddr string) error {

// joinPeer creates a new dial task for the peer (for async joining)
func (s *Server) joinPeer(peerInfo *peer.AddrInfo) {
s.logger.Info("Join request", "addr", peerInfo.String())
s.logger.Info("Join request", "addr", peerInfo)

// This method can be completely refactored to support some kind of active
// feedback information on the dial status, and not just asynchronous updates.
Expand Down Expand Up @@ -674,54 +673,9 @@ func (s *Server) emitEvent(peerID peer.ID, peerEventType peerEvent.PeerEventType
}
}

type Subscription struct {
sub event.Subscription
ch chan *peerEvent.PeerEvent
}

func (s *Subscription) run() {
// convert interface{} to *PeerEvent channels
for {
evnt := <-s.sub.Out()
if obj, ok := evnt.(peerEvent.PeerEvent); ok {
s.ch <- &obj
}
}
}

func (s *Subscription) GetCh() chan *peerEvent.PeerEvent {
return s.ch
}

func (s *Subscription) Get() *peerEvent.PeerEvent {
obj := <-s.ch

return obj
}

func (s *Subscription) Close() {
s.sub.Close()
}

// Subscribe starts a PeerEvent subscription
func (s *Server) Subscribe() (*Subscription, error) {
raw, err := s.host.EventBus().Subscribe(new(peerEvent.PeerEvent))
if err != nil {
return nil, err
}

sub := &Subscription{
sub: raw,
ch: make(chan *peerEvent.PeerEvent),
}
go sub.run()

return sub, nil
}

// SubscribeFn is a helper method to run subscription of PeerEvents
func (s *Server) SubscribeFn(ctx context.Context, handler func(evnt *peerEvent.PeerEvent)) error {
sub, err := s.Subscribe()
// Subscribe is a helper method to run subscription of PeerEvents
func (s *Server) Subscribe(ctx context.Context, handler func(evnt *peerEvent.PeerEvent)) error {
sub, err := s.host.EventBus().Subscribe(new(peerEvent.PeerEvent))
if err != nil {
return err
}
Expand All @@ -737,8 +691,10 @@ func (s *Server) SubscribeFn(ctx context.Context, handler func(evnt *peerEvent.P
case <-s.closeCh:
return

case evnt := <-sub.GetCh():
handler(evnt)
case evnt := <-sub.Out():
if obj, ok := evnt.(peerEvent.PeerEvent); ok {
handler(&obj)
}
}
}
}()
Expand All @@ -751,7 +707,7 @@ func (s *Server) SubscribeCh(ctx context.Context) (<-chan *peerEvent.PeerEvent,
ch := make(chan *peerEvent.PeerEvent)
ctx, cancel := context.WithCancel(ctx)

err := s.SubscribeFn(ctx, func(evnt *peerEvent.PeerEvent) {
err := s.Subscribe(ctx, func(evnt *peerEvent.PeerEvent) {
select {
case <-ctx.Done():
return
Expand Down
6 changes: 3 additions & 3 deletions network/server_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *Server) setupDiscovery() error {

// Set the PeerAdded event handler
routingTable.PeerAdded = func(p peer.ID) {
// this is called from the lock. because of that execute addToDialQueue on separated routine
// spawn routine because PeerAdded is called from event handler and s.addToDialQueue emits event again
go func() {
info := s.host.Peerstore().PeerInfo(p)
s.addToDialQueue(&info, common.PriorityRandomDial)
Expand All @@ -226,8 +226,8 @@ func (s *Server) setupDiscovery() error {
)

// Register a network event handler
if subscribeErr := s.SubscribeFn(context.Background(), discoveryService.HandleNetworkEvent); subscribeErr != nil {
return fmt.Errorf("unable to subscribe to network events, %w", subscribeErr)
if err := s.Subscribe(context.Background(), discoveryService.HandleNetworkEvent); err != nil {
return fmt.Errorf("unable to subscribe to network events, %w", err)
}

// Register the actual discovery service as a valid protocol
Expand Down
14 changes: 9 additions & 5 deletions network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ func TestPeerEvent_EmitAndSubscribe(t *testing.T) {
assert.NoError(t, server.Close())
})

sub, err := server.Subscribe()
receiver := make(chan *peerEvent.PeerEvent)

err := server.Subscribe(context.Background(), func(evnt *peerEvent.PeerEvent) {
receiver <- evnt
})
assert.NoError(t, err)

count := 10
Expand All @@ -170,7 +174,7 @@ func TestPeerEvent_EmitAndSubscribe(t *testing.T) {
id, event := getIDAndEventType(i)
server.emitEvent(id, event)

received := sub.Get()
received := <-receiver
assert.Equal(t, &peerEvent.PeerEvent{
PeerID: id,
Type: event,
Expand All @@ -184,7 +188,7 @@ func TestPeerEvent_EmitAndSubscribe(t *testing.T) {
server.emitEvent(id, event)
}
for i := 0; i < count; i++ {
received := sub.Get()
received := <-receiver
id, event := getIDAndEventType(i)
assert.Equal(t, &peerEvent.PeerEvent{
PeerID: id,
Expand Down Expand Up @@ -692,7 +696,7 @@ func TestRunDial(t *testing.T) {
})
}

func TestSubscribeFn(t *testing.T) {
func TestSubscribe(t *testing.T) {
t.Parallel()

setupServer := func(t *testing.T, shouldCloseAfterTest bool) *Server {
Expand Down Expand Up @@ -730,7 +734,7 @@ func TestSubscribeFn(t *testing.T) {
close(eventCh)
})

err := server.SubscribeFn(ctx, func(e *peerEvent.PeerEvent) {
err := server.Subscribe(ctx, func(e *peerEvent.PeerEvent) {
eventCh <- e
})

Expand Down

0 comments on commit 4a083ca

Please sign in to comment.