Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
dont export config variables
Browse files Browse the repository at this point in the history
  • Loading branch information
woodsaj committed Nov 8, 2016
1 parent 6597795 commit d42b979
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 68 deletions.
42 changes: 21 additions & 21 deletions mdata/notifierKafka/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (

var Enabled bool
var brokerStr string
var Brokers []string
var Topic string
var OffsetStr string
var DataDir string
var Config *sarama.Config
var OffsetDuration time.Duration
var OffsetCommitInterval time.Duration
var brokers []string
var topic string
var offsetStr string
var dataDir string
var config *sarama.Config
var offsetDuration time.Duration
var offsetCommitInterval time.Duration

var messagesPublished met.Count
var messagesSize met.Meter
Expand All @@ -28,10 +28,10 @@ func ConfigSetup() {
fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
fs.BoolVar(&Enabled, "enabled", false, "")
fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
fs.StringVar(&Topic, "topic", "metricpersist", "kafka topic")
fs.StringVar(&OffsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
fs.StringVar(&DataDir, "data-dir", "", "Directory to store partition offsets index")
fs.DurationVar(&OffsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
globalconf.Register("kafka-cluster", fs)
}

Expand All @@ -40,25 +40,25 @@ func ConfigProcess(instance string) {
return
}
var err error
switch OffsetStr {
switch offsetStr {
case "last":
case "oldest":
case "newest":
default:
OffsetDuration, err = time.ParseDuration(OffsetStr)
offsetDuration, err = time.ParseDuration(offsetStr)
if err != nil {
log.Fatal(4, "kafka-cluster: invalid offest format. %s", err)
}
}
Brokers = strings.Split(brokerStr, ",")
brokers = strings.Split(brokerStr, ",")

Config = sarama.NewConfig()
Config.ClientID = instance + "-cluster"
Config.Version = sarama.V0_10_0_0
Config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
Config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
Config.Producer.Compression = sarama.CompressionNone
err = Config.Validate()
config = sarama.NewConfig()
config.ClientID = instance + "-cluster"
config.Version = sarama.V0_10_0_0
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
config.Producer.Compression = sarama.CompressionNone
err = config.Validate()
if err != nil {
log.Fatal(2, "kafka-cluster invalid consumer config: %s", err)
}
Expand Down
15 changes: 7 additions & 8 deletions mdata/notifierKafka/notifierKafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend)
messagesPublished = stats.NewCount("notifier.kafka.messages-published")
messagesSize = stats.NewMeter("notifier.kafka.message_size", 0)

client, err := sarama.NewClient(Brokers, Config)
client, err := sarama.NewClient(brokers, config)
if err != nil {
log.Fatal(2, "kafka-notifier failed to start client: %s", err)
}
Expand All @@ -48,7 +48,7 @@ func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend)
log.Fatal(2, "kafka-notifier failed to initialize producer: %s", err)
}

offsetMgr, err := kafka.NewOffsetMgr(DataDir)
offsetMgr, err := kafka.NewOffsetMgr(dataDir)
if err != nil {
log.Fatal(2, "kafka-notifier couldnt create offsetMgr. %s", err)
}
Expand All @@ -75,25 +75,24 @@ func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend)

