diff --git a/controllers/kustomization_controller.go b/controllers/kustomization_controller.go index 0e4a8fc6..47b14397 100644 --- a/controllers/kustomization_controller.go +++ b/controllers/kustomization_controller.go @@ -19,19 +19,13 @@ package controllers import ( "bytes" "context" - "crypto/sha1" - "crypto/sha256" "fmt" - "io" - "net/http" - "net/url" "os" "sort" "strings" "time" securejoin "github.com/cyphar/filepath-securejoin" - "github.com/hashicorp/go-retryablehttp" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -62,7 +56,6 @@ import ( "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/pkg/ssa" - "github.com/fluxcd/pkg/untar" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2" @@ -79,7 +72,7 @@ import ( // KustomizationReconciler reconciles a Kustomization object type KustomizationReconciler struct { client.Client - httpClient *retryablehttp.Client + artifactFetcher *ArtifactFetcher requeueDependency time.Duration Scheme *runtime.Scheme EventRecorder kuberecorder.EventRecorder @@ -122,15 +115,7 @@ func (r *KustomizationReconciler) SetupWithManager(mgr ctrl.Manager, opts Kustom r.requeueDependency = opts.DependencyRequeueInterval r.statusManager = fmt.Sprintf("gotk-%s", r.ControllerName) - - // Configure the retryable http client used for fetching artifacts. - // By default it retries 10 times within a 3.5 minutes window. - httpClient := retryablehttp.NewClient() - httpClient.RetryWaitMin = 5 * time.Second - httpClient.RetryWaitMax = 30 * time.Second - httpClient.RetryMax = opts.HTTPRetry - httpClient.Logger = nil - r.httpClient = httpClient + r.artifactFetcher = NewArtifactFetcher(opts.HTTPRetry) return ctrl.NewControllerManagedBy(mgr). For(&kustomizev1.Kustomization{}, builder.WithPredicates( @@ -268,6 +253,18 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques // reconcile kustomization by applying the latest revision reconciledKustomization, reconcileErr := r.reconcile(ctx, *kustomization.DeepCopy(), source) + + // requeue if the artifact is not found + if reconcileErr == ArtifactNotFoundError { + msg := fmt.Sprintf("Source is not ready, artifact not found, retrying in %s", r.requeueDependency.String()) + log.Info(msg) + if err := r.patchStatus(ctx, req, kustomizev1.KustomizationProgressing(kustomization, msg).Status); err != nil { + log.Error(err, "unable to update status for artifact not found") + return ctrl.Result{Requeue: true}, err + } + return ctrl.Result{RequeueAfter: r.requeueDependency}, nil + } + if err := r.patchStatus(ctx, req, reconciledKustomization.Status); err != nil { return ctrl.Result{Requeue: true}, err } @@ -320,7 +317,7 @@ func (r *KustomizationReconciler) reconcile( defer os.RemoveAll(tmpDir) // download artifact and extract files - err = r.download(source.GetArtifact(), tmpDir) + err = r.artifactFetcher.Fetch(source.GetArtifact(), tmpDir) if err != nil { return kustomizev1.KustomizationNotReady( kustomization, @@ -526,70 +523,6 @@ func (r *KustomizationReconciler) checkDependencies(source sourcev1.Source, kust return nil } -func (r *KustomizationReconciler) download(artifact *sourcev1.Artifact, tmpDir string) error { - artifactURL := artifact.URL - if hostname := os.Getenv("SOURCE_CONTROLLER_LOCALHOST"); hostname != "" { - u, err := url.Parse(artifactURL) - if err != nil { - return err - } - u.Host = hostname - artifactURL = u.String() - } - - req, err := retryablehttp.NewRequest(http.MethodGet, artifactURL, nil) - if err != nil { - return fmt.Errorf("failed to create a new request: %w", err) - } - - resp, err := r.httpClient.Do(req) - if err != nil { - return fmt.Errorf("failed to download artifact, error: %w", err) - } - defer resp.Body.Close() - - // check response - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("failed to download artifact from %s, status: %s", artifactURL, resp.Status) - } - - var buf bytes.Buffer - - // verify checksum matches origin - if err := r.verifyArtifact(artifact, &buf, resp.Body); err != nil { - return err - } - - // extract - if _, err = untar.Untar(&buf, tmpDir); err != nil { - return fmt.Errorf("failed to untar artifact, error: %w", err) - } - - return nil -} - -func (r *KustomizationReconciler) verifyArtifact(artifact *sourcev1.Artifact, buf *bytes.Buffer, reader io.Reader) error { - hasher := sha256.New() - - // for backwards compatibility with source-controller v0.17.2 and older - if len(artifact.Checksum) == 40 { - hasher = sha1.New() - } - - // compute checksum - mw := io.MultiWriter(hasher, buf) - if _, err := io.Copy(mw, reader); err != nil { - return err - } - - if checksum := fmt.Sprintf("%x", hasher.Sum(nil)); checksum != artifact.Checksum { - return fmt.Errorf("failed to verify artifact: computed checksum '%s' doesn't match advertised '%s'", - checksum, artifact.Checksum) - } - - return nil -} - func (r *KustomizationReconciler) getSource(ctx context.Context, kustomization kustomizev1.Kustomization) (sourcev1.Source, error) { var source sourcev1.Source sourceNamespace := kustomization.GetNamespace() diff --git a/controllers/kustomization_fetcher.go b/controllers/kustomization_fetcher.go new file mode 100644 index 00000000..04eb6630 --- /dev/null +++ b/controllers/kustomization_fetcher.go @@ -0,0 +1,127 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "bytes" + "crypto/sha1" + "crypto/sha256" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "time" + + "github.com/fluxcd/pkg/untar" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + "github.com/hashicorp/go-retryablehttp" +) + +// ArtifactFetcher holds the HTTP client that reties with back off when +// the artifact server is offline. +type ArtifactFetcher struct { + httpClient *retryablehttp.Client +} + +// ArtifactNotFoundError is an error type used to signal 404 HTTP status code responses. +var ArtifactNotFoundError = errors.New("artifact not found") + +// NewArtifactFetcher configures the retryable http client used for fetching artifacts. +// By default, it retries 10 times within a 3.5 minutes window. +func NewArtifactFetcher(retries int) *ArtifactFetcher { + httpClient := retryablehttp.NewClient() + httpClient.RetryWaitMin = 5 * time.Second + httpClient.RetryWaitMax = 30 * time.Second + httpClient.RetryMax = retries + httpClient.Logger = nil + + return &ArtifactFetcher{httpClient: httpClient} +} + +// Fetch downloads, verifies and extracts the artifact content to the specified directory. +// If the artifact server responds with 5xx errors, the download operation is retried. +// If the artifact server responds with 404, the returned error is of type ArtifactNotFoundError. +// If the artifact server is unavailable for more than 3 minutes, the returned error contains the original status code. +func (r *ArtifactFetcher) Fetch(artifact *sourcev1.Artifact, dir string) error { + artifactURL := artifact.URL + if hostname := os.Getenv("SOURCE_CONTROLLER_LOCALHOST"); hostname != "" { + u, err := url.Parse(artifactURL) + if err != nil { + return err + } + u.Host = hostname + artifactURL = u.String() + } + + req, err := retryablehttp.NewRequest(http.MethodGet, artifactURL, nil) + if err != nil { + return fmt.Errorf("failed to create a new request: %w", err) + } + + resp, err := r.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to download artifact, error: %w", err) + } + defer resp.Body.Close() + + if code := resp.StatusCode; code != http.StatusOK { + if code == http.StatusNotFound { + return ArtifactNotFoundError + } + return fmt.Errorf("failed to download artifact from %s, status: %s", artifactURL, resp.Status) + } + + var buf bytes.Buffer + + // verify checksum matches origin + if err := r.Verify(artifact, &buf, resp.Body); err != nil { + return err + } + + // extract + if _, err = untar.Untar(&buf, dir); err != nil { + return fmt.Errorf("failed to untar artifact, error: %w", err) + } + + return nil +} + +// Verify computes the checksum of the tarball and returns an error if the computed value +// does not match the artifact advertised checksum. +func (r *ArtifactFetcher) Verify(artifact *sourcev1.Artifact, buf *bytes.Buffer, reader io.Reader) error { + hasher := sha256.New() + + // for backwards compatibility with source-controller v0.17.2 and older + if len(artifact.Checksum) == 40 { + hasher = sha1.New() + } + + // compute checksum + mw := io.MultiWriter(hasher, buf) + if _, err := io.Copy(mw, reader); err != nil { + return err + } + + if checksum := fmt.Sprintf("%x", hasher.Sum(nil)); checksum != artifact.Checksum { + return fmt.Errorf("failed to verify artifact: computed checksum '%s' doesn't match advertised '%s'", + checksum, artifact.Checksum) + } + + return nil +} diff --git a/controllers/kustomization_fetcher_test.go b/controllers/kustomization_fetcher_test.go new file mode 100644 index 00000000..a9d06525 --- /dev/null +++ b/controllers/kustomization_fetcher_test.go @@ -0,0 +1,155 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/testserver" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + . "github.com/onsi/gomega" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2" +) + +func TestKustomizationReconciler_ArtifactDownload(t *testing.T) { + g := NewWithT(t) + id := "fetch-" + randStringRunes(5) + revision := "v1.0.0" + + err := createNamespace(id) + g.Expect(err).NotTo(HaveOccurred(), "failed to create test namespace") + + err = createKubeConfigSecret(id) + g.Expect(err).NotTo(HaveOccurred(), "failed to create kubeconfig secret") + + manifests := func(name string, data string) []testserver.File { + return []testserver.File{ + { + Name: "secret.yaml", + Body: fmt.Sprintf(`--- +apiVersion: v1 +kind: Secret +metadata: + name: %[1]s +stringData: + key: "%[2]s" +`, name, data), + }, + } + } + + artifact, err := testServer.ArtifactFromFiles(manifests(id, randStringRunes(5))) + g.Expect(err).NotTo(HaveOccurred(), "failed to create artifact from files") + + repositoryName := types.NamespacedName{ + Name: fmt.Sprintf("fetch-%s", randStringRunes(5)), + Namespace: id, + } + + err = applyGitRepository(repositoryName, artifact, revision) + g.Expect(err).NotTo(HaveOccurred()) + + kustomizationKey := types.NamespacedName{ + Name: fmt.Sprintf("fetch-%s", randStringRunes(5)), + Namespace: id, + } + kustomization := &kustomizev1.Kustomization{ + ObjectMeta: metav1.ObjectMeta{ + Name: kustomizationKey.Name, + Namespace: kustomizationKey.Namespace, + }, + Spec: kustomizev1.KustomizationSpec{ + Interval: metav1.Duration{Duration: reconciliationInterval}, + Path: "./", + KubeConfig: &kustomizev1.KubeConfig{ + SecretRef: meta.SecretKeyReference{ + Name: "kubeconfig", + }, + }, + SourceRef: kustomizev1.CrossNamespaceSourceReference{ + Name: repositoryName.Name, + Namespace: repositoryName.Namespace, + Kind: sourcev1.GitRepositoryKind, + }, + HealthChecks: []meta.NamespacedObjectKindReference{ + { + APIVersion: "v1", + Kind: "Secret", + Name: id, + Namespace: id, + }, + }, + TargetNamespace: id, + Force: false, + }, + } + + g.Expect(k8sClient.Create(context.Background(), kustomization)).To(Succeed()) + + resultK := &kustomizev1.Kustomization{} + repo := &sourcev1.GitRepository{ + TypeMeta: metav1.TypeMeta{ + Kind: sourcev1.GitRepositoryKind, + APIVersion: sourcev1.GroupVersion.String(), + }, + } + g.Expect(k8sClient.Get(context.Background(), repositoryName, repo)).Should(Succeed()) + repoURL := repo.Status.Artifact.URL + + t.Run("downloads artifact", func(t *testing.T) { + g.Eventually(func() bool { + _ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK) + return apimeta.IsStatusConditionTrue(resultK.Status.Conditions, meta.ReadyCondition) && + resultK.Status.LastAppliedRevision == revision + }, timeout, time.Second).Should(BeTrue()) + }) + + t.Run("retries on not found errors", func(t *testing.T) { + repo.Status.Artifact.URL = repoURL + "not-found" + repo.ManagedFields = nil + g.Expect(k8sClient.Status().Update(context.Background(), repo)).To(Succeed()) + + g.Eventually(func() bool { + _ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK) + ready := apimeta.FindStatusCondition(resultK.Status.Conditions, meta.ReadyCondition) + return ready.Reason == meta.ProgressingReason + }, timeout, time.Second).Should(BeTrue()) + + g.Expect(apimeta.FindStatusCondition(resultK.Status.Conditions, meta.ReadyCondition).Message).To(ContainSubstring("artifact not found")) + }) + + t.Run("recovers after not found errors", func(t *testing.T) { + g.Expect(k8sClient.Get(context.Background(), repositoryName, repo)).Should(Succeed()) + repo.Status.Artifact.URL = repoURL + repo.ManagedFields = nil + g.Expect(k8sClient.Status().Update(context.Background(), repo)).To(Succeed()) + + g.Eventually(func() bool { + _ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(kustomization), resultK) + return apimeta.IsStatusConditionTrue(resultK.Status.Conditions, meta.ReadyCondition) + }, timeout, time.Second).Should(BeTrue()) + }) +} diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 51149021..0eab7285 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -167,7 +167,10 @@ func TestMain(m *testing.M) { EventRecorder: testEnv.GetEventRecorderFor(controllerName), MetricsRecorder: testMetricsH.MetricsRecorder, } - if err := (reconciler).SetupWithManager(testEnv, KustomizationReconcilerOptions{MaxConcurrentReconciles: 4}); err != nil { + if err := (reconciler).SetupWithManager(testEnv, KustomizationReconcilerOptions{ + MaxConcurrentReconciles: 4, + DependencyRequeueInterval: 2 * time.Second, + }); err != nil { panic(fmt.Sprintf("Failed to start KustomizationReconciler: %v", err)) } }, func() error {