use node priority for routing requests #541
Conversation
The main purpose of this is to allow us to reduce the warmUpPeriod in hosted-metrics, allowing read nodes to start accepting queries a few minutes after they come online, but not start serving their own data until they have finished processing the kafka backlog.
looks good. just a few remarks.
input/kafkamdm/kafkamdm.go
Outdated
	for range ticker.C {
		cluster.Manager.SetPriority(k.lagMonitor.Metric())
	}
}
it would be cleaner if we were able to exit out of this goroutine when the plugin stops. in this case it doesn't matter much since we only call Stop() when stopping MT, but that may change.
-	membersMap := make(map[int32][]Node)
+	// store the available nodes for each partition, grouped by
+	// priority
+	membersMap := make(map[int32]*partitionCandidates)
 	if thisNode.IsReady() {
shouldn't we set the priority irrespective of thisNode.IsReady()? Same for a bit further down, where we continue the for loop if !member.IsReady(). I thought we would set the candidacies of the nodes based solely on their priority; now they are all still subject to the warmup period, and we won't query them until they are warmed up.
No. Nodes that are marked as notReady are still excluded. The priority is used in addition to the ready state (which is set to ready after the warmUpPeriod). It is implemented this way to ensure that when using the carbon input, things work exactly as they previously did, as all nodes will have the same prio.
having a small warmUp period is still useful as the nodes do a fair amount of work during startup.
This also keeps our options open for later marking the node as notReady based on other information (query failure rates, manually via the API, etc.)
> having a small warmUp period is still useful as the nodes do a fair amount of work during startup.
note that we only call SetReady or SetReadyIn(warmupPeriod) as the very last thing during startup. in particular, loading the index and starting the input plugins comes first.
So it sounds good, keeping the warmup mechanism, especially for carbon. I'm just skeptical about this particular argument. I think a warmup of 0 should work when using kafka and offset seek back.
I realized that though a node can handle requests (forwarding to other nodes) while it is low prio, we use the ready state in hosted-metrics for the k8s readiness probe.
This has complications for deploying upgrades. We do rolling upgrades, but if the nodes report as being ready when they can't actually serve data, then k8s will push the upgrade out too fast and there will be no instances with all of the data (all running instances will still be processing their backlog).
So in addition to converting the lag from a number of messages behind to a number of seconds behind, I have also added a config option, maxPrio, to mark the node as notReady. By default it is set to 10 seconds, so if the node is 10 seconds or more behind it will be marked as notReady. For hosted-metrics this will ensure that the nodes remain out of service until they have completed processing the backlog of data (or are within 10 seconds of completing it).
how concerned should we be about a scenario like this:
something goes wrong with kafka consumption -> all MT instances start to lag -> all mark themselves as not ready -> cluster is unavailable for reads. Whereas it could just have shown some data with the most recent data missing, now users can't see anything.
so we basically have a semantic difference:
A) healthy enough for the health check in k8s = in great health (priority value that is at least as good as the other, older nodes) = other, older nodes can be downed.
B) healthy enough to participate in the data read path = in mediocre health = maybe the data is laggy, but it's better than nothing. If we need to read from this node because it has the best prio, so be it; it's better than showing no data.
Note that loadbalancers (for just loadbalancing, not deployments) should also use B, for the same reason (better partial data than no data; the priority system takes care of the rest, making sure we select the node with the best data).
Perhaps we should turn this into two different concepts. Is it possible to use one HTTP endpoint for the rolling-upgrade health, and a different one for loadbalancing?
The one for A could then use the max-prio setting to affect its state, but it seems best if it doesn't affect B.
What would be really nice in k8s is if it could compare the priority of instances; then there could be a rule that says if the new node has better priority than the old, the old can be killed.
> is it possible to use one HTTP endpoint for the rolling upgrade health, and a different one for loadbalancing?

