Conversation
ffe838a to a578a40 (compare)
i've been testing this with
cluster/node.go
Outdated
case NodeUnreachable:
	return []byte(`"NodeUnreachable"`), nil
}
panic(fmt.Sprintf("impossible nodestate %v", n))
don't panic. This method can return an error, so return it.
input/kafkamdm/kafkamdm.go
Outdated
		log.Error(3, "kafka-mdm failed to get oldest offset of partition %s:%d. %s", topic, partition, err)
	}
}

if currentOffset >= 0 {
	// we cant set the offsetMetrics until we know what offset we are at.
It looks like this statement is no longer true. We do know the offset we are at, it is stored in initialOffset
f145ea5 to 19aa45a (compare)
@woodsaj PTAL. an interesting side effect is that what is stored in the offsetMgr database is a bit different: previously we would store -1 and -2 as long as nothing was consumed; now, since we resolve the offset to a concrete one, that is what is stored in the db
input/kafkamdm/kafkamdm.go
Outdated
// we must determine the initial offset of our consumer
// if we just assign the current newest or oldest to it,
// it may actually be a later value than what oldest/newest was when we started the consumer
this doesn't make any sense to me. There is no reason why you can't just fetch the desired offset, whether that be newest, oldest or a specific offset, and also fetch the newest offset. This is all the information needed to be able to correctly set the lag.
There is no need to consume the messages here.
i would simply refactor this function to:
- Get Newest offset
- if currentOffset == sarama.OffsetNewest then set currentOffset to newest
  else if currentOffset == sarama.OffsetOldest, then currentOffset = tryGetOffset(Oldest)
  else currentOffset remains unchanged, or you can call tryGetOffset(currentOffset) to make sure the offset exists. If not then we should fall back to oldest or newest?
- set partitionLogSizeMetric, partitionOffsetMetric and partitionLagMetric
- call pc, err := k.consumer.ConsumePartition(topic, partition, currentOffset)
  currentOffset will always be an explicit offset rather than newest or oldest.
- consume from pc.messages
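A minimal sketch of the proposed control flow. The sarama sentinel constants are mirrored locally so the snippet is self-contained, and a plain callback stands in for tryGetOffset / client.GetOffset:

```go
package main

import "fmt"

// Sentinels mirroring sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2).
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// resolveOffset turns a requested offset into a concrete one, following the
// flow proposed above; getOffset stands in for tryGetOffset / client.GetOffset.
func resolveOffset(requested int64, getOffset func(int64) int64) (current, newest int64) {
	newest = getOffset(OffsetNewest)
	switch requested {
	case OffsetNewest:
		current = newest
	case OffsetOldest:
		current = getOffset(OffsetOldest)
	default:
		// a concrete offset: keep it; ConsumePartition will fail if it's gone
		current = requested
	}
	return current, newest
}

func main() {
	// fake broker state: oldest=100, newest=250
	get := func(o int64) int64 {
		if o == OffsetOldest {
			return 100
		}
		return 250
	}
	cur, newest := resolveOffset(OffsetNewest, get)
	fmt.Println(cur, newest) // 250 250
	cur, _ = resolveOffset(OffsetOldest, get)
	fmt.Println(cur) // 100
}
```

With the offset concrete, the newest value and the current offset are enough to set the lag metrics before any message is consumed.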
I agree with everything you say, except:
or you can call tryGetOffset(currentOffset) to make sure the offset exists. If not then we should fall back to oldest or newest?
a call to tryGetOffset for a concrete offset, followed by consumer.ConsumePartition, is racy (the offset might disappear between the two calls), and overkill (we might as well just rely on consumer.ConsumePartition telling us whether or not it failed to consume the requested offset)
if it doesn't exist, I think it's best to fatal and draw the operator's attention.
input/kafkamdm/kafkamdm.go
Outdated
	break
}
if attempt == 7 {
	log.Fatal(3, "kafka-mdm failed to get %s of partition %s:%d. %s (attempt %d/7)", offsetStr, topic, partition, err)
this is dangerous. if the user has MT configured to resume from "last" but the last offset stored by offsetMgr is no longer available in kafka, MT will just enter a crash loop.
I think you should have this method return an error instead and let the caller decide if the error is fatal
if the user has MT configured to resume from "last" but the last offset stored by offsetMgr is no longer available in kafka
isn't the behavior here par with what's in master? AFAICT, master will also crash because it has this:
pc, err := k.consumer.ConsumePartition(topic, partition, currentOffset)
if err != nil {
log.Fatal(4, "kafka-mdm: failed to start partitionConsumer for %s:%d. %s", topic, partition, err)
}
I think crashing in this case is the right thing to do.
Internal libraries should (almost) never just crash, they should return an error indicating they can't continue and let the code that's calling them decide what to do about it. What is the rationale for allowing the input processor to just stop the whole program dead in its tracks?
the rationale for why to abort is discussed at #520; this is also what i was referring to when i said the behavior is on par with master. (we've been doing this ever since kafka support was introduced)
as for why the abort comes from the input plugin and not from main, i generally agree with you and am open to refactoring that. but let's do that in a future PR. I want these fixes to get in sooner rather than later.
input/kafkamdm/kafkamdm.go
Outdated
@@ -218,6 +218,27 @@ func (k *KafkaMdm) Start(handler input.Handler) {
	}
}

// tryGetOffset will query kafka repeatedly for the requested offset, and return it when it has it,
// or halt the program if it can't retrieve the value
func (k *KafkaMdm) tryGetOffset(topic string, partition int32, offsetStr string, offset int64, attempts int, sleep time.Duration) int64 {
offsetStr is something that should be determined inside this function, not passed in.
var offsetStr string
switch offset {
case sarama.OffsetNewest:
	offsetStr = "newest"
case sarama.OffsetOldest:
	offsetStr = "oldest"
default:
	offsetStr = strconv.FormatInt(offset, 10)
}
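A self-contained version of that switch (using strconv.FormatInt; the sarama sentinels are mirrored as local constants for illustration):

```go
package main

import (
	"fmt"
	"strconv"
)

// Local mirrors of sarama.OffsetNewest / sarama.OffsetOldest.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// offsetName derives the human-readable label inside the function,
// so callers no longer need to pass offsetStr in.
func offsetName(offset int64) string {
	switch offset {
	case OffsetNewest:
		return "newest"
	case OffsetOldest:
		return "oldest"
	default:
		return strconv.FormatInt(offset, 10)
	}
}

func main() {
	fmt.Println(offsetName(OffsetNewest), offsetName(OffsetOldest), offsetName(42))
	// newest oldest 42
}
```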
input/kafkamdm/kafkamdm.go
Outdated
	break
}
if attempt == 7 {
	log.Fatal(3, "kafka-mdm failed to get %s of partition %s:%d. %s (attempt %d/7)", offsetStr, topic, partition, err)
you also look to be missing "attempt" from the args passed to log.Fatal()
input/kafkamdm/kafkamdm.go
Outdated
if attempt == 7 {
	log.Fatal(3, "kafka-mdm failed to get %s of partition %s:%d. %s (attempt %d/7)", offsetStr, topic, partition, err)
}
log.Error(3, "kafka-mdm failed to get %s of partition %s:%d. %s (attempt %d/7)", offsetStr, topic, partition, err)
missing "attempt" from the args passed to log.Fatal()
404553e to d75c075 (compare)
input/kafkamdm/kafkamdm.go
Outdated
@@ -262,19 +300,16 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
	log.Error(3, "kafka-mdm failed to commit offset for %s:%d, %s", topic, partition, err)
}
k.lagMonitor.StoreOffset(partition, currentOffset, ts)
-	offset, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest)
+	newest, err := k.client.GetOffset(topic, partition, sarama.OffsetNewest)
for consistency I think this should use currentOffset, err = k.tryGetOffset(topic, partition, sarama.OffsetOldest, 1, time.Second)
if you mean "use tryGetOffset", that's fine, I can do that.
but assigning the oldest offset to currentOffset doesn't make sense here. the purpose of this code is to update newest to be the currently newest offset.
i meant use k.tryGetOffset instead of calling k.client.GetOffset directly.
if attempt == attempts {
	break
}
log.Warn("kafka-mdm %s", err)
You need to log before break'ing, else the last log message will never be written. if "attempts=1" then no log will be written.
the warnings here are only for failed attempts that have subsequent attempts. the final error is returned to the caller and logged there as a critical. (this is what i thought you asked for)
LGTM
it's up to the input plugin to signify when the priority gets low.
in case a peer node queries us because it thought we are healthy but actually it has outdated readyness/priority information (e.g. we just crashed and rebooted)
the failure mode is as follows:
* MT starts up, and starts the input plugin
* input is configured to consume from offset latest/oldest
* for some reason consumption doesn't start straight away, and the ticker in consumePartition triggers and the offset of -1/-2 is saved.
* consumption starts and concrete (high) offset is reported
* MaintainPriority() runs, the rate looks ridiculously high, so a low priority score is reported (even though there may be a big lag)
* the node effectively announces it can serve queries but in fact it doesn't have the data, leading to incomplete/incorrect query responses.
previously, the lag monitor would report 0 in certain cases (no measurements yet, rollover, kafka recovery), which would lead to an unrealistically low priority, making the node announce it's in great shape, but actually providing incorrect output if it's lagging behind. with this change:
* if we don't know what our priority should be, report 10k, same as default unknown priority.
* if we had to skip a lag report, just report the previous value (possibly also 10k or something concrete)
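The reporting rule can be sketched as below. LagMonitor here is a toy stand-in for illustration, not the real metrictank type:

```go
package main

import "fmt"

// 10000 matches the default "unknown" priority described above.
const unknownPriority = 10000

// LagMonitor is a toy stand-in for illustration only.
type LagMonitor struct {
	measured bool
	last     int
}

// Priority reports the unknown default until a real measurement exists;
// afterwards it repeats the last known value, even if a report was skipped.
func (l *LagMonitor) Priority() int {
	if !l.measured {
		return unknownPriority
	}
	return l.last
}

// Report records a concrete lag measurement.
func (l *LagMonitor) Report(lag int) {
	l.measured = true
	l.last = lag
}

func main() {
	var m LagMonitor
	fmt.Println(m.Priority()) // 10000: no measurements yet
	m.Report(7)
	fmt.Println(m.Priority()) // 7: last known value
}
```

The point is that "don't know" is never translated into "doing great": an unmeasured node keeps the same 10k score as a freshly started one.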
BEFORE:
{
  "name": "metrictank3",
  "version": "0.7.4-85-ga578a40",
  "primary": false,
  "primaryChange": "2017-09-04T17:42:39.782065453Z",
  "state": 0,
  "priority": 10000,
  "started": "2017-09-04T17:42:39.780889544Z",
  "stateChange": "2017-09-04T17:42:39.782065509Z",
  "partitions": [4, 5, 6, 7],
  "apiPort": 6060,
  "apiScheme": "http",
  "updated": "2017-09-04T17:42:55.050884771Z",
  "remoteAddr": "172.21.0.10"
}
AFTER:
{
  "name": "metrictank0",
  "version": "0.7.4-85-ga578a40",
  "primary": true,
  "primaryChange": "2017-09-04T18:38:35.821909901Z",
  "state": "NodeReady",
  "priority": 10000,
  "started": "2017-09-04T18:38:35.820140791Z",
  "stateChange": "2017-09-04T18:38:35.821909959Z",
  "partitions": [0, 1, 2, 3],
  "apiPort": 6060,
  "apiScheme": "http",
  "updated": "2017-09-04T18:38:53.106347005Z",
  "remoteAddr": "172.21.0.9"
}
if we're starting from oldest/newest and have not consumed anything yet, still report lag and current offset, since we can know both accurately. In particular this fixes a bug where on a kafka cluster where nothing was being produced, nothing was also obviously being consumed, so no lag was reported, so priority was reported as 10k, so the node was not able to become ready.
we need the currentOffset to be made concrete instead of -1 (newest) or -2 (oldest), otherwise we cannot properly report our lag, and thus cannot set our priority and function in the cluster. this also allows us to simplify some things further down the code.
f2bcce3 to 65631a5 (compare)
note that index init (backfill) is blocking, and plugin.MaintainPriority is only called afterwards. so when priority goes down from 10k, index is guaranteed to be loaded.
future nice to have: