From 6d90be79e5163e6890a3b83b4dd458806ae3d0cf Mon Sep 17 00:00:00 2001
From: Hrudaya <hrudayaranjan.sahoo@datacore.com>
Date: Mon, 27 Nov 2023 07:36:54 +0000
Subject: [PATCH] feat(snapshot): support multi-replica snapshot

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>

feat(snapshot): set plugin command to change volume property, show
max_snapshots in volume query response

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>

fix: rework review comments

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
---
 .../src/bin/core/controller/registry.rs       |   1 -
 .../core/controller/resources/operations.rs   |  13 ++
 .../controller/scheduling/resources/mod.rs    |  18 +-
 .../bin/core/controller/scheduling/volume.rs  |   7 +-
 .../agents/src/bin/core/volume/operations.rs  |  47 ++++-
 .../agents/src/bin/core/volume/service.rs     |  47 ++++-
 .../src/bin/core/volume/snapshot_helpers.rs   |  53 +++--
 .../bin/core/volume/snapshot_operations.rs    | 198 ++++++++++--------
 .../agents/src/bin/core/volume/specs.rs       |   6 +-
 control-plane/agents/src/common/errors.rs     |  31 +++
 .../csi-driver/src/bin/controller/client.rs   |   6 +-
 .../src/bin/controller/controller.rs          |   3 +
 control-plane/csi-driver/src/context.rs       |  17 ++
 .../grpc/proto/v1/volume/volume.proto         |  25 ++-
 .../grpc/src/operations/volume/client.rs      |  30 ++-
 .../grpc/src/operations/volume/server.rs      |  23 +-
 .../grpc/src/operations/volume/traits.rs      |  99 ++++++++-
 .../src/operations/volume/traits_snapshots.rs |   8 +
 control-plane/plugin/src/lib.rs               |  19 +-
 control-plane/plugin/src/operations.rs        |  17 +-
 control-plane/plugin/src/resources/error.rs   |   6 +
 control-plane/plugin/src/resources/mod.rs     |  14 ++
 .../plugin/src/resources/snapshot.rs          |   3 +-
 control-plane/plugin/src/resources/tests.rs   |   2 +
 control-plane/plugin/src/resources/utils.rs   |   5 +-
 control-plane/plugin/src/resources/volume.rs  |  37 +++-
 .../rest/openapi-specs/v0_api_spec.yaml       |  53 +++++
 .../rest/service/src/v0/snapshots.rs          |   1 +
 control-plane/rest/service/src/v0/volumes.rs  |  18 +-
 control-plane/rest/src/versions/v0.rs         |   5 +
 .../src/types/v0/store/snapshots/volume.rs    |  22 +-
 .../stor-port/src/types/v0/store/volume.rs    |  49 +++++
 .../stor-port/src/types/v0/transport/mod.rs   |   2 +
 .../src/types/v0/transport/volume.rs          |  23 ++
 34 files changed, 736 insertions(+), 172 deletions(-)

