Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autonat service #814

Merged
merged 35 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
dc3db41
Basic version which asks peers recently connected about our nat status
diegomrsantos Nov 21, 2022
e6f947d
Add more test
diegomrsantos Nov 22, 2022
c3980ac
Add autonat service
diegomrsantos Nov 29, 2022
1ff4d6b
Refactor hp service
diegomrsantos Nov 29, 2022
a2d1d27
Remove sleepAsync
diegomrsantos Nov 30, 2022
af9f8b1
Services can be scheduled to run often
diegomrsantos Nov 30, 2022
e72397d
Removing scheduling from switch
diegomrsantos Dec 1, 2022
ad524ba
Fix hole punching test
diegomrsantos Dec 1, 2022
3775c85
Rename NetworkReachability values
diegomrsantos Dec 2, 2022
cedd0fc
Ask a configurable number of random peers
diegomrsantos Dec 2, 2022
c4accbe
Ask only peers with at least one out connection
diegomrsantos Dec 2, 2022
ba28d77
Handle double start and stop calls
diegomrsantos Dec 5, 2022
39edb1a
Remove hp service
diegomrsantos Dec 5, 2022
2d7bfa8
Refactor autonat dialMe
diegomrsantos Dec 6, 2022
937da10
Does not count an unknown answer
diegomrsantos Dec 6, 2022
573d1c2
Improve imports
diegomrsantos Dec 7, 2022
b3d2def
Simplify askPeer
diegomrsantos Dec 7, 2022
fc47ac9
Make scheduleInterval mandatory
diegomrsantos Dec 7, 2022
92db78e
Add withServices
diegomrsantos Dec 12, 2022
c2d3864
Add callback again and make scheduleInterval optional
diegomrsantos Dec 12, 2022
a4999bf
Changes after code review
diegomrsantos Dec 12, 2022
c36fbb2
Add metric
diegomrsantos Dec 13, 2022
1e2f45f
Handle timeout and improve logging
diegomrsantos Dec 14, 2022
f360717
Enable metrics in tests
diegomrsantos Dec 14, 2022
8a51c7e
Update libp2p/services/autonatservice.nim
diegomrsantos Dec 14, 2022
e58ff9c
Make askConnectedPeers clearer
diegomrsantos Dec 14, 2022
4fe4889
Disable PCRE
diegomrsantos Dec 14, 2022
086c693
Improve naming
diegomrsantos Dec 14, 2022
c3ef435
Add flag to disable asking new connected peers
diegomrsantos Dec 14, 2022
bd8ec0b
Remove public pragma
diegomrsantos Dec 15, 2022
e67d718
Remove unnecessary Awaiter
diegomrsantos Dec 15, 2022
22d34d7
More readability improvements
diegomrsantos Dec 15, 2022
bd6814d
More readability improvements
diegomrsantos Dec 15, 2022
6d95519
Add newline
diegomrsantos Dec 15, 2022
3cf948c
Update libp2p/connmanager.nim
Menduist Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion libp2p/builders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type
autonat: bool
circuitRelay: Relay
rdv: RendezVous
services: seq[Service]

proc new*(T: type[SwitchBuilder]): T {.public.} =
## Creates a SwitchBuilder
Expand Down Expand Up @@ -199,6 +200,10 @@ proc withRendezVous*(b: SwitchBuilder, rdv: RendezVous = RendezVous.new()): Swit
b.rdv = rdv
b

proc withServices*(b: SwitchBuilder, services: seq[Service]): SwitchBuilder =
b.services = services
b

proc build*(b: SwitchBuilder): Switch
{.raises: [Defect, LPError], public.} =

Expand Down Expand Up @@ -254,7 +259,8 @@ proc build*(b: SwitchBuilder): Switch
connManager = connManager,
ms = ms,
nameResolver = b.nameResolver,
peerStore = peerStore)
peerStore = peerStore,
services = b.services)

if b.autonat:
let autonat = Autonat.new(switch)
Expand Down
8 changes: 8 additions & 0 deletions libp2p/connmanager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ proc new*(C: type ConnManager,
proc connCount*(c: ConnManager, peerId: PeerId): int =
c.conns.getOrDefault(peerId).len

proc connectedPeers*(c: ConnManager, dir: Direction): seq[PeerId] =
var peers = newSeq[PeerId]();
Menduist marked this conversation as resolved.
Show resolved Hide resolved
for peerId, conns in c.conns:
if conns.anyIt(it.dir == dir):
peers.add(peerId)
return peers

proc addConnEventHandler*(c: ConnManager,
handler: ConnEventHandler,
kind: ConnEventKind) =
Expand Down Expand Up @@ -537,3 +544,4 @@ proc close*(c: ConnManager) {.async.} =
await conn.close()

trace "Closed ConnManager"

47 changes: 30 additions & 17 deletions libp2p/protocols/connectivity/autonat.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const

type
AutonatError* = object of LPError
AutonatUnreachableError* = object of LPError

MsgType* = enum
Dial = 0
Expand Down Expand Up @@ -203,25 +204,37 @@ type
sem: AsyncSemaphore
switch*: Switch

proc dialMe*(a: Autonat, pid: PeerId, ma: MultiAddress|seq[MultiAddress]):
Future[MultiAddress] {.async.} =
let addrs = when ma is MultiAddress: @[ma] else: ma
let conn = await a.switch.dial(pid, addrs, AutonatCodec)
method dialMe*(a: Autonat, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()):
Future[MultiAddress] {.base, async.} =

proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} =
if autonatMsg.isNone() or
autonatMsg.get().msgType != DialResponse or
autonatMsg.get().response.isNone() or
autonatMsg.get().response.get().ma.isNone():
raise newException(AutonatError, "Unexpected response")
else:
autonatMsg.get().response.get()

