store: Be considerate of replicas when copying #3966

Merged (1 commit, Sep 21, 2022)
store/postgres/src/catalog.rs (25 additions, 0 deletions)

@@ -11,6 +11,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::iter::FromIterator;
use std::sync::Arc;
use std::time::Duration;

use graph::prelude::anyhow::anyhow;
use graph::{
@@ -616,3 +617,27 @@ pub fn stats(conn: &PgConnection, namespace: &Namespace) -> Result<Vec<VersionSt

Ok(stats.into_iter().map(|s| s.into()).collect())
}

/// Return by how much the slowest replica connected to the database `conn`
/// is lagging. The returned value has millisecond precision. If the
/// database has no replicas, return `0`
pub(crate) fn replication_lag(conn: &PgConnection) -> Result<Duration, StoreError> {
#[derive(Queryable, QueryableByName)]
struct Lag {
#[sql_type = "Nullable<Integer>"]
ms: Option<i32>,
}

let lag = sql_query(
"select extract(milliseconds from max(greatest(write_lag, flush_lag, replay_lag)))::int as ms \
from pg_stat_replication",
)
.get_result::<Lag>(conn)?;

let lag = lag
.ms
.map(|ms| if ms <= 0 { 0 } else { ms as u64 })
.unwrap_or(0);

Ok(Duration::from_millis(lag))
}
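
As an aside on the helper above: pg_stat_replication has no rows when the database has no replicas, so max(...) comes back NULL and the Option is None; the closure also clamps non-positive readings to zero. A minimal, self-contained sketch of just that clamping behavior (the function name here is hypothetical and not part of the PR):

// Hypothetical illustration of the clamping in replication_lag:
// a NULL result (no replicas) and non-positive readings both map to 0 ms.
fn clamp_lag_ms(ms: Option<i32>) -> u64 {
    ms.map(|ms| if ms <= 0 { 0 } else { ms as u64 }).unwrap_or(0)
}

fn main() {
    assert_eq!(clamp_lag_ms(None), 0);      // no replicas attached
    assert_eq!(clamp_lag_ms(Some(-5)), 0);  // non-positive reading
    assert_eq!(clamp_lag_ms(Some(250)), 250);
}
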
store/postgres/src/copy.rs (29 additions, 1 deletion)

@@ -38,7 +38,7 @@ use graph::{
};

use crate::{
advisory_lock,
advisory_lock, catalog,
dynds::DataSourcesTable,
primary::{DeploymentId, Site},
};
@@ -54,6 +54,16 @@ const INITIAL_BATCH_SIZE_LIST: i64 = 100;
const TARGET_DURATION: Duration = Duration::from_secs(5 * 60);
const LOG_INTERVAL: Duration = Duration::from_secs(3 * 60);

/// If replicas are lagging by more than this, the copying code will pause
/// for a while to allow replicas to catch up
const MAX_REPLICATION_LAG: Duration = Duration::from_secs(60);
/// If replicas need to catch up, do not resume copying until the lag is
/// less than this
const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
/// When replicas are lagging too much, sleep for this long before checking
/// the lag again
const REPLICATION_SLEEP: Duration = Duration::from_secs(10);

table! {
subgraphs.copy_state(dst) {
// deployment_schemas.id
@@ -744,6 +754,24 @@ impl Connection {
if table.is_cancelled(&self.conn)? {
return Ok(Status::Cancelled);
}

// Pause copying if replication is lagging behind to avoid
// overloading replicas
let mut lag = catalog::replication_lag(&self.conn)?;
if lag > MAX_REPLICATION_LAG {
loop {
info!(&self.logger,
"Replicas are lagging too much; pausing copying for {}s to allow them to catch up",
REPLICATION_SLEEP.as_secs();
"lag_s" => lag.as_secs());
std::thread::sleep(REPLICATION_SLEEP);
lag = catalog::replication_lag(&self.conn)?;
if lag <= ACCEPTABLE_REPLICATION_LAG {
break;
}
}
}

let status = self.transaction(|conn| table.copy_batch(conn))?;
if status == Status::Cancelled {
return Ok(status);
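
For readers skimming the diff, the pause/resume logic above uses two thresholds rather than one: copying pauses only once lag exceeds MAX_REPLICATION_LAG, and resumes only after it has fallen back below the lower ACCEPTABLE_REPLICATION_LAG, sleeping REPLICATION_SLEEP between checks. A self-contained sketch of that pattern follows; the lag source is stubbed out, logging and error handling are omitted, and none of this code is part of the PR.

use std::thread;
use std::time::Duration;

const MAX_REPLICATION_LAG: Duration = Duration::from_secs(60);
const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
const REPLICATION_SLEEP: Duration = Duration::from_secs(10);

/// Block until replication lag drops back to an acceptable level.
/// `current_lag` stands in for `catalog::replication_lag(&conn)`.
fn wait_for_replicas(mut current_lag: impl FnMut() -> Duration) {
    let mut lag = current_lag();
    // Pause only once lag exceeds the high-water mark ...
    if lag > MAX_REPLICATION_LAG {
        loop {
            thread::sleep(REPLICATION_SLEEP);
            lag = current_lag();
            // ... and resume only once it falls below the lower mark,
            // so copying does not flap around a single threshold.
            if lag <= ACCEPTABLE_REPLICATION_LAG {
                break;
            }
        }
    }
}

fn main() {
    // Illustrative lag readings that "catch up" over successive checks.
    let mut samples = [70u64, 45, 25].into_iter();
    wait_for_replicas(|| Duration::from_secs(samples.next().unwrap_or(0)));
    println!("replicas caught up; copying can resume");
}

Using two thresholds gives the loop hysteresis: once the copier decides to pause, a momentary dip just under the 60-second mark is not enough to resume copying.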