Skip to content

Commit

Permalink
chore(bors): merge pull request openebs#707
Browse files Browse the repository at this point in the history
707: feat(multi-replica-snapshot): Handle multireplica snapshot in control-plane r=hrudaya21 a=hrudaya21

1. Add multi-replica snapshot capability in control-plane.
2. Limit snapshot creation based on a new environment variable (max_snapshots).


Co-authored-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
  • Loading branch information
mayastor-bors and hrudaya21 committed Feb 12, 2024
2 parents ba46dc7 + 9a84d7d commit 10d87df
Show file tree
Hide file tree
Showing 25 changed files with 327 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ impl PoolItemLister {
pools
}
/// Get a list of pool items to create a snapshot on.
/// todo: support multi-replica snapshot.
pub(crate) async fn list_for_snaps(registry: &Registry, item: &ChildItem) -> Vec<PoolItem> {
pub(crate) async fn list_for_snaps(registry: &Registry, items: &[ChildItem]) -> Vec<PoolItem> {
let nodes = Self::nodes(registry).await;

match nodes.iter().find(|n| n.id() == item.node()) {
Some(node) => vec![PoolItem::new(node.clone(), item.pool().clone(), None)],
None => vec![],
}
let pool_items = items
.iter()
.filter_map(|item| {
nodes
.iter()
.find(|node| node.id() == item.node())
.map(|node| PoolItem::new(node.clone(), item.pool().clone(), None))
})
.collect();
pool_items
}
/// Get a list of replicas wrapped as ChildItem, for resize.
pub(crate) async fn list_for_resize(registry: &Registry, spec: &VolumeSpec) -> Vec<ChildItem> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ pub(crate) struct SnapshotVolumeReplica {
}

impl SnapshotVolumeReplica {
async fn builder(registry: &Registry, volume: &VolumeSpec, item: &ChildItem) -> Self {
async fn builder(registry: &Registry, volume: &VolumeSpec, items: &[ChildItem]) -> Self {
let allocated_bytes = AddVolumeReplica::allocated_bytes(registry, volume).await;

Self {
Expand All @@ -649,7 +649,7 @@ impl SnapshotVolumeReplica {
snap_repl: true,
ag_restricted_nodes: None,
},
PoolItemLister::list_for_snaps(registry, item).await,
PoolItemLister::list_for_snaps(registry, items).await,
),
}
}
Expand All @@ -670,8 +670,7 @@ impl SnapshotVolumeReplica {
pub(crate) async fn builder_with_defaults(
registry: &Registry,
volume: &VolumeSpec,
// todo: only 1 replica snapshot supported atm
items: &ChildItem,
items: &[ChildItem],
) -> Self {
Self::builder(registry, volume, items)
.await
Expand Down
6 changes: 3 additions & 3 deletions control-plane/agents/src/bin/core/volume/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,14 +811,14 @@ impl ResourceLifecycleExt<CreateVolumeSource<'_>> for OperationGuardArc<VolumeSp
request_src: &CreateVolumeSource,
) -> Result<Self::CreateOutput, SvcError> {
request_src.pre_flight_check()?;
let request = request_src.source();

let specs = registry.specs();
let mut volume = specs
.get_or_create_volume(request_src)?
.operation_guard_wait()
.await?;
let volume_clone = volume.start_create_update(registry, request).await?;
let volume_clone = volume
.start_create_update(registry, request_src.source())
.await?;

// If the volume is a part of the ag, create or update accordingly.
registry.specs().get_or_create_affinity_group(&volume_clone);
Expand Down
12 changes: 10 additions & 2 deletions control-plane/agents/src/bin/core/volume/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ impl Service {
}
filter => return Err(SvcError::InvalidFilter { filter }),
};

Ok(Volumes {
entries: filtered_volumes,
next_token: match last_result {
Expand Down Expand Up @@ -403,7 +402,6 @@ impl Service {
volume.set_replica(&self.registry, request).await?;
self.registry.volume(&request.uuid).await
}

/// Create a volume snapshot.
#[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.source_id, snapshot.source_uuid = %request.source_id, snapshot.uuid = %request.snap_id))]
async fn create_snapshot(
Expand All @@ -426,6 +424,16 @@ impl Service {
}
Err(error) => Err(error),
}?;

if let Some(max_snapshots) = volume.as_ref().max_snapshots {
if volume.as_ref().metadata.num_snapshots() as u32 >= max_snapshots {
return Err(SvcError::SnapshotMaxLimit {
max_snapshots,
volume_id: volume.as_ref().uuid.to_string(),
});
}
}

let snapshot = volume
.create_snap(
&self.registry,
Expand Down
54 changes: 32 additions & 22 deletions control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ use stor_port::{
},
};

use std::collections::HashSet;

/// A request type for creating snapshot of a volume, which essentially
/// means a snapshot of all (or selected) healthy replicas associated with that volume.
pub(super) struct PrepareVolumeSnapshot {
pub(super) parameters: SnapshotParameters<VolumeId>,
pub(super) replica_snapshot: (Replica, ReplicaSnapshot),
pub(super) replica_snapshot: Vec<(Replica, ReplicaSnapshot)>,
pub(super) completer: VolumeSnapshotCompleter,
}

Expand Down Expand Up @@ -95,34 +97,42 @@ impl SpecOperationsHelper for VolumeSnapshot {
pub(crate) async fn snapshoteable_replica(
volume: &VolumeSpec,
registry: &Registry,
) -> Result<ChildItem, SvcError> {
if volume.num_replicas != 1 {
return Err(SvcError::NReplSnapshotNotAllowed {});
) -> Result<Vec<ChildItem>, SvcError> {
let children = super::scheduling::snapshoteable_replica(volume, registry).await?;

if children.candidates().len() != volume.num_replicas as usize {
return Err(SvcError::InsufficientHealthyReplicas {
id: volume.uuid_str(),
});
}

let children = super::scheduling::snapshoteable_replica(volume, registry).await?;
//todo: Remove this check once we support snapshotting with n-replicas.
if volume.num_replicas != 1 || children.candidates().len() != 1 {
return Err(SvcError::NReplSnapshotNotAllowed {});
}

volume.trace(&format!("Snapshoteable replicas for volume: {children:?}"));

let item = match children.candidates().as_slice() {
[item] => Ok(item),
[] => Err(SvcError::NoHealthyReplicas {
if children.candidates().is_empty() {
return Err(SvcError::NoHealthyReplicas {
id: volume.uuid_str(),
}),
_ => Err(SvcError::NReplSnapshotNotAllowed {}),
}?;
});
}

//todo: check for snapshot chain for all the replicas.

let pools = SnapshotVolumeReplica::builder_with_defaults(registry, volume, item)
.await
.collect();
let pools =
SnapshotVolumeReplica::builder_with_defaults(registry, volume, children.candidates())
.await
.collect();
let pools: HashSet<_> = pools.iter().map(|item| item.pool.id.clone()).collect();

match pools
.iter()
.any(|pool_item| pool_item.pool.id == item.pool().id)
{
true => Ok(item.clone()),
false => Err(SvcError::NotEnoughResources {
source: NotEnough::PoolFree {},
}),
for item in children.candidates() {
if !pools.contains(&item.pool().id) {
return Err(SvcError::NotEnoughResources {
source: NotEnough::PoolFree {},
});
}
}
Ok(children.candidates().clone())
}
Loading

0 comments on commit 10d87df

Please sign in to comment.