refactor: Spawn dedicated control loops per Indexer #866
Conversation
```rust
let finished_tasks: Vec<String> = lifecycle_tasks
    .iter()
    .filter_map(|(name, task)| task.is_finished().then_some(name.clone()))
    .collect();

for indexer_name in finished_tasks {
    tracing::info!(indexer_name, "Lifecycle has finished, removing...");

    lifecycle_tasks.remove(&indexer_name);
}
```
Need to do this in two passes, as we can't hold both a mutable (`lifecycle_tasks.remove`) and an immutable (`for ... in ...`) reference at the same time.
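For what it's worth, a hypothetical single-pass alternative would be `HashMap::retain`, which sidesteps the borrow conflict by taking a single mutable borrow of the map (this sketch assumes `lifecycle_tasks` is a `HashMap<String, JoinHandle<()>>`):

```rust
// Single mutable borrow: retain visits each entry and drops the ones
// whose lifecycle task has finished.
lifecycle_tasks.retain(|indexer_name, task| {
    if task.is_finished() {
        tracing::info!(%indexer_name, "Lifecycle has finished, removing...");
        return false;
    }
    true
});
```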
```diff
-        serde_json::from_slice(&call_result.result)?;
-
-        return Ok(IndexerConfig {
+        let indexer = serde_json::from_slice::<Option<registry_types::IndexerConfig>>(
```
Handle the `Option` which was recently added to the `read_indexer_function` contract method.
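For reference, a minimal sketch of what handling that `Option` looks like (the error message here is illustrative, not from the PR):

```rust
// Parse the registry response, which may now be None if the Indexer
// does not exist, rather than assuming a config is always returned.
let indexer = serde_json::from_slice::<Option<registry_types::IndexerConfig>>(
    &call_result.result,
)?
.ok_or_else(|| anyhow::anyhow!("Indexer does not exist in registry"))?;
```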
This PR introduces dedicated, self-contained control loops per Indexer, replacing the single combined control loop. The motivation for this ticket is described in #811; you can read more about it there. Overall, there is lots of clean-up to be done, but I wanted to get this out the door as quickly as possible so as not to block the features that need to be built on top of it. I've discussed some of the major concerns below.

## `LifecycleManager`

These dedicated control loops are managed by the `LifecycleManager` struct. This is a state machine which progresses the Indexer through different states depending on the context. The different states and their transitions are described on the `LifecycleState` enum:

```rust
/// Represents the different lifecycle states of an Indexer
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum LifecycleState {
    /// Pre-requisite resources, i.e. Data Layer, are being created.
    ///
    /// Transitions:
    /// - `Running` on success
    /// - `Repairing` on Data Layer provisioning failure
    #[default]
    Initializing,
    /// Indexer is functional; the Block Stream and Executors are continuously monitored to ensure
    /// they are running the latest version of the Indexer.
    ///
    /// Transitions:
    /// - `Stopping` if suspended
    /// - `Running` if the Block Stream or Executor fails to synchronise, essentially triggering a
    ///   retry
    /// - `Running` on success
    Running,
    /// Indexer is being stopped; the Block Stream and Executors are being stopped.
    ///
    /// Transitions:
    /// - `Stopping` on failure, triggering a retry
    /// - `Stopped` on success
    Stopping,
    /// Indexer is stopped; the Block Stream and Executors are not running.
    ///
    /// Transitions:
    /// - `Running` if unsuspended
    Stopped,
    /// Indexer is in a bad state; currently requires manual intervention, but should eventually
    /// self-heal. This is a dead-end state.
    ///
    /// Transitions:
    /// - `Repairing` continuously
    Repairing, // TODO Add `error` to enable reparation
    /// Indexer is being deleted; all resources are being cleaned up.
    ///
    /// Transitions:
    /// - `Deleting` on failure, triggering a retry
    /// - `Deleted` on success
    Deleting,
    /// Indexer is deleted; all resources are cleaned up, and the lifecycle manager will exit.
    Deleted,
}
```

The logic of this `struct` is very light, triggering the high-level actions required within each state and then returning the next desired state. Most of the "doing" logic has been encapsulated in the other related `struct`s, as discussed below.

The lifecycle state is stored in Redis so that the Indexer can pick up where it left off. A migration has been added to accommodate this new field, which replaces the existing `provisioned_state` field.

## `Handler`s

Previously, the "handlers", e.g. `BlockStreamsHandler`, were lightweight `struct`s which wrapped the gRPC client/methods. In this PR, I've moved all "synchronisation" logic into these structs. So rather than calling e.g. the `data_layer_handler.start_provisioning_task()` method, we can call `ensure_provisioned()`, which manages all the related logic. I feel this is better encapsulation, and it allows the `LifecycleManager` to stay light.

I've had to remove `automock`, so we don't have mocked versions of these right now. Cloning mocked versions is tricky and requires manual mocking. Rather than bloat this PR, I've left that out. Eventually, I'll separate the "sync" logic from the "client" logic, so that the latter can be easily mocked and the sync logic covered by unit tests.

Additionally, I've added `get` methods for both the Block Streamer and Executors RPCs, as listing is no longer convenient given we are managing Indexers individually. The getters use `account_id` and `function_name` as opposed to IDs. I'm considering moving away from IDs, as the only way to get them is via list, which isn't helpful. Right now it's somewhat of a transitory state.
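To make the state machine shape concrete, here is a minimal sketch of the per-Indexer control loop; the `handle_*` methods and the `state_manager` persistence call are named here for illustration only:

```rust
// Illustrative only: drive the Indexer through its lifecycle, persisting
// each transition to Redis so the loop can resume after a restart.
impl LifecycleManager {
    async fn run(&self, mut state: LifecycleState) {
        loop {
            let next_state = match state {
                LifecycleState::Initializing => self.handle_initializing().await,
                LifecycleState::Running => self.handle_running().await,
                LifecycleState::Stopping => self.handle_stopping().await,
                LifecycleState::Stopped => self.handle_stopped().await,
                LifecycleState::Repairing => self.handle_repairing().await,
                LifecycleState::Deleting => self.handle_deleting().await,
                // Terminal state: all resources cleaned up, exit the loop
                LifecycleState::Deleted => break,
            };

            // Persist so the Indexer can pick up where it left off
            self.state_manager.set_state(&next_state).await;
            state = next_state;
        }
    }
}
```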
```rust
/// Represents the different lifecycle states of an Indexer
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum LifecycleState {
```
It took me a while to finally feel like I understand how this life cycle is supposed to work. Overall, I agree with the states. The way I'm breaking them down is this:
- We can technically start in any state, since we read it from Redis.
- In normal circumstances, I expect us to start in Initializing (if new), Running (if existing), or Stopped (if suspended).
- We have four action states, meaning Coordinator did something and is awaiting a result: Initializing, Running, Stopping, and Deleting.
- We have two steady states: Running and Stopped. We would expect an indexer to stably stay in these states.
- We have two dead-end states: Repairing and Deleted.

The thing I don't like about the current life cycle is that it abstracts retry behaviour away from the state. Running loops into itself if it receives an unhealthy state, which I don't think I like, as it seems we also intend to do retries in Repairing. That means we have two different retry mechanisms, one of which is far more hidden than the other. Whether it's a failure in Running or a failure in Initializing, we should still represent the state as Repairing/Restarting; the state should accurately represent what Coordinator is trying to do. Retries in Provisioning might be exponential, whereas retries in Running might be static. Maybe there's a retry limit. Maybe we don't retry at all. I think if we can configure the Repairing state to receive the previous state it transitioned from, plus a retry configuration, it can serve as a single retry state which can then return to the correct state afterwards, using those two pieces of information.
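A hypothetical shape for this, where `Repairing` carries both the state it came from and a retry policy (all field and type names here are illustrative, not part of this PR):

```rust
// Illustrative retry policy; the fields are assumptions for the sketch.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct RetryConfig {
    pub max_attempts: Option<u32>, // None = retry indefinitely
    pub exponential_backoff: bool, // e.g. true for provisioning failures
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum LifecycleState {
    Initializing,
    Running,
    Stopping,
    Stopped,
    /// Single retry state: knows where the failure came from and how to
    /// retry, so it can return to the correct state afterwards.
    Repairing {
        from: Box<LifecycleState>,
        retry: RetryConfig,
    },
    Deleting,
    Deleted,
}
```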
I agree with your concerns here. I added `Repairing` prematurely, as I expected to eventually need it, but I'm not using it as intended yet.

The other important aspect of these states is that they are persisted in Redis, meaning that on Coordinator restart they can be picked up again. If an Indexer is 'broken', will we need to resume repair? I think yes, so you're right that we should house all retry logic within this state.

One thing I was unsure about with `Repairing` is that it convolutes the logic: we check for issues in the other states, but deal with them in this one. Currently we just deal with issues immediately, while we have the required context on hand, which is much simpler. We'd need to pass that context to the `Repairing` state like you're suggesting, which creates the indirection. But I think it's probably worth it to make all the states easier to understand.

Let's work towards utilising `Repairing` more.
```rust
let config = config.unwrap();

if !state.enabled {
    return LifecycleState::Stopping;
```
I got confused initially about Stopping, as I thought this was how Running performs retries, namely by going Running -> Stopping -> Stopped -> Running. In any case, this state is only reached if we disable an Indexer, right? Do you think it makes sense to call this state `Suspending` instead? I know the distinction is small, but it feels more natural to me, especially when the indexer then reaches the `Suspended` state, which feels more intentional than a Stopped state, where I can't immediately tell what it means and why it's stopped and not starting again.
Yeah, I think those names are better, I can rename.
```rust
pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
    tracing::info!(account_id = ?indexer_config.account_id, function_name = ?indexer_config.function_name, "Provisioning data layer");

    let start_task_result = self.start_provisioning_task(indexer_config).await;
```
Why did we set up the provisioning task to do a callback after the provisioning task completes? Provisioning can take minutes. I feel that these finite-length tasks should still follow our existing format of calling back immediately, followed by Coordinator monitoring the status. The difference being that instead of healthy/unhealthy, we check whether we eventually get a success or failed result.
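A rough sketch of the pattern I mean, with `check_provisioning_status` and `ProvisioningStatus` as assumed names:

```rust
// Illustrative: return from the start call immediately, then poll the
// task's status until it resolves to a terminal result.
pub async fn ensure_provisioned(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
    self.start_provisioning_task(indexer_config).await?;

    loop {
        match self.check_provisioning_status(indexer_config).await? {
            ProvisioningStatus::Complete => return Ok(()),
            ProvisioningStatus::Failed => anyhow::bail!("Provisioning failed"),
            // Still in progress; provisioning can take minutes
            ProvisioningStatus::Pending => {
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    }
}
```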
"Stopping outdated executor" | ||
); | ||
|
||
self.stop(executor.executor_id).await?; |
Now that we have a Lifecycle enum, we should represent behaviour like stopping as a state, rather than expecting stop to be successful, followed by starting automatically. Either as a Stopping state (with the current one renamed to Suspending), or as a Restarting state (maybe rename Repairing to Restarting if we want all restarting to be the same). Something like that? I don't think I like the idea of synchronize having lifecycle-related behaviour such as updating/restarting inside it.
I think that would make things a bit more complicated. Synchronisation relies on a sequence of events, i.e. stop > reconfigure > start. If we wanted to handle stopping outside of this, we'd need to pass that context up to the lifecycle manager so that it knows what to do next.

I see the 'stop' here as something more granular than the overarching lifecycle: the `Running` state will work towards keeping the indexer components running and up to date, and when it can no longer progress, it will move to a different state, i.e. `Repairing`.
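A rough sketch of the sequence I mean, with the version fields and method names assumed for illustration:

```rust
// Illustrative: synchronisation as an ordered sequence of granular steps
// within the Running state, rather than separate lifecycle states per step.
async fn sync_executor(&self, config: &IndexerConfig, executor: ExecutorInfo) -> anyhow::Result<()> {
    if executor.version != config.version {
        tracing::info!("Stopping outdated executor");

        // stop > reconfigure > start: each step depends on the previous one
        self.stop(executor.executor_id).await?;
        self.start(config).await?;
    }

    Ok(())
}
```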