diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 30ec040d..1b16a6c0 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -19,7 +19,9 @@ package main import ( "flag" "fmt" + "net/http" "os" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -35,6 +37,7 @@ import ( "github.com/spf13/pflag" "github.com/operator-framework/catalogd/internal/source" + "github.com/operator-framework/catalogd/internal/third_party/server" "github.com/operator-framework/catalogd/internal/version" corecontrollers "github.com/operator-framework/catalogd/pkg/controllers/core" "github.com/operator-framework/catalogd/pkg/features" @@ -67,6 +70,7 @@ func main() { catalogdVersion bool systemNamespace string storageDir string + catalogServerAddr string ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -77,6 +81,7 @@ func main() { flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images") flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads") flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from") + flag.StringVar(&catalogServerAddr, "catalogs-server-addr", ":8083", "The address where the unpacked catalogs' content will be accessible") flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof") flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit") opts := zap.Options{ @@ -122,10 +127,27 @@ func main() { if err := os.MkdirAll(storageDir, 0700); err != nil { setupLog.Error(err, "unable to create storage directory for catalogs") } + localStorage := storage.LocalDir{RootDir: storageDir} + shutdownTimeout := 30 * time.Second + catalogServer := server.Server{ + Kind: "catalogs", + Server: &http.Server{ + Addr: catalogServerAddr, + Handler: localStorage.StorageServerHandler(), + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + }, + ShutdownTimeout: &shutdownTimeout, + } + if err := mgr.Add(&catalogServer); err != nil { + setupLog.Error(err, "unable to start catalog server") + os.Exit(1) + } + if err = (&corecontrollers.CatalogReconciler{ Client: mgr.GetClient(), Unpacker: unpacker, - Storage: storage.LocalDir{RootDir: storageDir}, + Storage: localStorage, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Catalog") os.Exit(1) diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index 08d8b908..b24c81b9 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -51,4 +51,5 @@ spec: - "--metrics-bind-address=127.0.0.1:8080" - "--leader-elect" - "--catalogs-storage-dir=/var/cache/catalogs" - - "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false" + - "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true" + diff --git a/config/rbac/catalogserver_service.yaml b/config/rbac/catalogserver_service.yaml new file mode 100644 index 00000000..96e74b56 --- /dev/null +++ b/config/rbac/catalogserver_service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: catalogserver + namespace: system +spec: + selector: + control-plane: controller-manager + ports: + - name: http + protocol: TCP + port: 80 + targetPort: 8083 diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index 731832a6..0ae1b3c1 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -9,6 +9,7 @@ resources: - role_binding.yaml - leader_election_role.yaml - leader_election_role_binding.yaml +- catalogserver_service.yaml # Comment the following 4 lines if you want to disable # the auth proxy (https://github.com/brancz/kube-rbac-proxy) # which protects your /metrics endpoint. diff --git a/go.mod b/go.mod index 34200f46..97a9969b 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/blang/semver/v4 v4.0.0 + github.com/go-logr/logr v1.2.4 github.com/google/go-cmp v0.5.9 github.com/nlepage/go-tarfs v1.1.0 github.com/onsi/ginkgo/v2 v2.9.7 @@ -31,7 +32,6 @@ require ( github.com/go-git/gcfg v1.5.0 // indirect github.com/go-git/go-billy/v5 v5.4.1 // indirect github.com/go-git/go-git/v5 v5.4.2 // indirect - github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/zapr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect diff --git a/internal/third_party/server/server.go b/internal/third_party/server/server.go new file mode 100644 index 00000000..c8cf442c --- /dev/null +++ b/internal/third_party/server/server.go @@ -0,0 +1,122 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// this is copied from https://raw.githubusercontent.com/kubernetes-sigs/controller-runtime/77b08a845e451b695cfa25b79ebe277d85064345/pkg/manager/server.go +// we will remove this once we update to a version of controller-runitme that has this included +// https://github.com/kubernetes-sigs/controller-runtime/pull/2473 + +package server + +import ( + "context" + "errors" + "net" + "net/http" + "time" + + "github.com/go-logr/logr" + + crlog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +var ( + _ manager.Runnable = (*Server)(nil) + _ manager.LeaderElectionRunnable = (*Server)(nil) +) + +// Server is a general purpose HTTP server Runnable for a manager. +// It is used to serve some internal handlers for health probes and profiling, +// but it can also be used to run custom servers. +type Server struct { + // Kind is an optional string that describes the purpose of the server. It is used in logs to distinguish + // among multiple servers. + Kind string + + // Log is the logger used by the server. If not set, a logger will be derived from the context passed to Start. + Log logr.Logger + + // Server is the HTTP server to run. It is required. + Server *http.Server + + // Listener is an optional listener to use. If not set, the server start a listener using the server.Addr. + // Using a listener is useful when the port reservation needs to happen in advance of this runnable starting. + Listener net.Listener + + // OnlyServeWhenLeader is an optional bool that indicates that the server should only be started when the manager is the leader. + OnlyServeWhenLeader bool + + // ShutdownTimeout is an optional duration that indicates how long to wait for the server to shutdown gracefully. If not set, + // the server will wait indefinitely for all connections to close. + ShutdownTimeout *time.Duration +} + +// Start starts the server. It will block until the server is stopped or an error occurs. +func (s *Server) Start(ctx context.Context) error { + log := s.Log + if log.GetSink() == nil { + log = crlog.FromContext(ctx) + } + if s.Kind != "" { + log = log.WithValues("kind", s.Kind) + } + log = log.WithValues("addr", s.addr()) + + serverShutdown := make(chan struct{}) + go func() { + <-ctx.Done() + log.Info("shutting down server") + + shutdownCtx := context.Background() + if s.ShutdownTimeout != nil { + var shutdownCancel context.CancelFunc + shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), *s.ShutdownTimeout) + defer shutdownCancel() + } + + if err := s.Server.Shutdown(shutdownCtx); err != nil { + log.Error(err, "error shutting down server") + } + close(serverShutdown) + }() + + log.Info("starting server") + if err := s.serve(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + + <-serverShutdown + return nil +} + +// NeedLeaderElection returns true if the server should only be started when the manager is the leader. +func (s *Server) NeedLeaderElection() bool { + return s.OnlyServeWhenLeader +} + +func (s *Server) addr() string { + if s.Listener != nil { + return s.Listener.Addr().String() + } + return s.Server.Addr +} + +func (s *Server) serve() error { + if s.Listener != nil { + return s.Server.Serve(s.Listener) + } + return s.Server.ListenAndServe() +} diff --git a/pkg/controllers/core/catalog_controller_test.go b/pkg/controllers/core/catalog_controller_test.go index f47c180d..624b2064 100644 --- a/pkg/controllers/core/catalog_controller_test.go +++ b/pkg/controllers/core/catalog_controller_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/fs" + "net/http" "os" "testing/fstest" @@ -67,6 +68,10 @@ func (m MockStore) Delete(_ string) error { return nil } +func (m MockStore) StorageServerHandler() http.Handler { + panic("not needed") +} + var _ = Describe("Catalogd Controller Test", func() { format.MaxLength = 0 var ( diff --git a/pkg/storage/localdir.go b/pkg/storage/localdir.go index 9a1015b1..fbbcda9a 100644 --- a/pkg/storage/localdir.go +++ b/pkg/storage/localdir.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "io/fs" + "net/http" "os" "path/filepath" @@ -45,3 +46,36 @@ func (s LocalDir) Store(catalog string, fsys fs.FS) error { func (s LocalDir) Delete(catalog string) error { return os.RemoveAll(filepath.Join(s.RootDir, catalog)) } + +func (s LocalDir) StorageServerHandler() http.Handler { + mux := http.NewServeMux() + mux.Handle("/catalogs/", http.StripPrefix("/catalogs/", http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)})))) + return mux +} + +// filesOnlyFilesystem is a file system that can open only regular +// files from the underlying filesystem. All other file types result +// in os.ErrNotExists +type filesOnlyFilesystem struct { + FS fs.FS +} + +// Open opens a named file from the underlying filesystem. If the file +// is not a regular file, it return os.ErrNotExists. Callers are resposible +// for closing the file returned. +func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) { + file, err := f.FS.Open(name) + if err != nil { + return nil, err + } + stat, err := file.Stat() + if err != nil { + _ = file.Close() + return nil, err + } + if !stat.Mode().IsRegular() { + _ = file.Close() + return nil, os.ErrNotExist + } + return file, nil +} diff --git a/pkg/storage/localdir_test.go b/pkg/storage/localdir_test.go index 8600d7c3..aec96c65 100644 --- a/pkg/storage/localdir_test.go +++ b/pkg/storage/localdir_test.go @@ -2,7 +2,10 @@ package storage import ( "fmt" + "io" "io/fs" + "net/http" + "net/http/httptest" "os" "path/filepath" "testing/fstest" @@ -34,8 +37,8 @@ var _ = Describe("LocalDir Storage Test", func() { ) BeforeEach(func() { d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") - rootDir = d Expect(err).ToNot(HaveOccurred()) + rootDir = d store = LocalDir{RootDir: rootDir} unpackResultFS = &fstest.MapFS{ @@ -76,6 +79,68 @@ var _ = Describe("LocalDir Storage Test", func() { }) }) +var _ = Describe("LocalDir Server Handler tests", func() { + var ( + testServer *httptest.Server + store LocalDir + ) + BeforeEach(func() { + d, err := os.MkdirTemp(GinkgoT().TempDir(), "cache") + Expect(err).ToNot(HaveOccurred()) + Expect(os.Mkdir(filepath.Join(d, "test-catalog"), 0700)).To(Succeed()) + store = LocalDir{RootDir: d} + testServer = httptest.NewServer(store.StorageServerHandler()) + + }) + It("gets 404 for the path /", func() { + expectNotFound(testServer.URL) + }) + It("gets 404 for the path /catalogs/", func() { + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/")) + }) + It("gets 404 for the path /catalogs/test-catalog/", func() { + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/")) + }) + It("gets 404 for the path /test-catalog/foo.txt", func() { + // This ensures that even if the file exists, the URL must contain the /catalogs/ prefix + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), []byte("bar"), 0600)).To(Succeed()) + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/test-catalog/foo.txt")) + }) + It("gets 404 for the path /catalogs/test-catalog/non-existent.txt", func() { + expectNotFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/non-existent.txt")) + }) + It("gets 200 for the path /catalogs/foo.txt", func() { + expectedContent := []byte("bar") + Expect(os.WriteFile(filepath.Join(store.RootDir, "foo.txt"), expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/foo.txt"), expectedContent) + }) + It("gets 200 for the path /catalogs/test-catalog/foo.txt", func() { + expectedContent := []byte("bar") + Expect(os.WriteFile(filepath.Join(store.RootDir, "test-catalog", "foo.txt"), expectedContent, 0600)).To(Succeed()) + expectFound(fmt.Sprintf("%s/%s", testServer.URL, "/catalogs/test-catalog/foo.txt"), expectedContent) + }) + AfterEach(func() { + testServer.Close() + }) +}) + +func expectNotFound(url string) { + resp, err := http.Get(url) //nolint:gosec + Expect(err).To(Not(HaveOccurred())) + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) + Expect(resp.Body.Close()).To(Succeed()) +} + +func expectFound(url string, expectedContent []byte) { + resp, err := http.Get(url) //nolint:gosec + Expect(err).To(Not(HaveOccurred())) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + actualContent, err := io.ReadAll(resp.Body) + Expect(err).To(Not(HaveOccurred())) + Expect(actualContent).To(Equal(expectedContent)) + Expect(resp.Body.Close()).To(Succeed()) +} + const testBundleTemplate = `--- image: %s name: %s diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d0a6e111..1b244973 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,11 +1,16 @@ package storage -import "io/fs" +import ( + "io/fs" + "net/http" +) // Instance is a storage instance that stores FBC content of catalogs // added to a cluster. It can be used to Store or Delete FBC in the -// host's filesystem +// host's filesystem. It also a manager runnable object, that starts +// a server to serve the content stored. type Instance interface { Store(catalog string, fsys fs.FS) error Delete(catalog string) error + StorageServerHandler() http.Handler } diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index d0217bc4..4945d4c3 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -6,7 +6,8 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -15,8 +16,11 @@ import ( ) var ( - cfg *rest.Config - c client.Client + cfg *rest.Config + c client.Client + err error + defaultSystemNamespace = "catalogd-system" + kubeClient kubernetes.Interface ) func TestE2E(t *testing.T) { @@ -29,9 +33,10 @@ func TestE2E(t *testing.T) { var _ = BeforeSuite(func() { cfg = ctrl.GetConfigOrDie() - scheme := runtime.NewScheme() - err := catalogd.AddToScheme(scheme) - Expect(err).ToNot(HaveOccurred()) - c, err = client.New(cfg, client.Options{Scheme: scheme}) + sch := scheme.Scheme + Expect(catalogd.AddToScheme(sch)).To(Succeed()) + c, err = client.New(cfg, client.Options{Scheme: sch}) Expect(err).To(Not(HaveOccurred())) + kubeClient, err = kubernetes.NewForConfig(cfg) + Expect(err).ToNot(HaveOccurred()) }) diff --git a/test/e2e/unpack_test.go b/test/e2e/unpack_test.go index 7825366c..444e0230 100644 --- a/test/e2e/unpack_test.go +++ b/test/e2e/unpack_test.go @@ -4,11 +4,14 @@ import ( "context" "encoding/json" "fmt" + "io" "os" + "github.com/google/go-cmp/cmp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/operator-framework/operator-registry/alpha/declcfg" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -44,6 +47,7 @@ var _ = Describe("Catalog Unpacking", func() { var ( ctx context.Context catalog *catalogd.Catalog + job batchv1.Job ) When("A Catalog is created", func() { BeforeEach(func() { @@ -169,11 +173,61 @@ var _ = Describe("Catalog Unpacking", func() { err = c.Get(ctx, client.ObjectKeyFromObject(expectedBndl), bndl) Expect(err).ToNot(HaveOccurred()) Expect(expectedBndl).To(komega.EqualObject(bndl, komega.IgnoreAutogeneratedMetadata)) - }) - AfterEach(func() { - err := c.Delete(ctx, catalog) + By("Making sure the catalog content is available via the http server") + // (TODO): Get the URL from the CR once https://github.com/operator-framework/catalogd/issues/119 is done + catalogURL := fmt.Sprintf("%s.%s.svc/catalogs/%s/all.json", "catalogd-catalogserver", defaultSystemNamespace, catalogName) + job = batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-svr-job", + Namespace: defaultSystemNamespace, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-svr", + Image: "curlimages/curl", + Command: []string{"sh", "-c", "curl --silent --show-error --location -o - " + catalogURL}, + }, + }, + RestartPolicy: "Never", + }, + }, + }, + } + err = c.Create(ctx, &job) Expect(err).ToNot(HaveOccurred()) + Eventually(func() (bool, error) { + err = c.Get(ctx, types.NamespacedName{Name: "test-svr-job", Namespace: defaultSystemNamespace}, &job) + if err != nil { + return false, err + } + return job.Status.CompletionTime != nil && job.Status.Succeeded == 1, err + }).Should(BeTrue()) + pods := &corev1.PodList{} + Eventually(func() (bool, error) { + err := c.List(context.Background(), pods, client.MatchingLabels{"job-name": "test-svr-job"}) + if err != nil { + return false, err + } + return len(pods.Items) == 1, nil + }).Should(BeTrue()) + + expectedFBC, err := os.ReadFile("../../testdata/catalogs/test-catalog/expected_all.json") + Expect(err).To(Not(HaveOccurred())) + // Get logs of the Pod + pod := pods.Items[0] + logReader, err := kubeClient.CoreV1().Pods(defaultSystemNamespace).GetLogs(pod.Name, &corev1.PodLogOptions{}).Stream(context.Background()) + Expect(err).To(Not(HaveOccurred())) + actualFBC, err := io.ReadAll(logReader) + Expect(err).To(Not(HaveOccurred())) + Expect(cmp.Diff(expectedFBC, actualFBC)).To(BeEmpty()) + }) + AfterEach(func() { + Expect(c.Delete(ctx, &job)).To(Succeed()) + Expect(c.Delete(ctx, catalog)).To(Succeed()) Eventually(func(g Gomega) { err = c.Get(ctx, types.NamespacedName{Name: catalog.Name}, &catalogd.Catalog{}) g.Expect(errors.IsNotFound(err)).To(BeTrue()) diff --git a/testdata/catalogs/test-catalog/.indexignore b/testdata/catalogs/test-catalog/.indexignore new file mode 100644 index 00000000..75d7da97 --- /dev/null +++ b/testdata/catalogs/test-catalog/.indexignore @@ -0,0 +1 @@ +/expected_all.json diff --git a/testdata/catalogs/test-catalog/expected_all.json b/testdata/catalogs/test-catalog/expected_all.json new file mode 100644 index 00000000..ed44b148 --- /dev/null +++ b/testdata/catalogs/test-catalog/expected_all.json @@ -0,0 +1 @@ +{"defaultChannel":"beta","name":"prometheus","schema":"olm.package"}{"entries":[{"name":"prometheus-operator.0.47.0"}],"name":"beta","package":"prometheus","schema":"olm.channel"}{"image":"localhost/testdata/bundles/registry-v1/prometheus-operator:v0.47.0","name":"prometheus-operator.0.47.0","package":"prometheus","properties":[{"type":"olm.package","value":{"packageName":"prometheus","version":"0.47.0"}}],"schema":"olm.bundle"} \ No newline at end of file