func (c *NotifierKafka) start() {
// get partitions.
topic := Topic
partitions, err := c.consumer.Partitions(topic)
if err != nil {
log.Fatal(4, "kafka-notifier: Faild to get partitions for topic %s. %s", topic, err)
}
for _, partition := range partitions {
var offset int64
switch OffsetStr {
switch offsetStr {
case "oldest":
offset = -2
case "newest":
offset = -1
case "last":
offset, err = c.offsetMgr.Last(topic, partition)
default:
offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*OffsetDuration).UnixNano()/int64(time.Millisecond))
offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
}
if err != nil {
log.Fatal(4, "kafka-notifier: failed to get %q offset for %s:%d. %q", OffsetStr, topic, partition, err)
log.Fatal(4, "kafka-notifier: failed to get %q offset for %s:%d. %q", offsetStr, topic, partition, err)
}
go c.consumePartition(topic, partition, offset)
}
Expand All @@ -110,7 +109,7 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, partitio
log.Info("kafka-notifier: consuming from %s:%d from offset %d", topic, partition, partitionOffset)
currentOffset := partitionOffset
messages := pc.Messages()
ticker := time.NewTicker(OffsetCommitInterval)
ticker := time.NewTicker(offsetCommitInterval)
for {
select {
case msg := <-messages:
Expand Down Expand Up @@ -190,7 +189,7 @@ func (c *NotifierKafka) flush() {
buf.Write(data)
messagesSize.Value(int64(buf.Len()))
payload := &sarama.ProducerMessage{
Topic: Topic,
Topic: topic,
Value: sarama.ByteEncoder(buf.Bytes()),
}

Expand Down
64 changes: 32 additions & 32 deletions mdata/notifierNsq/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,66 +19,66 @@ import (

var (
Enabled bool
NsqdTCPAddrs string
LookupdHTTPAddrs string
NsqdAdds []string
LookupdAdds []string
Topic string
Channel string
MaxInFlight int
ProducerOpts string
ConsumerOpts string
PCfg *nsq.Config
CCfg *nsq.Config
nsqdTCPAddrs string
lookupdHTTPAddrs string
nsqdAdds []string
lookupdAdds []string
topic string
channel string
maxInFlight int
producerOpts string
consumerOpts string
pCfg *nsq.Config
cCfg *nsq.Config
messagesPublished met.Count
messagesSize met.Meter
)

func ConfigSetup() {
fs := flag.NewFlagSet("nsq-cluster", flag.ExitOnError)
fs.BoolVar(&Enabled, "enabled", false, "")
fs.StringVar(&NsqdTCPAddrs, "nsqd-tcp-address", "", "nsqd TCP address (may be given multiple times as comma-separated list)")
fs.StringVar(&LookupdHTTPAddrs, "lookupd-http-address", "", "lookupd HTTP address (may be given multiple times as comma-separated list)")
fs.StringVar(&Topic, "topic", "metricpersist", "NSQ topic for persist messages")
fs.StringVar(&Channel, "channel", "tank", "NSQ channel for persist messages")
fs.StringVar(&ProducerOpts, "producer-opt", "", "option to passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
fs.StringVar(&ConsumerOpts, "consumer-opt", "", "option to passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)")
fs.IntVar(&MaxInFlight, "max-in-flight", 200, "max number of messages to allow in flight for consumer")
fs.StringVar(&nsqdTCPAddrs, "nsqd-tcp-address", "", "nsqd TCP address (may be given multiple times as comma-separated list)")
fs.StringVar(&lookupdHTTPAddrs, "lookupd-http-address", "", "lookupd HTTP address (may be given multiple times as comma-separated list)")
fs.StringVar(&topic, "topic", "metricpersist", "NSQ topic for persist messages")
fs.StringVar(&channel, "channel", "tank", "NSQ channel for persist messages")
fs.StringVar(&producerOpts, "producer-opt", "", "option to passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
fs.StringVar(&consumerOpts, "consumer-opt", "", "option to passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)")
fs.IntVar(&maxInFlight, "max-in-flight", 200, "max number of messages to allow in flight for consumer")
globalconf.Register("nsq-cluster", fs)
}

func ConfigProcess() {
if !Enabled {
return
}
if Topic == "" {
if topic == "" {
log.Fatal(4, "topic for nsq-cluster cannot be empty")
}

NsqdAdds = strings.Split(NsqdTCPAddrs, ",")
if len(NsqdAdds) == 1 && NsqdAdds[0] == "" {
NsqdAdds = []string{}
nsqdAdds = strings.Split(nsqdTCPAddrs, ",")
if len(nsqdAdds) == 1 && nsqdAdds[0] == "" {
nsqdAdds = []string{}
}

LookupdAdds = strings.Split(LookupdHTTPAddrs, ",")
if len(LookupdAdds) == 1 && LookupdAdds[0] == "" {
LookupdAdds = []string{}
lookupdAdds = strings.Split(lookupdHTTPAddrs, ",")
if len(lookupdAdds) == 1 && lookupdAdds[0] == "" {
lookupdAdds = []string{}
}

// producers
PCfg = nsq.NewConfig()
PCfg.UserAgent = "metrictank-cluster"
err := app.ParseOpts(PCfg, ProducerOpts)
pCfg = nsq.NewConfig()
pCfg.UserAgent = "metrictank-cluster"
err := app.ParseOpts(pCfg, producerOpts)
if err != nil {
log.Fatal(4, "nsq-cluster: failed to parse nsq producer options. %s", err)
}

// consumer
CCfg = nsq.NewConfig()
CCfg.UserAgent = "metrictank-cluster"
err = app.ParseOpts(CCfg, ConsumerOpts)
cCfg = nsq.NewConfig()
cCfg.UserAgent = "metrictank-cluster"
err = app.ParseOpts(cCfg, consumerOpts)
if err != nil {
log.Fatal(4, "nsq-cluster: failed to parse nsq consumer options. %s", err)
}
CCfg.MaxInFlight = MaxInFlight
cCfg.MaxInFlight = maxInFlight
}
14 changes: 7 additions & 7 deletions mdata/notifierNsq/notifierNsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ func NewNSQ(instance string, metrics mdata.Metrics, stats met.Backend) *Notifier
messagesPublished = stats.NewCount("notifier.nsq.messages-published")
messagesSize = stats.NewMeter("notifier.nsq.message_size", 0)
// producers
hostPool = hostpool.NewEpsilonGreedy(NsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
hostPool = hostpool.NewEpsilonGreedy(nsqdAdds, 0, &hostpool.LinearEpsilonValueCalculator{})
producers = make(map[string]*nsq.Producer)

for _, addr := range NsqdAdds {
producer, err := nsq.NewProducer(addr, PCfg)
for _, addr := range nsqdAdds {
producer, err := nsq.NewProducer(addr, pCfg)
if err != nil {
log.Fatal(4, "nsq-cluster failed creating producer %s", err.Error())
}
producers[addr] = producer
}

// consumers
consumer, err := insq.NewConsumer(Topic, Channel, CCfg, "metric_persist.%s", stats)
consumer, err := insq.NewConsumer(topic, channel, cCfg, "metric_persist.%s", stats)
if err != nil {
log.Fatal(4, "nsq-cluster failed to create NSQ consumer. %s", err)
}
Expand All @@ -56,13 +56,13 @@ func NewNSQ(instance string, metrics mdata.Metrics, stats met.Backend) *Notifier
}
consumer.AddConcurrentHandlers(c, 2)

err = consumer.ConnectToNSQDs(NsqdAdds)
err = consumer.ConnectToNSQDs(nsqdAdds)
if err != nil {
log.Fatal(4, "nsq-cluster failed to connect to NSQDs. %s", err)
}
log.Info("nsq-cluster persist consumer connected to nsqd")

err = consumer.ConnectToNSQLookupds(LookupdAdds)
err = consumer.ConnectToNSQLookupds(lookupdAdds)
if err != nil {
log.Fatal(4, "nsq-cluster failed to connect to NSQLookupds. %s", err)
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func (c *NotifierNSQ) flush() {
// will result in this loop repeating forever until we successfully publish our msg.
hostPoolResponse := hostPool.Get()
prod := producers[hostPoolResponse.Host()]
err = prod.Publish(Topic, buf.Bytes())
err = prod.Publish(topic, buf.Bytes())
// Hosts that are marked as dead will be retried after 30seconds. If we published
// successfully, then sending a nil error will mark the host as alive again.
hostPoolResponse.Mark(err)
Expand Down

0 comments on commit d42b979

Please sign in to comment.