Skip to content

Commit

Permalink
Refactor common populator code to be shared among all populators
Browse files Browse the repository at this point in the history
This commit introduces and modifies several functions so we can reuse common code between all populators.

Other than having a common reconcile function, a new populatorController interface has been introduced so we are able to call populator-specific methods from the populator-base reconciler.

Signed-off-by: Alvaro Romero <alromero@redhat.com>
  • Loading branch information
alromeros committed Apr 12, 2023
1 parent b310c62 commit f51f2f7
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 146 deletions.
109 changes: 109 additions & 0 deletions pkg/controller/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"fmt"
"io/ioutil"
"math"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -83,6 +87,9 @@ const (
AnnExternalPopulation = AnnAPIGroup + "/externalPopulation"
// AnnOwnedByDataVolume annotation has the owner DataVolume name
AnnOwnedByDataVolume = AnnAPIGroup + "/ownedByDataVolume"
// AnnSelectedNodeName annotation is added to the PVC' to specify the node used when creating the populator pod for
// dynamic provisioning
AnnSelectedNodeName = AnnAPIGroup + "/storage.selected.node"

// AnnDeleteAfterCompletion is PVC annotation for deleting DV after completion
AnnDeleteAfterCompletion = AnnAPIGroup + "/storage.deleteAfterCompletion"
Expand All @@ -103,6 +110,9 @@ const (
// AnnMultiStageImportDone marks a multi-stage import as totally finished
AnnMultiStageImportDone = AnnAPIGroup + "/storage.checkpoint.done"

// AnnImportProgressReporting stores the current progress of the import process as a percetange
AnnImportProgressReporting = AnnAPIGroup + "/storage.import.progress"

// AnnPreallocationRequested provides a const to indicate whether preallocation should be performed on the PV
AnnPreallocationRequested = AnnAPIGroup + "/storage.preallocation.requested"
// AnnPreallocationApplied provides a const for PVC preallocation annotation
Expand Down Expand Up @@ -559,6 +569,12 @@ func GetPriorityClass(pvc *v1.PersistentVolumeClaim) string {
return anno[AnnPriorityClassName]
}

// GetSelectedNodename gets the node name specified in a PVC for dynamic provisioning
func GetSelectedNodename(pvc *v1.PersistentVolumeClaim) string {
anno := pvc.GetAnnotations()
return anno[AnnSelectedNodeName]
}

// ShouldDeletePod returns whether the PVC workload pod should be deleted
func ShouldDeletePod(pvc *v1.PersistentVolumeClaim) bool {
return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.DeletionTimestamp != nil
Expand Down Expand Up @@ -1269,3 +1285,96 @@ func HasAnnOwnedByDataVolume(obj metav1.Object) bool {
_, ok := obj.GetAnnotations()[AnnOwnedByDataVolume]
return ok
}

// IsImageStream returns true if registry source is ImageStream
func IsImageStream(pvc *corev1.PersistentVolumeClaim) bool {
return pvc.Annotations[AnnRegistryImageStream] == "true"
}

// ShouldIgnorePod checks if a pod should be ignored.
// If this is a completed pod that was used for one checkpoint of a multi-stage import, it
// should be ignored by pod lookups as long as the retainAfterCompletion annotation is set.
func ShouldIgnorePod(pod *corev1.Pod, pvc *corev1.PersistentVolumeClaim) bool {
retain := pvc.ObjectMeta.Annotations[AnnPodRetainAfterCompletion]
checkpoint := pvc.ObjectMeta.Annotations[AnnCurrentCheckpoint]
if checkpoint != "" && pod.Status.Phase == corev1.PodSucceeded {
return retain == "true"
}
return false
}

// BuildHTTPClient generates an http client that accepts any certificate, since we are using
// it to get prometheus data it doesn't matter if someone can intercept the data. Once we have
// a mechanism to properly sign the server, we can update this method to get a proper client.
func BuildHTTPClient(httpClient *http.Client) *http.Client {
if httpClient == nil {
defaultTransport := http.DefaultTransport.(*http.Transport)
// Create new Transport that ignores self-signed SSL
tr := &http.Transport{
Proxy: defaultTransport.Proxy,
DialContext: defaultTransport.DialContext,
MaxIdleConns: defaultTransport.MaxIdleConns,
IdleConnTimeout: defaultTransport.IdleConnTimeout,
ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout,
TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
httpClient = &http.Client{
Transport: tr,
}
}
return httpClient
}

// ErrConnectionRefused checks for connection refused errors
func ErrConnectionRefused(err error) bool {
return strings.Contains(err.Error(), "connection refused")
}

// GetPodMetricsPort returns, if exists, the metrics port from the passed pod
func GetPodMetricsPort(pod *corev1.Pod) (int, error) {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == "metrics" {
return int(port.ContainerPort), nil
}
}
}
return 0, errors.New("Metrics port not found in pod")
}

// GetMetricsURL builds the metrics URL according to the specified pod
func GetMetricsURL(pod *corev1.Pod) (string, error) {
if pod == nil {
return "", nil
}
port, err := GetPodMetricsPort(pod)
if err != nil || pod.Status.PodIP == "" {
return "", err
}
url := fmt.Sprintf("https://%s:%d/metrics", pod.Status.PodIP, port)
return url, nil
}

// GetProgressReport fetches the progress report from the passed URL according to an specific regular expression
func GetProgressReportFromURL(url string, regExp *regexp.Regexp, httpClient *http.Client) (string, error) {
resp, err := httpClient.Get(url)
if err != nil {
if ErrConnectionRefused(err) {
return "", nil
}
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
// Parse the progress from the body
progressReport := ""
match := regExp.FindStringSubmatch(string(body))
if match != nil {
progressReport = match[1]
}
return progressReport, nil
}
92 changes: 14 additions & 78 deletions pkg/controller/datavolume/controller-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@ package datavolume
import (
"context"
"crypto/rsa"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"reflect"
"regexp"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -873,7 +870,7 @@ func (r *ReconcilerBase) getPodFromPvc(namespace string, pvc *corev1.PersistentV

pvcUID := pvc.GetUID()
for _, pod := range pods.Items {
if shouldIgnorePod(&pod, pvc) {
if cc.ShouldIgnorePod(&pod, pvc) {
continue
}
for _, or := range pod.OwnerReferences {
Expand All @@ -899,87 +896,26 @@ func (r *ReconcilerBase) addOwnerRef(pvc *corev1.PersistentVolumeClaim, dv *cdiv
return r.updatePVC(pvc)
}

// If this is a completed pod that was used for one checkpoint of a multi-stage import, it
// should be ignored by pod lookups as long as the retainAfterCompletion annotation is set.
func shouldIgnorePod(pod *corev1.Pod, pvc *corev1.PersistentVolumeClaim) bool {
retain := pvc.ObjectMeta.Annotations[cc.AnnPodRetainAfterCompletion]
checkpoint := pvc.ObjectMeta.Annotations[cc.AnnCurrentCheckpoint]
if checkpoint != "" && pod.Status.Phase == corev1.PodSucceeded {
return retain == "true"
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
httpClient := cc.BuildHTTPClient(httpClient)
url, err := cc.GetMetricsURL(pod)
if err != nil {
return err
}
if url == "" {
return nil
}
return false
}

func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
httpClient := buildHTTPClient()
// Example value: import_progress{ownerUID="b856691e-1038-11e9-a5ab-525500d15501"} 13.45
var importRegExp = regexp.MustCompile("progress\\{ownerUID\\=\"" + string(dataVolumeCopy.UID) + "\"\\} (\\d{1,3}\\.?\\d*)")

port, err := getPodMetricsPort(pod)
if err == nil && pod.Status.PodIP != "" {
url := fmt.Sprintf("https://%s:%d/metrics", pod.Status.PodIP, port)
resp, err := httpClient.Get(url)
if err != nil {
if errConnectionRefused(err) {
return nil
}
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

match := importRegExp.FindStringSubmatch(string(body))
if match == nil {
// No match
return nil
}
if f, err := strconv.ParseFloat(match[1], 64); err == nil {
if progressReport, err := cc.GetProgressReportFromURL(url, importRegExp, httpClient); err != nil {
return err
} else if progressReport != "" {
if f, err := strconv.ParseFloat(progressReport, 64); err == nil {
dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
}
return nil
}
return err
}

func errConnectionRefused(err error) bool {
return strings.Contains(err.Error(), "connection refused")
}

func getPodMetricsPort(pod *corev1.Pod) (int, error) {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == "metrics" {
return int(port.ContainerPort), nil
}
}
}
return 0, errors.New("Metrics port not found in pod")
}

// buildHTTPClient generates an http client that accepts any certificate, since we are using
// it to get prometheus data it doesn't matter if someone can intercept the data. Once we have
// a mechanism to properly sign the server, we can update this method to get a proper client.
func buildHTTPClient() *http.Client {
if httpClient == nil {
defaultTransport := http.DefaultTransport.(*http.Transport)
// Create new Transport that ignores self-signed SSL
tr := &http.Transport{
Proxy: defaultTransport.Proxy,
DialContext: defaultTransport.DialContext,
MaxIdleConns: defaultTransport.MaxIdleConns,
IdleConnTimeout: defaultTransport.IdleConnTimeout,
ExpectContinueTimeout: defaultTransport.ExpectContinueTimeout,
TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
httpClient = &http.Client{
Transport: tr,
}
}
return httpClient
return nil
}

// newPersistentVolumeClaim creates a new PVC for the DataVolume resource.
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type importerPodArgs struct {
workloadNodePlacement *sdkapi.NodePlacement
vddkImageName *string
priorityClassName string
selectedNodeName string
}

// NewImportController creates a new instance of the import controller.
Expand Down Expand Up @@ -185,10 +186,6 @@ func (r *ImportReconciler) shouldReconcilePVC(pvc *corev1.PersistentVolumeClaim,
nil
}

func isImageStream(pvc *corev1.PersistentVolumeClaim) bool {
return pvc.Annotations[cc.AnnRegistryImageStream] == "true"
}

// Reconcile the reconcile loop for the CDIConfig object.
func (r *ImportReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
log := r.log.WithValues("PVC", req.NamespacedName)
Expand Down Expand Up @@ -245,7 +242,7 @@ func (r *ImportReconciler) findImporterPod(pvc *corev1.PersistentVolumeClaim, lo
}
return nil, nil
}
if !metav1.IsControlledBy(pod, pvc) && !isImageStream(pvc) {
if !metav1.IsControlledBy(pod, pvc) && !cc.IsImageStream(pvc) {
return nil, errors.Errorf("Pod is not owned by PVC")
}
log.V(1).Info("Pod is owned by PVC", pod.Name, pvc.Name)
Expand Down Expand Up @@ -525,6 +522,7 @@ func (r *ImportReconciler) createImporterPod(pvc *corev1.PersistentVolumeClaim)
scratchPvcName: scratchPvcName,
vddkImageName: vddkImageName,
priorityClassName: cc.GetPriorityClass(pvc),
selectedNodeName: cc.GetSelectedNodename(pvc),
}

pod, err := createImporterPod(r.log, r.client, podArgs, r.installerLabels)
Expand All @@ -537,7 +535,7 @@ func (r *ImportReconciler) createImporterPod(pvc *corev1.PersistentVolumeClaim)

// If importing from image stream, add finalizer. Note we don't watch the importer pod in this case,
// so to prevent a deadlock we add finalizer only if the pod is not retained after completion.
if isImageStream(pvc) && pvc.GetAnnotations()[cc.AnnPodRetainAfterCompletion] != "true" {
if cc.IsImageStream(pvc) && pvc.GetAnnotations()[cc.AnnPodRetainAfterCompletion] != "true" {
cc.AddFinalizer(pvc, importPodImageStreamFinalizer)
if err := r.updatePVC(pvc, r.log); err != nil {
return err
Expand Down Expand Up @@ -818,7 +816,7 @@ func getRegistryImportImage(pvc *corev1.PersistentVolumeClaim) (string, error) {
if err != nil {
return "", nil
}
if isImageStream(pvc) {
if cc.IsImageStream(pvc) {
return ep, nil
}
url, err := url.Parse(ep)
Expand Down Expand Up @@ -976,6 +974,7 @@ func makeNodeImporterPodSpec(args *importerPodArgs) *corev1.Pod {
Tolerations: args.workloadNodePlacement.Tolerations,
Affinity: args.workloadNodePlacement.Affinity,
PriorityClassName: args.priorityClassName,
NodeName: args.selectedNodeName,
ImagePullSecrets: args.imagePullSecrets,
},
}
Expand All @@ -989,7 +988,7 @@ func makeNodeImporterPodSpec(args *importerPodArgs) *corev1.Pod {
unauthorized: authentication required
When we don't set pod OwnerReferences, all works well.
*/
if isImageStream(args.pvc) {
if cc.IsImageStream(args.pvc) {
pod.Annotations[cc.AnnOpenShiftImageLookup] = "*"
} else {
blockOwnerDeletion := true
Expand Down Expand Up @@ -1090,6 +1089,7 @@ func makeImporterPodSpec(args *importerPodArgs) *corev1.Pod {
NodeSelector: args.workloadNodePlacement.NodeSelector,
Tolerations: args.workloadNodePlacement.Tolerations,
Affinity: args.workloadNodePlacement.Affinity,
NodeName: args.selectedNodeName,
PriorityClassName: args.priorityClassName,
ImagePullSecrets: args.imagePullSecrets,
},
Expand Down
Loading

0 comments on commit f51f2f7

Please sign in to comment.