review fixes, few refactorings
review comment: xref tested string
doc: address review comment by jcsp
drop TODO about better load time api
LayerE::drop comments
refactor: fix residency and metrics to layermanager
refactor: LayerManager, remove arc
blanket rename
reorder, get rid of TODO
move Layer and all to storage_layer::layer
refactor: Layer initialization
test: migrate to Layer::for_resident
test: use guard_against_eviction from outside
fix: rename the written out file in Layer ctor

try to apply backoff *after* download

This might not work since we could get cancelled, but applying the backoff right before
the download seems wrong as well. We already retry the download.

doc: few touches
refactor: split get_or_maybe_download
refactor: split evicting
refactor: minor cleanup, doc
test: fix test_timeline_deletion_with_files_stuck_in_upload_queue string change
fix: delete and only then report evicted
test: fix allowed error typo
doc: minor fixes
restore Layer::dump
layer: remove dead comment and code
fix: allow dropping from UploadQueue by spawn_blocking
rename: garbage_collect => &_on_drop

refactor: split guard_against_eviction into three

- download
- keep_resident
- download_and_keep_resident

No need for a bool enum.

botched rebase: lost impl AsRef<DeltaLayerInner>
doc: fix typo
doc: remove obsolete comment
doc: link to inmemorylayer
doc: cleanup, add missing "the"
doc: link to LayerMap::search
doc: delete fixme about gentlemans agreements and strings
doc: inmemlayer: cleanup comments
eviction_task: remove confusing drop(candidates)
doc: add cancellation safe comment

refactor: simplify schedule upload and tests (rebase conflicts)

A lot of the non-conflicting changes were already resolved during the rebase here.

refactor: Result<(), NeedsDownload>
doc: typo
doc: explain what the consecutive failures are for
doc: another pass on LayerInner
fix: subscribe before evicting
reorder: get and get_or_apply_evictedness
doc: check_expected_download

info: stop using stat

We no longer need it because later versions initialize to the correct
on-filesystem state with Layer::for_{resident,evicted}.

doc: residentlayer vs. downloadedlayer and eviction
doc: cancellation safety with evict_and_wait
doc: note running without remote storage again
reorder: 1. DownloadedLayer, 2. ResidentLayer
doc: explain DownloadedLayer::get owner param
doc: add validation
doc: drop comment in favor of drop_eviction_guard
image/deltalayer: shuffle comments around
doc: simplify comment
doc: create guard => new download has been started
doc: when => while
doc: remove comment about backoff
doc: adjust while in queue
doc: adjust more LayerInner::on_drop
doc: assert &Arc<LayerInner> and DownloadedLayer::owner
fixup residentlayer comment
test: allow witnessing stopping before broken
doc: fix link
refactor: rename LayerInner::on_drop to on_downloaded_layer_drop
doc: fix outdated commit
doc: fix broken link
doc: comment about chance of both evictions selecting same layer

refactor: move all metrics updates to layer

This fixes some missing increments for num_persistent_files_created and
persistent_bytes_written, and removes double entries for residence events.

refactor: rename for_written_tempfile -> finish_creating
koivunej committed Sep 4, 2023
1 parent 944c009 commit ba7077a
Showing 17 changed files with 1,473 additions and 1,452 deletions.
2 changes: 1 addition & 1 deletion libs/utils/src/sync/heavier_once_cell.rs
@@ -220,7 +220,7 @@ mod tests {

#[tokio::test(start_paused = true)]
async fn reinit_waits_for_deinit() {
-// witht he tokio::time paused, we will "sleep" for 1s while holding the reinitialization
+// with the tokio::time paused, we will "sleep" for 1s while holding the reinitialization
let sleep_for = Duration::from_secs(1);
let initial = 42;
let reinit = 1;
5 changes: 2 additions & 3 deletions pageserver/src/disk_usage_eviction_task.rs
@@ -62,8 +62,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
-storage_layer::{AsLayerDesc, LayerE},
-timeline::EvictionError,
+storage_layer::{AsLayerDesc, EvictionError, Layer},
Timeline,
},
};
@@ -481,7 +480,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
-layer: Arc<LayerE>,
+layer: Layer,
last_activity_ts: SystemTime,
}

