-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement rendezvous protocol spec #1
base: master
Are you sure you want to change the base?
Changes from 34 commits
f946163
0cbcbf6
f94b0b4
268abf3
b7bc940
a107e34
13f4c67
e3d343f
1506c04
7e5664c
6a1176f
b7c304d
aeac2e2
e5a72b9
7d72fc7
dbe6b0d
6c1d282
8181424
fbaf21c
4e3eaa7
aa3f46c
c703d37
ae10cc6
8c12272
53dfbc7
f41fbba
cfbcdde
4788ef7
a47367d
2b0995f
9ab12ab
6c4fda5
e530204
aa7f9da
c487c20
baf1e4e
c540724
1ee2b55
8846a4b
3c726d2
f2ee9b3
2843bd3
91cdb88
7901280
9052b53
25d0082
0e771cd
7371441
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,287 @@ | ||
package rendezvous | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
"time" | ||
|
||
pb "github.com/libp2p/go-libp2p-rendezvous/pb" | ||
|
||
ggio "github.com/gogo/protobuf/io" | ||
host "github.com/libp2p/go-libp2p-host" | ||
inet "github.com/libp2p/go-libp2p-net" | ||
peer "github.com/libp2p/go-libp2p-peer" | ||
pstore "github.com/libp2p/go-libp2p-peerstore" | ||
) | ||
|
||
var ( | ||
DiscoverAsyncInterval = 2 * time.Minute | ||
) | ||
|
||
type Rendezvous interface { | ||
Register(ctx context.Context, ns string, ttl int) error | ||
Unregister(ctx context.Context, ns string) error | ||
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) | ||
DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) | ||
} | ||
|
||
type Registration struct { | ||
Peer pstore.PeerInfo | ||
Ns string | ||
Ttl int | ||
} | ||
|
||
func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous { | ||
return &client{ | ||
host: host, | ||
rp: rp, | ||
} | ||
} | ||
|
||
type client struct { | ||
host host.Host | ||
rp peer.ID | ||
} | ||
|
||
func (cli *client) Register(ctx context.Context, ns string, ttl int) error { | ||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) | ||
if err != nil { | ||
return err | ||
} | ||
defer s.Close() | ||
|
||
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) | ||
w := ggio.NewDelimitedWriter(s) | ||
|
||
req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl) | ||
err = w.WriteMsg(req) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var res pb.Message | ||
err = r.ReadMsg(&res) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if res.GetType() != pb.Message_REGISTER_RESPONSE { | ||
return fmt.Errorf("Unexpected response: %s", res.GetType().String()) | ||
} | ||
|
||
status := res.GetRegisterResponse().GetStatus() | ||
if status != pb.Message_OK { | ||
return RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func Register(ctx context.Context, rz Rendezvous, ns string, ttl int) error { | ||
if ttl < 120 { | ||
return fmt.Errorf("registration TTL is too short") | ||
} | ||
|
||
err := rz.Register(ctx, ns, ttl) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
go registerRefresh(ctx, rz, ns, ttl) | ||
return nil | ||
} | ||
|
||
func registerRefresh(ctx context.Context, rz Rendezvous, ns string, ttl int) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesnt appear that this gets closed down when unregister is called. So if i call register then unregister, this routine would re-register me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hrm, yes, the two are not correlated; you'd have to cancel the context to stop registration refresh. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this is a utility function to augment the interface with persistent registrations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we should make a more complicated stateful client contraption that handles the registration/unregistration correlation? I think we can have two interfaces, a low level |
||
var refresh time.Duration | ||
errcount := 0 | ||
|
||
for { | ||
if errcount > 0 { | ||
// do randomized exponential backoff, up to ~4 hours | ||
if errcount > 7 { | ||
errcount = 7 | ||
} | ||
backoff := 2 << uint(errcount) | ||
refresh = 5*time.Minute + time.Duration(rand.Intn(backoff*60000))*time.Millisecond | ||
} else { | ||
refresh = time.Duration(ttl-30) * time.Second | ||
} | ||
|
||
select { | ||
case <-time.After(refresh): | ||
case <-ctx.Done(): | ||
return | ||
} | ||
|
||
err := rz.Register(ctx, ns, ttl) | ||
if err != nil { | ||
log.Errorf("Error registering [%s]: %s", ns, err.Error()) | ||
errcount++ | ||
} else { | ||
errcount = 0 | ||
} | ||
} | ||
} | ||
|
||
func (cli *client) Unregister(ctx context.Context, ns string) error { | ||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) | ||
if err != nil { | ||
return err | ||
} | ||
defer s.Close() | ||
|
||
w := ggio.NewDelimitedWriter(s) | ||
req := newUnregisterMessage(ns, cli.host.ID()) | ||
return w.WriteMsg(req) | ||
} | ||
|
||
func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) { | ||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
defer s.Close() | ||
|
||
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) | ||
w := ggio.NewDelimitedWriter(s) | ||
|
||
return discoverQuery(ns, limit, cookie, r, w) | ||
} | ||
|
||
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) { | ||
|
||
req := newDiscoverMessage(ns, limit, cookie) | ||
err := w.WriteMsg(req) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
var res pb.Message | ||
err = r.ReadMsg(&res) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
if res.GetType() != pb.Message_DISCOVER_RESPONSE { | ||
return nil, nil, fmt.Errorf("Unexpected response: %s", res.GetType().String()) | ||
} | ||
|
||
status := res.GetDiscoverResponse().GetStatus() | ||
if status != pb.Message_OK { | ||
return nil, nil, RendezvousError{Status: status, Text: res.GetDiscoverResponse().GetStatusText()} | ||
} | ||
|
||
regs := res.GetDiscoverResponse().GetRegistrations() | ||
result := make([]Registration, 0, len(regs)) | ||
for _, reg := range regs { | ||
pi, err := pbToPeerInfo(reg.GetPeer()) | ||
if err != nil { | ||
log.Errorf("Invalid peer info: %s", err.Error()) | ||
continue | ||
} | ||
result = append(result, Registration{Peer: pi, Ns: reg.GetNs(), Ttl: int(reg.GetTtl())}) | ||
} | ||
|
||
return result, res.GetDiscoverResponse().GetCookie(), nil | ||
} | ||
|
||
func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) { | ||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
ch := make(chan Registration) | ||
go discoverAsync(ctx, ns, s, ch) | ||
return ch, nil | ||
} | ||
|
||
func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Registration) { | ||
defer s.Close() | ||
defer close(ch) | ||
|
||
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) | ||
w := ggio.NewDelimitedWriter(s) | ||
|
||
const batch = 200 | ||
|
||
var ( | ||
cookie []byte | ||
regs []Registration | ||
err error | ||
) | ||
|
||
for { | ||
regs, cookie, err = discoverQuery(ns, batch, cookie, r, w) | ||
if err != nil { | ||
// TODO robust error recovery | ||
// - handle closed streams with backoff + new stream, preserving the cookie | ||
// - handle E_INVALID_COOKIE errors in that case to restart the discovery | ||
log.Errorf("Error in discovery [%s]: %s", ns, err.Error()) | ||
return | ||
} | ||
|
||
for _, reg := range regs { | ||
select { | ||
case ch <- reg: | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
|
||
if len(regs) < batch { | ||
// TODO adaptive backoff for heavily loaded rendezvous points | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the service respond something like "load too high, try again in a bit" ? Seems like a nice DoS mitigation feature. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if a load indicator would help, as the clients can simply ignore it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On the other hand, we could make this an Error response -- Now that could help, as it would force the clients to back off and possibly create a new stream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I was thinking have it be an error response. So they don't get any records back, and the error tells them to just wait and try again in a bit |
||
select { | ||
case <-time.After(DiscoverAsyncInterval): | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { | ||
regs, cookie, err := rz.Discover(ctx, ns, limit, cookie) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
pinfos := make([]pstore.PeerInfo, len(regs)) | ||
for i, reg := range regs { | ||
pinfos[i] = reg.Peer | ||
} | ||
|
||
return pinfos, cookie, nil | ||
} | ||
|
||
func DiscoverPeersAsync(ctx context.Context, rz Rendezvous, ns string) (<-chan pstore.PeerInfo, error) { | ||
rch, err := rz.DiscoverAsync(ctx, ns) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
ch := make(chan pstore.PeerInfo) | ||
go discoverPeersAsync(ctx, rch, ch) | ||
return ch, nil | ||
} | ||
|
||
func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan pstore.PeerInfo) { | ||
defer close(ch) | ||
for { | ||
select { | ||
case reg, ok := <-rch: | ||
if !ok { | ||
return | ||
} | ||
|
||
select { | ||
case ch <- reg.Peer: | ||
case <-ctx.Done(): | ||
return | ||
} | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comments on the interface would be helpful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes indeed! Part of the "docstrings" checkbox.