Skip to content

Commit

Permalink
feat: Deprovision Data Layer on delete (#808)
Browse files Browse the repository at this point in the history
This PR removes Data Layer resources on Indexer Delete. To achieve this,
the following has been added:
- `Provisioner.deprovision()` method which removes: schema, cron jobs,
and if necessary, Hasura source, database, and role
- `DataLayerService.StartDeprovisioningTask` gRPC method
- Calling the above from Coordinator within the delete lifecycle hook

In addition to the above, I've slightly refactored DataLayerService to
make the addition of de-provisioning more accomodating:
- `StartDeprovisioningTask` and `StartProvisioningTask` now return
opaque IDs rather than using `accountId`/`functionName` - to avoid
conflicts with eachother
- There is a single `GetTaskStatus` method which is used for both,
before it was specific to provisioning

As mentioned in #805, the Coordinator implementation is a little awkward
due to the shared/non-blocking Control Loop. I'll look to refactor this
later and hopefully improve on this.
  • Loading branch information
morgsmccauley authored Jun 20, 2024
1 parent d88a4dc commit 64d1ebb
Show file tree
Hide file tree
Showing 15 changed files with 711 additions and 194 deletions.
81 changes: 43 additions & 38 deletions coordinator/src/handlers/data_layer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#![cfg_attr(test, allow(dead_code))]

pub use runner::data_layer::ProvisioningStatus;
use near_primitives::types::AccountId;

pub use runner::data_layer::TaskStatus;

use anyhow::Context;
use runner::data_layer::data_layer_client::DataLayerClient;
use runner::data_layer::{CheckProvisioningTaskStatusRequest, ProvisionRequest};
use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest};
use tonic::transport::channel::Channel;
use tonic::{Request, Status};
use tonic::Request;

use crate::indexer_config::IndexerConfig;

Expand All @@ -15,17 +17,14 @@ pub use DataLayerHandlerImpl as DataLayerHandler;
#[cfg(test)]
pub use MockDataLayerHandlerImpl as DataLayerHandler;

type TaskId = String;

pub struct DataLayerHandlerImpl {
client: DataLayerClient<Channel>,
}

#[cfg_attr(test, mockall::automock)]
impl DataLayerHandlerImpl {
pub fn from_env() -> anyhow::Result<Self> {
let runner_url = std::env::var("RUNNER_URL").context("RUNNER_URL is not set")?;
Self::connect(&runner_url)
}

pub fn connect(runner_url: &str) -> anyhow::Result<Self> {
let channel = Channel::from_shared(runner_url.to_string())
.context("Runner URL is invalid")?
Expand All @@ -38,7 +37,7 @@ impl DataLayerHandlerImpl {
pub async fn start_provisioning_task(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
) -> anyhow::Result<TaskId> {
let request = ProvisionRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
Expand All @@ -49,46 +48,52 @@ impl DataLayerHandlerImpl {
.client
.clone()
.start_provisioning_task(Request::new(request))
.await;

if let Err(error) = response {
if error.code() == tonic::Code::AlreadyExists {
return Ok(ProvisioningStatus::Pending);
}

return Err(error.into());
}

let status = match response.unwrap().into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
};
.await?;

Ok(status)
Ok(response.into_inner().task_id)
}

pub async fn check_provisioning_task_status(
pub async fn start_deprovisioning_task(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
let request = CheckProvisioningTaskStatusRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
account_id: AccountId,
function_name: String,
) -> anyhow::Result<TaskId> {
let request = DeprovisionRequest {
account_id: account_id.to_string(),
function_name,
};

let response = self
.client
.clone()
.check_provisioning_task_status(Request::new(request))
.start_deprovisioning_task(Request::new(request))
.await?;

let status = match response.into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
Ok(response.into_inner().task_id)
}

pub async fn get_task_status(&self, task_id: TaskId) -> anyhow::Result<TaskStatus> {
let request = GetTaskStatusRequest { task_id };

let response = self
.client
.clone()
.get_task_status(Request::new(request))
.await;

if let Err(error) = response {
if error.code() == tonic::Code::NotFound {
return Ok(TaskStatus::Failed);
}

return Err(error.into());
}

let status = match response.unwrap().into_inner().status {
1 => TaskStatus::Pending,
2 => TaskStatus::Complete,
3 => TaskStatus::Failed,
_ => anyhow::bail!("Received invalid task status"),
};

Ok(status)
Expand Down
43 changes: 37 additions & 6 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crate::redis::RedisClient;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum ProvisionedState {
Unprovisioned,
Provisioning,
Provisioning { task_id: String },
Provisioned,
Deprovisioning { task_id: String },
Failed,
}

Expand All @@ -31,13 +32,17 @@ pub struct IndexerState {
pub provisioned_state: ProvisionedState,
}

// FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to
// construct the state key without it. But, this isn't ideal as we now have two places which
// define this key - we need to consolidate these somehow.
impl IndexerState {
// FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to
// construct the state key without it. But, this isn't ideal as we now have two places which
// define this key - we need to consolidate these somehow.
pub fn get_state_key(&self) -> String {
format!("{}/{}:state", self.account_id, self.function_name)
}

pub fn get_redis_stream_key(&self) -> String {
format!("{}/{}:block_stream", self.account_id, self.function_name)
}
}

#[cfg(not(test))]
Expand Down Expand Up @@ -105,6 +110,12 @@ impl IndexerStateManagerImpl {
return Ok(serde_json::from_str(&raw_state)?);
}

tracing::info!(
account_id = indexer_config.account_id.to_string(),
function_name = indexer_config.function_name.as_str(),
"Creating new state using default"
);

Ok(self.get_default_state(indexer_config))
}

Expand Down Expand Up @@ -134,10 +145,30 @@ impl IndexerStateManagerImpl {
Ok(())
}

pub async fn set_provisioning(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn set_deprovisioning(
&self,
indexer_state: &IndexerState,
task_id: String,
) -> anyhow::Result<()> {
let mut state = indexer_state.clone();

state.provisioned_state = ProvisionedState::Deprovisioning { task_id };

self.redis_client
.set(state.get_state_key(), serde_json::to_string(&state)?)
.await?;

Ok(())
}

pub async fn set_provisioning(
&self,
indexer_config: &IndexerConfig,
task_id: String,
) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Provisioning;
indexer_state.provisioned_state = ProvisionedState::Provisioning { task_id };

self.set_state(indexer_config, indexer_state).await?;

Expand Down
4 changes: 4 additions & 0 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ mockall::mock! {
K: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static;

pub async fn del<K>(&self, key: K) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static;

pub async fn indexer_states_set_exists(&self) -> anyhow::Result<bool>;

pub async fn sadd<S, V>(&self, set: S, value: V) -> anyhow::Result<()>
Expand Down
Loading

0 comments on commit 64d1ebb

Please sign in to comment.