Skip to content

Commit

Permalink
replication: add reconcile logic to update last_sync
Browse files Browse the repository at this point in the history
This commit adds reconcile logic to update the
last sync time.

Signed-off-by: yati1998 <ypadia@redhat.com>
  • Loading branch information
yati1998 committed Sep 7, 2022
1 parent 37e96d6 commit 13f6350
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 0 deletions.
11 changes: 11 additions & 0 deletions controllers/replication.storage/replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ func (r *Replication) Resync() *Response {
return &Response{Response: resp, Error: err}
}

func (r *Replication) GetInfo() *Response {
resp, err := r.Params.Replication.GetVolumeReplicationInfo(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Params.SecretName,
r.Params.SecretNamespace,
)

return &Response{Response: resp, Error: err}
}

func (r *Response) HasKnownGRPCError(knownErrors []codes.Code) bool {
if r.Error == nil {
return false
Expand Down
53 changes: 53 additions & 0 deletions controllers/replication.storage/volumereplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}
// remove the prefix keys in volume replication class parameters
parameters := filterPrefixedParameters(replicationParameterPrefix, vrcObj.Spec.Parameters)
schedulingTime := parameters["schedulingInterval"]
scheduleTime, _ := time.ParseDuration(schedulingTime)

// get secret
secretName := vrcObj.Spec.Parameters[prefixedReplicationSecretNameKey]
Expand Down Expand Up @@ -363,13 +365,30 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

instance.Status.LastCompletionTime = getCurrentTime()
var last_sync_time int64
var requeueForInfo bool
if instance.Spec.ReplicationState == replicationv1alpha1.Primary {
last_sync_time, err = r.getVolumeReplicationInfo(vr)
last_sync_time1 := time.Unix(last_sync_time, 0).UTC()
instance.Status.LastSyncTime = getTime(last_sync_time1)
requeueForInfo = true

}
err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), msg)
if err != nil {
return ctrl.Result{}, err
}

logger.Info(msg)

if requeueForInfo {
logger.Info("volume is primary, requeuing to get volume replication info")
return ctrl.Result{
Requeue: true,
RequeueAfter: scheduleTime,
}, nil
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -614,6 +633,34 @@ func (r *VolumeReplicationReconciler) enableReplication(vr *volumeReplicationIns
return nil
}

// getVolumeReplicationInfo gets volume replication info.
func (r *VolumeReplicationReconciler) getVolumeReplicationInfo(vr *volumeReplicationInstance) (int64, error) {
volumeReplication := replication.Replication{
Params: vr.commonRequestParameters,
}

resp := volumeReplication.GetInfo()
if resp.Error != nil {
vr.logger.Error(resp.Error, "failed to get volume replication info")

return 0, resp.Error
}

infoResponse, ok := resp.Response.(*proto.GetVolumeReplicationInfoResponse)
if !ok {
err := fmt.Errorf("received response of unexpected type")
vr.logger.Error(err, "unable to parse response")

return 0, err
}

var last_sync_time int64

last_sync_time = infoResponse.LastSyncTime

return last_sync_time, nil
}

func getReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State {
switch instance.Spec.ReplicationState {
case replicationv1alpha1.Primary:
Expand Down Expand Up @@ -651,3 +698,9 @@ func getCurrentTime() *metav1.Time {

return &metav1NowTime
}

func getTime(time time.Time) *metav1.Time {
metav1Time := metav1.NewTime(time)

return &metav1Time
}
12 changes: 12 additions & 0 deletions internal/client/fake/replication-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type ReplicationClient struct {
DemoteVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.DemoteVolumeResponse, error)
// ResyncVolumeMock mocks ResyncVolume RPC call.
ResyncVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.ResyncVolumeResponse, error)
// GetVolumeReplicationInfo mocks GetVolumeReplicationInfo RPC call.
GetVolumeReplicationInfoMock func(volumeID, replicationID string, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error)
}

// EnableVolumeReplication calls EnableVolumeReplicationMock mock function.
Expand Down Expand Up @@ -87,3 +89,13 @@ func (rc *ReplicationClient) ResyncVolume(
error) {
return rc.ResyncVolumeMock(volumeID, replicationID, secretName, secretNamespace, parameters)
}

// GetVolumeReplicationInfo calls GetVolumeReplicationInfoMock function.
func (rc *ReplicationClient) GetVolumeReplicationInfo(
volumeID,
replicationID string,
secretName, secretNamespace string) (
*proto.GetVolumeReplicationInfoResponse,
error) {
return rc.GetVolumeReplicationInfoMock(volumeID, replicationID, secretName, secretNamespace)
}
19 changes: 19 additions & 0 deletions internal/client/replication-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type VolumeReplication interface {
// ResyncVolume RPC call to resync the volume.
ResyncVolume(volumeID, replicationID string, force bool, secretName, secretNamespace string, parameters map[string]string) (*proto.
ResyncVolumeResponse, error)
// GetVolumeReplicationInfo RPC call to get volume replication info.
GetVolumeReplicationInfo(volumeID, replicationID string, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error)
}

// NewReplicationClient returns VolumeReplication interface which has the RPC
Expand Down Expand Up @@ -143,3 +145,20 @@ func (rc *replicationClient) ResyncVolume(volumeID, replicationID string, force

return resp, err
}

// GetVolumeReplicationInfo RPC call to get volume replication info.
func (rc *replicationClient) GetVolumeReplicationInfo(volumeID, replicationID string,
secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error) {
req := &proto.GetVolumeReplicationInfoRequest{
VolumeId: volumeID,
ReplicationId: replicationID,
SecretName: secretName,
SecretNamespace: secretNamespace,
}

createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout)
defer cancel()
resp, err := rc.client.GetVolumeReplicationInfo(createCtx, req)

return resp, err
}
31 changes: 31 additions & 0 deletions internal/sidecar/service/volumereplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,34 @@ func (rs *ReplicationServer) ResyncVolume(
Ready: resp.Ready,
}, nil
}

// GetVolumeReplicationInfo fetches required information from kubernetes cluster and calls
// CSI-Addons GetVolumeReplicationInfo service.
func (rs *ReplicationServer) GetVolumeReplicationInfo(
ctx context.Context,
req *proto.GetVolumeReplicationInfoRequest) (*proto.GetVolumeReplicationInfoResponse, error) {
// Get the secrets from the k8s cluster
data, err := kube.GetSecret(ctx, rs.kubeClient, req.GetSecretName(), req.GetSecretNamespace())
if err != nil {
klog.Errorf("Failed to get secret %s in namespace %s: %v", req.GetSecretName(), req.GetSecretNamespace(), err)
return nil, status.Error(codes.Internal, err.Error())
}

resp, err := rs.controllerClient.GetVolumeReplicationInfo(ctx,
&csiReplication.GetVolumeReplicationInfoRequest{
VolumeId: req.VolumeId,
Secrets: data,
ReplicationId: req.ReplicationId,
})
if err != nil {
klog.Errorf("Failed to get volume replication info: %v", err)
return nil, err
}

lastsynctime := resp.LastSyncTime
synctime := lastsynctime.GetSeconds()

return &proto.GetVolumeReplicationInfoResponse{
LastSyncTime: synctime,
}, nil
}

0 comments on commit 13f6350

Please sign in to comment.