diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 30ec040d..2c3abf07 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -19,7 +19,9 @@ package main import ( "flag" "fmt" + "net/http" "os" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -67,6 +69,7 @@ func main() { catalogdVersion bool systemNamespace string storageDir string + catalogServerAddr string ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -77,6 +80,7 @@ func main() { flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images") flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads") flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from") + flag.StringVar(&catalogServerAddr, "catalogs-server-addr", "127.0.0.1:8083", "The address where the unpacked catalogs' content will be accessible") flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof") flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit") opts := zap.Options{ @@ -122,10 +126,16 @@ func main() { if err := os.MkdirAll(storageDir, 0700); err != nil { setupLog.Error(err, "unable to create storage directory for catalogs") } + localStorage := storage.LocalDir{RootDir: storageDir, Server: &http.Server{Addr: catalogServerAddr, ReadHeaderTimeout: 3. * time.Second}} + if err := mgr.Add(localStorage); err != nil { + setupLog.Error(err, "unable to start catalog server") + os.Exit(1) + } + if err = (&corecontrollers.CatalogReconciler{ Client: mgr.GetClient(), Unpacker: unpacker, - Storage: storage.LocalDir{RootDir: storageDir}, + Storage: localStorage, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Catalog") os.Exit(1) diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index 08d8b908..52db543a 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -51,4 +51,5 @@ spec: - "--metrics-bind-address=127.0.0.1:8080" - "--leader-elect" - "--catalogs-storage-dir=/var/cache/catalogs" - - "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false" + - "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true" + - "--catalogs-server-addr=:8083" diff --git a/config/rbac/auth_proxy_service.yaml b/config/rbac/auth_proxy_service.yaml index fcd6f2ae..1dcc080b 100644 --- a/config/rbac/auth_proxy_service.yaml +++ b/config/rbac/auth_proxy_service.yaml @@ -1,21 +1,13 @@ apiVersion: v1 kind: Service metadata: - labels: - control-plane: controller-manager - app.kubernetes.io/name: service - app.kubernetes.io/instance: controller-manager-metrics-service - app.kubernetes.io/component: kube-rbac-proxy - app.kubernetes.io/created-by: catalogd - app.kubernetes.io/part-of: catalogd - app.kubernetes.io/managed-by: kustomize - name: controller-manager-metrics-service + name: core namespace: system spec: ports: - name: https - port: 8443 + port: 443 protocol: TCP - targetPort: https + targetPort: 8443 selector: control-plane: controller-manager diff --git a/config/rbac/catalogserver_service.yaml b/config/rbac/catalogserver_service.yaml new file mode 100644 index 00000000..57ef631b --- /dev/null +++ b/config/rbac/catalogserver_service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: catalogserver + namespace: system +spec: + selector: + control-plane: controller-manager + ports: + - name: http + protocol: TCP + port: 80 + targetPort: 8083 \ No newline at end of file diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index 731832a6..0ae1b3c1 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -9,6 +9,7 @@ resources: - role_binding.yaml - leader_election_role.yaml - leader_election_role_binding.yaml +- catalogserver_service.yaml # Comment the following 4 lines if you want to disable # the auth proxy (https://github.com/brancz/kube-rbac-proxy) # which protects your /metrics endpoint. diff --git a/pkg/controllers/core/catalog_controller_test.go b/pkg/controllers/core/catalog_controller_test.go index f47c180d..78095651 100644 --- a/pkg/controllers/core/catalog_controller_test.go +++ b/pkg/controllers/core/catalog_controller_test.go @@ -67,6 +67,10 @@ func (m MockStore) Delete(_ string) error { return nil } +func (m MockStore) Start(_ context.Context) error { + panic("not needed") +} + var _ = Describe("Catalogd Controller Test", func() { format.MaxLength = 0 var ( diff --git a/pkg/storage/localdir.go b/pkg/storage/localdir.go index 9a1015b1..5aefd153 100644 --- a/pkg/storage/localdir.go +++ b/pkg/storage/localdir.go @@ -1,8 +1,10 @@ package storage import ( + "context" "fmt" "io/fs" + "net/http" "os" "path/filepath" @@ -16,6 +18,7 @@ import ( // atomic view of the content for a catalog. type LocalDir struct { RootDir string + Server *http.Server } func (s LocalDir) Store(catalog string, fsys fs.FS) error { @@ -45,3 +48,36 @@ func (s LocalDir) Store(catalog string, fsys fs.FS) error { func (s LocalDir) Delete(catalog string) error { return os.RemoveAll(filepath.Join(s.RootDir, catalog)) } + +func (s LocalDir) Start(ctx context.Context) error { + s.Server.Handler = http.StripPrefix("/catalogs/", http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)}))) + errChan := make(chan error) + go func() { + errChan <- s.Server.ListenAndServe() + }() + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return s.Server.Shutdown(context.Background()) + } +} + +type filesOnlyFilesystem struct { + FS fs.FS +} + +func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) { + file, err := f.FS.Open(name) + if err != nil { + return nil, err + } + stat, err := file.Stat() + if err != nil { + return nil, err + } + if !stat.Mode().IsRegular() { + return nil, os.ErrNotExist + } + return file, nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d0a6e111..f4ed42b7 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,11 +1,16 @@ package storage -import "io/fs" +import ( + "context" + "io/fs" +) // Instance is a storage instance that stores FBC content of catalogs // added to a cluster. It can be used to Store or Delete FBC in the -// host's filesystem +// host's filesystem. It also a manager runnable object, that starts +// a server to serve the content stored. type Instance interface { Store(catalog string, fsys fs.FS) error Delete(catalog string) error + Start(ctx context.Context) error } diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index d0217bc4..5bafce8c 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -6,7 +6,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -15,8 +19,11 @@ import ( ) var ( - cfg *rest.Config - c client.Client + cfg *rest.Config + c client.Client + err error + defaultSystemNamespace = "catalogd-system" + kubeClient kubernetes.Interface ) func TestE2E(t *testing.T) { @@ -30,8 +37,12 @@ var _ = BeforeSuite(func() { cfg = ctrl.GetConfigOrDie() scheme := runtime.NewScheme() - err := catalogd.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) + Expect(catalogd.AddToScheme(scheme)).To(Succeed()) + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + Expect(rbacv1.AddToScheme(scheme)).To(Succeed()) + Expect(batchv1.AddToScheme(scheme)).To(Succeed()) c, err = client.New(cfg, client.Options{Scheme: scheme}) Expect(err).To(Not(HaveOccurred())) + kubeClient, err = kubernetes.NewForConfig(cfg) + Expect(err).ToNot(HaveOccurred()) }) diff --git a/test/e2e/unpack_test.go b/test/e2e/unpack_test.go index 7825366c..b5a01c26 100644 --- a/test/e2e/unpack_test.go +++ b/test/e2e/unpack_test.go @@ -6,9 +6,12 @@ import ( "fmt" "os" + "github.com/google/go-cmp/cmp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/alpha/property" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -44,6 +47,8 @@ var _ = Describe("Catalog Unpacking", func() { var ( ctx context.Context catalog *catalogd.Catalog + job batchv1.Job + pod corev1.Pod ) When("A Catalog is created", func() { BeforeEach(func() { @@ -169,11 +174,105 @@ var _ = Describe("Catalog Unpacking", func() { err = c.Get(ctx, client.ObjectKeyFromObject(expectedBndl), bndl) Expect(err).ToNot(HaveOccurred()) Expect(expectedBndl).To(komega.EqualObject(bndl, komega.IgnoreAutogeneratedMetadata)) - }) - AfterEach(func() { - err := c.Delete(ctx, catalog) + By("Making sure the catalog content is available via the http server") + // (TODO): Get the URL from the CR once https://github.com/operator-framework/catalogd/issues/119 is done + catalogURL := fmt.Sprintf("%s.%s.svc/catalogs/%s/all.json", "catalogd-catalogserver", defaultSystemNamespace, catalogName) + job = batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svr-job", + Namespace: defaultSystemNamespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-svr", + Image: "curlimages/curl", + Command: []string{"sh", "-c", "curl --silent --show-error --location --insecure --header \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" -o - " + catalogURL}, + }, + }, + RestartPolicy: "Never", + }, + }, + }, + } + err = c.Create(ctx, &job) Expect(err).ToNot(HaveOccurred()) + Eventually(func() (bool, error) { + err = c.Get(ctx, types.NamespacedName{Name: "test-svr-job", Namespace: defaultSystemNamespace}, &job) + if err != nil { + return false, err + } + return job.Status.CompletionTime != nil && job.Status.Succeeded == 1, err + }).Should(BeTrue()) + pods := &corev1.PodList{} + Eventually(func() (bool, error) { + err := c.List(context.Background(), pods, client.MatchingLabels{"job-name": "test-svr-job"}) + if err != nil { + return false, err + } + return len(pods.Items) == 1, nil + }).Should(BeTrue()) + + type value struct { + PackageName string `json:"packageName"` + Version string `json:"version"` + } + rawMessage, err := json.Marshal(value{PackageName: "prometheus", Version: "0.47.0"}) + Expect(err).To(Not(HaveOccurred())) + expectedDeclCfg := declcfg.DeclarativeConfig{ + Packages: []declcfg.Package{ + { + Schema: "olm.package", + Name: "prometheus", + DefaultChannel: "beta", + }, + }, + Channels: []declcfg.Channel{ + { + Schema: "olm.channel", + Name: "beta", + Package: "prometheus", + Entries: []declcfg.ChannelEntry{ + { + Name: "prometheus-operator.0.47.0", + }, + }, + }, + }, + Bundles: []declcfg.Bundle{ + { + Schema: "olm.bundle", + Name: "prometheus-operator.0.47.0", + Package: "prometheus", + Image: "localhost/testdata/bundles/registry-v1/prometheus-operator:v0.47.0", + Properties: []property.Property{ + { + Type: "olm.package", + Value: rawMessage, + }, + }, + }, + }, + } + Eventually(func() (bool, error) { + + // Get logs of the Pod + pod = pods.Items[0] + logReader, err := kubeClient.CoreV1().Pods(defaultSystemNamespace).GetLogs(pod.Name, &corev1.PodLogOptions{}).Stream(context.Background()) + if err != nil { + return false, err + } + + cfg, err := declcfg.LoadReader(logReader) + Expect(err).To(Not(HaveOccurred())) + return cmp.Diff(cfg, &expectedDeclCfg) == "", nil + }).Should(BeTrue()) + }) + AfterEach(func() { + Expect(c.Delete(ctx, catalog)).To(Succeed()) Eventually(func(g Gomega) { err = c.Get(ctx, types.NamespacedName{Name: catalog.Name}, &catalogd.Catalog{}) g.Expect(errors.IsNotFound(err)).To(BeTrue())