Skip to content

Commit

Permalink
namesys/pubsub: publisher and resolver
Browse files Browse the repository at this point in the history
Commits:
namesys: pubsub Publisher and Resolver
namesys/pubsub: pacify code climate.
namesys/pubsub: timeout for rendezvous
namesys/pubsub: filter self in bootstrap connections
namesys/pubsub: Publish to the correct topic

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: unit  test

Commits:
namesys/pubsub: test
namesys/pubsub_test: pacify code climate
namesys/pubsub: update test to use extant mock routing

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: integrate namesys pubsub

namesys: integrate pubsub resolvers
namesys/pubsub_test: tweak delays
- trying to make travis happy.
namesys/pubsub: fix duplicate bootstraps
- subscription key is topic, not ipnskey.
namesys/pubsub: no warning needed on cancellation
namesys/pubsub: warning for receive errors
- and more informative error messages at that.
namesys/pubsub_test: smaller test
- make it work with seemingly low fdlimits in travis/macosx.
  also, more informative test failures.
namesys/pubsub: add delay to let pubsub perform handshake
namesys/pubsub: update gx imports
namesys/pubsub_test: preconnect publisher, reduce delays
- preconnects the publisher to the receivers in order to avoid bootstrap
  flakiness with connectivity problems in travis.
  reduces sleeps to 1s for flood propagation (3s seems excessive with 5 hosts).
namesys/pubsub: drop named return values in resolveOnce
- per review comment.
namesys/pubsub: check errors
namesys/pubsub: store bytes in resolver datastore
namesys/pubsub: resolver Cancel
- for canceling subscriptions, pre whyrusleeping's request.
namesys/pubsub: fix resolution without /ipns prefix
- also improve the logging a bit.
namesys/pubsub: don't resolve own keys through pubsub
namesys/pubsub: signal ErrResolveFailed on resolution failure
namesys/pubsub: use sync datastore, resolver lock only for subs
namesys/pubsub_test: coverage for Cancel

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: parallelize dht and pubsub publishing

Commits:
namesys/pubsub: code cosmetics
namesys: parallelize publishing with dht and pubsub
namesys/pubsub: periodically reprovide topic rendezvous
namesys/pubsub: cancelation for rendezvous goroutine
namesys/pubsub: log ipns record seqno on publish

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: error checking

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: --enable-namesys-pubsub option and management

Commits:
package.json: update go-libp2p-blankhost
namesys: fix stale package imports
update go-testutil
namesys/pubsub: reduce bootstrap provide period to 8hr
namesys/pubsub: try to extract the key from id first
option to enable ipns pubsub: --enable-namesys-pubsub
ipfs name pubsub management subcommands
corehttp/gateway_test: mockNamesys needs to implement GetResolver
pacify code climate

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: pubsub sharness test

test/sharness: test for ipns pubsub
namesys/pubsub: return boolean indicator on Cancel
package.json: remove duplicate entry for go-testutil
update gx deps, testutil to 1.1.12
fix jenkins failure: use tabs in t0183-namesys-pubsub
t0183: use 4 spaces for tabification

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: update for new command interface

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>

namesys/pubsub: fix sharness test for broken MacOS echo

echo -n "" should print -n, but hey it's a mac.

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>
  • Loading branch information
vyzo authored and whyrusleeping committed Nov 21, 2017
1 parent b18b1e9 commit e45df72
Show file tree
Hide file tree
Showing 13 changed files with 1,008 additions and 26 deletions.
4 changes: 4 additions & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment"
enableIPNSPubSubKwd = "enable-namesys-pubsub"
enableMultiplexKwd = "enable-mplex-experiment"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
Expand Down Expand Up @@ -157,6 +158,7 @@ Headers.
cmdkit.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API."),
cmdkit.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmdkit.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmdkit.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."),
cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").WithDefault(true),

