From 89820ccad23c26effe8c60b9b8090e4600e4002b Mon Sep 17 00:00:00 2001 From: "Cuong. Duong Manh" Date: Mon, 1 Jul 2024 14:39:09 +0700 Subject: [PATCH] [build][feat] add event handler --- pkg/cloud/entity/snapshot.go | 4 +++ pkg/driver/controller.go | 65 ++++++++++++++++++++++-------------- pkg/k8s/ik8s.go | 4 +++ pkg/k8s/k8s.go | 32 ++++++++++++++++-- 4 files changed, 78 insertions(+), 27 deletions(-) diff --git a/pkg/cloud/entity/snapshot.go b/pkg/cloud/entity/snapshot.go index 3f1e99e..c4235af 100644 --- a/pkg/cloud/entity/snapshot.go +++ b/pkg/cloud/entity/snapshot.go @@ -13,3 +13,7 @@ type ListSnapshots struct { func (s *ListSnapshots) Len() int { return len(s.Items) } + +func (s *ListSnapshots) IsEmpty() bool { + return s.Len() < 1 +} diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 302eb29..0b68930 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -30,7 +30,6 @@ type controllerService struct { modifyVolumeManager *modifyVolumeManager driverOptions *DriverOptions k8sClient lsk8s.IKubernetes - recorder lk8srecord.EventRecorder broadcaster lk8srecord.EventBroadcaster lvmrpc.UnimplementedModifyServer @@ -68,8 +67,7 @@ func newControllerService(pdriOpts *DriverOptions) controllerService { inFlight: lsinternal.NewInFlight(), driverOptions: pdriOpts, modifyVolumeManager: newModifyVolumeManager(), - k8sClient: lsk8s.NewKubernetes(k8sClient), - recorder: recorder, + k8sClient: lsk8s.NewKubernetes(k8sClient, recorder), broadcaster: broadcaster, } } @@ -79,18 +77,20 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol serr lserr.IError ) - llog.V(5).InfoS("[INFO] - CreateVolume: called", "preq", *preq) + llog.V(5).InfoS("[INFO] - CreateVolume: Called", "request", *preq) // Validate the create volume request if err := validateCreateVolumeRequest(preq); err != nil { - llog.ErrorS(err, "[ERROR] - CreateVolume: invalid request") + llog.ErrorS(err, "[ERROR] - CreateVolume: Invalid request", "request", *preq) + ns, name := getCreateVolumeRequestNamespacedName(preq) + s.k8sClient.PersistentVolumeClaimEventWarning(pctx, ns, name, "CsiCreateVolumeInvalidRequest", err.Error()) return nil, err } // Validate volume size, if volume size is less than the default volume size of cloud provider, set it to the default volume size volSizeBytes, err := s.getVolSizeBytes(preq) if err != nil { - llog.ErrorS(err, "[ERROR] - CreateVolume: failed to get volume size") + llog.ErrorS(err, "[ERROR] - CreateVolume: Failed to get volume size") return nil, ErrFailedToValidateVolumeSize(preq.GetName(), err) } @@ -100,14 +100,19 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol // check if a request is already in-flight if ok := s.inFlight.Insert(volName); !ok { - llog.V(5).InfoS("[INFO] - CreateVolume: Volume is already in-flight", "volumeName", volName) + llog.InfoS("[INFO] - CreateVolume: Operation is already in-flight", "volumeName", volName, "inflightKey", volName) return nil, ErrVolumeIsCreating(volName) } - defer s.inFlight.Delete(volName) + + llog.InfoS("[INFO] - CreateVolume: Insert this action to inflight cache", "volumeName", volName, "inflightKey", volName) + defer func() { + llog.InfoS("[INFO] - CreateVolume: Operation completed", "volumeName", volName, "inflightKey", volName) + s.inFlight.Delete(volName) + }() if _, serr = s.cloud.GetVolumeByName(volName); serr != nil { if !serr.IsError(lsdkErrs.EcVServerVolumeNotFound) { - llog.ErrorS(serr.GetError(), "[ERROR] - CreateVolume: failed to get volume", "volumeName", volName) + llog.ErrorS(serr.GetError(), "[ERROR] - CreateVolume: Failed to get volume", "volumeName", volName) return nil, ErrFailedToListVolumeByName(volName) } } @@ -115,7 +120,7 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol cvr := NewCreateVolumeRequest().WithDriverOptions(s.driverOptions) parser, _ := ljoat.GetParser() for pk, pv := range preq.GetParameters() { - llog.InfoS("[INFO] - CreateVolume: parsing request parameters", "key", pk, "value", pv) + llog.InfoS("[INFO] - CreateVolume: Parsing request parameters", "key", pk, "value", pv) switch lstr.ToLower(pk) { case VolumeTypeKey: cvr = cvr.WithVolumeTypeID(pv) @@ -159,12 +164,7 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol } } - modifyOpts, err := parseModifyVolumeParameters(preq.GetMutableParameters()) - if err != nil { - llog.ErrorS(err, "[ERROR] - CreateVolume: invalid request") - return nil, ErrModifyMutableParam - } - + modifyOpts, _ := parseModifyVolumeParameters(preq.GetMutableParameters()) volumeSource := preq.GetVolumeContentSource() if volumeSource != nil { if _, ok := volumeSource.GetType().(*lcsi.VolumeContentSource_Snapshot); !ok { @@ -179,7 +179,7 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol respCtx, err := cvr.ToResponseContext(volCap) if err != nil { - llog.ErrorS(err, "[ERROR] - CreateVolume: failed to parse response context", "volumeID", volName) + llog.ErrorS(err, "[ERROR] - CreateVolume: Failed to parse response context", "volumeID", volName) return nil, err } @@ -220,35 +220,42 @@ func (s *controllerService) CreateVolume(pctx lctx.Context, preq *lcsi.CreateVol return newCreateVolumeResponse(resp, cvr, respCtx), nil } -func (s *controllerService) DeleteVolume(pctx lctx.Context, preq *lcsi.DeleteVolumeRequest) (*lcsi.DeleteVolumeResponse, error) { - llog.V(4).InfoS("[INFO] - DeleteVolume: called", "args", *preq) +func (s *controllerService) DeleteVolume(_ lctx.Context, preq *lcsi.DeleteVolumeRequest) (*lcsi.DeleteVolumeResponse, error) { + llog.V(4).InfoS("[INFO] - DeleteVolume: called", "request", *preq) + if err := validateDeleteVolumeRequest(preq); err != nil { - llog.Errorf("[ERROR] - DeleteVolume: invalid request") + llog.ErrorS(err, "[ERROR] - DeleteVolume: Invalid request", "request", *preq) return nil, err } volumeID := preq.GetVolumeId() // check if a request is already in-flight if ok := s.inFlight.Insert(volumeID); !ok { + llog.InfoS("[INFO] - DeleteVolume: Operation is already in-flight", "volumeID", volumeID) return nil, ErrOperationAlreadyExists(volumeID) } - defer s.inFlight.Delete(volumeID) + + llog.InfoS("[INFO] - DeleteVolume: Insert this action to inflight cache", "volumeID", volumeID, "inflightKey", volumeID) + defer func() { + llog.InfoS("[INFO] - DeleteVolume: Operation completed", "volumeID", volumeID, "inflightKey", volumeID) + s.inFlight.Delete(volumeID) + }() // So the volume MUST NOT truly be deleted if it has at least one snapshot lstSnapshots, ierr := s.cloud.ListSnapshots(volumeID, 1, 10) if ierr != nil { - llog.ErrorS(ierr.GetError(), "[ERROR] - DeleteVolume: failed to list snapshots", "volumeID", volumeID) + llog.ErrorS(ierr.GetError(), "[ERROR] - DeleteVolume: Failed to list snapshots", "volumeId", volumeID) return nil, ErrFailedToListSnapshot(volumeID) } - if lstSnapshots.Len() > 0 { + if !lstSnapshots.IsEmpty() { llog.ErrorS(nil, "[ERROR] - DeleteVolume: CANNOT delete this volume because of having snapshots", "volumeId", volumeID) return nil, ErrDeleteVolumeHavingSnapshots(volumeID) } if err := s.cloud.DeleteVolume(volumeID); err != nil { if err != nil { - llog.ErrorS(err.GetError(), "[ERROR] - DeleteVolume: failed to delete volume", "volumeID", volumeID) + llog.ErrorS(err.GetError(), "[ERROR] - DeleteVolume: Failed to delete volume", "volumeID", volumeID) return nil, ErrFailedToDeleteVolume(volumeID) } } @@ -309,7 +316,7 @@ func (s *controllerService) ControllerUnpublishVolume(_ lctx.Context, preq *lcsi return nil, ErrOperationAlreadyExists(volumeID) } - llog.V(5).InfoS("[INFO] - ControllerUnpublishVolume: Insert this action to inflight cach3", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key) + llog.V(5).InfoS("[INFO] - ControllerUnpublishVolume: Insert this action to inflight cache", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key) defer func() { llog.InfoS("[INFO] - ControllerUnpublishVolume: Operation completed", "volumeID", volumeID, "nodeID", nodeID, "inflightKey", key) s.inFlight.Delete(volumeID + nodeID) @@ -749,3 +756,11 @@ func parsePage(nextToken string) int { return page } + +func getCreateVolumeRequestNamespacedName(preq *lcsi.CreateVolumeRequest) (string, string) { + params := preq.GetParameters() + if params != nil { + return params[PVCNamespaceKey], params[PVCNameKey] + } + return "", "" +} diff --git a/pkg/k8s/ik8s.go b/pkg/k8s/ik8s.go index d9cc193..1b3ec89 100644 --- a/pkg/k8s/ik8s.go +++ b/pkg/k8s/ik8s.go @@ -10,4 +10,8 @@ import ( type IKubernetes interface { GetPersistentVolumeClaimByName(pctx lctx.Context, pnamespace, pname string) (*lsentity.PersistentVolumeClaim, lserr.IError) GetStorageClassByName(pctx lctx.Context, pname string) (*lsentity.StorageClass, lserr.IError) + + // Event recorder + PersistentVolumeClaimEventWarning(pctx lctx.Context, pnamespace, pname, preason, pmessage string) + PersistentVolumeClaimEventNormal(pctx lctx.Context, pnamespace, pname, preason, pmessage string) } diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index bd459fe..33280ec 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -3,8 +3,10 @@ package k8s import ( lctx "context" + lcoreV1 "k8s.io/api/core/v1" lmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" lk8s "k8s.io/client-go/kubernetes" + lk8srecord "k8s.io/client-go/tools/record" lsentity "github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/cloud/entity" lserr "github.com/vngcloud/vngcloud-blockstorage-csi-driver/pkg/cloud/errors" @@ -12,11 +14,13 @@ import ( type kubernetes struct { lk8s.Interface + lk8srecord.EventRecorder } -func NewKubernetes(pk8sclient lk8s.Interface) IKubernetes { +func NewKubernetes(pk8sclient lk8s.Interface, precorder lk8srecord.EventRecorder) IKubernetes { return &kubernetes{ - Interface: pk8sclient, + Interface: pk8sclient, + EventRecorder: precorder, } } @@ -45,3 +49,27 @@ func (s *kubernetes) GetStorageClassByName(pctx lctx.Context, pname string) (*ls return lsentity.NewStorageClass(sc), nil } + +func (s *kubernetes) PersistentVolumeClaimEventWarning(pctx lctx.Context, pnamespace, pname, preason, pmessage string) { + if pnamespace == "" || pname == "" { + return + } + + pvc, err := s.GetPersistentVolumeClaimByName(pctx, pnamespace, pname) + if err != nil || pvc == nil { + return + } + s.EventRecorder.Event(pvc.PersistentVolumeClaim, lcoreV1.EventTypeWarning, preason, pmessage) +} + +func (s *kubernetes) PersistentVolumeClaimEventNormal(pctx lctx.Context, pnamespace, pname, preason, pmessage string) { + if pnamespace == "" || pname == "" { + return + } + + pvc, err := s.GetPersistentVolumeClaimByName(pctx, pnamespace, pname) + if err != nil || pvc == nil { + return + } + s.EventRecorder.Event(pvc.PersistentVolumeClaim, lcoreV1.EventTypeNormal, preason, pmessage) +}