Skip to content

Commit

Permalink
replication: add reconcile logic to update last_sync
Browse files Browse the repository at this point in the history
This commit adds reconcile logic to update the
last sync time.

Signed-off-by: yati1998 <ypadia@redhat.com>
  • Loading branch information
yati1998 committed Sep 13, 2022
1 parent dc04589 commit f0c7f15
Show file tree
Hide file tree
Showing 10 changed files with 1,127 additions and 2 deletions.
11 changes: 11 additions & 0 deletions controllers/replication.storage/replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ func (r *Replication) Resync() *Response {
return &Response{Response: resp, Error: err}
}

func (r *Replication) GetInfo() *Response {
resp, err := r.Params.Replication.GetVolumeReplicationInfo(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Params.SecretName,
r.Params.SecretNamespace,
)

return &Response{Response: resp, Error: err}
}

func (r *Response) HasKnownGRPCError(knownErrors []codes.Code) bool {
if r.Error == nil {
return false
Expand Down
73 changes: 73 additions & 0 deletions controllers/replication.storage/volumereplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
pvcDataSource = "PersistentVolumeClaim"
volumeReplicationClass = "VolumeReplicationClass"
volumeReplication = "VolumeReplication"
defaultScheduleTime = time.Hour
)

var (
Expand Down Expand Up @@ -363,16 +364,64 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

instance.Status.LastCompletionTime = getCurrentTime()

var requeueForInfo bool

if instance.Spec.ReplicationState == replicationv1alpha1.Primary {
info, err := r.getVolumeReplicationInfo(vr)
if err != nil {
logger.Error(err, "Failed to get volume replication info")
return ctrl.Result{}, err
}
ts := info.GetLastSyncTime()

lastSyncTime := metav1.NewTime(ts.AsTime())
instance.Status.LastSyncTime = &lastSyncTime
requeueForInfo = true
}
err = r.updateReplicationStatus(instance, logger, getReplicationState(instance), msg)
if err != nil {
return ctrl.Result{}, err
}
if instance.Spec.ReplicationState == replicationv1alpha1.Secondary {
instance.Status.LastSyncTime = nil
}

logger.Info(msg)

if requeueForInfo {
scheduleTime := getScheduleTime(parameters, logger)
return ctrl.Result{
Requeue: true,
RequeueAfter: scheduleTime,
}, nil
}

return ctrl.Result{}, nil
}

// getScheduleTime takes parameters and returns the scheduling interval
// after converting it to time.Duration. If the schedulingInterval is empty
// or there is error parsing, it is set to the default value.
func getScheduleTime(parameters map[string]string, logger logr.Logger) time.Duration {
// the schedulingInterval looks like below, which is the part of volumereplicationclass
// and is an optional parameter.
// ```parameters:
// replication.storage.openshift.io/replication-secret-name: rook-csi-rbd-provisioner
// replication.storage.openshift.io/replication-secret-namespace: rook-ceph
// schedulingInterval: 1m```
rawScheduleTime := parameters["schedulingInterval"]
if rawScheduleTime == "" {
return defaultScheduleTime
}
scheduleTime, err := time.ParseDuration(rawScheduleTime)
if err != nil {
logger.Error(err, "failed to parse time: %v", rawScheduleTime)
return defaultScheduleTime
}
return scheduleTime
}

func (r *VolumeReplicationReconciler) getReplicationClient(driverName string) (grpcClient.VolumeReplication, error) {
conns := r.Connpool.GetByNodeID(driverName, "")

Expand Down Expand Up @@ -614,6 +663,30 @@ func (r *VolumeReplicationReconciler) enableReplication(vr *volumeReplicationIns
return nil
}

// getVolumeReplicationInfo gets volume replication info.
func (r *VolumeReplicationReconciler) getVolumeReplicationInfo(vr *volumeReplicationInstance) (*proto.GetVolumeReplicationInfoResponse, error) {
volumeReplication := replication.Replication{
Params: vr.commonRequestParameters,
}

resp := volumeReplication.GetInfo()
if resp.Error != nil {
vr.logger.Error(resp.Error, "failed to get volume replication info")

return nil, resp.Error
}

infoResponse, ok := resp.Response.(*proto.GetVolumeReplicationInfoResponse)
if !ok {
err := fmt.Errorf("received response of unexpected type")
vr.logger.Error(err, "unable to parse response")

return nil, err
}

return infoResponse, nil
}

func getReplicationState(instance *replicationv1alpha1.VolumeReplication) replicationv1alpha1.State {
switch instance.Spec.ReplicationState {
case replicationv1alpha1.Primary:
Expand Down
74 changes: 74 additions & 0 deletions controllers/replication.storage/volumereplication_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2022 The Kubernetes-CSI-Addons Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"testing"
"time"

"github.com/go-logr/logr/testr"
)

func TestGetScheduledTime(t *testing.T) {
t.Parallel()
td, _ := time.ParseDuration("1m")
const defaultScheduleTime = time.Hour
logger := testr.New(t)
testcases := []struct {
parameters map[string]string
time time.Duration
}{
{
parameters: map[string]string{
"replication.storage.openshift.io/replication-secret-name": "rook-csi-rbd-provisioner",
"schedulingInterval": "1m",
},
time: td,
},
{
parameters: map[string]string{
"replication.storage.openshift.io/replication-secret-name": "rook-csi-rbd-provisioner",
},
time: defaultScheduleTime,
},
{
parameters: map[string]string{},
time: defaultScheduleTime,
},
{
parameters: map[string]string{
"schedulingInterval": "",
},
time: defaultScheduleTime,
},
{
parameters: map[string]string{
"schedulingInterval": "2mm",
},
time: defaultScheduleTime,
},
}
for _, tt := range testcases {
newtt := tt
t.Run("", func(t *testing.T) {
t.Parallel()
if got := getScheduleTime(newtt.parameters, logger); got != newtt.time {
t.Errorf("GetSchedluedTime() = %v, want %v", got, newtt.time)
}
})
}
}
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/csi-addons/spec v0.1.2-0.20220829042231-b27a0d84b50b h1:C5KgryC4RwQVSF8L/pgcKftgn7Z1zHFZlACJukPlCxs=
github.com/csi-addons/spec v0.1.2-0.20220829042231-b27a0d84b50b/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.1.2-0.20220906123848-52ce69f90900 h1:zX0138DipZsZqxK1UwAmaRZmL89OuQMkwh7FtvTDgFw=
github.com/csi-addons/spec v0.1.2-0.20220906123848-52ce69f90900/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
Expand Down
12 changes: 12 additions & 0 deletions internal/client/fake/replication-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type ReplicationClient struct {
DemoteVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.DemoteVolumeResponse, error)
// ResyncVolumeMock mocks ResyncVolume RPC call.
ResyncVolumeMock func(volumeID, replicationID string, secretName, secretNamespace string, parameters map[string]string) (*proto.ResyncVolumeResponse, error)
// GetVolumeReplicationInfo mocks GetVolumeReplicationInfo RPC call.
GetVolumeReplicationInfoMock func(volumeID, replicationID string, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error)
}

// EnableVolumeReplication calls EnableVolumeReplicationMock mock function.
Expand Down Expand Up @@ -87,3 +89,13 @@ func (rc *ReplicationClient) ResyncVolume(
error) {
return rc.ResyncVolumeMock(volumeID, replicationID, secretName, secretNamespace, parameters)
}

// GetVolumeReplicationInfo calls GetVolumeReplicationInfoMock function.
func (rc *ReplicationClient) GetVolumeReplicationInfo(
volumeID,
replicationID string,
secretName, secretNamespace string) (
*proto.GetVolumeReplicationInfoResponse,
error) {
return rc.GetVolumeReplicationInfoMock(volumeID, replicationID, secretName, secretNamespace)
}
19 changes: 19 additions & 0 deletions internal/client/replication-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type VolumeReplication interface {
// ResyncVolume RPC call to resync the volume.
ResyncVolume(volumeID, replicationID string, force bool, secretName, secretNamespace string, parameters map[string]string) (*proto.
ResyncVolumeResponse, error)
// GetVolumeReplicationInfo RPC call to get volume replication info.
GetVolumeReplicationInfo(volumeID, replicationID, secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error)
}

// NewReplicationClient returns VolumeReplication interface which has the RPC
Expand Down Expand Up @@ -143,3 +145,20 @@ func (rc *replicationClient) ResyncVolume(volumeID, replicationID string, force

return resp, err
}

// GetVolumeReplicationInfo RPC call to get volume replication info.
func (rc *replicationClient) GetVolumeReplicationInfo(volumeID, replicationID,
secretName, secretNamespace string) (*proto.GetVolumeReplicationInfoResponse, error) {
req := &proto.GetVolumeReplicationInfoRequest{
VolumeId: volumeID,
ReplicationId: replicationID,
SecretName: secretName,
SecretNamespace: secretNamespace,
}

createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout)
defer cancel()
resp, err := rc.client.GetVolumeReplicationInfo(createCtx, req)

return resp, err
}
33 changes: 33 additions & 0 deletions internal/sidecar/service/volumereplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,36 @@ func (rs *ReplicationServer) ResyncVolume(
Ready: resp.Ready,
}, nil
}

