Skip to content

Commit

Permalink
feat(snapshot): set plugin command to change volume property, show
Browse files Browse the repository at this point in the history
max_snapshots in volume query response

Signed-off-by: Hrudaya <hrudayaranjan.sahoo@datacore.com>
  • Loading branch information
hrudaya21 committed Jan 19, 2024
1 parent 2f6f6c3 commit 0d1c5c9
Show file tree
Hide file tree
Showing 25 changed files with 467 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ pub(crate) trait ResourceReplicas {
) -> Result<Self::MoveResp, SvcError>;
}

/// Volume resource operation.
#[async_trait::async_trait]
pub(crate) trait ResourceVolume {
type Request: Sync + Send;

/// Set the property value.
async fn set_property(
&mut self,
registry: &Registry,
request: &Self::Request,
) -> Result<(), SvcError>;
}

/// Resource Children/Offspring Operations.
#[async_trait::async_trait]
pub(crate) trait ResourceOffspring {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ impl PoolItemLister {
pools
}
/// Get a list of pool items to create a snapshot on.
/// todo: support multi-replica snapshot.
pub(crate) async fn list_for_snaps(
registry: &Registry,
items: &Vec<ChildItem>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,6 @@ impl SnapshotVolumeReplica {
pub(crate) async fn builder_with_defaults(
registry: &Registry,
volume: &VolumeSpec,
// todo: only 1 replica snapshot supported atm
items: &Vec<ChildItem>,
) -> Self {
Self::builder(registry, volume, items)
Expand Down
39 changes: 33 additions & 6 deletions control-plane/agents/src/bin/core/volume/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
operations::{
ResourceLifecycle, ResourceLifecycleExt, ResourceLifecycleWithLifetime,
ResourceOwnerUpdate, ResourcePublishing, ResourceReplicas, ResourceResize,
ResourceSharing, ResourceShutdownOperations,
ResourceSharing, ResourceShutdownOperations, ResourceVolume,
},
operations_helper::{
GuardedOperationsHelper, OnCreateFail, OperationSequenceGuard, ResourceSpecsLocked,
Expand All @@ -26,18 +26,22 @@ use crate::{
},
};
use agents::errors::SvcError;

use grpc::operations::volume::traits::CreateVolumeInfo;
use stor_port::{
transport_api::ErrorChain,
types::v0::{
store::{
nexus_persistence::NexusInfoKey,
replica::ReplicaSpec,
volume::{PublishOperation, RepublishOperation, VolumeOperation, VolumeSpec},
volume::{
PublishOperation, RepublishOperation, VolumeOperation, VolumeProperty, VolumeSpec,
},
},
transport::{
CreateVolume, DestroyNexus, DestroyReplica, DestroyShutdownTargets, DestroyVolume,
Protocol, PublishVolume, Replica, ReplicaId, ReplicaOwners, RepublishVolume,
ResizeVolume, SetVolumeReplica, ShareNexus, ShareVolume, ShutdownNexus,
ResizeVolume, SetVolumeProp, SetVolumeReplica, ShareNexus, ShareVolume, ShutdownNexus,
UnpublishVolume, UnshareNexus, UnshareVolume, Volume,
},
},
Expand Down Expand Up @@ -613,6 +617,27 @@ impl ResourceReplicas for OperationGuardArc<VolumeSpec> {
}
}

#[async_trait::async_trait]
impl ResourceVolume for OperationGuardArc<VolumeSpec> {
type Request = SetVolumeProp;

async fn set_property(
&mut self,
registry: &Registry,
request: &Self::Request,
) -> Result<(), SvcError> {
let state = registry.volume_state(&request.uuid).await?;

let operation = VolumeOperation::SetVolumeProperty(VolumeProperty::new(
&request.prop_name,
&request.prop_value,
));
let spec_clone = self.start_update(registry, &state, operation).await?;

self.complete_update(registry, Ok(()), spec_clone).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl ResourceShutdownOperations for OperationGuardArc<VolumeSpec> {
type RemoveShutdownTargets = DestroyShutdownTargets;
Expand Down Expand Up @@ -786,14 +811,16 @@ impl ResourceLifecycleExt<CreateVolumeSource<'_>> for OperationGuardArc<VolumeSp
request_src: &CreateVolumeSource,
) -> Result<Self::CreateOutput, SvcError> {
request_src.pre_flight_check()?;
let request = request_src.source();

let mut request = request_src.source().clone();
if request.max_snapshots().is_none() {
request.max_snapshots = Some(registry.max_snapshots());
}
let specs = registry.specs();
let mut volume = specs
.get_or_create_volume(request_src)
.operation_guard_wait()
.await?;
let volume_clone = volume.start_create_update(registry, request).await?;
let volume_clone = volume.start_create_update(registry, &request).await?;

// If the volume is a part of the ag, create or update accordingly.
registry.specs().get_or_create_affinity_group(&volume_clone);
Expand Down
35 changes: 28 additions & 7 deletions control-plane/agents/src/bin/core/volume/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
operations::{
ResourceCloning, ResourceLifecycle, ResourceLifecycleWithLifetime,
ResourcePublishing, ResourceReplicas, ResourceResize, ResourceSharing,
ResourceShutdownOperations, ResourceSnapshotting,
ResourceShutdownOperations, ResourceSnapshotting, ResourceVolume,
},
operations_helper::{OperationSequenceGuard, ResourceSpecsLocked},
OperationGuardArc,
Expand All @@ -21,9 +21,9 @@ use grpc::{
CreateSnapshotVolumeInfo, CreateVolumeInfo, CreateVolumeSnapshot,
CreateVolumeSnapshotInfo, DestroyShutdownTargetsInfo, DestroyVolumeInfo,
DestroyVolumeSnapshot, DestroyVolumeSnapshotInfo, PublishVolumeInfo,
RepublishVolumeInfo, ResizeVolumeInfo, SetVolumeReplicaInfo, ShareVolumeInfo,
UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations, VolumeSnapshot,
VolumeSnapshots,
RepublishVolumeInfo, ResizeVolumeInfo, SetVolumePropInfo, SetVolumeReplicaInfo,
ShareVolumeInfo, UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations,
VolumeSnapshot, VolumeSnapshots,
},
Pagination,
},
Expand All @@ -37,8 +37,8 @@ use stor_port::{
},
transport::{
CreateSnapshotVolume, CreateVolume, DestroyShutdownTargets, DestroyVolume, Filter,
PublishVolume, RepublishVolume, ResizeVolume, SetVolumeReplica, ShareVolume,
UnpublishVolume, UnshareVolume, Volume,
PublishVolume, RepublishVolume, ResizeVolume, SetVolumeProp, SetVolumeReplica,
ShareVolume, UnpublishVolume, UnshareVolume, Volume,
},
},
};
Expand Down Expand Up @@ -161,6 +161,18 @@ impl VolumeOperations for Service {
Ok(volume)
}

async fn set_volume_prop(
&self,
req: &dyn SetVolumePropInfo,
_ctx: Option<Context>,
) -> Result<Volume, ReplyError> {
let set_volume_prop = req.into();
let service = self.clone();
let volume =
Context::spawn(async move { service.set_volume_property(&set_volume_prop).await })
.await??;
Ok(volume)
}
async fn probe(&self, _ctx: Option<Context>) -> Result<bool, ReplyError> {
return Ok(true);
}
Expand Down Expand Up @@ -291,7 +303,6 @@ impl Service {
}
filter => return Err(SvcError::InvalidFilter { filter }),
};

Ok(Volumes {
entries: filtered_volumes,
next_token: match last_result {
Expand Down Expand Up @@ -403,6 +414,16 @@ impl Service {
volume.set_replica(&self.registry, request).await?;
self.registry.volume(&request.uuid).await
}
/// Set volume property.
#[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.uuid))]
pub(super) async fn set_volume_property(
&self,
request: &SetVolumeProp,
) -> Result<Volume, SvcError> {
let mut volume = self.specs().volume(&request.uuid).await?;
volume.set_property(&self.registry, request).await?;
self.registry.volume(&request.uuid).await
}

/// Create a volume snapshot.
#[tracing::instrument(level = "info", skip(self), err, fields(volume.uuid = %request.source_id, snapshot.source_uuid = %request.source_id, snapshot.uuid = %request.snap_id))]
Expand Down
47 changes: 25 additions & 22 deletions control-plane/agents/src/bin/core/volume/snapshot_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use stor_port::{
};

use chrono::{DateTime, Utc};
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
time::SystemTime,
};

#[async_trait::async_trait]
impl ResourceSnapshotting for OperationGuardArc<VolumeSpec> {
Expand Down Expand Up @@ -66,9 +69,6 @@ impl ResourceSnapshotting for OperationGuardArc<VolumeSpec> {
},
)
.await;
// if snap_result.is_ok() {
// self.as_ref().num_snapshots += 1;
// }
self.complete_update(registry, snap_result, spec_clone)
.await
}
Expand Down Expand Up @@ -337,7 +337,7 @@ impl OperationGuardArc<VolumeSnapshot> {
self.snapshot_nexus(prep_params, target, registry, target_node)
.await
} else {
self.snapshot_replica(prep_params, target_node).await
self.snapshot_replica::<N>(prep_params, registry).await
}
}

Expand Down Expand Up @@ -415,27 +415,30 @@ impl OperationGuardArc<VolumeSnapshot> {
async fn snapshot_replica<N: ReplicaSnapshotApi>(
&self,
prep_params: &PrepareVolumeSnapshot,
target_node: N,
registry: &Registry,
) -> Result<VolumeSnapshotCreateResult, SvcError> {
let mut replica_snap = prep_params.replica_snapshot.clone();
let mut replica_snaps = prep_params.replica_snapshot.clone();
let volume_params = prep_params.parameters.params().clone();
let mut timestamp = SystemTime::now();

let replica_params = volume_params.with_uuid(replica_snap.1[0].spec().uuid());
let response = target_node
.create_repl_snapshot(&CreateReplicaSnapshot::new(SnapshotParameters::new(
replica_snap.1[0].spec().source_id().replica_id(),
replica_params,
)))
.await?;
let timestamp = response.timestamp();
replica_snap.1[0].complete_vol(
timestamp.into(),
response.replica_size(),
response.allocated_size() + response.predecessor_alloc_size(),
);

for (replica, replica_snap) in replica_snaps.0.iter_mut().zip(replica_snaps.1.iter_mut()) {
let replica_params = volume_params.clone().with_uuid(replica_snap.spec().uuid());
let target_node = registry.node_wrapper(&replica.node).await?;
let response = target_node
.create_repl_snapshot(&CreateReplicaSnapshot::new(SnapshotParameters::new(
replica_snap.spec().source_id().replica_id(),
replica_params,
)))
.await?;
timestamp = response.timestamp();
replica_snap.complete_vol(
timestamp.into(),
response.replica_size(),
response.allocated_size() + response.predecessor_alloc_size(),
);
}
Ok(VolumeSnapshotCreateResult::new_ok(
replica_snap.1,
replica_snaps.1,
timestamp.into(),
))
}
Expand Down
2 changes: 1 addition & 1 deletion control-plane/agents/src/bin/core/volume/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ impl SpecOperationsHelper for VolumeSpec {
}
}
}

VolumeOperation::SetVolumeProperty(_) => Ok(()),
VolumeOperation::RemoveUnusedReplica(uuid) => {
let last_replica = !registry
.specs()
Expand Down
21 changes: 21 additions & 0 deletions control-plane/grpc/proto/v1/volume/volume.proto
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ message CreateVolumeRequest {
bool thin = 8;
// Affinity Group related information.
optional AffinityGroup affinity_group = 9;
// Max snapshots limit per volume.
optional int32 max_snapshots = 10;
}

// Publish a volume on a node
Expand Down Expand Up @@ -344,6 +346,16 @@ message SetVolumeReplicaRequest {
// replica count
uint32 replicas = 2;
}
// Set the volume property
message SetVolumePropRequest {
// uuid of the volume
google.protobuf.StringValue uuid = 1;
// property name
string prop_name = 2;
// property value
string prop_value = 3;
}


// Delete volume
message DestroyVolumeRequest {
Expand Down Expand Up @@ -425,6 +437,14 @@ message SetVolumeReplicaReply {
}
}

// Reply type for a SetVolumeProperty request
message SetVolumePropReply {
oneof reply {
Volume volume = 1;
common.ReplyError error = 2;
}
}

message ProbeRequest {
// Intentionally empty.
}
Expand Down Expand Up @@ -593,6 +613,7 @@ service VolumeGrpc {
rpc ShareVolume (ShareVolumeRequest) returns (ShareVolumeReply) {}
rpc UnshareVolume (UnshareVolumeRequest) returns (UnshareVolumeReply) {}
rpc SetVolumeReplica (SetVolumeReplicaRequest) returns (SetVolumeReplicaReply) {}
rpc SetVolumeProperty(SetVolumePropRequest) returns (SetVolumePropReply) {}
rpc Probe (ProbeRequest) returns (ProbeResponse) {}

// Snapshots
Expand Down
34 changes: 28 additions & 6 deletions control-plane/grpc/src/operations/volume/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::{
traits::{
CreateSnapshotVolumeInfo, CreateVolumeInfo, CreateVolumeSnapshotInfo,
DestroyShutdownTargetsInfo, DestroyVolumeInfo, PublishVolumeInfo,
RepublishVolumeInfo, ResizeVolumeInfo, SetVolumeReplicaInfo, ShareVolumeInfo,
UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations, VolumeSnapshot,
VolumeSnapshots,
RepublishVolumeInfo, ResizeVolumeInfo, SetVolumePropInfo, SetVolumeReplicaInfo,
ShareVolumeInfo, UnpublishVolumeInfo, UnshareVolumeInfo, VolumeOperations,
VolumeSnapshot, VolumeSnapshots,
},
traits_snapshots::DestroyVolumeSnapshotInfo,
},
Expand All @@ -17,9 +17,9 @@ use crate::{
volume::{
create_snapshot_reply, create_snapshot_volume_reply, create_volume_reply,
get_snapshots_reply, get_snapshots_request, get_volumes_reply, get_volumes_request,
publish_volume_reply, republish_volume_reply, set_volume_replica_reply, share_volume_reply,
unpublish_volume_reply, volume_grpc_client::VolumeGrpcClient, GetSnapshotsRequest,
GetVolumesRequest, ProbeRequest,
publish_volume_reply, republish_volume_reply, set_volume_prop_reply,
set_volume_replica_reply, share_volume_reply, unpublish_volume_reply,
volume_grpc_client::VolumeGrpcClient, GetSnapshotsRequest, GetVolumesRequest, ProbeRequest,
},
};

Expand Down Expand Up @@ -219,6 +219,28 @@ impl VolumeOperations for VolumeClient {
}
}

#[tracing::instrument(
name = "VolumeClient::set_volume_prop",
level = "debug",
skip(self),
err
)]
async fn set_volume_prop(
&self,
request: &dyn SetVolumePropInfo,
ctx: Option<Context>,
) -> Result<Volume, ReplyError> {
let req = self.request(request, ctx, MessageIdVs::SetVolumeProp);
let response = self.client().set_volume_property(req).await?.into_inner();
match response.reply {
Some(set_volume_prop_reply) => match set_volume_prop_reply {
set_volume_prop_reply::Reply::Volume(volume) => Ok(Volume::try_from(volume)?),
set_volume_prop_reply::Reply::Error(err) => Err(err.into()),
},
None => Err(ReplyError::invalid_response(ResourceKind::Volume)),
}
}

#[tracing::instrument(name = "VolumeClient::probe", level = "debug", skip(self))]
async fn probe(&self, _ctx: Option<Context>) -> Result<bool, ReplyError> {
match self.client().probe(ProbeRequest {}).await {
Expand Down
Loading

0 comments on commit 0d1c5c9

Please sign in to comment.