diff --git a/tools/README.md b/tools/README.md index 9316fc94a..3464c4ad8 100644 --- a/tools/README.md +++ b/tools/README.md @@ -4,7 +4,7 @@ This folder contains applications that are useful for exploration of your Kafka Some of these tools mirror tools that ship with Kafka, but these tools won't require installing the JVM to function. - [kafka-console-producer](./kafka-console-producer): a command line tool to produce a single message to your Kafka custer. -- [kafka-console-partitionconsumer](./kafka-console-partitionconsumer): a command line tool to consume a single partition of a topic on your Kafka cluster. -- [kafka-console-topicconsumer](./kafka-console-topicconsumer): a command line tool to consume all partition of a topic on your Kafka cluster. +- [kafka-console-partitionconsumer](./kafka-console-partitionconsumer): (deprecated) a command line tool to consume a single partition of a topic on your Kafka cluster. +- [kafka-console-consumer](./kafka-console-consumer): a command line tool to consume arbitrary partitions of a topic on your Kafka cluster. To install all tools, run `go get github.com/Shopify/sarama/tools/...` diff --git a/tools/kafka-console-consumer/.gitignore b/tools/kafka-console-consumer/.gitignore new file mode 100644 index 000000000..67da9dfa9 --- /dev/null +++ b/tools/kafka-console-consumer/.gitignore @@ -0,0 +1,2 @@ +kafka-console-consumer +kafka-console-consumer.test diff --git a/tools/kafka-console-consumer/README.md b/tools/kafka-console-consumer/README.md new file mode 100644 index 000000000..4e77f0b70 --- /dev/null +++ b/tools/kafka-console-consumer/README.md @@ -0,0 +1,29 @@ +# kafka-console-consumer + +A simple command line tool to consume partitions of a topic and print the +messages on the standard output. + +### Installation + + go get github.com/Shopify/sarama/tools/kafka-console-consumer + +### Usage + + # Minimum invocation + kafka-console-consumer -topic=test -brokers=kafka1:9092 + + # It will pick up a KAFKA_PEERS environment variable + export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092 + kafka-console-consumer -topic=test + + # You can specify the offset you want to start at. It can be either + # `oldest`, `newest`. The default is `newest`. + kafka-console-consumer -topic=test -offset=oldest + kafka-console-consumer -topic=test -offset=newest + + # You can specify the partition(s) you want to consume as a comma-separated + # list. The default is `all`. + kafka-console-consumer -topic=test -partitions=1,2,3 + + # Display all command line options + kafka-console-consumer -help diff --git a/tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go b/tools/kafka-console-consumer/kafka-console-consumer.go similarity index 84% rename from tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go rename to tools/kafka-console-consumer/kafka-console-consumer.go index fe7865b6e..0f1eb89a9 100644 --- a/tools/kafka-console-topicconsumer/kafka-console-topicconsumer.go +++ b/tools/kafka-console-consumer/kafka-console-consumer.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "strconv" "strings" "sync" @@ -15,6 +16,7 @@ import ( var ( brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster") topic = flag.String("topic", "", "REQUIRED: the topic to consume") + partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers") offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`") verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging") bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.") @@ -52,7 +54,7 @@ func main() { printErrorAndExit(69, "Failed to start consumer: %s", err) } - partitions, err := c.Partitions(*topic) + partitionList, err := getPartitions(c) if err != nil { printErrorAndExit(69, "Failed to get the list of partitions: %s", err) } @@ -71,7 +73,7 @@ func main() { close(closing) }() - for _, partition := range partitions { + for _, partition := range partitionList { pc, err := c.ConsumePartition(*topic, partition, initialOffset) if err != nil { printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err) @@ -110,6 +112,24 @@ func main() { } } +func getPartitions(c sarama.Consumer) ([]int32, error) { + if *partitions == "all" { + return c.Partitions(*topic) + } + + tmp := strings.Split(*partitions, ",") + var pList []int32 + for i := range tmp { + val, err := strconv.ParseInt(tmp[i], 10, 32) + if err != nil { + return nil, err + } + pList = append(pList, int32(val)) + } + + return pList, nil +} + func printErrorAndExit(code int, format string, values ...interface{}) { fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...)) fmt.Fprintln(os.Stderr) diff --git a/tools/kafka-console-partitionconsumer/README.md b/tools/kafka-console-partitionconsumer/README.md index 58e1647de..646dd5f5c 100644 --- a/tools/kafka-console-partitionconsumer/README.md +++ b/tools/kafka-console-partitionconsumer/README.md @@ -1,5 +1,8 @@ # kafka-console-partitionconsumer +NOTE: this tool is deprecated in favour of the more general and more powerful +`kafka-console-consumer`. + A simple command line tool to consume a partition of a topic and print the messages on the standard output. diff --git a/tools/kafka-console-topicconsumer/.gitignore b/tools/kafka-console-topicconsumer/.gitignore deleted file mode 100644 index 2787203a3..000000000 --- a/tools/kafka-console-topicconsumer/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -kafka-console-topicconsumer -kafka-console-topicconsumer.test diff --git a/tools/kafka-console-topicconsumer/README.md b/tools/kafka-console-topicconsumer/README.md deleted file mode 100644 index 5a6758956..000000000 --- a/tools/kafka-console-topicconsumer/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# kafka-console-topicconsumer - -A simple command line tool to consume all partitions a topic and print the messages -on the standard output. - -### Installation - - go get github.com/Shopify/sarama/tools/kafka-console-topicconsumer - -### Usage - - # Minimum invocation - kafka-console-topicconsumer -topic=test -brokers=kafka1:9092 - - # It will pick up a KAFKA_PEERS environment variable - export KAFKA_PEERS=kafka1:9092,kafka2:9092,kafka3:9092 - kafka-console-topicconsumer -topic=test - - # You can specify the offset you want to start at. It can be either - # `oldest`, `newest`. The default is `newest`. - kafka-console-topicconsumer -topic=test -offset=oldest - kafka-console-topicconsumer -topic=test -offset=newest - - # Display all command line options - kafka-console-topicconsumer -help