Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{.async: (raises).} for MultistreamSelect #1066

Merged
merged 3 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 77 additions & 35 deletions libp2p/multistream.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand Down Expand Up @@ -45,15 +45,18 @@
)

template validateSuffix(str: string): untyped =
if str.endsWith("\n"):
str.removeSuffix("\n")
else:
raise newException(MultiStreamError, "MultistreamSelect failed, malformed message")

proc select*(_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: seq[string]):
Future[string] {.async.} =
if str.endsWith("\n"):
str.removeSuffix("\n")
else:
raise (ref MultiStreamError)(msg:
"MultistreamSelect failed, malformed message")

Check warning on line 53 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L53

Added line #L53 was not covered by tests
proc select*(
_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: seq[string]
): Future[string] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError]).} =

Check warning on line 59 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L59

Added line #L59 was not covered by tests
trace "initiating handshake", conn, codec = Codec
## select a remote protocol
await conn.writeLp(Codec & "\n") # write handshake
Expand All @@ -66,7 +69,7 @@

if s != Codec:
notice "handshake failed", conn, codec = s
raise newException(MultiStreamError, "MultistreamSelect handshake failed")
raise (ref MultiStreamError)(msg: "MultistreamSelect handshake failed")

Check warning on line 72 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L72

Added line #L72 was not covered by tests
else:
trace "multistream handshake success", conn

Expand Down Expand Up @@ -98,19 +101,29 @@
# No alternatives, fail
return ""

proc select*(_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: string): Future[bool] {.async.} =
proc select*(
_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: string
): Future[bool] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError]).} =

Check warning on line 109 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L109

Added line #L109 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure this is the right PR for it, but reraising underlying stream errors represents an abstraction leak - ie they should be caught and rebranded as multistreamerror really

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, but the pattern of re-raising LPStreamError applies to quite large parts of the codebase, and the goal for now is to just document the reality with the {.async: (raises).}, while fixing the obvious offenders of raise (ref CatchableError)(). Semantic changes should be separate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another precedent we have in muxer.nim where the procs currently raise {.async: (raises: [CancelledError, LPStreamError, MuxerError].}, it's just how it's done today.

if proto.len > 0:
return (await MultistreamSelect.select(conn, @[proto])) == proto
(await MultistreamSelect.select(conn, @[proto])) == proto
else:
return (await MultistreamSelect.select(conn, @[])) == Codec
(await MultistreamSelect.select(conn, @[])) == Codec

proc select*(m: MultistreamSelect, conn: Connection): Future[bool] =
proc select*(
m: MultistreamSelect,
conn: Connection
): Future[bool] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError], raw: true).} =
m.select(conn, "")

proc list*(m: MultistreamSelect,
conn: Connection): Future[seq[string]] {.async.} =
proc list*(
m: MultistreamSelect,
conn: Connection
): Future[seq[string]] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError]).} =

Check warning on line 126 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L126

Added line #L126 was not covered by tests
## list remote protos requests on connection
if not await m.select(conn):
return
Expand All @@ -126,12 +139,13 @@
result = list

proc handle*(
_: type MultistreamSelect,
conn: Connection,
protos: seq[string],
matchers = newSeq[Matcher](),
active: bool = false,
): Future[string] {.async.} =
_: type MultistreamSelect,
conn: Connection,
protos: seq[string],
matchers = newSeq[Matcher](),
active: bool = false
): Future[string] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError]).} =

Check warning on line 148 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L148

Added line #L148 was not covered by tests
trace "Starting multistream negotiation", conn, handshaked = active
var handshaked = active
while not conn.atEof:
Expand All @@ -140,8 +154,8 @@

if not handshaked and ms != Codec:
debug "expected handshake message", conn, instead=ms
raise newException(CatchableError,
"MultistreamSelect handling failed, invalid first message")
raise (ref MultiStreamError)(msg:

Check warning on line 157 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L157

Added line #L157 was not covered by tests
"MultistreamSelect handling failed, invalid first message")

trace "handle: got request", conn, ms
if ms.len() <= 0:
Expand Down Expand Up @@ -172,26 +186,30 @@
trace "no handlers", conn, protocol = ms
await conn.writeLp(Na)

proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.async.} =
proc handle*(
m: MultistreamSelect,
conn: Connection,
active: bool = false) {.async: (raises: [CancelledError]).} =
trace "Starting multistream handler", conn, handshaked = active
var
protos: seq[string]
matchers: seq[Matcher]
for h in m.handlers:
if not isNil(h.match):
if h.match != nil:
matchers.add(h.match)
for proto in h.protos:
protos.add(proto)

try:
let ms = await MultistreamSelect.handle(conn, protos, matchers, active)
for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
if (h.match != nil and h.match(ms)) or h.protos.contains(ms):
trace "found handler", conn, protocol = ms

var protocolHolder = h
let maxIncomingStreams = protocolHolder.protocol.maxIncomingStreams
if protocolHolder.openedStreams.getOrDefault(conn.peerId) >= maxIncomingStreams:
if protocolHolder.openedStreams.getOrDefault(conn.peerId) >=
maxIncomingStreams:
debug "Max streams for protocol reached, blocking new stream",
conn, protocol = ms, maxIncomingStreams
return
Expand Down Expand Up @@ -242,8 +260,32 @@
protocol: protocol,
match: matcher))

proc start*(m: MultistreamSelect) {.async.} =
await allFutures(m.handlers.mapIt(it.protocol.start()))
proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
let
handlers = m.handlers
futs = handlers.mapIt(it.protocol.start())
try:
await allFutures(futs)
for fut in futs:
await fut
except CancelledError as exc:
var pending: seq[Future[void].Raising([])]
for i, fut in futs:
if not fut.finished:
pending.add noCancel fut.cancelAndWait()
elif fut.completed:
pending.add handlers[i].protocol.stop()
else:
static: doAssert typeof(fut).E is (CancelledError,)
await noCancel allFutures(pending)

Check warning on line 280 in libp2p/multistream.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/multistream.nim#L273-L280

Added lines #L273 - L280 were not covered by tests
raise exc


