Skip to content

Commit

Permalink
improve UX
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
  • Loading branch information
whyrusleeping committed Sep 11, 2016
1 parent e89a677 commit cf6fcf2
Showing 1 changed file with 67 additions and 44 deletions.
111 changes: 67 additions & 44 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package commands

import (
"bytes"
"fmt"
"encoding/binary"
"io"

cmds "github.com/ipfs/go-ipfs/commands"
Expand All @@ -13,16 +13,30 @@ import (

var PubsubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Only pubsub by the vaguest technical definition",
Tagline: "An experimental publish-subscribe system on ipfs.",
ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
`,
},
Subcommands: map[string]*cmds.Command{
"pub": PubsubPubCmd,
"sub": PubsubSubCmd,
},
}

var PubsubSubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "subscribe to messages on a given topic",
Tagline: "Subscribe to messages on a given topic.",
ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
Expand Down Expand Up @@ -67,50 +81,62 @@ var PubsubSubCmd = &cmds.Command{
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
log.Error("FROM: ", m.GetFrom())
return bytes.NewReader(m.Data), nil
}),
"ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
m.Data = append(m.Data, '\n')
return bytes.NewReader(m.Data), nil
}),
"lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
buf := make([]byte, 8)
n := binary.PutUvarint(buf, uint64(len(m.Data)))
return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(m.Data)), nil
}),
},
Type: floodsub.Message{},
}

func getPsMsgMarshaler(f func(m *floodsub.Message) (io.Reader, error)) func(cmds.Response) (io.Reader, error) {
return func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}

marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*floodsub.Message)
if !ok {
return nil, u.ErrCast()
}

marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*floodsub.Message)
if !ok {
return nil, u.ErrCast()
}

return bytes.NewReader(obj.Data), nil
}
return f(obj)
}

return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
},
},
Type: floodsub.Message{},
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
}
}

var PubsubPubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Publish a message to a given pubsub sub topic.",
ShortDescription: `pubsub sub pub pub hubbub sub?
pub. Pubsub subsub subpubsub hubbub pub sub grub subhubbub.
Pubsubs sub hubbub sub tub, dub dub dub wub wub wub wub wub. Pubsub.
Pubsub subsub pubsub hubbub dubtub? Wub. Wub. Pubsub.
Nubs, Tubs, Wubs, pub Dubs. Pub subdub pub pub sub nub.
Tagline: "Publish a message to a given pubsub topic.",
ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.
ipfs pubsub pub "pubsub hubbub"
Pub: hubbub pub sub sub tub pub bub.
`,
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("data", true, false, "Data to send to david dias. (and only him)").EnableStdin(),
},
Options: []cmds.Option{
cmds.StringOption("topic", "t", "Topic to pubusb to."),
cmds.StringArg("topic", true, false, "Topic to publish to."),
cmds.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Options: []cmds.Option{},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
Expand All @@ -124,16 +150,13 @@ Pub: hubbub pub sub sub tub pub bub.
return
}

topic, found, _ := req.Option("topic").String()
if !found {
res.SetError(fmt.Errorf("topic required"), cmds.ErrNormal)
return
}
topic := req.Arguments()[0]

err = n.Floodsub.Publish(topic, []byte(req.Arguments()[0]))
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
for _, data := range req.Arguments()[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
},
}

0 comments on commit cf6fcf2

Please sign in to comment.