// GetVolumeReplicationInfo fetches required information from kubernetes cluster and calls
// CSI-Addons GetVolumeReplicationInfo service.
func (rs *ReplicationServer) GetVolumeReplicationInfo(
ctx context.Context,
req *proto.GetVolumeReplicationInfoRequest) (*proto.GetVolumeReplicationInfoResponse, error) {
// Get the secrets from the k8s cluster
data, err := kube.GetSecret(ctx, rs.kubeClient, req.GetSecretName(), req.GetSecretNamespace())
if err != nil {
klog.Errorf("Failed to get secret %s in namespace %s: %v", req.GetSecretName(), req.GetSecretNamespace(), err)
return nil, status.Error(codes.Internal, err.Error())
}

resp, err := rs.controllerClient.GetVolumeReplicationInfo(ctx,
&csiReplication.GetVolumeReplicationInfoRequest{
VolumeId: req.VolumeId,
Secrets: data,
ReplicationId: req.ReplicationId,
})
if err != nil {
klog.Errorf("Failed to get volume replication info: %v", err)
return nil, err
}

lastsynctime := resp.GetLastSyncTime()
if lastsynctime == nil {
klog.Errorf("Failed to get last sync time: %v", lastsynctime)
}

return &proto.GetVolumeReplicationInfoResponse{
LastSyncTime: lastsynctime,
}, nil
}
Loading

0 comments on commit f0c7f15

Please sign in to comment.