No.
> how concerned should we be about a scenario like this:
It is a concern, but something that we can address later.
cluster/cluster.go
Outdated
// ThisNode if it is capable of handling queries.
// Only 1 member per partition is returned.
// The nodes are selected based on priority, using a random selection
// if there are multiple nodes with the highest priority.
please don't remove the comment explaining that ThisNode can be included; that's really good to know. Also, I think "Only 1 member per partition is returned." is only correct for symmetric partition assignments, not for shuffled assignments. That's how we have it set up, so for us it's true, but it doesn't look like an intrinsic property of the code to be that way. So let's clarify.
force-pushed from 4550081 to a81c6b3
LGTM
- add a priority attribute to the cluster nodes
- when getting peers needed to handle a query, use the nodes with the lowest priority.
- update kafkamdm to track kafka lag, and use this lag as the node priority. So if a node has a large lag (either due to processing the backlog at startup, or because of a fault) it will stop receiving queries. But if all nodes in the cluster have high lag, then queries will still be processed and the user will get all of the data available.
- closes #519
- instead of just using the number of messages the node is lagging by, divide the number of messages by the ingest rate to get the number of seconds of lag. i.e., if a node is ingesting at 10k/s and the lag is 1k, then the priority is set to 0, as the outstanding metrics will be processed within 1 second. But if the ingest rate is 10k/s and the lag is 40k, then the prio gets set to 4, as it will take 4 seconds to process the outstanding metrics at the current rate.
… (lower priority) For kafkamdm specifically, this allows us to remove a node from service if it is lagging by too much (measured in seconds of lag)
force-pushed from 426e4e6 to 898b4aa
@@ -78,7 +78,7 @@ func (n Node) RemoteURL() string {
 }

 func (n Node) IsReady() bool {
-	return n.State == NodeReady
+	return n.State == NodeReady && n.Priority <= maxPrio
what is it about the priority system that is not good enough, that we need this? I don't see how this improves anything.
For the user the experience seems to get worse: instead of getting stale data from a node, they won't get data at all. E.g. instead of having a chart that has series with a gap, the series would be missing entirely.
input/kafkamdm/lag_monitor.go
Outdated
duration := int64(ts.Sub(o.lastTs) / time.Second)
o.lastTs = ts
if duration <= 0 {
	// current ts is > last ts. This would only happen if our clock
< (the comment should say "current ts is < last ts")
input/kafkamdm/lag_monitor.go
Outdated
	// low it has really been since we last took a measurement.
	return
}
rate := metrics / duration
if we kept duration in milliseconds until here, and then rounded after this step, our results could be up to 20% more accurate (assuming we do this every 5 seconds)
That would require using a float64 for the rate instead of an int.
also, the ticker that executes Store() will likely run very close to the second boundary. So there would not be a significant change in accuracy
why would it run close to the second boundary? it's a standard ticker so it depends on when it was started.
see:
~/t/testticker ❯❯❯ cat main.go
package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("start time", time.Now())
	ticker := time.NewTicker(2 * time.Second)
	for ts := range ticker.C {
		fmt.Println(ts)
	}
}

~/t/testticker ❯❯❯ go run main.go
start time 2017-02-23 10:39:45.746803375 +0100 CET
2017-02-23 10:39:47.747025261 +0100 CET
2017-02-23 10:39:49.747043167 +0100 CET
^Csignal: interrupt

~/t/testticker ❯❯❯ go run main.go
start time 2017-02-23 10:39:53.88361158 +0100 CET
2017-02-23 10:39:55.883752266 +0100 CET
2017-02-23 10:39:57.883791314 +0100 CET
^Csignal: interrupt
> That would require using a float64 for the rate instead of an int.

No, we just round to an int after doing the rate calculation. I created PR #546 which shows what I mean.
input/kafkamdm/lag_monitor.go
Outdated
	return
}
rate := metrics / duration
if rate < 0 {
we could have done this check earlier, by checking metrics < 0.