forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka-console-producer.go
140 lines (120 loc) · 4.53 KB
/
kafka-console-producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/tools/tls"
"github.com/rcrowley/go-metrics"
)
// Command line flags and shared state for the console producer.
//
// Note on usage strings: the flag package treats the first back-quoted word
// in a usage string as the placeholder name for the flag's value and strips
// those back-quotes from the help text, so back-quotes are only used where a
// placeholder is intended.
var (
	// What to produce and where to send it.
	brokerList  = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster. You can also set the KAFKA_PEERS environment variable")
	topic       = flag.String("topic", "", "REQUIRED: the topic to produce to")
	key         = flag.String("key", "", "The key of the message to produce. Can be empty.")
	value       = flag.String("value", "", "REQUIRED: the value of the message to produce. You can also provide the value on stdin.")
	partitioner = flag.String("partitioner", "", "The partitioning `scheme` to use. Can be hash, manual, or random")
	partition   = flag.Int("partition", -1, "The partition to produce to.")

	// Output controls.
	verbose     = flag.Bool("verbose", false, "Turn on sarama logging to stderr")
	showMetrics = flag.Bool("metrics", false, "Output metrics on successful publish to stderr")
	silent      = flag.Bool("silent", false, "Turn off printing the message's topic, partition, and offset to stdout")

	// TLS settings.
	tlsEnabled    = flag.Bool("tls-enabled", false, "Whether to enable TLS")
	tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
	tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
	tlsClientKey  = flag.String("tls-client-key", "", "Client key for client authentication (use with -tls-enabled and -tls-client-cert)")

	// logger writes to stderr with the standard timestamp prefix. main
	// installs it as sarama.Logger when -verbose is set.
	logger = log.New(os.Stderr, "", log.LstdFlags)
)
// main parses the command line flags, builds a sarama configuration from
// them, and synchronously produces a single message to the requested topic.
//
// Exit statuses (passed to os.Exit by the print*AndExit helpers):
//   - 64: usage error (missing or unsupported flag values)
//   - 66: the message value could not be read from standard input
//   - 69: TLS setup, producer creation, or message delivery failed
func main() {
	flag.Parse()

	if *brokerList == "" {
		printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
	}
	if *topic == "" {
		printUsageErrorAndExit("no -topic specified")
	}

	if *verbose {
		sarama.Logger = logger
	}

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true

	if *tlsEnabled {
		tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
		if err != nil {
			printErrorAndExit(69, "Failed to create TLS config: %s", err)
		}

		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
	}

	switch *partitioner {
	case "":
		// No scheme requested: an explicit partition implies manual
		// partitioning, otherwise fall back to hashing.
		if *partition >= 0 {
			config.Producer.Partitioner = sarama.NewManualPartitioner
		} else {
			config.Producer.Partitioner = sarama.NewHashPartitioner
		}
	case "hash":
		config.Producer.Partitioner = sarama.NewHashPartitioner
	case "random":
		config.Producer.Partitioner = sarama.NewRandomPartitioner
	case "manual":
		config.Producer.Partitioner = sarama.NewManualPartitioner
		// Any negative value means "no usable partition", matching the
		// >= 0 test used when no scheme is given.
		if *partition < 0 {
			printUsageErrorAndExit("-partition is required when partitioning manually")
		}
	default:
		printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
	}

	message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}

	if *key != "" {
		message.Key = sarama.StringEncoder(*key)
	}

	// The -value flag wins; standard input is only consulted when it is empty.
	if *value != "" {
		message.Value = sarama.StringEncoder(*value)
	} else if stdinAvailable() {
		payload, err := ioutil.ReadAll(os.Stdin)
		if err != nil {
			printErrorAndExit(66, "Failed to read data from the standard input: %s", err)
		}
		message.Value = sarama.ByteEncoder(payload)
	} else {
		printUsageErrorAndExit("-value is required, or you have to provide the value on stdin")
	}

	producer, err := sarama.NewSyncProducer(strings.Split(*brokerList, ","), config)
	if err != nil {
		printErrorAndExit(69, "Failed to open Kafka producer: %s", err)
	}
	closeProducer := func() {
		if err := producer.Close(); err != nil {
			logger.Println("Failed to close Kafka producer cleanly:", err)
		}
	}
	defer closeProducer()

	// Named to avoid shadowing the package-level partition flag.
	producedPartition, offset, err := producer.SendMessage(message)
	if err != nil {
		// printErrorAndExit ends the process with os.Exit, which does not
		// run deferred calls, so close the producer explicitly first.
		closeProducer()
		printErrorAndExit(69, "Failed to produce message: %s", err)
	}
	if !*silent {
		fmt.Printf("topic=%s\tpartition=%d\toffset=%d\n", *topic, producedPartition, offset)
	}
	if *showMetrics {
		metrics.WriteOnce(config.MetricRegistry, os.Stderr)
	}
}
// printErrorAndExit formats an error message from format and values, writes
// it to stderr prefixed with "ERROR: " and followed by a blank line, and then
// terminates the process with the given exit code.
func printErrorAndExit(code int, format string, values ...interface{}) {
	detail := fmt.Sprintf(format, values...)
	stderr := os.Stderr

	fmt.Fprintf(stderr, "ERROR: %s\n", detail)
	fmt.Fprintln(stderr)

	os.Exit(code)
}
// printUsageErrorAndExit reports a command line usage problem on stderr,
// prints the registered flags via flag.PrintDefaults, and terminates the
// process with exit status 64.
func printUsageErrorAndExit(message string) {
	const usageExitCode = 64
	stderr := os.Stderr

	fmt.Fprintln(stderr, "ERROR:", message)
	fmt.Fprintln(stderr)
	fmt.Fprintln(stderr, "Available command line options:")
	flag.PrintDefaults()

	os.Exit(usageExitCode)
}
// stdinAvailable reports whether standard input is something other than a
// character device (for example a pipe or a redirected file), which main
// uses to decide whether to read the message value from stdin.
//
// It returns false when os.Stdin cannot be inspected.
func stdinAvailable() bool {
	stat, err := os.Stdin.Stat()
	if err != nil {
		// Stat returns a nil FileInfo on failure (e.g. stdin is closed);
		// report "no input" rather than dereferencing it.
		return false
	}
	return stat.Mode()&os.ModeCharDevice == 0
}