Skip to content

Commit

Permalink
fix: Ensure deleting is handled correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jul 16, 2024
1 parent 25f1e70 commit 3fc4172
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 22 deletions.
2 changes: 2 additions & 0 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ impl BlockStreamsHandler {
function_name: String,
) -> anyhow::Result<()> {
if let Some(block_stream) = self.get(account_id, function_name).await? {
tracing::info!("Stopping block stream");

self.stop(block_stream.stream_id).await?;
}

Expand Down
1 change: 1 addition & 0 deletions coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl ExecutorsHandler {
function_name: String,
) -> anyhow::Result<()> {
if let Some(executor) = self.get(account_id, function_name).await? {
tracing::info!("Stopping executor");
self.stop(executor.executor_id).await?;
}

Expand Down
2 changes: 2 additions & 0 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl IndexerStateManagerImpl {
}

pub async fn delete_state(&self, indexer_state: &IndexerState) -> anyhow::Result<()> {
tracing::info!("Deleting state");

self.redis_client.delete_indexer_state(indexer_state).await
}

Expand Down
77 changes: 55 additions & 22 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ impl<'a> LifecycleManager<'a> {
#[tracing::instrument(name = "initializing", skip_all)]
async fn handle_initializing(
&self,
config: &IndexerConfig,
config: Option<&IndexerConfig>,
_state: &IndexerState,
) -> LifecycleState {
if config.is_none() {
return LifecycleState::Deleting;
}

let config = config.unwrap();

if self
.data_layer_handler
.ensure_provisioned(config)
Expand All @@ -72,7 +78,17 @@ impl<'a> LifecycleManager<'a> {
}

#[tracing::instrument(name = "running", skip_all)]
async fn handle_running(&self, config: &IndexerConfig, state: &IndexerState) -> LifecycleState {
async fn handle_running(
&self,
config: Option<&IndexerConfig>,
state: &IndexerState,
) -> LifecycleState {
if config.is_none() {
return LifecycleState::Deleting;
}

let config = config.unwrap();

if !state.enabled {
return LifecycleState::Stopping;
}
Expand All @@ -97,7 +113,13 @@ impl<'a> LifecycleManager<'a> {
}

#[tracing::instrument(name = "stopping", skip_all)]
async fn handle_stopping(&self, config: &IndexerConfig) -> LifecycleState {
async fn handle_stopping(&self, config: Option<&IndexerConfig>) -> LifecycleState {
if config.is_none() {
return LifecycleState::Deleting;
}

let config = config.unwrap();

if let Err(error) = self
.block_streams_handler
.stop_if_needed(config.account_id.clone(), config.function_name.clone())
Expand All @@ -120,7 +142,15 @@ impl<'a> LifecycleManager<'a> {
}

#[tracing::instrument(name = "stopped", skip_all)]
async fn handle_stopped(&self, state: &IndexerState) -> LifecycleState {
async fn handle_stopped(
&self,
config: Option<&IndexerConfig>,
state: &IndexerState,
) -> LifecycleState {
if config.is_none() {
return LifecycleState::Deleting;
}

// TODO Transistion to `Running` on config update

if state.enabled {
Expand All @@ -133,10 +163,14 @@ impl<'a> LifecycleManager<'a> {
#[tracing::instrument(name = "repairing", skip_all)]
async fn handle_repairing(
&self,
_config: &IndexerConfig,
config: Option<&IndexerConfig>,
_state: &IndexerState,
) -> LifecycleState {
// TODO Add more robust error handling, for now attempt to continue
if config.is_none() {
return LifecycleState::Deleting;
}

// TODO Add more robust error handling, for now just stop
LifecycleState::Repairing
}

Expand All @@ -163,6 +197,8 @@ impl<'a> LifecycleManager<'a> {
return LifecycleState::Deleting;
}

info!("Clearing block stream");

if self
.redis_client
.del(state.get_redis_stream_key())
Expand Down Expand Up @@ -227,23 +263,16 @@ impl<'a> LifecycleManager<'a> {
first_iteration = false;
}

if state.lifecycle_state == LifecycleState::Deleted {
break;
}

let next_lifecycle_state = if let Some(config) = config.clone() {
match state.lifecycle_state {
LifecycleState::Initializing => self.handle_initializing(&config, &state).await,
LifecycleState::Running => self.handle_running(&config, &state).await,
LifecycleState::Stopping => self.handle_stopping(&config).await,
LifecycleState::Stopped => self.handle_stopped(&state).await,
LifecycleState::Repairing => self.handle_repairing(&config, &state).await,
LifecycleState::Deleting | LifecycleState::Deleted => {
unreachable!("handled explicitly above")
}
let next_lifecycle_state = match state.lifecycle_state {
LifecycleState::Initializing => {
self.handle_initializing(config.as_ref(), &state).await
}
} else {
self.handle_deleting(&state).await
LifecycleState::Running => self.handle_running(config.as_ref(), &state).await,
LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await,
LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await,
LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await,
LifecycleState::Deleting => self.handle_deleting(&state).await,
LifecycleState::Deleted => LifecycleState::Deleted,
};

if next_lifecycle_state != state.lifecycle_state {
Expand All @@ -253,6 +282,10 @@ impl<'a> LifecycleManager<'a> {
);
}

if next_lifecycle_state == LifecycleState::Deleted {
break;
}

state.lifecycle_state = next_lifecycle_state;

loop {
Expand Down

0 comments on commit 3fc4172

Please sign in to comment.