Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use JSON patch for many VolumeSnapshot and VolumeSnapshotContent updates #526

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deploy/kubernetes/csi-snapshotter/rbac-csi-snapshotter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents"]
verbs: ["create", "get", "list", "watch", "update", "delete"]
verbs: ["create", "get", "list", "watch", "update", "delete", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents/status"]
verbs: ["update"]
verbs: ["update", "patch"]

---
kind: ClusterRoleBinding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents"]
verbs: ["create", "get", "list", "watch", "update", "delete"]
verbs: ["create", "get", "list", "watch", "update", "delete", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents/status"]
verbs: ["patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots"]
verbs: ["get", "list", "watch", "update"]
verbs: ["get", "list", "watch", "update", "patch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots/status"]
verbs: ["update"]
verbs: ["update", "patch"]

---
kind: ClusterRoleBinding
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/container-storage-interface/spec v1.5.0
github.com/evanphx/json-patch v4.11.0+incompatible
github.com/fsnotify/fsnotify v1.4.9
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.5.2
Expand Down
94 changes: 92 additions & 2 deletions pkg/common-controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package common_controller

import (
"encoding/json"
"errors"
"fmt"
"net/http"
Expand All @@ -29,8 +30,7 @@ import (
"testing"
"time"

"k8s.io/client-go/util/workqueue"

jsonpatch "github.com/evanphx/json-patch"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
clientset "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
"github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
Expand All @@ -55,6 +55,7 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
klog "k8s.io/klog/v2"
)

Expand Down Expand Up @@ -258,6 +259,46 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
klog.V(4).Infof("saved updated content %s", content.Name)
return true, content, nil

case action.Matches("patch", "volumesnapshotcontents"):
content := &crdv1.VolumeSnapshotContent{}
action := action.(core.PatchAction)

// Check and bump object version
storedSnapshotContent, found := r.contents[action.GetName()]
if found {
// Apply patch
storedSnapshotBytes, err := json.Marshal(storedSnapshotContent)
if err != nil {
return true, nil, err
}
contentPatch, err := jsonpatch.DecodePatch(action.GetPatch())
if err != nil {
return true, nil, err
}

modified, err := contentPatch.Apply(storedSnapshotBytes)
if err != nil {
return true, nil, err
}

err = json.Unmarshal(modified, content)
if err != nil {
return true, nil, err
}

storedVer, _ := strconv.Atoi(content.ResourceVersion)
content.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("cannot update snapshot content %s: snapshot content not found", action.GetName())
}

// Store the updated object to appropriate places.
r.contents[content.Name] = content
r.changedObjects = append(r.changedObjects, content)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated content %s", content.Name)
return true, content, nil

case action.Matches("update", "volumesnapshots"):
obj := action.(core.UpdateAction).GetObject()
snapshot := obj.(*crdv1.VolumeSnapshot)
Expand All @@ -284,6 +325,52 @@ func (r *snapshotReactor) React(action core.Action) (handled bool, ret runtime.O
klog.V(4).Infof("saved updated snapshot %s", snapshot.Name)
return true, snapshot, nil

case action.Matches("patch", "volumesnapshots"):
snapshot := &crdv1.VolumeSnapshot{}
action := action.(core.PatchAction)

// Check and bump object version
storedSnapshot, found := r.snapshots[action.GetName()]
if found {
// Apply patch
storedSnapshotBytes, err := json.Marshal(storedSnapshot)
if err != nil {
return true, nil, err
}
snapPatch, err := jsonpatch.DecodePatch(action.GetPatch())
if err != nil {
return true, nil, err
}

modified, err := snapPatch.Apply(storedSnapshotBytes)
if err != nil {
return true, nil, err
}

err = json.Unmarshal(modified, storedSnapshot)
if err != nil {
return true, nil, err
}

storedVer, _ := strconv.Atoi(storedSnapshot.ResourceVersion)
storedSnapshot.ResourceVersion = strconv.Itoa(storedVer + 1)

// // If we were updating annotations and the new annotations are nil, leave as empty.
// // This seems to be the behavior for merge-patching nil & empty annotations
// if !reflect.DeepEqual(storedSnapshotContent.Annotations, content.Annotations) && content.Annotations == nil {
// content.Annotations = make(map[string]string)
// }
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
} else {
return true, nil, fmt.Errorf("cannot update snapshot %s: snapshot not found", action.GetName())
}

// Store the updated object to appropriate places.
r.snapshots[snapshot.Name] = storedSnapshot
r.changedObjects = append(r.changedObjects, storedSnapshot)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated snapshot %s", storedSnapshot.Name)
return true, storedSnapshot, nil

case action.Matches("get", "volumesnapshotcontents"):
name := action.(core.GetAction).GetName()
content, found := r.contents[name]
Expand Down Expand Up @@ -437,6 +524,7 @@ func (r *snapshotReactor) checkContents(expectedContents []*crdv1.VolumeSnapshot
}
gotMap[v.Name] = v
}

if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
Expand Down Expand Up @@ -714,6 +802,8 @@ func newSnapshotReactor(kubeClient *kubefake.Clientset, client *fake.Clientset,
client.AddReactor("create", "volumesnapshotcontents", reactor.React)
client.AddReactor("update", "volumesnapshotcontents", reactor.React)
client.AddReactor("update", "volumesnapshots", reactor.React)
client.AddReactor("patch", "volumesnapshotcontents", reactor.React)
client.AddReactor("patch", "volumesnapshots", reactor.React)
client.AddReactor("update", "volumesnapshotclasses", reactor.React)
client.AddReactor("get", "volumesnapshotcontents", reactor.React)
client.AddReactor("get", "volumesnapshots", reactor.React)
Expand Down
106 changes: 84 additions & 22 deletions pkg/common-controller/snapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,10 +809,25 @@ func (ctrl *csiSnapshotCommonController) updateSnapshotErrorStatusWithEvent(snap

// addContentFinalizer adds a Finalizer for VolumeSnapshotContent.
func (ctrl *csiSnapshotCommonController) addContentFinalizer(content *crdv1.VolumeSnapshotContent) error {
contentClone := content.DeepCopy()
contentClone.ObjectMeta.Finalizers = append(contentClone.ObjectMeta.Finalizers, utils.VolumeSnapshotContentFinalizer)

newContent, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Update(context.TODO(), contentClone, metav1.UpdateOptions{})
var patches []utils.PatchOp
if len(content.Finalizers) > 0 {
// Add to the end of the finalizers if we have any other finalizers
patches = append(patches, utils.PatchOp{
Op: "add",
Path: "/metadata/finalizers/-",
Value: utils.VolumeSnapshotContentFinalizer,
})

} else {
// Replace finalizers with new array if there are no other finalizers
patches = append(patches, utils.PatchOp{
Op: "add",
Path: "/metadata/finalizers",
Value: []string{utils.VolumeSnapshotContentFinalizer},
})
}
newContent, err := utils.PatchVolumeSnapshotContent(content, patches, ctrl.clientset)
if err != nil {
return newControllerUpdateError(content.Name, err.Error())
}
Expand Down Expand Up @@ -983,15 +998,26 @@ func (ctrl *csiSnapshotCommonController) checkandBindSnapshotContent(snapshot *c
} else if content.Spec.VolumeSnapshotRef.UID != "" && content.Spec.VolumeSnapshotClassName != nil {
return content, nil
}
contentClone := content.DeepCopy()
contentClone.Spec.VolumeSnapshotRef.UID = snapshot.UID

patches := []utils.PatchOp{
{
Op: "replace",
Path: "/spec/volumeSnapshotRef/uid",
Value: string(snapshot.UID),
},
}
if snapshot.Spec.VolumeSnapshotClassName != nil {
className := *(snapshot.Spec.VolumeSnapshotClassName)
contentClone.Spec.VolumeSnapshotClassName = &className
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/spec/volumeSnapshotClassName",
Value: className,
})
}
newContent, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Update(context.TODO(), contentClone, metav1.UpdateOptions{})

newContent, err := utils.PatchVolumeSnapshotContent(content, patches, ctrl.clientset)
if err != nil {
klog.V(4).Infof("updating VolumeSnapshotContent[%s] error status failed %v", contentClone.Name, err)
klog.V(4).Infof("updating VolumeSnapshotContent[%s] error status failed %v", content.Name, err)
return content, err
}

Expand Down Expand Up @@ -1392,24 +1418,53 @@ func isControllerUpdateFailError(err *crdv1.VolumeSnapshotError) bool {

// addSnapshotFinalizer adds a Finalizer for VolumeSnapshot.
func (ctrl *csiSnapshotCommonController) addSnapshotFinalizer(snapshot *crdv1.VolumeSnapshot, addSourceFinalizer bool, addBoundFinalizer bool) error {
snapshotClone := snapshot.DeepCopy()
if addSourceFinalizer {
snapshotClone.ObjectMeta.Finalizers = append(snapshotClone.ObjectMeta.Finalizers, utils.VolumeSnapshotAsSourceFinalizer)
}
if addBoundFinalizer {
snapshotClone.ObjectMeta.Finalizers = append(snapshotClone.ObjectMeta.Finalizers, utils.VolumeSnapshotBoundFinalizer)
}
newSnapshot, err := ctrl.clientset.SnapshotV1().VolumeSnapshots(snapshotClone.Namespace).Update(context.TODO(), snapshotClone, metav1.UpdateOptions{})
if err != nil {
return newControllerUpdateError(utils.SnapshotKey(snapshot), err.Error())
var updatedSnapshot *crdv1.VolumeSnapshot
var err error

// Must perform an update if no finalizers exist
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
if len(snapshot.ObjectMeta.Finalizers) == 0 {
snapshotClone := snapshot.DeepCopy()
if addSourceFinalizer {
snapshotClone.ObjectMeta.Finalizers = append(snapshotClone.ObjectMeta.Finalizers, utils.VolumeSnapshotAsSourceFinalizer)
}
if addBoundFinalizer {
snapshotClone.ObjectMeta.Finalizers = append(snapshotClone.ObjectMeta.Finalizers, utils.VolumeSnapshotBoundFinalizer)
}
updatedSnapshot, err = ctrl.clientset.SnapshotV1().VolumeSnapshots(snapshotClone.Namespace).Update(context.TODO(), snapshotClone, metav1.UpdateOptions{})
if err != nil {
return newControllerUpdateError(utils.SnapshotKey(snapshot), err.Error())
}
} else {
// Otherwise, perform a patch
var patches []utils.PatchOp

if addSourceFinalizer {
patches = append(patches, utils.PatchOp{
Op: "add",
Path: "/metadata/finalizers/-",
Value: utils.VolumeSnapshotAsSourceFinalizer,
})
}
if addBoundFinalizer {
patches = append(patches, utils.PatchOp{
Op: "add",
Path: "/metadata/finalizers/-",
Value: utils.VolumeSnapshotBoundFinalizer,
})
}

updatedSnapshot, err = utils.PatchVolumeSnapshot(snapshot, patches, ctrl.clientset)
if err != nil {
return newControllerUpdateError(utils.SnapshotKey(snapshot), err.Error())
}
}

_, err = ctrl.storeSnapshotUpdate(newSnapshot)
_, err = ctrl.storeSnapshotUpdate(updatedSnapshot)
if err != nil {
klog.Errorf("failed to update snapshot store %v", err)
}

klog.V(5).Infof("Added protection finalizer to volume snapshot %s", utils.SnapshotKey(newSnapshot))
klog.V(5).Infof("Added protection finalizer to volume snapshot %s", utils.SnapshotKey(updatedSnapshot))
return nil
}

Expand Down Expand Up @@ -1489,14 +1544,21 @@ func (ctrl *csiSnapshotCommonController) setAnnVolumeSnapshotBeingDeleted(conten
// Set AnnVolumeSnapshotBeingDeleted if it is not set yet
if !metav1.HasAnnotation(content.ObjectMeta, utils.AnnVolumeSnapshotBeingDeleted) {
klog.V(5).Infof("setAnnVolumeSnapshotBeingDeleted: set annotation [%s] on content [%s].", utils.AnnVolumeSnapshotBeingDeleted, content.Name)
var patches []utils.PatchOp
metav1.SetMetaDataAnnotation(&content.ObjectMeta, utils.AnnVolumeSnapshotBeingDeleted, "yes")
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/metadata/annotations",
xing-yang marked this conversation as resolved.
Show resolved Hide resolved
Value: content.ObjectMeta.GetAnnotations(),
})

updateContent, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Update(context.TODO(), content, metav1.UpdateOptions{})
patchedContent, err := utils.PatchVolumeSnapshotContent(content, patches, ctrl.clientset)
if err != nil {
return content, newControllerUpdateError(content.Name, err.Error())
}

// update content if update is successful
content = updateContent
content = patchedContent

_, err = ctrl.storeContentUpdate(content)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/common-controller/snapshot_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestSync(t *testing.T) {
expectedSnapshots: newSnapshotArray("snap2-12", "snapuid2-12", "", "content2-12", validSecretClass, "content2-12", &False, nil, nil, newVolumeError("Snapshot failed to bind VolumeSnapshotContent, mock update error"), false, true, nil),
errors: []reactorError{
// Inject error to the forth client.VolumesnapshotV1().VolumeSnapshots().Update call.
{"update", "volumesnapshotcontents", errors.New("mock update error")},
{"patch", "volumesnapshotcontents", errors.New("mock update error")},
},
test: testSyncSnapshot,
},
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestSync(t *testing.T) {
initialSecrets: []*v1.Secret{secret()},
errors: []reactorError{
// Inject error to the forth client.VolumesnapshotV1().VolumeSnapshots().Update call.
{"update", "volumesnapshotcontents", errors.New("mock update error")},
{"patch", "volumesnapshotcontents", errors.New("mock update error")},
},
expectSuccess: false,
test: testSyncContentError,
Expand Down
Loading