From fe41e24393f3735511e06069dff8cafdc1e56ade Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Fri, 1 May 2015 13:12:44 +0000 Subject: [PATCH] Build the partitionConsumer into the topicConsumer --- tools/README.md | 3 +- .../.gitignore | 2 - .../kafka-console-partitionconsumer/README.md | 25 ----- .../kafka-console-partitionconsumer.go | 102 ------------------ tools/kafka-console-topicconsumer/README.md | 6 +- .../kafka-console-topicconsumer.go | 24 ++++- 6 files changed, 28 insertions(+), 134 deletions(-) delete mode 100644 tools/kafka-console-partitionconsumer/.gitignore delete mode 100644 tools/kafka-console-partitionconsumer/README.md delete mode 100644 tools/kafka-console-partitionconsumer/kafka-console-partitionconsumer.go diff --git a/tools/README.md b/tools/README.md index 9316fc94a..cdb892348 100644 --- a/tools/README.md +++ b/tools/README.md @@ -4,7 +4,6 @@ This folder contains applications that are useful for exploration of your Kafka Some of these tools mirror tools that ship with Kafka, but these tools won't require installing the JVM to function. - [kafka-console-producer](./kafka-console-producer): a command line tool to produce a single message to your Kafka custer. -- [kafka-console-partitionconsumer](./kafka-console-partitionconsumer): a command line tool to consume a single partition of a topic on your Kafka cluster. -- [kafka-console-topicconsumer](./kafka-console-topicconsumer): a command line tool to consume all partition of a topic on your Kafka cluster. +- [kafka-console-topicconsumer](./kafka-console-topicconsumer): a command line tool to consume partitions of a topic on your Kafka cluster. To install all tools, run `go get github.com/Shopify/sarama/tools/...` diff --git a/tools/kafka-console-partitionconsumer/.gitignore b/tools/kafka-console-partitionconsumer/.gitignore deleted file mode 100644 index 5837fe8ca..000000000 --- a/tools/kafka-console-partitionconsumer/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -kafka-console-partitionconsumer -kafka-console-partitionconsumer.test diff --git a/tools/kafka-console-partitionconsumer/README.md b/tools/kafka-console-partitionconsumer/README.md deleted file mode 100644 index 58e1647de..000000000 --- a/tools/kafka-console-partitionconsumer/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# kafka-console-partitionconsumer - -A simple command line tool to consume a partition of a topic and print the messages -on the standard output. - -### Installation - - go get github.com/Shopify/sarama/tools/kafka-console-partitionconsumer - -### Usage - - # Minimum invocation - kafka-console-partitionconsumer -topic=test -partition=4 -brokers=kafka1:9092 - - # It will pick up a KAFKA_PEERS environment variable - export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092 - kafka-console-partitionconsumer -topic=test -partition=4 - - # You can specify the offset you want to start at. It can be either - # `oldest`, `newest`, or a specific offset number - kafka-console-partitionconsumer -topic=test -partition=3 -offset=oldest - kafka-console-partitionconsumer -topic=test -partition=2 -offset=1337 - - # Display all command line options - kafka-console-partitionconsumer -help diff --git a/tools/kafka-console-partitionconsumer/kafka-console-partitionconsumer.go b/tools/kafka-console-partitionconsumer/kafka-console-partitionconsumer.go deleted file mode 100644 index d5e4464de..000000000 --- a/tools/kafka-console-partitionconsumer/kafka-console-partitionconsumer.go +++ /dev/null @@ -1,102 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "log" - "os" - "os/signal" - "strconv" - "strings" - - "github.com/Shopify/sarama" -) - -var ( - brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster") - topic = flag.String("topic", "", "REQUIRED: the topic to consume") - partition = flag.Int("partition", -1, "REQUIRED: the partition to consume") - offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`, or an actual offset") - verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging") - - logger = log.New(os.Stderr, "", log.LstdFlags) -) - -func main() { - flag.Parse() - - if *brokerList == "" { - printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.") - } - - if *topic == "" { - printUsageErrorAndExit("-topic is required") - } - - if *partition == -1 { - printUsageErrorAndExit("-partition is required") - } - - if *verbose { - sarama.Logger = logger - } - - var ( - initialOffset int64 - offsetError error - ) - switch *offset { - case "oldest": - initialOffset = sarama.OffsetOldest - case "newest": - initialOffset = sarama.OffsetNewest - default: - initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64) - } - - if offsetError != nil { - printUsageErrorAndExit("Invalid initial offset: %s", *offset) - } - - c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil) - if err != nil { - printErrorAndExit(69, "Failed to start consumer: %s", err) - } - - pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset) - if err != nil { - printErrorAndExit(69, "Failed to start partition consumer: %s", err) - } - - go func() { - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Kill, os.Interrupt) - <-signals - pc.AsyncClose() - }() - - for msg := range pc.Messages() { - fmt.Printf("Offset:\t%d\n", msg.Offset) - fmt.Printf("Key:\t%s\n", string(msg.Key)) - fmt.Printf("Value:\t%s\n", string(msg.Value)) - fmt.Println() - } - - if err := c.Close(); err != nil { - logger.Println("Failed to close consumer: ", err) - } -} - -func printErrorAndExit(code int, format string, values ...interface{}) { - fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...)) - fmt.Fprintln(os.Stderr) - os.Exit(code) -} - -func printUsageErrorAndExit(format string, values ...interface{}) { - fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...)) - fmt.Fprintln(os.Stderr) - fmt.Fprintln(os.Stderr, "Available command line options:") - flag.PrintDefaults() - os.Exit(64) -} diff --git a/tools/kafka-console-topicconsumer/README.md b/tools/kafka-console-topicconsumer/README.md index 5a6758956..d786ebf3a 100644 --- a/tools/kafka-console-topicconsumer/README.md +++ b/tools/kafka-console-topicconsumer/README.md @@ -1,6 +1,6 @@ # kafka-console-topicconsumer -A simple command line tool to consume all partitions a topic and print the messages +A simple command line tool to consume partitions of a topic and print the messages on the standard output. ### Installation @@ -21,5 +21,9 @@ on the standard output. kafka-console-topicconsumer -topic=test -offset=oldest kafka-console-topicconsumer -topic=test -offset=newest + # You can specify the partition(s) you want to consume as a comma-separated + # list. The default is `all`. + kafka-console-topicconsumer -topic=test -partitions=1,2,3 + # Display all command line options kafka-console-topicconsumer -help diff --git a/tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go b/tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go index fe7865b6e..0f1eb89a9 100644 --- a/tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go +++ b/tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "strconv" "strings" "sync" @@ -15,6 +16,7 @@ import ( var ( brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster") topic = flag.String("topic", "", "REQUIRED: the topic to consume") + partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers") offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`") verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging") bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.") @@ -52,7 +54,7 @@ func main() { printErrorAndExit(69, "Failed to start consumer: %s", err) } - partitions, err := c.Partitions(*topic) + partitionList, err := getPartitions(c) if err != nil { printErrorAndExit(69, "Failed to get the list of partitions: %s", err) } @@ -71,7 +73,7 @@ func main() { close(closing) }() - for _, partition := range partitions { + for _, partition := range partitionList { pc, err := c.ConsumePartition(*topic, partition, initialOffset) if err != nil { printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err) @@ -110,6 +112,24 @@ func main() { } } +func getPartitions(c sarama.Consumer) ([]int32, error) { + if *partitions == "all" { + return c.Partitions(*topic) + } + + tmp := strings.Split(*partitions, ",") + var pList []int32 + for i := range tmp { + val, err := strconv.ParseInt(tmp[i], 10, 32) + if err != nil { + return nil, err + } + pList = append(pList, int32(val)) + } + + return pList, nil +} + func printErrorAndExit(code int, format string, values ...interface{}) { fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...)) fmt.Fprintln(os.Stderr)