Skip to content

Commit

Permalink
Make load_keys and load_val_refs async
Browse files Browse the repository at this point in the history
  • Loading branch information
arpad-m committed Aug 1, 2023
1 parent 17922f7 commit 10d2740
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,15 +614,15 @@ impl DeltaLayer {
/// Obtains all keys and value references stored in the layer
///
/// The value can be obtained via the [`ValueRef::load`] function.
pub fn load_val_refs(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, ValueRef)>> {
pub async fn load_val_refs(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, ValueRef)>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.context("load delta layer")?;
DeltaLayerInner::load_val_refs(inner).context("Layer index is corrupted")
}

/// Loads all keys stored in the layer. Returns key, lsn and value size.
pub fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, u64)>> {
pub async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, u64)>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.context("load delta layer keys")?;
Expand Down
21 changes: 19 additions & 2 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use pageserver_api::models::{
use remote_storage::GenericRemoteStorage;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::{oneshot, watch, TryAcquireError};
use tokio_util::sync::CancellationToken;
use tracing::*;
Expand Down Expand Up @@ -3515,15 +3516,31 @@ impl Timeline {

let mut all_value_refs = Vec::new();
for l in deltas_to_compact.iter() {
all_value_refs.extend(l.load_val_refs(ctx)?);
// TODO: replace this with an await once we fully go async
all_value_refs.extend(
Handle::current().block_on(
l.clone()
.downcast_delta_layer()
.expect("delta layer")
.load_val_refs(ctx),
)?,
);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
all_value_refs.sort_by_key(|(key, _lsn, _value_ref)| *key);

let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
all_keys.extend(l.load_keys(ctx)?);
// TODO: replace this with an await once we fully go async
all_keys.extend(
Handle::current().block_on(
l.clone()
.downcast_delta_layer()
.expect("delta layer")
.load_keys(ctx),
)?,
);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
Expand Down

0 comments on commit 10d2740

Please sign in to comment.