fix(ds): fix logs (#1074)

## Changes

NathanFlurry committed Aug 16, 2024
1 parent ec498fb commit 21dbd6c
Showing 8 changed files with 69 additions and 48 deletions.
34 changes: 13 additions & 21 deletions infra/tf/vector/vector.tf
@@ -68,38 +68,30 @@ resource "helm_release" "vector" {
vector_metrics = {
type = "internal_metrics"
}

vector_logs = {
type = "internal_logs"
}
}
transforms = {
job_run = {
dynamic_servers = {
type = "filter"
inputs = ["vector", "tcp_json"]
condition = {
type = "vrl"
source = ".source == \"job_run\""
source = ".source == \"dynamic_servers\""
}
}

dynamic_servers = {
job_run = {
type = "filter"
inputs = ["vector", "tcp_json"]
condition = {
type = "vrl"
source = ".source == \"dynamic_servers\""
source = ".source == \"job_run\""
}
}

ds_fix_id = {
type = "remap"
inputs = ["dynamic_servers"]
source = <<-EOF
.server_id = .run_id
del(.run_id)
EOF
}

backend_worker = {
type = "filter"
inputs = ["http_json"]
@@ -116,13 +108,13 @@ resource "helm_release" "vector" {
address = "0.0.0.0:9598"
}

clickhouse_job_run_logs = {
clickhouse_ds_logs = {
type = "clickhouse"
inputs = ["job_run"]
inputs = ["dynamic_servers"]
compression = "gzip"
database = "db_job_log"
database = "db_ds_log"
endpoint = "https://${var.clickhouse_host}:${var.clickhouse_port_https}"
table = "run_logs"
table = "server_logs"
auth = {
strategy = "basic"
user = "vector"
@@ -138,13 +130,13 @@ resource "helm_release" "vector" {
}
}

clickhouse_ds_logs = {
clickhouse_job_run_logs = {
type = "clickhouse"
inputs = ["ds_fix_id"]
inputs = ["job_run"]
compression = "gzip"
database = "db_ds_log"
database = "db_job_log"
endpoint = "https://${var.clickhouse_host}:${var.clickhouse_port_https}"
table = "server_logs"
table = "run_logs"
auth = {
strategy = "basic"
user = "vector"
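Note on the Vector wiring above: the two `filter` transforms split everything arriving on the `vector`/`tcp_json` inputs by the `source` field of each JSON event, and each ClickHouse sink now consumes its filter directly. The `ds_fix_id` remap (which rewrote `run_id` into `server_id`) is gone because the runner now emits `server_id` itself. Below is a minimal standalone sketch of that routing, not part of the commit; the `route` helper and all field values are illustrative.

```rust
use serde_json::{json, Value};

/// Mirrors the VRL conditions `.source == "dynamic_servers"` / `.source == "job_run"`
/// and the database/table each sink writes to.
fn route(event: &Value) -> Option<&'static str> {
    match event["source"].as_str() {
        Some("dynamic_servers") => Some("db_ds_log.server_logs"),
        Some("job_run") => Some("db_job_log.run_logs"),
        _ => None,
    }
}

fn main() {
    // Event emitted by a dynamic-servers runner: carries `server_id` directly,
    // so no remap transform is needed before the ClickHouse sink.
    let ds_event = json!({
        "source": "dynamic_servers",
        "server_id": "srv-example",
        "task": "main",
        "stream_type": 0,
        "ts": 1u64,
        "message": "hello from a dynamic server",
    });

    // Event emitted by a legacy job-run runner: keeps `run_id`.
    let job_event = json!({
        "source": "job_run",
        "run_id": "run-example",
        "task": "main",
        "stream_type": 1,
        "ts": 2u64,
        "message": "hello from a job run",
    });

    assert_eq!(route(&ds_event), Some("db_ds_log.server_logs"));
    assert_eq!(route(&job_event), Some("db_job_log.run_logs"));
}
```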
5 changes: 5 additions & 0 deletions lib/job-runner/src/lib.rs
@@ -1,2 +1,7 @@
pub mod log_shipper;
pub mod throttle;

pub enum Manager {
DynamicServers { server_id: String },
JobRun { run_id: String },
}
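The new `Manager` enum is the single place where the runner records which ID it was dispatched with. A small standalone sketch of how a consumer can match on it follows; the `id_field` helper is hypothetical (the real consumer is `log_shipper.rs` below).

```rust
// Local copy of the enum added in lib/job-runner/src/lib.rs, so the sketch is standalone.
#[allow(dead_code)]
enum Manager {
    DynamicServers { server_id: String },
    JobRun { run_id: String },
}

/// Hypothetical helper (not in the commit): pick the (field name, value) pair to log.
fn id_field(manager: &Manager) -> (&'static str, &str) {
    match manager {
        Manager::DynamicServers { server_id } => ("server_id", server_id),
        Manager::JobRun { run_id } => ("run_id", run_id),
    }
}

fn main() {
    let m = Manager::DynamicServers { server_id: "srv-123".to_string() };
    assert_eq!(id_field(&m), ("server_id", "srv-123"));
}
```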
50 changes: 34 additions & 16 deletions lib/job-runner/src/log_shipper.rs
@@ -34,9 +34,8 @@ pub struct LogShipper {
/// trying to send to this channel.
pub msg_rx: mpsc::Receiver<ReceivedMessage>,

pub job_run_id: String,
pub nomad_task_name: String,
pub runner: String,
pub manager: crate::Manager,
}

impl LogShipper {
@@ -89,13 +88,21 @@ impl LogShipper {
println!("Log shipper connected");

while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = VectorMessage {
source: self.runner.as_str(),
run_id: self.job_run_id.as_str(),
task: self.nomad_task_name.as_str(),
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
let vector_message = match &self.manager {
crate::Manager::DynamicServers { server_id } => VectorMessage::DynamicServers {
server_id: server_id.as_str(),
task: self.nomad_task_name.as_str(),
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
},
crate::Manager::JobRun { run_id } => VectorMessage::JobRun {
run_id: run_id.as_str(),
task: self.nomad_task_name.as_str(),
stream_type: message.stream_type as u8,
ts: message.ts,
message: message.message.as_str(),
},
};

serde_json::to_writer(&mut stream, &vector_message)?;
@@ -110,11 +117,22 @@ impl LogShipper {

/// Vector-compatible message format
#[derive(Serialize)]
struct VectorMessage<'a> {
source: &'a str,
run_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
#[serde(tag = "source")]
enum VectorMessage<'a> {
#[serde(rename = "dynamic_servers")]
DynamicServers {
server_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
},
#[serde(rename = "job_run")]
JobRun {
run_id: &'a str,
task: &'a str,
stream_type: u8,
ts: u64,
message: &'a str,
},
}
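Because of `#[serde(tag = "source")]`, serde uses the internally tagged representation: the variant name is written into a `source` field of the JSON object, which is the field the Vector `filter` transforms in `vector.tf` match on. Below is a standalone sketch of the resulting wire format, with made-up field values and owned `String`s instead of the borrowed fields used in the commit.

```rust
use serde::Serialize;

// Trimmed copy of the enum from log_shipper.rs, kept standalone for illustration.
#[allow(dead_code)]
#[derive(Serialize)]
#[serde(tag = "source")]
enum VectorMessage {
    #[serde(rename = "dynamic_servers")]
    DynamicServers { server_id: String, task: String, stream_type: u8, ts: u64, message: String },
    #[serde(rename = "job_run")]
    JobRun { run_id: String, task: String, stream_type: u8, ts: u64, message: String },
}

fn main() -> serde_json::Result<()> {
    let msg = VectorMessage::DynamicServers {
        server_id: "srv-123".into(),
        task: "main".into(),
        stream_type: 0,
        ts: 1,
        message: "hello".into(),
    };
    // Prints: {"source":"dynamic_servers","server_id":"srv-123","task":"main",
    //          "stream_type":0,"ts":1,"message":"hello"}
    println!("{}", serde_json::to_string(&msg)?);
    Ok(())
}
```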
14 changes: 10 additions & 4 deletions lib/job-runner/src/main.rs
@@ -23,12 +23,19 @@ const MAX_PREVIEW_LINES: usize = 128;

fn main() -> anyhow::Result<()> {
let nomad_alloc_dir = std::env::var("NOMAD_ALLOC_DIR").context("NOMAD_ALLOC_DIR")?;
let job_run_id = std::env::var("NOMAD_META_job_run_id").context("NOMAD_META_job_run_id")?;
let nomad_task_name = std::env::var("NOMAD_TASK_NAME").context("NOMAD_TASK_NAME")?;
let root_user_enabled = std::env::var("NOMAD_META_root_user_enabled")
.context("NOMAD_META_root_user_enabled")?
== "1";
let runner = std::env::var("NOMAD_META_runner").unwrap_or("job_run".to_string());

let manager = match std::env::var("NOMAD_META_manager").ok() {
Some(x) if x == "dynamic_servers" => job_runner::Manager::DynamicServers {
server_id: std::env::var("NOMAD_META_server_id").context("NOMAD_META_server_id")?,
},
_ => job_runner::Manager::JobRun {
run_id: std::env::var("NOMAD_META_job_run_id").context("NOMAD_META_job_run_id")?,
},
};

let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);

@@ -38,9 +45,8 @@ fn main() -> anyhow::Result<()> {
let log_shipper = log_shipper::LogShipper {
shutdown_rx,
msg_rx,
job_run_id,
nomad_task_name,
runner,
manager,
};
let log_shipper_thread = log_shipper.spawn();

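The manager is chosen once at startup from Nomad meta environment variables: `NOMAD_META_manager=dynamic_servers` selects `Manager::DynamicServers` and then requires `NOMAD_META_server_id`, while any other value (or a missing variable) falls back to the job-run path, preserving the old default. A hedged sketch of that decision as a standalone function follows; `resolve_manager` and the local `Manager` copy are illustrative, and the commit does this inline in `main()` with `std::env::var` and `anyhow::Context`.

```rust
// Illustrative only: a pure function that mirrors the selection logic in main().
#[derive(Debug, PartialEq)]
enum Manager {
    DynamicServers { server_id: String },
    JobRun { run_id: String },
}

fn resolve_manager(
    manager: Option<&str>,
    server_id: Option<&str>,
    job_run_id: Option<&str>,
) -> Option<Manager> {
    match manager {
        Some("dynamic_servers") => Some(Manager::DynamicServers {
            server_id: server_id?.to_string(),
        }),
        // Any other value, or no value at all, keeps the legacy job-run behavior.
        _ => Some(Manager::JobRun {
            run_id: job_run_id?.to_string(),
        }),
    }
}

fn main() {
    assert_eq!(
        resolve_manager(Some("dynamic_servers"), Some("srv-123"), None),
        Some(Manager::DynamicServers { server_id: "srv-123".into() })
    );
    assert_eq!(
        resolve_manager(None, None, Some("run-456")),
        Some(Manager::JobRun { run_id: "run-456".into() })
    );
}
```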
2 changes: 1 addition & 1 deletion svc/pkg/ds/src/workers/nomad_monitor_alloc_plan.rs
@@ -7,7 +7,7 @@ use serde::Deserialize;
use crate::util::NEW_NOMAD_CONFIG;

// TODO:
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(3);
const TRAEFIK_GRACE_PERIOD: Duration = Duration::from_secs(2);

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
8 changes: 4 additions & 4 deletions svc/pkg/ds/src/workflows/server/mod.rs
@@ -600,9 +600,9 @@ async fn submit_job(ctx: &ActivityCtx, input: &SubmitJobInput) -> GlobalResult<S
"vector_socket_addr".into(),
"image_artifact_url".into(),
"root_user_enabled".into(),
"runner".into(),
"manager".into(),
"user_env".into(),
"job_run_id".into(),
"server_id".into(),
]),
meta_optional: Some(vec!["rivet_test_id".into()]),
})),
@@ -1022,7 +1022,7 @@ async fn dispatch_job(ctx: &ActivityCtx, input: &DispatchJobInput) -> GlobalResu
value: "0".into(),
},
backend::job::Parameter {
key: "runner".into(),
key: "manager".into(),
value: "dynamic_servers".into(),
},
backend::job::Parameter {
@@ -1040,7 +1040,7 @@ async fn dispatch_job(ctx: &ActivityCtx, input: &DispatchJobInput) -> GlobalResu
.into_iter()
.collect::<Vec<_>>();

let job_params = vec![("job_run_id".to_string(), input.server_id.to_string())];
let job_params = vec![("server_id".to_string(), input.server_id.to_string())];

// MARK: Dispatch job
let dispatch_res = nomad_client::apis::jobs_api::post_job_dispatch(
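The `meta_required` list and the dispatch parameters have to agree with what the runner reads: Nomad exposes a meta key `k` to the task as the `NOMAD_META_k` environment variable, so the `manager` and `server_id` keys here line up with the `NOMAD_META_manager` and `NOMAD_META_server_id` lookups in `main.rs` above. A small sketch of that naming convention follows; `meta_to_env` and the map contents are illustrative.

```rust
use std::collections::HashMap;

/// Mirrors Nomad's convention of exposing job/dispatch meta to tasks as
/// `NOMAD_META_<key>` environment variables, for the two keys used in this diff.
fn meta_to_env(meta: &HashMap<&str, &str>) -> HashMap<String, String> {
    meta.iter()
        .map(|(k, v)| (format!("NOMAD_META_{k}"), v.to_string()))
        .collect()
}

fn main() {
    let meta = HashMap::from([
        ("manager", "dynamic_servers"),
        ("server_id", "srv-123"), // value is illustrative
    ]);
    let env = meta_to_env(&meta);
    assert_eq!(env["NOMAD_META_manager"], "dynamic_servers");
    assert_eq!(env["NOMAD_META_server_id"], "srv-123");
}
```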
2 changes: 1 addition & 1 deletion svc/pkg/mm/worker/src/workers/lobby_create/mod.rs
@@ -736,7 +736,7 @@ async fn create_docker_job(
value: if mm_game_config.root_user_enabled { "1" } else { "0" }.into()
},
backend::job::Parameter {
key: "runner".into(),
key: "manager".into(),
value: "job_run".into()
},
].into_iter()
2 changes: 1 addition & 1 deletion svc/pkg/mm/worker/src/workers/lobby_create/nomad_job.rs
@@ -405,7 +405,7 @@ pub fn gen_lobby_docker_job(
"max_players_direct".into(),
"max_players_party".into(),
"root_user_enabled".into(),
"runner".into(),
"manager".into(),
]),
meta_optional: Some(vec!["rivet_test_id".into()]),
})),