14 changes: 7 additions & 7 deletions pageserver/src/tenant.rs
@@ -3979,8 +3979,7 @@ mod tests {
Ok(())
}

-/// FIXME: I don't want to add dump to LayerE, it should be in the ctl
-/*#[tokio::test]
+#[tokio::test]
async fn delta_layer_dumping() -> anyhow::Result<()> {
use storage_layer::AsLayerDesc;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
@@ -3990,22 +3989,23 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;

let layer_map = tline.layers.read().await;
-let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
+let level0_deltas = layer_map
+    .layer_map()
+    .get_level0_deltas()?
+    .into_iter()
+    .map(|desc| layer_map.get_from_desc(&desc))
+    .collect::<Vec<_>>();

assert!(!level0_deltas.is_empty());

for delta in level0_deltas {
-let delta = layer_map.get_from_desc(&delta);
// Ensure we are dumping a delta layer here
assert!(delta.layer_desc().is_delta);
-delta.dump(false, &ctx).await.unwrap();
+delta.dump(true, &ctx).await.unwrap();
}

Ok(())
}
-*/

#[tokio::test]
async fn corrupt_metadata() -> anyhow::Result<()> {
4 changes: 2 additions & 2 deletions pageserver/src/tenant/layer_map.rs
@@ -639,8 +639,8 @@ impl LayerMap {
}

println!("historic_layers:");
-for layer in self.iter_historic_layers() {
-    layer.dump(verbose, ctx)?;
+for desc in self.iter_historic_layers() {
+    desc.dump();
}
println!("End dump LayerMap");
Ok(())
134 changes: 37 additions & 97 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -231,7 +231,7 @@ use crate::metrics::{
};
use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
-use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
+pub(crate) use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::Delete;
use crate::tenant::TIMELINES_SEGMENT_NAME;
@@ -610,21 +610,19 @@ impl RemoteTimelineClient {
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer: ResidentLayer,
-layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;

-// FIXME: we might be still including no longer existing files in the index_part because
-// that consistency is built on strings and gentleman agreements, not Weak<LayerE> which
-// could be upgraded at the time of rendering of index_part.
+let metadata = layer.metadata();

upload_queue
.latest_files
-    .insert(layer.layer_desc().filename(), layer_metadata.clone());
+    .insert(layer.layer_desc().filename(), metadata.clone());
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;

info!("scheduled layer file upload {layer}");
-let op = UploadOp::UploadLayer(layer, layer_metadata.clone());
+let op = UploadOp::UploadLayer(layer, metadata);
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);

@@ -1461,7 +1459,7 @@ mod tests {
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
-storage_layer::{LayerE, PersistentLayerDesc},
+storage_layer::Layer,
Generation, Tenant, Timeline,
},
DEFAULT_PG_VERSION,
@@ -1599,73 +1597,29 @@
let generation = harness.generation;

// Create a couple of dummy files, schedule upload for them
-let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
-let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
-let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
-let content_1 = dummy_contents("foo");
-let content_2 = dummy_contents("bar");
-let content_3 = dummy_contents("baz");
-
-for (filename, content) in [
-    (&layer_file_name_1, &content_1),
-    (&layer_file_name_2, &content_2),
-    (&layer_file_name_3, &content_3),
-] {
-    std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
-}
-
-let layer_file_1 = LayerE::for_written(
-    harness.conf,
-    &timeline,
-    PersistentLayerDesc::from_filename(
-        timeline.tenant_id,
-        timeline.timeline_id,
-        layer_file_name_1.clone(),
-        content_1.len() as u64,
-    ),
-)
-.unwrap();
-
-// FIXME: need that api for local files
-assert!(layer_file_1.needs_download_blocking().unwrap().is_none());
-
-let layer_file_2 = LayerE::for_written(
-    harness.conf,
-    &timeline,
-    PersistentLayerDesc::from_filename(
-        timeline.tenant_id,
-        timeline.timeline_id,
-        layer_file_name_2.clone(),
-        content_2.len() as u64,
-    ),
-)
-.unwrap();
-assert!(layer_file_2.needs_download_blocking().unwrap().is_none());
-
-let layer_file_3 = LayerE::for_written(
-    harness.conf,
-    &timeline,
-    PersistentLayerDesc::from_filename(
-        timeline.tenant_id,
-        timeline.timeline_id,
-        layer_file_name_3.clone(),
-        content_3.len() as u64,
-    ),
-)
-.unwrap();
-assert!(layer_file_3.needs_download_blocking().unwrap().is_none());
+let layers = [
+    ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
+    ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
+    ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
+]
+.into_iter()
+.map(|(name, contents): (LayerFileName, Vec<u8>)| {
+    std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
+
+    Layer::for_resident(
+        harness.conf,
+        &timeline,
+        name,
+        LayerFileMetadata::new(contents.len() as u64, generation),
+    )
+}).collect::<Vec<_>>();

client
-    .schedule_layer_file_upload(
-        layer_file_1.clone(),
-        &LayerFileMetadata::new(content_1.len() as u64, generation),
-    )
+    .schedule_layer_file_upload(layers[0].clone())
.unwrap();
client
-    .schedule_layer_file_upload(
-        layer_file_2.clone(),
-        &LayerFileMetadata::new(content_2.len() as u64, generation),
-    )
+    .schedule_layer_file_upload(layers[1].clone())
.unwrap();

// Check that they are started immediately, not queued
@@ -1719,21 +1673,18 @@
.collect(),
&[
&initial_layer.file_name(),
-    &layer_file_name_1.file_name(),
-    &layer_file_name_2.file_name(),
+    &layers[0].layer_desc().filename().file_name(),
+    &layers[1].layer_desc().filename().file_name(),
],
);
assert_eq!(index_part.metadata, metadata);

// Schedule upload and then a deletion. Check that the deletion is queued
client
-    .schedule_layer_file_upload(
-        layer_file_3.clone(),
-        &LayerFileMetadata::new(content_3.len() as u64, generation),
-    )
+    .schedule_layer_file_upload(layers[2].clone())
.unwrap();
client
-    .schedule_layer_file_deletion(&[layer_file_name_1.clone()])
+    .schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
.unwrap();
{
let mut guard = client.upload_queue.lock().unwrap();
Expand All @@ -1749,8 +1700,8 @@ mod tests {
assert_remote_files(
&[
&initial_layer.file_name(),
-    &layer_file_name_1.file_name(),
-    &layer_file_name_2.file_name(),
+    &layers[0].layer_desc().filename().file_name(),
+    &layers[1].layer_desc().filename().file_name(),
"index_part.json",
],
&remote_timeline_dir,
Expand All @@ -1763,8 +1714,8 @@ mod tests {
assert_remote_files(
&[
&initial_layer.file_name(),
-    &layer_file_name_2.file_name(),
-    &layer_file_name_3.file_name(),
+    &layers[1].layer_desc().filename().file_name(),
+    &layers[2].layer_desc().filename().file_name(),
"index_part.json",
],
&remote_timeline_dir,
@@ -1793,20 +1744,12 @@
)
.unwrap();

-let layer_file_1 = LayerE::for_written(
+let layer_file_1 = Layer::for_resident(
harness.conf,
&timeline,
-    PersistentLayerDesc::from_filename(
-        timeline.tenant_id,
-        timeline.timeline_id,
-        layer_file_name_1.clone(),
-        content_1.len() as u64,
-    ),
-)
-.unwrap();
-
-// FIXME: need that api for local files that actually exist
-assert!(layer_file_1.needs_download_blocking().unwrap().is_none());
+    layer_file_name_1.clone(),
+    LayerFileMetadata::new(content_1.len() as u64, harness.generation),
+);

#[derive(Debug, PartialEq, Clone, Copy)]
struct BytesStartedFinished {
@@ -1843,10 +1786,7 @@
let actual_a = get_bytes_started_stopped();

client
-    .schedule_layer_file_upload(
-        layer_file_1.clone(),
-        &LayerFileMetadata::new(content_1.len() as u64, harness.generation),
-    )
+    .schedule_layer_file_upload(layer_file_1.clone())
.unwrap();

let actual_b = get_bytes_started_stopped();
2 changes: 2 additions & 0 deletions pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -68,6 +68,8 @@ pub(super) async fn upload_timeline_layer<'a>(
// upload. However, a nonexistent file can also be indicative of
// something worse, like when a file is scheduled for upload before
// it has been written to disk yet.
+//
+// This is tested against `test_compaction_delete_before_upload`
info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(());
}
Expand Down