// TODO: add way to override addresses. tricky part: updating the config if also --init.
Expand Down Expand Up @@ -283,6 +285,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) {

offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
ipnsps, _, _ := req.Option(enableIPNSPubSubKwd).Bool()
mplex, _, _ := req.Option(enableMultiplexKwd).Bool()

// Start assembling node config
Expand All @@ -292,6 +295,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) {
Online: !offline,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
"ipnsps": ipnsps,
"mplex": mplex,
},
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral
Expand Down
2 changes: 1 addition & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {

if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
return err
}
} else {
Expand Down
163 changes: 163 additions & 0 deletions core/commands/ipnsps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package commands

import (
"errors"
"fmt"
"io"
"strings"

cmds "github.com/ipfs/go-ipfs/commands"
e "github.com/ipfs/go-ipfs/core/commands/e"
ns "github.com/ipfs/go-ipfs/namesys"

cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit"
)

type ipnsPubsubState struct {
Enabled bool
}

type ipnsPubsubCancel struct {
Canceled bool
}

// IpnsPubsubCmd is the subcommand that allows us to manage the IPNS pubsub system
var IpnsPubsubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "IPNS pubsub management",
ShortDescription: `
Manage and inspect the state of the IPNS pubsub resolver.
Note: this command is experimental and subject to change as the system is refined
`,
},
Subcommands: map[string]*cmds.Command{
"state": ipnspsStateCmd,
"subs": ipnspsSubsCmd,
"cancel": ipnspsCancelCmd,
},
}

var ipnspsStateCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Query the state of IPNS pubsub",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

_, ok := n.Namesys.GetResolver("pubsub")
res.SetOutput(&ipnsPubsubState{ok})
},
Type: ipnsPubsubState{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}

output, ok := v.(*ipnsPubsubState)
if !ok {
return nil, e.TypeErr(output, v)
}

var state string
if output.Enabled {
state = "enabled"
} else {
state = "disabled"
}

return strings.NewReader(state + "\n"), nil
},
},
}

var ipnspsSubsCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Show current name subscriptions",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}

psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
return
}

res.SetOutput(&stringList{psr.GetSubscriptions()})
},
Type: stringList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: stringListMarshaler,
},
}

var ipnspsCancelCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Cancel a name subscription",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}

psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
return
}

ok = psr.Cancel(req.Arguments()[0])
res.SetOutput(&ipnsPubsubCancel{ok})
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("name", true, false, "Name to cancel the subscription for."),
},
Type: ipnsPubsubCancel{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}

output, ok := v.(*ipnsPubsubCancel)
if !ok {
return nil, e.TypeErr(output, v)
}

var state string
if output.Canceled {
state = "canceled"
} else {
state = "no subscription"
}

return strings.NewReader(state + "\n"), nil
},
},
}
2 changes: 1 addition & 1 deletion core/commands/mount_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

cmds "github.com/ipfs/go-ipfs/commands"

cmdkit "gx/ipfs/QmSNbH2A1evCCbJSDC6u3RV3GGDhgu6pRGbXHvrN89tMKf/go-ipfs-cmdkit"
cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit"
)

var MountCmd = &cmds.Command{
Expand Down
1 change: 1 addition & 0 deletions core/commands/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,6 @@ Resolve the value of a dnslink:
Subcommands: map[string]*cmds.Command{
"publish": PublishCmd,
"resolve": IpnsCmd,
"pubsub": IpnsPubsubCmd,
},
}
11 changes: 9 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type Mounts struct {
Ipns mount.Mount
}

func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error {
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error {

if n.PeerHost != nil { // already online.
return errors.New("node already online")
Expand Down Expand Up @@ -249,10 +249,17 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}

if pubsub {
if pubsub || ipnsps {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}

if ipnsps {
err = namesys.AddPubsubNameSystem(ctx, n.Namesys, n.PeerHost, n.Routing, n.Repo.Datastore(), n.Floodsub)
if err != nil {
return err
}
}

n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)

// setup local discovery
Expand Down
4 changes: 4 additions & 0 deletions core/corehttp/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (m mockNamesys) PublishWithEOL(ctx context.Context, name ci.PrivKey, value
return errors.New("not implemented for mockNamesys")
}

func (m mockNamesys) GetResolver(subs string) (namesys.Resolver, bool) {
return nil, false
}

func newNodeWithMockNamesys(ns mockNamesys) (*core.IpfsNode, error) {
c := config.Config{
Identity: config.Identity{
Expand Down
8 changes: 8 additions & 0 deletions namesys/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var ErrPublishFailed = errors.New("Could not publish name.")
type NameSystem interface {
Resolver
Publisher
ResolverLookup
}

// Resolver is an object capable of resolving names.
Expand Down Expand Up @@ -112,3 +113,10 @@ type Publisher interface {
// call once the records spec is implemented
PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error
}

// ResolverLookup is an object capable of finding resolvers for a subsystem
type ResolverLookup interface {

// GetResolver retrieves a resolver associated with a subsystem
GetResolver(subs string) (Resolver, bool)
}
Loading

0 comments on commit e45df72

Please sign in to comment.