WIP: Send samples to joining ingester during handover #788
Conversation
I'm considering adding a new ingester state
Force-pushed from 7b6dd99 to ab22fa5
pkg/ingester/ingester_claim.go
Outdated
userSamples := &i.joiningSampleQueue[j]
userCtx := user.InjectOrgID(stream.Context(), userSamples.userID)
for k := range userSamples.samples {
	i.append(userCtx, &userSamples.samples[k])
Returned error is ignored.
Should I just log the error?
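Something like this would do it (a sketch only; level and util.Logger are assumed to be the go-kit logging helpers used elsewhere in Cortex, and may need adjusting):

```go
// Sketch: keep the loop from the diff, but surface the append error instead of dropping it.
for k := range userSamples.samples {
	if err := i.append(userCtx, &userSamples.samples[k]); err != nil {
		level.Error(util.Logger).Log("msg", "failed to append queued sample",
			"user", userSamples.userID, "err", err)
	}
}
```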
pkg/ingester/ingester.go
Outdated
userID, err := user.ExtractOrgID(ctx)
if err != nil {
	// TODO not sure what to do here
	return &client.WriteResponse{}, nil
It shouldn't happen (the request will be rejected without a user ID), but we should at least return nil, err if it does.
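i.e. something along these lines (a sketch based on the hunk above; the response type is taken from the existing return statement):

```go
userID, err := user.ExtractOrgID(ctx)
if err != nil {
	// Shouldn't happen in practice (requests without an org ID are rejected
	// further up the stack), but propagate the error rather than claiming success.
	return nil, err
}
```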
pkg/ingester/ingester_claim.go
Outdated
i.append(userCtx, &userSamples.samples[k])
}
}
i.joiningSampleQueue = []userSamples{}
Probably want to put this whole loop into a separate function and utilise a defer to do the unlock?
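A sketch of that suggestion: the queue, userSamples fields and append call are taken from the diff, while the mutex name, logger and context handling are assumptions and would need to match the real code.

```go
func (i *Ingester) drainJoiningSampleQueue(ctx context.Context) {
	// Assumed lock guarding joiningSampleQueue; deferring the unlock keeps the
	// function safe if append panics or an early return is added later.
	i.joiningSampleQueueMtx.Lock()
	defer i.joiningSampleQueueMtx.Unlock()

	for j := range i.joiningSampleQueue {
		userSamples := &i.joiningSampleQueue[j]
		userCtx := user.InjectOrgID(ctx, userSamples.userID)
		for k := range userSamples.samples {
			if err := i.append(userCtx, &userSamples.samples[k]); err != nil {
				level.Error(util.Logger).Log("msg", "failed to append queued sample", "err", err)
			}
		}
	}
	i.joiningSampleQueue = []userSamples{}
}
```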
@@ -18,6 +18,7 @@ message IngesterDesc {
message TokenDesc {
uint32 token = 1;
string ingester = 2;
string nextIngester = 3;
Is this extra state redundant? Could this be detected uniquely by a token being owned by a "PENDING" ingester?
While the transfer is underway the tokens are still owned by the leaving ingester. The ingester receiving the chunks is in the JOINING state.
pkg/ring/ring.proto
Outdated
@@ -26,4 +27,5 @@ enum IngesterState {
PENDING = 2;
JOINING = 3;
PREPARING_TO_LEAVE = 4;
Probably want to start adding comments to explain these states and their transitions - how is PREPARING_TO_LEAVE different from LEAVING?
Good idea!
We send new samples to PREPARING_TO_LEAVE ingesters but not LEAVING ingesters.
(We sort out the token.NextIngester pointer while in PREPARING_TO_LEAVE.)
Thanks @leth! Is there a short description of how this is supposed to behave or what this is trying to achieve? I'm a bit concerned about synchronisation around state changes - can we guarantee none of the samples added to the queue are lost?

Will holding the pending sample lock like this when flushing all the samples prevent writes from succeeding to the ingester for potentially a longish time? Should they fail instead, or should we be able to add to the queue whilst draining it?

Generally, I think trying to improve the current bootstrapping process could be considered polishing a turd (not that I'm against that - incremental improvements and all). I'm in favour of replacing this whole process with something more akin to other DHTs, where a joining node picks N fresh tokens and incrementally transfers data from the replicas of the ranges they intersect, and leaving nodes do the opposite; this has the added benefit of allowing elastic scaling. WDYT?
Sorry, I've updated the PR description to cover that in more detail.
I think ring state changes should be delivered in-order as a lot of this interaction is pretty synchronous; see below for an outline of what happens.
I don't think we hold the lock for a flush, just while we empty the slice into the userStates - it could take some time, but I doubt it'd take that long. I couldn't think of a better way of doing it though, any suggestions?
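One shape that might allow pushes to continue while the backlog drains (a sketch reusing the assumed names from the earlier sketch): swap the slice out under the lock, then do the appends without holding it.

```go
// Take the current backlog and release the lock immediately, so Push can keep
// queueing into a fresh slice while this one is worked through.
i.joiningSampleQueueMtx.Lock()
queue := i.joiningSampleQueue
i.joiningSampleQueue = nil
i.joiningSampleQueueMtx.Unlock()

for j := range queue {
	userSamples := &queue[j]
	userCtx := user.InjectOrgID(ctx, userSamples.userID)
	for k := range userSamples.samples {
		if err := i.append(userCtx, &userSamples.samples[k]); err != nil {
			level.Error(util.Logger).Log("msg", "failed to append queued sample", "err", err)
		}
	}
}
```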
It certainly sounds better for scaling, but also sounds like quite a significant change. Also, it will need to do something similar to this change to avoid the same mysterious flush issue.
So I took this for a test drive in our dev cluster today. It didn't crash and burn, but it also didn't fix the flush issue.
I tried some analysis of what happened in Kibana, but didn't reach any conclusions.
Development on this seems to have stalled; any plans to revisit, or should we close?
Thanks for the reminder. I'm unlikely to find time to work on this soon, so will close this PR.
Avoids a "Mysterious Flush" (#467) after ingester handover by adding transfer destination information to the ring.
While an ingester handover is underway, the distributor will skip the leaving ingester and pick an extra ingester to maintain the same replication count for incoming samples.
This temporary ingester is never involved in the handover, and will receive samples only for the duration of the handover.
After the handover, since no more samples are being received, the temporary ingester will eventually (by default 5m later) mark those chunks as idle and flush them.
During a rollout this results in every ingester flushing a shedload of chunks.
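To make that behaviour concrete, here is a standalone sketch of handover-time replica selection (hypothetical types and names, not the actual Cortex ring/distributor code): the walk skips the LEAVING ingester and therefore pulls in one extra, temporary ingester.

```go
package main

import "fmt"

type ingesterState int

const (
	active ingesterState = iota
	leaving
)

type ingesterDesc struct {
	addr  string
	state ingesterState
}

// replicasFor walks the ring from start and picks replicationFactor ingesters,
// skipping LEAVING ones - so during a handover one extra ingester, never
// involved in the transfer, ends up receiving samples.
func replicasFor(ring []ingesterDesc, start, replicationFactor int) []ingesterDesc {
	out := make([]ingesterDesc, 0, replicationFactor)
	for i := 0; i < len(ring) && len(out) < replicationFactor; i++ {
		ing := ring[(start+i)%len(ring)]
		if ing.state == leaving {
			continue
		}
		out = append(out, ing)
	}
	return out
}

func main() {
	ring := []ingesterDesc{
		{"ingester-1", leaving}, // being replaced during a rollout
		{"ingester-2", active},
		{"ingester-3", active},
		{"ingester-4", active}, // the extra replica picked because ingester-1 is skipped
	}
	fmt.Println(replicasFor(ring, 0, 3))
}
```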
This change attempts to prevent that by marking a transfer as underway, sending new samples to the joining ingester, and batching those new samples to be appended to the chunk store after the transfer is complete.
Fixes #467