add disks to existing zettacache (openzfs#68)
Disks can be added to an existing zettacache by restarting the agent
with additional `-c DEVICE` arguments.
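
For example, an agent originally started with a single cache device could be restarted with an extra `-c` argument to grow the cache (binary name and device paths here are illustrative, and other flags the agent needs are omitted):

    zfs_object_agent -c /dev/nvme0n1 -c /dev/nvme1n1

On restart, the agent notices that the added device has no superblock yet and folds it into the existing zettacache, as implemented in the diff below.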
ahrens authored Dec 17, 2021
1 parent 30bd1b0 commit afb9215
Showing 4 changed files with 188 additions and 87 deletions.
58 changes: 31 additions & 27 deletions cmd/zfs_object_agent/zettacache/src/block_allocator.rs
@@ -15,7 +15,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::ops::{Add, Bound::*, Sub};
use std::sync::Arc;
use std::time::Instant;
use std::{fmt, mem};
use std::{fmt, iter, mem};
use util::BitmapRangeIterator;
use util::RangeTree;
use util::{get_tunable, TerseVec};
@@ -1374,36 +1374,40 @@ pub struct BlockAllocatorPhys {
impl OnDisk for BlockAllocatorPhys {}

impl BlockAllocatorPhys {
pub fn new(capacity: Vec<Extent>) -> BlockAllocatorPhys {
let slab_size = *DEFAULT_SLAB_SIZE;
let slab_size64 = u64::from(slab_size);

// Truncate each extent to a multiple of slab_size
let capacity: Vec<Extent> = capacity
.iter()
.map(|extent| extent.range(0, extent.size / slab_size64 * slab_size64))
.collect();
let slabs = vec![
SlabPhys {
generation: SlabGeneration(0),
slab_type: SlabPhysType::Free
};
usize::from64(
capacity
.iter()
.map(|extent| extent.size / slab_size64)
.sum()
)
];

BlockAllocatorPhys {
slab_size,
pub fn new<T>(capacity: T) -> BlockAllocatorPhys
where
T: IntoIterator<Item = Extent>,
{
let mut this = BlockAllocatorPhys {
slab_size: *DEFAULT_SLAB_SIZE,
spacemap: SpaceMapPhys::new(),
spacemap_next: SpaceMapPhys::new(),
next_slab_to_condense: SlabId(0),
capacity,
slabs: slabs.into(),
capacity: Default::default(),
slabs: TerseVec(Vec::new()),
slab_buckets: DEFAULT_SLAB_BUCKETS.clone(),
};
this.extend(capacity);
this
}

/// Add new capacity
pub fn extend<T>(&mut self, capacity: T)
where
T: IntoIterator<Item = Extent>,
{
let slabsize = u64::from(self.slab_size);
for extent in capacity {
let nslabs = extent.size / slabsize;
self.slabs.0.extend(
iter::repeat(SlabPhys {
generation: SlabGeneration(0),
slab_type: SlabPhysType::Free,
})
.take(usize::from64(nslabs)),
);
// capacity is aligned to be a multiple of slabsize
self.capacity.push(extent.range(0, nslabs * slabsize));
}
}

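The extend() method added above turns each new extent into a whole number of free slabs, dropping any partial slab at the end of the extent. A minimal standalone sketch of that arithmetic, with ExtentSketch as a simplified stand-in for the crate's Extent type:

    #[derive(Debug, Clone, Copy)]
    struct ExtentSketch {
        offset: u64,
        size: u64,
    }

    // Truncate an extent to a whole number of slabs and report how many
    // free-slab entries need to be appended for it.
    fn truncate_to_slabs(extent: ExtentSketch, slab_size: u64) -> (ExtentSketch, u64) {
        let nslabs = extent.size / slab_size; // integer division drops the partial tail
        (
            ExtentSketch {
                offset: extent.offset,
                size: nslabs * slab_size,
            },
            nslabs,
        )
    }

    fn main() {
        // A 100 MiB extent with 32 MiB slabs yields 3 slabs; the trailing 4 MiB is unused.
        let slab_size: u64 = 32 * 1024 * 1024;
        let (usable, nslabs) = truncate_to_slabs(
            ExtentSketch { offset: 0, size: 100 * 1024 * 1024 },
            slab_size,
        );
        assert_eq!(nslabs, 3);
        assert_eq!(usable.size, 96 * 1024 * 1024);
        println!("{:?} -> {} slabs", usable, nslabs);
    }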
7 changes: 7 additions & 0 deletions cmd/zfs_object_agent/zettacache/src/extent_allocator.rs
@@ -18,6 +18,13 @@ impl ExtentAllocatorPhys {
pub fn new(capacity: Vec<Extent>) -> Self {
Self { capacity }
}

pub fn extend<T>(&mut self, capacity: T)
where
T: IntoIterator<Item = Extent>,
{
self.capacity.extend(capacity);
}
}

pub struct ExtentAllocator {
55 changes: 38 additions & 17 deletions cmd/zfs_object_agent/zettacache/src/superblock.rs
@@ -57,31 +57,52 @@ impl PrimaryPhys {
.await;
}

pub async fn read(block_access: &BlockAccess) -> Result<(Self, DiskId, u64)> {
let superblocks = SuperblockPhys::read_all(block_access).await?;
/// Return value is (Self, primary_disk, guid, extra_disks)
pub async fn read(block_access: &BlockAccess) -> Result<(Self, DiskId, u64, Vec<DiskId>)> {
let results = SuperblockPhys::read_all(block_access).await;

let (primary, primary_disk, guid) = superblocks
let (primary, primary_disk, guid) = results
.iter()
.find_map(|phys| {
phys.primary
.as_ref()
.map(|primary| (primary.clone(), phys.disk, phys.guid))
.find_map(|result| {
if let Ok(phys) = result {
phys.primary
.as_ref()
.map(|primary| (primary.clone(), phys.disk, phys.guid))
} else {
None
}
})
.ok_or_else(|| anyhow!("Primary Superblock not found"))?;

for (id, phys) in superblocks.iter().enumerate() {
let extra_disks = results
.iter()
.enumerate()
.filter_map(|(id, result)| match result {
Ok(_) => None,
Err(_) => Some(DiskId(id.try_into().unwrap())),
})
.collect::<Vec<_>>();

for (id, result) in results.iter().enumerate() {
// XXX proper error handling
// XXX we should be able to reorder them?
assert_eq!(DiskId(id.try_into().unwrap()), phys.disk);
assert_eq!(phys.guid, guid);
assert!(phys.primary.is_none() || phys.disk == primary_disk);
if let Ok(phys) = result {
assert_eq!(DiskId(id.try_into().unwrap()), phys.disk);
assert_eq!(phys.guid, guid);
assert!(phys.primary.is_none() || phys.disk == primary_disk);
}
}

// XXX proper error handling
assert_eq!(block_access.disks().count(), primary.num_disks);
assert!(primary.checkpoint_capacity.contains(&primary.checkpoint));
assert_eq!(
results.len() - extra_disks.len(),
primary.num_disks,
"Expected {} disks with superblocks, {} disks provided, of which {} have superblocks",
primary.num_disks,
results.len(),
results.len() - extra_disks.len()
);

Ok((primary, primary_disk, guid))
Ok((primary, primary_disk, guid, extra_disks))
}
}

@@ -96,12 +117,12 @@ impl SuperblockPhys {
Ok(this)
}

async fn read_all(block_access: &BlockAccess) -> Result<Vec<SuperblockPhys>> {
async fn read_all(block_access: &BlockAccess) -> Vec<Result<SuperblockPhys>> {
block_access
.disks()
.map(|disk| SuperblockPhys::read(block_access, disk))
.collect::<FuturesOrdered<_>>()
.try_collect()
.collect()
.await
}

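In the superblock.rs change above, a disk whose superblock cannot be read is treated as newly added rather than as a fatal error. A minimal sketch of that classification over per-disk read results, using plain indices instead of the crate's DiskId newtype:

    // Disks without a readable superblock are candidates for joining the cache.
    fn extra_disk_ids<T, E>(results: &[Result<T, E>]) -> Vec<usize> {
        results
            .iter()
            .enumerate()
            .filter_map(|(id, result)| result.is_err().then_some(id))
            .collect()
    }

    fn main() {
        // Disk 1 has no superblock yet, so it is reported as an extra (new) disk.
        let results: Vec<Result<(), &str>> = vec![Ok(()), Err("no superblock"), Ok(())];
        assert_eq!(extra_disk_ids(&results), vec![1]);
    }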
155 changes: 112 additions & 43 deletions cmd/zfs_object_agent/zettacache/src/zettacache.rs
@@ -66,7 +66,6 @@ lazy_static! {
static ref QUANTILES_IN_SIZE_HISTOGRAM: usize = get_tunable("quantiles_in_size_histogram", 100);
static ref CACHE_INSERT_BLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_blocking_buffer_bytes", 256_000_000);
static ref CACHE_INSERT_NONBLOCKING_BUFFER_BYTES: usize = get_tunable("cache_insert_nonblocking_buffer_bytes", 256_000_000);

static ref INDEX_CACHE_ENTRIES_MEM_PCT: usize = get_tunable("index_cache_entries_mem_pct", 10);
}

@@ -464,34 +463,38 @@ pub enum LookupSource {

#[metered(registry=ZettaCacheMetrics)]
impl ZettaCache {
pub async fn create(block_access: &BlockAccess) {
let guid: u64 = rand::random();

let total_capacity = block_access.total_capacity();

// checkpoint is stored on the largest disk, its size a percent of the whole cache
let checkpoint_capacity = Extent::new(
block_access
.disks()
.reduce(|a, b| {
if block_access.disk_size(a) > block_access.disk_size(b) {
a
} else {
b
}
})
// The checkpoint is stored on the largest provided disk (when adding disks,
// only the new disks are candidates). Its size is a percent of the whole
// cache.
fn checkpoint_capacity<D>(disks: D, block_access: &BlockAccess) -> Extent
where
D: IntoIterator<Item = DiskId>,
{
Extent::new(
disks
.into_iter()
.max_by_key(|&disk| block_access.disk_size(disk))
.unwrap(),
SUPERBLOCK_SIZE,
block_access.round_up_to_sector(
(*DEFAULT_CHECKPOINT_SIZE_PCT / 100.0 * total_capacity as f64)
(*DEFAULT_CHECKPOINT_SIZE_PCT / 100.0 * block_access.total_capacity() as f64)
.approx_as::<u64>()
.unwrap(),
),
);
)
}

// metadata is stored on each disk, its size a percent of that disk
let metadata_capacity: Vec<Extent> = block_access
.disks()
// metadata is stored on each disk, its size a percent of that disk
fn metadata_capacity<D>(
disks: D,
block_access: &BlockAccess,
checkpoint_capacity: Extent,
) -> Vec<Extent>
where
D: IntoIterator<Item = DiskId>,
{
disks
.into_iter()
.map(|disk| {
let start = if disk == checkpoint_capacity.location.disk {
checkpoint_capacity.location.offset + checkpoint_capacity.size
@@ -510,10 +513,16 @@ impl ZettaCache {
),
)
})
.collect();
.collect()
}

let data_capacity = metadata_capacity
.iter()
// data is stored after metadata
fn data_capacity<M>(metadata_capacity: M, block_access: &BlockAccess) -> Vec<Extent>
where
M: IntoIterator<Item = Extent>,
{
metadata_capacity
.into_iter()
.map(|extent| {
Extent::new(
extent.location.disk,
@@ -522,15 +531,27 @@ impl ZettaCache {
- (extent.location.offset + extent.size),
)
})
.collect();
.collect()
}

pub async fn create(block_access: &BlockAccess) {
let guid: u64 = rand::random();

let total_capacity = block_access.total_capacity();
info!("creating cache from {} disks", block_access.disks().count());

let checkpoint_capacity = Self::checkpoint_capacity(block_access.disks(), block_access);
let metadata_capacity =
Self::metadata_capacity(block_access.disks(), block_access, checkpoint_capacity);
let data_capacity = Self::data_capacity(metadata_capacity.iter().copied(), block_access);

let checkpoint = ZettaCheckpointPhys {
generation: CheckpointId(0),
block_allocator: BlockAllocatorPhys::new(data_capacity),
extent_allocator: ExtentAllocatorPhys::new(metadata_capacity),
index: Default::default(),
operation_log: Default::default(),
last_atime: Atime(0),
block_allocator: BlockAllocatorPhys::new(data_capacity),
size_histogram: SizeHistogramPhys::new(total_capacity, *QUANTILES_IN_SIZE_HISTOGRAM),

merge_progress: None,
@@ -559,22 +580,60 @@ impl ZettaCache {
false,
));

let (primary, primary_disk, guid) = match PrimaryPhys::read(&block_access).await {
Ok(tuple) => tuple,
Err(_) => {
// XXX need proper create CLI
Self::create(&block_access).await;
PrimaryPhys::read(&block_access).await.unwrap()
}
};
let (mut primary, primary_disk, guid, extra_disks) =
match PrimaryPhys::read(&block_access).await {
Ok(tuple) => tuple,
Err(_) => {
// XXX need proper create CLI
Self::create(&block_access).await;
PrimaryPhys::read(&block_access).await.unwrap()
}
};
if let Err(feature_error) = check_features(primary.feature_flags.iter()) {
panic!("{}", feature_error)
};

let checkpoint = ZettaCheckpointPhys::read(&block_access, primary.checkpoint).await;

// XXX proper error handling
assert!(primary.checkpoint_capacity.contains(&primary.checkpoint));
let mut checkpoint = ZettaCheckpointPhys::read(&block_access, primary.checkpoint).await;
assert_eq!(checkpoint.generation, primary.checkpoint_id);

// XXX proper CLI for adding disks
if !extra_disks.is_empty() {
info!(
"adding {} disks to existing {}-disk cache",
extra_disks.len(),
primary.num_disks
);
primary.num_disks += extra_disks.len();

// Checkpoint size is a % of the total pool, so reallocate a bigger checkpoint space on a new disk.
let checkpoint_capacity =
Self::checkpoint_capacity(extra_disks.iter().copied(), &block_access);
primary.checkpoint_capacity = checkpoint_capacity;

// Add new disks to extent (metadata) allocator.
let metadata_capacity = Self::metadata_capacity(
extra_disks.into_iter(),
&block_access,
checkpoint_capacity,
);
checkpoint
.extent_allocator
.extend(metadata_capacity.iter().copied());

// Add new disks to block (data) allocator.
checkpoint.block_allocator.extend(Self::data_capacity(
metadata_capacity.into_iter(),
&block_access,
));
}

info!(
"opening ZettaCache {} with {} disks",
guid, primary.num_disks
);

let mut builder = ExtentAllocatorBuilder::new(&checkpoint.extent_allocator);
checkpoint.claim(&mut builder);
let extent_allocator = Arc::new(ExtentAllocator::open(builder));
@@ -1165,7 +1224,7 @@ impl ZCacheDBHandle {
true,
));

let (primary, primary_disk, guid) = PrimaryPhys::read(&block_access).await?;
let (primary, primary_disk, guid, _extra_disks) = PrimaryPhys::read(&block_access).await?;

let checkpoint =
Arc::new(ZettaCheckpointPhys::read(&block_access, primary.checkpoint).await);
@@ -1699,11 +1758,21 @@ impl ZettaCacheState {
.block_access
.chunk_to_raw(EncodeType::Json, &checkpoint);

let mut checkpoint_extent = Extent::new(
self.primary.checkpoint.location.disk,
self.primary.checkpoint.location.offset + self.primary.checkpoint.size,
raw.len() as u64,
);
let mut checkpoint_extent = if self
.primary
.checkpoint_capacity
.contains(&self.primary.checkpoint)
{
// Try placing checkpoint after current checkpoint.
Extent::new(
self.primary.checkpoint.location.disk,
self.primary.checkpoint.location.offset + self.primary.checkpoint.size,
raw.len() as u64,
)
} else {
// The checkpoint region has moved. Write new checkpoint in new region.
self.primary.checkpoint_capacity.range(0, raw.len() as u64)
};

if !self
.primary
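The final zettacache.rs hunk changes where the next checkpoint is written: if the current checkpoint still lies inside the configured checkpoint region, the new one is placed right after it; otherwise the region was reallocated (for example onto a newly added disk) and writing starts at the beginning of the new region. A minimal sketch of that decision using an offset-only region type rather than the crate's disk-aware Extent:

    #[derive(Debug, Clone, Copy)]
    struct RegionSketch {
        offset: u64,
        size: u64,
    }

    impl RegionSketch {
        // True if `other` lies entirely within this region.
        fn contains(&self, other: &RegionSketch) -> bool {
            other.offset >= self.offset && other.offset + other.size <= self.offset + self.size
        }
    }

    // Choose the offset at which the next checkpoint should be written.
    fn next_checkpoint_offset(capacity: RegionSketch, current: RegionSketch) -> u64 {
        if capacity.contains(&current) {
            // Normal case: place the next checkpoint right after the current one.
            current.offset + current.size
        } else {
            // The checkpoint region moved (e.g. reallocated onto a new disk):
            // start over at the beginning of the new region.
            capacity.offset
        }
    }

    fn main() {
        let old_region = RegionSketch { offset: 4096, size: 1 << 20 };
        let current = RegionSketch { offset: 8192, size: 4096 };
        assert_eq!(next_checkpoint_offset(old_region, current), 12288);

        // After adding a disk, the checkpoint region may live somewhere else entirely.
        let new_region = RegionSketch { offset: 1 << 30, size: 4 << 20 };
        assert_eq!(next_checkpoint_offset(new_region, current), 1 << 30);
    }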