diff --git a/control-plane/agents/src/bin/core/controller/registry.rs b/control-plane/agents/src/bin/core/controller/registry.rs
index f775a6e1b..79fbcdfd1 100644
--- a/control-plane/agents/src/bin/core/controller/registry.rs
+++ b/control-plane/agents/src/bin/core/controller/registry.rs
@@ -280,7 +280,6 @@ impl Registry {
     pub(crate) fn create_volume_limit(&self) -> usize {
         self.create_volume_limit
     }
-
     /// Get a reference to the actual state of the nodes.
     pub(crate) fn nodes(&self) -> &NodesMapLocked {
         &self.nodes
diff --git a/control-plane/agents/src/bin/core/controller/resources/operations.rs b/control-plane/agents/src/bin/core/controller/resources/operations.rs
index 3999e8055..a5564e14e 100644
--- a/control-plane/agents/src/bin/core/controller/resources/operations.rs
+++ b/control-plane/agents/src/bin/core/controller/resources/operations.rs
@@ -181,6 +181,19 @@ pub(crate) trait ResourceReplicas {
     ) -> Result<Self::MoveResp, SvcError>;
 }
 
+/// Property modification as resource operation.
+#[async_trait::async_trait]
+pub(crate) trait ResourceProperties {
+    type Request: Sync + Send;
+
+    /// Set the property value.
+    async fn set_property(
+        &mut self,
+        registry: &Registry,
+        request: &Self::Request,
+    ) -> Result<(), SvcError>;
+}
+
 /// Resource Children/Offspring Operations.
 #[async_trait::async_trait]
 pub(crate) trait ResourceOffspring {
diff --git a/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs b/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs
index 93a676655..e7ed5f0f2 100644
--- a/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs
+++ b/control-plane/agents/src/bin/core/controller/scheduling/resources/mod.rs
@@ -93,14 +93,18 @@ impl PoolItemLister {
         pools
     }
     /// Get a list of pool items to create a snapshot on.
-    /// todo: support multi-replica snapshot.
-    pub(crate) async fn list_for_snaps(registry: &Registry, item: &ChildItem) -> Vec<PoolItem> {
+    pub(crate) async fn list_for_snaps(registry: &Registry, items: &[ChildItem]) -> Vec<PoolItem> {
         let nodes = Self::nodes(registry).await;
-
-        match nodes.iter().find(|n| n.id() == item.node()) {
-            Some(node) => vec![PoolItem::new(node.clone(), item.pool().clone(), None)],
-            None => vec![],
-        }
+        let pool_items = items
+            .iter()
+            .filter_map(|item| {
+                nodes
+                    .iter()
+                    .find(|node| node.id() == item.node())
+                    .map(|node| PoolItem::new(node.clone(), item.pool().clone(), None))
+            })
+            .collect();
+        pool_items
     }
     /// Get a list of replicas wrapped as ChildItem, for resize.
     pub(crate) async fn list_for_resize(registry: &Registry, spec: &VolumeSpec) -> Vec<ChildItem> {
diff --git a/control-plane/agents/src/bin/core/controller/scheduling/volume.rs b/control-plane/agents/src/bin/core/controller/scheduling/volume.rs
index a671effa1..b249c0bb2 100644
--- a/control-plane/agents/src/bin/core/controller/scheduling/volume.rs
+++ b/control-plane/agents/src/bin/core/controller/scheduling/volume.rs
@@ -636,7 +636,7 @@ pub(crate) struct SnapshotVolumeReplica {
 }
 
 impl SnapshotVolumeReplica {
-    async fn builder(registry: &Registry, volume: &VolumeSpec, item: &ChildItem) -> Self {
+    async fn builder(registry: &Registry, volume: &VolumeSpec, items: &[ChildItem]) -> Self {
         let allocated_bytes = AddVolumeReplica::allocated_bytes(registry, volume).await;
 
         Self {
@@ -649,7 +649,7 @@ impl SnapshotVolumeReplica {
                     snap_repl: true,
                     ag_restricted_nodes: None,
                 },
-                PoolItemLister::list_for_snaps(registry, item).await,
+                PoolItemLister::list_for_snaps(registry, items).await,
             ),
         }
     }
@@ -670,8 +670,7 @@ impl SnapshotVolumeReplica {
     pub(crate) async fn builder_with_defaults(
         registry: &Registry,
         volume: &VolumeSpec,
-        // todo: only 1 replica snapshot supported atm
-        items: &ChildItem,
+        items: &[ChildItem],
     ) -> Self {
         Self::builder(registry, volume, items)
             .await
diff --git a/control-plane/agents/src/bin/core/volume/operations.rs b/control-plane/agents/src/bin/core/volume/operations.rs
index 331ff9e84..4b0baca87 100644
--- a/control-plane/agents/src/bin/core/volume/operations.rs
+++ b/control-plane/agents/src/bin/core/volume/operations.rs
@@ -5,8 +5,8 @@ use crate::{
         resources::{
             operations::{
                 ResourceLifecycle, ResourceLifecycleExt, ResourceLifecycleWithLifetime,
-                ResourceOwnerUpdate, ResourcePublishing, ResourceReplicas, ResourceResize,
-                ResourceSharing, ResourceShutdownOperations,
+                ResourceOwnerUpdate, ResourceProperties, ResourcePublishing, ResourceReplicas,
+                ResourceResize, ResourceSharing, ResourceShutdownOperations,
             },
             operations_helper::{
                 GuardedOperationsHelper, OnCreateFail, OperationSequenceGuard, ResourceSpecsLocked,
@@ -26,18 +26,24 @@ use crate::{
     },
 };
 use agents::errors::SvcError;
+use std::str::FromStr;
+
+// use grpc::operations::volume::traits::CreateVolumeInfo;
 use stor_port::{
     transport_api::ErrorChain,
     types::v0::{
         store::{
             nexus_persistence::NexusInfoKey,
             replica::ReplicaSpec,
-            volume::{PublishOperation, RepublishOperation, VolumeOperation, VolumeSpec},
+            volume::{
+                PublishOperation, RepublishOperation, VolumeAttr, VolumeOperation, VolumeProperty,
+                VolumeSpec,
+            },
         },
         transport::{
             CreateVolume, DestroyNexus, DestroyReplica, DestroyShutdownTargets, DestroyVolume,
             Protocol, PublishVolume, Replica, ReplicaId, ReplicaOwners, RepublishVolume,
-            ResizeVolume, SetVolumeReplica, ShareNexus, ShareVolume, ShutdownNexus,
+            ResizeVolume, SetVolumeProp, SetVolumeReplica, ShareNexus, ShareVolume, ShutdownNexus,
             UnpublishVolume, UnshareNexus, UnshareVolume, Volume,
         },
     },
@@ -638,6 +644,34 @@ impl ResourceReplicas for OperationGuardArc<VolumeSpec> {
     }
 }
 
+#[async_trait::async_trait]
+impl ResourceProperties for OperationGuardArc<VolumeSpec> {
+    type Request = SetVolumeProp;
+
+    async fn set_property(
+        &mut self,
+        registry: &Registry,
+        request: &Self::Request,
+    ) -> Result<(), SvcError> {
+        let state = registry.volume_state(&request.uuid).await?;
+        let operation = match VolumeAttr::from_str(&request.prop_name) {
+            Ok(VolumeAttr::MaxSnapshots) => VolumeOperation::SetVolumeProperty(
+                VolumeProperty::new(VolumeAttr::MaxSnapshots, &request.prop_value),
+            ),
+            _ => {
+                return Err(SvcError::InvalidSetProperty {
+                    property_name: request.prop_name.clone(),
+                    id: state.uuid.to_string(),
+                });
+            }
+        };
+
+        let spec_clone = self.start_update(registry, &state, operation).await?;
+
+        self.complete_update(registry, Ok(()), spec_clone).await?;
+        Ok(())
+    }
+}
 #[async_trait::async_trait]
 impl ResourceShutdownOperations for OperationGuardArc<VolumeSpec> {
     type RemoveShutdownTargets = DestroyShutdownTargets;
@@ -811,14 +845,13 @@ impl ResourceLifecycleExt<CreateVolumeSource<'_>> for OperationGuardArc<VolumeSp
         request_src: &CreateVolumeSource,
     ) -> Result<Self::CreateOutput, SvcError> {
         request_src.pre_flight_check()?;
-        let request = request_src.source();
-
+        let request = request_src.source().clone();
         let specs = registry.specs();
         let mut volume = specs
             .get_or_create_volume(request_src)?
             .operation_guard_wait()
             .await?;
-        let volume_clone = volume.start_create_update(registry, request).await?;
+        let volume_clone = volume.start_create_update(registry, &request).await?;
 
         // If the volume is a part of the ag, create or update accordingly.
         registry.specs().get_or_create_affinity_group(&volume_clone);
diff --git a/control-plane/agents/src/bin/core/volume/service.rs b/control-plane/agents/src/bin/core/volume/service.rs
index 5f6887a12..748ffecfb 100644
--- a/control-plane/agents/src/bin/core/volume/service.rs
+++ b/control-plane/agents/src/bin/core/volume/service.rs
@@ -4,8 +4,8 @@ use crate::{
         resources::{
             operations::{
                 ResourceCloning, ResourceLifecycle, ResourceLifecycleWithLifetime,
-                ResourcePublishing, ResourceReplicas, ResourceResize, ResourceSharing,
-                ResourceShutdownOperations, ResourceSnapshotting,
+                ResourceProperties, ResourcePublishing, ResourceReplicas, ResourceResize,
+                ResourceSharing, ResourceShutdownOperations, ResourceSnapshotting,
             },
             operations_helper::{OperationSequenceGuard, ResourceSpecsLocked},
             OperationGuardArc,
@@ -21,9 +21,9 @@ use grpc::{
             CreateSnapshotVolumeInfo, CreateVolumeInfo, CreateVolumeSnapshot,
             CreateVolumeSnapshotInfo, DestroyShutdownTargetsInfo, DestroyVolumeInfo,
             DestroyVolumeSnapshot, DestroyVolumeSnapshotInfo, PublishVolumeInfo,
-            RepublishVolumeInfo, ResizeVolumeInfo, SetVolumeReplicaInfo, ShareVolumeInfo,
-            UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations, VolumeSnapshot,
-            VolumeSnapshots,
+            RepublishVolumeInfo, ResizeVolumeInfo, SetVolumePropInfo, SetVolumeReplicaInfo,
+            ShareVolumeInfo, UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations,
+            VolumeSnapshot, VolumeSnapshots,
         },
         Pagination,
     },
@@ -37,8 +37,8 @@ use stor_port::{
         },
         transport::{
             CreateSnapshotVolume, CreateVolume, DestroyShutdownTargets, DestroyVolume, Filter,
-            PublishVolume, RepublishVolume, ResizeVolume, SetVolumeReplica, ShareVolume,
-            UnpublishVolume, UnshareVolume, Volume,
+            PublishVolume, RepublishVolume, ResizeVolume, SetVolumeProp, SetVolumeReplica,
+            ShareVolume, UnpublishVolume, UnshareVolume, Volume,
         },
     },
 };
@@ -161,6 +161,18 @@ impl VolumeOperations for Service {
         Ok(volume)
     }
 
+    async fn set_volume_property(
+        &self,
+        req: &dyn SetVolumePropInfo,
+        _ctx: Option<Context>,
+    ) -> Result<Volume, ReplyError> {
+        let set_volume_prop = req.into();
+        let service = self.clone();
+        let volume =
+            Context::spawn(async move { service.set_volume_property(&set_volume_prop).await })
+                .await??;
+        Ok(volume)
+    }
     async fn probe(&self, _ctx: Option<Context>) -> Result<bool, ReplyError> {
         return Ok(true);
     }
@@ -291,7 +303,6 @@ impl Service {
             }
             filter => return Err(SvcError::InvalidFilter { filter }),
         };
-
         Ok(Volumes {
             entries: filtered_volumes,
             next_token: match last_result {
@@ -403,6 +414,16 @@ impl Service {
         volume.set_replica(&self.registry, request).await?;
         self.registry.volume(&request.uuid).await
     }
+    /// Set volume property.
+    #[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.uuid))]
+    pub(super) async fn set_volume_property(
+        &self,
+        request: &SetVolumeProp,
+    ) -> Result<Volume, SvcError> {
+        let mut volume = self.specs().volume(&request.uuid).await?;
+        volume.set_property(&self.registry, request).await?;
+        self.registry.volume(&request.uuid).await
+    }
 
     /// Create a volume snapshot.
     #[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.source_id, snapshot.source_uuid = %request.source_id, snapshot.uuid = %request.snap_id))]
@@ -426,6 +447,16 @@ impl Service {
             }
             Err(error) => Err(error),
         }?;
+
+        if let Some(max_snapshots) = volume.as_ref().max_snapshots {
+            if volume.as_ref().metadata.num_snapshots() as u32 >= max_snapshots {
+                return Err(SvcError::SnapshotMaxLimit {
+                    max_snapshots,
+                    volume_id: volume.as_ref().uuid.to_string(),
+                });
+            }
+        }
+
         let snapshot = volume
             .create_snap(
                 &self.registry,
diff --git a/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs b/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
index 5acc9e108..f80b8a99c 100644
--- a/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
+++ b/control-plane/agents/src/bin/core/volume/snapshot_helpers.rs
@@ -1,3 +1,5 @@
+use std::collections::HashSet;
+
 use crate::controller::{
     registry::Registry,
     resources::{
@@ -29,7 +31,7 @@ use stor_port::{
 /// means a snapshot of all(or selected) healthy replicas associated with that volume.
 pub(super) struct PrepareVolumeSnapshot {
     pub(super) parameters: SnapshotParameters<VolumeId>,
-    pub(super) replica_snapshot: (Replica, ReplicaSnapshot),
+    pub(super) replica_snapshot: Vec<(Replica, ReplicaSnapshot)>,
     pub(super) completer: VolumeSnapshotCompleter,
 }
 
@@ -95,34 +97,41 @@ impl SpecOperationsHelper for VolumeSnapshot {
 pub(crate) async fn snapshoteable_replica(
     volume: &VolumeSpec,
     registry: &Registry,
-) -> Result<ChildItem, SvcError> {
-    if volume.num_replicas != 1 {
+) -> Result<Vec<ChildItem>, SvcError> {
+    let children = super::scheduling::snapshoteable_replica(volume, registry).await?;
+
+    //todo: Remove this check once we support snapshotting with n-replicas.
+    if volume.num_replicas != 1 || children.candidates().len() != 1 {
         return Err(SvcError::NReplSnapshotNotAllowed {});
     }
 
-    let children = super::scheduling::snapshoteable_replica(volume, registry).await?;
-
     volume.trace(&format!("Snapshoteable replicas for volume: {children:?}"));
 
-    let item = match children.candidates().as_slice() {
-        [item] => Ok(item),
-        [] => Err(SvcError::NoHealthyReplicas {
+    if children.candidates().is_empty() {
+        return Err(SvcError::NoHealthyReplicas {
             id: volume.uuid_str(),
-        }),
-        _ => Err(SvcError::NReplSnapshotNotAllowed {}),
-    }?;
+        });
+    }
+
+    if children.candidates().len() != volume.num_replicas as usize {
+        return Err(SvcError::AllReplicaNotHealthy {
+            id: volume.uuid_str(),
+        });
+    }
+    //todo: check for snapshot chain for all the replicas.
 
-    let pools = SnapshotVolumeReplica::builder_with_defaults(registry, volume, item)
-        .await
-        .collect();
+    let pools =
+        SnapshotVolumeReplica::builder_with_defaults(registry, volume, children.candidates())
+            .await
+            .collect();
+    let pools: HashSet<_> = pools.iter().map(|item| item.pool.id.clone()).collect();
 
-    match pools
-        .iter()
-        .any(|pool_item| pool_item.pool.id == item.pool().id)
-    {
-        true => Ok(item.clone()),
-        false => Err(SvcError::NotEnoughResources {
-            source: NotEnough::PoolFree {},
-        }),
+    for item in children.candidates() {
+        if !pools.contains(&item.pool().id) {
+            return Err(SvcError::NotEnoughResources {
+                source: NotEnough::PoolFree {},
+            });
+        }
     }
+    Ok(children.candidates().clone())
 }
diff --git a/control-plane/agents/src/bin/core/volume/snapshot_operations.rs b/control-plane/agents/src/bin/core/volume/snapshot_operations.rs
index 9515607de..475366313 100644
--- a/control-plane/agents/src/bin/core/volume/snapshot_operations.rs
+++ b/control-plane/agents/src/bin/core/volume/snapshot_operations.rs
@@ -38,7 +38,10 @@ use stor_port::{
 };
 
 use chrono::{DateTime, Utc};
-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{HashMap, HashSet},
+    time::SystemTime,
+};
 
 #[async_trait::async_trait]
 impl ResourceSnapshotting for OperationGuardArc<VolumeSpec> {
@@ -66,7 +69,6 @@ impl ResourceSnapshotting for OperationGuardArc<VolumeSpec> {
             },
         )
         .await;
-
         self.complete_update(registry, snap_result, spec_clone)
             .await
     }
@@ -155,13 +157,13 @@ impl ResourceLifecycleWithLifetime for OperationGuardArc<VolumeSnapshot> {
         let volume = &request.volume;
         let request = &request.request;
 
-        let replica = snapshoteable_replica(volume.as_ref(), registry).await?;
+        let replicas = snapshoteable_replica(volume.as_ref(), registry).await?;
         let target_node = if let Some(target) = volume.as_ref().target() {
-            registry.node_wrapper(target.node()).await
+            let node = registry.node_wrapper(target.node()).await?;
+            Some(node)
         } else {
-            registry.node_wrapper(&replica.state().node).await
-        }?;
-
+            None
+        };
         let specs = registry.specs();
         let mut snapshot = specs
             .get_or_create_snapshot(request)
@@ -179,13 +181,17 @@ impl ResourceLifecycleWithLifetime for OperationGuardArc<VolumeSnapshot> {
         // Try to prune 1 stale transaction, if present..
         snapshot.prune(registry, Some(1)).await.ok();
 
-        let prepare_snapshot = snapshot.snapshot_params(&replica)?;
+        let prepare_snapshot = snapshot.snapshot_params(&replicas)?;
         snapshot
             .start_create_update(
                 registry,
                 &VolumeSnapshotCreateInfo::new(
                     prepare_snapshot.parameters.txn_id(),
-                    prepare_snapshot.replica_snapshot.1.clone(),
+                    prepare_snapshot
+                        .replica_snapshot
+                        .iter()
+                        .map(|(_, snapshot)| snapshot.clone())
+                        .collect(),
                     &prepare_snapshot.completer,
                 ),
             )
@@ -285,31 +291,39 @@ impl ResourcePruning for OperationGuardArc<VolumeSnapshot> {
 }
 
 impl OperationGuardArc<VolumeSnapshot> {
-    fn snapshot_params(&self, replica: &ChildItem) -> Result<PrepareVolumeSnapshot, SvcError> {
+    fn snapshot_params(
+        &self,
+        replicas: &Vec<ChildItem>,
+    ) -> Result<PrepareVolumeSnapshot, SvcError> {
         let Some(parameters) = self.as_ref().prepare() else {
             return Err(SvcError::AlreadyExists {
                 id: self.uuid().to_string(),
                 kind: ResourceKind::VolumeSnapshot,
             });
         };
+        let mut replica_snapshots = vec![];
         let volume = self.as_ref().spec().source_id();
         let generic_params = parameters.params().clone();
-        let snapshot_source = ReplicaSnapshotSource::new(
-            replica.spec().uid().clone(),
-            replica.state().pool_id.clone(),
-            replica.state().pool_uuid.clone().unwrap_or_default(),
-        );
-        let replica_snapshot = ReplicaSnapshot::new_vol(
-            ReplicaSnapshotSpec::new(&snapshot_source, SnapshotId::new()),
-            SnapshotParameters::new(volume, generic_params),
-            replica.state().size,
-            0,
-            replica.spec().size,
-        );
-        let replica = replica.state().clone();
+        for replica in replicas {
+            let snapshot_source = ReplicaSnapshotSource::new(
+                replica.spec().uid().clone(),
+                replica.state().pool_id.clone(),
+                replica.state().pool_uuid.clone().unwrap_or_default(),
+            );
+            let replica_snapshot = ReplicaSnapshot::new_vol(
+                ReplicaSnapshotSpec::new(&snapshot_source, SnapshotId::new()),
+                SnapshotParameters::new(volume, generic_params.clone()),
+                replica.state().size,
+                0,
+                replica.spec().size,
+            );
+            let replica = replica.state().clone();
+            replica_snapshots.push((replica, replica_snapshot));
+        }
+
         Ok(PrepareVolumeSnapshot {
             parameters,
-            replica_snapshot: (replica, replica_snapshot),
+            replica_snapshot: replica_snapshots,
             completer: VolumeSnapshotCompleter::default(),
         })
     }
@@ -318,13 +332,13 @@ impl OperationGuardArc<VolumeSnapshot> {
         volume: &OperationGuardArc<VolumeSpec>,
         prep_params: &PrepareVolumeSnapshot,
         registry: &Registry,
-        target_node: N,
+        target_node: Option<N>,
     ) -> Result<VolumeSnapshotCreateResult, SvcError> {
         if let Some(target) = volume.as_ref().target() {
-            self.snapshot_nexus(prep_params, target, registry, target_node)
+            self.snapshot_nexus(prep_params, target, registry, target_node.unwrap())
                 .await
         } else {
-            self.snapshot_replica(prep_params, target_node).await
+            self.snapshot_replica::<N>(prep_params, registry).await
         }
     }
 
@@ -335,88 +349,102 @@ impl OperationGuardArc<VolumeSnapshot> {
         registry: &Registry,
         target_node: N,
     ) -> Result<VolumeSnapshotCreateResult, SvcError> {
-        let mut replica_snap = prep_params.replica_snapshot.1.clone();
-        let replica = &prep_params.replica_snapshot.0;
         let generic_params = prep_params.parameters.params();
-
-        let replica_id = replica_snap.spec().source_id().replica_id();
+        let nexus_snap_desc = prep_params
+            .replica_snapshot
+            .iter()
+            .map(|(_, snapshot)| {
+                CreateNexusSnapReplDescr::new(
+                    snapshot.spec().source_id().replica_id(),
+                    snapshot.spec().uuid().clone(),
+                )
+            })
+            .collect::<Vec<_>>();
         let response = target_node
             .create_nexus_snapshot(&CreateNexusSnapshot::new(
                 SnapshotParameters::new(target.nexus(), generic_params.clone()),
-                vec![CreateNexusSnapReplDescr::new(
-                    replica_id,
-                    replica_snap.spec().uuid().clone(),
-                )],
+                nexus_snap_desc,
             ))
             .await?;
 
-        if response.skipped.contains(replica_id) || !response.skipped.is_empty() {
+        if !response.skipped.is_empty() {
             return Err(SvcError::ReplicaSnapSkipped {
-                replica: replica_snap.spec().uuid().to_string(),
+                replica: response
+                    .skipped
+                    .iter()
+                    .map(|r| r.to_string())
+                    .collect::<Vec<String>>()
+                    .join(", "),
             });
         }
-
-        let snapped = match response.replicas_status.as_slice() {
-            [snapped] if &snapped.replica_uuid == replica_snap.spec().source_id().replica_id() => {
-                Ok(snapped)
-            }
-            _ => Err(SvcError::ReplicaSnapMiss {
-                replica: replica_snap.spec().uuid().to_string(),
-            }),
-        }?;
-
-        if let Some(error) = snapped.error {
+        if let Some(snap) = response
+            .replicas_status
+            .iter()
+            .find(|&snap| snap.error.is_some())
+        {
             return Err(SvcError::ReplicaSnapError {
-                replica: replica_snap.spec().uuid().to_string(),
-                error,
+                replica: snap.replica_uuid.to_string(),
+                error: snap.error.unwrap(),
             });
         }
-
         let timestamp = DateTime::<Utc>::from(response.snap_time);
         // What if snapshot succeeds but we can't fetch the replica snapshot, should we carry
         // on as following, or should we bail out?
-        let node = registry.node_wrapper(&replica.node).await?;
-
-        let snapshot = NodeWrapper::fetch_update_snapshot_state(
-            &node,
-            ReplicaSnapshotInfo::new(
-                replica_snap.spec().source_id().replica_id(),
-                replica_snap.spec().uuid().clone(),
-            ),
-        )
-        .await?;
-
-        replica_snap.complete_vol(
-            snapshot.timestamp().into(),
-            snapshot.replica_size(),
-            snapshot.allocated_size() + snapshot.predecessor_alloc_size(),
-        );
-        Ok(VolumeSnapshotCreateResult::new_ok(replica_snap, timestamp))
+        for (replica, replica_snap) in prep_params.replica_snapshot.clone().iter_mut() {
+            let node = registry.node_wrapper(&replica.node).await?;
+            let snapshot = NodeWrapper::fetch_update_snapshot_state(
+                &node,
+                ReplicaSnapshotInfo::new(
+                    replica_snap.spec().source_id().replica_id(),
+                    replica_snap.spec().uuid().clone(),
+                ),
+            )
+            .await?;
+
+            replica_snap.complete_vol(
+                snapshot.timestamp().into(),
+                snapshot.replica_size(),
+                snapshot.allocated_size() + snapshot.predecessor_alloc_size(),
+            );
+        }
+        let snapshots = prep_params
+            .replica_snapshot
+            .iter()
+            .map(|(_, snapshot)| snapshot.clone())
+            .collect::<Vec<_>>();
+        Ok(VolumeSnapshotCreateResult::new_ok(snapshots, timestamp))
     }
     async fn snapshot_replica<N: ReplicaSnapshotApi>(
         &self,
         prep_params: &PrepareVolumeSnapshot,
-        target_node: N,
+        registry: &Registry,
     ) -> Result<VolumeSnapshotCreateResult, SvcError> {
-        let mut replica_snap = prep_params.replica_snapshot.clone();
         let volume_params = prep_params.parameters.params().clone();
-
-        let replica_params = volume_params.with_uuid(replica_snap.1.spec().uuid());
-        let response = target_node
-            .create_repl_snapshot(&CreateReplicaSnapshot::new(SnapshotParameters::new(
-                replica_snap.1.spec().source_id().replica_id(),
-                replica_params,
-            )))
-            .await?;
-        let timestamp = response.timestamp();
-        replica_snap.1.complete_vol(
-            timestamp.into(),
-            response.replica_size(),
-            response.allocated_size() + response.predecessor_alloc_size(),
-        );
+        let mut timestamp = SystemTime::now();
+
+        for (replica, replica_snap) in prep_params.replica_snapshot.clone().iter_mut() {
+            let replica_params = volume_params.clone().with_uuid(replica_snap.spec().uuid());
+            let target_node = registry.node_wrapper(&replica.node).await?;
+            let response = target_node
+                .create_repl_snapshot(&CreateReplicaSnapshot::new(SnapshotParameters::new(
+                    replica_snap.spec().source_id().replica_id(),
+                    replica_params,
+                )))
+                .await?;
+            timestamp = response.timestamp();
+            replica_snap.complete_vol(
+                timestamp.into(),
+                response.replica_size(),
+                response.allocated_size() + response.predecessor_alloc_size(),
+            );
+        }
 
         Ok(VolumeSnapshotCreateResult::new_ok(
-            replica_snap.1,
+            prep_params
+                .replica_snapshot
+                .iter()
+                .map(|(_, snapshot)| snapshot.clone())
+                .collect::<Vec<_>>(),
             timestamp.into(),
         ))
     }
diff --git a/control-plane/agents/src/bin/core/volume/specs.rs b/control-plane/agents/src/bin/core/volume/specs.rs
index 26401ea80..0efbb0d70 100644
--- a/control-plane/agents/src/bin/core/volume/specs.rs
+++ b/control-plane/agents/src/bin/core/volume/specs.rs
@@ -1059,7 +1059,7 @@ impl SpecOperationsHelper for VolumeSpec {
                     }
                 }
             }
-
+            VolumeOperation::SetVolumeProperty(_) => Ok(()),
             VolumeOperation::RemoveUnusedReplica(uuid) => {
                 let last_replica = !registry
                     .specs()
@@ -1111,10 +1111,6 @@ impl SpecOperationsHelper for VolumeSpec {
             }
             VolumeOperation::Create => unreachable!(),
             VolumeOperation::Destroy => unreachable!(),
-
-            VolumeOperation::CreateSnapshot(_) if self.num_replicas != 1 => {
-                Err(SvcError::NReplSnapshotNotAllowed {})
-            }
             VolumeOperation::CreateSnapshot(_) => Ok(()),
             VolumeOperation::DestroySnapshot(_) => Ok(()),
             VolumeOperation::Resize(_) => Ok(()),
diff --git a/control-plane/agents/src/common/errors.rs b/control-plane/agents/src/common/errors.rs
index 309175b48..f898b5561 100644
--- a/control-plane/agents/src/common/errors.rs
+++ b/control-plane/agents/src/common/errors.rs
@@ -376,6 +376,19 @@ pub enum SvcError {
         cluster_capacity_limit: u64,
         excess: u64,
     },
+    #[snafu(display("All replicas are not healthy for volume '{}'", id))]
+    AllReplicaNotHealthy { id: String },
+    #[snafu(display(
+        "Reached maximum snapshots limit {} for volume {}, delete unused snapshots to continue",
+        max_snapshots,
+        volume_id
+    ))]
+    SnapshotMaxLimit {
+        max_snapshots: u32,
+        volume_id: String,
+    },
+    #[snafu(display("Invalid property name '{}' for the volume '{}'", property_name, id))]
+    InvalidSetProperty { property_name: String, id: String },
 }
 
 impl SvcError {
@@ -1018,6 +1031,24 @@ impl From<SvcError> for ReplyError {
                 source,
                 extra,
             },
+            SvcError::AllReplicaNotHealthy { .. } => ReplyError {
+                kind: ReplyErrorKind::FailedPrecondition,
+                resource: ResourceKind::VolumeSnapshot,
+                source,
+                extra,
+            },
+            SvcError::SnapshotMaxLimit { .. } => ReplyError {
+                kind: ReplyErrorKind::FailedPrecondition,
+                resource: ResourceKind::Volume,
+                source,
+                extra,
+            },
+            SvcError::InvalidSetProperty { .. } => ReplyError {
+                kind: ReplyErrorKind::FailedPrecondition,
+                resource: ResourceKind::Volume,
+                source,
+                extra,
+            },
         }
     }
 }
diff --git a/control-plane/csi-driver/src/bin/controller/client.rs b/control-plane/csi-driver/src/bin/controller/client.rs
index fe31c3a78..ab69e56db 100644
--- a/control-plane/csi-driver/src/bin/controller/client.rs
+++ b/control-plane/csi-driver/src/bin/controller/client.rs
@@ -214,6 +214,7 @@ impl IoEngineApiClient {
     /// Create a volume of target size and provision storage resources for it.
     /// This operation is not idempotent, so the caller is responsible for taking
     /// all actions with regards to idempotency.
+    #[allow(clippy::too_many_arguments)]
     #[instrument(fields(volume.uuid = %volume_id), skip(self, volume_id))]
     pub(crate) async fn create_volume(
         &self,
@@ -223,6 +224,7 @@ impl IoEngineApiClient {
         volume_topology: CreateVolumeTopology,
         thin: bool,
         affinity_group: Option<AffinityGroup>,
+        max_snapshots: Option<u32>,
     ) -> Result<Volume, ApiClientError> {
         let topology =
             Topology::new_all(volume_topology.node_topology, volume_topology.pool_topology);
@@ -235,6 +237,7 @@ impl IoEngineApiClient {
             policy: VolumePolicy::new_all(true),
             labels: None,
             affinity_group,
+            max_snapshots,
         };
 
         let result = self
@@ -259,6 +262,7 @@ impl IoEngineApiClient {
         volume_topology: CreateVolumeTopology,
         thin: bool,
         affinity_group: Option<AffinityGroup>,
+        max_snapshots: Option<u32>,
     ) -> Result<Volume, ApiClientError> {
         let topology =
             Topology::new_all(volume_topology.node_topology, volume_topology.pool_topology);
@@ -271,8 +275,8 @@ impl IoEngineApiClient {
             policy: VolumePolicy::new_all(true),
             labels: None,
             affinity_group,
+            max_snapshots,
         };
-
         let result = self
             .rest_client
             .volumes_api()
diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs
index 7964ef8dd..17d42f0da 100644
--- a/control-plane/csi-driver/src/bin/controller/controller.rs
+++ b/control-plane/csi-driver/src/bin/controller/controller.rs
@@ -315,6 +315,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
                 );
 
                 let sts_affinity_group_name = context.sts_affinity_group();
+                let max_snapshots = context.max_snapshots();
 
                 let volume = match volume_content_source {
                     Some(snapshot_uuid) => {
@@ -327,6 +328,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
                                 volume_topology,
                                 thin,
                                 sts_affinity_group_name.clone().map(AffinityGroup::new),
+                                max_snapshots,
                             )
                             .await?
                     }
@@ -339,6 +341,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
                                 volume_topology,
                                 thin,
                                 sts_affinity_group_name.clone().map(AffinityGroup::new),
+                                max_snapshots,
                             )
                             .await?
                     }
diff --git a/control-plane/csi-driver/src/context.rs b/control-plane/csi-driver/src/context.rs
index 14c666cf6..9b577b1a6 100644
--- a/control-plane/csi-driver/src/context.rs
+++ b/control-plane/csi-driver/src/context.rs
@@ -57,6 +57,8 @@ pub enum Parameters {
     PoolTopologyAffinity,
     #[strum(serialize = "poolTopologySpread")]
     PoolTopologySpread,
+    #[strum(serialize = "maxSnapshots")]
+    MaxSnapshots,
 }
 impl Parameters {
     fn parse_human_time(
@@ -161,6 +163,10 @@ impl Parameters {
     ) -> Result<Option<HashMap<String, String>>, serde_json::Error> {
         Self::parse_map(value)
     }
+    /// Parse the value for `Self::MaxSnapshots`.
+    pub fn max_snapshots(value: Option<&String>) -> Result<Option<u32>, ParseIntError> {
+        Self::parse_u32(value)
+    }
 }
 
 /// Volume publish parameters.
@@ -283,6 +289,7 @@ pub struct CreateParams {
     replica_count: u8,
     sts_affinity_group: Option<String>,
     clone_fs_id_as_volume_id: Option<bool>,
+    max_snapshots: Option<u32>,
 }
 impl CreateParams {
     /// Get the `Parameters::PublishParams` value.
@@ -306,6 +313,10 @@ impl CreateParams {
     pub fn clone_fs_id_as_volume_id(&self) -> &Option<bool> {
         &self.clone_fs_id_as_volume_id
     }
+    /// Get the `Parameters::MaxSnapshots` value.
+    pub fn max_snapshots(&self) -> Option<u32> {
+        self.max_snapshots
+    }
 }
 impl TryFrom<&HashMap<String, String>> for CreateParams {
     type Error = tonic::Status;
@@ -353,12 +364,18 @@ impl TryFrom<&HashMap<String, String>> for CreateParams {
         )
         .map_err(|_| tonic::Status::invalid_argument("Invalid clone_fs_id_as_volume_id"))?;
 
+        let max_snapshots = Parameters::max_snapshots(args.get(Parameters::MaxSnapshots.as_ref()))
+            .map_err(|_| {
+                tonic::Status::invalid_argument("Invalid `maxSnapshots` value, expected an i32")
+            })?;
+
         Ok(Self {
             publish_params,
             share_protocol,
             replica_count,
             sts_affinity_group: sts_affinity_group_name,
             clone_fs_id_as_volume_id,
+            max_snapshots,
         })
     }
 }
diff --git a/control-plane/grpc/proto/v1/volume/volume.proto b/control-plane/grpc/proto/v1/volume/volume.proto
index 7f6e36535..82fa0adc2 100644
--- a/control-plane/grpc/proto/v1/volume/volume.proto
+++ b/control-plane/grpc/proto/v1/volume/volume.proto
@@ -61,6 +61,8 @@ message VolumeSpec {
   optional VolumeContentSource content_source  = 11;
   // Number of snapshots taken on this volume.
   uint32 num_snapshots = 12;
+  // Max snapshots limit per volume.
+  optional uint32 max_snapshots = 13;
 
   // Volume Content Source i.e the snapshot or a volume.
   message VolumeContentSource {
@@ -259,12 +261,14 @@ message CreateVolumeRequest {
   VolumePolicy policy = 6;
   // replica placement topology for the volume creation only
   optional Topology topology = 7;
-  // flag indicating thin provisioning
+  // flag indicating thin provisioningcontrol-plane/grpc/proto/v1/volume/volume.proto
   bool thin = 8;
   // Affinity Group related information.
   optional AffinityGroup affinity_group = 9;
   // maximum total volume size
   optional uint64 cluster_capacity_limit = 10;
+  // Max snapshots limit per volume.
+  optional uint32 max_snapshots = 11;
 }
 
 // Publish a volume on a node
@@ -344,6 +348,16 @@ message SetVolumeReplicaRequest {
   // replica count
   uint32 replicas = 2;
 }
+// Set the volume property.
+message SetVolumePropRequest {
+  // Uuid of the volume.
+  string uuid = 1;
+  // Property name.
+  string prop_name = 2;
+  // Property value.
+  string prop_value = 3;
+}
+
 
 // Delete volume
 message DestroyVolumeRequest {
@@ -425,6 +439,14 @@ message SetVolumeReplicaReply {
   }
 }
 
+// Reply type for a SetVolumeProperty request
+message SetVolumePropReply {
+  oneof reply {
+    Volume volume = 1;
+    common.ReplyError error = 2;
+  }
+}
+
 message ProbeRequest {
   // Intentionally empty.
 }
@@ -593,6 +615,7 @@ service VolumeGrpc {
   rpc ShareVolume (ShareVolumeRequest) returns (ShareVolumeReply) {}
   rpc UnshareVolume (UnshareVolumeRequest) returns (UnshareVolumeReply) {}
   rpc SetVolumeReplica (SetVolumeReplicaRequest) returns (SetVolumeReplicaReply) {}
+  rpc SetVolumeProperty (SetVolumePropRequest) returns (SetVolumePropReply) {}
   rpc Probe (ProbeRequest) returns (ProbeResponse) {}
 
   // Snapshots
diff --git a/control-plane/grpc/src/operations/volume/client.rs b/control-plane/grpc/src/operations/volume/client.rs
index 74ab10c15..8982a2e8a 100644
--- a/control-plane/grpc/src/operations/volume/client.rs
+++ b/control-plane/grpc/src/operations/volume/client.rs
@@ -6,9 +6,9 @@ use crate::{
             traits::{
                 CreateSnapshotVolumeInfo, CreateVolumeInfo, CreateVolumeSnapshotInfo,
                 DestroyShutdownTargetsInfo, DestroyVolumeInfo, PublishVolumeInfo,
-                RepublishVolumeInfo, ResizeVolumeInfo, SetVolumeReplicaInfo, ShareVolumeInfo,
-                UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations, VolumeSnapshot,
-                VolumeSnapshots,
+                RepublishVolumeInfo, ResizeVolumeInfo, SetVolumePropInfo, SetVolumeReplicaInfo,
+                ShareVolumeInfo, UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations,
+                VolumeSnapshot, VolumeSnapshots,
             },
             traits_snapshots::DestroyVolumeSnapshotInfo,
         },
@@ -17,7 +17,7 @@ use crate::{
     volume::{
         create_snapshot_reply, create_snapshot_volume_reply, create_volume_reply,
         get_snapshots_reply, get_snapshots_request, get_volumes_reply, get_volumes_request,
-        publish_volume_reply, republish_volume_reply, resize_volume_reply,
+        publish_volume_reply, republish_volume_reply, resize_volume_reply, set_volume_prop_reply,
         set_volume_replica_reply, share_volume_reply, unpublish_volume_reply,
         volume_grpc_client::VolumeGrpcClient, GetSnapshotsRequest, GetVolumesRequest, ProbeRequest,
     },
@@ -235,6 +235,28 @@ impl VolumeOperations for VolumeClient {
         }
     }
 
+    #[tracing::instrument(
+        name = "VolumeClient::set_volume_property",
+        level = "debug",
+        skip(self),
+        err
+    )]
+    async fn set_volume_property(
+        &self,
+        request: &dyn SetVolumePropInfo,
+        ctx: Option<Context>,
+    ) -> Result<Volume, ReplyError> {
+        let req = self.request(request, ctx, MessageIdVs::SetVolumeProp);
+        let response = self.client().set_volume_property(req).await?.into_inner();
+        match response.reply {
+            Some(set_volume_prop_reply) => match set_volume_prop_reply {
+                set_volume_prop_reply::Reply::Volume(volume) => Ok(Volume::try_from(volume)?),
+                set_volume_prop_reply::Reply::Error(err) => Err(err.into()),
+            },
+            None => Err(ReplyError::invalid_response(ResourceKind::Volume)),
+        }
+    }
+
     #[tracing::instrument(name = "VolumeClient::probe", level = "debug", skip(self))]
     async fn probe(&self, _ctx: Option<Context>) -> Result<bool, ReplyError> {
         match self.client().probe(ProbeRequest {}).await {
diff --git a/control-plane/grpc/src/operations/volume/server.rs b/control-plane/grpc/src/operations/volume/server.rs
index c1fd9269e..b077ea511 100644
--- a/control-plane/grpc/src/operations/volume/server.rs
+++ b/control-plane/grpc/src/operations/volume/server.rs
@@ -4,7 +4,8 @@ use crate::{
     volume::{
         create_snapshot_reply, create_snapshot_volume_reply, create_volume_reply,
         get_snapshots_reply, get_volumes_reply, publish_volume_reply, republish_volume_reply,
-        resize_volume_reply, set_volume_replica_reply, share_volume_reply, unpublish_volume_reply,
+        resize_volume_reply, set_volume_prop_reply, set_volume_replica_reply, share_volume_reply,
+        unpublish_volume_reply,
         volume_grpc_server::{VolumeGrpc, VolumeGrpcServer},
         CreateSnapshotReply, CreateSnapshotRequest, CreateSnapshotVolumeReply,
         CreateSnapshotVolumeRequest, CreateVolumeReply, CreateVolumeRequest,
@@ -12,9 +13,9 @@ use crate::{
         DestroySnapshotRequest, DestroyVolumeReply, DestroyVolumeRequest, GetSnapshotsReply,
         GetSnapshotsRequest, GetVolumesReply, GetVolumesRequest, ProbeRequest, ProbeResponse,
         PublishVolumeReply, PublishVolumeRequest, RepublishVolumeReply, RepublishVolumeRequest,
-        ResizeVolumeReply, ResizeVolumeRequest, SetVolumeReplicaReply, SetVolumeReplicaRequest,
-        ShareVolumeReply, ShareVolumeRequest, UnpublishVolumeReply, UnpublishVolumeRequest,
-        UnshareVolumeReply, UnshareVolumeRequest,
+        ResizeVolumeReply, ResizeVolumeRequest, SetVolumePropReply, SetVolumePropRequest,
+        SetVolumeReplicaReply, SetVolumeReplicaRequest, ShareVolumeReply, ShareVolumeRequest,
+        UnpublishVolumeReply, UnpublishVolumeRequest, UnshareVolumeReply, UnshareVolumeRequest,
     },
 };
 use std::{convert::TryFrom, sync::Arc};
@@ -215,6 +216,20 @@ impl VolumeGrpc for VolumeServer {
             })),
         }
     }
+    async fn set_volume_property(
+        &self,
+        request: tonic::Request<SetVolumePropRequest>,
+    ) -> Result<tonic::Response<SetVolumePropReply>, tonic::Status> {
+        let req = request.into_inner().validated()?;
+        match self.service.set_volume_property(&req, None).await {
+            Ok(volume) => Ok(Response::new(SetVolumePropReply {
+                reply: Some(set_volume_prop_reply::Reply::Volume(volume.into())),
+            })),
+            Err(err) => Ok(Response::new(SetVolumePropReply {
+                reply: Some(set_volume_prop_reply::Reply::Error(err.into())),
+            })),
+        }
+    }
     async fn probe(
         &self,
         _request: tonic::Request<ProbeRequest>,
diff --git a/control-plane/grpc/src/operations/volume/traits.rs b/control-plane/grpc/src/operations/volume/traits.rs
index dd4c095aa..1b030e2dc 100644
--- a/control-plane/grpc/src/operations/volume/traits.rs
+++ b/control-plane/grpc/src/operations/volume/traits.rs
@@ -9,8 +9,8 @@ use crate::{
     volume::{
         get_volumes_request, CreateSnapshotVolumeRequest, CreateVolumeRequest,
         DestroyShutdownTargetRequest, DestroyVolumeRequest, PublishVolumeRequest,
-        RegisteredTargets, RepublishVolumeRequest, ResizeVolumeRequest, SetVolumeReplicaRequest,
-        ShareVolumeRequest, UnpublishVolumeRequest, UnshareVolumeRequest,
+        RegisteredTargets, RepublishVolumeRequest, ResizeVolumeRequest, SetVolumePropRequest,
+        SetVolumeReplicaRequest, ShareVolumeRequest, UnpublishVolumeRequest, UnshareVolumeRequest,
     },
 };
 use events_api::event::{EventAction, EventCategory, EventMessage, EventMeta, EventSource};
@@ -26,9 +26,9 @@ use stor_port::{
             DestroyVolume, ExplicitNodeTopology, Filter, LabelledTopology, Nexus, NexusId,
             NexusNvmfConfig, NodeId, NodeTopology, NvmeNqn, PoolTopology, PublishVolume, ReplicaId,
             ReplicaStatus, ReplicaTopology, ReplicaUsage, RepublishVolume, ResizeVolume,
-            SetVolumeReplica, ShareVolume, SnapshotId, Topology, UnpublishVolume, UnshareVolume,
-            Volume, VolumeId, VolumeLabels, VolumePolicy, VolumeShareProtocol, VolumeState,
-            VolumeUsage,
+            SetVolumeProp, SetVolumeReplica, ShareVolume, SnapshotId, Topology, UnpublishVolume,
+            UnshareVolume, Volume, VolumeId, VolumeLabels, VolumePolicy, VolumeShareProtocol,
+            VolumeState, VolumeUsage,
         },
     },
     IntoOption, IntoVec, TryIntoOption,
@@ -95,6 +95,12 @@ pub trait VolumeOperations: Send + Sync {
         req: &dyn SetVolumeReplicaInfo,
         ctx: Option<Context>,
     ) -> Result<Volume, ReplyError>;
+    /// Set volume property.
+    async fn set_volume_property(
+        &self,
+        req: &dyn SetVolumePropInfo,
+        ctx: Option<Context>,
+    ) -> Result<Volume, ReplyError>;
     /// Liveness probe for volume service
     async fn probe(&self, ctx: Option<Context>) -> Result<bool, ReplyError>;
     /// Destroy shutdown targets
@@ -161,6 +167,7 @@ impl From<VolumeSpec> for volume::VolumeDefinition {
                 affinity_group: volume_spec.affinity_group.into_opt(),
                 content_source: volume_spec.content_source.into_opt(),
                 num_snapshots: volume_spec.metadata.num_snapshots() as u32,
+                max_snapshots: volume_spec.max_snapshots,
             }),
             metadata: Some(volume::Metadata {
                 spec_status: spec_status as i32,
@@ -345,6 +352,7 @@ impl TryFrom<volume::VolumeDefinition> for VolumeSpec {
             metadata: VolumeMetadata::new(volume_meta.as_thin),
             content_source: volume_spec.content_source.try_into_opt()?,
             num_snapshots: volume_spec.num_snapshots,
+            max_snapshots: volume_spec.max_snapshots,
         };
         Ok(volume_spec)
     }
@@ -892,6 +900,8 @@ pub trait CreateVolumeInfo: Send + Sync + std::fmt::Debug {
     fn affinity_group(&self) -> Option<AffinityGroup>;
     /// Capacity Limit.
     fn cluster_capacity_limit(&self) -> Option<u64>;
+    /// Max snapshot limit per volume.
+    fn max_snapshots(&self) -> Option<u32>;
 }
 
 impl CreateVolumeInfo for CreateVolume {
@@ -930,6 +940,10 @@ impl CreateVolumeInfo for CreateVolume {
     fn cluster_capacity_limit(&self) -> Option<u64> {
         self.cluster_capacity_limit
     }
+
+    fn max_snapshots(&self) -> Option<u32> {
+        self.max_snapshots
+    }
 }
 
 /// Intermediate structure that validates the conversion to CreateVolumeRequest type.
@@ -982,6 +996,10 @@ impl CreateVolumeInfo for ValidatedCreateVolumeRequest {
     fn cluster_capacity_limit(&self) -> Option<u64> {
         self.inner.cluster_capacity_limit
     }
+
+    fn max_snapshots(&self) -> Option<u32> {
+        self.inner.max_snapshots
+    }
 }
 
 impl ValidateRequestTypes for CreateVolumeRequest {
@@ -1019,6 +1037,7 @@ impl From<&dyn CreateVolumeInfo> for CreateVolume {
             thin: data.thin(),
             affinity_group: data.affinity_group(),
             cluster_capacity_limit: data.cluster_capacity_limit(),
+            max_snapshots: data.max_snapshots(),
         }
     }
 }
@@ -1037,6 +1056,7 @@ impl From<&dyn CreateVolumeInfo> for CreateVolumeRequest {
             thin: data.thin(),
             affinity_group: data.affinity_group().map(|ag| ag.into()),
             cluster_capacity_limit: data.cluster_capacity_limit(),
+            max_snapshots: data.max_snapshots(),
         }
     }
 }
@@ -1668,6 +1688,27 @@ impl SetVolumeReplicaInfo for SetVolumeReplica {
     }
 }
 
+/// Trait to be implemented for SetVolumeProp operation.
+pub trait SetVolumePropInfo: Send + Sync + std::fmt::Debug {
+    /// Uuid of the concerned volume.
+    fn uuid(&self) -> VolumeId;
+    /// Property name.
+    fn prop_name(&self) -> String;
+    /// Property value.
+    fn prop_value(&self) -> String;
+}
+
+impl SetVolumePropInfo for SetVolumeProp {
+    fn uuid(&self) -> VolumeId {
+        self.uuid.clone()
+    }
+    fn prop_name(&self) -> String {
+        self.prop_name.clone()
+    }
+    fn prop_value(&self) -> String {
+        self.prop_value.clone()
+    }
+}
 /// Intermediate structure that validates the conversion to SetVolumeReplicaRequest type.
 #[derive(Debug)]
 pub struct ValidatedSetVolumeReplicaRequest {
@@ -1712,6 +1753,54 @@ impl From<&dyn SetVolumeReplicaInfo> for SetVolumeReplicaRequest {
     }
 }
 
+/// Intermediate structure that validates the conversion to SetVolumePropRequest type.
+#[derive(Debug)]
+pub struct ValidatedSetVolumePropRequest {
+    inner: SetVolumePropRequest,
+    uuid: VolumeId,
+}
+
+impl SetVolumePropInfo for ValidatedSetVolumePropRequest {
+    fn uuid(&self) -> VolumeId {
+        self.uuid.clone()
+    }
+    fn prop_name(&self) -> String {
+        self.inner.prop_name.clone()
+    }
+    fn prop_value(&self) -> String {
+        self.inner.prop_value.clone()
+    }
+}
+
+impl ValidateRequestTypes for SetVolumePropRequest {
+    type Validated = ValidatedSetVolumePropRequest;
+    fn validated(self) -> Result<Self::Validated, ReplyError> {
+        Ok(ValidatedSetVolumePropRequest {
+            uuid: VolumeId::try_from(StringValue(Some(self.uuid.clone())))?,
+            inner: self,
+        })
+    }
+}
+
+impl From<&dyn SetVolumePropInfo> for SetVolumeProp {
+    fn from(data: &dyn SetVolumePropInfo) -> Self {
+        Self {
+            uuid: data.uuid(),
+            prop_name: data.prop_name(),
+            prop_value: data.prop_value(),
+        }
+    }
+}
+
+impl From<&dyn SetVolumePropInfo> for SetVolumePropRequest {
+    fn from(data: &dyn SetVolumePropInfo) -> Self {
+        Self {
+            uuid: data.uuid().to_string(),
+            prop_name: data.prop_name(),
+            prop_value: data.prop_value(),
+        }
+    }
+}
 /// A helper to convert the replica topology map form grpc type to corresponding control plane type.
 fn to_replica_topology_map(
     map: HashMap<String, volume::ReplicaTopology>,
diff --git a/control-plane/grpc/src/operations/volume/traits_snapshots.rs b/control-plane/grpc/src/operations/volume/traits_snapshots.rs
index 3adfbd471..a7ec13d05 100644
--- a/control-plane/grpc/src/operations/volume/traits_snapshots.rs
+++ b/control-plane/grpc/src/operations/volume/traits_snapshots.rs
@@ -175,6 +175,14 @@ impl VolumeSnapshotMeta {
     pub fn num_restores(&self) -> u32 {
         self.num_restores
     }
+    /// The number of replica snapshots for the current transaction.
+    pub fn num_snapshot_replicas(&self) -> u32 {
+        if let Some(replica_snap_list) = self.transactions().get(self.txn_id()) {
+            replica_snap_list.len() as u32
+        } else {
+            0
+        }
+    }
 }
 
 /// Volume replica snapshot information.
diff --git a/control-plane/plugin/src/lib.rs b/control-plane/plugin/src/lib.rs
index b992300f5..2247eba45 100644
--- a/control-plane/plugin/src/lib.rs
+++ b/control-plane/plugin/src/lib.rs
@@ -5,11 +5,12 @@ extern crate lazy_static;
 
 use operations::Label;
 use resources::LabelResources;
+use resources::SetResources;
 
 use crate::{
     operations::{
         Cordoning, Drain, Get, GetBlockDevices, GetSnapshots, List, ListExt, Operations,
-        PluginResult, RebuildHistory, ReplicaTopology, Scale,
+        PluginResult, RebuildHistory, ReplicaTopology, Scale, Set,
     },
     resources::{
         blockdevice, cordon, drain, node, pool, snapshot, volume, CordonResources, DrainResources,
@@ -84,6 +85,7 @@ impl ExecuteOperation for Operations {
         match self {
             Operations::Drain(resource) => resource.execute(cli_args).await,
             Operations::Get(resource) => resource.execute(cli_args).await,
+            Operations::Set(resource) => resource.execute(cli_args).await,
             Operations::Scale(resource) => resource.execute(cli_args).await,
             Operations::Cordon(resource) => resource.execute(cli_args).await,
             Operations::Uncordon(resource) => resource.execute(cli_args).await,
@@ -179,6 +181,21 @@ impl ExecuteOperation for ScaleResources {
     }
 }
 
+#[async_trait::async_trait(?Send)]
+impl ExecuteOperation for SetResources {
+    type Args = CliArgs;
+    type Error = crate::resources::Error;
+    async fn execute(&self, cli_args: &CliArgs) -> PluginResult {
+        match self {
+            SetResources::Volume {
+                id,
+                prop_name,
+                prop_value,
+            } => volume::Volume::set(id, prop_name, prop_value, &cli_args.output).await,
+        }
+    }
+}
+
 #[async_trait::async_trait(?Send)]
 impl ExecuteOperation for CordonResources {
     type Args = CliArgs;
diff --git a/control-plane/plugin/src/operations.rs b/control-plane/plugin/src/operations.rs
index 645cf1eb8..1e878cf3c 100644
--- a/control-plane/plugin/src/operations.rs
+++ b/control-plane/plugin/src/operations.rs
@@ -1,6 +1,6 @@
 use crate::resources::{
     error::Error, utils, CordonResources, DrainResources, GetResources, LabelResources,
-    ScaleResources, UnCordonResources,
+    ScaleResources, SetResources, UnCordonResources,
 };
 use async_trait::async_trait;
 
@@ -16,6 +16,9 @@ pub enum Operations {
     /// 'Get' resources.
     #[clap(subcommand)]
     Get(GetResources),
+    /// 'Set' resources.
+    #[clap(subcommand)]
+    Set(SetResources),
     /// 'Scale' resources.
     #[clap(subcommand)]
     Scale(ScaleResources),
@@ -87,6 +90,18 @@ pub trait Scale {
     async fn scale(id: &Self::ID, replica_count: u8, output: &utils::OutputFormat) -> PluginResult;
 }
 
+/// Set trait.
+/// To be implemented by resources which support the 'set' operation.
+#[async_trait(?Send)]
+pub trait Set {
+    type ID;
+    async fn set(
+        id: &Self::ID,
+        prop_name: &str,
+        prop_value: &str,
+        output: &utils::OutputFormat,
+    ) -> PluginResult;
+}
 /// Replica topology trait.
 /// To be implemented by resources which support the 'replica-topology' operation
 #[async_trait(?Send)]
diff --git a/control-plane/plugin/src/resources/error.rs b/control-plane/plugin/src/resources/error.rs
index c22e26b62..225540a40 100644
--- a/control-plane/plugin/src/resources/error.rs
+++ b/control-plane/plugin/src/resources/error.rs
@@ -74,6 +74,12 @@ pub enum Error {
         id: String,
         source: openapi::tower::client::Error<openapi::models::RestJsonError>,
     },
+    /// Error when set volume property request fails.
+    #[snafu(display("Failed to set volume {id} property, Error {source}"))]
+    ScaleVolumePropertyError {
+        id: String,
+        source: openapi::tower::client::Error<openapi::models::RestJsonError>,
+    },
     /// Error when list snapshots request fails.
     #[snafu(display("Failed to list volume snapshots. Error {source}"))]
     ListSnapshotsError {
diff --git a/control-plane/plugin/src/resources/mod.rs b/control-plane/plugin/src/resources/mod.rs
index 4747ab129..32ba8576d 100644
--- a/control-plane/plugin/src/resources/mod.rs
+++ b/control-plane/plugin/src/resources/mod.rs
@@ -57,6 +57,20 @@ pub enum GetResources {
     BlockDevices(BlockDeviceArgs),
 }
 
+/// Resources needed to set the volume property.
+#[derive(clap::Subcommand, Debug)]
+pub enum SetResources {
+    /// Scale volume.
+    Volume {
+        /// ID of the volume.
+        id: VolumeId,
+        /// Volume prop-name.
+        prop_name: String,
+        /// Volume prop-value.
+        prop_value: String,
+    },
+}
+
 /// The types of resources that support the 'scale' operation.
 #[derive(clap::Subcommand, Debug)]
 pub enum ScaleResources {
diff --git a/control-plane/plugin/src/resources/snapshot.rs b/control-plane/plugin/src/resources/snapshot.rs
index 5ba52026c..cb93dad1e 100644
--- a/control-plane/plugin/src/resources/snapshot.rs
+++ b/control-plane/plugin/src/resources/snapshot.rs
@@ -57,7 +57,8 @@ impl CreateRow for openapi::models::VolumeSnapshot {
             ::utils::bytes::into_human(state.allocated_size),
             ::utils::bytes::into_human(meta.total_allocated_size),
             state.source_volume,
-            self.definition.metadata.num_restores
+            self.definition.metadata.num_restores,
+            self.definition.metadata.num_snapshot_replicas
         ]
     }
 }
diff --git a/control-plane/plugin/src/resources/tests.rs b/control-plane/plugin/src/resources/tests.rs
index ca8201d96..9bd29433c 100644
--- a/control-plane/plugin/src/resources/tests.rs
+++ b/control-plane/plugin/src/resources/tests.rs
@@ -36,6 +36,7 @@ async fn setup() {
                 labels: None,
                 thin: false,
                 affinity_group: None,
+                max_snapshots: None,
             },
         )
         .await
@@ -108,6 +109,7 @@ async fn get_volumes_paginated() {
                     topology: None,
                     labels: None,
                     affinity_group: None,
+                    max_snapshots: None,
                 },
             )
             .await
diff --git a/control-plane/plugin/src/resources/utils.rs b/control-plane/plugin/src/resources/utils.rs
index 263377696..fe505bd7e 100644
--- a/control-plane/plugin/src/resources/utils.rs
+++ b/control-plane/plugin/src/resources/utils.rs
@@ -22,7 +22,7 @@ lazy_static! {
         "THIN-PROVISIONED",
         "ALLOCATED",
         "SNAPSHOTS",
-        "SOURCE"
+        "SOURCE",
     ];
     pub static ref SNAPSHOT_HEADERS: Row = row![
         "ID",
@@ -31,7 +31,8 @@ lazy_static! {
         "ALLOCATED-SIZE",
         "TOTAL-ALLOCATED-SIZE",
         "SOURCE-VOL",
-        "RESTORES"
+        "RESTORES",
+        "SNAPSHOT_REPLICAS"
     ];
     pub static ref POOLS_HEADERS: Row = row![
         "ID",
diff --git a/control-plane/plugin/src/resources/volume.rs b/control-plane/plugin/src/resources/volume.rs
index 8bc9cf1ac..c1fe06413 100644
--- a/control-plane/plugin/src/resources/volume.rs
+++ b/control-plane/plugin/src/resources/volume.rs
@@ -1,5 +1,5 @@
 use crate::{
-    operations::{Get, ListExt, PluginResult, RebuildHistory, ReplicaTopology, Scale},
+    operations::{Get, ListExt, PluginResult, RebuildHistory, ReplicaTopology, Scale, Set},
     resources::{
         error::Error,
         utils,
@@ -192,6 +192,41 @@ impl Scale for Volume {
     }
 }
 
+#[async_trait(?Send)]
+impl Set for Volume {
+    type ID = VolumeId;
+    async fn set(
+        id: &Self::ID,
+        prop_name: &str,
+        prop_value: &str,
+        output: &utils::OutputFormat,
+    ) -> PluginResult {
+        match RestClient::client()
+            .volumes_api()
+            .put_volume_prop(id, prop_name, prop_value)
+            .await
+        {
+            Ok(volume) => match output {
+                OutputFormat::Yaml | OutputFormat::Json => {
+                    // Print json or yaml based on output format.
+                    utils::print_table(output, volume.into_body());
+                }
+                OutputFormat::None => {
+                    // In case the output format is not specified, show a success message.
+                    println!("Volume {id} max_snapshots set to {prop_value} successfully 🚀")
+                }
+            },
+            Err(e) => {
+                return Err(Error::ScaleVolumePropertyError {
+                    id: id.to_string(),
+                    source: e,
+                });
+            }
+        }
+        Ok(())
+    }
+}
+
 #[async_trait(?Send)]
 impl ReplicaTopology for Volume {
     type ID = VolumeId;
diff --git a/control-plane/rest/openapi-specs/v0_api_spec.yaml b/control-plane/rest/openapi-specs/v0_api_spec.yaml
index 9fb0cb5ab..76fd257c1 100644
--- a/control-plane/rest/openapi-specs/v0_api_spec.yaml
+++ b/control-plane/rest/openapi-specs/v0_api_spec.yaml
@@ -1544,6 +1544,40 @@ paths:
           $ref: '#/components/responses/ServerError'
       security:
         - JWT: []
+  '/volumes/{volume_id}/properties/{property_name}/{property_value}':
+    put:
+      tags:
+        - Volumes
+      operationId: put_volume_prop
+      parameters:
+        - in: path
+          name: volume_id
+          required: true
+          schema:
+            $ref: '#/components/schemas/VolumeId'
+        - in: path
+          name: property_name
+          required: true
+          schema:
+            type: string
+        - in: path
+          name: property_value
+          required: true
+          schema:
+            type: string
+      responses:
+        '200':
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Volume'
+        '4XX':
+          $ref: '#/components/responses/ClientError'
+        '5XX':
+          $ref: '#/components/responses/ServerError'
+      security:
+        - JWT: []
   '/volumes/{volume_id}/target':
     put:
       tags:
@@ -2521,6 +2555,7 @@ components:
         thin: false
         topology: null
         affinity_group: null
+        max_snapshots: 10
       description: Create Volume Body
       type: object
       properties:
@@ -2551,6 +2586,11 @@ components:
           description: Affinity Group related information.
           allOf:
             - $ref: '#/components/schemas/AffinityGroup'
+        max_snapshots:
+          description: Max Snapshots limit per volume.
+          type: integer
+          format: int32
+          minimum: 0
       required:
         - policy
         - replicas
@@ -3356,6 +3396,7 @@ components:
         target_node: io-engine-1
         uuid: 514ed1c8-7174-49ac-b9cd-ad44ef670a67
         thin: false
+        max_snapshots: 10
       description: User specification of a volume.
       type: object
       properties:
@@ -3393,6 +3434,7 @@ components:
                 - CreateSnapshot
                 - DestroySnapshot
                 - Resize
+                - SetVolumeProperty
             result:
               description: Result of the operation
               type: boolean
@@ -3430,6 +3472,11 @@ components:
           type: integer
           format: int32
           minimum: 0
+        max_snapshots:
+          description: Max snapshots to limit per volume.
+          type: integer
+          format: int32
+          minimum: 0
       required:
         - num_paths
         - num_replicas
@@ -3754,6 +3801,11 @@ components:
           type: integer
           format: int32
           minimum: 0
+        num_snapshot_replicas:
+          description: Number of snapshot replicas for a volumesnapshot.
+          type: integer
+          format: int32
+          minimum: 0
       required:
         - status
         - size
@@ -3762,6 +3814,7 @@ components:
         - txn_id
         - transactions
         - num_restores
+        - num_snapshot_replicas
     VolumeSnapshotSpec:
       description: |-
         Volume Snapshot Spec information.
diff --git a/control-plane/rest/service/src/v0/snapshots.rs b/control-plane/rest/service/src/v0/snapshots.rs
index ea619f777..5bdc8a1fe 100644
--- a/control-plane/rest/service/src/v0/snapshots.rs
+++ b/control-plane/rest/service/src/v0/snapshots.rs
@@ -185,6 +185,7 @@ fn to_models_volume_snapshot(snap: &VolumeSnapshot) -> models::VolumeSnapshot {
                     })
                     .collect::<HashMap<_, _>>(),
                 snap.meta().num_restores(),
+                snap.meta().num_snapshot_replicas(),
             ),
             models::VolumeSnapshotSpec::new_all(snap.spec().snap_id(), snap.spec().source_id()),
         ),
diff --git a/control-plane/rest/service/src/v0/volumes.rs b/control-plane/rest/service/src/v0/volumes.rs
index 114447bb6..72a00a63f 100644
--- a/control-plane/rest/service/src/v0/volumes.rs
+++ b/control-plane/rest/service/src/v0/volumes.rs
@@ -8,7 +8,7 @@ use stor_port::types::v0::{
     transport::{
         DestroyShutdownTargets, DestroyVolume, Filter, GetRebuildRecord, PublishVolume,
         RebuildHistory, RebuildJobState, RebuildRecord, RepublishVolume, ResizeVolume,
-        SetVolumeReplica, ShareVolume, UnpublishVolume, UnshareVolume, Volume,
+        SetVolumeProp, SetVolumeReplica, ShareVolume, UnpublishVolume, UnshareVolume, Volume,
     },
 };
 
@@ -176,6 +176,22 @@ impl apis::actix_server::Volumes for RestApi {
         Ok(volume.into())
     }
 
+    async fn put_volume_prop(
+        Path((volume_id, prop_name, prop_value)): Path<(Uuid, String, String)>,
+    ) -> Result<models::Volume, RestError<RestJsonError>> {
+        let volume = client()
+            .set_volume_property(
+                &SetVolumeProp {
+                    uuid: volume_id.into(),
+                    prop_name,
+                    prop_value,
+                },
+                None,
+            )
+            .await?;
+        Ok(volume.into())
+    }
+
     async fn put_volume_share(
         Path((volume_id, protocol)): Path<(Uuid, models::VolumeShareProtocol)>,
         Query(frontend_host): Query<Option<String>>,
diff --git a/control-plane/rest/src/versions/v0.rs b/control-plane/rest/src/versions/v0.rs
index 82e73a3a5..0dce7ecc2 100644
--- a/control-plane/rest/src/versions/v0.rs
+++ b/control-plane/rest/src/versions/v0.rs
@@ -192,6 +192,8 @@ pub struct CreateVolumeBody {
     pub thin: bool,
     /// Affinity Group related information.
     pub affinity_group: Option<AffinityGroup>,
+    /// Max snapshot limit per volume.
+    pub max_snapshots: Option<u32>,
 }
 impl From<models::CreateVolumeBody> for CreateVolumeBody {
     fn from(src: models::CreateVolumeBody) -> Self {
@@ -203,6 +205,7 @@ impl From<models::CreateVolumeBody> for CreateVolumeBody {
             labels: src.labels,
             thin: src.thin,
             affinity_group: src.affinity_group.map(|ag| ag.into()),
+            max_snapshots: src.max_snapshots,
         }
     }
 }
@@ -216,6 +219,7 @@ impl From<CreateVolume> for CreateVolumeBody {
             labels: create.labels,
             thin: create.thin,
             affinity_group: create.affinity_group,
+            max_snapshots: create.max_snapshots,
         }
     }
 }
@@ -232,6 +236,7 @@ impl CreateVolumeBody {
             thin: self.thin,
             affinity_group: self.affinity_group.clone(),
             cluster_capacity_limit: None,
+            max_snapshots: self.max_snapshots,
         }
     }
     /// Convert into rpc request type.
diff --git a/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs b/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs
index 4283bd9d3..160d2cdc3 100644
--- a/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs
+++ b/control-plane/stor-port/src/types/v0/store/snapshots/volume.rs
@@ -248,8 +248,7 @@ pub type VolumeSnapshotCompleter = Arc<std::sync::Mutex<Option<VolumeSnapshotCre
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct VolumeSnapshotCreateInfo {
     txn_id: SnapshotTxId,
-    // todo: support multi-replica snapshot
-    replica: ReplicaSnapshot,
+    replicas: Vec<ReplicaSnapshot>,
     #[serde(skip, default)]
     complete: VolumeSnapshotCompleter,
 }
@@ -257,12 +256,12 @@ impl VolumeSnapshotCreateInfo {
     /// Get a new `Self` from the given parameters.
     pub fn new(
         txn_id: impl Into<SnapshotTxId>,
-        replica: ReplicaSnapshot,
+        replicas: Vec<ReplicaSnapshot>,
         complete: &VolumeSnapshotCompleter,
     ) -> Self {
         Self {
             txn_id: txn_id.into(),
-            replica,
+            replicas: replicas.to_vec(),
             complete: complete.clone(),
         }
     }
@@ -272,7 +271,7 @@ impl PartialEq for VolumeSnapshotCreateInfo {
     fn eq(&self, other: &Self) -> bool {
         self.txn_id
             .eq(&other.txn_id)
-            .then(|| self.replica.eq(&other.replica))
+            .then(|| self.replicas.eq(&other.replicas))
             .unwrap_or_default()
     }
 }
@@ -324,16 +323,15 @@ impl DestroyRestoreInfo {
 #[derive(Debug, Clone, PartialEq)]
 pub struct VolumeSnapshotCreateResult {
     /// The resulting replicas including their success status.
-    /// todo: add support for multiple replica snapshots.
     replicas: Vec<ReplicaSnapshot>,
     /// The actual timestamp returned by the dataplane.
     timestamp: DateTime<Utc>,
 }
 impl VolumeSnapshotCreateResult {
     /// Create a new `Self` based on the given parameters.
-    pub fn new_ok(replica: ReplicaSnapshot, timestamp: DateTime<Utc>) -> Self {
+    pub fn new_ok(replica: Vec<ReplicaSnapshot>, timestamp: DateTime<Utc>) -> Self {
         Self {
-            replicas: vec![replica],
+            replicas: replica,
             timestamp,
         }
     }
@@ -426,10 +424,12 @@ impl SpecTransaction<VolumeSnapshotOperation> for VolumeSnapshot {
                             .insert(info.txn_id, result.replicas.clone());
                     }
                 } else {
-                    info.replica.set_status_deleting();
+                    info.replicas
+                        .iter_mut()
+                        .for_each(|r| r.set_status_deleting());
                     self.metadata
                         .transactions
-                        .insert(info.txn_id, vec![info.replica]);
+                        .insert(info.txn_id, info.replicas);
                 }
             }
             VolumeSnapshotOperation::Destroy => {}
@@ -444,7 +444,7 @@ impl SpecTransaction<VolumeSnapshotOperation> for VolumeSnapshot {
             self.metadata.txn_id = info.txn_id.clone();
             self.metadata
                 .transactions
-                .insert(info.txn_id.clone(), vec![info.replica.clone()]);
+                .insert(info.txn_id.clone(), info.replicas.clone());
         }
         self.metadata.operation = Some(VolumeSnapshotOperationState {
             operation,
diff --git a/control-plane/stor-port/src/types/v0/store/volume.rs b/control-plane/stor-port/src/types/v0/store/volume.rs
index 85866d8c1..922c8d0ae 100644
--- a/control-plane/stor-port/src/types/v0/store/volume.rs
+++ b/control-plane/stor-port/src/types/v0/store/volume.rs
@@ -18,6 +18,16 @@ use crate::{
 use pstor::ApiVersion;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
+use strum_macros::{EnumCount as EnumCountMacro, EnumIter, EnumString};
+
+/// Volume properties.
+#[derive(
+    Serialize, Deserialize, EnumString, Debug, EnumCountMacro, EnumIter, PartialEq, Clone, Copy,
+)]
+pub enum VolumeAttr {
+    #[strum(serialize = "max_snapshots")]
+    MaxSnapshots,
+}
 
 /// Key used by the store to uniquely identify a VolumeState structure.
 pub struct VolumeStateKey(VolumeId);
@@ -205,6 +215,9 @@ pub struct VolumeSpec {
     /// Volume metadata information.
     #[serde(default, skip_serializing_if = "super::is_default")]
     pub metadata: VolumeMetadata,
+    /// Max snapshots limit per volume.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub max_snapshots: Option<u32>,
 }
 
 /// Volume Content Source i.e the snapshot or a volume.
@@ -505,6 +518,13 @@ impl SpecTransaction<VolumeOperation> for VolumeSpec {
                 VolumeOperation::Resize(size) => {
                     self.size = size;
                 }
+                VolumeOperation::SetVolumeProperty(prop) => match prop.prop_name() {
+                    VolumeAttr::MaxSnapshots => {
+                        if let Ok(x) = prop.prop_value().parse::<u32>() {
+                            self.max_snapshots = Some(x);
+                        }
+                    }
+                },
             }
         }
         self.clear_op();
@@ -569,6 +589,7 @@ pub enum VolumeOperation {
     CreateSnapshot(SnapshotId),
     DestroySnapshot(SnapshotId),
     Resize(u64),
+    SetVolumeProperty(VolumeProperty),
 }
 
 #[test]
@@ -630,6 +651,29 @@ impl PublishOperation {
     }
 }
 
+/// SetVolumeProperty Operation parameters.
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
+pub struct VolumeProperty {
+    prop_name: VolumeAttr,
+    prop_value: String,
+}
+impl VolumeProperty {
+    /// Return new `Self` from the given parameters.
+    pub fn new(prop_name: VolumeAttr, prop_value: &String) -> Self {
+        Self {
+            prop_name,
+            prop_value: prop_value.to_string(),
+        }
+    }
+    /// Get property name.
+    pub fn prop_name(&self) -> VolumeAttr {
+        self.prop_name
+    }
+    /// Get property value.
+    pub fn prop_value(&self) -> String {
+        self.prop_value.clone()
+    }
+}
 /// Volume Republish Operation parameters.
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
 pub struct RepublishOperation {
@@ -667,6 +711,9 @@ impl From<VolumeOperation> for models::volume_spec_operation::Operation {
             VolumeOperation::DestroySnapshot(_) => {
                 models::volume_spec_operation::Operation::DestroySnapshot
             }
+            VolumeOperation::SetVolumeProperty(_) => {
+                models::volume_spec_operation::Operation::SetVolumeProperty
+            }
             VolumeOperation::Resize(_) => todo!(),
         }
     }
@@ -725,6 +772,7 @@ impl From<&CreateVolume> for VolumeSpec {
             target_config: None,
             publish_context: None,
             affinity_group: request.affinity_group.clone(),
+            max_snapshots: request.max_snapshots,
             ..Default::default()
         }
     }
@@ -781,6 +829,7 @@ impl From<VolumeSpec> for models::VolumeSpec {
             src.affinity_group.into_opt(),
             src.content_source.into_opt(),
             src.num_snapshots,
+            src.max_snapshots,
         )
     }
 }
diff --git a/control-plane/stor-port/src/types/v0/transport/mod.rs b/control-plane/stor-port/src/types/v0/transport/mod.rs
index b2d9cfd59..d49ddca76 100644
--- a/control-plane/stor-port/src/types/v0/transport/mod.rs
+++ b/control-plane/stor-port/src/types/v0/transport/mod.rs
@@ -187,6 +187,8 @@ pub enum MessageIdVs {
     GetAppNode,
     /// List app nodes.
     ListAppNodes,
+    /// Set volume property.
+    SetVolumeProp,
 }
 
 impl From<MessageIdVs> for MessageId {
diff --git a/control-plane/stor-port/src/types/v0/transport/volume.rs b/control-plane/stor-port/src/types/v0/transport/volume.rs
index 9df52f64e..6b6b35db7 100644
--- a/control-plane/stor-port/src/types/v0/transport/volume.rs
+++ b/control-plane/stor-port/src/types/v0/transport/volume.rs
@@ -457,6 +457,8 @@ pub struct CreateVolume {
     pub affinity_group: Option<AffinityGroup>,
     /// Maximum total system volume size.
     pub cluster_capacity_limit: Option<u64>,
+    /// Max Snapshots to limit per volume.
+    pub max_snapshots: Option<u32>,
 }
 
 /// Resize volume request.
@@ -701,6 +703,27 @@ impl SetVolumeReplica {
     }
 }
 
+/// Set the volume property.
+#[derive(Serialize, Deserialize, Default, Debug, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct SetVolumeProp {
+    /// The uuid of the volume.
+    pub uuid: VolumeId,
+    /// Property name.
+    pub prop_name: String,
+    /// Property value.
+    pub prop_value: String,
+}
+impl SetVolumeProp {
+    /// Create new `Self` based on the provided arguments.
+    pub fn new(uuid: VolumeId, prop_name: String, prop_value: String) -> Self {
+        Self {
+            uuid,
+            prop_name,
+            prop_value,
+        }
+    }
+}
 /// Delete volume request.
 #[derive(Serialize, Deserialize, Default, Debug, Clone)]
 #[serde(rename_all = "camelCase")]