Skip to content

Commit

Permalink
feat(policy): reconcile volume policy per volume bases (#52)
Browse files Browse the repository at this point in the history
changes enhances the cvc controller to reconcile the
volume target deployment based on the changes done in
different cstorvolumeconfig (cvc) policy resources.
Signed-off-by: prateekpandey14 <prateek.pandey@mayadata.io>
  • Loading branch information
prateekpandey14 authored May 7, 2020
1 parent 9a0149f commit 0de93a3
Show file tree
Hide file tree
Showing 33 changed files with 7,719 additions and 137 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ CVC_OPERATOR_ARM64?=cvc-operator-arm64
deps:
@go mod tidy
@go mod verify
@git diff --exit-code -- go.sum go.mod

.PHONY: test
test:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/openebs/cstor-operators
go 1.13

require (
github.com/davecgh/go-spew v1.1.1
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/onsi/ginkgo v1.12.0 // indirect
Expand All @@ -11,6 +12,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
Expand Down
139 changes: 129 additions & 10 deletions pkg/controllers/cstorvolumeconfig/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

apis "github.com/openebs/api/pkg/apis/cstor/v1"
apitypes "github.com/openebs/api/pkg/apis/types"
"github.com/openebs/cstor-operators/pkg/util/hash"
"github.com/openebs/cstor-operators/pkg/version"
errors "github.com/pkg/errors"
"k8s.io/klog"

Expand Down Expand Up @@ -144,6 +146,15 @@ func (c *CVCController) enqueueCVC(obj interface{}) {
// synCVC is the function which tries to converge to a desired state for the
// CStorVolumeConfigs
func (c *CVCController) syncCVC(cvc *apis.CStorVolumeConfig) error {

if ok, reason := c.ShouldReconcile(cvc); !ok {
// Do not reconcile cvc if version mismatched
message := fmt.Sprintf("can not reconcile CVC %s as %s", cvc.Name, reason)
c.recorder.Event(cvc, corev1.EventTypeWarning, "CVC Reconcile", message)
klog.Warningf("Cannot not reconcile CVC %s in namespace %s as %s", cvc.Name, cvc.Namespace, reason)
return nil
}

var err error
// CStor Volume Claim should be deleted. Check if deletion timestamp is set
// and remove finalizer.
Expand Down Expand Up @@ -208,6 +219,13 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeConfig) error {
// change in curent and desired state of replicas pool information
_ = c.scaleVolumeReplicas(cvc)
}

// sync policy changes from cvc.spec.policy e.g. tunables like toleration, resource requirements etc
err = c.syncPolicySpec(cvc)
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -268,7 +286,7 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeConfig) (*api
}

klog.V(2).Infof("creating cstorvolume target deployment")
_, err = c.getOrCreateCStorTargetDeployment(cvObj, volumePolicy)
_, err = c.getOrCreateCStorTargetDeployment(cvObj, &volumePolicy.Spec)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -317,6 +335,8 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeConfig) (*api

// update volume replica pool information on cvc spec and status
addReplicaPoolInfo(cvc, poolNames)
// add hash label in cvc generated from volume policy spec
addPolicySpecHash(cvc)

err = c.updateCVCObj(cvc, cvObj)
if err != nil {
Expand All @@ -325,13 +345,86 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeConfig) (*api
return cvc, nil
}

// syncPolicySpec reconcile the policy changes to volume target deployment
// for each volumes based on desired changes under cvc.Spec.Policy
func (c *CVCController) syncPolicySpec(cvc *apis.CStorVolumeConfig) error {
cvcCopy := cvc.DeepCopy()

// compare hash label value to the generated hash out of policy changes
if cvcCopy.Labels[hash.TemplateHashLabelName] != hash.HashObject(cvcCopy.Spec.Policy) {
klog.V(4).Infof("Initiated policy reconcile for cvc %q :", cvc.Name)
err := c.patchTargetDeploymentSpec(cvcCopy)
if err != nil {
c.recorder.Event(cvcCopy, corev1.EventTypeWarning,
string("PolicySync"),
fmt.Sprintf("failed to patch target deployment for cvc %s, err %s ", cvcCopy.Name, err.Error()),
)
return err
}
// update the hash value in cvc labels generated for new policy changes
addPolicySpecHash(cvcCopy)
_, err = c.clientset.CstorV1().CStorVolumeConfigs(cvc.Namespace).Update(cvcCopy)
if err != nil {
c.recorder.Event(cvcCopy, corev1.EventTypeWarning,
string("PolicySync"),
fmt.Sprintf("failed to update hash label in cvc %q, err %s", cvcCopy.Name, err.Error()),
)
return err
}
c.recorder.Event(cvcCopy, corev1.EventTypeNormal,
string("PolicySync"),
fmt.Sprintf("successfully sync policy for cvc %s", cvcCopy.Name),
)
}
return nil
}

func (c *CVCController) patchTargetDeploymentSpec(cvc *apis.CStorVolumeConfig) error {
orignalDeployObj, err := c.kubeclientset.AppsV1().Deployments(cvc.Namespace).Get(cvc.Name+"-target", metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get target deployment for volume %s in namespace %s", cvc.Name, cvc.Namespace)
}

klog.V(4).Infof("Syncing cvc policy spec \n: %+v", cvc)

vol, err := c.clientset.CstorV1().CStorVolumes(cvc.Namespace).Get(cvc.Name, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get cstorvolume {%v}", cvc.Name)
}

newDeployObj, err := c.BuildTargetDeployment(vol, &cvc.Spec.Policy)
if err != nil {
return errors.Wrapf(err, "failed to build target deployment {%v}", vol.Name)
}

twoWayPatchData, orignalDeployObjInBytes, err := getPatchData(orignalDeployObj, newDeployObj)
if err != nil {
return err
}

strategicPatchData, err := strategicpatch.StrategicMergePatch(orignalDeployObjInBytes, twoWayPatchData, orignalDeployObj)
if err != nil {
return errors.Wrap(err, "failed to create strategic merge patch data")
}

_, err = c.kubeclientset.AppsV1().Deployments(cvc.Namespace).Patch(orignalDeployObj.Name, types.StrategicMergePatchType, strategicPatchData)
if err != nil {
return errors.Wrap(err, "failed to patch volume target deployment")
}

return nil
}

func (c *CVCController) getVolumePolicy(
policyName string,
cvc *apis.CStorVolumeConfig,
) (*apis.CStorVolumePolicy, error) {

volumePolicy := &apis.CStorVolumePolicy{}
var err error
volumePolicy := &apis.CStorVolumePolicy{}

// Get the default policy
policySpec := getDefaultPolicySpec()

if policyName != "" {
klog.Infof("uses cstorvolume policy %q to configure volume %q", policyName, cvc.Name)
Expand All @@ -344,10 +437,19 @@ func (c *CVCController) getVolumePolicy(
cvc.Name,
)
}
validatePolicySpec(&volumePolicy.Spec)
return volumePolicy, nil
}
// retrun the default policy
volumePolicy.Spec = policySpec
return volumePolicy, nil
}

func addPolicySpecHash(cvc *apis.CStorVolumeConfig) {
labels := hash.SetTemplateHashLabel(cvc.Labels, cvc.Spec.Policy)
cvc.WithLabels(labels)
}

// isReplicaAffinityEnabled checks if replicaAffinity has been enabled using
// cstor volume policy
func (c *CVCController) isReplicaAffinityEnabled(policy *apis.CStorVolumePolicy) bool {
Expand Down Expand Up @@ -396,7 +498,7 @@ func (c *CVCController) removeClaimFinalizer(
}
}
cvcPatch := []Patch{
Patch{
{
Op: "remove",
Path: "/metadata/finalizers",
},
Expand Down Expand Up @@ -610,7 +712,7 @@ func (c *CVCController) markCVCResizeFinished(cvc *apis.CStorVolumeConfig) error
func (c *CVCController) PatchCVCStatus(oldCVC,
newCVC *apis.CStorVolumeConfig,
) (*apis.CStorVolumeConfig, error) {
patchBytes, err := getPatchData(oldCVC, newCVC)
patchBytes, _, err := getPatchData(oldCVC, newCVC)
if err != nil {
return nil, fmt.Errorf("can't patch status of CVC %s as generate path data failed: %v", oldCVC.Name, err)
}
Expand All @@ -623,28 +725,28 @@ func (c *CVCController) PatchCVCStatus(oldCVC,
return updatedClaim, nil
}

func getPatchData(oldObj, newObj interface{}) ([]byte, error) {
func getPatchData(oldObj, newObj interface{}) ([]byte, []byte, error) {
oldData, err := json.Marshal(oldObj)
if err != nil {
return nil, fmt.Errorf("marshal old object failed: %v", err)
return nil, nil, fmt.Errorf("marshal old object failed: %v", err)
}
newData, err := json.Marshal(newObj)
if err != nil {
return nil, fmt.Errorf("mashal new object failed: %v", err)
return nil, nil, fmt.Errorf("mashal new object failed: %v", err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldObj)
if err != nil {
return nil, fmt.Errorf("CreateTwoWayMergePatch failed: %v", err)
return nil, nil, fmt.Errorf("CreateTwoWayMergePatch failed: %v", err)
}
return patchBytes, nil
return patchBytes, oldData, nil
}

// resizeCV resize the cstor volume to desired size, and update CV's capacity
func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quantity) error {
newCV := cv.DeepCopy()
newCV.Spec.Capacity = newCapacity

patchBytes, err := getPatchData(cv, newCV)
patchBytes, _, err := getPatchData(cv, newCV)
if err != nil {
return fmt.Errorf("can't update capacity of CV %s as generate patch data failed: %v", cv.Name, err)
}
Expand Down Expand Up @@ -713,3 +815,20 @@ func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeConfig) error {
"successfully scaled volume replicas to %d", len(cvc.Status.PoolInfo))
return nil
}

func (c *CVCController) ShouldReconcile(cvc *apis.CStorVolumeConfig) (bool, string) {
cvcOperatorVersion := version.Current()
cvcVersion := cvc.VersionDetails.Status.Current
// if version is not exists means its a brand new resource that has not been
// reconciled by controller yet
if cvcVersion == "" {
return true, ""
}

if cvcVersion != cvcOperatorVersion {
return false, fmt.Sprintf("cvc operator version is %s but cvc version is %s",
cvcOperatorVersion,
cvcVersion)
}
return true, ""
}
Loading

0 comments on commit 0de93a3

Please sign in to comment.