Skip to content

Commit

Permalink
feat(ds): shard server workflow in two (#1157)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->
Fixes RVTEE-610
## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Oct 9, 2024
1 parent 4ed45c4 commit 8329433
Show file tree
Hide file tree
Showing 17 changed files with 1,595 additions and 1,429 deletions.
1 change: 1 addition & 0 deletions svc/pkg/cluster/src/workflows/server/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ async fn drain_pegboard_client(
)
.await?;

// Drain dynamic servers
if let Some(pegboard_client_id) = pegboard_client_id {
msg!([ctx] ds::msg::drain_all(&pegboard_client_id) {
nomad_node_id: None,
Expand Down
28 changes: 22 additions & 6 deletions svc/pkg/cluster/src/workflows/server/undrain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use nomad_client::{
apis::{configuration::Configuration, nodes_api},
models,
};
use rivet_operation::prelude::proto::backend::pkg::mm;
use rivet_operation::prelude::proto::backend::pkg::*;

use crate::types::PoolType;

Expand Down Expand Up @@ -40,7 +40,7 @@ pub(crate) async fn cluster_server_undrain(
PoolType::Ats => {}
PoolType::Pegboard => {
let pegboard_client_id = ctx
.activity(GetPegboardClientInput {
.activity(UndrainPegboardClientInput {
server_id: input.server_id,
})
.await?;
Expand Down Expand Up @@ -113,20 +113,27 @@ async fn undrain_node(ctx: &ActivityCtx, input: &UndrainNodeInput) -> GlobalResu
is_closed: false,
})
.await?;

// Undrain dynamic servers
msg!([ctx] ds::msg::undrain_all(&nomad_node_id) {
nomad_node_id: Some(nomad_node_id.clone()),
pegboard_client_id: None,
})
.await?;
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct GetPegboardClientInput {
struct UndrainPegboardClientInput {
server_id: Uuid,
}

#[activity(GetPegboardClient)]
async fn get_pegboard_client(
#[activity(UndrainPegboardClient)]
async fn undrain_pegboard_client(
ctx: &ActivityCtx,
input: &GetPegboardClientInput,
input: &UndrainPegboardClientInput,
) -> GlobalResult<Option<Uuid>> {
let (pegboard_client_id,) = sql_fetch_one!(
[ctx, (Option<Uuid>,)]
Expand All @@ -139,5 +146,14 @@ async fn get_pegboard_client(
)
.await?;

// Undrain dynamic servers
if let Some(pegboard_client_id) = pegboard_client_id {
msg!([ctx] ds::msg::undrain_all(&pegboard_client_id) {
nomad_node_id: None,
pegboard_client_id: Some(pegboard_client_id.into()),
})
.await?;
}

Ok(pegboard_client_id)
}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE game_config (
game_id UUID PRIMARY KEY,
host_networking_enabled BOOLEAN NOT NULL DEFAULT FALSE,
root_user_enabled BOOLEAN NOT NULL DEFAULT FALSE,
client INT NOT NULL, -- ds::types::GameClient
);
2 changes: 1 addition & 1 deletion svc/pkg/ds/proto/msg/drain-all.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "proto/common.proto";

/// name = "msg-ds-drain-all"
/// parameters = [
/// { name = "nomad_node_id" },
/// { name = "client_id" },
/// ]
message Message {
optional string nomad_node_id = 1;
Expand Down
14 changes: 14 additions & 0 deletions svc/pkg/ds/proto/msg/undrain-all.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package rivet.backend.pkg.ds.msg.undrain_all;

import "proto/common.proto";

/// name = "msg-ds-undrain-all"
/// parameters = [
/// { name = "client_id" },
/// ]
message Message {
optional string nomad_node_id = 1;
optional string pegboard_client_id = 3;
}
11 changes: 7 additions & 4 deletions svc/pkg/ds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ pub fn registry() -> WorkflowResult<Registry> {

let mut registry = Registry::new();
registry.register_workflow::<server::Workflow>()?;
registry.register_workflow::<server::destroy::Workflow>()?;
registry.register_workflow::<server::nomad_alloc_plan::Workflow>()?;
registry.register_workflow::<server::nomad_alloc_update::Workflow>()?;
registry.register_workflow::<server::nomad_eval_update::Workflow>()?;
registry.register_workflow::<server::nomad::Workflow>()?;
registry.register_workflow::<server::nomad::destroy::Workflow>()?;
registry.register_workflow::<server::nomad::alloc_plan::Workflow>()?;
registry.register_workflow::<server::nomad::alloc_update::Workflow>()?;
registry.register_workflow::<server::nomad::eval_update::Workflow>()?;
registry.register_workflow::<server::nomad::eval_update::Workflow>()?;
// registry.register_workflow::<server::pegboard::Workflow>()?;

Ok(registry)
}
6 changes: 6 additions & 0 deletions svc/pkg/ds/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ pub enum HostProtocol {
Udp = 1,
}

#[derive(Serialize, Deserialize, Hash, Debug, Clone, Copy, PartialEq, Eq, FromRepr)]
pub enum GameClient {
Nomad = 0,
Pegboard = 1,
}

// Move to build pkg when migrated to workflows
pub mod build {
use serde::{Deserialize, Serialize};
Expand Down
26 changes: 10 additions & 16 deletions svc/pkg/ds/src/workers/drain_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalRe

let server_rows = if let Some(nomad_node_id) = &ctx.nomad_node_id {
sql_fetch_all!(
[ctx, (Uuid, i64)]
[ctx, (Uuid,)]
"
SELECT s.server_id, s.kill_timeout_ms
SELECT s.server_id
FROM db_ds.servers AS s
JOIN db_ds.server_nomad AS sn
ON s.server_id = sn.server_id
Expand All @@ -24,9 +24,9 @@ async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalRe
.await?
} else if let Some(pegboard_client_id) = &ctx.pegboard_client_id {
sql_fetch_all!(
[ctx, (Uuid, i64)]
[ctx, (Uuid,)]
"
SELECT s.server_id, s.kill_timeout_ms
SELECT s.server_id
FROM db_ds.servers AS s
JOIN db_ds.servers_pegboard AS spb
ON s.server_id = spb.server_id
Expand All @@ -41,18 +41,12 @@ async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalRe
bail!("neither `nomad_node_id` nor `pegboard_client_id` set");
};

for (server_id, kill_timeout_ms) in server_rows {
chirp_workflow::compat::signal(
ctx,
crate::workflows::server::Destroy {
override_kill_timeout_ms: (drain_timeout < kill_timeout_ms)
.then_some(drain_timeout),
},
)
.await?
.tag("server_id", server_id)
.send()
.await?;
for (server_id,) in server_rows {
chirp_workflow::compat::signal(ctx, crate::workflows::server::Drain { drain_timeout })
.await?
.tag("server_id", server_id)
.send()
.await?;
}

Ok(())
Expand Down
Loading

0 comments on commit 8329433

Please sign in to comment.