Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

safekeeper: add separate tombstones map for deleted timelines #8253

Merged
merged 4 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions safekeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,19 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("WAL service main".to_owned(), res));
tasks_handles.push(Box::pin(wal_service_handle));

let timeline_housekeeping_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
.spawn(async move {
const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
loop {
tokio::time::sleep(TOMBSTONE_TTL).await;
GlobalTimelines::housekeeping(&TOMBSTONE_TTL);
}
})
.map(|res| ("Timeline map housekeeping".to_owned(), res));
tasks_handles.push(Box::pin(timeline_housekeeping_handle));

if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
let conf_ = conf.clone();
let wal_service_handle = current_thread_rt
Expand Down
89 changes: 74 additions & 15 deletions safekeeper/src/timelines_global_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;

struct GlobalTimelinesState {
timelines: HashMap<TenantTimelineId, Arc<Timeline>>,

// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,

conf: Option<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
Expand Down Expand Up @@ -64,11 +71,17 @@ impl GlobalTimelinesState {
.cloned()
.ok_or(TimelineError::NotFound(*ttid))
}

fn delete(&mut self, ttid: TenantTimelineId) {
self.timelines.remove(&ttid);
self.tombstones.insert(ttid, Instant::now());
}
}

static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
conf: None,
broker_active_set: Arc::new(TimelinesSet::default()),
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
Expand Down Expand Up @@ -198,11 +211,17 @@ impl GlobalTimelines {
let tli = Arc::new(timeline);

// TODO: prevent concurrent timeline creation/loading
TIMELINES_STATE
.lock()
.unwrap()
.timelines
.insert(ttid, tli.clone());
{
let mut state = TIMELINES_STATE.lock().unwrap();

// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
if state.tombstones.remove(&ttid).is_some() {
warn!("Un-deleted timeline {ttid}");
}

state.timelines.insert(ttid, tli.clone());
}

tli.bootstrap(&conf, broker_active_set, partial_backup_rate_limiter);

Expand All @@ -229,7 +248,7 @@ impl GlobalTimelines {

/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub async fn create(
pub(crate) async fn create(
ttid: TenantTimelineId,
server_info: ServerInfo,
commit_lsn: Lsn,
Expand All @@ -241,6 +260,11 @@ impl GlobalTimelines {
// Timeline already exists, return it.
return Ok(timeline);
}

if state.tombstones.contains_key(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
}

state.get_dependencies()
};

Expand Down Expand Up @@ -300,17 +324,19 @@ impl GlobalTimelines {
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
/// i.e. loaded in memory and not cancelled.
pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let res = TIMELINES_STATE.lock().unwrap().get(&ttid);

match res {
pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
let tli_res = {
let state = TIMELINES_STATE.lock().unwrap();
state.get(&ttid)
};
match tli_res {
Ok(tli) => {
if tli.is_cancelled() {
return Err(TimelineError::Cancelled(ttid));
}
Ok(tli)
}
_ => res,
_ => tli_res,
}
}

Expand Down Expand Up @@ -339,12 +365,26 @@ impl GlobalTimelines {

/// Cancels timeline, then deletes the corresponding data directory.
/// If only_local, doesn't remove WAL segments in remote storage.
pub async fn delete(
pub(crate) async fn delete(
ttid: &TenantTimelineId,
only_local: bool,
) -> Result<TimelineDeleteForceResult> {
let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
match tli_res {
let tli_res = {
let state = TIMELINES_STATE.lock().unwrap();

if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
return Ok(TimelineDeleteForceResult {
dir_existed: false,
was_active: false,
});
}

state.get(ttid)
};

let result = match tli_res {
Ok(timeline) => {
let was_active = timeline.broker_active.load(Ordering::Relaxed);

Expand Down Expand Up @@ -374,7 +414,14 @@ impl GlobalTimelines {
was_active: false,
})
}
}
};

// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
// are used to prevent still-running computes from re-creating the same timeline when they send data,
// and to speed up repeated deletion calls by avoiding re-listing objects.
TIMELINES_STATE.lock().unwrap().delete(*ttid);

result
}

/// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
Expand Down Expand Up @@ -433,6 +480,18 @@ impl GlobalTimelines {

Ok(deleted)
}

pub fn housekeeping(tombstone_ttl: &Duration) {
let mut state = TIMELINES_STATE.lock().unwrap();

// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
// may recreate a deleted timeline.
let now = Instant::now();
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
}

#[derive(Clone, Copy, Serialize)]
Expand Down
Loading