From fe2259c1d83c00cb050f4ecd3a577625e77ddccd Mon Sep 17 00:00:00 2001 From: Anik Date: Wed, 6 Sep 2023 13:36:47 -0400 Subject: [PATCH] Serve locally stored fbc content via an http server Closes #113 Signed-off-by: Anik --- cmd/manager/main.go | 13 +- config/default/manager_auth_proxy_patch.yaml | 3 +- config/rbac/auth_proxy_service.yaml | 14 +- .../catalog_reader_client_clusterrole.yaml | 9 ++ config/rbac/kustomization.yaml | 1 + .../core/catalog_controller_test.go | 5 + pkg/storage/localdir.go | 26 ++++ pkg/storage/storage.go | 6 +- test/e2e/e2e_suite_test.go | 19 ++- test/e2e/unpack_test.go | 136 +++++++++++++++++- 10 files changed, 213 insertions(+), 19 deletions(-) create mode 100644 config/rbac/catalog_reader_client_clusterrole.yaml diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 30ec040d..9cce2cd1 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -19,6 +19,7 @@ package main import ( "flag" "fmt" + "net/url" "os" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) @@ -67,6 +68,7 @@ func main() { catalogdVersion bool systemNamespace string storageDir string + httpExternalAddr string ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -77,6 +79,7 @@ func main() { flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images") flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads") flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from") + flag.StringVar(&httpExternalAddr, "http-external-address", "http://localhost:8080", "The external address at which the http server is reachable.") flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof") flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit") opts := zap.Options{ @@ -122,10 +125,18 @@ func main() { if err := os.MkdirAll(storageDir, 0700); err != nil { setupLog.Error(err, "unable to create storage directory for catalogs") } + serverURL, err := url.Parse(fmt.Sprintf("%s/catalogs/", httpExternalAddr)) + if err != nil { + setupLog.Error(err, "unable to parse bundle content server URL") + os.Exit(1) + } + localStorage := storage.LocalDir{RootDir: storageDir, URL: *serverURL} + mgr.AddMetricsExtraHandler("/catalogs/", localStorage.StorageServerHandler()) + if err = (&corecontrollers.CatalogReconciler{ Client: mgr.GetClient(), Unpacker: unpacker, - Storage: storage.LocalDir{RootDir: storageDir}, + Storage: localStorage, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Catalog") os.Exit(1) diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index 08d8b908..7e13c728 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -51,4 +51,5 @@ spec: - "--metrics-bind-address=127.0.0.1:8080" - "--leader-elect" - "--catalogs-storage-dir=/var/cache/catalogs" - - "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false" + - "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true" + - "--http-external-address=https://core.catalogd-system.svc" diff --git a/config/rbac/auth_proxy_service.yaml b/config/rbac/auth_proxy_service.yaml index fcd6f2ae..1dcc080b 100644 --- a/config/rbac/auth_proxy_service.yaml +++ b/config/rbac/auth_proxy_service.yaml @@ -1,21 +1,13 @@ apiVersion: v1 kind: Service metadata: - labels: - control-plane: controller-manager - app.kubernetes.io/name: service - app.kubernetes.io/instance: controller-manager-metrics-service - app.kubernetes.io/component: kube-rbac-proxy - app.kubernetes.io/created-by: catalogd - app.kubernetes.io/part-of: catalogd - app.kubernetes.io/managed-by: kustomize - name: controller-manager-metrics-service + name: core namespace: system spec: ports: - name: https - port: 8443 + port: 443 protocol: TCP - targetPort: https + targetPort: 8443 selector: control-plane: controller-manager diff --git a/config/rbac/catalog_reader_client_clusterrole.yaml b/config/rbac/catalog_reader_client_clusterrole.yaml new file mode 100644 index 00000000..61e85964 --- /dev/null +++ b/config/rbac/catalog_reader_client_clusterrole.yaml @@ -0,0 +1,9 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: catalog-reader +rules: + - nonResourceURLs: + - /catalogs/* + verbs: + - get diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index 731832a6..419a0027 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -9,6 +9,7 @@ resources: - role_binding.yaml - leader_election_role.yaml - leader_election_role_binding.yaml +- catalog_reader_client_clusterrole.yaml # Comment the following 4 lines if you want to disable # the auth proxy (https://github.com/brancz/kube-rbac-proxy) # which protects your /metrics endpoint. diff --git a/pkg/controllers/core/catalog_controller_test.go b/pkg/controllers/core/catalog_controller_test.go index f47c180d..624b2064 100644 --- a/pkg/controllers/core/catalog_controller_test.go +++ b/pkg/controllers/core/catalog_controller_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/fs" + "net/http" "os" "testing/fstest" @@ -67,6 +68,10 @@ func (m MockStore) Delete(_ string) error { return nil } +func (m MockStore) StorageServerHandler() http.Handler { + panic("not needed") +} + var _ = Describe("Catalogd Controller Test", func() { format.MaxLength = 0 var ( diff --git a/pkg/storage/localdir.go b/pkg/storage/localdir.go index 9a1015b1..1545020f 100644 --- a/pkg/storage/localdir.go +++ b/pkg/storage/localdir.go @@ -3,6 +3,8 @@ package storage import ( "fmt" "io/fs" + "net/http" + "net/url" "os" "path/filepath" @@ -16,6 +18,7 @@ import ( // atomic view of the content for a catalog. type LocalDir struct { RootDir string + URL url.URL } func (s LocalDir) Store(catalog string, fsys fs.FS) error { @@ -45,3 +48,26 @@ func (s LocalDir) Store(catalog string, fsys fs.FS) error { func (s LocalDir) Delete(catalog string) error { return os.RemoveAll(filepath.Join(s.RootDir, catalog)) } + +func (s LocalDir) StorageServerHandler() http.Handler { + return http.StripPrefix(s.URL.Path, http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)}))) +} + +type filesOnlyFilesystem struct { + FS fs.FS +} + +func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) { + file, err := f.FS.Open(name) + if err != nil { + return nil, err + } + stat, err := file.Stat() + if err != nil { + return nil, err + } + if !stat.Mode().IsRegular() { + return nil, os.ErrNotExist + } + return file, nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d0a6e111..0938a6b0 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,6 +1,9 @@ package storage -import "io/fs" +import ( + "io/fs" + "net/http" +) // Instance is a storage instance that stores FBC content of catalogs // added to a cluster. It can be used to Store or Delete FBC in the @@ -8,4 +11,5 @@ import "io/fs" type Instance interface { Store(catalog string, fsys fs.FS) error Delete(catalog string) error + StorageServerHandler() http.Handler } diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index d0217bc4..5bafce8c 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -6,7 +6,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -15,8 +19,11 @@ import ( ) var ( - cfg *rest.Config - c client.Client + cfg *rest.Config + c client.Client + err error + defaultSystemNamespace = "catalogd-system" + kubeClient kubernetes.Interface ) func TestE2E(t *testing.T) { @@ -30,8 +37,12 @@ var _ = BeforeSuite(func() { cfg = ctrl.GetConfigOrDie() scheme := runtime.NewScheme() - err := catalogd.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) + Expect(catalogd.AddToScheme(scheme)).To(Succeed()) + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + Expect(rbacv1.AddToScheme(scheme)).To(Succeed()) + Expect(batchv1.AddToScheme(scheme)).To(Succeed()) c, err = client.New(cfg, client.Options{Scheme: scheme}) Expect(err).To(Not(HaveOccurred())) + kubeClient, err = kubernetes.NewForConfig(cfg) + Expect(err).ToNot(HaveOccurred()) }) diff --git a/test/e2e/unpack_test.go b/test/e2e/unpack_test.go index 7825366c..96b810ed 100644 --- a/test/e2e/unpack_test.go +++ b/test/e2e/unpack_test.go @@ -1,15 +1,20 @@ package e2e import ( + "bytes" "context" "encoding/json" "fmt" "os" + "github.com/google/go-cmp/cmp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/alpha/property" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -169,8 +174,137 @@ var _ = Describe("Catalog Unpacking", func() { err = c.Get(ctx, client.ObjectKeyFromObject(expectedBndl), bndl) Expect(err).ToNot(HaveOccurred()) Expect(expectedBndl).To(komega.EqualObject(bndl, komega.IgnoreAutogeneratedMetadata)) - }) + By("Making sure the catalog content is available via the http server") + // Create a temporary ServiceAccount + var ( + sa corev1.ServiceAccount + crb rbacv1.ClusterRoleBinding + job batchv1.Job + pod corev1.Pod + ) + sa = corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svr-sa", + Namespace: defaultSystemNamespace, + }, + } + err = c.Create(ctx, &sa) + Expect(err).ToNot(HaveOccurred()) + + // Create a temporary ClusterRoleBinding to bind the ServiceAccount to catalog-reader ClusterRole + crb = rbacv1.ClusterRoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svr-crb", + Namespace: defaultSystemNamespace, + }, + + Subjects: []rbacv1.Subject{{Kind: "ServiceAccount", Name: "test-svr-sa", Namespace: defaultSystemNamespace}}, + RoleRef: rbacv1.RoleRef{APIGroup: "rbac.authorization.k8s.io", Kind: "ClusterRole", Name: "catalogd-catalog-reader"}, + } + + err = c.Create(ctx, &crb) + Expect(err).ToNot(HaveOccurred()) + mounttoken := true + job = batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svr-job", + Namespace: defaultSystemNamespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-svr", + Image: "curlimages/curl", + // (TODO): Get the URL from the CR once https://github.com/operator-framework/catalogd/issues/119 is done + Command: []string{"sh", "-c", "curl -sSLk -H \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" -o - " + "https://catalogd-core.catalogd-system.svc/catalogs/test-catalog/all.json"}, + }, + }, + ServiceAccountName: "test-svr-sa", + RestartPolicy: "Never", + AutomountServiceAccountToken: &mounttoken, + }, + }, + }, + } + err = c.Create(ctx, &job) + Expect(err).ToNot(HaveOccurred()) + Eventually(func() (bool, error) { + err = c.Get(ctx, types.NamespacedName{Name: "test-svr-job", Namespace: defaultSystemNamespace}, &job) + if err != nil { + return false, err + } + return job.Status.CompletionTime != nil && job.Status.Succeeded == 1, err + }).Should(BeTrue()) + pods := &corev1.PodList{} + Eventually(func() (bool, error) { + err := c.List(context.Background(), pods, client.MatchingLabels{"job-name": "test-svr-job"}) + if err != nil { + return false, err + } + return len(pods.Items) == 1, nil + }).Should(BeTrue()) + + type value struct { + PackageName string `json:"packageName"` + Version string `json:"version"` + } + rawMessage, err := json.Marshal(value{PackageName: "prometheus", Version: "0.47.0"}) + Expect(err).To(Not(HaveOccurred())) + expectedDeclCfg := declcfg.DeclarativeConfig{ + Packages: []declcfg.Package{ + { + Schema: "olm.package", + Name: "prometheus", + DefaultChannel: "beta", + }, + }, + Channels: []declcfg.Channel{ + { + Schema: "olm.channel", + Name: "beta", + Package: "prometheus", + Entries: []declcfg.ChannelEntry{ + { + Name: "prometheus-operator.0.47.0", + }, + }, + }, + }, + Bundles: []declcfg.Bundle{ + { + Schema: "olm.bundle", + Name: "prometheus-operator.0.47.0", + Package: "prometheus", + Image: "localhost/testdata/bundles/registry-v1/prometheus-operator:v0.47.0", + Properties: []property.Property{ + { + Type: "olm.package", + Value: rawMessage, + }, + }, + }, + }, + } + Eventually(func() (bool, error) { + + // Get logs of the Pod + pod = pods.Items[0] + logReader, err := kubeClient.CoreV1().Pods(defaultSystemNamespace).GetLogs(pod.Name, &corev1.PodLogOptions{}).Stream(context.Background()) + if err != nil { + return false, err + } + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(logReader) + Expect(err).ToNot(HaveOccurred()) + cfg, err := declcfg.LoadReader(buf) + Expect(err).To(Not(HaveOccurred())) + Expect(err).To(Not(HaveOccurred())) + return cmp.Diff(cfg, &expectedDeclCfg) == "", nil + }).Should(BeTrue()) + }) AfterEach(func() { err := c.Delete(ctx, catalog) Expect(err).ToNot(HaveOccurred())