diff --git a/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs b/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs
index 93a676655..e7ed5f0f2 100644
--- a/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs
+++ b/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs
@@ -93,14 +93,18 @@ impl PoolItemLister {
         pools
     }
     /// Get a list of pool items to create a snapshot on.
-    /// todo: support multi-replica snapshot.
-    pub(crate) async fn list_for_snaps(registry: &Registry, item: &ChildItem) -> Vec<PoolItem> {
+    pub(crate) async fn list_for_snaps(registry: &Registry, items: &[ChildItem]) -> Vec<PoolItem> {
         let nodes = Self::nodes(registry).await;
-
-        match nodes.iter().find(|n| n.id() == item.node()) {
-            Some(node) => vec![PoolItem::new(node.clone(), item.pool().clone(), None)],
-            None => vec![],
-        }
+        let pool_items = items
+            .iter()
+            .filter_map(|item| {
+                nodes
+                    .iter()
+                    .find(|node| node.id() == item.node())
+                    .map(|node| PoolItem::new(node.clone(), item.pool().clone(), None))
+            })
+            .collect();
+        pool_items
     }
     /// Get a list of replicas wrapped as ChildItem, for resize.
     pub(crate) async fn list_for_resize(registry: &Registry, spec: &VolumeSpec) -> Vec<ChildItem> {
diff --git a/control-plane/agents/src/bin/core/controller/scheduling/volume.rs b/control-plane/agents/src/bin/core/controller/scheduling/volume.rs
index a671effa1..b249c0bb2 100644
--- a/control-plane/agents/src/bin/core/controller/scheduling/volume.rs
+++ b/control-plane/agents/src/bin/core/controller/scheduling/volume.rs
@@ -636,7 +636,7 @@ pub(crate) struct SnapshotVolumeReplica {
 }
 
 impl SnapshotVolumeReplica {
-    async fn builder(registry: &Registry, volume: &VolumeSpec, item: &ChildItem) -> Self {
+    async fn builder(registry: &Registry, volume: &VolumeSpec, items: &[ChildItem]) -> Self {
         let allocated_bytes = AddVolumeReplica::allocated_bytes(registry, volume).await;
 
         Self {
@@ -649,7 +649,7 @@ impl SnapshotVolumeReplica {
                     snap_repl: true,
                     ag_restricted_nodes: None,
                 },
-                PoolItemLister::list_for_snaps(registry, item).await,
+                PoolItemLister::list_for_snaps(registry, items).await,
             ),
         }
     }
@@ -670,8 +670,7 @@ impl SnapshotVolumeReplica {
     pub(crate) async fn builder_with_defaults(
         registry: &Registry,
         volume: &VolumeSpec,
-        // todo: only 1 replica snapshot supported atm
-        items: &ChildItem,
+        items: &[ChildItem],
     ) -> Self {
         Self::builder(registry, volume, items)
             .await
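The reworked `list_for_snaps` joins each candidate replica against the registered nodes and keeps one `PoolItem` per replica whose node is known; replicas on unknown nodes are silently dropped, which the caller later surfaces through the pool-eligibility check in `snapshot_helpers`. A minimal, self-contained sketch of that join, with toy stand-ins for `ChildItem`, `PoolItem` and the node wrapper (all names here are illustrative, not the crate's real types):

```rust
// Toy stand-ins illustrating the filter_map join in list_for_snaps.
#[derive(Clone, Debug)]
struct Node(String);
#[derive(Clone, Debug)]
struct Item {
    node: String,
    pool: String,
}

fn pools_for_snaps(nodes: &[Node], items: &[Item]) -> Vec<(Node, String)> {
    items
        .iter()
        .filter_map(|item| {
            nodes
                .iter()
                .find(|node| node.0 == item.node)
                // Replicas whose node is not registered are silently skipped.
                .map(|node| (node.clone(), item.pool.clone()))
        })
        .collect()
}

fn main() {
    let nodes = vec![Node("n1".into()), Node("n2".into())];
    let items = vec![
        Item { node: "n1".into(), pool: "p1".into() },
        Item { node: "n3".into(), pool: "p3".into() }, // dropped: unknown node
    ];
    assert_eq!(pools_for_snaps(&nodes, &items).len(), 1);
}
```

Because a dropped entry shrinks the returned pool list, a missing node does not fail here; it shows up later as a missing pool in the candidate validation.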
diff --git a/control-plane/agents/src/bin/core/volume/operations.rs b/control-plane/agents/src/bin/core/volume/operations.rs
index 331ff9e84..3fd8a763b 100644
--- a/control-plane/agents/src/bin/core/volume/operations.rs
+++ b/control-plane/agents/src/bin/core/volume/operations.rs
@@ -811,14 +811,14 @@ impl ResourceLifecycleExt<CreateVolumeSource<'_>> for OperationGuardArc<VolumeSpec>
     ) -> Result<Self::CreateOutput, SvcError> {
         request_src.pre_flight_check()?;
-        let request = request_src.source();
-
         let specs = registry.specs();
         let mut volume = specs
             .get_or_create_volume(request_src)?
             .operation_guard_wait()
             .await?;
-        let volume_clone = volume.start_create_update(registry, request).await?;
+        let volume_clone = volume
+            .start_create_update(registry, request_src.source())
+            .await?;
 
         // If the volume is a part of the ag, create or update accordingly.
         registry.specs().get_or_create_affinity_group(&volume_clone);
diff --git a/control-plane/agents/src/bin/core/volume/service.rs b/control-plane/agents/src/bin/core/volume/service.rs
index 5f6887a12..05b038030 100644
--- a/control-plane/agents/src/bin/core/volume/service.rs
+++ b/control-plane/agents/src/bin/core/volume/service.rs
@@ -291,7 +291,6 @@ impl Service {
             }
             filter => return Err(SvcError::InvalidFilter { filter }),
         };
-
         Ok(Volumes {
             entries: filtered_volumes,
             next_token: match last_result {
@@ -403,7 +402,6 @@ impl Service {
         volume.set_replica(&self.registry, request).await?;
         self.registry.volume(&request.uuid).await
     }
-
     /// Create a volume snapshot.
     #[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.source_id, snapshot.source_uuid = %request.source_id, snapshot.uuid = %request.snap_id))]
     async fn create_snapshot(
@@ -426,6 +424,16 @@ impl Service {
             }
             Err(error) => Err(error),
         }?;
+
+        if let Some(max_snapshots) = volume.as_ref().max_snapshots {
+            if volume.as_ref().metadata.num_snapshots() as u32 >= max_snapshots {
+                return Err(SvcError::SnapshotMaxLimit {
+                    max_snapshots,
+                    volume_id: volume.as_ref().uuid.to_string(),
+                });
+            }
+        }
+
         let snapshot = volume
             .create_snap(
                 &self.registry,
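`create_snapshot` now rejects requests once a volume has reached its configured limit. The guard below restates that check with plain types standing in for the volume guard and `SvcError`; the `>=` comparison matters because the check runs before the new snapshot is persisted:

```rust
// Sketch of the limit check added to create_snapshot; hypothetical Error
// type stands in for SvcError.
#[derive(Debug)]
enum Error {
    SnapshotMaxLimit { max_snapshots: u32, volume_id: String },
}

fn check_snapshot_limit(
    num_snapshots: u32,
    max_snapshots: Option<u32>,
    volume_id: &str,
) -> Result<(), Error> {
    if let Some(max_snapshots) = max_snapshots {
        // `>=` because a volume already at the limit must not take another one.
        if num_snapshots >= max_snapshots {
            return Err(Error::SnapshotMaxLimit {
                max_snapshots,
                volume_id: volume_id.to_string(),
            });
        }
    }
    Ok(())
}

fn main() {
    assert!(check_snapshot_limit(9, Some(10), "vol-1").is_ok());
    assert!(check_snapshot_limit(10, Some(10), "vol-1").is_err());
    assert!(check_snapshot_limit(100, None, "vol-1").is_ok()); // no limit set
}
```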
diff --git a/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs b/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
index 5acc9e108..d5165b140 100644
--- a/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
+++ b/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
@@ -25,11 +25,13 @@ use stor_port::{
     },
 };
 
+use std::collections::HashSet;
+
 /// A request type for creating snapshot of a volume, which essentially
 /// means a snapshot of all(or selected) healthy replicas associated with that volume.
 pub(super) struct PrepareVolumeSnapshot {
     pub(super) parameters: SnapshotParameters,
-    pub(super) replica_snapshot: (Replica, ReplicaSnapshot),
+    pub(super) replica_snapshot: Vec<(Replica, ReplicaSnapshot)>,
     pub(super) completer: VolumeSnapshotCompleter,
 }
 
@@ -95,34 +97,42 @@ impl SpecOperationsHelper for VolumeSnapshot {
 pub(crate) async fn snapshoteable_replica(
     volume: &VolumeSpec,
     registry: &Registry,
-) -> Result<ChildItem, SvcError> {
-    if volume.num_replicas != 1 {
-        return Err(SvcError::NReplSnapshotNotAllowed {});
+) -> Result<Vec<ChildItem>, SvcError> {
+    let children = super::scheduling::snapshoteable_replica(volume, registry).await?;
+
+    if children.candidates().len() != volume.num_replicas as usize {
+        return Err(SvcError::InsufficientHealthyReplicas {
+            id: volume.uuid_str(),
+        });
     }
-    let children = super::scheduling::snapshoteable_replica(volume, registry).await?;
+    //todo: Remove this check once we support snapshotting with n-replicas.
+    if volume.num_replicas != 1 || children.candidates().len() != 1 {
+        return Err(SvcError::NReplSnapshotNotAllowed {});
+    }
 
     volume.trace(&format!("Snapshoteable replicas for volume: {children:?}"));
 
-    let item = match children.candidates().as_slice() {
-        [item] => Ok(item),
-        [] => Err(SvcError::NoHealthyReplicas {
+    if children.candidates().is_empty() {
+        return Err(SvcError::NoHealthyReplicas {
             id: volume.uuid_str(),
-        }),
-        _ => Err(SvcError::NReplSnapshotNotAllowed {}),
-    }?;
+        });
+    }
+
+    //todo: check for snapshot chain for all the replicas.
-    let pools = SnapshotVolumeReplica::builder_with_defaults(registry, volume, item)
-        .await
-        .collect();
+    let pools =
+        SnapshotVolumeReplica::builder_with_defaults(registry, volume, children.candidates())
+            .await
+            .collect();
+    let pools: HashSet<_> = pools.iter().map(|item| item.pool.id.clone()).collect();
 
-    match pools
-        .iter()
-        .any(|pool_item| pool_item.pool.id == item.pool().id)
-    {
-        true => Ok(item.clone()),
-        false => Err(SvcError::NotEnoughResources {
-            source: NotEnough::PoolFree {},
-        }),
+    for item in children.candidates() {
+        if !pools.contains(&item.pool().id) {
+            return Err(SvcError::NotEnoughResources {
+                source: NotEnough::PoolFree {},
+            });
+        }
     }
+    Ok(children.candidates().clone())
 }
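`snapshoteable_replica` now validates all candidates at once: the eligible pools returned by the scheduler are collected into a `HashSet`, and every candidate replica's pool must be a member, otherwise the whole request fails with `PoolFree`. A sketch of that all-or-nothing check, using plain strings for pool ids:

```rust
use std::collections::HashSet;

// Every candidate replica's pool must appear in the set of pools the
// scheduler deemed eligible; one miss fails the whole snapshot request.
fn validate_pools(eligible: &[&str], candidate_pools: &[&str]) -> Result<(), String> {
    let eligible: HashSet<&str> = eligible.iter().copied().collect();
    for pool in candidate_pools {
        if !eligible.contains(pool) {
            // A partial snapshot (only some replicas) is never taken.
            return Err(format!("pool {pool} lacks free space"));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_pools(&["p1", "p2"], &["p1", "p2"]).is_ok());
    assert!(validate_pools(&["p1"], &["p1", "p2"]).is_err());
}
```

The set lookup keeps the check linear in the number of replicas, rather than the quadratic scan the old single-replica `any` would become if simply looped.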
diff --git a/control-plane/agents/src/bin/core/volume/snapshot_operations.rs b/control-plane/agents/src/bin/core/volume/snapshot_operations.rs
index 9515607de..736d39c3a 100644
--- a/control-plane/agents/src/bin/core/volume/snapshot_operations.rs
+++ b/control-plane/agents/src/bin/core/volume/snapshot_operations.rs
@@ -38,7 +38,10 @@ use stor_port::{
 };
 
 use chrono::{DateTime, Utc};
-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{HashMap, HashSet},
+    time::SystemTime,
+};
 
 #[async_trait::async_trait]
 impl ResourceSnapshotting for OperationGuardArc<VolumeSpec> {
@@ -66,7 +69,6 @@ impl ResourceSnapshotting for OperationGuardArc<VolumeSpec> {
                 },
             )
             .await;
-
         self.complete_update(registry, snap_result, spec_clone)
             .await
     }
@@ -155,13 +157,13 @@ impl ResourceLifecycleWithLifetime for OperationGuardArc<VolumeSnapshot> {
         let volume = &request.volume;
         let request = &request.request;
 
-        let replica = snapshoteable_replica(volume.as_ref(), registry).await?;
+        let replicas = snapshoteable_replica(volume.as_ref(), registry).await?;
 
         let target_node = if let Some(target) = volume.as_ref().target() {
-            registry.node_wrapper(target.node()).await
+            let node = registry.node_wrapper(target.node()).await?;
+            Some(node)
         } else {
-            registry.node_wrapper(&replica.state().node).await
-        }?;
-
+            None
+        };
         let specs = registry.specs();
         let mut snapshot = specs
             .get_or_create_snapshot(request)
@@ -179,13 +181,17 @@ impl ResourceLifecycleWithLifetime for OperationGuardArc<VolumeSnapshot> {
         // Try to prune 1 stale transaction, if present..
         snapshot.prune(registry, Some(1)).await.ok();
 
-        let prepare_snapshot = snapshot.snapshot_params(&replica)?;
+        let prepare_snapshot = snapshot.snapshot_params(&replicas)?;
         snapshot
             .start_create_update(
                 registry,
                 &VolumeSnapshotCreateInfo::new(
                     prepare_snapshot.parameters.txn_id(),
-                    prepare_snapshot.replica_snapshot.1.clone(),
+                    prepare_snapshot
+                        .replica_snapshot
+                        .iter()
+                        .map(|(_, snapshot)| snapshot.clone())
+                        .collect(),
                     &prepare_snapshot.completer,
                 ),
             )
@@ -285,31 +291,39 @@ impl ResourcePruning for OperationGuardArc<VolumeSnapshot> {
 }
 
 impl OperationGuardArc<VolumeSnapshot> {
-    fn snapshot_params(&self, replica: &ChildItem) -> Result<PrepareVolumeSnapshot, SvcError> {
+    fn snapshot_params(
+        &self,
+        replicas: &Vec<ChildItem>,
+    ) -> Result<PrepareVolumeSnapshot, SvcError> {
        let Some(parameters) = self.as_ref().prepare() else {
             return Err(SvcError::AlreadyExists {
                 id: self.uuid().to_string(),
                 kind: ResourceKind::VolumeSnapshot,
             });
         };
+        let mut replica_snapshots = Vec::with_capacity(replicas.len());
         let volume = self.as_ref().spec().source_id();
         let generic_params = parameters.params().clone();
-        let snapshot_source = ReplicaSnapshotSource::new(
-            replica.spec().uid().clone(),
-            replica.state().pool_id.clone(),
-            replica.state().pool_uuid.clone().unwrap_or_default(),
-        );
-        let replica_snapshot = ReplicaSnapshot::new_vol(
-            ReplicaSnapshotSpec::new(&snapshot_source, SnapshotId::new()),
-            SnapshotParameters::new(volume, generic_params),
-            replica.state().size,
-            0,
-            replica.spec().size,
-        );
-        let replica = replica.state().clone();
+        for replica in replicas {
+            let snapshot_source = ReplicaSnapshotSource::new(
+                replica.spec().uid().clone(),
+                replica.state().pool_id.clone(),
+                replica.state().pool_uuid.clone().unwrap_or_default(),
+            );
+            let replica_snapshot = ReplicaSnapshot::new_vol(
+                ReplicaSnapshotSpec::new(&snapshot_source, SnapshotId::new()),
+                SnapshotParameters::new(volume, generic_params.clone()),
+                replica.state().size,
+                0,
+                replica.spec().size,
+            );
+            let replica = replica.state().clone();
+            replica_snapshots.push((replica, replica_snapshot));
+        }
+
         Ok(PrepareVolumeSnapshot {
             parameters,
-            replica_snapshot: (replica, replica_snapshot),
+            replica_snapshot: replica_snapshots,
             completer: VolumeSnapshotCompleter::default(),
         })
     }
@@ -318,13 +332,14 @@ impl OperationGuardArc<VolumeSnapshot> {
         volume: &OperationGuardArc<VolumeSpec>,
         prep_params: &PrepareVolumeSnapshot,
         registry: &Registry,
-        target_node: N,
+        target_node: Option<N>,
     ) -> Result<VolumeSnapshotCreateResult, SvcError> {
-        if let Some(target) = volume.as_ref().target() {
-            self.snapshot_nexus(prep_params, target, registry, target_node)
+        let target = volume.as_ref().target();
+        if target.is_some() && target_node.is_some() {
+            self.snapshot_nexus::<N>(prep_params, target.unwrap(), registry, target_node.unwrap())
                 .await
         } else {
-            self.snapshot_replica(prep_params, target_node).await
+            self.snapshot_replica::<N>(prep_params, registry).await
         }
     }
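`snapshot_params` fans a single set of volume-level snapshot parameters out into one `(Replica, ReplicaSnapshot)` pair per replica, cloning the shared parameters for each so every record carries the same transaction id. Sketched below with hypothetical stand-in types:

```rust
// Hypothetical stand-ins for SnapshotParameters / Replica / ReplicaSnapshot,
// showing the per-replica fan-out performed by snapshot_params.
#[derive(Clone, Debug)]
struct Params {
    txn_id: String,
}
#[derive(Clone, Debug)]
struct Replica {
    uuid: String,
    size: u64,
}
#[derive(Debug)]
struct ReplicaSnap {
    replica: String,
    params: Params,
    size: u64,
}

fn prepare(replicas: &[Replica], params: &Params) -> Vec<(Replica, ReplicaSnap)> {
    let mut out = Vec::with_capacity(replicas.len());
    for replica in replicas {
        let snap = ReplicaSnap {
            replica: replica.uuid.clone(),
            // Shared parameters are cloned per replica, as in the diff's
            // SnapshotParameters::new(volume, generic_params.clone()).
            params: params.clone(),
            size: replica.size,
        };
        out.push((replica.clone(), snap));
    }
    out
}

fn main() {
    let replicas = vec![
        Replica { uuid: "r1".into(), size: 1 << 20 },
        Replica { uuid: "r2".into(), size: 1 << 20 },
    ];
    let pairs = prepare(&replicas, &Params { txn_id: "txn-1".into() });
    assert_eq!(pairs.len(), 2);
    println!("{pairs:?}");
}
```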
@@ -335,88 +350,106 @@ impl OperationGuardArc<VolumeSnapshot> {
         registry: &Registry,
         target_node: N,
     ) -> Result<VolumeSnapshotCreateResult, SvcError> {
-        let mut replica_snap = prep_params.replica_snapshot.1.clone();
-        let replica = &prep_params.replica_snapshot.0;
         let generic_params = prep_params.parameters.params();
-
-        let replica_id = replica_snap.spec().source_id().replica_id();
+        let nexus_snap_desc = prep_params
+            .replica_snapshot
+            .iter()
+            .map(|(_, snapshot)| {
+                CreateNexusSnapReplDescr::new(
+                    snapshot.spec().source_id().replica_id(),
+                    snapshot.spec().uuid().clone(),
+                )
+            })
+            .collect::<Vec<_>>();
         let response = target_node
             .create_nexus_snapshot(&CreateNexusSnapshot::new(
                 SnapshotParameters::new(target.nexus(), generic_params.clone()),
-                vec![CreateNexusSnapReplDescr::new(
-                    replica_id,
-                    replica_snap.spec().uuid().clone(),
-                )],
+                nexus_snap_desc,
             ))
             .await?;
 
-        if response.skipped.contains(replica_id) || !response.skipped.is_empty() {
+        if !response.skipped.is_empty() {
             return Err(SvcError::ReplicaSnapSkipped {
-                replica: replica_snap.spec().uuid().to_string(),
+                replica: response
+                    .skipped
+                    .iter()
+                    .map(|r| r.to_string())
+                    .collect::<Vec<_>>()
+                    .join(", "),
             });
         }
-
-        let snapped = match response.replicas_status.as_slice() {
-            [snapped] if &snapped.replica_uuid == replica_snap.spec().source_id().replica_id() => {
-                Ok(snapped)
-            }
-            _ => Err(SvcError::ReplicaSnapMiss {
-                replica: replica_snap.spec().uuid().to_string(),
-            }),
-        }?;
-
-        if let Some(error) = snapped.error {
+        let failed_replicas = response
+            .replicas_status
+            .iter()
+            .filter(|&snap| snap.error.is_some())
+            .collect::<Vec<_>>();
+        if !failed_replicas.is_empty() {
            return Err(SvcError::ReplicaSnapError {
-                replica: replica_snap.spec().uuid().to_string(),
-                error,
+                failed_replicas: failed_replicas
+                    .iter()
+                    .map(|&snap| (snap.replica_uuid.to_string(), snap.error.unwrap()))
+                    .collect::<Vec<_>>(),
             });
         }
-
         let timestamp = DateTime::<Utc>::from(response.snap_time);
         // What if snapshot succeeds but we can't fetch the replica snapshot, should we carry
         // on as following, or should we bail out?
-        let node = registry.node_wrapper(&replica.node).await?;
-
-        let snapshot = NodeWrapper::fetch_update_snapshot_state(
-            &node,
-            ReplicaSnapshotInfo::new(
-                replica_snap.spec().source_id().replica_id(),
-                replica_snap.spec().uuid().clone(),
-            ),
-        )
-        .await?;
-
-        replica_snap.complete_vol(
-            snapshot.timestamp().into(),
-            snapshot.replica_size(),
-            snapshot.allocated_size() + snapshot.predecessor_alloc_size(),
-        );
-        Ok(VolumeSnapshotCreateResult::new_ok(replica_snap, timestamp))
+        let mut replica_snapshots = prep_params.replica_snapshot.clone();
+        for (replica, replica_snap) in replica_snapshots.iter_mut() {
+            let node = registry.node_wrapper(&replica.node).await?;
+            let snapshot = NodeWrapper::fetch_update_snapshot_state(
+                &node,
+                ReplicaSnapshotInfo::new(
+                    replica_snap.spec().source_id().replica_id(),
+                    replica_snap.spec().uuid().clone(),
+                ),
+            )
+            .await?;
+
+            replica_snap.complete_vol(
+                snapshot.timestamp().into(),
+                snapshot.replica_size(),
+                snapshot.allocated_size() + snapshot.predecessor_alloc_size(),
+            );
+        }
+        let snapshots = replica_snapshots
+            .iter()
+            .map(|(_, replica_snapshot)| replica_snapshot.clone())
+            .collect::<Vec<_>>();
+        Ok(VolumeSnapshotCreateResult::new_ok(snapshots, timestamp))
     }
+
     async fn snapshot_replica<N>(
         &self,
         prep_params: &PrepareVolumeSnapshot,
-        target_node: N,
+        registry: &Registry,
     ) -> Result<VolumeSnapshotCreateResult, SvcError> {
-        let mut replica_snap = prep_params.replica_snapshot.clone();
         let volume_params = prep_params.parameters.params().clone();
-
-        let replica_params = volume_params.with_uuid(replica_snap.1.spec().uuid());
-        let response = target_node
-            .create_repl_snapshot(&CreateReplicaSnapshot::new(SnapshotParameters::new(
-                replica_snap.1.spec().source_id().replica_id(),
-                replica_params,
-            )))
-            .await?;
-        let timestamp = response.timestamp();
-        replica_snap.1.complete_vol(
-            timestamp.into(),
-            response.replica_size(),
-            response.allocated_size() + response.predecessor_alloc_size(),
-        );
+        let mut timestamp = SystemTime::now();
+
+        let mut replica_snapshots = prep_params.replica_snapshot.clone();
+        for (replica, replica_snap) in replica_snapshots.iter_mut() {
+            let replica_params = volume_params.clone().with_uuid(replica_snap.spec().uuid());
+            let target_node = registry.node_wrapper(&replica.node).await?;
+            let response = target_node
+                .create_repl_snapshot(&CreateReplicaSnapshot::new(SnapshotParameters::new(
+                    replica_snap.spec().source_id().replica_id(),
+                    replica_params,
+                )))
+                .await?;
+            timestamp = response.timestamp();
+            replica_snap.complete_vol(
+                timestamp.into(),
+                response.replica_size(),
+                response.allocated_size() + response.predecessor_alloc_size(),
+            );
+        }
         Ok(VolumeSnapshotCreateResult::new_ok(
-            replica_snap.1,
+            replica_snapshots
+                .iter()
+                .map(|(_, replica_snapshot)| replica_snapshot.clone())
+                .collect::<Vec<_>>(),
            timestamp.into(),
        ))
    }
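The nexus path now validates the whole response instead of a single replica's entry: any skipped replica or any per-replica error fails the operation, and all offenders are reported together. A sketch with illustrative field names mirroring the diff's `skipped`/`replicas_status`:

```rust
// Illustrative response shape; errno represented as i32 so the sketch
// stands alone.
struct ReplicaStatus {
    replica_uuid: String,
    error: Option<i32>,
}
struct Response {
    skipped: Vec<String>,
    replicas_status: Vec<ReplicaStatus>,
}

fn check(response: &Response) -> Result<(), String> {
    if !response.skipped.is_empty() {
        return Err(format!("skipped: {}", response.skipped.join(", ")));
    }
    // Collect every failed replica before reporting, rather than the first.
    let failed: Vec<_> = response
        .replicas_status
        .iter()
        .filter(|s| s.error.is_some())
        .map(|s| (s.replica_uuid.clone(), s.error.unwrap()))
        .collect();
    if !failed.is_empty() {
        return Err(format!("failed: {failed:?}"));
    }
    Ok(())
}

fn main() {
    let ok = Response { skipped: vec![], replicas_status: vec![] };
    assert!(check(&ok).is_ok());
    let bad = Response {
        skipped: vec![],
        replicas_status: vec![ReplicaStatus { replica_uuid: "r1".into(), error: Some(5) }],
    };
    assert!(check(&bad).is_err());
}
```

Note that in the replica (no-nexus) path the per-replica snapshots are taken sequentially, each `await?` aborts the loop on first failure, and the result keeps the timestamp of the last successful response.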
diff --git a/control-plane/agents/src/bin/core/volume/specs.rs b/control-plane/agents/src/bin/core/volume/specs.rs
index 26401ea80..776d18ae3 100644
--- a/control-plane/agents/src/bin/core/volume/specs.rs
+++ b/control-plane/agents/src/bin/core/volume/specs.rs
@@ -1059,7 +1059,6 @@ impl SpecOperationsHelper for VolumeSpec {
                     }
                 }
             }
-
             VolumeOperation::RemoveUnusedReplica(uuid) => {
                 let last_replica = !registry
                     .specs()
@@ -1111,10 +1110,6 @@ impl SpecOperationsHelper for VolumeSpec {
             }
             VolumeOperation::Create => unreachable!(),
             VolumeOperation::Destroy => unreachable!(),
-
-            VolumeOperation::CreateSnapshot(_) if self.num_replicas != 1 => {
-                Err(SvcError::NReplSnapshotNotAllowed {})
-            }
             VolumeOperation::CreateSnapshot(_) => Ok(()),
             VolumeOperation::DestroySnapshot(_) => Ok(()),
             VolumeOperation::Resize(_) => Ok(()),
diff --git a/control-plane/agents/src/common/errors.rs b/control-plane/agents/src/common/errors.rs
index 309175b48..72fec01c0 100644
--- a/control-plane/agents/src/common/errors.rs
+++ b/control-plane/agents/src/common/errors.rs
@@ -339,10 +339,9 @@ pub enum SvcError {
     ReplicaSnapSkipped { replica: String },
     #[snafu(display("Replica's {} snapshot was unexpectedly not taken", replica))]
     ReplicaSnapMiss { replica: String },
-    #[snafu(display("Replica's {} snapshot failed with error {}", replica, error))]
+    #[snafu(display("Failed to snapshot replicas {:?}", failed_replicas))]
     ReplicaSnapError {
-        replica: String,
-        error: nix::errno::Errno,
+        failed_replicas: Vec<(String, nix::errno::Errno)>,
     },
     #[snafu(display("The service is busy, cannot process request"))]
     ServiceBusy {},
@@ -376,6 +375,18 @@ pub enum SvcError {
         cluster_capacity_limit: u64,
         excess: u64,
     },
+    #[snafu(display(
+        "The number of healthy replicas does not match the expected replica count of volume '{id}'"
+    ))]
+    InsufficientHealthyReplicas { id: String },
+    #[snafu(display(
+        "Reached maximum snapshots limit {max_snapshots} for volume {volume_id}, delete unused snapshots to continue"))]
+    SnapshotMaxLimit {
+        max_snapshots: u32,
+        volume_id: String,
+    },
+    #[snafu(display("Invalid property name '{property_name}' for the volume '{id}'"))]
+    InvalidSetProperty { property_name: String, id: String },
 }
 
 impl SvcError {
@@ -1018,6 +1029,24 @@ impl From<SvcError> for ReplyError {
                 source,
                 extra,
             },
+            SvcError::InsufficientHealthyReplicas { .. } => ReplyError {
+                kind: ReplyErrorKind::FailedPrecondition,
+                resource: ResourceKind::VolumeSnapshot,
+                source,
+                extra,
+            },
+            SvcError::SnapshotMaxLimit { .. } => ReplyError {
+                kind: ReplyErrorKind::OutOfRange,
+                resource: ResourceKind::Volume,
+                source,
+                extra,
+            },
+            SvcError::InvalidSetProperty { .. } => ReplyError {
+                kind: ReplyErrorKind::InvalidArgument,
+                resource: ResourceKind::Volume,
+                source,
+                extra,
+            },
         }
     }
 }
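`ReplicaSnapError` now aggregates every failed replica with its errno instead of reporting a single failure. A minimal sketch of the reshaped variant (snafu derive as in the source; `nix::errno::Errno` swapped for a bare `i32` so the sketch stands alone with only the snafu crate):

```rust
use snafu::Snafu;

// One variant carrying all failures, mirroring the reworked SvcError arm.
#[derive(Debug, Snafu)]
enum SvcError {
    #[snafu(display("Failed to snapshot replicas {:?}", failed_replicas))]
    ReplicaSnapError { failed_replicas: Vec<(String, i32)> },
}

fn main() {
    let err = SvcError::ReplicaSnapError {
        failed_replicas: vec![("r1".into(), 5), ("r2".into(), 28)],
    };
    // A single error message now reports every failed replica at once.
    println!("{err}");
}
```

Grouping the failures into one variant also keeps the `SvcError` to `ReplyError` mapping a single match arm, rather than one reply per replica.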
diff --git a/control-plane/csi-driver/src/bin/controller/client.rs b/control-plane/csi-driver/src/bin/controller/client.rs
index fe31c3a78..ab69e56db 100644
--- a/control-plane/csi-driver/src/bin/controller/client.rs
+++ b/control-plane/csi-driver/src/bin/controller/client.rs
@@ -214,6 +214,7 @@ impl IoEngineApiClient {
     /// Create a volume of target size and provision storage resources for it.
     /// This operation is not idempotent, so the caller is responsible for taking
     /// all actions with regards to idempotency.
+    #[allow(clippy::too_many_arguments)]
     #[instrument(fields(volume.uuid = %volume_id), skip(self, volume_id))]
     pub(crate) async fn create_volume(
         &self,
@@ -223,6 +224,7 @@ impl IoEngineApiClient {
         volume_topology: CreateVolumeTopology,
         thin: bool,
         affinity_group: Option<AffinityGroup>,
+        max_snapshots: Option<u32>,
     ) -> Result<Volume, ApiClientError> {
         let topology =
             Topology::new_all(volume_topology.node_topology, volume_topology.pool_topology);
@@ -235,6 +237,7 @@ impl IoEngineApiClient {
             policy: VolumePolicy::new_all(true),
             labels: None,
             affinity_group,
+            max_snapshots,
         };
 
         let result = self
@@ -259,6 +262,7 @@
         volume_topology: CreateVolumeTopology,
         thin: bool,
         affinity_group: Option<AffinityGroup>,
+        max_snapshots: Option<u32>,
     ) -> Result<Volume, ApiClientError> {
         let topology =
             Topology::new_all(volume_topology.node_topology, volume_topology.pool_topology);
@@ -271,8 +275,8 @@
             policy: VolumePolicy::new_all(true),
             labels: None,
             affinity_group,
+            max_snapshots,
         };
-
         let result = self
             .rest_client
             .volumes_api()
diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs
index 7964ef8dd..17d42f0da 100644
--- a/control-plane/csi-driver/src/bin/controller/controller.rs
+++ b/control-plane/csi-driver/src/bin/controller/controller.rs
@@ -315,6 +315,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
         );
 
         let sts_affinity_group_name = context.sts_affinity_group();
+        let max_snapshots = context.max_snapshots();
 
         let volume = match volume_content_source {
             Some(snapshot_uuid) => {
@@ -327,6 +328,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
                         volume_topology,
                         thin,
                         sts_affinity_group_name.clone().map(AffinityGroup::new),
+                        max_snapshots,
                     )
                     .await?
             }
@@ -339,6 +341,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
                         volume_topology,
                         thin,
                         sts_affinity_group_name.clone().map(AffinityGroup::new),
+                        max_snapshots,
                     )
                     .await?
             }
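On the CSI side, `maxSnapshots` arrives as a plain string in the StorageClass parameter map and is parsed into an optional `u32`, with a malformed value rejected as `invalid_argument`. A sketch of that parsing, equivalent in behaviour to the diff's `Parameters::parse_u32` under the assumption that it is a thin `str::parse` wrapper:

```rust
use std::collections::HashMap;
use std::num::ParseIntError;

// Absent key means "no limit"; a present but malformed value is an error.
fn parse_max_snapshots(args: &HashMap<String, String>) -> Result<Option<u32>, ParseIntError> {
    args.get("maxSnapshots").map(|v| v.parse::<u32>()).transpose()
}

fn main() {
    let mut args = HashMap::new();
    assert_eq!(parse_max_snapshots(&args), Ok(None)); // parameter absent: no limit

    args.insert("maxSnapshots".to_string(), "10".to_string());
    assert_eq!(parse_max_snapshots(&args), Ok(Some(10)));

    args.insert("maxSnapshots".to_string(), "ten".to_string());
    assert!(parse_max_snapshots(&args).is_err()); // mapped to invalid_argument
}
```

Given the `#[strum(serialize = "maxSnapshots")]` attribute, a StorageClass would carry the limit as `maxSnapshots: "10"` in its `parameters` map.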
diff --git a/control-plane/csi-driver/src/context.rs b/control-plane/csi-driver/src/context.rs
index 14c666cf6..03af5ea38 100644
--- a/control-plane/csi-driver/src/context.rs
+++ b/control-plane/csi-driver/src/context.rs
@@ -57,6 +57,8 @@ pub enum Parameters {
     PoolTopologyAffinity,
     #[strum(serialize = "poolTopologySpread")]
     PoolTopologySpread,
+    #[strum(serialize = "maxSnapshots")]
+    MaxSnapshots,
 }
 impl Parameters {
     fn parse_human_time(
@@ -161,6 +163,10 @@ impl Parameters {
     ) -> Result<Option<HashMap<String, String>>, serde_json::Error> {
         Self::parse_map(value)
     }
+    /// Parse the value for `Self::MaxSnapshots`.
+    pub fn max_snapshots(value: Option<&String>) -> Result<Option<u32>, ParseIntError> {
+        Self::parse_u32(value)
+    }
 }
 
 /// Volume publish parameters.
@@ -283,6 +289,7 @@ pub struct CreateParams {
     replica_count: u8,
     sts_affinity_group: Option<String>,
     clone_fs_id_as_volume_id: Option<bool>,
+    max_snapshots: Option<u32>,
 }
 impl CreateParams {
     /// Get the `Parameters::PublishParams` value.
@@ -306,6 +313,10 @@ impl CreateParams {
     pub fn clone_fs_id_as_volume_id(&self) -> &Option<bool> {
         &self.clone_fs_id_as_volume_id
     }
+    /// Get the `Parameters::MaxSnapshots` value.
+    pub fn max_snapshots(&self) -> Option<u32> {
+        self.max_snapshots
+    }
 }
 impl TryFrom<&HashMap<String, String>> for CreateParams {
     type Error = tonic::Status;
@@ -353,12 +364,18 @@ impl TryFrom<&HashMap<String, String>> for CreateParams {
             )
             .map_err(|_| tonic::Status::invalid_argument("Invalid clone_fs_id_as_volume_id"))?;
 
+        let max_snapshots = Parameters::max_snapshots(args.get(Parameters::MaxSnapshots.as_ref()))
+            .map_err(|_| {
+                tonic::Status::invalid_argument("Invalid `maxSnapshots` value, expected an u32")
+            })?;
+
         Ok(Self {
             publish_params,
             share_protocol,
             replica_count,
             sts_affinity_group: sts_affinity_group_name,
             clone_fs_id_as_volume_id,
+            max_snapshots,
         })
     }
 }
diff --git a/control-plane/grpc/proto/v1/volume/volume.proto b/control-plane/grpc/proto/v1/volume/volume.proto
index 7f6e36535..c27f58f0d 100644
--- a/control-plane/grpc/proto/v1/volume/volume.proto
+++ b/control-plane/grpc/proto/v1/volume/volume.proto
@@ -61,6 +61,8 @@ message VolumeSpec {
   optional VolumeContentSource content_source = 11;
   // Number of snapshots taken on this volume.
   uint32 num_snapshots = 12;
+  // Max snapshots limit per volume.
+  optional uint32 max_snapshots = 13;
 
   // Volume Content Source i.e the snapshot or a volume.
   message VolumeContentSource {
@@ -265,6 +267,8 @@ message CreateVolumeRequest {
   optional AffinityGroup affinity_group = 9;
   // maximum total volume size
   optional uint64 cluster_capacity_limit = 10;
+  // Max snapshots limit per volume.
+  optional uint32 max_snapshots = 11;
 }
 
 // Publish a volume on a node
diff --git a/control-plane/grpc/src/operations/volume/client.rs b/control-plane/grpc/src/operations/volume/client.rs
index 74ab10c15..6d87d2740 100644
--- a/control-plane/grpc/src/operations/volume/client.rs
+++ b/control-plane/grpc/src/operations/volume/client.rs
@@ -22,7 +22,6 @@ use crate::{
         volume_grpc_client::VolumeGrpcClient, GetSnapshotsRequest, GetVolumesRequest, ProbeRequest,
     },
 };
-
 use stor_port::{
     transport_api::{v0::Volumes, ReplyError, ResourceKind, TimeoutOptions},
     types::v0::transport::{Filter, MessageIdVs, Volume},
diff --git a/control-plane/grpc/src/operations/volume/traits.rs b/control-plane/grpc/src/operations/volume/traits.rs
index dd4c095aa..179d78951 100644
--- a/control-plane/grpc/src/operations/volume/traits.rs
+++ b/control-plane/grpc/src/operations/volume/traits.rs
@@ -161,6 +161,7 @@ impl From<VolumeSpec> for volume::VolumeDefinition {
                 affinity_group: volume_spec.affinity_group.into_opt(),
                 content_source: volume_spec.content_source.into_opt(),
                 num_snapshots: volume_spec.metadata.num_snapshots() as u32,
+                max_snapshots: volume_spec.max_snapshots,
             }),
             metadata: Some(volume::Metadata {
                 spec_status: spec_status as i32,
@@ -345,6 +346,7 @@ impl TryFrom<volume::VolumeDefinition> for VolumeSpec {
             metadata: VolumeMetadata::new(volume_meta.as_thin),
             content_source: volume_spec.content_source.try_into_opt()?,
             num_snapshots: volume_spec.num_snapshots,
+            max_snapshots: volume_spec.max_snapshots,
         };
         Ok(volume_spec)
     }
@@ -892,6 +894,8 @@ pub trait CreateVolumeInfo: Send + Sync + std::fmt::Debug {
     fn affinity_group(&self) -> Option<AffinityGroup>;
     /// Capacity Limit.
     fn cluster_capacity_limit(&self) -> Option<u64>;
+    /// Max snapshot limit per volume.
+    fn max_snapshots(&self) -> Option<u32>;
 }
 
 impl CreateVolumeInfo for CreateVolume {
@@ -930,6 +934,10 @@ impl CreateVolumeInfo for CreateVolume {
     fn cluster_capacity_limit(&self) -> Option<u64> {
         self.cluster_capacity_limit
     }
+
+    fn max_snapshots(&self) -> Option<u32> {
+        self.max_snapshots
+    }
 }
 
 /// Intermediate structure that validates the conversion to CreateVolumeRequest type.
@@ -982,6 +990,10 @@ impl CreateVolumeInfo for ValidatedCreateVolumeRequest {
     fn cluster_capacity_limit(&self) -> Option<u64> {
         self.inner.cluster_capacity_limit
     }
+
+    fn max_snapshots(&self) -> Option<u32> {
+        self.inner.max_snapshots
+    }
 }
 
 impl ValidateRequestTypes for CreateVolumeRequest {
@@ -1019,6 +1031,7 @@ impl From<&dyn CreateVolumeInfo> for CreateVolume {
             thin: data.thin(),
             affinity_group: data.affinity_group(),
             cluster_capacity_limit: data.cluster_capacity_limit(),
+            max_snapshots: data.max_snapshots(),
         }
     }
 }
@@ -1037,6 +1050,7 @@ impl From<&dyn CreateVolumeInfo> for CreateVolumeRequest {
             thin: data.thin(),
             affinity_group: data.affinity_group().map(|ag| ag.into()),
             cluster_capacity_limit: data.cluster_capacity_limit(),
+            max_snapshots: data.max_snapshots(),
         }
     }
 }
diff --git a/control-plane/grpc/src/operations/volume/traits_snapshots.rs b/control-plane/grpc/src/operations/volume/traits_snapshots.rs
index 3adfbd471..a7ec13d05 100644
--- a/control-plane/grpc/src/operations/volume/traits_snapshots.rs
+++ b/control-plane/grpc/src/operations/volume/traits_snapshots.rs
@@ -175,6 +175,14 @@ impl VolumeSnapshotMeta {
     pub fn num_restores(&self) -> u32 {
         self.num_restores
     }
+    /// The number of replica snapshots for the current transaction.
+    pub fn num_snapshot_replicas(&self) -> u32 {
+        if let Some(replica_snap_list) = self.transactions().get(self.txn_id()) {
+            replica_snap_list.len() as u32
+        } else {
+            0
+        }
+    }
 }
 
 /// Volume replica snapshot information.
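`num_snapshot_replicas` derives its count from the per-transaction map keyed by the current `txn_id`. The sketch below uses plain strings for the ids; the `map`/`unwrap_or` form is equivalent to the diff's `if let`/`else`:

```rust
use std::collections::HashMap;

// Stand-in for VolumeSnapshotMeta: per-transaction lists of replica
// snapshots, with the count taken for the current transaction only.
struct Meta {
    txn_id: String,
    transactions: HashMap<String, Vec<String>>, // txn id -> replica snapshot ids
}

impl Meta {
    fn num_snapshot_replicas(&self) -> u32 {
        self.transactions
            .get(&self.txn_id)
            .map(|snaps| snaps.len() as u32)
            .unwrap_or(0)
    }
}

fn main() {
    let mut transactions = HashMap::new();
    transactions.insert("txn-2".to_string(), vec!["rs1".into(), "rs2".into()]);
    let meta = Meta { txn_id: "txn-2".into(), transactions };
    assert_eq!(meta.num_snapshot_replicas(), 2);
}
```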
diff --git a/control-plane/plugin/src/resources/error.rs b/control-plane/plugin/src/resources/error.rs
index c22e26b62..225540a40 100644
--- a/control-plane/plugin/src/resources/error.rs
+++ b/control-plane/plugin/src/resources/error.rs
@@ -74,6 +74,12 @@ pub enum Error {
         id: String,
         source: openapi::tower::client::Error,
     },
+    /// Error when a set volume property request fails.
+    #[snafu(display("Failed to set volume {id} property. Error {source}"))]
+    ScaleVolumePropertyError {
+        id: String,
+        source: openapi::tower::client::Error,
+    },
     /// Error when list snapshots request fails.
     #[snafu(display("Failed to list volume snapshots. Error {source}"))]
     ListSnapshotsError {
diff --git a/control-plane/plugin/src/resources/snapshot.rs b/control-plane/plugin/src/resources/snapshot.rs
index 5ba52026c..cb93dad1e 100644
--- a/control-plane/plugin/src/resources/snapshot.rs
+++ b/control-plane/plugin/src/resources/snapshot.rs
@@ -57,7 +57,8 @@ impl CreateRow for openapi::models::VolumeSnapshot {
             ::utils::bytes::into_human(state.allocated_size),
             ::utils::bytes::into_human(meta.total_allocated_size),
             state.source_volume,
-            self.definition.metadata.num_restores
+            self.definition.metadata.num_restores,
+            self.definition.metadata.num_snapshot_replicas
         ]
     }
 }
diff --git a/control-plane/plugin/src/resources/tests.rs b/control-plane/plugin/src/resources/tests.rs
index ca8201d96..9bd29433c 100644
--- a/control-plane/plugin/src/resources/tests.rs
+++ b/control-plane/plugin/src/resources/tests.rs
@@ -36,6 +36,7 @@ async fn setup() {
                 labels: None,
                 thin: false,
                 affinity_group: None,
+                max_snapshots: None,
             },
         )
         .await
@@ -108,6 +109,7 @@ async fn get_volumes_paginated() {
                 topology: None,
                 labels: None,
                 affinity_group: None,
+                max_snapshots: None,
             },
         )
         .await
diff --git a/control-plane/plugin/src/resources/utils.rs b/control-plane/plugin/src/resources/utils.rs
index 263377696..fe505bd7e 100644
--- a/control-plane/plugin/src/resources/utils.rs
+++ b/control-plane/plugin/src/resources/utils.rs
@@ -22,7 +22,7 @@ lazy_static! {
         "THIN-PROVISIONED",
         "ALLOCATED",
         "SNAPSHOTS",
-        "SOURCE"
+        "SOURCE",
     ];
     pub static ref SNAPSHOT_HEADERS: Row = row![
         "ID",
@@ -31,7 +31,8 @@ lazy_static! {
         "ALLOCATED-SIZE",
         "TOTAL-ALLOCATED-SIZE",
         "SOURCE-VOL",
-        "RESTORES"
+        "RESTORES",
+        "SNAPSHOT_REPLICAS"
     ];
     pub static ref POOLS_HEADERS: Row = row![
         "ID",
diff --git a/control-plane/rest/openapi-specs/v0_api_spec.yaml b/control-plane/rest/openapi-specs/v0_api_spec.yaml
index 9fb0cb5ab..d30423395 100644
--- a/control-plane/rest/openapi-specs/v0_api_spec.yaml
+++ b/control-plane/rest/openapi-specs/v0_api_spec.yaml
@@ -2521,6 +2521,7 @@ components:
         thin: false
         topology: null
        affinity_group: null
+        max_snapshots: 10
      description: Create Volume Body
      type: object
      properties:
@@ -2551,6 +2552,11 @@
          description: Affinity Group related information.
          allOf:
            - $ref: '#/components/schemas/AffinityGroup'
+        max_snapshots:
+          description: Max snapshots limit per volume.
+          type: integer
+          format: int32
+          minimum: 0
      required:
        - policy
        - replicas
@@ -3356,6 +3362,7 @@
        target_node: io-engine-1
        uuid: 514ed1c8-7174-49ac-b9cd-ad44ef670a67
        thin: false
+        max_snapshots: 10
      description: User specification of a volume.
      type: object
      properties:
@@ -3430,6 +3437,11 @@
          type: integer
          format: int32
          minimum: 0
+        max_snapshots:
+          description: Max snapshots limit per volume.
+          type: integer
+          format: int32
+          minimum: 0
      required:
        - num_paths
        - num_replicas
@@ -3754,6 +3766,11 @@
          type: integer
          format: int32
          minimum: 0
+        num_snapshot_replicas:
+          description: Number of snapshot replicas for a volume snapshot.
+          type: integer
+          format: int32
+          minimum: 0
      required:
        - status
        - size
@@ -3762,6 +3779,7 @@
        - txn_id
        - transactions
        - num_restores
+        - num_snapshot_replicas
    VolumeSnapshotSpec:
      description: |-
        Volume Snapshot Spec information.
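The plugin surfaces the new count as an extra column, so header rows and the `CreateRow` implementations must stay in column-for-column sync. A sketch using the prettytable crate, which the plugin's `row!`/`Row` items come from (column set abbreviated here):

```rust
// Abbreviated column set; the real SNAPSHOT_HEADERS carries more fields.
use prettytable::{row, Table};

fn main() {
    let mut table = Table::new();
    table.set_titles(row![
        "ID", "ALLOCATED-SIZE", "SOURCE-VOL", "RESTORES", "SNAPSHOT_REPLICAS"
    ]);
    // One row per snapshot; the last column is the per-transaction replica
    // snapshot count surfaced by num_snapshot_replicas().
    table.add_row(row!["snap-1", "12MiB", "vol-1", 0, 2]);
    table.printstd();
}
```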
diff --git a/control-plane/rest/service/src/v0/snapshots.rs b/control-plane/rest/service/src/v0/snapshots.rs
index ea619f777..5bdc8a1fe 100644
--- a/control-plane/rest/service/src/v0/snapshots.rs
+++ b/control-plane/rest/service/src/v0/snapshots.rs
@@ -185,6 +185,7 @@ fn to_models_volume_snapshot(snap: &VolumeSnapshot) -> models::VolumeSnapshot {
                 })
                 .collect::<Vec<_>>(),
             snap.meta().num_restores(),
+            snap.meta().num_snapshot_replicas(),
         ),
         models::VolumeSnapshotSpec::new_all(snap.spec().snap_id(), snap.spec().source_id()),
     ),
diff --git a/control-plane/rest/src/versions/v0.rs b/control-plane/rest/src/versions/v0.rs
index 82e73a3a5..0dce7ecc2 100644
--- a/control-plane/rest/src/versions/v0.rs
+++ b/control-plane/rest/src/versions/v0.rs
@@ -192,6 +192,8 @@ pub struct CreateVolumeBody {
     pub thin: bool,
     /// Affinity Group related information.
     pub affinity_group: Option<AffinityGroup>,
+    /// Max snapshot limit per volume.
+    pub max_snapshots: Option<u32>,
 }
 impl From<models::CreateVolumeBody> for CreateVolumeBody {
     fn from(src: models::CreateVolumeBody) -> Self {
@@ -203,6 +205,7 @@ impl From<models::CreateVolumeBody> for CreateVolumeBody {
             labels: src.labels,
             thin: src.thin,
             affinity_group: src.affinity_group.map(|ag| ag.into()),
+            max_snapshots: src.max_snapshots,
         }
     }
 }
@@ -216,6 +219,7 @@ impl From<CreateVolume> for CreateVolumeBody {
             labels: create.labels,
             thin: create.thin,
             affinity_group: create.affinity_group,
+            max_snapshots: create.max_snapshots,
         }
     }
 }
@@ -232,6 +236,7 @@ impl CreateVolumeBody {
             thin: self.thin,
             affinity_group: self.affinity_group.clone(),
             cluster_capacity_limit: None,
+            max_snapshots: self.max_snapshots,
         }
     }
     /// Convert into rpc request type.
diff --git a/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs b/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs
index 4283bd9d3..160d2cdc3 100644
--- a/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs
+++ b/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs
@@ -248,8 +248,7 @@ pub type VolumeSnapshotCompleter = Arc<Mutex<Option<VolumeSnapshotCreateResult>>>;
 
 pub struct VolumeSnapshotCreateInfo {
     txn_id: SnapshotTxId,
-    /// todo: add support for multiple replica snapshots.
-    replica: ReplicaSnapshot,
+    replicas: Vec<ReplicaSnapshot>,
     #[serde(skip, default)]
     complete: VolumeSnapshotCompleter,
 }
@@ -257,12 +256,12 @@ impl VolumeSnapshotCreateInfo {
     /// Get a new `Self` from the given parameters.
     pub fn new(
         txn_id: impl Into<SnapshotTxId>,
-        replica: ReplicaSnapshot,
+        replicas: Vec<ReplicaSnapshot>,
         complete: &VolumeSnapshotCompleter,
     ) -> Self {
         Self {
             txn_id: txn_id.into(),
-            replica,
+            replicas: replicas.to_vec(),
             complete: complete.clone(),
         }
     }
@@ -272,7 +271,7 @@ impl PartialEq for VolumeSnapshotCreateInfo {
     fn eq(&self, other: &Self) -> bool {
         self.txn_id
             .eq(&other.txn_id)
-            .then(|| self.replica.eq(&other.replica))
+            .then(|| self.replicas.eq(&other.replicas))
             .unwrap_or_default()
     }
 }
@@ -324,16 +323,15 @@ impl DestroyRestoreInfo {
 #[derive(Debug, Clone, PartialEq)]
 pub struct VolumeSnapshotCreateResult {
     /// The resulting replicas including their success status.
-    /// todo: add support for multiple replica snapshots.
     replicas: Vec<ReplicaSnapshot>,
     /// The actual timestamp returned by the dataplane.
     timestamp: DateTime<Utc>,
 }
 impl VolumeSnapshotCreateResult {
     /// Create a new `Self` based on the given parameters.
-    pub fn new_ok(replica: ReplicaSnapshot, timestamp: DateTime<Utc>) -> Self {
+    pub fn new_ok(replica: Vec<ReplicaSnapshot>, timestamp: DateTime<Utc>) -> Self {
         Self {
-            replicas: vec![replica],
+            replicas: replica,
             timestamp,
         }
     }
@@ -426,10 +424,12 @@ impl SpecTransaction<VolumeSnapshotOperation> for VolumeSnapshot {
                         .insert(info.txn_id, result.replicas.clone());
                 }
             } else {
-                info.replica.set_status_deleting();
+                info.replicas
+                    .iter_mut()
+                    .for_each(|r| r.set_status_deleting());
                 self.metadata
                     .transactions
-                    .insert(info.txn_id, vec![info.replica]);
+                    .insert(info.txn_id, info.replicas);
             }
         }
         VolumeSnapshotOperation::Destroy => {}
@@ -444,7 +444,7 @@ impl SpecTransaction<VolumeSnapshotOperation> for VolumeSnapshot {
         self.metadata.txn_id = info.txn_id.clone();
         self.metadata
             .transactions
-            .insert(info.txn_id.clone(), vec![info.replica.clone()]);
+            .insert(info.txn_id.clone(), info.replicas.clone());
     }
     self.metadata.operation = Some(VolumeSnapshotOperationState {
         operation,
diff --git a/control-plane/stor-port/src/types/v0/store/volume.rs b/control-plane/stor-port/src/types/v0/store/volume.rs
index 85866d8c1..b3834088e 100644
--- a/control-plane/stor-port/src/types/v0/store/volume.rs
+++ b/control-plane/stor-port/src/types/v0/store/volume.rs
@@ -18,6 +18,17 @@ use crate::{
 use pstor::ApiVersion;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
+use strum_macros::{EnumCount as EnumCountMacro, EnumIter, EnumString};
+
+/// Volume properties.
+#[derive(
+    Serialize, Deserialize, EnumString, Debug, EnumCountMacro, EnumIter, PartialEq, Clone, Copy,
+)]
+pub enum VolumeAttr {
+    /// Max number of snapshots allowed per volume.
+    #[strum(serialize = "max_snapshots")]
+    MaxSnapshots,
+}
 
 /// Key used by the store to uniquely identify a VolumeState structure.
 pub struct VolumeStateKey(VolumeId);
@@ -205,6 +216,9 @@ pub struct VolumeSpec {
     /// Volume metadata information.
     #[serde(default, skip_serializing_if = "super::is_default")]
     pub metadata: VolumeMetadata,
+    /// Max snapshots limit per volume.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub max_snapshots: Option<u32>,
 }
 
 /// Volume Content Source i.e the snapshot or a volume.
@@ -725,6 +739,7 @@ impl From<&CreateVolume> for VolumeSpec {
             target_config: None,
             publish_context: None,
             affinity_group: request.affinity_group.clone(),
+            max_snapshots: request.max_snapshots,
             ..Default::default()
         }
     }
@@ -781,6 +796,7 @@ impl From<VolumeSpec> for models::VolumeSpec {
             src.affinity_group.into_opt(),
             src.content_source.into_opt(),
             src.num_snapshots,
+            src.max_snapshots,
         )
     }
 }
diff --git a/control-plane/stor-port/src/types/v0/transport/volume.rs b/control-plane/stor-port/src/types/v0/transport/volume.rs
index 9df52f64e..c7c7079d2 100644
--- a/control-plane/stor-port/src/types/v0/transport/volume.rs
+++ b/control-plane/stor-port/src/types/v0/transport/volume.rs
@@ -457,6 +457,8 @@ pub struct CreateVolume {
     pub affinity_group: Option<AffinityGroup>,
     /// Maximum total system volume size.
     pub cluster_capacity_limit: Option<u64>,
+    /// Max snapshots limit per volume.
+    pub max_snapshots: Option<u32>,
 }
 
 /// Resize volume request.
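The `#[serde(default, skip_serializing_if = "Option::is_none")]` pairing on the new `VolumeSpec` field is what keeps persisted specs compatible in both directions: documents written before this change deserialize with `max_snapshots = None`, and unset limits never appear in newly persisted documents. A round-trip sketch with a stand-in struct (serde and serde_json assumed as dependencies):

```rust
use serde::{Deserialize, Serialize};

// Stand-in for VolumeSpec, reduced to the fields relevant here.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Spec {
    uuid: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    max_snapshots: Option<u32>,
}

fn main() {
    // An old document without the field still loads (default kicks in).
    let old: Spec = serde_json::from_str(r#"{"uuid":"v1"}"#).unwrap();
    assert_eq!(old.max_snapshots, None);

    // An unset limit round-trips without writing the key at all.
    assert_eq!(serde_json::to_string(&old).unwrap(), r#"{"uuid":"v1"}"#);

    // A set limit is persisted as a plain integer.
    let limited = Spec { uuid: "v2".into(), max_snapshots: Some(10) };
    assert_eq!(
        serde_json::to_string(&limited).unwrap(),
        r#"{"uuid":"v2","max_snapshots":10}"#
    );
}
```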