let conn =
try:
if addrs.len == 0:
await a.switch.dial(pid, @[AutonatCodec])
else:
await a.switch.dial(pid, addrs, AutonatCodec)
except CatchableError as err:
raise newException(AutonatError, "Unexpected error when dialling", err)

defer: await conn.close()
await conn.sendDial(a.switch.peerInfo.peerId, a.switch.peerInfo.addrs)
let msgOpt = AutonatMsg.decode(await conn.readLp(1024))
if msgOpt.isNone() or
msgOpt.get().msgType != DialResponse or
msgOpt.get().response.isNone():
raise newException(AutonatError, "Unexpected response")
let response = msgOpt.get().response.get()
if response.status != ResponseStatus.Ok:
raise newException(AutonatError, "Bad status " &
$response.status & " " &
response.text.get(""))
if response.ma.isNone():
raise newException(AutonatError, "Missing address")
return response.ma.get()
let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024)))
return case response.status:
of ResponseStatus.Ok:
response.ma.get()
of ResponseStatus.DialError:
raise newException(AutonatUnreachableError, "Peer could not dial us back")
else:
raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get(""))

proc tryDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
try:
Expand Down
1 change: 1 addition & 0 deletions libp2p/protocols/connectivity/relay/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import ./relay,
../../../multiaddress,
../../../stream/connection

export options

logScope:
topics = "libp2p relay relay-client"
Expand Down
151 changes: 151 additions & 0 deletions libp2p/services/autonatservice.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import std/[options, deques, sequtils]
import chronos, metrics
import ../switch
import ../protocols/[connectivity/autonat]
import ../utils/heartbeat
import ../crypto/crypto

declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability confidence", labels = ["reachability"])

type
AutonatService* = ref object of Service
newConnectedPeerHandler: PeerEventHandler
scheduleHandle: Future[void]
networkReachability: NetworkReachability
confidence: Option[float]
answers: Deque[NetworkReachability]
autonat: Autonat
statusAndConfidenceHandler: StatusAndConfidenceHandler
rng: ref HmacDrbgContext
scheduleInterval: Option[Duration]
disableAskNewConnectedPeers: bool
numPeersToAsk: int
maxQueueSize: int
minConfidence: float
dialTimeout: Duration

NetworkReachability* {.pure.} = enum
NotReachable, Reachable, Unknown

StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].}

proc new*(
T: typedesc[AutonatService],
autonat: Autonat,
rng: ref HmacDrbgContext,
scheduleInterval: Option[Duration] = none(Duration),
disableAskNewConnectedPeers = false,
diegomrsantos marked this conversation as resolved.
Show resolved Hide resolved
numPeersToAsk: int = 5,
maxQueueSize: int = 10,
minConfidence: float = 0.3,
dialTimeout = 5.seconds): T =
return T(
scheduleInterval: scheduleInterval,
networkReachability: Unknown,
confidence: none(float),
answers: initDeque[NetworkReachability](),
autonat: autonat,
rng: rng,
disableAskNewConnectedPeers: disableAskNewConnectedPeers,
numPeersToAsk: numPeersToAsk,
maxQueueSize: maxQueueSize,
minConfidence: minConfidence,
dialTimeout: dialTimeout)

proc networkReachability*(self: AutonatService): NetworkReachability {.inline.} =
return self.networkReachability

proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} =
alrevuelta marked this conversation as resolved.
Show resolved Hide resolved

if self.answers.len == self.maxQueueSize:
self.answers.popFirst()

self.answers.addLast(ans)

self.networkReachability = Unknown
self.confidence = none(float)
const reachabilityPriority = [Reachable, NotReachable]
for reachability in reachabilityPriority:
let confidence = self.answers.countIt(it == reachability) / self.maxQueueSize
libp2p_autonat_reachability_confidence.set(value = confidence, labelValues = [$reachability])
if self.confidence.isNone and confidence >= self.minConfidence:
self.networkReachability = reachability
self.confidence = some(confidence)

if not isNil(self.statusAndConfidenceHandler):
await self.statusAndConfidenceHandler(self.networkReachability, self.confidence)
diegomrsantos marked this conversation as resolved.
Show resolved Hide resolved

trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence

proc askPeer(self: AutonatService, s: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} =
trace "Asking for reachability", peerId = $peerId
let ans =
try:
discard await self.autonat.dialMe(peerId).wait(self.dialTimeout)
Reachable
except AutonatUnreachableError:
trace "dialMe answer is not reachable", peerId = $peerId
NotReachable
except AsyncTimeoutError:
trace "dialMe timed out", peerId = $peerId
Unknown
except CatchableError as err:
trace "dialMe unexpected error", peerId = $peerId, errMsg = $err.msg
Unknown
await self.handleAnswer(ans)
alrevuelta marked this conversation as resolved.
Show resolved Hide resolved
return ans

proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} =
alrevuelta marked this conversation as resolved.
Show resolved Hide resolved
var peers = switch.connectedPeers(Direction.Out)
self.rng.shuffle(peers)
var answersFromPeers = 0
for peer in peers:
if answersFromPeers >= self.numPeersToAsk:
break
elif (await askPeer(self, switch, peer)) != Unknown:
answersFromPeers.inc()

proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} =
heartbeat "Schedule AutonatService run", interval:
await service.run(switch)

method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
let hasBeenSetup = await procCall Service(self).setup(switch)
if hasBeenSetup:
if not self.disableAskNewConnectedPeers:
self.newConnectedPeerHandler = proc (peerId: PeerId, event: PeerEvent): Future[void] {.async.} =
discard askPeer(self, switch, peerId)
switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
if self.scheduleInterval.isSome():
self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get())
return hasBeenSetup

method run*(self: AutonatService, switch: Switch) {.async, public.} =
await askConnectedPeers(self, switch)

method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} =
let hasBeenStopped = await procCall Service(self).stop(switch)
if hasBeenStopped:
if not isNil(self.scheduleHandle):
self.scheduleHandle.cancel()
self.scheduleHandle = nil
if not isNil(self.newConnectedPeerHandler):
switch.connManager.removePeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
return hasBeenStopped

proc statusAndConfidenceHandler*(self: AutonatService, statusAndConfidenceHandler: StatusAndConfidenceHandler) =
self.statusAndConfidenceHandler = statusAndConfidenceHandler
38 changes: 36 additions & 2 deletions libp2p/switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ type
peerStore*: PeerStore
nameResolver*: NameResolver
started: bool
services*: seq[Service]

Service* = ref object of RootObj
inUse: bool


method setup*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} =
if self.inUse:
warn "service setup has already been called"
return false
self.inUse = true
return true

method run*(self: Service, switch: Switch) {.base, async, gcsafe.} =
doAssert(false, "Not implemented!")

method stop*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} =
if not self.inUse:
warn "service is already stopped"
return false
self.inUse = false
return true

proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler,
Expand Down Expand Up @@ -108,6 +130,9 @@ method addTransport*(s: Switch, t: Transport) =
s.transports &= t
s.dialer.addTransport(t)

proc connectedPeers*(s: Switch, dir: Direction): seq[PeerId] =
s.connManager.connectedPeers(dir)

proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} =
## returns true if the peer has one or more
## associated connections
Expand Down Expand Up @@ -294,6 +319,9 @@ proc stop*(s: Switch) {.async, public.} =
if not a.finished:
a.cancel()

for service in s.services:
discard await service.stop(s)

await s.ms.stop()

trace "Switch stopped"
Expand Down Expand Up @@ -335,6 +363,9 @@ proc start*(s: Switch) {.async, gcsafe, public.} =

await s.ms.start()

for service in s.services:
discard await service.setup(s)

s.started = true

debug "Started libp2p node", peer = s.peerInfo
Expand All @@ -346,7 +377,8 @@ proc newSwitch*(peerInfo: PeerInfo,
connManager: ConnManager,
ms: MultistreamSelect,
nameResolver: NameResolver = nil,
peerStore = PeerStore.new()): Switch
peerStore = PeerStore.new(),
services = newSeq[Service]()): Switch
{.raises: [Defect, LPError], public.} =
if secureManagers.len == 0:
raise newException(LPError, "Provide at least one secure manager")
Expand All @@ -358,8 +390,10 @@ proc newSwitch*(peerInfo: PeerInfo,
connManager: connManager,
peerStore: peerStore,
dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms, nameResolver),
nameResolver: nameResolver)
nameResolver: nameResolver,
services: services)

switch.connManager.peerStore = peerStore
switch.mount(identity)

return switch
2 changes: 2 additions & 0 deletions tests/config.nims
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import ../config.nims

--threads:on
--d:metrics
--d:withoutPCRE
2 changes: 1 addition & 1 deletion tests/helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,4 @@ proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect], gcsafe.} ): Fu
await sleepAsync(1.millis)

template checkExpiring*(code: untyped): untyped =
check await checkExpiringInternal(proc(): bool = code)
check await checkExpiringInternal(proc(): bool = code)
diegomrsantos marked this conversation as resolved.
Show resolved Hide resolved
Loading