proc stop*(m: MultistreamSelect) {.async.} =
await allFutures(m.handlers.mapIt(it.protocol.stop()))
proc stop*(m: MultistreamSelect) {.async: (raises: []).} =
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])`
var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len)
for it in m.handlers:
futs.add it.protocol.stop()
await noCancel allFutures(futs)
for fut in futs:
await fut
18 changes: 13 additions & 5 deletions libp2p/protocols/connectivity/relay/relay.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand Down Expand Up @@ -361,17 +361,25 @@
if n > r.rsvp[k]:
r.rsvp.del(k)

method start*(r: Relay) {.async.} =
method start*(
r: Relay
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
let fut = newFuture[void]()
fut.complete()
if not r.reservationLoop.isNil:
warn "Starting relay twice"
return
return fut

Check warning on line 371 in libp2p/protocols/connectivity/relay/relay.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/connectivity/relay/relay.nim#L371

Added line #L371 was not covered by tests
r.reservationLoop = r.deletesReservation()
r.started = true
fut

method stop*(r: Relay) {.async.} =
method stop*(r: Relay): Future[void] {.async: (raises: [], raw: true).} =
let fut = newFuture[void]()
fut.complete()
if r.reservationLoop.isNil:
warn "Stopping relay without starting it"
return
return fut

Check warning on line 381 in libp2p/protocols/connectivity/relay/relay.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/connectivity/relay/relay.nim#L381

Added line #L381 was not covered by tests
r.started = false
r.reservationLoop.cancel()
r.reservationLoop = nil
fut
17 changes: 14 additions & 3 deletions libp2p/protocols/protocol.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand Down Expand Up @@ -31,8 +31,19 @@ type
maxIncomingStreams: Opt[int]

method init*(p: LPProtocol) {.base, gcsafe.} = discard
method start*(p: LPProtocol) {.async, base.} = p.started = true
method stop*(p: LPProtocol) {.async, base.} = p.started = false

method start*(
p: LPProtocol) {.async: (raises: [CancelledError], raw: true), base.} =
let fut = newFuture[void]()
fut.complete()
p.started = true
fut

method stop*(p: LPProtocol) {.async: (raises: [], raw: true), base.} =
let fut = newFuture[void]()
fut.complete()
p.started = false
fut

proc maxIncomingStreams*(p: LPProtocol): int =
p.maxIncomingStreams.get(DefaultMaxIncomingStreams)
Expand Down
20 changes: 15 additions & 5 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand Down Expand Up @@ -701,30 +701,40 @@
for id, addrs in g.parameters.directPeers:
await g.addDirectPeer(id, addrs)

method start*(g: GossipSub) {.async.} =
method start*(
g: GossipSub
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the raw here? can't see how it matters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not familiar enough to know if it matters, but it makes the flow more explicit. there is no async involved in this method, so if the transformation introduces more complex logic or increases compile time, rather opting out of it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes it hard to read though - complexity is negligible in simple cases like this, specially since it's not part of any hot path

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's true, the non-raw version is a bit simpler to read, but is a bit less clear about what's going on / as in, there is no async involved at all in the implementation, it's just a regular synchronous function.

side note: one of the other PRs actually was triggered by switching some of the redirectors to raw, it changed test timing just slightly to make it fail, revealing a small logic issue.

anyway, will keep it in mind going forward and transform to raw a bit less agressively. I think one could argue that anytime newFuture can be avoided, the readability benefit generally outweighs the optimization aspect; while for redirections / wrappers where newFuture is not necessary, the raw version looks similar enough to the transformed version that there is no downside to using it?

can do another cleanup pass for readability after the raises annotations are done, depending on the outcome of the discussion. personally I'm also fine with newFuture.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the thing with newFuture is that it opens up two avenues for "classic" bugs: returning nil on some code path and returning an accidentally unfinished future on some code path - it's risky shit, for a dumb start function. ie when reviewing, I had to spend time ascertaining this point (it would have been more clear btw had you put the newfuture call at the end of the function where it gets returned, instead of at the beginning because as is, there's too much unnecessary distance between construction and use).

The most simple things are the most pleasurable to debate ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, haven't been bitten by those personally, but can see them happening relatively easily. It's a good point, generally, especially as it seems that the async transformation is not too costly for purely synchronous functions.

let fut = newFuture[void]()
fut.complete()

trace "gossipsub start"

if not g.heartbeatFut.isNil:
warn "Starting gossipsub twice"
return
return fut

Check warning on line 714 in libp2p/protocols/pubsub/gossipsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub.nim#L714

Added line #L714 was not covered by tests

g.heartbeatFut = g.heartbeat()
g.scoringHeartbeatFut = g.scoringHeartbeat()
g.directPeersLoop = g.maintainDirectPeers()
g.started = true
fut

method stop*(g: GossipSub): Future[void] {.async: (raises: [], raw: true).} =
let fut = newFuture[void]()
fut.complete()

method stop*(g: GossipSub) {.async.} =
trace "gossipsub stop"
g.started = false
if g.heartbeatFut.isNil:
warn "Stopping gossipsub without starting it"
return
return fut

Check warning on line 730 in libp2p/protocols/pubsub/gossipsub.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub.nim#L730

Added line #L730 was not covered by tests

# stop heartbeat interval
g.directPeersLoop.cancel()
g.scoringHeartbeatFut.cancel()
g.heartbeatFut.cancel()
g.heartbeatFut = nil
fut

method initPubSub*(g: GossipSub)
{.raises: [InitializationError].} =
Expand Down
18 changes: 13 additions & 5 deletions libp2p/protocols/rendezvous.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand Down Expand Up @@ -678,17 +678,25 @@
libp2p_rendezvous_registered.set(int64(total))
libp2p_rendezvous_namespaces.set(int64(rdv.namespaces.len))

method start*(rdv: RendezVous) {.async.} =
method start*(
rdv: RendezVous
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
let fut = newFuture[void]()
fut.complete()
if not rdv.registerDeletionLoop.isNil:
warn "Starting rendezvous twice"
return
return fut

Check warning on line 688 in libp2p/protocols/rendezvous.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/rendezvous.nim#L688

Added line #L688 was not covered by tests
rdv.registerDeletionLoop = rdv.deletesRegister()
rdv.started = true
fut

method stop*(rdv: RendezVous) {.async.} =
method stop*(rdv: RendezVous): Future[void] {.async: (raises: [], raw: true).} =
let fut = newFuture[void]()
fut.complete()
if rdv.registerDeletionLoop.isNil:
warn "Stopping rendezvous without starting it"
return
return fut

Check warning on line 698 in libp2p/protocols/rendezvous.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/rendezvous.nim#L698

Added line #L698 was not covered by tests
rdv.started = false
rdv.registerDeletionLoop.cancel()
rdv.registerDeletionLoop = nil
fut
Loading