Skip to content

Commit

Permalink
chore: run dev db migrations in parallel for faster startups
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Nov 12, 2024
1 parent bd64305 commit b4450c5
Showing 1 changed file with 80 additions and 50 deletions.
130 changes: 80 additions & 50 deletions packages/common/migrate/src/migrate.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::*;
use futures_util::stream::{self, StreamExt};
use indoc::formatdoc;
use rivet_config::config::CockroachDbUserRole;
use rivet_pools::prelude::*;
Expand Down Expand Up @@ -260,58 +261,87 @@ pub async fn drop(config: rivet_config::Config, service: &SqlService) -> Result<
}

async fn run_migrations(config: rivet_config::Config, migration_cmds: &[MigrateCmd]) -> Result<()> {
for cmd in migration_cmds {
tracing::debug!(db_name=%cmd.service.db_name, "running db migration");

// Write migrations to temp path
let dir = tempfile::tempdir()?;
block_in_place(|| cmd.service.migrations.extract(dir.path()))?;

// Run migration
let migrate_url = migrate_db_url(config.clone(), &cmd.service).await?;
let mut child = tokio::process::Command::new("migrate")
.arg("-database")
.arg(migrate_url)
.arg("-path")
.arg(dir.path().join("migrations"))
.args(&cmd.args)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("failed to run migrate command")?;

// Log output in real-time
let stdout = child.stdout.take().expect("Failed to capture stdout");
let stderr = child.stderr.take().expect("Failed to capture stderr");

tokio::spawn(async move {
let mut stdout_reader = BufReader::new(stdout).lines();
while let Some(line) = stdout_reader
.next_line()
.await
.expect("Failed to read stdout")
{
tracing::debug!("migrate stdout: {}", line);
}
});

tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr).lines();
while let Some(line) = stderr_reader
.next_line()
.await
.expect("Failed to read stderr")
{
tracing::debug!("migrate stderr: {}", line);
}
});
let is_dev = config
.server()
.map_err(|err| anyhow!("{err}"))?
.rivet
.auth
.access_kind
== rivet_config::config::AccessKind::Development;
let migration_parallelism = if is_dev {
// Speed up migrations when setting up dev clusters since these are usually noops
16
} else {
// Run 1 migration at a time for production environments since these object have costly
// jobs including building indexes
1
};

let migration_futs = migration_cmds.iter().map(|cmd| {
let config = config.clone();
run_migration(config, cmd)
});

stream::iter(migration_futs)
.buffer_unordered(migration_parallelism)
.collect::<Vec<_>>()
.await
// Convert to error
.into_iter()
.collect::<Result<_, _>>()?;

Ok(())
}

let status = child.wait().await?;
if !status.success() {
tracing::error!("migrate failed: {}", cmd.service.db_name);
std::future::pending::<()>().await;
unreachable!();
async fn run_migration(config: rivet_config::Config, cmd: &MigrateCmd) -> Result<()> {
tracing::debug!(db_name=%cmd.service.db_name, "running db migration");

// Write migrations to temp path
let dir = tempfile::tempdir()?;
block_in_place(|| cmd.service.migrations.extract(dir.path()))?;

// Run migration
let migrate_url = migrate_db_url(config, &cmd.service).await?;
let mut child = tokio::process::Command::new("migrate")
.arg("-database")
.arg(migrate_url)
.arg("-path")
.arg(dir.path().join("migrations"))
.args(&cmd.args)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("failed to run migrate command")?;

// Log output in real-time
let stdout = child.stdout.take().context("Failed to capture stdout")?;
let stderr = child.stderr.take().context("Failed to capture stderr")?;

tokio::spawn(async move {
let mut stdout_reader = BufReader::new(stdout).lines();
while let Some(line) = stdout_reader
.next_line()
.await
.expect("Failed to read stdout")
{
tracing::debug!("migrate stdout: {}", line);
}
});

tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr).lines();
while let Some(line) = stderr_reader
.next_line()
.await
.expect("Failed to read stderr")
{
tracing::debug!("migrate stderr: {}", line);
}
});

let status = child.wait().await?;
if !status.success() {
bail!("migrate failed: {}", cmd.service.db_name);
}

Ok(())
Expand Down

0 comments on commit b4450c5

Please sign in to comment.