From cf6fcf20403b1ae9dad0d389869c19acf2ee71ff Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 11 Sep 2016 16:45:48 -0700 Subject: [PATCH] improve UX License: MIT Signed-off-by: Jeromy --- core/commands/pubsub.go | 111 ++++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 44 deletions(-) diff --git a/core/commands/pubsub.go b/core/commands/pubsub.go index 6b6709817e09..d656724fcc60 100644 --- a/core/commands/pubsub.go +++ b/core/commands/pubsub.go @@ -2,7 +2,7 @@ package commands import ( "bytes" - "fmt" + "encoding/binary" "io" cmds "github.com/ipfs/go-ipfs/commands" @@ -13,16 +13,30 @@ import ( var PubsubCmd = &cmds.Command{ Helptext: cmds.HelpText{ - Tagline: "Only pubsub by the vaguest technical definition", + Tagline: "An experimental publish-subscribe system on ipfs.", + ShortDescription: ` +ipfs pubsub allows you to publish messages to a given topic, and also to +subscribe to new messages on a given topic. + +This is an experimental feature. It is not intended in its current state +to be used in a production environment. +`, }, Subcommands: map[string]*cmds.Command{ "pub": PubsubPubCmd, "sub": PubsubSubCmd, }, } + var PubsubSubCmd = &cmds.Command{ Helptext: cmds.HelpText{ - Tagline: "subscribe to messages on a given topic", + Tagline: "Subscribe to messages on a given topic.", + ShortDescription: ` +ipfs pubsub sub subscribes to messages on a given topic. + +This is an experimental feature. It is not intended in its current state +to be used in a production environment. +`, }, Arguments: []cmds.Argument{ cmds.StringArg("topic", true, false, "String name of topic to subscribe to."), @@ -67,50 +81,62 @@ var PubsubSubCmd = &cmds.Command{ }() }, Marshalers: cmds.MarshalerMap{ - cmds.Text: func(res cmds.Response) (io.Reader, error) { - outChan, ok := res.Output().(<-chan interface{}) + cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { + log.Error("FROM: ", m.GetFrom()) + return bytes.NewReader(m.Data), nil + }), + "ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { + m.Data = append(m.Data, '\n') + return bytes.NewReader(m.Data), nil + }), + "lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) { + buf := make([]byte, 8) + n := binary.PutUvarint(buf, uint64(len(m.Data))) + return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(m.Data)), nil + }), + }, + Type: floodsub.Message{}, +} + +func getPsMsgMarshaler(f func(m *floodsub.Message) (io.Reader, error)) func(cmds.Response) (io.Reader, error) { + return func(res cmds.Response) (io.Reader, error) { + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + return nil, u.ErrCast() + } + + marshal := func(v interface{}) (io.Reader, error) { + obj, ok := v.(*floodsub.Message) if !ok { return nil, u.ErrCast() } - marshal := func(v interface{}) (io.Reader, error) { - obj, ok := v.(*floodsub.Message) - if !ok { - return nil, u.ErrCast() - } - - return bytes.NewReader(obj.Data), nil - } + return f(obj) + } - return &cmds.ChannelMarshaler{ - Channel: outChan, - Marshaler: marshal, - Res: res, - }, nil - }, - }, - Type: floodsub.Message{}, + return &cmds.ChannelMarshaler{ + Channel: outChan, + Marshaler: marshal, + Res: res, + }, nil + } } + var PubsubPubCmd = &cmds.Command{ Helptext: cmds.HelpText{ - Tagline: "Publish a message to a given pubsub sub topic.", - ShortDescription: `pubsub sub pub pub hubbub sub? -pub. Pubsub subsub subpubsub hubbub pub sub grub subhubbub. -Pubsubs sub hubbub sub tub, dub dub dub wub wub wub wub wub. Pubsub. -Pubsub subsub pubsub hubbub dubtub? Wub. Wub. Pubsub. -Nubs, Tubs, Wubs, pub Dubs. Pub subdub pub pub sub nub. + Tagline: "Publish a message to a given pubsub topic.", + ShortDescription: ` +ipfs pubsub pub publishes a message to a specified topic. - ipfs pubsub pub "pubsub hubbub" - -Pub: hubbub pub sub sub tub pub bub. - `, +This is an experimental feature. It is not intended in its current state +to be used in a production environment. +`, }, Arguments: []cmds.Argument{ - cmds.StringArg("data", true, false, "Data to send to david dias. (and only him)").EnableStdin(), - }, - Options: []cmds.Option{ - cmds.StringOption("topic", "t", "Topic to pubusb to."), + cmds.StringArg("topic", true, false, "Topic to publish to."), + cmds.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(), }, + Options: []cmds.Option{}, Run: func(req cmds.Request, res cmds.Response) { n, err := req.InvocContext().GetNode() if err != nil { @@ -124,16 +150,13 @@ Pub: hubbub pub sub sub tub pub bub. return } - topic, found, _ := req.Option("topic").String() - if !found { - res.SetError(fmt.Errorf("topic required"), cmds.ErrNormal) - return - } + topic := req.Arguments()[0] - err = n.Floodsub.Publish(topic, []byte(req.Arguments()[0])) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return + for _, data := range req.Arguments()[1:] { + if err := n.Floodsub.Publish(topic, []byte(data)); err != nil { + res.SetError(err, cmds.ErrNormal) + return + } } }, }