Skip to content

Commit

Permalink
Integrate recommended format in dataimportcron controller
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
  • Loading branch information
akalenyu committed May 3, 2023
1 parent fb9bdc3 commit a783d93
Show file tree
Hide file tree
Showing 15 changed files with 732 additions and 310 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ go_library(
"//pkg/util:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1/utils:go_default_library",
"//vendor/github.com/go-logr/logr:go_default_library",
"//vendor/github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1:go_default_library",
"//vendor/github.com/openshift/api/config/v1:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
Expand Down
160 changes: 115 additions & 45 deletions pkg/controller/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
ocpconfigv1 "github.com/openshift/api/config/v1"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -294,9 +295,9 @@ const (

var (
// BlockMode is raw block device mode
BlockMode = corev1.PersistentVolumeBlock
BlockMode = v1.PersistentVolumeBlock
// FilesystemMode is filesystem device mode
FilesystemMode = corev1.PersistentVolumeFilesystem
FilesystemMode = v1.PersistentVolumeFilesystem

apiServerKeyOnce sync.Once
apiServerKey *rsa.PrivateKey
Expand All @@ -315,7 +316,7 @@ type FakeValidator struct {
// Validate is a fake token validation
func (v *FakeValidator) Validate(value string) (*token.Payload, error) {
if value != v.Match {
return nil, fmt.Errorf("Token does not match expected")
return nil, fmt.Errorf("token does not match expected")
}
resource := metav1.GroupVersionResource{
Resource: "persistentvolumeclaims",
Expand Down Expand Up @@ -438,7 +439,7 @@ func GetDefaultPodResourceRequirements(client client.Client) (*v1.ResourceRequir
}

// GetImagePullSecrets gets the imagePullSecrets needed to pull images from the cdi config
func GetImagePullSecrets(client client.Client) ([]corev1.LocalObjectReference, error) {
func GetImagePullSecrets(client client.Client) ([]v1.LocalObjectReference, error) {
cdiconfig := &cdiv1.CDIConfig{}
if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
klog.Errorf("Unable to find CDI configuration, %v\n", err)
Expand Down Expand Up @@ -669,15 +670,15 @@ func getTokenResourceNameDataVolume(source *cdiv1.DataVolumeSource) string {
return ""
}

func getTokenResourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
func getTokenResourceNamePvc(sourcePvc *v1.PersistentVolumeClaim) string {
if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
return "volumesnapshots"
}

return "persistentvolumeclaims"
}

func getSourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
func getSourceNamePvc(sourcePvc *v1.PersistentVolumeClaim) string {
if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
if sourcePvc.Spec.DataSourceRef != nil {
return sourcePvc.Spec.DataSourceRef.Name
Expand Down Expand Up @@ -756,7 +757,7 @@ func ValidateSnapshotClone(sourceSnapshot *snapshotv1.VolumeSnapshot, spec *cdiv
size := sourceSnapshot.Status.RestoreSize
restoreSizeAvailable := size != nil && size.Sign() > 0
if restoreSizeAvailable {
sourceResources.Requests = corev1.ResourceList{corev1.ResourceStorage: *size}
sourceResources.Requests = v1.ResourceList{v1.ResourceStorage: *size}
}

isSizelessClone := false
Expand Down Expand Up @@ -833,7 +834,7 @@ func HandleFailedPod(err error, podName string, pvc *v1.PersistentVolumeClaim, r
}

// GetSource returns the source string which determines the type of source. If no source or invalid source found, default to http
func GetSource(pvc *corev1.PersistentVolumeClaim) string {
func GetSource(pvc *v1.PersistentVolumeClaim) string {
source, found := pvc.Annotations[AnnSource]
if !found {
source = ""
Expand All @@ -855,7 +856,7 @@ func GetSource(pvc *corev1.PersistentVolumeClaim) string {
}

// GetEndpoint returns the endpoint string which contains the full path URI of the target object to be copied.
func GetEndpoint(pvc *corev1.PersistentVolumeClaim) (string, error) {
func GetEndpoint(pvc *v1.PersistentVolumeClaim) (string, error) {
ep, found := pvc.Annotations[AnnEndpoint]
if !found || ep == "" {
verb := "empty"
Expand All @@ -868,8 +869,8 @@ func GetEndpoint(pvc *corev1.PersistentVolumeClaim) (string, error) {
}

// AddImportVolumeMounts is being called for pods using PV with filesystem volume mode
func AddImportVolumeMounts() []corev1.VolumeMount {
volumeMounts := []corev1.VolumeMount{
func AddImportVolumeMounts() []v1.VolumeMount {
volumeMounts := []v1.VolumeMount{
{
Name: DataVolName,
MountPath: common.ImporterDataDir,
Expand All @@ -879,9 +880,9 @@ func AddImportVolumeMounts() []corev1.VolumeMount {
}

// ValidateRequestedCloneSize validates the clone size requirements on block
func ValidateRequestedCloneSize(sourceResources corev1.ResourceRequirements, targetResources corev1.ResourceRequirements) error {
sourceRequest := sourceResources.Requests[corev1.ResourceStorage]
targetRequest := targetResources.Requests[corev1.ResourceStorage]
func ValidateRequestedCloneSize(sourceResources v1.ResourceRequirements, targetResources v1.ResourceRequirements) error {
sourceRequest := sourceResources.Requests[v1.ResourceStorage]
targetRequest := targetResources.Requests[v1.ResourceStorage]
// Verify that the target PVC size is equal or larger than the source.
if sourceRequest.Value() > targetRequest.Value() {
return errors.New("target resources requests storage size is smaller than the source")
Expand All @@ -890,7 +891,7 @@ func ValidateRequestedCloneSize(sourceResources corev1.ResourceRequirements, tar
}

// CreateCloneSourcePodName creates clone source pod name
func CreateCloneSourcePodName(targetPvc *corev1.PersistentVolumeClaim) string {
func CreateCloneSourcePodName(targetPvc *v1.PersistentVolumeClaim) string {
return string(targetPvc.GetUID()) + common.ClonerSourcePodNameSuffix
}

Expand All @@ -912,8 +913,8 @@ func SetRestrictedSecurityContext(podSpec *v1.PodSpec) {
if container.SecurityContext == nil {
container.SecurityContext = &v1.SecurityContext{}
}
container.SecurityContext.Capabilities = &corev1.Capabilities{
Drop: []corev1.Capability{
container.SecurityContext.Capabilities = &v1.Capabilities{
Drop: []v1.Capability{
"ALL",
},
}
Expand All @@ -938,7 +939,7 @@ func SetRestrictedSecurityContext(podSpec *v1.PodSpec) {
}

// SetNodeNameIfPopulator sets NodeName in a pod spec when the PVC is being handled by a CDI volume populator
func SetNodeNameIfPopulator(pvc *corev1.PersistentVolumeClaim, podSpec *v1.PodSpec) {
func SetNodeNameIfPopulator(pvc *v1.PersistentVolumeClaim, podSpec *v1.PodSpec) {
_, isPopulator := pvc.Annotations[AnnPopulatorKind]
nodeName := pvc.Annotations[AnnSelectedNode]
if isPopulator && nodeName != "" {
Expand Down Expand Up @@ -997,18 +998,18 @@ func CreateStorageClass(name string, annotations map[string]string) *storagev1.S
}

// CreateImporterTestPod creates importer test pod CR
func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scratchPvc *corev1.PersistentVolumeClaim) *corev1.Pod {
func CreateImporterTestPod(pvc *v1.PersistentVolumeClaim, dvname string, scratchPvc *v1.PersistentVolumeClaim) *v1.Pod {
// importer pod name contains the pvc name
podName := fmt.Sprintf("%s-%s", common.ImporterPodName, pvc.Name)

blockOwnerDeletion := true
isController := true

volumes := []corev1.Volume{
volumes := []v1.Volume{
{
Name: dvname,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
ReadOnly: false,
},
Expand All @@ -1017,18 +1018,18 @@ func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scr
}

if scratchPvc != nil {
volumes = append(volumes, corev1.Volume{
volumes = append(volumes, v1.Volume{
Name: ScratchVolName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: scratchPvc.Name,
ReadOnly: false,
},
},
})
}

pod := &corev1.Pod{
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
Expand All @@ -1055,23 +1056,23 @@ func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scr
},
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: common.ImporterPodName,
Image: "test/myimage",
ImagePullPolicy: corev1.PullPolicy("Always"),
ImagePullPolicy: v1.PullPolicy("Always"),
Args: []string{"-v=5"},
Ports: []corev1.ContainerPort{
Ports: []v1.ContainerPort{
{
Name: "metrics",
ContainerPort: 8443,
Protocol: corev1.ProtocolTCP,
Protocol: v1.ProtocolTCP,
},
},
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
RestartPolicy: v1.RestartPolicyOnFailure,
Volumes: volumes,
},
}
Expand All @@ -1082,7 +1083,7 @@ func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scr
imageSize, _ := GetRequestedImageSize(pvc)
volumeMode := GetVolumeMode(pvc)

env := []corev1.EnvVar{
env := []v1.EnvVar{
{
Name: common.ImporterSource,
Value: source,
Expand All @@ -1109,14 +1110,14 @@ func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scr
},
}
pod.Spec.Containers[0].Env = env
if volumeMode == corev1.PersistentVolumeBlock {
if volumeMode == v1.PersistentVolumeBlock {
pod.Spec.Containers[0].VolumeDevices = AddVolumeDevices()
} else {
pod.Spec.Containers[0].VolumeMounts = AddImportVolumeMounts()
}

if scratchPvc != nil {
pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, v1.VolumeMount{
Name: ScratchVolName,
MountPath: common.ScratchDataDir,
})
Expand All @@ -1141,7 +1142,7 @@ func CreateStorageClassWithProvisioner(name string, annotations, labels map[stri
func CreateClient(objs ...runtime.Object) client.Client {
s := scheme.Scheme
_ = cdiv1.AddToScheme(s)
_ = corev1.AddToScheme(s)
_ = v1.AddToScheme(s)
_ = storagev1.AddToScheme(s)
_ = ocpconfigv1.Install(s)

Expand All @@ -1166,7 +1167,7 @@ func GetContentType(contentType string) string {
}

// GetPVCContentType returns the content type of the source image. If invalid or not set, default to kubevirt
func GetPVCContentType(pvc *corev1.PersistentVolumeClaim) string {
func GetPVCContentType(pvc *v1.PersistentVolumeClaim) string {
contentType, found := pvc.Annotations[AnnContentType]
if !found {
return string(cdiv1.DataVolumeKubeVirt)
Expand Down Expand Up @@ -1216,8 +1217,8 @@ func NewImportDataVolume(name string) *cdiv1.DataVolume {
URL: "http://example.com/data",
},
},
PVC: &corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
PVC: &v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
},
PriorityClassName: "p0",
},
Expand Down Expand Up @@ -1293,17 +1294,17 @@ func IsUnbound(pvc *v1.PersistentVolumeClaim) bool {
}

// IsImageStream returns true if registry source is ImageStream
func IsImageStream(pvc *corev1.PersistentVolumeClaim) bool {
func IsImageStream(pvc *v1.PersistentVolumeClaim) bool {
return pvc.Annotations[AnnRegistryImageStream] == "true"
}

// ShouldIgnorePod checks if a pod should be ignored.
// If this is a completed pod that was used for one checkpoint of a multi-stage import, it
// should be ignored by pod lookups as long as the retainAfterCompletion annotation is set.
func ShouldIgnorePod(pod *corev1.Pod, pvc *corev1.PersistentVolumeClaim) bool {
func ShouldIgnorePod(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) bool {
retain := pvc.ObjectMeta.Annotations[AnnPodRetainAfterCompletion]
checkpoint := pvc.ObjectMeta.Annotations[AnnCurrentCheckpoint]
if checkpoint != "" && pod.Status.Phase == corev1.PodSucceeded {
if checkpoint != "" && pod.Status.Phase == v1.PodSucceeded {
return retain == "true"
}
return false
Expand Down Expand Up @@ -1338,7 +1339,7 @@ func ErrConnectionRefused(err error) bool {
}

// GetPodMetricsPort returns, if exists, the metrics port from the passed pod
func GetPodMetricsPort(pod *corev1.Pod) (int, error) {
func GetPodMetricsPort(pod *v1.Pod) (int, error) {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == "metrics" {
Expand All @@ -1350,7 +1351,7 @@ func GetPodMetricsPort(pod *corev1.Pod) (int, error) {
}

// GetMetricsURL builds the metrics URL according to the specified pod
func GetMetricsURL(pod *corev1.Pod) (string, error) {
func GetMetricsURL(pod *v1.Pod) (string, error) {
if pod == nil {
return "", nil
}
Expand Down Expand Up @@ -1473,3 +1474,72 @@ func UpdateImageIOAnnotations(annotations map[string]string, imageio *cdiv1.Data
annotations[AnnCertConfigMap] = imageio.CertConfigMap
annotations[AnnDiskID] = imageio.DiskID
}

// GetSnapshotClassForSmartClone looks up the snapshot class based on the storage class
func GetSnapshotClassForSmartClone(dvName string, targetPvcStorageClassName *string, log logr.Logger, client client.Client) (string, error) {
logger := log.WithName("GetSnapshotClassForSmartClone").V(3)
// Check if relevant CRDs are available
if !isCsiCrdsDeployed(client, log) {
logger.Info("Missing CSI snapshotter CRDs, falling back to host assisted clone")
return "", nil
}

targetStorageClass, err := GetStorageClassByName(client, targetPvcStorageClassName)
if err != nil {
return "", err
}
if targetStorageClass == nil {
logger.Info("Target PVC's Storage Class not found")
return "", nil
}

// List the snapshot classes
scs := &snapshotv1.VolumeSnapshotClassList{}
if err := client.List(context.TODO(), scs); err != nil {
logger.Info("Cannot list snapshot classes, falling back to host assisted clone")
return "", err
}
for _, snapshotClass := range scs.Items {
// Validate association between snapshot class and storage class
if snapshotClass.Driver == targetStorageClass.Provisioner {
logger.Info("smart-clone is applicable for datavolume", "datavolume",
dvName, "snapshot class", snapshotClass.Name)
return snapshotClass.Name, nil
}
}

logger.Info("Could not match snapshotter with storage class, falling back to host assisted clone")
return "", nil
}

// isCsiCrdsDeployed checks whether the CSI snapshotter CRD are deployed
func isCsiCrdsDeployed(c client.Client, log logr.Logger) bool {
version := "v1"
vsClass := "volumesnapshotclasses." + snapshotv1.GroupName
vsContent := "volumesnapshotcontents." + snapshotv1.GroupName
vs := "volumesnapshots." + snapshotv1.GroupName

return isCrdDeployed(c, vsClass, version, log) &&
isCrdDeployed(c, vsContent, version, log) &&
isCrdDeployed(c, vs, version, log)
}

// isCrdDeployed checks whether a CRD is deployed
func isCrdDeployed(c client.Client, name, version string, log logr.Logger) bool {
crd := &extv1.CustomResourceDefinition{}
err := c.Get(context.TODO(), types.NamespacedName{Name: name}, crd)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Info("Error looking up CRD", "crd name", name, "version", version, "error", err)
}
return false
}

for _, v := range crd.Spec.Versions {
if v.Name == version && v.Served {
return true
}
}

return false
}
Loading

0 comments on commit a783d93

Please sign in to comment.