-
Notifications
You must be signed in to change notification settings - Fork 104
Conversation
log.Info("Shutting down %s consumer", w.key) | ||
w.plugin.Stop() | ||
|
||
// shutdown our reciever plugins. These may take a while as we allow them |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receive receive receive receive :) (edit: does not apply anymore if we switch to input)
go c.handle(conn) | ||
} | ||
} | ||
|
||
func (c *Carbon) Stop() { | ||
c.listener.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's 2 issues here:
- c.listener.AcceptTCP() will return an error when we close the listener, we log it as an error, but it's actually normal. unfortunately there's no elegant way to check which kind of error was returned by AcceptTCP, but there's some tricks you can do. see http://zhen.org/blog/graceful-shutdown-of-go-net-dot-listeners/ (I found out about this when reading https://github.com/Shopify/toxiproxy/blob/2526adc590a5f1e80d37dce7e39a68045eee250c/proxy.go#L137 the other day)
- we don't seem to have anything that stops the c.handle goroutines, I think they will keep working as long as their client connection is established (possibly long after we closed the listener)
@@ -27,6 +27,10 @@ type KafkaMdam struct { | |||
StopChan chan int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like these two lines can be removed
@@ -203,14 +207,10 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, partitionOffs | |||
} | |||
|
|||
// Stop will initiate a graceful stop of the Consumer (permanent) | |||
// | |||
// NOTE: receive on StopChan to block until this process completes | |||
// and block until it stopped. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also here, we should probably remove StopChan attribute
clKafka "github.com/raintank/metrictank/mdata/clkafka" | ||
clNSQ "github.com/raintank/metrictank/mdata/clnsq" | ||
"github.com/raintank/metrictank/mdata/notifierKafka" | ||
notifierNsq "github.com/raintank/metrictank/mdata/notifierNsq" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to override the import name for notifierNsq but not for notifierKafka?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we dont.
sarama.Logger = l.New(os.Stdout, "[Sarama] ", l.LstdFlags) | ||
inKafkaMdamInst.Start(metrics, metricIndex, usg) | ||
/*********************************** | ||
Start our receivers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have a naming problem:
- in the docs and configs, we talk about inputs
- in the code, we use
in
- in the main scope, we call them receivers
We should make this consistent to 1 terminology.
in
is a bit too short to be a meaningful term in docs etc, but input
is shorter than receiver
, so I think we should call them input in all 3 places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, lets standardize on "inputs"
client, err := sarama.NewClient(cfg.Brokers, cfg.Config) | ||
func NewNotifierKafka(instance string, metrics mdata.Metrics, stats met.Backend) *NotifierKafka { | ||
messagesPublished = stats.NewCount("cluster.messages-published") | ||
messagesSize = stats.NewMeter("cluster.message_size", 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now the notifier plugins have their own copies of the metrics, but with the same keys. because it ultimately gets sent to statsd (for now) this doesn't really matter but I suspect this is not what was intended. we should probably add the notifier type into the metrics instead. (e.g. notifier.kafka.messages-published
)
ProducerOpts string | ||
ConsumerOpts string | ||
PCfg *nsq.Config | ||
CCfg *nsq.Config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with the exception of Enabled, all these variables don't need to exported anymore because we only use them in the package itself. in fact we probably also don't need the two separate files anymore. everything that's in cfg.go can be moved into notifierNsq.go (although you can also keep it separate if you want, i don't care much)
@@ -20,6 +21,9 @@ var Config *sarama.Config | |||
var OffsetDuration time.Duration | |||
var OffsetCommitInterval time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with the exception of Enabled, all these variables don't need to exported anymore because we only use them in the package itself. in fact we probably also don't need the two separate files anymore. everything that's in cfg.go can be moved into notifierKafka.go (although you can also keep it separate if you want, i don't care much)
in.carbon, in.kafkamdm, in.kafkamdam all kind of implemetned the in.Plugin interface, but didnt. This commit cleans things up so that all of the plugins work in the same way.
- rename CL* to Notifier* so it is clear what the structs are used for. - refactor each implimentation to be an isolated plugin that implements the mdata.NotifierHandler interface
We had 3 different handle functions built into the abstract In package, one for each type of handler. Given that the handle functions are specific to only 1 of the handler implimentations, it makes sense to just move them into the handlers directly. Also added some additional unit tests.
@Dieterbe can this be merged now? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a few more changes needed. please squash them in to the appropriate commits instead of making new commits, so we have a cleaner history (using rebase -i), thanks.
messagesPublished = stats.NewCount("cluster.messages-published") | ||
messagesSize = stats.NewMeter("cluster.message_size", 0) | ||
messagesPublished = stats.NewCount("notifier.kafka.messages-published") | ||
messagesSize = stats.NewMeter("notifier.kafka.message_size", 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dashboard.json has to be updated accordingly
return | ||
default: | ||
} | ||
log.Error(4, "Accept Error: %s", err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all log messages should have a prefix describing the module
if io.EOF != err { | ||
log.Error(4, err.Error()) | ||
log.Error(4, "Recv error: %s", err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all log messages should have a prefix describing the module
t.Fatal(err) | ||
} | ||
} | ||
time.Sleep(time.Millisecond * 100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why sleep? (don't just tell me, comment would be better)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As soon as this goroutine ends, the socket is closed. If we dont sleep here the sockets will be closed before the packets have been processed by the kernel and passed to the application.
Name: "default", | ||
RetentionStr: "1s:1d", | ||
} | ||
s.Pattern, err = regexp.Compile(".*") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can just use https://golang.org/pkg/regexp/#MustCompile here
These are change pulled from the cluster PR.