nsqd: resilience #510
@stephensearles thanks again for putting this proposal together, apologies for the delay in responding. I've read through it and there are a lot of solid ideas.

I think my biggest concern is that I don't think a delegate for

I'm not sure how far you've gotten in your experiments, but I imagine that when you get to the point where state is being propagated via consensus (raft, whatever), you'll run into an issue due to the way

Basically, for messages that end up propagated to a remote

Fortunately, I've actually thought through this specific problem quite a bit and was already in the middle of writing up my thoughts on potential paths forward to achieving replication. It involves more significant changes, and I was hoping to complete the document before showing it to folks, but this feels like an appropriate time to have a discussion about it.

Take a look at https://github.com/mreiferson/mreiferson.github.com/blob/nsq_roadmap_1/posts/nsq_roadmap.md and let's continue to discuss.
Nice, I gave that a quick read through. Very cool.

Among all the things to talk about here, one piece stands out as something I've been thinking about quite a bit: raft. Having read the paper, spent a lot of time with visualizations, and read the code, I think we might be able to get away with not having true leaders for nsqd.

The reason raft has a leader is so that two requests mutating the same state don't end up in conflict; it allows changes to be atomic across the whole cluster. With nsqd, we don't have state that could end up conflicted. Messages being published can't overwrite each other. The closest we get to a conflict would be in the requeue counter. For that, though, I think we can just let the nsqd node that owns the message always increment the counter. The other nsqd nodes should not attempt to send the message ... until the owner node goes down. Once that node is detected as down, an election should occur to choose the new owner of that node's messages. I've thought about this quite a bit, and perhaps I'm missing something, but I think it allows us to forego a requirement limiting publishes to specific nodes.

A related thought: raft has increasing costs as the number of participating nodes increases, especially across the WAN. I have seen that

Anyway, thanks for the feedback. I'm still processing the whole of your comments and that doc, but I'm excited to move forward.
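A tiny sketch of that ownership rule, with hypothetical types that are not nsqd internals: only the owning node increments the requeue counter or attempts delivery, and a peer only acts after winning an election for a downed owner.

```go
package main

import "fmt"

// MessageState is a hypothetical record for a message replicated to peers.
type MessageState struct {
	ID       string
	Owner    string // node currently responsible for delivery and requeues
	Requeues int
}

// Requeue increments the counter only on the node that owns the message;
// replica holders leave it alone until ownership changes hands.
func (m *MessageState) Requeue(localNode string) bool {
	if m.Owner != localNode {
		return false // not ours to deliver (yet)
	}
	m.Requeues++
	return true
}

// TakeOwnership is what the winner of a recovery election would call after
// the previous owner is detected as down.
func (m *MessageState) TakeOwnership(newOwner string) {
	m.Owner = newOwner
}

func main() {
	msg := &MessageState{ID: "0a1b2c", Owner: "nsqd-1"}
	fmt.Println(msg.Requeue("nsqd-2")) // false: a replica never touches the counter
	msg.TakeOwnership("nsqd-2")        // nsqd-1 went down and nsqd-2 won the election
	fmt.Println(msg.Requeue("nsqd-2")) // true
}
```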
The "backup" messages propagated would need to be stored apart from regular messages being published through an ansqd node. For the FIN to happen well, they'd need to be stored in a map of some kind (in memory or on disk) so they can be looked up and cleared by message ID. |
Right, exactly, I was simply pointing out that there is more design work to do. Put more simply, I'm not fully convinced that we can "bolt on" replication given the current design and implementation of `nsqd`.

I think this should be easily achievable via configuration.
Raft is just one way of guaranteeing the consistency of replicated data. In this case it seems like that would be an incredibly useful property, not so much around telemetry (although that's a nice side-effect), but rather around providing a guarantee that if you've published a message and received an `OK`, that message won't be lost.
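To make that property concrete, here's a minimal sketch, with hypothetical names that are not nsqd code, of what the guarantee implies at publish time: the `OK` response goes back to the producer only after a majority of replicas have acknowledged the message.

```go
package publish

import "errors"

// replicaAck is a stand-in for however a peer signals that it has durably
// stored a copy of the message (in the discussion above, a FIN on a
// replication channel plays this role).
type replicaAck <-chan error

// PublishWithQuorum returns nil (i.e. the producer gets its OK) only after a
// majority of the replica acknowledgements have arrived successfully.
func PublishWithQuorum(acks []replicaAck) error {
	needed := len(acks)/2 + 1
	got := 0
	for _, ack := range acks {
		if err := <-ack; err == nil {
			got++
		}
		if got >= needed {
			return nil
		}
	}
	return errors.New("quorum not reached; publish should not be acknowledged")
}
```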
This is a crucial detail, because it affects backwards compatibility and is something that shouldn't overburden producers.
Another thought I've had recently: I think we might consider baking into the design a cancelable consistency requirement. In particular, there may be cases where it's much quicker to simply deliver the message than to achieve the desired redundancy for undelivered messages. That isn't to say we should wait to replicate, but that we shouldn't wait to deliver, and that upon successful delivery, we can cancel the outstanding consistency requirement.
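One way to picture that cancelable requirement, sketched with Go's context cancellation; this is purely illustrative and `replicate` and `deliver` are placeholders, not existing functions.

```go
package delivery

import (
	"context"
	"time"
)

// handle sketches the coordination only: replication is started but delivery
// is never blocked on it, and a successful delivery cancels the outstanding
// redundancy requirement.
func handle(ctx context.Context, msg []byte,
	replicate func(context.Context, []byte) error,
	deliver func([]byte) error) error {

	repCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- replicate(repCtx, msg) }()

	// Deliver immediately; replication proceeds in parallel.
	if err := deliver(msg); err == nil {
		cancel() // delivered, so stop waiting for the desired redundancy
		return nil
	}

	// Delivery failed, so the message really does need to be replicated
	// before it can be considered safe.
	return <-done
}
```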
So I've gone ahead and posted the work-in-progress implementation of the proposal I sent. The major work done so far is the delegate, storing messages grouped by expiration time and by ID, and starting on host recovery. In particular, this work is missing a fallback to on-disk storage, and the raft implementation is incomplete. Two observations I have about this work so far:
This is the big advantage of raft, but we don't need a replicated log to be as consistent as raft guarantees. I think we can trade off the things we don't need for the throughput and simplicity we do want. That said, my work so far has been to use an out-of-the-box raft implementation to get an initial proof of concept for the larger architecture before optimizing how and what I achieve with regard to consistency.

Related to that thought: I'd like to discuss narrowing down some short or short-ish term goals in light of your document. (This is motivated by a work project, after all.) Resilience is my primary goal, but I think the gossip piece is closely related enough that we should strongly consider tackling them together. Compaction and the WAL seem like optimizations that improve things but are technically redundant given strong resilience; unless performance really suffers, I think we can treat those as separate goals. What do you think?

So here's what this could look like, broken down into slightly smaller goals:
Some of this I've done in the code I posted, some of this is vague, but I'm putting this here as a starting place. In terms of solving problems:
What do you think? What am I missing here?
That's a really neat idea - I dig it 👍 💯 ✨
I've got to review your WIP implementation in detail; I haven't had the time to do that yet... but:
I've got obvious concerns about taking the approach of introducing new

While I'm super happy that the

I'm stewing on it...
The WAL (and its compaction) provides a way for nodes to recover from failure, so no, it isn't strictly related to replication but is an orthogonal and complementary feature. Your list seems like a reasonable series of changes; the devil is in the details, of course.
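For context, a minimal sketch of the kind of write-ahead log being discussed; the record format and function names are assumptions for illustration, not a proposed nsqd design. Records are synced before the message is acted on, so a restarting node can replay them.

```go
package wal

import (
	"bufio"
	"encoding/base64"
	"os"
)

// Append writes one record to the log and syncs it to disk before returning,
// so the message survives a crash that happens after the producer was acked.
func Append(f *os.File, record []byte) error {
	encoded := base64.StdEncoding.EncodeToString(record) + "\n"
	if _, err := f.WriteString(encoded); err != nil {
		return err
	}
	return f.Sync()
}

// Replay streams every record back to the caller on restart; compaction would
// periodically rewrite the file with only the still-unfinished records.
func Replay(f *os.File, handle func([]byte) error) error {
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		record, err := base64.StdEncoding.DecodeString(scanner.Text())
		if err != nil {
			return err
		}
		if err := handle(record); err != nil {
			return err
		}
	}
	return scanner.Err()
}
```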
Bootstrapping gossip is a little different than bootstrapping this proposed replication strategy - think about the current use case, for example... We'd like it to be as simple as possible to run a bunch of `nsqd` nodes.

It's fine for now for it to be "each node needs to specify a root node for gossip", similar to how the current state is "each node needs to specify a hard list of `nsqlookupd`".
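For illustration, here's what the "root node for gossip" bootstrap might look like under the assumption that the gossip layer is hashicorp/serf (as in the related gossip work); the seed address is a placeholder that would come from configuration.

```go
package main

import (
	"log"
	"os"

	"github.com/hashicorp/serf/serf"
)

func main() {
	// Start a serf member and join the cluster via a single well-known seed,
	// analogous to how nsqd is currently pointed at nsqlookupd addresses.
	conf := serf.DefaultConfig()
	conf.NodeName, _ = os.Hostname()

	s, err := serf.Create(conf)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Leave()

	if _, err := s.Join([]string{"10.0.0.1:7946"}, false); err != nil {
		log.Printf("join failed, running standalone: %v", err)
	}
}
```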
My biggest concern is that it entirely defeats the purpose of the hybrid in-memory/on-disk storage mechanism. Relatedly, this is why considering the WAL may be more important than you think...
So in light of the gossip work, I ended up writing a re-implementation of raft that uses serf as its mode of communication and source of node-liveness information. Perhaps it's not accurate to call it raft, but that's definitely what it's based on. Other than the obvious advantage of a resilient communication layer, we can adapt to use the same serf instance for the lookup and resilience tasks. While deduplicating work, that will also deduplicate the need for bootstrapping. The documentation is here: http://godoc.org/github.com/shipwire/ansqd/internal/polity

For the WAL question: yes, I agree, there is likely value in making sure there's a persistent storage element to it. The current implementation for message storage is to group messages by an "expiration time," removing them as they get confirmed as handled by the cluster. The expiration times can be checked periodically for any outstanding messages that need to be recovered. The implementation of that is here: https://github.com/shipwire/ansqd/blob/master/buckets.go
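A simplified sketch of that expiration-time grouping, not the actual buckets.go code; names and types here are illustrative.

```go
package buckets

import (
	"sync"
	"time"
)

// Buckets groups replicated message IDs by a coarse expiration time so a
// periodic sweep can find messages whose originating node never confirmed
// them as handled.
type Buckets struct {
	mu   sync.Mutex
	data map[int64]map[string]struct{} // unix-second bucket -> message IDs
}

func New() *Buckets {
	return &Buckets{data: make(map[int64]map[string]struct{})}
}

// Add files a message under the bucket for its expiration time.
func (b *Buckets) Add(id string, expires time.Time) {
	b.mu.Lock()
	defer b.mu.Unlock()
	key := expires.Unix()
	if b.data[key] == nil {
		b.data[key] = make(map[string]struct{})
	}
	b.data[key][id] = struct{}{}
}

// Remove clears a message once the cluster has confirmed it as handled.
func (b *Buckets) Remove(id string, expires time.Time) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.data[expires.Unix()], id)
}

// Expired returns the IDs in all buckets at or before now; these are the
// outstanding messages that may need to be recovered.
func (b *Buckets) Expired(now time.Time) []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	var ids []string
	for key, bucket := range b.data {
		if key <= now.Unix() {
			for id := range bucket {
				ids = append(ids, id)
			}
		}
	}
	return ids
}
```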
I want to make sure I'm on the same page with the direction you're proposing. Polity would have the responsibility, on top of our ongoing gossip work, for "self-organizing" nodes in the cluster into various roles. We would then use these "roles" to form replication groups (of sizes 3-5, etc.)? From there, we can layer on a replication strategy between those role-based groups, strongly consistent or otherwise?
I don't have a better place to ask polity-specific questions, so here goes... I haven't read the raft paper in some time, but I thought its use of terms and a commit index further protected it against split-brain situations - how does polity handle this?
So polity is kind of a simplified raft. It has no replication log, so there's no commit index, and it has no single leader role, so terms don't make sense either. You can think of polity roles as named, globally distributed locks. When one node proposes its candidacy for a role and is elected, no other node can take on that role until the first node is recalled. The three main operations, electing, recalling, and querying, all require a majority of all known nodes to succeed.

In a split-brain situation, a few things could happen. If the serf/memberlist instance still recognizes the presence of the missing nodes from the other side of the brain, only the side with a majority could win elections. Once the missing nodes are timed out, you could start to win elections on both sides and end up with two different role-holders. Once the partition is resolved, queries will return the node that had the majority of votes. Describing that has led me to a possible improvement: when joining two clusters (because of a partition), proactively resolve role conflicts by looking at which node has the most votes, and if there's a tie, break it by choosing the one that won at the earliest Lamport time.

As for how that works with replication: the current idea is not to use polity to replicate at all, but rather to use special topics to replicate messages (1) to peers. The FIN from a peer is a clear enough signal that it has received a message, and replication can be considered successful when a majority of peers have FIN'd it. When all is well, once the originating nsqd finishes with the message, it notifies peers about that success, and the other nodes can stop worrying about that message. Now say the original node goes down. Once a timeout has been reached, some number of peer nodes (>1) will initiate an election through polity, asking for a role that is specific to recovering the node that went down. Once a peer wins that election, it is responsible for delivering all the downed node's messages.

There isn't currently a notion of subdividing nodes into replication groups in the code I'm working on, but that seems like a reasonably good thing to do.

(1) By "message" here, I'm referring to a message as it sits on a particular channel's queue. So each nsqd can treat each "message" as a single required delivery.
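To make the recovery flow concrete, here's a sketch against a hypothetical elect/recall/query interface. This is not the actual polity API, only the shape of the interaction described above.

```go
package recovery

// Elector captures the three polity operations described above as a
// hypothetical interface; the real package's API will differ in detail.
type Elector interface {
	Elect(role, candidate string) error // propose a candidate; requires a majority
	Recall(role string) error           // release the role so others may claim it
	Query(role string) (string, error)  // which node currently holds the role
}

// RecoverPeer sketches the recovery flow: once an originating node has been
// down past the timeout, peers race for a role specific to that node, and
// only the election winner redelivers its messages.
func RecoverPeer(e Elector, self, downedNode string, redeliver func(node string) error) error {
	role := "recover:" + downedNode

	if err := e.Elect(role, self); err != nil {
		return err // lost the election; another peer handles recovery
	}
	defer e.Recall(role) // release the role once redelivery is done

	return redeliver(downedNode)
}
```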
Also, FWIW, depending on the settings you're using with memberlist, the various defaults for removing unreachable nodes range from about 3 seconds (Local), to about 5 seconds (LAN), to about 25 seconds (WAN). |
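Those defaults fall out of memberlist's preset configs. Assuming hashicorp/memberlist, a quick way to see where the differences come from is to print a few of the relevant fields (probe/gossip intervals and the suspicion multiplier):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

func main() {
	// Compare the Local, LAN, and WAN presets; longer probe/gossip intervals
	// and a larger suspicion multiplier mean a longer time before an
	// unreachable node is removed.
	for name, conf := range map[string]*memberlist.Config{
		"local": memberlist.DefaultLocalConfig(),
		"lan":   memberlist.DefaultLANConfig(),
		"wan":   memberlist.DefaultWANConfig(),
	} {
		fmt.Printf("%s: probe=%v gossip=%v suspicion-mult=%d\n",
			name, conf.ProbeInterval, conf.GossipInterval, conf.SuspicionMult)
	}
}
```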
I think this is going to be important for larger clusters, particularly to limit the number of replicated topics per-node (scalability).
What do you mean by "special topics"? What is the impact of this approach from an NSQ client's perspective?
Well, currently, that idea is just using a reserved topic name. I guess that might be a breaking change if there's nothing like that currently. Perhaps there's a way to do that communication differently, like with a different TCP command.
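For example, a reserved-prefix scheme might look like the sketch below. The prefix and helper names are purely hypothetical, and as noted this would be a breaking change versus today's unreserved topic namespace unless it were introduced as a new protocol command instead.

```go
package replication

import "strings"

// ReplicationTopicPrefix is a hypothetical reserved prefix; nothing like it
// exists in nsqd today.
const ReplicationTopicPrefix = "__replication."

// IsReplicationTopic reports whether a topic name is reserved for peer
// replication traffic rather than regular client publishes.
func IsReplicationTopic(topic string) bool {
	return strings.HasPrefix(topic, ReplicationTopicPrefix)
}

// ReplicationTopicFor derives the reserved topic used to mirror messages
// from a given source topic to peers.
func ReplicationTopicFor(sourceTopic string) string {
	return ReplicationTopicPrefix + sourceTopic
}
```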
Is this something that's being worked on?
Yes, in #625
Looks like this was getting there with #625; any updates?
That branch needs quite the rebase :)
Could we improve resilience using object storage (S3) like Warpstream[1]?
Here's a proposal we're looking at to provide resilience on top of NSQ. This idea requires a small change to the nsqd library in that it would need to be notified when a message has been FIN'd.
EDIT: latest proposal link
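For illustration, the notification hook that small change implies might look something like the sketch below; this is a hypothetical interface, not an existing nsqd API.

```go
package notify

// FinDelegate is a hypothetical hook of the kind the proposal describes:
// nsqd would invoke it whenever a client FINs a message, giving a
// replication layer the chance to release the backup copies it holds.
type FinDelegate interface {
	OnFIN(topic, channel string, messageID [16]byte)
}

// noopDelegate preserves current behavior when no replication layer is wired in.
type noopDelegate struct{}

func (noopDelegate) OnFIN(topic, channel string, messageID [16]byte) {}
```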