From 038505ce3074e2291ea68e80c5c6ed8d755b87d9 Mon Sep 17 00:00:00 2001 From: Bryce Palmer Date: Thu, 21 Sep 2023 10:36:48 -0400 Subject: [PATCH] use catalogd HTTP server instead of CatalogMetadata API Signed-off-by: Bryce Palmer --- cmd/manager/main.go | 14 +- config/manager/manager.yaml | 6 + go.mod | 2 +- go.sum | 4 +- internal/catalogmetadata/cache/cache.go | 143 ++++++++++ internal/catalogmetadata/cache/cache_test.go | 269 ++++++++++++++++++ internal/catalogmetadata/client/client.go | 75 +++-- .../catalogmetadata/client/client_test.go | 218 +++++++------- internal/catalogmetadata/unmarshal.go | 20 -- internal/catalogmetadata/unmarshal_test.go | 110 ------- test/e2e/e2e_suite_test.go | 10 - test/e2e/install_test.go | 7 - .../operator_framework_test.go | 56 +--- 13 files changed, 596 insertions(+), 338 deletions(-) create mode 100644 internal/catalogmetadata/cache/cache.go create mode 100644 internal/catalogmetadata/cache/cache_test.go delete mode 100644 internal/catalogmetadata/unmarshal.go delete mode 100644 internal/catalogmetadata/unmarshal_test.go diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 27b923833..38a62a10f 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "net/http" "os" "github.com/spf13/pflag" @@ -35,6 +36,7 @@ import ( rukpakv1alpha1 "github.com/operator-framework/rukpak/api/v1alpha1" operatorsv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" + "github.com/operator-framework/operator-controller/internal/catalogmetadata/cache" catalogclient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" "github.com/operator-framework/operator-controller/internal/controllers" "github.com/operator-framework/operator-controller/pkg/features" @@ -56,14 +58,18 @@ func init() { } func main() { - var metricsAddr string - var enableLeaderElection bool - var probeAddr string + var ( + metricsAddr string + enableLeaderElection bool + probeAddr string + cachePath string + ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + flag.StringVar(&cachePath, "cache-path", "/var/cache", "The local directory path used for filesystem based caching") opts := zap.Options{ Development: true, } @@ -100,7 +106,7 @@ func main() { } cl := mgr.GetClient() - catalogClient := catalogclient.New(cl) + catalogClient := catalogclient.New(cl, cache.NewFilesystemCache(cachePath, http.DefaultClient)) if err = (&controllers.OperatorReconciler{ Client: cl, diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 16f58c1fd..23e407afc 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -72,6 +72,9 @@ spec: image: controller:latest imagePullPolicy: IfNotPresent name: manager + volumeMounts: + - name: cache + mountPath: /var/cache securityContext: allowPrivilegeEscalation: false capabilities: @@ -97,3 +100,6 @@ spec: memory: 64Mi serviceAccountName: controller-manager terminationGracePeriodSeconds: 10 + volumes: + - name: cache + emptyDir: {} \ No newline at end of file diff --git a/go.mod b/go.mod index 795fe5e8a..1322419a8 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/onsi/ginkgo/v2 v2.12.1 github.com/onsi/gomega v1.27.10 github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42 - github.com/operator-framework/catalogd v0.6.0 + github.com/operator-framework/catalogd v0.7.0 github.com/operator-framework/deppy v0.0.1 github.com/operator-framework/operator-registry v1.28.0 github.com/operator-framework/rukpak v0.13.0 diff --git a/go.sum b/go.sum index c881f14d9..6276e2e79 100644 --- a/go.sum +++ b/go.sum @@ -702,8 +702,8 @@ github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3 github.com/opencontainers/selinux v1.8.2/go.mod h1:MUIHuUEvKB1wtJjQdOyYRgOnLD2xAPP8dBsCoU0KuF8= github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42 h1:d/Pnr19TnmIq3zQ6ebewC+5jt5zqYbRkvYd37YZENQY= github.com/operator-framework/api v0.17.4-0.20230223191600-0131a6301e42/go.mod h1:l/cuwtPxkVUY7fzYgdust2m9tlmb8I4pOvbsUufRb24= -github.com/operator-framework/catalogd v0.6.0 h1:dSZ54MVSHJ8hcoV7OCRxnk3x4O3ramlyPvvz0vsKYdk= -github.com/operator-framework/catalogd v0.6.0/go.mod h1:I0n086a4a+nP1YZy742IrPaWvOlWu0Mj6qA6j4K96Vg= +github.com/operator-framework/catalogd v0.7.0 h1:L0uesxq+r59rGubtxMoVtIShKn7gSSSLqxpWLfwpAaw= +github.com/operator-framework/catalogd v0.7.0/go.mod h1:tVhaenJVFTHHgdJ0Pju7U4G3epeoZfUWWM1J5nPISPQ= github.com/operator-framework/deppy v0.0.1 h1:PLTtaFGwktPhKuKZkfUruTimrWpyaO3tghbsjs0uMjc= github.com/operator-framework/deppy v0.0.1/go.mod h1:EV6vnxRodSFRn2TFztfxFhMPGh5QufOhn3tpIP1Z8cc= github.com/operator-framework/operator-registry v1.28.0 h1:vtmd2WgJxkx7vuuOxW4k5Le/oo0SfonSeJVMU3rKIfk= diff --git a/internal/catalogmetadata/cache/cache.go b/internal/catalogmetadata/cache/cache.go new file mode 100644 index 000000000..1a792d9f1 --- /dev/null +++ b/internal/catalogmetadata/cache/cache.go @@ -0,0 +1,143 @@ +package cache + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "sync" + + catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" + + "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" +) + +var _ client.Fetcher = &filesystemCache{} + +// NewFilesystemCache returns a client.Fetcher implementation that uses a +// local filesystem to cache Catalog contents. When fetching the Catalog contents +// it will: +// - Check if the Catalog is cached +// - IF !cached it will fetch from the catalogd HTTP server and cache the response +// - IF cached it will verify the cache is up to date. If it is up to date it will return +// the cached contents, if not it will fetch the new contents from the catalogd HTTP +// server and update the cached contents. +func NewFilesystemCache(cachePath string, client *http.Client) client.Fetcher { + return &filesystemCache{ + cachePath: cachePath, + mutex: sync.RWMutex{}, + client: client, + cacheDataByCatalogName: map[string]cacheData{}, + } +} + +// cacheData holds information about a catalog +// other than it's contents that is used for +// making decisions on when to attempt to refresh +// the cache. +type cacheData struct { + ResolvedRef string +} + +// FilesystemCache is a cache that +// uses the local filesystem for caching +// catalog contents. It will fetch catalog +// contents if the catalog does not already +// exist in the cache. +type filesystemCache struct { + mutex sync.RWMutex + cachePath string + client *http.Client + cacheDataByCatalogName map[string]cacheData +} + +// FetchCatalogContents implements the client.Fetcher interface and +// will fetch the contents for the provided Catalog from the filesystem. +// If the provided Catalog has not yet been cached, it will make a GET +// request to the Catalogd HTTP server to get the Catalog contents and cache +// them. The cache will be updated automatically if a Catalog is noticed to +// have a different resolved image reference. +// The Catalog provided to this function is expected to: +// - Be non-nil +// - Have a non-nil Catalog.Status.ResolvedSource.Image +// This ensures that we are only attempting to fetch catalog contents for Catalog +// resources that have been successfully reconciled, unpacked, and are being served. +// These requirements help ensure that we can rely on status conditions to determine +// when to issue a request to update the cached Catalog contents. +func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.Catalog) (io.ReadCloser, error) { + if catalog == nil { + return nil, fmt.Errorf("error: provided catalog must be non-nil") + } + + if catalog.Status.ResolvedSource == nil { + return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name) + } + + if catalog.Status.ResolvedSource.Image == nil { + return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name) + } + + cacheDir := filepath.Join(fsc.cachePath, catalog.Name) + cacheFilePath := filepath.Join(cacheDir, "data.json") + + fsc.mutex.RLock() + if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok { + if catalog.Status.ResolvedSource.Image.Ref == data.ResolvedRef { + fsc.mutex.RUnlock() + return os.Open(cacheFilePath) + } + } + fsc.mutex.RUnlock() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil) + if err != nil { + return nil, fmt.Errorf("error forming request: %s", err) + } + + resp, err := fsc.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error performing request: %s", err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + fsc.mutex.Lock() + defer fsc.mutex.Unlock() + + // make sure we only write if this info hasn't been updated + // by another thread. The check here, if multiple threads are + // updating this, has no way to tell if the current ref is the + // newest possible ref. If another thread has already updated + // this to be the same value, skip the write logic and return + // the cached contents + if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok { + if data.ResolvedRef == catalog.Status.ResolvedSource.Image.Ref { + break + } + } + + if err = os.MkdirAll(cacheDir, os.ModePerm); err != nil { + return nil, fmt.Errorf("error creating cache directory for Catalog %q: %s", catalog.Name, err) + } + + file, err := os.Create(cacheFilePath) + if err != nil { + return nil, fmt.Errorf("error creating cache file for Catalog %q: %s", catalog.Name, err) + } + + if _, err := io.Copy(file, resp.Body); err != nil { + return nil, fmt.Errorf("error writing contents to cache for Catalog %q: %s", catalog.Name, err) + } + + fsc.cacheDataByCatalogName[catalog.Name] = cacheData{ + ResolvedRef: catalog.Status.ResolvedSource.Image.Ref, + } + default: + return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode) + } + + return os.Open(cacheFilePath) +} diff --git a/internal/catalogmetadata/cache/cache_test.go b/internal/catalogmetadata/cache/cache_test.go new file mode 100644 index 000000000..0d9d834e8 --- /dev/null +++ b/internal/catalogmetadata/cache/cache_test.go @@ -0,0 +1,269 @@ +package cache_test + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" + + "github.com/operator-framework/operator-controller/internal/catalogmetadata/cache" + "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" +) + +const ( + package1 = `{ + "schema": "olm.package", + "name": "fake1" + }` + + bundle1 = `{ + "schema": "olm.bundle", + "name": "fake1.v1.0.0", + "package": "fake1", + "image": "fake-image", + "properties": [ + { + "type": "olm.package", + "value": {"packageName":"fake1","version":"1.0.0"} + } + ] + }` + + stableChannel = `{ + "schema": "olm.channel", + "name": "stable", + "package": "fake1", + "entries": [ + { + "name": "fake1.v1.0.0" + } + ] + }` +) + +func TestClient(t *testing.T) { + t.Run("Bundles", func(t *testing.T) { + type test struct { + name string + catalog *catalogd.Catalog + contents []byte + wantErr bool + tripper *MockTripper + extraTests func(t *testing.T, c client.Fetcher, ctx context.Context, tst test) + } + for _, tt := range []test{ + { + name: "valid non-cached fetch", + catalog: &catalogd.Catalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.CatalogStatus{ + ResolvedSource: &catalogd.CatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ImageSource{ + Ref: "fake/catalog@sha256:fakesha", + }, + }, + }, + }, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{}, + extraTests: func(_ *testing.T, _ client.Fetcher, _ context.Context, _ test) {}, + }, + { + name: "valid cached fetch", + catalog: &catalogd.Catalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.CatalogStatus{ + ResolvedSource: &catalogd.CatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ImageSource{ + Ref: "fake/catalog@sha256:fakesha", + }, + }, + }, + }, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{}, + extraTests: func(t *testing.T, c client.Fetcher, ctx context.Context, tst test) { + rc, err := c.FetchCatalogContents(ctx, tst.catalog) + assert.NoError(t, err) + defer rc.Close() + data, err := io.ReadAll(rc) + assert.NoError(t, err) + assert.Equal(t, tst.contents, data) + }, + }, + { + name: "cached update fetch with changes", + catalog: &catalogd.Catalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.CatalogStatus{ + ResolvedSource: &catalogd.CatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ImageSource{ + Ref: "fake/catalog@sha256:fakesha", + }, + }, + }, + }, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{}, + extraTests: func(t *testing.T, c client.Fetcher, ctx context.Context, tst test) { + tst.catalog.Status.ResolvedSource.Image.Ref = "fake/catalog@sha256:shafake" + tst.contents = append(tst.contents, []byte(`{"schema": "olm.package", "name": "foobar"}`)...) + tst.tripper.content = tst.contents + rc, err := c.FetchCatalogContents(ctx, tst.catalog) + assert.NoError(t, err) + defer rc.Close() + data, err := io.ReadAll(rc) + assert.NoError(t, err) + assert.Equal(t, tst.contents, data) + }, + }, + { + name: "fetch error", + catalog: &catalogd.Catalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.CatalogStatus{ + ResolvedSource: &catalogd.CatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ImageSource{ + Ref: "fake/catalog@sha256:fakesha", + }, + }, + }, + }, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{shouldError: true}, + extraTests: func(_ *testing.T, _ client.Fetcher, _ context.Context, _ test) {}, + wantErr: true, + }, + { + name: "fetch internal server error response", + catalog: &catalogd.Catalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.CatalogStatus{ + ResolvedSource: &catalogd.CatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ImageSource{ + Ref: "fake/catalog@sha256:fakesha", + }, + }, + }, + }, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{serverError: true}, + extraTests: func(_ *testing.T, _ client.Fetcher, _ context.Context, _ test) {}, + wantErr: true, + }, + { + name: "nil catalog", + catalog: nil, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{serverError: true}, + extraTests: func(_ *testing.T, _ client.Fetcher, _ context.Context, _ test) {}, + wantErr: true, + }, + { + name: "nil catalog.status.resolvedSource", + catalog: &catalogd.Catalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.CatalogStatus{ + ResolvedSource: nil, + }, + }, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{serverError: true}, + extraTests: func(_ *testing.T, _ client.Fetcher, _ context.Context, _ test) {}, + wantErr: true, + }, + { + name: "nil catalog.status.resolvedSource.image", + catalog: &catalogd.Catalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.CatalogStatus{ + ResolvedSource: &catalogd.CatalogSource{ + Image: nil, + }, + }, + }, + contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + tripper: &MockTripper{serverError: true}, + extraTests: func(_ *testing.T, _ client.Fetcher, _ context.Context, _ test) {}, + wantErr: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + cacheDir := t.TempDir() + tt.tripper.content = tt.contents + httpClient := http.DefaultClient + httpClient.Transport = tt.tripper + c := cache.NewFilesystemCache(cacheDir, httpClient) + + rc, err := c.FetchCatalogContents(ctx, tt.catalog) + if !tt.wantErr { + assert.NoError(t, err) + assert.FileExists(t, filepath.Join(cacheDir, tt.catalog.Name, "data.json")) + data, err := io.ReadAll(rc) + assert.NoError(t, err) + assert.Equal(t, tt.contents, data) + defer rc.Close() + } else { + assert.Error(t, err) + } + + tt.extraTests(t, c, ctx, tt) + }) + } + }) +} + +var _ http.RoundTripper = &MockTripper{} + +type MockTripper struct { + content []byte + shouldError bool + serverError bool +} + +func (mt *MockTripper) RoundTrip(_ *http.Request) (*http.Response, error) { + if mt.shouldError { + return nil, errors.New("mock tripper error") + } + + if mt.serverError { + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: http.NoBody, + }, nil + } + + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(mt.content)), + }, nil +} diff --git a/internal/catalogmetadata/client/client.go b/internal/catalogmetadata/client/client.go index bb48955f8..80d879824 100644 --- a/internal/catalogmetadata/client/client.go +++ b/internal/catalogmetadata/client/client.go @@ -2,8 +2,12 @@ package client import ( "context" + "encoding/json" "fmt" + "io" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" @@ -12,8 +16,21 @@ import ( "github.com/operator-framework/operator-controller/internal/catalogmetadata" ) -func New(cl client.Client) *Client { - return &Client{cl: cl} +// Fetcher is an interface to facilitate fetching +// catalog contents from catalogd. +type Fetcher interface { + // FetchCatalogContents fetches contents from the catalogd HTTP + // server for the catalog provided. It returns an io.ReadCloser + // containing the FBC contents that the caller is expected to close. + // returns an error if any occur. + FetchCatalogContents(ctx context.Context, catalog *catalogd.Catalog) (io.ReadCloser, error) +} + +func New(cl client.Client, fetcher Fetcher) *Client { + return &Client{ + cl: cl, + fetcher: fetcher, + } } // Client is reading catalog metadata @@ -21,6 +38,9 @@ type Client struct { // Note that eventually we will be reading from catalogd http API // instead of kube API server. We will need to swap this implementation. cl client.Client + + // fetcher is the Fetcher to use for fetching catalog contents + fetcher Fetcher } func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) { @@ -31,14 +51,42 @@ func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) return nil, err } for _, catalog := range catalogList.Items { - channels, err := fetchCatalogMetadata[catalogmetadata.Channel](ctx, c.cl, catalog.Name, declcfg.SchemaChannel) + // if the catalog has not been successfully unpacked, skip it + if !meta.IsStatusConditionPresentAndEqual(catalog.Status.Conditions, catalogd.TypeUnpacked, metav1.ConditionTrue) { + continue + } + channels := []*catalogmetadata.Channel{} + bundles := []*catalogmetadata.Bundle{} + + rc, err := c.fetcher.FetchCatalogContents(ctx, catalog.DeepCopy()) if err != nil { - return nil, err + return nil, fmt.Errorf("error fetching catalog contents: %s", err) } + defer rc.Close() + + err = declcfg.WalkMetasReader(rc, func(meta *declcfg.Meta, err error) error { + if err != nil { + return fmt.Errorf("error was provided to the WalkMetasReaderFunc: %s", err) + } + switch meta.Schema { + case declcfg.SchemaChannel: + var content catalogmetadata.Channel + if err := json.Unmarshal(meta.Blob, &content); err != nil { + return fmt.Errorf("error unmarshalling channel from catalog metadata: %s", err) + } + channels = append(channels, &content) + case declcfg.SchemaBundle: + var content catalogmetadata.Bundle + if err := json.Unmarshal(meta.Blob, &content); err != nil { + return fmt.Errorf("error unmarshalling bundle from catalog metadata: %s", err) + } + bundles = append(bundles, &content) + } - bundles, err := fetchCatalogMetadata[catalogmetadata.Bundle](ctx, c.cl, catalog.Name, declcfg.SchemaBundle) + return nil + }) if err != nil { - return nil, err + return nil, fmt.Errorf("error processing response: %s", err) } bundles, err = PopulateExtraFields(catalog.Name, channels, bundles) @@ -52,21 +100,6 @@ func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) return allBundles, nil } -func fetchCatalogMetadata[T catalogmetadata.Schemas](ctx context.Context, cl client.Client, catalogName, schema string) ([]*T, error) { - var cmList catalogd.CatalogMetadataList - err := cl.List(ctx, &cmList, client.MatchingLabels{"catalog": catalogName, "schema": schema}) - if err != nil { - return nil, err - } - - content, err := catalogmetadata.Unmarshal[T](cmList.Items) - if err != nil { - return nil, fmt.Errorf("error unmarshalling catalog metadata: %s", err) - } - - return content, nil -} - func PopulateExtraFields(catalogName string, channels []*catalogmetadata.Channel, bundles []*catalogmetadata.Bundle) ([]*catalogmetadata.Bundle, error) { bundlesMap := map[string]*catalogmetadata.Bundle{} for i := range bundles { diff --git a/internal/catalogmetadata/client/client_test.go b/internal/catalogmetadata/client/client_test.go index 8ec64ca94..66aa92d4a 100644 --- a/internal/catalogmetadata/client/client_test.go +++ b/internal/catalogmetadata/client/client_test.go @@ -1,8 +1,12 @@ package client_test import ( + "bytes" "context" "encoding/json" + "errors" + "io" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -33,25 +37,27 @@ func TestClient(t *testing.T) { t.Run("Bundles", func(t *testing.T) { for _, tt := range []struct { name string - fakeCatalog func() ([]client.Object, []*catalogmetadata.Bundle) - wantErr string + fakeCatalog func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) + wantErr bool + cache *MockFetcher }{ { name: "valid catalog", fakeCatalog: defaultFakeCatalog, + cache: &MockFetcher{}, + }, + { + name: "cache error", + fakeCatalog: defaultFakeCatalog, + cache: &MockFetcher{shouldError: true}, + wantErr: true, }, { name: "channel has a ref to a missing bundle", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle) { - objs, _ := defaultFakeCatalog() + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { + objs, _, catalogContentMap := defaultFakeCatalog() - objs = append(objs, &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-1-fake1-channel-with-missing-bundle", - Labels: map[string]string{"schema": declcfg.SchemaChannel, "catalog": "catalog-1"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(`{ + catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], []byte(`{ "schema": "olm.channel", "name": "channel-with-missing-bundle", "package": "fake1", @@ -60,76 +66,92 @@ func TestClient(t *testing.T) { "name": "fake1.v9.9.9" } ] - }`), - }, - }) + }`)...) + + return objs, nil, catalogContentMap + }, + wantErr: true, + cache: &MockFetcher{}, + }, + { + name: "invalid meta", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { + objs, _, catalogContentMap := defaultFakeCatalog() + + catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], []byte(`{"schema": "olm.bundle", "name":123123123}`)...) - return objs, nil + return objs, nil, catalogContentMap }, - wantErr: `bundle "fake1.v9.9.9" not found in catalog "catalog-1" (package "fake1", channel "channel-with-missing-bundle")`, + wantErr: true, + cache: &MockFetcher{}, }, { name: "invalid bundle", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle) { - objs, _ := defaultFakeCatalog() + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { + objs, _, catalogContentMap := defaultFakeCatalog() - objs = append(objs, &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-1-broken-bundle", - Labels: map[string]string{"schema": declcfg.SchemaBundle, "catalog": "catalog-1"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(`{"name":123123123}`), - }, - }) + catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], + []byte(`{"schema": "olm.bundle", "name":"foo", "package":"bar", "image":123123123}`)...) - return objs, nil + return objs, nil, catalogContentMap }, - wantErr: "error unmarshalling catalog metadata: json: cannot unmarshal number into Go struct field Bundle.name of type string", + wantErr: true, + cache: &MockFetcher{}, }, { name: "invalid channel", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle) { - objs, _ := defaultFakeCatalog() + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { + objs, _, catalogContentMap := defaultFakeCatalog() - objs = append(objs, &catalogd.CatalogMetadata{ + catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], + []byte(`{"schema": "olm.channel", "name":"foo", "package":"bar", "entries":[{"name":123123123}]}`)...) + + return objs, nil, catalogContentMap + }, + wantErr: true, + cache: &MockFetcher{}, + }, + { + name: "skip catalog missing Unpacked status condition", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { + objs, bundles, catalogContentMap := defaultFakeCatalog() + objs = append(objs, &catalogd.Catalog{ ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-1-fake1-broken-channel", - Labels: map[string]string{"schema": declcfg.SchemaChannel, "catalog": "catalog-1"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(`{"name":123123123}`), + Name: "foobar", }, }) + catalogContentMap["foobar"] = catalogContentMap["catalog-1"] - return objs, nil + return objs, bundles, catalogContentMap }, - wantErr: "error unmarshalling catalog metadata: json: cannot unmarshal number into Go struct field Channel.name of type string", + cache: &MockFetcher{}, }, } { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - objs, expectedBundles := tt.fakeCatalog() + objs, expectedBundles, catalogContentMap := tt.fakeCatalog() + tt.cache.contentMap = catalogContentMap fakeCatalogClient := catalogClient.New( fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(), + tt.cache, ) bundles, err := fakeCatalogClient.Bundles(ctx) - if tt.wantErr == "" { + if !tt.wantErr { assert.NoError(t, err) + assert.Equal(t, expectedBundles, bundles) } else { - assert.EqualError(t, err, tt.wantErr) + assert.Error(t, err) } - assert.Equal(t, expectedBundles, bundles) }) } }) } -func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle) { +func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { package1 := `{ - "schema": "olm.bundle", + "schema": "olm.package", "name": "fake1" }` @@ -173,73 +195,28 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle) { ObjectMeta: metav1.ObjectMeta{ Name: "catalog-1", }, + Status: catalogd.CatalogStatus{ + Conditions: []metav1.Condition{ + { + Type: catalogd.TypeUnpacked, + Status: metav1.ConditionTrue, + Reason: catalogd.ReasonUnpackSuccessful, + }, + }, + }, }, &catalogd.Catalog{ ObjectMeta: metav1.ObjectMeta{ Name: "catalog-2", }, - }, - &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-1-fake1", - Labels: map[string]string{"schema": declcfg.SchemaPackage, "catalog": "catalog-1"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(package1), - }, - }, - &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-1-fake1-channel-stable", - Labels: map[string]string{"schema": declcfg.SchemaChannel, "catalog": "catalog-1"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(stableChannel), - }, - }, - &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-1-fake1-channel-beta", - Labels: map[string]string{"schema": declcfg.SchemaChannel, "catalog": "catalog-1"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(betaChannel), - }, - }, - &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-1-fake1-bundle-1", - Labels: map[string]string{"schema": declcfg.SchemaBundle, "catalog": "catalog-1"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(bundle1), - }, - }, - &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-2-fake1", - Labels: map[string]string{"schema": declcfg.SchemaPackage, "catalog": "catalog-2"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(package1), - }, - }, - &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-2-fake1-channel-stable", - Labels: map[string]string{"schema": declcfg.SchemaChannel, "catalog": "catalog-2"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(stableChannel), - }, - }, - &catalogd.CatalogMetadata{ - ObjectMeta: metav1.ObjectMeta{ - Name: "catalog-2-fake1-bundle-1", - Labels: map[string]string{"schema": declcfg.SchemaBundle, "catalog": "catalog-2"}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(bundle1), + Status: catalogd.CatalogStatus{ + Conditions: []metav1.Condition{ + { + Type: catalogd.TypeUnpacked, + Status: metav1.ConditionTrue, + Reason: catalogd.ReasonUnpackSuccessful, + }, + }, }, }, } @@ -263,7 +240,7 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle) { { Channel: declcfg.Channel{ Schema: declcfg.SchemaChannel, - Name: "beta", + Name: "stable", Package: "fake1", Entries: []declcfg.ChannelEntry{ { @@ -275,7 +252,7 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle) { { Channel: declcfg.Channel{ Schema: declcfg.SchemaChannel, - Name: "stable", + Name: "beta", Package: "fake1", Entries: []declcfg.ChannelEntry{ { @@ -317,5 +294,26 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle) { }, } - return objs, expectedBundles + catalogContents := map[string][]byte{ + "catalog-1": []byte(strings.Join([]string{package1, bundle1, stableChannel, betaChannel}, "\n")), + "catalog-2": []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + } + + return objs, expectedBundles, catalogContents +} + +var _ catalogClient.Fetcher = &MockFetcher{} + +type MockFetcher struct { + contentMap map[string][]byte + shouldError bool +} + +func (mc *MockFetcher) FetchCatalogContents(_ context.Context, catalog *catalogd.Catalog) (io.ReadCloser, error) { + if mc.shouldError { + return nil, errors.New("mock cache error") + } + + data := mc.contentMap[catalog.Name] + return io.NopCloser(bytes.NewReader(data)), nil } diff --git a/internal/catalogmetadata/unmarshal.go b/internal/catalogmetadata/unmarshal.go deleted file mode 100644 index 4bb85b20b..000000000 --- a/internal/catalogmetadata/unmarshal.go +++ /dev/null @@ -1,20 +0,0 @@ -package catalogmetadata - -import ( - "encoding/json" - - catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" -) - -func Unmarshal[T Schemas](cm []catalogd.CatalogMetadata) ([]*T, error) { - contents := make([]*T, 0, len(cm)) - for _, cm := range cm { - var content T - if err := json.Unmarshal(cm.Spec.Content, &content); err != nil { - return nil, err - } - contents = append(contents, &content) - } - - return contents, nil -} diff --git a/internal/catalogmetadata/unmarshal_test.go b/internal/catalogmetadata/unmarshal_test.go deleted file mode 100644 index 8ca44acab..000000000 --- a/internal/catalogmetadata/unmarshal_test.go +++ /dev/null @@ -1,110 +0,0 @@ -package catalogmetadata_test - -import ( - "encoding/json" - "testing" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - - catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" - "github.com/operator-framework/operator-registry/alpha/declcfg" - "github.com/operator-framework/operator-registry/alpha/property" - "github.com/stretchr/testify/assert" - - "github.com/operator-framework/operator-controller/internal/catalogmetadata" -) - -var ( - scheme *runtime.Scheme -) - -func init() { - scheme = runtime.NewScheme() - utilruntime.Must(catalogd.AddToScheme(scheme)) -} - -func TestFetchByScheme(t *testing.T) { - fakeCatalogName := "fake-catalog" - - validBundle := `{ - "schema": "olm.bundle", - "name": "fake1.v1.0.0", - "package": "fake1", - "image": "fake-image", - "properties": [ - { - "type": "olm.package", - "value": {"packageName":"fake1","version":"1.0.0"} - } - ] - }` - - for _, tt := range []struct { - name string - objs []catalogd.CatalogMetadata - wantData []*catalogmetadata.Bundle - wantErr string - }{ - { - name: "valid objects", - objs: []catalogd.CatalogMetadata{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "obj-1", - Labels: map[string]string{"schema": declcfg.SchemaBundle, "catalog": fakeCatalogName}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(validBundle), - }, - }, - }, - wantData: []*catalogmetadata.Bundle{ - { - Bundle: declcfg.Bundle{ - Schema: declcfg.SchemaBundle, - Name: "fake1.v1.0.0", - Package: "fake1", - Image: "fake-image", - Properties: []property.Property{ - { - Type: property.TypePackage, - Value: json.RawMessage(`{"packageName":"fake1","version":"1.0.0"}`), - }, - }, - }, - }, - }, - }, - { - name: "invalid objects", - objs: []catalogd.CatalogMetadata{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "obj-1", - Labels: map[string]string{"schema": declcfg.SchemaBundle, "catalog": fakeCatalogName}, - }, - Spec: catalogd.CatalogMetadataSpec{ - Content: json.RawMessage(`{"name":123123123}`), - }, - }, - }, - wantErr: "json: cannot unmarshal number into Go struct field Bundle.name of type string", - }, - { - name: "not found", - wantData: []*catalogmetadata.Bundle{}, - }, - } { - t.Run(tt.name, func(t *testing.T) { - data, err := catalogmetadata.Unmarshal[catalogmetadata.Bundle](tt.objs) - assert.Equal(t, tt.wantData, data) - if tt.wantErr != "" { - assert.EqualError(t, err, tt.wantErr) - } else { - assert.NoError(t, err) - } - }) - } -} diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index eed98f3bf..16d9d2804 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -101,16 +101,6 @@ var _ = AfterSuite(func() { err := c.Get(ctx, types.NamespacedName{Name: operatorCatalog.Name}, &catalogd.Catalog{}) g.Expect(errors.IsNotFound(err)).To(BeTrue()) }).Should(Succeed()) - - // speed up delete without waiting for gc - Expect(c.DeleteAllOf(ctx, &catalogd.CatalogMetadata{})).To(Succeed()) - - Eventually(func(g Gomega) { - // ensure resource cleanup - cmd := &catalogd.CatalogMetadataList{} - g.Expect(c.List(ctx, cmd)).To(Succeed()) - g.Expect(cmd.Items).To(BeEmpty()) - }).Should(Succeed()) }) // createTestCatalog will create a new catalog on the test cluster, provided diff --git a/test/e2e/install_test.go b/test/e2e/install_test.go index 1905cfb7a..896b43355 100644 --- a/test/e2e/install_test.go +++ b/test/e2e/install_test.go @@ -12,7 +12,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" - "github.com/operator-framework/operator-registry/alpha/declcfg" rukpakv1alpha1 "github.com/operator-framework/rukpak/api/v1alpha1" "gopkg.in/yaml.v2" appsv1 "k8s.io/api/apps/v1" @@ -141,12 +140,6 @@ var _ = Describe("Operator Install", func() { // Delete the catalog first Expect(c.Delete(ctx, operatorCatalog)).To(Succeed()) - Eventually(func(g Gomega) { - // target package should not be present on cluster - err := c.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%s-%s-%s", operatorCatalog.Name, declcfg.SchemaPackage, pkgName)}, &catalogd.CatalogMetadata{}) - g.Expect(errors.IsNotFound(err)).To(BeTrue()) - }).Should(Succeed()) - By("creating the Operator resource") Expect(c.Create(ctx, operator)).To(Succeed()) diff --git a/test/operator-framework-e2e/operator_framework_test.go b/test/operator-framework-e2e/operator_framework_test.go index 480685a89..51852f2ac 100644 --- a/test/operator-framework-e2e/operator_framework_test.go +++ b/test/operator-framework-e2e/operator_framework_test.go @@ -2,7 +2,6 @@ package operatore2e import ( "context" - "encoding/json" "fmt" "io" "os" @@ -26,8 +25,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" - "github.com/operator-framework/operator-registry/alpha/declcfg" - "github.com/operator-framework/operator-registry/alpha/property" rukpakv1alpha1 "github.com/operator-framework/rukpak/api/v1alpha1" operatorv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" @@ -203,11 +200,7 @@ var _ = Describe("Operator Framework E2E for plain+v0 bundles", func() { Expect(err).ToNot(HaveOccurred()) By("creating a Catalog CR and verifying the creation of respective packages and bundle metadata") - bundleVersions := make([]string, len(bundleInfo.bundles)) - for i, bundle := range bundleInfo.bundles { - bundleVersions[i] = bundle.bundleVersion - } - operatorCatalog, err = createCatalogCheckResources(operatorCatalog, catalogDInfo, bundleVersions) + operatorCatalog, err = createCatalogCheckResources(operatorCatalog, catalogDInfo) Expect(err).ToNot(HaveOccurred()) By("creating an operator CR and verifying the operator operations") @@ -339,11 +332,7 @@ var _ = Describe("Operator Framework E2E for registry+v1 bundles", func() { Expect(err).ToNot(HaveOccurred()) By("creating a Catalog CR and verifying the creation of respective packages and bundle metadata") - bundleVersions := make([]string, len(bundleInfo.bundles)) - for i, bundle := range bundleInfo.bundles { - bundleVersions[i] = bundle.bundleVersion - } - operatorCatalog, err = createCatalogCheckResources(operatorCatalog, catalogDInfo, bundleVersions) + operatorCatalog, err = createCatalogCheckResources(operatorCatalog, catalogDInfo) Expect(err).ToNot(HaveOccurred()) By("creating an operator CR and verifying the operator operations") @@ -813,7 +802,7 @@ func validateCatalogUnpacking(operatorCatalog *catalogd.Catalog) error { } // Creates catalog CR and checks if catalog unpackging is successful and if the packages and bundle metadatas are formed -func createCatalogCheckResources(operatorCatalog *catalogd.Catalog, catalogDInfo *CatalogDInfo, bundleVersions []string) (*catalogd.Catalog, error) { +func createCatalogCheckResources(operatorCatalog *catalogd.Catalog, catalogDInfo *CatalogDInfo) (*catalogd.Catalog, error) { operatorCatalog, err := createTestCatalog(ctx, catalogDInfo.catalogDir, catalogDInfo.imageRef) if err != nil { return nil, fmt.Errorf("Error creating catalog %v : %v", catalogDInfo.catalogDir, err) @@ -825,11 +814,6 @@ func createCatalogCheckResources(operatorCatalog *catalogd.Catalog, catalogDInfo g.Expect(err).ToNot(HaveOccurred()) }, 2*time.Minute, 1).Should(Succeed()) - // checking if the catalog metadatas are created - By("Eventually checking if catalog metadata is created") - Eventually(func(g Gomega) { - validateCatalogMetadataCreation(g, operatorCatalog, catalogDInfo.operatorName, bundleVersions) - }).Should(Succeed()) return operatorCatalog, nil } @@ -860,40 +844,6 @@ func checkOperatorOperationsSuccess(operator *operatorv1alpha1.Operator, pkgName }).Should(Succeed()) } -// Checks if the CatalogMetadata was created from the catalog and returns error if not. -// The expected pkgNames and their versions are taken as input. This is then compared against the collected bundle versions. -// The collected bundle versions are formed by reading the version from "olm.package" property type whose catalog name -// matches the catalog name and pkgName matches the pkgName under consideration. -func validateCatalogMetadataCreation(g Gomega, operatorCatalog *catalogd.Catalog, pkgName string, versions []string) { - cmList := catalogd.CatalogMetadataList{} - err := c.List(ctx, &cmList, client.MatchingLabels{ - "schema": declcfg.SchemaBundle, - "catalog": operatorCatalog.Name, - "package": pkgName, - }) - g.Expect(err).ToNot(HaveOccurred()) - - collectedBundleVersions := []string{} - for _, cm := range cmList.Items { - bundle := declcfg.Bundle{} - - err := json.Unmarshal(cm.Spec.Content, &bundle) - g.Expect(err).ToNot(HaveOccurred()) - - for _, prop := range bundle.Properties { - if prop.Type == "olm.package" { - var pkgValue property.Package - err := json.Unmarshal(prop.Value, &pkgValue) - g.Expect(err).ToNot(HaveOccurred()) - - collectedBundleVersions = append(collectedBundleVersions, pkgValue.Version) - } - } - } - - g.Expect(collectedBundleVersions).To(ConsistOf(versions)) -} - // Checks for a successful resolution and bundle path for the operator and returns error if not. func validateResolutionAndBundlePath(operator *operatorv1alpha1.Operator) error { if err := c.Get(ctx, types.NamespacedName{Name: operator.Name}, operator); err != nil {