diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 609e1431cff9..870475eb57ca 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -569,6 +569,39 @@ impl<'a> TenantDownloader<'a> { heatmap.timelines.len() ); + // Get or initialize the local disk state for the timelines we will update + let mut timeline_states = HashMap::new(); + for timeline in &heatmap.timelines { + let timeline_state = self + .secondary_state + .detail + .lock() + .unwrap() + .timelines + .get(&timeline.timeline_id) + .cloned(); + + let timeline_state = match timeline_state { + Some(t) => t, + None => { + // We have no existing state: need to scan local disk for layers first. + let timeline_state = + init_timeline_state(self.conf, tenant_shard_id, timeline).await; + + // Re-acquire detail lock now that we're done with async load from local FS + self.secondary_state + .detail + .lock() + .unwrap() + .timelines + .insert(timeline.timeline_id, timeline_state.clone()); + timeline_state + } + }; + + timeline_states.insert(timeline.timeline_id, timeline_state); + } + // Clean up any local layers that aren't in the heatmap. We do this first for all timelines, on the general // principle that deletions should be done before writes wherever possible, and so that we can use this // phase to initialize our SecondaryProgress. @@ -579,6 +612,10 @@ impl<'a> TenantDownloader<'a> { // Download the layers in the heatmap for timeline in heatmap.timelines { + let timeline_state = timeline_states + .remove(&timeline.timeline_id) + .expect("Just populated above"); + if self.secondary_state.cancel.is_cancelled() { tracing::debug!( "Cancelled before downloading timeline {}", @@ -588,7 +625,7 @@ impl<'a> TenantDownloader<'a> { } let timeline_id = timeline.timeline_id; - self.download_timeline(timeline, ctx) + self.download_timeline(timeline, timeline_state, ctx) .instrument(tracing::info_span!( "secondary_download_timeline", tenant_id=%tenant_shard_id.tenant_id, @@ -609,6 +646,22 @@ impl<'a> TenantDownloader<'a> { .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL), }); + // Robustness: we should have updated progress properly, but in case we didn't, make sure + // we don't leave the tenant in a state where we claim to have successfully downloaded + // everything, but our progress is incomplete. The invariant here should be that if + // we have set `last_download` to this heatmap's etag, then the next time we see that + // etag we can safely do no work (i.e. we must be complete). + let mut progress = self.secondary_state.progress.lock().unwrap(); + debug_assert!(progress.layers_downloaded == progress.layers_total); + debug_assert!(progress.bytes_downloaded == progress.bytes_total); + if progress.layers_downloaded != progress.layers_total + || progress.bytes_downloaded != progress.bytes_total + { + tracing::warn!("Correcting drift in progress stats ({progress:?})"); + progress.layers_downloaded = progress.layers_total; + progress.bytes_downloaded = progress.bytes_total; + } + Ok(()) } @@ -784,6 +837,7 @@ impl<'a> TenantDownloader<'a> { async fn download_timeline( &self, timeline: HeatMapTimeline, + timeline_state: SecondaryDetailTimeline, ctx: &RequestContext, ) -> Result<(), UpdateError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -792,34 +846,6 @@ impl<'a> TenantDownloader<'a> { // Accumulate updates to the state let mut touched = Vec::new(); - // Clone a view of what layers already exist on disk - let timeline_state = self - .secondary_state - .detail - .lock() - .unwrap() - .timelines - .get(&timeline.timeline_id) - .cloned(); - - let timeline_state = match timeline_state { - Some(t) => t, - None => { - // We have no existing state: need to scan local disk for layers first. - let timeline_state = - init_timeline_state(self.conf, tenant_shard_id, &timeline).await; - - // Re-acquire detail lock now that we're done with async load from local FS - self.secondary_state - .detail - .lock() - .unwrap() - .timelines - .insert(timeline.timeline_id, timeline_state.clone()); - timeline_state - } - }; - tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); let mut download_futs = Vec::new(); @@ -1009,6 +1035,14 @@ impl<'a> TenantDownloader<'a> { "Skipped downloading missing layer {}, raced with compaction/gc?", layer.name ); + + // If the layer is 404, adjust the progress statistics to reflect that we will not download it. + let mut progress = self.secondary_state.progress.lock().unwrap(); + progress.layers_total = progress.layers_total.saturating_sub(1); + progress.bytes_total = progress + .bytes_total + .saturating_sub(layer.metadata.file_size); + return Ok(None); } Err(e) => return Err(e.into()),