Skip to content

Commit

Permalink
Merge pull request #285 from elubow/nsq_tail_update
Browse files Browse the repository at this point in the history
nsq_tail: add -n to limit the number of messages
  • Loading branch information
mreiferson committed Dec 14, 2013
2 parents dc9f7d3 + 7646a70 commit d2449ba
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions apps/nsq_tail/nsq_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
var (
showVersion = flag.Bool("version", false, "print version string")

topic = flag.String("topic", "", "nsq topic")
channel = flag.String("channel", "", "nsq channel")
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
topic = flag.String("topic", "", "nsq topic")
channel = flag.String("channel", "", "nsq channel")
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
totalMessages = flag.Int("n", 0, "total messages to show (will wait if starved)")

readerOpts = util.StringArray{}
nsqdTCPAddrs = util.StringArray{}
Expand All @@ -31,9 +32,13 @@ func init() {
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
}

type TailHandler struct{}
type TailHandler struct {
totalMessages int
messagesShown int
}

func (th *TailHandler) HandleMessage(m *nsq.Message) error {
th.messagesShown++
_, err := os.Stdout.Write(m.Body)
if err != nil {
log.Fatalf("ERROR: failed to write to os.Stdout - %s", err.Error())
Expand All @@ -42,6 +47,9 @@ func (th *TailHandler) HandleMessage(m *nsq.Message) error {
if err != nil {
log.Fatalf("ERROR: failed to write to os.Stdout - %s", err.Error())
}
if th.messagesShown >= th.totalMessages {
os.Exit(0)
}
return nil
}

Expand Down Expand Up @@ -80,8 +88,13 @@ func main() {
if err != nil {
log.Fatalf(err.Error())
}

// Don't ask for more messages than we want
if *totalMessages < *maxInFlight {
*maxInFlight = *totalMessages
}
r.SetMaxInFlight(*maxInFlight)
r.AddHandler(&TailHandler{})
r.AddHandler(&TailHandler{totalMessages: *totalMessages})

for _, addrString := range nsqdTCPAddrs {
err := r.ConnectToNSQ(addrString)
Expand Down

0 comments on commit d2449ba

Please sign in to comment.