Skip to content

Commit

Permalink
Merge pull request #82 from gfanton/fix/local_disc_connect
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored Jun 23, 2023
2 parents c5b177b + 8067e91 commit 3de041b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 11 deletions.
29 changes: 19 additions & 10 deletions pkg/tinder/driver_localdiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,15 @@ func (ld *LocalDiscovery) handleConnection(ctx context.Context, p peer.ID) {
conns := ld.h.Network().ConnsToPeer(p)
for _, conn := range conns {
if manet.IsPrivateAddr(conn.RemoteMultiaddr()) || isProximityProtocol(conn.RemoteMultiaddr()) {
ld.logger.Info("found local peer", logutil.PrivateString("peer", conn.RemotePeer().String()))
go func() {
records := ld.getLocalReccord()
if err := ld.sendRecordsTo(ctx, p, records); err != nil {
ld.logger.Warn("unable to send local record",
logutil.PrivateString("peer", p.String()),
zap.Int("records", len(records.Records)),
zap.Error(err))
} else {
ld.logger.Info("send topics to local peer", logutil.PrivateString("peer", conn.RemotePeer().String()))
}
}()

Expand All @@ -420,8 +421,10 @@ func (ld *LocalDiscovery) handleConnection(ctx context.Context, p peer.ID) {
}

func (ld *LocalDiscovery) monitorConnection(ctx context.Context) error {
sub, err := ld.h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged),
eventbus.Name("weshnet/tinder/monitor-connection"))
sub, err := ld.h.EventBus().Subscribe([]interface{}{
new(event.EvtPeerConnectednessChanged),
new(event.EvtPeerProtocolsUpdated),
}, eventbus.Name("weshnet/tinder/monitor-connection"))
if err != nil {
return fmt.Errorf("unable to subscribe to `EvtPeerConnectednessChanged`: %w", err)
}
Expand All @@ -443,14 +446,20 @@ func (ld *LocalDiscovery) monitorConnection(ctx context.Context) error {
return
}

evt := e.(event.EvtPeerConnectednessChanged)

// send record to connected peer only
if evt.Connectedness != network.Connected {
continue
switch evt := e.(type) {
case event.EvtPeerConnectednessChanged:
// send record to connected peer only
if evt.Connectedness == network.Connected {
ld.handleConnection(ctx, evt.Peer)
}
case event.EvtPeerProtocolsUpdated:
for _, added := range evt.Added {
if added == recProtocolID {
ld.handleConnection(ctx, evt.Peer)
break
}
}
}

ld.handleConnection(ctx, evt.Peer)
}
}()

Expand Down
59 changes: 58 additions & 1 deletion pkg/tinder/driver_localdiscovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"berty.tech/weshnet/pkg/testutil"
)

func TestLocalDiscorvery(t *testing.T) {
func TestServiceLocalDiscorvery(t *testing.T) {
ctx := context.Background()
mn := mocknet.New()
defer mn.Close()
Expand Down Expand Up @@ -73,3 +73,60 @@ func TestLocalDiscorvery(t *testing.T) {
require.Equal(t, p1.Addrs(), p.Addrs)
}
}

func TestServiceLocalDiscorveryBeforeProtocolRegister(t *testing.T) {
const topic = "test_topic"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mn := mocknet.New()
defer mn.Close()

logger, cleanup := testutil.Logger(t)
defer cleanup()

p1 := genLocalPeer(t, mn)
p2 := genLocalPeer(t, mn)

err := mn.LinkAll()
require.NoError(t, err)

disc1, err := NewLocalDiscovery(logger, p1, rand.New(rand.NewSource(rand.Int63())))
require.NoError(t, err)

// create service for peer 1
s1, err := NewService(p1, logger, disc1)
require.NoError(t, err)

// start advertising
s1.StartAdvertises(ctx, topic)

// connect both peer BEFORE registering local discovery protocol for peer 2
err = mn.ConnectAllButSelf()
require.NoError(t, err)

// let some time to peers to connect and trigger protocol exchange
time.Sleep(time.Millisecond * 200)

// register p2 local discovery
disc2, err := NewLocalDiscovery(logger, p2, rand.New(rand.NewSource(rand.Int63())))
require.NoError(t, err)

s2, err := NewService(p2, logger, disc2)
require.NoError(t, err)

// start subscribe and wait for connection
sub := s2.Subscribe(topic)
defer sub.Close()

// pull to fetch current peers on the topic
sub.Pull()

select {
case p := <-sub.Out():
require.Equal(t, p.ID, p1.ID())
case <-time.After(time.Second * 5):
require.FailNow(t, "unable to wait for peer on local discovery")
}
}

0 comments on commit 3de041b

Please sign in to comment.