feat: configurable drain ts per pool (#684)

## Changes
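- Move `drain_timeout` from the datacenter to each pool in the bolt namespace config, the `backend.cluster` proto, the `db_cluster.datacenters` schema, and the datacenter create worker, update message, and get operation.
- Per-pool `drain_timeout` is specified in milliseconds, so the seconds-to-ms conversion in `cluster-default-update` is removed.
- `cluster-gc` now looks up each draining server's pool and compares `drain_ts` against that pool's `drain_timeout` inside a transaction before marking the drain complete and rescaling the datacenter.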

MasterPtato committed Apr 18, 2024
1 parent 2e50434 commit f88c457
Showing 29 changed files with 123 additions and 108 deletions.
5 changes: 2 additions & 3 deletions lib/bolt/config/src/ns.rs
@@ -641,9 +641,6 @@ pub struct ProvisioningDatacenter {
pub provider_datacenter_name: String,
#[serde(default)]
pub build_delivery_method: ProvisioningBuildDeliveryMethod,
/// Nomad drain time in seconds.
pub drain_timeout: u32,

#[serde(default)]
pub pools: HashMap<ProvisioningDatacenterPoolType, ProvisioningDatacenterPool>,
}
@@ -671,6 +668,8 @@ pub struct ProvisioningDatacenterPool {
pub hardware: Vec<ProvisioningDatacenterHardware>,
pub desired_count: u32,
pub max_count: u32,
/// Server drain time in ms.
pub drain_timeout: u64,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
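To make the new shape concrete, here is a minimal, self-contained sketch of a per-pool `drain_timeout` in a namespace config, assuming the `serde` and `toml` crates. The structs, TOML keys, and pool name are simplified stand-ins (hardware and the pool-type enum are omitted), not the actual Bolt `ns.rs` types:

```rust
// Simplified stand-ins for the Bolt config structs changed above; not the real types.
use std::collections::HashMap;

use serde::Deserialize;

#[derive(Deserialize)]
struct Datacenter {
    provider_datacenter_name: String,
    #[serde(default)]
    pools: HashMap<String, Pool>,
}

#[derive(Deserialize)]
struct Pool {
    desired_count: u32,
    max_count: u32,
    /// Server drain time in ms (previously a per-datacenter value in seconds).
    drain_timeout: u64,
}

fn main() {
    // Hypothetical namespace snippet: each pool now carries its own drain_timeout.
    let raw = r#"
        provider_datacenter_name = "us-east-1"

        [pools.gg]
        desired_count = 2
        max_count = 4
        drain_timeout = 300000 # 5 minutes, in milliseconds
    "#;

    let dc: Datacenter = toml::from_str(raw).expect("parse datacenter config");
    assert_eq!(dc.pools["gg"].drain_timeout, 300_000);
}
```

The important part is that `drain_timeout` is now read per pool and interpreted as milliseconds, matching the `u64` field above.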
4 changes: 2 additions & 2 deletions proto/backend/cluster.proto
@@ -30,8 +30,6 @@ message Datacenter {

repeated Pool pools = 8;
BuildDeliveryMethod build_delivery_method = 9;
// Nomad drain time in seconds.
uint64 drain_timeout = 10;
}

message Pool {
@@ -40,6 +38,8 @@ message Pool {
repeated Hardware hardware = 2;
uint32 desired_count = 3;
uint32 max_count = 4;
// Server drain timeout in ms
uint64 drain_timeout = 5;
}

enum PoolType {
@@ -15,7 +15,6 @@ CREATE TABLE datacenters (
provider_api_token TEXT,
pools BYTES NOT NULL, -- rivet.backend.pkg.cluster.msg.datacenter_create.Pools
build_delivery_method INT NOT NULL,
drain_timeout INT NOT NULL,
create_ts INT NOT NULL,

UNIQUE (cluster_id, name_id),
3 changes: 0 additions & 3 deletions svc/pkg/cluster/ops/datacenter-get/src/lib.rs
@@ -15,7 +15,6 @@ struct Datacenter {
provider_api_token: Option<String>,
pools: Vec<u8>,
build_delivery_method: i64,
drain_timeout: i64,
}

impl TryFrom<Datacenter> for backend::cluster::Datacenter {
@@ -35,7 +34,6 @@ impl TryFrom<Datacenter> for backend::cluster::Datacenter {
provider_api_token: value.provider_api_token,
pools,
build_delivery_method: value.build_delivery_method as i32,
drain_timeout: value.drain_timeout as u64,
})
}
}
@@ -63,7 +61,6 @@ pub async fn handle(
provider_api_token,
pools,
build_delivery_method,
drain_timeout,
create_ts
FROM db_cluster.datacenters
WHERE datacenter_id = ANY($1)
1 change: 0 additions & 1 deletion svc/pkg/cluster/ops/datacenter-get/tests/integration.rs
@@ -27,7 +27,6 @@ async fn empty(ctx: TestCtx) {
pools: Vec::new(),

build_delivery_method: backend::cluster::BuildDeliveryMethod::TrafficServer as i32,
drain_timeout: 0,
})
.await
.unwrap();
1 change: 0 additions & 1 deletion svc/pkg/cluster/ops/datacenter-list/tests/integration.rs
@@ -27,7 +27,6 @@ async fn empty(ctx: TestCtx) {
pools: Vec::new(),

build_delivery_method: backend::cluster::BuildDeliveryMethod::TrafficServer as i32,
drain_timeout: 0,
})
.await
.unwrap();
@@ -28,7 +28,6 @@ async fn basic(ctx: TestCtx) {
pools: Vec::new(),

build_delivery_method: backend::cluster::BuildDeliveryMethod::TrafficServer as i32,
drain_timeout: 0,
})
.await
.unwrap();
@@ -28,7 +28,6 @@ async fn empty(ctx: TestCtx) {
pools: Vec::new(),

build_delivery_method: backend::cluster::BuildDeliveryMethod::TrafficServer as i32,
drain_timeout: 0,
})
.await
.unwrap();
@@ -32,7 +32,6 @@ async fn empty(ctx: TestCtx) {
pools: Vec::new(),

build_delivery_method: backend::cluster::BuildDeliveryMethod::TrafficServer as i32,
drain_timeout: 0,
})
.await
.unwrap();
7 changes: 3 additions & 4 deletions svc/pkg/cluster/standalone/default-update/src/lib.rs
@@ -19,7 +19,6 @@ struct Datacenter {
provider_datacenter_name: String,
pools: HashMap<PoolType, Pool>,
build_delivery_method: BuildDeliveryMethod,
drain_timeout: u64,
}

#[derive(Deserialize)]
@@ -41,6 +40,7 @@ struct Pool {
hardware: Vec<Hardware>,
desired_count: u32,
max_count: u32,
drain_timeout: u64,
}

#[derive(Deserialize, PartialEq, Eq, Hash)]
@@ -205,15 +205,14 @@ pub async fn run_from_env(use_autoscaler: bool) -> GlobalResult<()> {
.collect::<Vec<_>>(),
desired_count,
max_count: Some(pool.max_count),
drain_timeout: Some(pool.drain_timeout),
}
})
.collect::<Vec<_>>();

msg!([ctx] @wait cluster::msg::datacenter_update(datacenter.datacenter_id) {
datacenter_id: datacenter_id_proto,
pools: new_pools,
// Convert from seconds to ms
drain_timeout: Some(datacenter.drain_timeout * 1000),
})
.await?;
}
@@ -235,11 +234,11 @@ pub async fn run_from_env(use_autoscaler: bool) -> GlobalResult<()> {
hardware: pool.hardware.into_iter().map(Into::into).collect::<Vec<_>>(),
desired_count: pool.desired_count,
max_count: pool.max_count,
drain_timeout: pool.drain_timeout,
}
}).collect::<Vec<_>>(),

build_delivery_method: Into::<backend::cluster::BuildDeliveryMethod>::into(datacenter.build_delivery_method) as i32,
drain_timeout: datacenter.drain_timeout,
})
.await?;
}
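Worth calling out: the removed `* 1000` conversion above is no longer needed because each pool's configured `drain_timeout` is already in milliseconds and is forwarded directly on the per-pool update. Below is a rough stand-in for the `PoolUpdate` message (the real prost-generated type and module path are not shown in this diff, and `hardware` is omitted) to make the unit explicit:

```rust
// Illustrative stand-in only; field names follow datacenter-update.proto later
// in this diff, but this is not the actual prost-generated type.
#[derive(Debug)]
struct PoolUpdate {
    pool_type: i32,
    desired_count: Option<u32>,
    max_count: Option<u32>,
    /// Milliseconds, passed through from the namespace config unchanged
    /// (the removed per-datacenter field was seconds and needed `* 1000`).
    drain_timeout: Option<u64>,
}

fn main() {
    let gg = PoolUpdate {
        pool_type: 0, // e.g. PoolType::Gg
        desired_count: Some(2),
        max_count: Some(4),
        drain_timeout: Some(5 * 60 * 1000), // 5 minutes, already in ms
    };
    println!("{gg:?}");
}
```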
111 changes: 84 additions & 27 deletions svc/pkg/cluster/standalone/gc/src/lib.rs
@@ -1,8 +1,15 @@
use std::collections::HashMap;

use futures_util::FutureExt;
use proto::backend::{self, pkg::*};
use rivet_operation::prelude::*;

#[derive(sqlx::FromRow)]
struct ServerRow {
server_id: Uuid,
datacenter_id: Uuid,
pool_type: i64,
drain_ts: i64,
}

#[tracing::instrument(skip_all)]
pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()> {
let client = chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("cluster-gc");
@@ -18,34 +25,84 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
(),
Vec::new(),
);
let crdb = ctx.crdb().await?;

// Update all draining gg and ats servers that have completed draining
let datacenter_rows = sql_fetch_all!(
[ctx, (Uuid,), &crdb]
"
WITH updated AS (
UPDATE db_cluster.servers AS s
SET drain_complete_ts = $2
FROM db_cluster.datacenters AS d
WHERE
s.datacenter_id = d.datacenter_id AND
pool_type = ANY($1) AND
cloud_destroy_ts IS NULL AND
drain_ts IS NOT NULL AND
drain_ts < $2 - d.drain_timeout
RETURNING s.datacenter_id
)
SELECT DISTINCT datacenter_id
FROM updated
",
&[backend::cluster::PoolType::Gg as i64, backend::cluster::PoolType::Ats as i64],
ts,
)

let datacenter_ids = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
let ctx = ctx.base();

async move {
// Select all draining gg and ats servers
let servers = sql_fetch_all!(
[ctx, ServerRow, @tx tx]
"
SELECT server_id, datacenter_id, pool_type, drain_ts
FROM db_cluster.servers
WHERE
pool_type = ANY($1) AND
cloud_destroy_ts IS NULL AND
drain_ts IS NOT NULL
",
&[backend::cluster::PoolType::Gg as i64, backend::cluster::PoolType::Ats as i64],
)
.await?;

let datacenters_res = op!([ctx] cluster_datacenter_get {
datacenter_ids: servers
.iter()
.map(|server| server.datacenter_id.into())
.collect::<Vec<_>>(),
})
.await?;

let drained_servers = servers
.into_iter()
.map(|server| {
let dc_id_proto = Some(server.datacenter_id.into());
let datacenter = unwrap!(datacenters_res
.datacenters
.iter()
.find(|dc| dc.datacenter_id == dc_id_proto));
let pool = unwrap!(datacenter
.pools
.iter()
.find(|pool| pool.pool_type == server.pool_type as i32));
let drain_completed = server.drain_ts < ts - pool.drain_timeout as i64;

Ok((server, drain_completed))
})
.filter(|res| {
res.as_ref()
.map_or(true, |(_, drain_completed)| *drain_completed)
})
.collect::<GlobalResult<Vec<_>>>()?;

// Update servers that have completed draining
sql_execute!(
[ctx, @tx tx]
"
UPDATE db_cluster.servers
SET drain_complete_ts = $2
WHERE
server_id = ANY($1) AND
cloud_destroy_ts IS NULL
",
drained_servers.iter().map(|(server, _)| server.server_id).collect::<Vec<_>>(),
ts,
)
.await?;

Ok(drained_servers
.into_iter()
.map(|(server, _)| server.datacenter_id)
.collect::<Vec<_>>())
}
.boxed()
})
.await?;

// Scale
for (datacenter_id,) in datacenter_rows {
for datacenter_id in datacenter_ids {
msg!([ctx] cluster::msg::datacenter_scale(datacenter_id) {
datacenter_id: Some(datacenter_id.into()),
})
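The core of the new GC pass is the per-pool check computed into `drain_completed` above. A minimal, self-contained sketch of that comparison (simplified names; the real code resolves the pool through `cluster-datacenter-get` and runs inside a CRDB transaction):

```rust
// A draining server is considered done once its drain_ts is older than
// `now - pool.drain_timeout`, with drain_timeout now configured per pool in ms.
fn drain_completed(drain_ts_ms: i64, now_ms: i64, pool_drain_timeout_ms: u64) -> bool {
    drain_ts_ms < now_ms - pool_drain_timeout_ms as i64
}

fn main() {
    let now = 1_700_000_000_000; // illustrative timestamp in ms
    let five_min = 5 * 60 * 1000; // per-pool drain_timeout in ms

    // Started draining 10 minutes ago -> past the 5 minute timeout -> done.
    assert!(drain_completed(now - 10 * 60 * 1000, now, five_min));
    // Started draining 1 minute ago -> still within the timeout -> not done.
    assert!(!drain_completed(now - 60 * 1000, now, five_min));
}
```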
2 changes: 1 addition & 1 deletion svc/pkg/cluster/standalone/gc/tests/integration.rs
@@ -115,6 +115,7 @@ async fn setup(
}],
desired_count: 0,
max_count: 0,
drain_timeout: DRAIN_TIMEOUT as u64,
}];
let provider = backend::cluster::Provider::Linode;

@@ -139,7 +140,6 @@ async fn setup(
pools: pools.clone(),

build_delivery_method: backend::cluster::BuildDeliveryMethod::TrafficServer as i32,
drain_timeout: DRAIN_TIMEOUT as u64,
})
.await
.unwrap();
2 changes: 0 additions & 2 deletions svc/pkg/cluster/types/msg/datacenter-create.proto
@@ -21,8 +21,6 @@ message Message {

repeated rivet.backend.cluster.Pool pools = 8;
rivet.backend.cluster.BuildDeliveryMethod build_delivery_method = 9;
// Nomad drain time in seconds.
uint64 drain_timeout = 10;
}

// Helper proto for writing to sql
2 changes: 1 addition & 1 deletion svc/pkg/cluster/types/msg/datacenter-update.proto
@@ -12,7 +12,6 @@ import "proto/backend/cluster.proto";
message Message {
rivet.common.Uuid datacenter_id = 1;
repeated PoolUpdate pools = 2;
optional uint64 drain_timeout = 3;
}

message PoolUpdate {
@@ -22,4 +21,5 @@ message PoolUpdate {
repeated rivet.backend.cluster.Hardware hardware = 2;
optional uint32 desired_count = 3;
optional uint32 max_count = 4;
optional uint64 drain_timeout = 5;
}
1 change: 1 addition & 0 deletions svc/pkg/cluster/types/msg/server-install-complete.proto
@@ -11,6 +11,7 @@ import "proto/backend/cluster.proto";
/// ]
message Message {
rivet.common.Uuid request_id = 1;

string public_ip = 2;
rivet.common.Uuid datacenter_id = 3;
// If set in server install message
1 change: 1 addition & 0 deletions svc/pkg/cluster/types/msg/server-install.proto
@@ -11,6 +11,7 @@ import "proto/backend/cluster.proto";
/// ]
message Message {
rivet.common.Uuid request_id = 1;

string public_ip = 2;
rivet.backend.cluster.PoolType pool_type = 3;

4 changes: 1 addition & 3 deletions svc/pkg/cluster/worker/src/workers/datacenter_create.rs
@@ -42,10 +42,9 @@ async fn worker(
provider_api_token,
pools,
build_delivery_method,
drain_timeout,
create_ts
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
",
datacenter_id,
cluster_id,
@@ -56,7 +55,6 @@
&ctx.provider_api_token,
pools_buf,
ctx.build_delivery_method as i64,
ctx.drain_timeout as i64,
util::timestamp::now(),
)
.await?;
2 changes: 1 addition & 1 deletion svc/pkg/cluster/worker/src/workers/datacenter_scale.rs
@@ -161,7 +161,7 @@ async fn inner(
// Sort job servers by memory usage
servers.sort_unstable_by_key(|server| memory_by_server.get(&server.server_id));

// TODO: Sort gg and ats servers by cpu usage
// TODO: RVT-3732 Sort gg and ats servers by cpu usage
// servers.sort_by_key

let mut msgs = Vec::new();