refactor: remove eviction batching (#6060)
We no longer have `layer_removal_cs` since #5108, so we no longer need batching.
koivunej authored Dec 13, 2023
1 parent 2d22661 commit a919b86
Showing 4 changed files with 142 additions and 288 deletions.
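
As context for the diff below: the new eviction path streams candidates one by one into a `tokio::task::JoinSet` with a soft cap on in-flight tasks, instead of grouping layers into per-timeline batches behind a semaphore. A minimal, self-contained sketch of that pattern follows; the function name, the `u64` payloads, and the always-successful "eviction" are illustrative stand-ins, not the pageserver's actual types (assumes the `tokio` and `futures` crates):

```rust
use futures::FutureExt;
use tokio::task::JoinSet;

/// Bounded-concurrency sketch: at most LIMIT tasks in flight, finished
/// tasks drained opportunistically. Payloads stand in for file sizes.
async fn evict_all(candidates: Vec<u64>) -> (u64, u64) {
    const LIMIT: usize = 1000;
    let mut js: JoinSet<Result<u64, u64>> = JoinSet::new();
    let mut pending = candidates.into_iter().fuse();
    let mut consumed_all = false;
    let (mut evicted_bytes, mut failed_bytes) = (0u64, 0u64);

    loop {
        // Block on a completion only when at the cap or out of new work;
        // otherwise poll once, so each spawn drains at most one result.
        let next = if js.len() >= LIMIT || consumed_all {
            js.join_next().await
        } else if !js.is_empty() {
            js.join_next().now_or_never().flatten()
        } else {
            None
        };

        if let Some(joined) = next {
            match joined {
                Ok(Ok(bytes)) => evicted_bytes += bytes,
                Ok(Err(bytes)) => failed_bytes += bytes,
                Err(join_error) => eprintln!("eviction task failed: {join_error:?}"),
            }
        }

        if consumed_all && js.is_empty() {
            break; // every spawned task has been joined
        }

        // The iterator is fused, so calling next() after exhaustion is safe.
        let Some(file_size) = pending.next() else {
            consumed_all = true;
            continue;
        };

        // Stand-in for Layer::evict_and_wait(); pretend eviction succeeds.
        js.spawn(async move { Ok::<u64, u64>(file_size) });

        // Spawning thousands of tasks back to back starves the executor;
        // yield so already-spawned tasks can make progress.
        tokio::task::yield_now().await;
    }

    (evicted_bytes, failed_bytes)
}
```

The `now_or_never` poll is the load-bearing detail: each new spawn drains at most one finished task, so the set hovers near the cap without ever blocking the producer side of the loop.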
178 changes: 76 additions & 102 deletions pageserver/src/disk_usage_eviction_task.rs
@@ -42,7 +42,6 @@
// reading these fields. We use the Debug impl for semi-structured logging, though.

use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};
@@ -125,7 +124,7 @@ pub fn launch_disk_usage_global_eviction_task(
async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
_storage: &GenericRemoteStorage,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
cancel: CancellationToken,
) {
@@ -149,8 +148,14 @@ async fn disk_usage_eviction_task(
let start = Instant::now();

async {
let res =
disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await;
let res = disk_usage_eviction_task_iteration(
state,
task_config,
storage,
tenants_dir,
&cancel,
)
.await;

match res {
Ok(()) => {}
@@ -181,12 +186,13 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await;
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -268,8 +274,9 @@ struct LayerCount {
count: usize,
}

pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
_storage: &GenericRemoteStorage,
usage_pre: U,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
@@ -321,16 +328,16 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// Walk through the list of candidates, until we have accumulated enough layers to get
// us back under the pressure threshold. 'usage_planned' is updated so that it tracks
// how much disk space would be used after evicting all the layers up to the current
// point in the list. The layers are collected in 'batched', grouped per timeline.
// point in the list.
//
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<_>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
let mut max_batch_size = 0;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
let mut evicted_amount = 0;

for (i, (partition, candidate)) in candidates.iter().enumerate() {
if !usage_planned.has_pressure() {
debug!(
no_candidates_evicted = i,
@@ -339,25 +346,13 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
break;
}

if partition == MinResidentSizePartition::Below && warned.is_none() {
if partition == &MinResidentSizePartition::Below && warned.is_none() {
warn!(?usage_pre, ?usage_planned, candidate_no=i, "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy");
warned = Some(usage_planned);
}

usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);

// FIXME: batching makes no sense anymore because of no layermap locking, should just spawn
// tasks to evict all seen layers until we have evicted enough

let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();

// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
}
evicted_amount += 1;
}

let usage_planned = match warned {
@@ -372,100 +367,79 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
};
debug!(?usage_planned, "usage planned");

// phase2: evict victims batched by timeline
// phase2: evict layers

let mut js = tokio::task::JoinSet::new();
let limit = 1000;

let mut evicted = candidates.into_iter().take(evicted_amount).fuse();
let mut consumed_all = false;

// After the evictions, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();

let evict_layers = async move {
loop {
let next = if js.len() >= limit || consumed_all {
js.join_next().await
} else if !js.is_empty() {
                // opportunistically consume a ready result, one for each newly spawned eviction
futures::future::FutureExt::now_or_never(js.join_next()).and_then(|x| x)
} else {
None
};

// ratelimit to 1k files or any higher max batch size
let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size)));

for (timeline, batch) in batched {
let tenant_shard_id = timeline.tenant_shard_id;
let timeline_id = timeline.timeline_id;
let batch_size =
u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning");

    // the name `available_permits` is misleading: it means the current total amount of
    // permits, because permits can be added
assert!(batch_size as usize <= limit.available_permits());

debug!(%timeline_id, "evicting batch for timeline");

let evict = {
let limit = limit.clone();
let cancel = cancel.clone();
async move {
let mut evicted_bytes = 0;
let mut evictions_failed = LayerCount::default();

let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
// semaphore closing means cancelled
return (evicted_bytes, evictions_failed);
};

let results = timeline.evict_layers(&batch).await;

match results {
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
evicted_bytes += file_size;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
}
}
}
if let Some(next) = next {
match next {
Ok(Ok(file_size)) => {
usage_assumed.add_available_bytes(file_size);
}
Err(e) => {
warn!("failed to evict batch: {:#}", e);
Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
(evicted_bytes, evictions_failed)
}
}
.instrument(tracing::info_span!("evict_batch", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, batch_size));

js.spawn(evict);

        // spawning multiple thousands of these is essentially blocking, so give the
        // already-spawned tasks a chance to make progress
tokio::task::yield_now().await;
}
if consumed_all && js.is_empty() {
break;
}

let join_all = async move {
// After the evictions, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
            // calling next() again after consumed_all is set is fine, as `evicted` is fused.
let Some((_partition, candidate)) = evicted.next() else {
consumed_all = true;
continue;
};

while let Some(res) = js.join_next().await {
match res {
Ok((evicted_bytes, failed)) => {
usage_assumed.add_available_bytes(evicted_bytes);
evictions_failed.file_sizes += failed.file_sizes;
evictions_failed.count += failed.count;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
js.spawn(async move {
let rtc = candidate.timeline.remote_client.as_ref().expect(
"holding the witness, all timelines must have a remote timeline client",
);
let file_size = candidate.layer.layer_desc().file_size;
candidate
.layer
.evict_and_wait(rtc)
.await
.map(|()| file_size)
.map_err(|e| (file_size, e))
});

tokio::task::yield_now().await;
}

(usage_assumed, evictions_failed)
};

let (usage_assumed, evictions_failed) = tokio::select! {
tuple = join_all => { tuple },
tuple = evict_layers => { tuple },
_ = cancel.cancelled() => {
// close the semaphore to stop any pending acquires
limit.close();
// dropping joinset will abort all pending evict_and_waits and that is fine, our
// requests will still stand
return Ok(IterationOutcome::Cancelled);
}
};
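For contrast, the batching path deleted above gated each per-timeline batch on a shared semaphore and used semaphore closure as its cancellation signal. A rough sketch of that (now removed) idiom, with stand-in types and payloads:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Removed idiom: take as many permits as the batch has layers; a closed
/// semaphore means the whole iteration was cancelled.
async fn evict_batch(limit: Arc<Semaphore>, batch: Vec<u64>) -> u64 {
    let batch_size = u32::try_from(batch.len()).expect("batch capped below u32::MAX");
    // acquire_many_owned returns Err(AcquireError) once the semaphore is
    // closed, which callers treat as cancellation.
    let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
        return 0;
    };
    batch.into_iter().sum() // stand-in for evicting the batch and summing bytes
}
```

Closing the semaphore from the cancellation branch (as the deleted `limit.close()` call did) wakes every pending `acquire_many_owned` with an error, so batches that never started report zero bytes evicted.
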
42 changes: 10 additions & 32 deletions pageserver/src/http/routes.rs
@@ -1588,7 +1588,7 @@ async fn always_panic_handler(

async fn disk_usage_eviction_run(
mut r: Request<Body>,
_cancel: CancellationToken,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;

@@ -1625,48 +1625,26 @@ async fn disk_usage_eviction_run(
freed_bytes: 0,
};

let (tx, rx) = tokio::sync::oneshot::channel();

let state = get_state(&r);

if state.remote_storage.as_ref().is_none() {
let Some(storage) = state.remote_storage.as_ref() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run eviction iteration"
)));
}
};

let state = state.disk_usage_eviction_state.clone();

let cancel = CancellationToken::new();
let child_cancel = cancel.clone();
let _g = cancel.drop_guard();

crate::task_mgr::spawn(
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,
"ondemand disk usage eviction",
false,
async move {
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
usage,
&child_cancel,
)
.await;

info!(?res, "disk_usage_eviction_task_iteration_impl finished");
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state, storage, usage, &cancel,
)
.await;

let _ = tx.send(res);
Ok(())
}
.in_current_span(),
);
info!(?res, "disk_usage_eviction_task_iteration_impl finished");

let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?;
let res = res.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, response)
json_response(StatusCode::OK, res)
}

async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
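The handler change above removes the spawn-plus-oneshot indirection: the iteration is awaited inline, so the request's own `cancel` token covers teardown and the channel disappears. For reference, a sketch of the drop-guard idiom the removed code relied on; the names and the sleep-based stand-in for the iteration are illustrative (assumes `tokio`, `tokio-util`, and `anyhow`):

```rust
use tokio_util::sync::CancellationToken;

/// Removed idiom: spawn the work, cancel it automatically if this future
/// is dropped, and shuttle the result back over a oneshot channel.
async fn spawn_with_drop_guard() -> anyhow::Result<u64> {
    let cancel = CancellationToken::new();
    let child_cancel = cancel.clone();
    // While `_g` lives, the token stays uncancelled; dropping this
    // function's future drops `_g`, which cancels `child_cancel` below.
    let _g = cancel.drop_guard();

    let (tx, rx) = tokio::sync::oneshot::channel();
    tokio::spawn(async move {
        let res: anyhow::Result<u64> = tokio::select! {
            _ = child_cancel.cancelled() => Err(anyhow::anyhow!("cancelled")),
            // Stand-in for the actual eviction iteration.
            _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => Ok(42),
        };
        let _ = tx.send(res);
    });

    // The `?` surfaces a dropped sender; the task's own Result is returned.
    rx.await?
}
```

Awaiting inline removes both the extra token and the channel: if the HTTP request is dropped, the handler future (and the eviction iteration inside it) is dropped with it.
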

1 comment on commit a919b86

@github-actions


2252 tests run: 2162 passed, 0 failed, 90 skipped (full report)


Flaky tests (2)

Postgres 14

Code coverage (full report)

  • functions: 55.2% (9433 of 17091 functions)
  • lines: 82.3% (54555 of 66303 lines)

The comment gets automatically updated with the latest test results
a919b86 at 2023-12-13T16:55:08.794Z :recycle:
