Skip to content

Commit

Permalink
Serve locally stored fbc content via an http server
Browse files Browse the repository at this point in the history
Closes #113

Signed-off-by: Anik <anikbhattacharya93@gmail.com>
  • Loading branch information
anik120 committed Sep 6, 2023
1 parent d30f161 commit be7f1dc
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 19 deletions.
16 changes: 15 additions & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"flag"
"fmt"
"net/url"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -67,6 +68,7 @@ func main() {
catalogdVersion bool
systemNamespace string
storageDir string
httpExternalAddr string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -77,6 +79,7 @@ func main() {
flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images")
flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads")
flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from")
flag.StringVar(&httpExternalAddr, "http-external-address", "http://localhost:8080", "The external address at which the http server is reachable.")
flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
opts := zap.Options{
Expand Down Expand Up @@ -122,10 +125,21 @@ func main() {
if err := os.MkdirAll(storageDir, 0700); err != nil {
setupLog.Error(err, "unable to create storage directory for catalogs")
}
serverURL, err := url.Parse(fmt.Sprintf("%s/catalogs/", httpExternalAddr))
if err != nil {
setupLog.Error(err, "unable to parse bundle content server URL")
os.Exit(1)
}
localStorage := storage.LocalDir{RootDir: storageDir, URL: *serverURL}
if err := mgr.AddMetricsExtraHandler("/catalogs/", localStorage.StorageServerHandler()); err != nil {
setupLog.Error(err, "unable to start catalog server")
os.Exit(1)
}

if err = (&corecontrollers.CatalogReconciler{
Client: mgr.GetClient(),
Unpacker: unpacker,
Storage: storage.LocalDir{RootDir: storageDir},
Storage: localStorage,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Catalog")
os.Exit(1)
Expand Down
3 changes: 2 additions & 1 deletion config/default/manager_auth_proxy_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ spec:
- "--metrics-bind-address=127.0.0.1:8080"
- "--leader-elect"
- "--catalogs-storage-dir=/var/cache/catalogs"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=false"
- "--feature-gates=CatalogMetadataAPI=true,HTTPServer=true"
- "--http-external-address=https://core.catalogd-system.svc"
14 changes: 3 additions & 11 deletions config/rbac/auth_proxy_service.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
apiVersion: v1
kind: Service
metadata:
labels:
control-plane: controller-manager
app.kubernetes.io/name: service
app.kubernetes.io/instance: controller-manager-metrics-service
app.kubernetes.io/component: kube-rbac-proxy
app.kubernetes.io/created-by: catalogd
app.kubernetes.io/part-of: catalogd
app.kubernetes.io/managed-by: kustomize
name: controller-manager-metrics-service
name: core
namespace: system
spec:
ports:
- name: https
port: 8443
port: 443
protocol: TCP
targetPort: https
targetPort: 8443
selector:
control-plane: controller-manager
9 changes: 9 additions & 0 deletions config/rbac/catalog_reader_client_clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: catalog-reader
rules:
- nonResourceURLs:
- /catalogs/*
verbs:
- get
1 change: 1 addition & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ resources:
- role_binding.yaml
- leader_election_role.yaml
- leader_election_role_binding.yaml
- catalog_reader_client_clusterrole.yaml
# Comment the following 4 lines if you want to disable
# the auth proxy (https://github.com/brancz/kube-rbac-proxy)
# which protects your /metrics endpoint.
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/core/catalog_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/fs"
"net/http"
"os"
"testing/fstest"

Expand Down Expand Up @@ -67,6 +68,10 @@ func (m MockStore) Delete(_ string) error {
return nil
}

func (m MockStore) StorageServerHandler() http.Handler {
panic("not needed")
}

var _ = Describe("Catalogd Controller Test", func() {
format.MaxLength = 0
var (
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/localdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package storage
import (
"fmt"
"io/fs"
"net/http"
"net/url"
"os"
"path/filepath"

Expand All @@ -16,6 +18,7 @@ import (
// atomic view of the content for a catalog.
type LocalDir struct {
RootDir string
URL url.URL
}

func (s LocalDir) Store(catalog string, fsys fs.FS) error {
Expand Down Expand Up @@ -45,3 +48,26 @@ func (s LocalDir) Store(catalog string, fsys fs.FS) error {
func (s LocalDir) Delete(catalog string) error {
return os.RemoveAll(filepath.Join(s.RootDir, catalog))
}

func (s LocalDir) StorageServerHandler() http.Handler {
return http.StripPrefix(s.URL.Path, http.FileServer(http.FS(&filesOnlyFilesystem{os.DirFS(s.RootDir)})))

Check warning on line 53 in pkg/storage/localdir.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/localdir.go#L53

Added line #L53 was not covered by tests
}

type filesOnlyFilesystem struct {
FS fs.FS
}

func (f *filesOnlyFilesystem) Open(name string) (fs.File, error) {
file, err := f.FS.Open(name)
if err != nil {
return nil, err

Check warning on line 63 in pkg/storage/localdir.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/localdir.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}
stat, err := file.Stat()
if err != nil {
return nil, err

Check warning on line 67 in pkg/storage/localdir.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/localdir.go#L65-L67

Added lines #L65 - L67 were not covered by tests
}
if !stat.Mode().IsRegular() {
return nil, os.ErrNotExist

Check warning on line 70 in pkg/storage/localdir.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/localdir.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}
return file, nil

Check warning on line 72 in pkg/storage/localdir.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/localdir.go#L72

Added line #L72 was not covered by tests
}
6 changes: 5 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package storage

import "io/fs"
import (
"io/fs"
"net/http"
)

// Instance is a storage instance that stores FBC content of catalogs
// added to a cluster. It can be used to Store or Delete FBC in the
// host's filesystem
type Instance interface {
Store(catalog string, fsys fs.FS) error
Delete(catalog string) error
StorageServerHandler() http.Handler
}
19 changes: 15 additions & 4 deletions test/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -15,8 +19,11 @@ import (
)

var (
cfg *rest.Config
c client.Client
cfg *rest.Config
c client.Client
err error
defaultSystemNamespace = "catalogd-system"
kubeClient kubernetes.Interface
)

func TestE2E(t *testing.T) {
Expand All @@ -30,8 +37,12 @@ var _ = BeforeSuite(func() {
cfg = ctrl.GetConfigOrDie()

scheme := runtime.NewScheme()
err := catalogd.AddToScheme(scheme)
Expect(err).ToNot(HaveOccurred())
Expect(catalogd.AddToScheme(scheme)).To(Succeed())
Expect(corev1.AddToScheme(scheme)).To(Succeed())
Expect(rbacv1.AddToScheme(scheme)).To(Succeed())
Expect(batchv1.AddToScheme(scheme)).To(Succeed())
c, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).To(Not(HaveOccurred()))
kubeClient, err = kubernetes.NewForConfig(cfg)
Expect(err).ToNot(HaveOccurred())
})
136 changes: 135 additions & 1 deletion test/e2e/unpack_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package e2e

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"

"github.com/google/go-cmp/cmp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/operator-framework/operator-registry/alpha/declcfg"
"github.com/operator-framework/operator-registry/alpha/property"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -169,8 +174,137 @@ var _ = Describe("Catalog Unpacking", func() {
err = c.Get(ctx, client.ObjectKeyFromObject(expectedBndl), bndl)
Expect(err).ToNot(HaveOccurred())
Expect(expectedBndl).To(komega.EqualObject(bndl, komega.IgnoreAutogeneratedMetadata))
})

By("Making sure the catalog content is available via the http server")
// Create a temporary ServiceAccount
var (
sa corev1.ServiceAccount
crb rbacv1.ClusterRoleBinding
job batchv1.Job
pod corev1.Pod
)
sa = corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svr-sa",
Namespace: defaultSystemNamespace,
},
}
err = c.Create(ctx, &sa)
Expect(err).ToNot(HaveOccurred())

// Create a temporary ClusterRoleBinding to bind the ServiceAccount to catalog-reader ClusterRole
crb = rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svr-crb",
Namespace: defaultSystemNamespace,
},

Subjects: []rbacv1.Subject{{Kind: "ServiceAccount", Name: "test-svr-sa", Namespace: defaultSystemNamespace}},
RoleRef: rbacv1.RoleRef{APIGroup: "rbac.authorization.k8s.io", Kind: "ClusterRole", Name: "catalogd-catalog-reader"},
}

err = c.Create(ctx, &crb)
Expect(err).ToNot(HaveOccurred())
mounttoken := true
job = batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-svr-job",
Namespace: defaultSystemNamespace,
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-svr",
Image: "curlimages/curl",
// (TODO): Get the URL from the CR once https://github.com/operator-framework/catalogd/issues/119 is done
Command: []string{"sh", "-c", "curl -sSLk -H \"Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)\" -o - " + "https://catalogd-core.catalogd-system.svc/catalogs/test-catalog/all.json"},
},
},
ServiceAccountName: "test-svr-sa",
RestartPolicy: "Never",
AutomountServiceAccountToken: &mounttoken,
},
},
},
}
err = c.Create(ctx, &job)
Expect(err).ToNot(HaveOccurred())
Eventually(func() (bool, error) {
err = c.Get(ctx, types.NamespacedName{Name: "test-svr-job", Namespace: defaultSystemNamespace}, &job)
if err != nil {
return false, err
}
return job.Status.CompletionTime != nil && job.Status.Succeeded == 1, err
}).Should(BeTrue())
pods := &corev1.PodList{}
Eventually(func() (bool, error) {
err := c.List(context.Background(), pods, client.MatchingLabels{"job-name": "test-svr-job"})
if err != nil {
return false, err
}
return len(pods.Items) == 1, nil
}).Should(BeTrue())

type value struct {
PackageName string `json:"packageName"`
Version string `json:"version"`
}
rawMessage, err := json.Marshal(value{PackageName: "prometheus", Version: "0.47.0"})
Expect(err).To(Not(HaveOccurred()))
expectedDeclCfg := declcfg.DeclarativeConfig{
Packages: []declcfg.Package{
{
Schema: "olm.package",
Name: "prometheus",
DefaultChannel: "beta",
},
},
Channels: []declcfg.Channel{
{
Schema: "olm.channel",
Name: "beta",
Package: "prometheus",
Entries: []declcfg.ChannelEntry{
{
Name: "prometheus-operator.0.47.0",
},
},
},
},
Bundles: []declcfg.Bundle{
{
Schema: "olm.bundle",
Name: "prometheus-operator.0.47.0",
Package: "prometheus",
Image: "localhost/testdata/bundles/registry-v1/prometheus-operator:v0.47.0",
Properties: []property.Property{
{
Type: "olm.package",
Value: rawMessage,
},
},
},
},
}
Eventually(func() (bool, error) {

// Get logs of the Pod
pod = pods.Items[0]
logReader, err := kubeClient.CoreV1().Pods(defaultSystemNamespace).GetLogs(pod.Name, &corev1.PodLogOptions{}).Stream(context.Background())
if err != nil {
return false, err
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(logReader)
Expect(err).ToNot(HaveOccurred())
cfg, err := declcfg.LoadReader(buf)
Expect(err).To(Not(HaveOccurred()))
Expect(err).To(Not(HaveOccurred()))
return cmp.Diff(cfg, &expectedDeclCfg) == "", nil
}).Should(BeTrue())
})
AfterEach(func() {
err := c.Delete(ctx, catalog)
Expect(err).ToNot(HaveOccurred())
Expand Down

0 comments on commit be7f1dc

Please sign in to comment.