Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Deprovision Data Layer on delete #808

Merged
merged 26 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8b45444
chore: Add logging to `DataLayerService`
morgsmccauley Jun 18, 2024
85ee540
feat: Drop schema during deprovision
morgsmccauley Jun 18, 2024
07ac1aa
feat: Unschedule cron jobs
morgsmccauley Jun 18, 2024
be0e0ef
feat: Drop hasura datasource
morgsmccauley Jun 18, 2024
07a27fb
feat: Drop database
morgsmccauley Jun 18, 2024
7ef08aa
feat: Drop role
morgsmccauley Jun 18, 2024
e22b38a
fix: Format list schemas query correctly
morgsmccauley Jun 18, 2024
d8ba5ea
fix: Correctly format drop db query
morgsmccauley Jun 18, 2024
e5fd4fd
feat: Revoke cron access
morgsmccauley Jun 18, 2024
fce30f2
test: Add integration test for deprovisioning
morgsmccauley Jun 18, 2024
54525f6
refactor: Change data layer svc methods to make extension easier
morgsmccauley Jun 18, 2024
f8d0661
refactor: Use deterministic hashes to prevent duplicate tasks
morgsmccauley Jun 18, 2024
9e65602
feat: Deprovision via rpc
morgsmccauley Jun 18, 2024
c209a83
feat: Update rust data layer client
morgsmccauley Jun 18, 2024
4d699d5
feat: Return current task if it exists
morgsmccauley Jun 18, 2024
42b9b00
refactor: Provision via updated rpc proto
morgsmccauley Jun 18, 2024
9f4e152
feat: Deprovision data layer on delete indexer
morgsmccauley Jun 18, 2024
eebc305
fix: Handle existing indexers without provisioning task
morgsmccauley Jun 18, 2024
ed6a2a9
feat: Remove Redis Stream on delete
morgsmccauley Jun 19, 2024
c04ca63
test: Fix synchroniser tests after rebase
morgsmccauley Jun 20, 2024
7ce18e7
chore: Log when using default state
morgsmccauley Jun 20, 2024
6a3bfae
chore: Update example names for clarity
morgsmccauley Jun 20, 2024
c71dcac
chore: Use warn over info
morgsmccauley Jun 20, 2024
23f0693
test: Assert data layer non-/existance in integration tests
morgsmccauley Jun 20, 2024
91135bb
fix: Correctly query user owned schemas
morgsmccauley Jun 20, 2024
ca53f70
fix: Ensura hasura metadata is cleaned up after schema drop
morgsmccauley Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 43 additions & 38 deletions coordinator/src/handlers/data_layer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#![cfg_attr(test, allow(dead_code))]

pub use runner::data_layer::ProvisioningStatus;
use near_primitives::types::AccountId;

pub use runner::data_layer::TaskStatus;

use anyhow::Context;
use runner::data_layer::data_layer_client::DataLayerClient;
use runner::data_layer::{CheckProvisioningTaskStatusRequest, ProvisionRequest};
use runner::data_layer::{DeprovisionRequest, GetTaskStatusRequest, ProvisionRequest};
use tonic::transport::channel::Channel;
use tonic::{Request, Status};
use tonic::Request;

use crate::indexer_config::IndexerConfig;

Expand All @@ -15,17 +17,14 @@ pub use DataLayerHandlerImpl as DataLayerHandler;
#[cfg(test)]
pub use MockDataLayerHandlerImpl as DataLayerHandler;

type TaskId = String;

pub struct DataLayerHandlerImpl {
client: DataLayerClient<Channel>,
}

