Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #861 from grafana/issue-860
Browse files Browse the repository at this point in the history
when duration kafka offset fails, fall back to oldest
  • Loading branch information
Dieterbe authored Mar 7, 2018
2 parents 87ae555 + bff3193 commit c9cd6a6
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 12 deletions.
7 changes: 6 additions & 1 deletion docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ instance = default
drop-first-chunk = false
# max age for a chunk before it is considered stale and persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from memory
# max age for a metric before it is considered stale and purged from the in-memory ring buffer.
metric-max-stale = 6h
# Interval to run garbage collection job
gc-interval = 1h
Expand Down Expand Up @@ -146,6 +146,8 @@ fallback-graphite-addr = http://graphite
log-min-dur = 5min
# timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone.
time-zone = local
# maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series.
get-targets-concurrency = 20
# default limit for tagdb query results, can be overridden with query parameter "limit"
tagdb-default-limit = 100

Expand Down Expand Up @@ -175,6 +177,7 @@ brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
# kafka partitions to consume. use '*' or a comma-separated list of IDs
Expand Down Expand Up @@ -273,6 +276,8 @@ partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
Expand Down
5 changes: 4 additions & 1 deletion docker/docker-cluster/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ instance = default
drop-first-chunk = false
# max age for a chunk before it is considered stale and persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from memory
# max age for a metric before it is considered stale and purged from the in-memory ring buffer.
metric-max-stale = 6h
# Interval to run garbage collection job
gc-interval = 1h
Expand Down Expand Up @@ -177,6 +177,7 @@ brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
# kafka partitions to consume. use '*' or a comma-separated list of IDs
Expand Down Expand Up @@ -275,6 +276,8 @@ partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
Expand Down
5 changes: 4 additions & 1 deletion docker/docker-dev-custom-cfg-kafka/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ instance = default
drop-first-chunk = false
# max age for a chunk before it is considered stale and persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from memory
# max age for a metric before it is considered stale and purged from the in-memory ring buffer.
metric-max-stale = 6h
# Interval to run garbage collection job
gc-interval = 1h
Expand Down Expand Up @@ -177,6 +177,7 @@ brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
# kafka partitions to consume. use '*' or a comma-separated list of IDs
Expand Down Expand Up @@ -275,6 +276,8 @@ partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
Expand Down
3 changes: 3 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
# kafka partitions to consume. use '*' or a comma-separated list of IDs
Expand Down Expand Up @@ -333,6 +334,8 @@ partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
Expand Down
12 changes: 8 additions & 4 deletions input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,16 @@ func (k *KafkaMdm) Start(handler input.Handler, fatal chan struct{}) error {
offset = sarama.OffsetNewest
case "last":
offset, err = offsetMgr.Last(topic, partition)
if err != nil {
log.Error(4, "kafka-mdm: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
return err
}
default:
offset, err = k.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
}
if err != nil {
log.Error(4, "kafka-mdm: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
return err
if err != nil {
offset = sarama.OffsetOldest
log.Warn("kafka-mdm failed to get offset %s: %s -> will use oldest instead", offsetDuration, err)
}
}
k.wg.Add(1)
go k.consumePartition(topic, partition, offset)
Expand Down
10 changes: 7 additions & 3 deletions mdata/notifierKafka/notifierKafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ func (c *NotifierKafka) start() {
offset = -1
case "last":
offset, err = c.offsetMgr.Last(topic, partition)
if err != nil {
log.Fatal(4, "kafka-cluster: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
}
default:
offset, err = c.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
}
if err != nil {
log.Fatal(4, "kafka-cluster: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
if err != nil {
offset = sarama.OffsetOldest
log.Warn("kafka-cluster failed to get offset %s: %s -> will use oldest instead", offsetDuration, err)
}
}
partitionLogSize[partition].Set(int(bootTimeOffsets[partition]))
if offset >= 0 {
Expand Down
3 changes: 3 additions & 0 deletions metrictank-sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
# kafka partitions to consume. use '*' or a comma-separated list of IDs
Expand Down Expand Up @@ -278,6 +279,8 @@ partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
Expand Down
5 changes: 4 additions & 1 deletion scripts/config/metrictank-docker.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ instance = default
drop-first-chunk = false
# max age for a chunk before it is considered stale and persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from memory
# max age for a metric before it is considered stale and purged from the in-memory ring buffer.
metric-max-stale = 6h
# Interval to run garbage collection job
gc-interval = 1h
Expand Down Expand Up @@ -177,6 +177,7 @@ brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
# kafka partitions to consume. use '*' or a comma-separated list of IDs
Expand Down Expand Up @@ -275,6 +276,8 @@ partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
Expand Down
5 changes: 4 additions & 1 deletion scripts/config/metrictank-package.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ instance = default
drop-first-chunk = false
# max age for a chunk before it is considered stale and persisted to Cassandra
chunk-max-stale = 1h
# max age for a metric before to be considered stale and to be purged from memory
# max age for a metric before it is considered stale and purged from the in-memory ring buffer.
metric-max-stale = 6h
# Interval to run garbage collection job
gc-interval = 1h
Expand Down Expand Up @@ -177,6 +177,7 @@ brokers = localhost:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
# kafka partitions to consume. use '*' or a comma-separated list of IDs
Expand Down Expand Up @@ -275,6 +276,8 @@ partitions = *
# method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)
partition-scheme = bySeries
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
Expand Down

0 comments on commit c9cd6a6

Please sign in to comment.