Skip to content

Commit

Permalink
feat(snapshot): support multi-replica snapshot
Browse files Browse the repository at this point in the history
Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>

feat(snapshot): set plugin command to change volume property, show
max_snapshots in volume query response

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>

fix: rework review comments

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
  • Loading branch information
hrudaya21 committed Jan 31, 2024
1 parent 81f812f commit 6d90be7
Show file tree
Hide file tree
Showing 34 changed files with 736 additions and 172 deletions.
1 change: 0 additions & 1 deletion control-plane/agents/src/bin/core/controller/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ impl Registry {
pub(crate) fn create_volume_limit(&self) -> usize {
self.create_volume_limit
}

/// Get a reference to the actual state of the nodes.
pub(crate) fn nodes(&self) -> &NodesMapLocked {
&self.nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,19 @@ pub(crate) trait ResourceReplicas {
) -> Result<Self::MoveResp, SvcError>;
}

/// Property modification as resource operation.
///
/// Implementors expose a single entry point, [`ResourceProperties::set_property`], which
/// applies the change described by a request to the resource.
#[async_trait::async_trait]
pub(crate) trait ResourceProperties {
/// Request type handed to `set_property`; bounded by `Sync + Send`.
type Request: Sync + Send;

/// Set the property value.
///
/// # Arguments
/// * `registry` - the registry the operation runs against.
/// * `request` - the implementor-defined request describing what to set.
///
/// # Errors
/// Returns a `SvcError` if the property could not be set.
async fn set_property(
&mut self,
registry: &Registry,
request: &Self::Request,
) -> Result<(), SvcError>;
}

/// Resource Children/Offspring Operations.
#[async_trait::async_trait]
pub(crate) trait ResourceOffspring {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ impl PoolItemLister {
pools
}
/// Get a list of pool items to create a snapshot on.
/// todo: support multi-replica snapshot.
pub(crate) async fn list_for_snaps(registry: &Registry, item: &ChildItem) -> Vec<PoolItem> {
pub(crate) async fn list_for_snaps(registry: &Registry, items: &[ChildItem]) -> Vec<PoolItem> {
let nodes = Self::nodes(registry).await;

match nodes.iter().find(|n| n.id() == item.node()) {
Some(node) => vec![PoolItem::new(node.clone(), item.pool().clone(), None)],
None => vec![],
}
let pool_items = items
.iter()
.filter_map(|item| {
nodes
.iter()
.find(|node| node.id() == item.node())
.map(|node| PoolItem::new(node.clone(), item.pool().clone(), None))
})
.collect();
pool_items
}
/// Get a list of replicas wrapped as ChildItem, for resize.
pub(crate) async fn list_for_resize(registry: &Registry, spec: &VolumeSpec) -> Vec<ChildItem> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ pub(crate) struct SnapshotVolumeReplica {
}

impl SnapshotVolumeReplica {
async fn builder(registry: &Registry, volume: &VolumeSpec, item: &ChildItem) -> Self {
async fn builder(registry: &Registry, volume: &VolumeSpec, items: &[ChildItem]) -> Self {
let allocated_bytes = AddVolumeReplica::allocated_bytes(registry, volume).await;

Self {
Expand All @@ -649,7 +649,7 @@ impl SnapshotVolumeReplica {
snap_repl: true,
ag_restricted_nodes: None,
},
PoolItemLister::list_for_snaps(registry, item).await,
PoolItemLister::list_for_snaps(registry, items).await,
),
}
}
Expand All @@ -670,8 +670,7 @@ impl SnapshotVolumeReplica {
pub(crate) async fn builder_with_defaults(
registry: &Registry,
volume: &VolumeSpec,
// todo: only 1 replica snapshot supported atm
items: &ChildItem,
items: &[ChildItem],
) -> Self {
Self::builder(registry, volume, items)
.await
Expand Down
47 changes: 40 additions & 7 deletions control-plane/agents/src/bin/core/volume/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::{
resources::{
operations::{
ResourceLifecycle, ResourceLifecycleExt, ResourceLifecycleWithLifetime,
ResourceOwnerUpdate, ResourcePublishing, ResourceReplicas, ResourceResize,
ResourceSharing, ResourceShutdownOperations,
ResourceOwnerUpdate, ResourceProperties, ResourcePublishing, ResourceReplicas,
ResourceResize, ResourceSharing, ResourceShutdownOperations,
},
operations_helper::{
GuardedOperationsHelper, OnCreateFail, OperationSequenceGuard, ResourceSpecsLocked,
Expand All @@ -26,18 +26,24 @@ use crate::{
},
};
use agents::errors::SvcError;
use std::str::FromStr;

// use grpc::operations::volume::traits::CreateVolumeInfo;
use stor_port::{
transport_api::ErrorChain,
types::v0::{
store::{
nexus_persistence::NexusInfoKey,
replica::ReplicaSpec,
volume::{PublishOperation, RepublishOperation, VolumeOperation, VolumeSpec},
volume::{
PublishOperation, RepublishOperation, VolumeAttr, VolumeOperation, VolumeProperty,
VolumeSpec,
},
},
transport::{
CreateVolume, DestroyNexus, DestroyReplica, DestroyShutdownTargets, DestroyVolume,
Protocol, PublishVolume, Replica, ReplicaId, ReplicaOwners, RepublishVolume,
ResizeVolume, SetVolumeReplica, ShareNexus, ShareVolume, ShutdownNexus,
ResizeVolume, SetVolumeProp, SetVolumeReplica, ShareNexus, ShareVolume, ShutdownNexus,
UnpublishVolume, UnshareNexus, UnshareVolume, Volume,
},
},
Expand Down Expand Up @@ -638,6 +644,34 @@ impl ResourceReplicas for OperationGuardArc<VolumeSpec> {
}
}

#[async_trait::async_trait]
impl ResourceProperties for OperationGuardArc<VolumeSpec> {
    type Request = SetVolumeProp;

    /// Set a volume property from a `SetVolumeProp` request.
    ///
    /// The volume state is looked up first, so a lookup failure is reported before the
    /// property name is validated. Only a property name that parses to
    /// `VolumeAttr::MaxSnapshots` is accepted; any other name, or one that fails to parse,
    /// yields `SvcError::InvalidSetProperty`.
    async fn set_property(
        &mut self,
        registry: &Registry,
        request: &Self::Request,
    ) -> Result<(), SvcError> {
        let state = registry.volume_state(&request.uuid).await?;

        // Guard clause: reject everything except the max-snapshots attribute.
        let is_max_snapshots = matches!(
            VolumeAttr::from_str(&request.prop_name),
            Ok(VolumeAttr::MaxSnapshots)
        );
        if !is_max_snapshots {
            return Err(SvcError::InvalidSetProperty {
                property_name: request.prop_name.clone(),
                id: state.uuid.to_string(),
            });
        }

        let property = VolumeProperty::new(VolumeAttr::MaxSnapshots, &request.prop_value);
        let operation = VolumeOperation::SetVolumeProperty(property);

        // Start the spec update, then complete it straight away with a successful result.
        let updated_spec = self.start_update(registry, &state, operation).await?;
        self.complete_update(registry, Ok(()), updated_spec).await?;

        Ok(())
    }
}
#[async_trait::async_trait]
impl ResourceShutdownOperations for OperationGuardArc<VolumeSpec> {
type RemoveShutdownTargets = DestroyShutdownTargets;
Expand Down Expand Up @@ -811,14 +845,13 @@ impl ResourceLifecycleExt<CreateVolumeSource<'_>> for OperationGuardArc<VolumeSp
request_src: &CreateVolumeSource,
) -> Result<Self::CreateOutput, SvcError> {
request_src.pre_flight_check()?;
let request = request_src.source();

let request = request_src.source().clone();
let specs = registry.specs();
let mut volume = specs
.get_or_create_volume(request_src)?
.operation_guard_wait()
.await?;
let volume_clone = volume.start_create_update(registry, request).await?;
let volume_clone = volume.start_create_update(registry, &request).await?;

// If the volume is a part of the ag, create or update accordingly.
registry.specs().get_or_create_affinity_group(&volume_clone);
Expand Down
47 changes: 39 additions & 8 deletions control-plane/agents/src/bin/core/volume/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::{
resources::{
operations::{
ResourceCloning, ResourceLifecycle, ResourceLifecycleWithLifetime,
ResourcePublishing, ResourceReplicas, ResourceResize, ResourceSharing,
ResourceShutdownOperations, ResourceSnapshotting,
ResourceProperties, ResourcePublishing, ResourceReplicas, ResourceResize,
ResourceSharing, ResourceShutdownOperations, ResourceSnapshotting,
},
operations_helper::{OperationSequenceGuard, ResourceSpecsLocked},
OperationGuardArc,
Expand All @@ -21,9 +21,9 @@ use grpc::{
CreateSnapshotVolumeInfo, CreateVolumeInfo, CreateVolumeSnapshot,
CreateVolumeSnapshotInfo, DestroyShutdownTargetsInfo, DestroyVolumeInfo,
DestroyVolumeSnapshot, DestroyVolumeSnapshotInfo, PublishVolumeInfo,
RepublishVolumeInfo, ResizeVolumeInfo, SetVolumeReplicaInfo, ShareVolumeInfo,
UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations, VolumeSnapshot,
VolumeSnapshots,
RepublishVolumeInfo, ResizeVolumeInfo, SetVolumePropInfo, SetVolumeReplicaInfo,
ShareVolumeInfo, UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations,
VolumeSnapshot, VolumeSnapshots,
},
Pagination,
},
Expand All @@ -37,8 +37,8 @@ use stor_port::{
},
transport::{
CreateSnapshotVolume, CreateVolume, DestroyShutdownTargets, DestroyVolume, Filter,
PublishVolume, RepublishVolume, ResizeVolume, SetVolumeReplica, ShareVolume,
UnpublishVolume, UnshareVolume, Volume,
PublishVolume, RepublishVolume, ResizeVolume, SetVolumeProp, SetVolumeReplica,
ShareVolume, UnpublishVolume, UnshareVolume, Volume,
},
},
};
Expand Down Expand Up @@ -161,6 +161,18 @@ impl VolumeOperations for Service {
Ok(volume)
}

/// gRPC entry point for setting a volume property.
///
/// Converts the incoming request and runs the service-level `set_volume_property`
/// inside `Context::spawn`, returning the resulting volume.
async fn set_volume_property(
    &self,
    req: &dyn SetVolumePropInfo,
    _ctx: Option<Context>,
) -> Result<Volume, ReplyError> {
    let request = req.into();
    let svc = self.clone();
    // First `?` handles the spawn result, second `?` the service result.
    Ok(Context::spawn(async move { svc.set_volume_property(&request).await }).await??)
}
/// Probe handler: always answers `Ok(true)`.
async fn probe(&self, _ctx: Option<Context>) -> Result<bool, ReplyError> {
    Ok(true)
}
Expand Down Expand Up @@ -291,7 +303,6 @@ impl Service {
}
filter => return Err(SvcError::InvalidFilter { filter }),
};

Ok(Volumes {
entries: filtered_volumes,
next_token: match last_result {
Expand Down Expand Up @@ -403,6 +414,16 @@ impl Service {
volume.set_replica(&self.registry, request).await?;
self.registry.volume(&request.uuid).await
}
/// Set a property on the volume identified by `request.uuid`.
///
/// Fetches the volume spec, applies the property change through it and then returns the
/// volume as reported by the registry.
#[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.uuid))]
pub(super) async fn set_volume_property(
    &self,
    request: &SetVolumeProp,
) -> Result<Volume, SvcError> {
    let volume_id = &request.uuid;

    let mut guarded_volume = self.specs().volume(volume_id).await?;
    guarded_volume.set_property(&self.registry, request).await?;

    self.registry.volume(volume_id).await
}

/// Create a volume snapshot.
#[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.source_id, snapshot.source_uuid = %request.source_id, snapshot.uuid = %request.snap_id))]
Expand All @@ -426,6 +447,16 @@ impl Service {
}
Err(error) => Err(error),
}?;

if let Some(max_snapshots) = volume.as_ref().max_snapshots {
if volume.as_ref().metadata.num_snapshots() as u32 >= max_snapshots {
return Err(SvcError::SnapshotMaxLimit {
max_snapshots,
volume_id: volume.as_ref().uuid.to_string(),
});
}
}

let snapshot = volume
.create_snap(
&self.registry,
Expand Down
53 changes: 31 additions & 22 deletions control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use crate::controller::{
registry::Registry,
resources::{
Expand Down Expand Up @@ -29,7 +31,7 @@ use stor_port::{
/// means a snapshot of all(or selected) healthy replicas associated with that volume.
pub(super) struct PrepareVolumeSnapshot {
pub(super) parameters: SnapshotParameters<VolumeId>,
pub(super) replica_snapshot: (Replica, ReplicaSnapshot),
pub(super) replica_snapshot: Vec<(Replica, ReplicaSnapshot)>,
pub(super) completer: VolumeSnapshotCompleter,
}

Expand Down Expand Up @@ -95,34 +97,41 @@ impl SpecOperationsHelper for VolumeSnapshot {
pub(crate) async fn snapshoteable_replica(
volume: &VolumeSpec,
registry: &Registry,
) -> Result<ChildItem, SvcError> {
if volume.num_replicas != 1 {
) -> Result<Vec<ChildItem>, SvcError> {
let children = super::scheduling::snapshoteable_replica(volume, registry).await?;

//todo: Remove this check once we support snapshotting with n-replicas.
if volume.num_replicas != 1 || children.candidates().len() != 1 {
return Err(SvcError::NReplSnapshotNotAllowed {});
}

let children = super::scheduling::snapshoteable_replica(volume, registry).await?;

volume.trace(&format!("Snapshoteable replicas for volume: {children:?}"));

let item = match children.candidates().as_slice() {
[item] => Ok(item),
[] => Err(SvcError::NoHealthyReplicas {
if children.candidates().is_empty() {
return Err(SvcError::NoHealthyReplicas {
id: volume.uuid_str(),
}),
_ => Err(SvcError::NReplSnapshotNotAllowed {}),
}?;
});
}

if children.candidates().len() != volume.num_replicas as usize {
return Err(SvcError::AllReplicaNotHealthy {
id: volume.uuid_str(),
});
}
//todo: check for snapshot chain for all the replicas.

let pools = SnapshotVolumeReplica::builder_with_defaults(registry, volume, item)
.await
.collect();
let pools =
SnapshotVolumeReplica::builder_with_defaults(registry, volume, children.candidates())
.await
.collect();
let pools: HashSet<_> = pools.iter().map(|item| item.pool.id.clone()).collect();

match pools
.iter()
.any(|pool_item| pool_item.pool.id == item.pool().id)
{
true => Ok(item.clone()),
false => Err(SvcError::NotEnoughResources {
source: NotEnough::PoolFree {},
}),
for item in children.candidates() {
if !pools.contains(&item.pool().id) {
return Err(SvcError::NotEnoughResources {
source: NotEnough::PoolFree {},
});
}
}
Ok(children.candidates().clone())
}
Loading

0 comments on commit 6d90be7

Please sign in to comment.