diff --git a/lib/bolt/cli/src/commands/db/mod.rs b/lib/bolt/cli/src/commands/db/mod.rs
index 1bb7397e7b..746ff8e604 100644
--- a/lib/bolt/cli/src/commands/db/mod.rs
+++ b/lib/bolt/cli/src/commands/db/mod.rs
@@ -16,6 +16,8 @@ pub enum SubCommand {
 		service: String,
 		#[clap(short = 'q', long)]
 		query: Option<String>,
+		#[clap(short = 'f', long)]
+		forwarded: bool,
 	},
 }
 
@@ -23,11 +25,16 @@ impl SubCommand {
 	pub async fn execute(self, ctx: ProjectContext) -> Result<()> {
 		match self {
 			Self::Migrate { command } => command.execute(ctx).await,
-			Self::Shell { service, query } => {
+			Self::Shell {
+				service,
+				query,
+				forwarded,
+			} => {
 				tasks::db::shell(
 					&ctx,
 					&ctx.service_with_name(&service).await,
 					query.as_deref(),
+					forwarded,
 				)
 				.await?;
diff --git a/lib/bolt/core/src/tasks/db.rs b/lib/bolt/core/src/tasks/db.rs
index 777d909823..1743871a7b 100644
--- a/lib/bolt/core/src/tasks/db.rs
+++ b/lib/bolt/core/src/tasks/db.rs
@@ -1,7 +1,6 @@
 use anyhow::*;
 use duct::cmd;
 use indoc::formatdoc;
-use rand::Rng;
 use serde_json::json;
 use tokio::{io::AsyncWriteExt, task::block_in_place};
 
@@ -25,15 +24,28 @@ pub struct ShellQuery {
 
 pub struct ShellContext<'a> {
 	pub ctx: &'a ProjectContext,
+	pub forwarded: bool,
 	pub conn: &'a DatabaseConnections,
 	pub queries: &'a [ShellQuery],
 	pub log_type: LogType,
 }
 
-pub async fn shell(ctx: &ProjectContext, svc: &ServiceContext, query: Option<&str>) -> Result<()> {
-	let conn = DatabaseConnections::create(ctx, &[svc.clone()], true).await?;
+pub async fn shell(
+	ctx: &ProjectContext,
+	svc: &ServiceContext,
+	query: Option<&str>,
+	forwarded: bool,
+) -> Result<()> {
+	let forwarded = forwarded
+		&& matches!(
+			&ctx.ns().cluster.kind,
+			config::ns::ClusterKind::SingleNode { .. }
+		);
+
+	let conn = DatabaseConnections::create(ctx, &[svc.clone()], forwarded).await?;
 	let shell_ctx = ShellContext {
 		ctx,
+		forwarded,
 		conn: &conn,
 		queries: &[ShellQuery {
 			svc: svc.clone(),
@@ -55,6 +67,7 @@ pub async fn shell(ctx: &ProjectContext, svc: &ServiceContext, query: Option<&st
 async fn redis_shell(shell_ctx: ShellContext<'_>) -> Result<()> {
 	let ShellContext {
 		ctx,
+		forwarded,
 		conn,
 		queries,
 		log_type,
@@ -115,97 +128,97 @@ async fn redis_shell(shell_ctx: ShellContext<'_>) -> Result<()> {
 		Vec::new()
 	};
 
-	let cmd = formatdoc!(
+	let mut cmd = formatdoc!(
 		"
-		sleep 1 &&
 		redis-cli \
 		-h {hostname} \
 		-p {port} \
 		--user {username} \
-		-c \
-		--tls {cacert}
-		",
-		cacert = if mount_ca {
-			"--cacert /local/redis-ca.crt"
-		} else {
-			""
-		}
+		-c",
 	);
 
-	let overrides = json!({
-		"apiVersion": "v1",
-		"metadata": {
-			"namespace": "bolt",
-		},
-		"spec": {
-			"containers": [
-				{
-					"name": "redis",
-					"image": REDIS_IMAGE,
-					// "command": ["redis-cli"],
-					// "args": [
-					// 	"-h", hostname,
-					// 	"-p", port,
-					// 	"--user", username,
-					// 	"-c",
-					// 	"--tls",
-					// 	"--cacert", "/local/redis-ca.crt"
-					// ],
-					"command": ["sh", "-c"],
-					"args": [cmd],
-					"env": env,
-					"stdin": true,
-					"stdinOnce": true,
-					"tty": true,
-					"volumeMounts": if mount_ca {
-						json!([{
-							"name": "redis-ca",
-							"mountPath": "/local/redis-ca.crt",
-							"subPath": "redis-ca.crt"
-						}])
-					} else {
-						json!([])
+	if forwarded {
+		let port =
+			utils::kubectl_port_forward(ctx, &format!("redis-{db_name}"), "redis", (6379, 6379))?;
+		port.check().await?;
+
+		block_in_place(|| cmd!("bash", "-c", cmd).run())?;
+	} else {
+		cmd.push_str(" --tls");
+
+		if mount_ca {
+			cmd.push_str(" --cacert /local/redis-ca.crt");
+		}
+
+		let overrides = json!({
+			"apiVersion": "v1",
+			"metadata": {
+				"namespace": "bolt",
+			},
+			"spec": {
+				"containers": [
+					{
+						"name": "redis",
"image": REDIS_IMAGE, + "command": ["sleep", "10000"], + "env": env, + "stdin": true, + "stdinOnce": true, + "tty": true, + "volumeMounts": if mount_ca { + json!([{ + "name": "redis-ca", + "mountPath": "/local/redis-ca.crt", + "subPath": "redis-ca.crt" + }]) + } else { + json!([]) + } } + ], + "volumes": if mount_ca { + json!([{ + "name": "redis-ca", + "configMap": { + "name": format!("redis-{}-ca", db_name), + "defaultMode": 420, + // Distributed clusters don't need a CA for redis + "optional": true, + "items": [ + { + "key": "ca.crt", + "path": "redis-ca.crt" + } + ] + } + }]) + } else { + json!([]) } - ], - "volumes": if mount_ca { - json!([{ - "name": "redis-ca", - "configMap": { - "name": format!("redis-{}-ca", db_name), - "defaultMode": 420, - // Distributed clusters don't need a CA for redis - "optional": true, - "items": [ - { - "key": "ca.crt", - "path": "redis-ca.crt" - } - ] - } - }]) - } else { - json!([]) } - } - }); - - block_in_place(|| { - cmd!( - "kubectl", - "run", - "-itq", - "--rm", - "--restart=Never", - format!("--image={REDIS_IMAGE}"), - "--namespace", - "bolt", - format!("--overrides={overrides}"), - shell_name("redis"), - ) - .env("KUBECONFIG", ctx.gen_kubeconfig_path()) - .run() - })?; + }); + + let pod_name = format!("redis-{db_name}-sh-persistent"); + start_persistent_pod(ctx, "Redis", &pod_name, overrides).await?; + + // Connect to persistent pod + block_in_place(|| { + cmd!( + "kubectl", + "exec", + format!("pod/{pod_name}"), + "-it", + "-n", + "bolt", + "--", + "sh", + "-c", + cmd, + ) + .env("KUBECONFIG", ctx.gen_kubeconfig_path()) + .run() + })?; + } Ok(()) } @@ -213,6 +226,7 @@ async fn redis_shell(shell_ctx: ShellContext<'_>) -> Result<()> { pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> { let ShellContext { ctx, + forwarded, conn, queries, log_type, @@ -229,10 +243,12 @@ pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> { let conn = conn.cockroach_host.as_ref().unwrap(); let username = ctx.read_secret(&["crdb", "username"]).await?; let password = ctx.read_secret(&["crdb", "password"]).await?; - let db_url = format!( - "postgres://{}:{}@{}/{}?sslmode=verify-ca&sslrootcert=/local/crdb-ca.crt", - username, password, conn, db_name - ); + let mut db_url = format!("postgres://{}:{}@{}/{}", username, password, conn, db_name); + + // Add SSL + if !forwarded { + db_url.push_str("?sslmode=verify-ca&sslrootcert=/local/crdb-ca.crt"); + } let query = if let Some(query) = query { format!("-c '{}'", query.replace('\'', "'\\''")) @@ -256,75 +272,77 @@ pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> { query_cmd.push_str(&cmd); } - utils::kubectl_port_forward(ctx, "cockroachdb", "cockroachdb", (26257, 26257))?; - - println!("{query_cmd}"); - - block_in_place(|| cmd!(query_cmd).run())?; - - return Ok(()); - - let overrides = json!({ - "apiVersion": "v1", - "metadata": { - "namespace": "bolt", - }, - "spec": { - "containers": [ - { - "name": "postgres", - "image": "postgres", - "command": ["sh", "-c"], - "args": [query_cmd], - "env": [ - // See https://github.com/cockroachdb/cockroach/issues/37129#issuecomment-600115995 - { - "name": "PGCLIENTENCODING", - "value": "utf-8", - } - ], - "stdin": true, - "stdinOnce": true, - "tty": true, - "volumeMounts": [{ - "name": "crdb-ca", - "mountPath": "/local/crdb-ca.crt", - "subPath": "crdb-ca.crt" - }] - } - ], - "volumes": [{ - "name": "crdb-ca", - "configMap": { - "name": "crdb-ca", - "defaultMode": 420, - "items": [ - { - "key": "ca.crt", - "path": "crdb-ca.crt" - } - ] - } - 
-			}]
-		}
-	});
+	if forwarded {
+		let port = utils::kubectl_port_forward(ctx, "cockroachdb", "cockroachdb", (26257, 26257))?;
+		port.check().await?;
 
-	block_in_place(|| {
-		cmd!(
-			"kubectl",
-			"run",
-			"-itq",
-			"--rm",
-			"--restart=Never",
-			"--image=postgres",
-			"--namespace",
-			"bolt",
-			format!("--overrides={overrides}"),
-			shell_name("crdb"),
-		)
-		.env("KUBECONFIG", ctx.gen_kubeconfig_path())
-		.run()
-	})?;
+		block_in_place(|| cmd!("bash", "-c", query_cmd).run())?;
+	} else {
+		let overrides = json!({
+			"apiVersion": "v1",
+			"metadata": {
+				"namespace": "bolt",
+			},
+			"spec": {
+				"containers": [
+					{
+						"name": "postgres",
+						"image": "postgres",
+						"command": ["sleep", "10000"],
+						"env": [
+							// See https://github.com/cockroachdb/cockroach/issues/37129#issuecomment-600115995
+							{
+								"name": "PGCLIENTENCODING",
+								"value": "utf-8",
+							}
+						],
+						"stdin": true,
+						"stdinOnce": true,
+						"tty": true,
+						"volumeMounts": [{
+							"name": "crdb-ca",
+							"mountPath": "/local/crdb-ca.crt",
+							"subPath": "crdb-ca.crt"
+						}]
+					}
+				],
+				"volumes": [{
+					"name": "crdb-ca",
+					"configMap": {
+						"name": "crdb-ca",
+						"defaultMode": 420,
+						"items": [
+							{
+								"key": "ca.crt",
+								"path": "crdb-ca.crt"
+							}
+						]
+					}
+				}]
+			}
+		});
+
+		let pod_name = "crdb-sh-persistent";
+		start_persistent_pod(ctx, "Cockroach", pod_name, overrides).await?;
+
+		// Connect to persistent pod
+		block_in_place(|| {
+			cmd!(
+				"kubectl",
+				"exec",
+				format!("pod/{pod_name}"),
+				"-it",
+				"-n",
+				"bolt",
+				"--",
+				"sh",
+				"-c",
+				query_cmd,
+			)
+			.env("KUBECONFIG", ctx.gen_kubeconfig_path())
+			.run()
+		})?;
+	}
 
 	Ok(())
 }
@@ -333,6 +351,7 @@ pub async fn crdb_shell(shell_ctx: ShellContext<'_>) -> Result<()> {
 pub async fn clickhouse_shell(shell_ctx: ShellContext<'_>, no_db: bool) -> Result<()> {
 	let ShellContext {
 		ctx,
+		forwarded,
 		conn,
 		queries,
 		log_type,
@@ -343,7 +362,7 @@ pub async fn clickhouse_shell(shell_ctx: ShellContext<'_>, no_db: bool) -> Resul
 	}
 
 	// Combine all queries into one command
-	let mut query_cmd = "sleep 1".to_string();
+	let mut query_cmd = String::new();
 	for ShellQuery { svc, query } in queries {
 		let db_name = svc.clickhouse_db_name();
 		let user = "default";
@@ -353,28 +372,26 @@ pub async fn clickhouse_shell(shell_ctx: ShellContext<'_>, no_db: bool) -> Resul
 		let host = conn.clickhouse_host.as_ref().unwrap();
 		let (hostname, port) = host.split_once(':').unwrap();
 
-		let db_flag = if no_db {
-			"".to_string()
-		} else {
-			format!("--database {db_name}")
-		};
-		let query = if let Some(query) = query {
-			format!("--multiquery '{}'", query.replace('\'', "'\\''"))
-		} else {
-			"".to_string()
-		};
-		let cmd = formatdoc!(
+		let mut cmd = formatdoc!(
 			"
 			clickhouse-client \
-			--secure \
-			--config-file /local/config.yml \
 			--host {hostname} \
 			--port {port} \
 			--user {user} \
-			--password {password} {db_flag} {query}
-			"
+			--password {password} \
+			--secure"
 		);
 
+		if !forwarded {
+			cmd.push_str(" --config-file /local/config.yml ");
+		}
+		if !no_db {
+			cmd.push_str(&format!(" --database {db_name}"));
+		}
+		if let Some(query) = query {
+			cmd.push_str(&format!(" --multiquery '{}'", query.replace('\'', "'\\''")));
+		}
+
 		if let LogType::Migration = log_type {
 			// Append command
 			if !query_cmd.is_empty() {
@@ -390,113 +407,165 @@ pub async fn clickhouse_shell(shell_ctx: ShellContext<'_>, no_db: bool) -> Resul
 		query_cmd.push_str(cmd.trim());
 	}
 
-	let overrides = json!({
-		"apiVersion": "v1",
-		"metadata": {
-			"namespace": "bolt",
-		},
-		"spec": {
-			"containers": [
-				{
-					"name": "clickhouse",
-					"image": "clickhouse/clickhouse-server",
-					"command": ["sh", "-c"],
"args": [query_cmd], - "stdin": true, - "stdinOnce": true, - "tty": true, - "volumeMounts": [ - { - "name": "clickhouse-ca", - "mountPath": "/local/clickhouse-ca.crt", - "subPath": "clickhouse-ca.crt" - }, - { - "name": "clickhouse-config", - "mountPath": "/local/config.yml", - "subPath": "config.yml", - } - ] - } - ], - "volumes": [{ - "name": "clickhouse-ca", - "configMap": { - "name": "clickhouse-ca", - "defaultMode": 420, - // Distributed clusters don't need a CA for clickhouse - "optional": true, - "items": [ - { - "key": "ca.crt", - "path": "clickhouse-ca.crt" - } - ] - } - }, { - "name": "clickhouse-config", - "configMap": { - "name": "clickhouse-config", - "defaultMode": 420, - "optional": true - } - }] - } - }); + // TODO: Does not work when forwarded, not sure why + if forwarded { + let port = utils::kubectl_port_forward(ctx, "clickhouse", "clickhouse", (9440, 9440))?; + port.check().await?; - // Apply clickhouse config to K8s - if let Some(config) = &conn.clickhouse_config { - let spec = serde_json::to_vec(&json!({ - "kind": "ConfigMap", + block_in_place(|| cmd!("bash", "-c", query_cmd).run())?; + } else { + let overrides = json!({ "apiVersion": "v1", "metadata": { - "name": "clickhouse-config", - "namespace": "bolt" + "namespace": "bolt", }, - "data": { - "config.yml": config + "spec": { + "containers": [ + { + "name": "clickhouse", + "image": "clickhouse/clickhouse-server", + "command": ["sh", "-c"], + "args": [query_cmd], + "stdin": true, + "stdinOnce": true, + "tty": true, + "volumeMounts": [ + { + "name": "clickhouse-ca", + "mountPath": "/local/clickhouse-ca.crt", + "subPath": "clickhouse-ca.crt" + }, + { + "name": "clickhouse-config", + "mountPath": "/local/config.yml", + "subPath": "config.yml", + } + ] + } + ], + "volumes": [{ + "name": "clickhouse-ca", + "configMap": { + "name": "clickhouse-ca", + "defaultMode": 420, + // Distributed clusters don't need a CA for clickhouse + "optional": true, + "items": [ + { + "key": "ca.crt", + "path": "clickhouse-ca.crt" + } + ] + } + }, { + "name": "clickhouse-config", + "configMap": { + "name": "clickhouse-config", + "defaultMode": 420, + "optional": true + } + }] } - }))?; - - let mut cmd = tokio::process::Command::new("kubectl"); - cmd.args(&["apply", "-f", "-"]); - cmd.env("KUBECONFIG", ctx.gen_kubeconfig_path()); - cmd.stdin(std::process::Stdio::piped()); - cmd.stdout(std::process::Stdio::null()); - let mut child = cmd.spawn()?; - - { - let mut stdin = child.stdin.take().context("missing stdin")?; - stdin.write_all(&spec).await?; + }); + + // Apply clickhouse config to K8s + if let Some(config) = &conn.clickhouse_config { + let spec = serde_json::to_vec(&json!({ + "kind": "ConfigMap", + "apiVersion": "v1", + "metadata": { + "name": "clickhouse-config", + "namespace": "bolt" + }, + "data": { + "config.yml": config + } + }))?; + + let mut cmd = tokio::process::Command::new("kubectl"); + cmd.args(&["apply", "-f", "-"]); + cmd.env("KUBECONFIG", ctx.gen_kubeconfig_path()); + cmd.stdin(std::process::Stdio::piped()); + cmd.stdout(std::process::Stdio::null()); + let mut child = cmd.spawn()?; + + { + let mut stdin = child.stdin.take().context("missing stdin")?; + stdin.write_all(&spec).await?; + } + + let status = child.wait().await?; + ensure!(status.success(), "kubectl apply failed"); } - let status = child.wait().await?; - ensure!(status.success(), "kubectl apply failed"); + let pod_name = "clickhouse-sh-persistent"; + start_persistent_pod(ctx, "ClickHouse", pod_name, overrides).await?; + + // Connect to persistent pod + 
+		block_in_place(|| {
+			cmd!(
+				"kubectl",
+				"exec",
+				format!("pod/{pod_name}"),
+				"-it",
+				"-n",
+				"bolt",
+				"--",
+				"sh",
+				"-c",
+				query_cmd,
+			)
+			.env("KUBECONFIG", ctx.gen_kubeconfig_path())
+			.run()
+		})?;
 	}
 
-	block_in_place(|| {
+	Ok(())
+}
+
+pub async fn start_persistent_pod(
+	ctx: &ProjectContext,
+	title: &str,
+	pod_name: &str,
+	overrides: serde_json::Value,
+) -> Result<()> {
+	let res = block_in_place(|| {
 		cmd!(
 			"kubectl",
-			"run",
-			"-itq",
-			"--rm",
-			"--restart=Never",
-			"--image=clickhouse/clickhouse-server",
-			"--namespace",
+			"get",
+			"pod",
+			pod_name,
+			"-n",
 			"bolt",
-			format!("--overrides={overrides}"),
-			shell_name("clickhouse"),
+			"--ignore-not-found"
 		)
 		.env("KUBECONFIG", ctx.gen_kubeconfig_path())
-		.run()
+		.read()
 	})?;
+	let persistent_pod_exists = !res.is_empty();
+
+	if !persistent_pod_exists {
+		rivet_term::status::progress(&format!("Creating persistent {title} pod"), "");
+
+		block_in_place(|| {
+			cmd!(
+				"kubectl",
+				"run",
+				"-q",
+				"--restart=Never",
+				"--image=postgres",
+				"-n",
+				"bolt",
+				format!("--overrides={overrides}"),
+				pod_name,
+			)
+			.env("KUBECONFIG", ctx.gen_kubeconfig_path())
+			.run()
+		})?;
+
+		// Wait for ready
+		tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+	}
 
 	Ok(())
 }
-
-// Generates a pod name for the shell with a random hash at the end
-pub fn shell_name(name: &str) -> String {
-	let hash = rand::thread_rng().gen_range::<usize, _>(0..9999);
-
-	format!("{name}-sh-{hash}")
-}
diff --git a/lib/bolt/core/src/tasks/migrate.rs b/lib/bolt/core/src/tasks/migrate.rs
index 4f68140957..d404536e86 100644
--- a/lib/bolt/core/src/tasks/migrate.rs
+++ b/lib/bolt/core/src/tasks/migrate.rs
@@ -396,6 +396,7 @@ pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()>
 	if !crdb_pre_queries.is_empty() {
 		db::crdb_shell(db::ShellContext {
 			ctx,
+			forwarded: false,
 			conn: &conn,
 			queries: &crdb_pre_queries,
 			log_type: db::LogType::Migration,
@@ -409,6 +410,7 @@ pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()>
 		db::clickhouse_shell(
 			db::ShellContext {
 				ctx,
+				forwarded: false,
 				conn: &conn,
 				queries: &clickhouse_pre_queries,
 				log_type: db::LogType::Migration,
@@ -457,6 +459,7 @@ pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()>
 	if !crdb_post_queries.is_empty() {
 		db::crdb_shell(db::ShellContext {
 			ctx,
+			forwarded: false,
 			conn: &conn,
 			queries: &crdb_post_queries,
 			log_type: db::LogType::Migration,
@@ -470,6 +473,7 @@ pub async fn up(ctx: &ProjectContext, services: &[ServiceContext]) -> Result<()>
 		db::clickhouse_shell(
 			db::ShellContext {
 				ctx,
+				forwarded: false,
 				conn: &conn,
 				queries: &clickhouse_post_queries,
 				log_type: db::LogType::Migration,
@@ -605,7 +609,7 @@ async fn migration(ctx: &ProjectContext, migration_cmds: &[MigrateCmd]) -> Resul
 				{
 					"name": "migrate",
 					"image": MIGRATE_IMAGE,
-					"command": ["sh", "-c", migration_cmd],
+					"command": ["sleep", "1000"],
 					// // See https://github.com/golang-migrate/migrate/issues/494
 					// "env": [{
 					// 	"name": "TZ",
@@ -618,18 +622,20 @@ async fn migration(ctx: &ProjectContext, migration_cmds: &[MigrateCmd]) -> Resul
 		}
 	});
 
+	let pod_name = "migrate-sh-persistent";
+	db::start_persistent_pod(ctx, "migrate", pod_name, overrides).await?;
+
 	block_in_place(|| {
 		cmd!(
 			"kubectl",
-			"run",
-			"-itq",
-			"--rm",
-			"--restart=Never",
-			format!("--image={MIGRATE_IMAGE}"),
-			"--namespace",
+			"exec",
+			format!("pod/{pod_name}"),
+			"-n",
 			"bolt",
-			format!("--overrides={overrides}"),
-			db::shell_name("migrate"),
+			"--",
+			"sh",
+			"-c",
+			migration_cmd,
 		)
.env("KUBECONFIG", ctx.gen_kubeconfig_path()) .run() diff --git a/lib/bolt/core/src/utils/db_conn.rs b/lib/bolt/core/src/utils/db_conn.rs index 3c6b0df6ff..7deaa83ab3 100644 --- a/lib/bolt/core/src/utils/db_conn.rs +++ b/lib/bolt/core/src/utils/db_conn.rs @@ -21,11 +21,11 @@ impl DatabaseConnections { pub async fn create( ctx: &ProjectContext, services: &[ServiceContext], - forward: bool, + forwarded: bool, ) -> Result> { match &ctx.ns().cluster.kind { config::ns::ClusterKind::SingleNode { .. } => { - if forward { + if forwarded { DatabaseConnections::create_local_forwarded(ctx, services).await } else { DatabaseConnections::create_local(ctx, services).await diff --git a/lib/bolt/core/src/utils/mod.rs b/lib/bolt/core/src/utils/mod.rs index 8198bfcf67..04f87538c9 100644 --- a/lib/bolt/core/src/utils/mod.rs +++ b/lib/bolt/core/src/utils/mod.rs @@ -258,8 +258,8 @@ impl Drop for DroppablePort { pub fn kubectl_port_forward( ctx: &ProjectContext, - service_name: &str, namespace: &str, + service_name: &str, (local_port, remote_port): (u16, u16), ) -> Result { // println!( @@ -269,9 +269,9 @@ pub fn kubectl_port_forward( let handle = cmd!( "kubectl", "port-forward", - format!("service/{service_name}"), - "--namespace", + "-n", namespace, + format!("service/{service_name}"), format!("{local_port}:{remote_port}") ) .env("KUBECONFIG", ctx.gen_kubeconfig_path())