#[cfg_attr(test, mockall::automock)]
impl DataLayerHandlerImpl {
pub fn from_env() -> anyhow::Result<Self> {
let runner_url = std::env::var("RUNNER_URL").context("RUNNER_URL is not set")?;
Self::connect(&runner_url)
}

pub fn connect(runner_url: &str) -> anyhow::Result<Self> {
let channel = Channel::from_shared(runner_url.to_string())
.context("Runner URL is invalid")?
Expand All @@ -38,7 +37,7 @@ impl DataLayerHandlerImpl {
pub async fn start_provisioning_task(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
) -> anyhow::Result<TaskId> {
let request = ProvisionRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
Expand All @@ -49,46 +48,52 @@ impl DataLayerHandlerImpl {
.client
.clone()
.start_provisioning_task(Request::new(request))
.await;

if let Err(error) = response {
if error.code() == tonic::Code::AlreadyExists {
return Ok(ProvisioningStatus::Pending);
}

return Err(error.into());
}

let status = match response.unwrap().into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
};
.await?;

Ok(status)
Ok(response.into_inner().task_id)
}

pub async fn check_provisioning_task_status(
pub async fn start_deprovisioning_task(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<ProvisioningStatus> {
let request = CheckProvisioningTaskStatusRequest {
account_id: indexer_config.account_id.to_string(),
function_name: indexer_config.function_name.clone(),
account_id: AccountId,
function_name: String,
) -> anyhow::Result<TaskId> {
let request = DeprovisionRequest {
account_id: account_id.to_string(),
function_name,
};

let response = self
.client
.clone()
.check_provisioning_task_status(Request::new(request))
.start_deprovisioning_task(Request::new(request))
.await?;

let status = match response.into_inner().status {
1 => ProvisioningStatus::Pending,
2 => ProvisioningStatus::Complete,
3 => ProvisioningStatus::Failed,
_ => ProvisioningStatus::Unspecified,
Ok(response.into_inner().task_id)
}

pub async fn get_task_status(&self, task_id: TaskId) -> anyhow::Result<TaskStatus> {
let request = GetTaskStatusRequest { task_id };

let response = self
.client
.clone()
.get_task_status(Request::new(request))
.await;

if let Err(error) = response {
if error.code() == tonic::Code::NotFound {
return Ok(TaskStatus::Failed);
}

return Err(error.into());
}

let status = match response.unwrap().into_inner().status {
1 => TaskStatus::Pending,
2 => TaskStatus::Complete,
3 => TaskStatus::Failed,
_ => anyhow::bail!("Received invalid task status"),
};

Ok(status)
Expand Down
43 changes: 37 additions & 6 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crate::redis::RedisClient;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum ProvisionedState {
Unprovisioned,
Provisioning,
Provisioning { task_id: String },
Provisioned,
Deprovisioning { task_id: String },
Failed,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Failed sufficient to redress the situation? Failure in Provisioning is different from failure in deprovisioning. Especially since the enum indicates there's no additional data stored.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - but at this point it isn't my goal, I will address redundancy in future work.

}

Expand All @@ -31,13 +32,17 @@ pub struct IndexerState {
pub provisioned_state: ProvisionedState,
}

// FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to
// construct the state key without it. But, this isn't ideal as we now have two places which
// define this key - we need to consolidate these somehow.
impl IndexerState {
// FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to
// construct the state key without it. But, this isn't ideal as we now have two places which
// define this key - we need to consolidate these somehow.
pub fn get_state_key(&self) -> String {
format!("{}/{}:state", self.account_id, self.function_name)
}

pub fn get_redis_stream_key(&self) -> String {
format!("{}/{}:block_stream", self.account_id, self.function_name)
}
}

#[cfg(not(test))]
Expand Down Expand Up @@ -105,6 +110,12 @@ impl IndexerStateManagerImpl {
return Ok(serde_json::from_str(&raw_state)?);
}

tracing::info!(
account_id = indexer_config.account_id.to_string(),
function_name = indexer_config.function_name.as_str(),
"Creating new state using default"
);

Ok(self.get_default_state(indexer_config))
}

Expand Down Expand Up @@ -134,10 +145,30 @@ impl IndexerStateManagerImpl {
Ok(())
}

pub async fn set_provisioning(&self, indexer_config: &IndexerConfig) -> anyhow::Result<()> {
pub async fn set_deprovisioning(
&self,
indexer_state: &IndexerState,
task_id: String,
) -> anyhow::Result<()> {
let mut state = indexer_state.clone();

state.provisioned_state = ProvisionedState::Deprovisioning { task_id };

self.redis_client
.set(state.get_state_key(), serde_json::to_string(&state)?)
.await?;

Ok(())
}

pub async fn set_provisioning(
&self,
indexer_config: &IndexerConfig,
task_id: String,
) -> anyhow::Result<()> {
let mut indexer_state = self.get_state(indexer_config).await?;

indexer_state.provisioned_state = ProvisionedState::Provisioning;
indexer_state.provisioned_state = ProvisionedState::Provisioning { task_id };

self.set_state(indexer_config, indexer_state).await?;

Expand Down
4 changes: 4 additions & 0 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ mockall::mock! {
K: ToRedisArgs + Debug + Send + Sync + 'static,
V: ToRedisArgs + Debug + Send + Sync + 'static;

pub async fn del<K>(&self, key: K) -> anyhow::Result<()>
where
K: ToRedisArgs + Debug + Send + Sync + 'static;

pub async fn indexer_states_set_exists(&self) -> anyhow::Result<bool>;

pub async fn sadd<S, V>(&self, set: S, value: V) -> anyhow::Result<()>
Expand Down
Loading
Loading