Skip to content

Commit

Permalink
chore: update start_ts to be set when networking is ready (#1062)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
NathanFlurry committed Aug 16, 2024
1 parent 4e8185b commit 22b3fec
Show file tree
Hide file tree
Showing 16 changed files with 75 additions and 55 deletions.
1 change: 1 addition & 0 deletions fern/definition/servers/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ types:
lifecycle: Lifecycle
created_at: long
started_at: optional<long>
connectable_at: optional<long>
destroyed_at: optional<long>

Runtime:
Expand Down
2 changes: 1 addition & 1 deletion lib/convert/src/impls/ds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl ApiTryFrom<backend::ds::Server> for models::ServersServer {
datacenter: unwrap!(value.datacenter_id).as_uuid(),
cluster: unwrap!(value.cluster_id).as_uuid(),
created_at: value.create_ts,
started_at: value.start_ts,
started_at: value.connectable_ts,
destroyed_at: value.destroy_ts,
tags: Some(to_value(value.tags).unwrap()),
runtime: Box::new(models::ServersRuntime {
Expand Down
1 change: 1 addition & 0 deletions proto/backend/ds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ message Server {
int64 kill_timeout_ms = 7;
int64 create_ts = 9;
optional int64 start_ts = 10;
optional int64 connectable_ts = 17;
optional int64 destroy_ts = 11;
rivet.common.Uuid image_id = 12;
repeated string args = 13;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ pub async fn build_ds(
docker_ports_protocol_game_guard.port_name,
docker_ports_protocol_game_guard.protocol
FROM
db_dynamic_servers.internal_ports
db_ds.internal_ports
JOIN
db_dynamic_servers.servers
db_ds.servers
ON
internal_ports.server_id = servers.server_id
JOIN
db_dynamic_servers.docker_ports_protocol_game_guard
db_ds.docker_ports_protocol_game_guard
ON
internal_ports.server_id = docker_ports_protocol_game_guard.server_id
AND
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/db/servers/Service.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[service]
name = "db-dynamic-servers"
name = "db-ds"

[runtime]
kind = "crdb"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CREATE TABLE servers (

create_ts INT NOT NULL,
start_ts INT,
connectable_ts INT,
stop_ts INT,
finish_ts INT,
cleanup_ts INT,
Expand All @@ -29,7 +30,6 @@ CREATE TABLE servers (
INDEX (env_id)
);


CREATE TABLE docker_ports_protocol_game_guard (
server_id UUID NOT NULL REFERENCES servers,
port_name STRING NOT NULL,
Expand Down
10 changes: 5 additions & 5 deletions svc/pkg/ds/db/servers/migrations/20240809224504_add_idx.up.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

CREATE INDEX ON db_dynamic_servers.servers (datacenter_id, stop_ts);
CREATE INDEX ON servers (datacenter_id, stop_ts);

CREATE INDEX ON db_dynamic_servers.server_nomad (nomad_dispatched_job_id) STORING (nomad_alloc_plan_ts);
DROP INDEX db_dynamic_servers.server_nomad@server_nomad_nomad_dispatched_job_id_idx;
CREATE INDEX ON server_nomad (nomad_dispatched_job_id) STORING (nomad_alloc_plan_ts);
DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx;

CREATE INDEX ON db_dynamic_servers.server_nomad (nomad_dispatched_job_id)
CREATE INDEX ON server_nomad (nomad_dispatched_job_id)
STORING (
nomad_alloc_id,
nomad_node_id,
Expand All @@ -15,4 +15,4 @@ STORING (
nomad_node_public_ipv4,
nomad_node_vlan_ipv4
);
DROP INDEX db_dynamic_servers.server_nomad@server_nomad_nomad_dispatched_job_id_idx;
DROP INDEX server_nomad@server_nomad_nomad_dispatched_job_id_idx;
2 changes: 1 addition & 1 deletion svc/pkg/ds/ops/server-create/Service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ kind = "rust"
[operation]

[databases]
db-dynamic-servers = {}
db-ds = {}
15 changes: 8 additions & 7 deletions svc/pkg/ds/ops/server-create/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ async fn bind_with_retries(
"
SELECT EXISTS(
SELECT 1
FROM db_dynamic_servers.servers as r
JOIN db_dynamic_servers.docker_ports_protocol_game_guard as p
FROM db_ds.servers as r
JOIN db_ds.docker_ports_protocol_game_guard as p
ON r.server_id = p.server_id
WHERE
r.cleanup_ts IS NULL AND
Expand Down Expand Up @@ -183,7 +183,7 @@ pub async fn handle(
WITH
servers_cte AS (
INSERT INTO
db_dynamic_servers.servers (
db_ds.servers (
server_id,
env_id,
datacenter_id,
Expand All @@ -205,7 +205,7 @@ pub async fn handle(
),
docker_ports_host_cte AS (
INSERT INTO
db_dynamic_servers.docker_ports_host (
db_ds.docker_ports_host (
server_id,
port_name,
port_number
Expand All @@ -220,7 +220,7 @@ pub async fn handle(
),
docker_ports_protocol_game_guard_cte AS (
INSERT INTO
db_dynamic_servers.docker_ports_protocol_game_guard (
db_ds.docker_ports_protocol_game_guard (
server_id,
port_name,
port_number,
Expand Down Expand Up @@ -1292,7 +1292,7 @@ pub async fn handle(
[ctx]
"
INSERT INTO
db_dynamic_servers.server_nomad (server_id)
db_ds.server_nomad (server_id)
VALUES
($1)
",
Expand Down Expand Up @@ -1339,7 +1339,7 @@ pub async fn handle(
[ctx]
"
UPDATE
db_dynamic_servers.server_nomad
db_ds.server_nomad
SET
nomad_dispatched_job_id = $2
WHERE
Expand Down Expand Up @@ -1400,6 +1400,7 @@ pub async fn handle(
kill_timeout_ms: ctx.kill_timeout_ms,
create_ts,
start_ts: None,
connectable_ts: None,
destroy_ts: None,
args: ctx.args.clone(),
environment: ctx.environment.clone(),
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/ops/server-delete/Service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ kind = "rust"
[operation]

[databases]
db-dynamic-servers = {}
db-ds = {}
8 changes: 4 additions & 4 deletions svc/pkg/ds/ops/server-delete/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub async fn handle(
let dynamic_server = sql_fetch_one!(
[ctx, UpdatedServer, @tx tx]
"
UPDATE db_dynamic_servers.servers
UPDATE db_ds.servers
SET delete_ts = $2
WHERE
server_id = $1
Expand All @@ -35,11 +35,11 @@ pub async fn handle(
server_nomad.nomad_dispatched_job_id,
server_nomad.nomad_alloc_id,
FROM
db_dynamic_servers.servers
db_ds.servers
JOIN
db_dynamic_servers.server_nomad
db_ds.server_nomad
ON
db_dynamic_servers.servers.server_id = db_dynamic_servers.server_nomad.server_id
db_ds.servers.server_id = db_ds.server_nomad.server_id
",
server_id,
ctx.ts(),
Expand Down
13 changes: 8 additions & 5 deletions svc/pkg/ds/ops/server-get/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct Server {
kill_timeout_ms: i64,
create_ts: i64,
start_ts: Option<i64>,
connectable_ts: Option<i64>,
destroy_ts: Option<i64>,
image_id: Uuid,
args: Vec<String>,
Expand Down Expand Up @@ -84,13 +85,14 @@ pub async fn handle(
kill_timeout_ms,
create_ts,
start_ts,
connectable_ts,
destroy_ts,
image_id,
args,
network_mode,
environment
FROM
db_dynamic_servers.servers
db_ds.servers
WHERE
server_id = ANY($1)
",
Expand All @@ -106,7 +108,7 @@ pub async fn handle(
gg_port,
protocol
FROM
db_dynamic_servers.docker_ports_protocol_game_guard
db_ds.docker_ports_protocol_game_guard
WHERE
server_id = ANY($1)
",
Expand All @@ -121,7 +123,7 @@ pub async fn handle(
port_number,
protocol
FROM
db_dynamic_servers.docker_ports_host
db_ds.docker_ports_host
WHERE
server_id = ANY($1)
",
Expand All @@ -140,7 +142,7 @@ pub async fn handle(
nomad_node_vlan_ipv4,
nomad_alloc_plan_ts
FROM
db_dynamic_servers.server_nomad
db_ds.server_nomad
WHERE
server_id = ANY($1)
",
Expand All @@ -155,7 +157,7 @@ pub async fn handle(
nomad_ip,
nomad_source
FROM
db_dynamic_servers.internal_ports
db_ds.internal_ports
WHERE
server_id = ANY($1)
",
Expand Down Expand Up @@ -223,6 +225,7 @@ pub async fn handle(
network_ports: ports,
create_ts: server.create_ts,
start_ts: server.start_ts,
connectable_ts: server.connectable_ts,
destroy_ts: server.destroy_ts,
};

Expand Down
4 changes: 2 additions & 2 deletions svc/pkg/ds/ops/server-list-for-env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ async fn handle(
"
WITH after_server AS (
SELECT create_ts, server_id
FROM db_dynamic_servers.servers
FROM db_ds.servers
WHERE server_id = $4
)
SELECT server_id
FROM db_dynamic_servers.servers
FROM db_ds.servers
WHERE
env_id = $1
AND tags @> $2
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/ds/worker/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn webhook_call(
// SELECT
// server_id
// FROM
// db_dynamic_servers.server_nomad
// db_ds.server_nomad
// WHERE
// nomad_alloc_id = $1
// ",
Expand Down
42 changes: 28 additions & 14 deletions svc/pkg/ds/worker/src/workers/nomad_monitor_alloc_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ use chirp_worker::prelude::*;
use proto::backend::{self, pkg::*};
use redis::AsyncCommands;
use serde::Deserialize;
use std::time::Duration;

use crate::workers::NEW_NOMAD_CONFIG;

// TODO:
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(3);

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct PlanResult {
Expand All @@ -16,6 +20,7 @@ struct RunRow {
server_id: Uuid,
datacenter_id: Uuid,
stop_ts: Option<i64>,
connectable_ts: Option<i64>,
nomad_alloc_plan_ts: Option<i64>, // this was nomad_plan_ts
}

Expand Down Expand Up @@ -92,10 +97,8 @@ async fn worker(
tracing::info!("no network returned");
}

// This works
tracing::info!(?ports, "found protsadf");

// {"timestamp":"2024-06-28T01:43:24.930496Z","level":"INFO","fields":{"message":"found protsadf","ports":"[Port { label: \"game_testing2\", source: 20202, target: 0, ip: \"10.0.50.97\" }]"},"target":"ds_worker::workers::nomad_monitor_alloc_plan","spans":[{"ray_id":"1c8bfa81-3c80-4a2c-ab7c-2655f6c6a665","req_id":"a44227ad-4f1a-44b8-b4d0-7746dd8a622e","worker_name":"monolith-worker--ds-nomad-monitor-alloc-plan","name":"handle_req"},{"name":"ds-nomad-monitor-alloc-plan","tick_index":0,"name":"handle"}]}
// Wait for Traefik to be ready
tokio::time::sleep(TRAEFIK_GRACE_PERIOD).await;

// Fetch the run
//
Expand All @@ -110,7 +113,6 @@ async fn worker(
nomad_node_vlan_ipv4: unwrap!(meta.remove("network-vlan-ipv4")),
ports: ports.clone(),
};
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let db_output = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
let ctx = ctx.clone();
let now = ctx.ts();
Expand Down Expand Up @@ -171,11 +173,12 @@ async fn update_db(
servers.server_id,
servers.datacenter_id,
servers.stop_ts,
servers.connectable_ts,
server_nomad.nomad_alloc_plan_ts
FROM
db_dynamic_servers.server_nomad
db_ds.server_nomad
INNER JOIN
db_dynamic_servers.servers
db_ds.servers
ON
servers.server_id = server_nomad.server_id
WHERE
Expand All @@ -186,9 +189,6 @@ async fn update_db(
&job_id,
)
.await?;
tracing::info!(?job_id, "checking jobid");

tracing::info!(?run_row, "ayy event2a");

// Check if run found
let run_row = if let Some(run_row) = run_row {
Expand All @@ -199,7 +199,21 @@ async fn update_db(
};
let server_id = run_row.server_id;

tracing::info!("ayy event2b");
if run_row.connectable_ts.is_some() {
tracing::warn!("connectable ts already set");
} else {
sql_execute!(
[ctx, @tx tx]
"
UPDATE db_ds.servers
SET connectable_ts = $2
WHERE server_id = $1
",
server_id,
now,
)
.await?;
}

// Write run meta on first plan
if run_row.nomad_alloc_plan_ts.is_none() {
Expand All @@ -208,14 +222,14 @@ async fn update_db(
[ctx, @tx tx]
"
UPDATE
db_dynamic_servers.server_nomad
db_ds.server_nomad
SET
nomad_alloc_id = $2,
nomad_alloc_plan_ts = $3,
nomad_node_id = $4,
nomad_node_name = $5,
nomad_node_public_ipv4 = $6,
nomad_node_vlan_ipv4 = $7
nomad_node_vlan_ipv4 = $
WHERE
server_id = $1
",
Expand All @@ -238,7 +252,7 @@ async fn update_db(
[ctx, @tx tx]
"
INSERT INTO
db_dynamic_servers.internal_ports (
db_ds.internal_ports (
server_id,
nomad_label,
nomad_source,
Expand Down
Loading

0 comments on commit 22b3fec

Please sign in to comment.