From 5b5181b46f0ef7e7d8c4ed4c64db3fdd4f5db373 Mon Sep 17 00:00:00 2001
From: Joe Lanford
Date: Tue, 14 May 2024 16:26:36 -0400
Subject: [PATCH] pkg/cache: add preferred pogreb database cache impl (#1278)

* declcfg: concurrently load and process files in WalkMetasFS

Signed-off-by: Joe Lanford

* pkg/cache: add preferred pogreb database cache impl

* refactoring to avoid code duplication

Signed-off-by: Joe Lanford

* pkg/cache: memory efficient cache building

Signed-off-by: Joe Lanford

* cmd/opm/serve: improve logging related to caches

Signed-off-by: Joe Lanford

---------

Signed-off-by: Joe Lanford
---
 alpha/action/render.go            |  11 +-
 alpha/declcfg/load.go             | 263 +++++++++----------
 alpha/declcfg/load_test.go        |   6 +-
 cmd/opm/internal/util/util.go     |  14 +-
 cmd/opm/render/cmd.go             |   6 -
 cmd/opm/serve/serve.go            |  25 +-
 go.mod                            |   4 +-
 go.sum                            |   4 +
 pkg/cache/bundle_key.go           |  47 ++++
 pkg/cache/cache.go                | 407 +++++++++++++++++++++++++++---
 pkg/cache/cache_test.go           | 317 ++++++++++++-----------
 pkg/cache/json.go                 | 340 +++++++++----------------
 pkg/cache/json_test.go            |  10 +-
 pkg/cache/pkgs.go                 |  18 +-
 pkg/cache/pogrebv1.go             | 227 +++++++++++++++++
 pkg/cache/pogrebv1_test.go        | 109 ++++++++
 pkg/lib/log/null.go               |  13 +
 pkg/lib/registry/registry_test.go |   2 +-
 pkg/server/server_test.go         |  76 +++---
 19 files changed, 1266 insertions(+), 633 deletions(-)
 create mode 100644 pkg/cache/bundle_key.go
 create mode 100644 pkg/cache/pogrebv1.go
 create mode 100644 pkg/cache/pogrebv1_test.go
 create mode 100644 pkg/lib/log/null.go

diff --git a/alpha/action/render.go b/alpha/action/render.go
index 5dbb980cf..be63aab53 100644
--- a/alpha/action/render.go
+++ b/alpha/action/render.go
@@ -6,7 +6,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"io"
 	"os"
 	"path/filepath"
 	"sort"
@@ -17,7 +16,6 @@ import (
 	"github.com/h2non/filetype"
 	"github.com/h2non/filetype/matchers"
 	"github.com/operator-framework/api/pkg/operators/v1alpha1"
-	"github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/util/sets"

 	"github.com/operator-framework/operator-registry/alpha/declcfg"
@@ -26,6 +24,7 @@ import (
 	"github.com/operator-framework/operator-registry/pkg/image"
 	"github.com/operator-framework/operator-registry/pkg/image/containerdregistry"
 	"github.com/operator-framework/operator-registry/pkg/lib/bundle"
+	"github.com/operator-framework/operator-registry/pkg/lib/log"
 	"github.com/operator-framework/operator-registry/pkg/registry"
 	"github.com/operator-framework/operator-registry/pkg/sqlite"
 )
@@ -61,12 +60,6 @@ type Render struct {
 	skipSqliteDeprecationLog bool
 }

-func nullLogger() *logrus.Entry {
-	logger := logrus.New()
-	logger.SetOutput(io.Discard)
-	return logrus.NewEntry(logger)
-}
-
 func (r Render) Run(ctx context.Context) (*declcfg.DeclarativeConfig, error) {
 	if r.skipSqliteDeprecationLog {
 		// exhaust once with a no-op function.
@@ -119,7 +112,7 @@ func (r Render) createRegistry() (*containerdregistry.Registry, error) {
 		// The containerd registry impl is somewhat verbose, even on the happy path,
 		// so discard all logger logs. Any important failures will be returned from
 		// registry methods and eventually logged as fatal errors.
-		containerdregistry.WithLog(nullLogger()),
+		containerdregistry.WithLog(log.Null()),
 	)
 	if err != nil {
 		return nil, err
diff --git a/alpha/declcfg/load.go b/alpha/declcfg/load.go
index a493b2204..8717299a6 100644
--- a/alpha/declcfg/load.go
+++ b/alpha/declcfg/load.go
@@ -11,11 +11,12 @@ import (
 	"sync"

 	"github.com/joelanford/ignore"
-	"github.com/operator-framework/api/pkg/operators"
 	"golang.org/x/sync/errgroup"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/util/yaml"

+	"github.com/operator-framework/api/pkg/operators"
+
 	"github.com/operator-framework/operator-registry/alpha/property"
 )

@@ -25,22 +26,42 @@ const (

 type WalkMetasFSFunc func(path string, meta *Meta, err error) error

-func WalkMetasFS(root fs.FS, walkFn WalkMetasFSFunc) error {
-	return walkFiles(root, func(root fs.FS, path string, err error) error {
-		if err != nil {
-			return walkFn(path, nil, err)
-		}
+// WalkMetasFS walks the filesystem rooted at root and calls walkFn for each individual meta object found in the root.
+// By default, WalkMetasFS invokes walkFn concurrently, so walkFn must be safe for concurrent use. To serialize
+// invocations of walkFn, use WithConcurrency(1).
+func WalkMetasFS(ctx context.Context, root fs.FS, walkFn WalkMetasFSFunc, opts ...LoadOption) error {
+	if root == nil {
+		return fmt.Errorf("no declarative config filesystem provided")
+	}

-		f, err := root.Open(path)
-		if err != nil {
-			return walkFn(path, nil, err)
-		}
-		defer f.Close()
+	options := LoadOptions{
+		concurrency: runtime.NumCPU(),
+	}
+	for _, opt := range opts {
+		opt(&options)
+	}

-		return WalkMetasReader(f, func(meta *Meta, err error) error {
-			return walkFn(path, meta, err)
-		})
+	pathChan := make(chan string, options.concurrency)
+
+	// Create an errgroup to manage goroutines. The context is closed when any
+	// goroutine returns an error. Goroutines should check the context
+	// to see if they should return early (in the case of another goroutine
+	// returning an error).
+	eg, ctx := errgroup.WithContext(ctx)
+
+	// Walk the FS and send paths to a channel for parsing.
+	eg.Go(func() error {
+		return sendPaths(ctx, root, pathChan)
 	})
+
+	// Parse paths concurrently.
+	for i := 0; i < options.concurrency; i++ {
+		eg.Go(func() error {
+			return parseMetaPaths(ctx, root, pathChan, walkFn, options)
+		})
+	}
+	return eg.Wait()
 }

 type WalkMetasReaderFunc func(meta *Meta, err error) error
@@ -126,59 +147,16 @@ func WithConcurrency(concurrency int) LoadOption {
 // If LoadFS encounters an error loading or parsing any file, the error will be
 // immediately returned.
 func LoadFS(ctx context.Context, root fs.FS, opts ...LoadOption) (*DeclarativeConfig, error) {
-	if root == nil {
-		return nil, fmt.Errorf("no declarative config filesystem provided")
-	}
-
-	options := LoadOptions{
-		concurrency: runtime.NumCPU(),
-	}
-	for _, opt := range opts {
-		opt(&options)
-	}
-
-	var (
-		fcfg     = &DeclarativeConfig{}
-		pathChan = make(chan string, options.concurrency)
-		cfgChan  = make(chan *DeclarativeConfig, options.concurrency)
-	)
-
-	// Create an errgroup to manage goroutines. The context is closed when any
-	// goroutine returns an error. Goroutines should check the context
-	// to see if they should return early (in the case of another goroutine
-	// returning an error).
-	eg, ctx := errgroup.WithContext(ctx)
-
-	// Walk the FS and send paths to a channel for parsing.
-	eg.Go(func() error {
-		return sendPaths(ctx, root, pathChan)
-	})
-
-	// Parse paths concurrently. The waitgroup ensures that all paths are parsed
-	// before the cfgChan is closed.
-	var wg sync.WaitGroup
-	for i := 0; i < options.concurrency; i++ {
-		wg.Add(1)
-		eg.Go(func() error {
-			defer wg.Done()
-			return parsePaths(ctx, root, pathChan, cfgChan)
-		})
-	}
-
-	// Merge parsed configs into a single config.
-	eg.Go(func() error {
-		return mergeCfgs(ctx, cfgChan, fcfg)
-	})
-
-	// Wait for all path parsing goroutines to finish before closing cfgChan.
-	wg.Wait()
-	close(cfgChan)
-
-	// Wait for all goroutines to finish.
-	if err := eg.Wait(); err != nil {
+	builder := fbcBuilder{}
+	if err := WalkMetasFS(ctx, root, func(path string, meta *Meta, err error) error {
+		if err != nil {
+			return err
+		}
+		return builder.addMeta(meta)
+	}, opts...); err != nil {
 		return nil, err
 	}
-	return fcfg, nil
+	return &builder.cfg, nil
 }

 func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
@@ -196,7 +174,7 @@ func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
 	})
 }

-func parsePaths(ctx context.Context, root fs.FS, pathChan <-chan string, cfgChan chan<- *DeclarativeConfig) error {
+func parseMetaPaths(ctx context.Context, root fs.FS, pathChan <-chan string, walkFn WalkMetasFSFunc, options LoadOptions) error {
 	for {
 		select {
 		case <-ctx.Done(): // don't block on receiving from pathChan
@@ -205,51 +183,35 @@
 			if !ok {
 				return nil
 			}
-			cfg, err := LoadFile(root, path)
+			file, err := root.Open(path)
 			if err != nil {
 				return err
 			}
-			select {
-			case cfgChan <- cfg:
-			case <-ctx.Done(): // don't block on sending to cfgChan
-				return ctx.Err()
+			if err := WalkMetasReader(file, func(meta *Meta, err error) error {
+				return walkFn(path, meta, err)
+			}); err != nil {
+				return err
 			}
 		}
 	}
 }

-func mergeCfgs(ctx context.Context, cfgChan <-chan *DeclarativeConfig, fcfg *DeclarativeConfig) error {
-	for {
-		select {
-		case <-ctx.Done(): // don't block on receiving from cfgChan
-			return ctx.Err()
-		case cfg, ok := <-cfgChan:
-			if !ok {
-				return nil
-			}
-			fcfg.Merge(cfg)
+func readBundleObjects(b *Bundle) error {
+	var obj property.BundleObject
+	for i, props := range b.Properties {
+		if props.Type != property.TypeBundleObject {
+			continue
 		}
-	}
-}
-
-func readBundleObjects(bundles []Bundle) error {
-	for bi, b := range bundles {
-		var obj property.BundleObject
-		for i, props := range b.Properties {
-			if props.Type != property.TypeBundleObject {
-				continue
-			}
-			if err := json.Unmarshal(props.Value, &obj); err != nil {
-				return fmt.Errorf("package %q, bundle %q: parse property at index %d as bundle object: %v", b.Package, b.Name, i, err)
-			}
-			objJson, err := yaml.ToJSON(obj.Data)
-			if err != nil {
-				return fmt.Errorf("package %q, bundle %q: convert bundle object property at index %d to JSON: %v", b.Package, b.Name, i, err)
-			}
-			bundles[bi].Objects = append(bundles[bi].Objects, string(objJson))
+		if err := json.Unmarshal(props.Value, &obj); err != nil {
+			return fmt.Errorf("package %q, bundle %q: parse property at index %d as bundle object: %v", b.Package, b.Name, i, err)
 		}
-		bundles[bi].CsvJSON = extractCSV(bundles[bi].Objects)
+		objJson, err := yaml.ToJSON(obj.Data)
+		if err != nil {
+			return fmt.Errorf("package %q, bundle %q: convert bundle object property at index %d to JSON: %v", b.Package, b.Name, i, err)
+		}
+		b.Objects = append(b.Objects, string(objJson))
 	}
+	b.CsvJSON = extractCSV(b.Objects)
 	return nil
 }

@@ -268,52 +230,16 @@ func extractCSV(objs []string) string {

 // LoadReader reads yaml or json from the passed in io.Reader and unmarshals it into a DeclarativeConfig struct.
 func LoadReader(r io.Reader) (*DeclarativeConfig, error) {
-	cfg := &DeclarativeConfig{}
-
-	if err := WalkMetasReader(r, func(in *Meta, err error) error {
+	builder := fbcBuilder{}
+	if err := WalkMetasReader(r, func(meta *Meta, err error) error {
 		if err != nil {
 			return err
 		}
-		switch in.Schema {
-		case SchemaPackage:
-			var p Package
-			if err := json.Unmarshal(in.Blob, &p); err != nil {
-				return fmt.Errorf("parse package: %v", err)
-			}
-			cfg.Packages = append(cfg.Packages, p)
-		case SchemaChannel:
-			var c Channel
-			if err := json.Unmarshal(in.Blob, &c); err != nil {
-				return fmt.Errorf("parse channel: %v", err)
-			}
-			cfg.Channels = append(cfg.Channels, c)
-		case SchemaBundle:
-			var b Bundle
-			if err := json.Unmarshal(in.Blob, &b); err != nil {
-				return fmt.Errorf("parse bundle: %v", err)
-			}
-			cfg.Bundles = append(cfg.Bundles, b)
-		case SchemaDeprecation:
-			var d Deprecation
-			if err := json.Unmarshal(in.Blob, &d); err != nil {
-				return fmt.Errorf("parse deprecation: %w", err)
-			}
-			cfg.Deprecations = append(cfg.Deprecations, d)
-		case "":
-			return fmt.Errorf("object '%s' is missing root schema field", string(in.Blob))
-		default:
-			cfg.Others = append(cfg.Others, *in)
-		}
-		return nil
+		return builder.addMeta(meta)
 	}); err != nil {
 		return nil, err
 	}
-
-	if err := readBundleObjects(cfg.Bundles); err != nil {
-		return nil, fmt.Errorf("read bundle objects: %v", err)
-	}
-
-	return cfg, nil
+	return &builder.cfg, nil
 }

 // LoadFile will unmarshal declarative config components from a single filename provided in 'path'
@@ -332,3 +258,60 @@ func LoadFile(root fs.FS, path string) (*DeclarativeConfig, error) {

 	return cfg, nil
 }
+
+type fbcBuilder struct {
+	cfg DeclarativeConfig
+
+	packagesMu     sync.Mutex
+	channelsMu     sync.Mutex
+	bundlesMu      sync.Mutex
+	deprecationsMu sync.Mutex
+	othersMu       sync.Mutex
+}
+
+func (c *fbcBuilder) addMeta(in *Meta) error {
+	switch in.Schema {
+	case SchemaPackage:
+		var p Package
+		if err := json.Unmarshal(in.Blob, &p); err != nil {
+			return fmt.Errorf("parse package: %v", err)
+		}
+		c.packagesMu.Lock()
+		c.cfg.Packages = append(c.cfg.Packages, p)
+		c.packagesMu.Unlock()
+	case SchemaChannel:
+		var ch Channel
+		if err := json.Unmarshal(in.Blob, &ch); err != nil {
+			return fmt.Errorf("parse channel: %v", err)
+		}
+		c.channelsMu.Lock()
+		c.cfg.Channels = append(c.cfg.Channels, ch)
+		c.channelsMu.Unlock()
+	case SchemaBundle:
+		var b Bundle
+		if err := json.Unmarshal(in.Blob, &b); err != nil {
+			return fmt.Errorf("parse bundle: %v", err)
+		}
+		if err := readBundleObjects(&b); err != nil {
+			return fmt.Errorf("read bundle objects: %v", err)
+		}
+		c.bundlesMu.Lock()
+		c.cfg.Bundles = append(c.cfg.Bundles, b)
+		c.bundlesMu.Unlock()
+	case SchemaDeprecation:
+		var d Deprecation
+		if err := json.Unmarshal(in.Blob, &d); err != nil {
+			return fmt.Errorf("parse deprecation: %w", err)
+		}
+		c.deprecationsMu.Lock()
+		c.cfg.Deprecations = append(c.cfg.Deprecations, d)
+		c.deprecationsMu.Unlock()
+	case "":
+		return fmt.Errorf("object '%s' is missing root schema field", string(in.Blob))
+	default:
+		c.othersMu.Lock()
+		c.cfg.Others = append(c.cfg.Others, *in)
+		c.othersMu.Unlock()
+	}
+	return nil
+}
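The new WalkMetasFS signature above is the API the rest of this patch builds on. A minimal usage sketch — the ./catalog path and the counting logic are illustrative assumptions, not taken from the patch. Because walkFn runs concurrently by default, shared state needs a mutex; passing declcfg.WithConcurrency(1) would serialize invocations instead:

package main

import (
	"context"
	"fmt"
	"os"
	"sync"

	"github.com/operator-framework/operator-registry/alpha/declcfg"
)

func main() {
	var (
		mu     sync.Mutex
		counts = map[string]int{} // schema -> number of metas seen
	)
	root := os.DirFS("./catalog") // hypothetical FBC directory
	err := declcfg.WalkMetasFS(context.Background(), root, func(path string, meta *declcfg.Meta, err error) error {
		if err != nil {
			return err
		}
		mu.Lock() // walkFn is invoked from multiple goroutines by default
		defer mu.Unlock()
		counts[meta.Schema]++
		return nil
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(counts)
}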
diff --git a/alpha/declcfg/load_test.go b/alpha/declcfg/load_test.go
index ae9ad5f7a..9f8f90fda 100644
--- a/alpha/declcfg/load_test.go
+++ b/alpha/declcfg/load_test.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io/fs"
 	"os"
+	"sync"
 	"testing"
 	"testing/fstest"

@@ -136,11 +137,14 @@ func TestWalkMetasFS(t *testing.T) {

 	for _, s := range specs {
 		t.Run(s.name, func(t *testing.T) {
+			var mu sync.Mutex
 			numPackages, numChannels, numBundles, numDeprecations, numOthers := 0, 0, 0, 0, 0
-			err := WalkMetasFS(s.fsys, func(path string, meta *Meta, err error) error {
+			err := WalkMetasFS(context.Background(), s.fsys, func(path string, meta *Meta, err error) error {
 				if err != nil {
 					return err
 				}
+				mu.Lock()
+				defer mu.Unlock()
 				switch meta.Schema {
 				case SchemaPackage:
 					numPackages++
diff --git a/cmd/opm/internal/util/util.go b/cmd/opm/internal/util/util.go
index 5b0f06f27..a265ba743 100644
--- a/cmd/opm/internal/util/util.go
+++ b/cmd/opm/internal/util/util.go
@@ -2,12 +2,12 @@ package util

 import (
 	"errors"
-	"io"
 	"os"

-	"github.com/operator-framework/operator-registry/pkg/image/containerdregistry"
-	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
+
+	"github.com/operator-framework/operator-registry/pkg/image/containerdregistry"
+	"github.com/operator-framework/operator-registry/pkg/lib/log"
 )

 // GetTLSOptions validates and returns TLS options set by opm flags
@@ -59,16 +59,10 @@ func CreateCLIRegistry(cmd *cobra.Command) (*containerdregistry.Registry, error)
 		containerdregistry.WithCacheDir(cacheDir),
 		containerdregistry.SkipTLSVerify(skipTlsVerify),
 		containerdregistry.WithPlainHTTP(useHTTP),
-		containerdregistry.WithLog(nullLogger()),
+		containerdregistry.WithLog(log.Null()),
 	)
 	if err != nil {
 		return nil, err
 	}
 	return reg, nil
 }
-
-func nullLogger() *logrus.Entry {
-	logger := logrus.New()
-	logger.SetOutput(io.Discard)
-	return logrus.NewEntry(logger)
-}
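The new pkg/lib/log/null.go itself is not shown in this excerpt, but the three identical nullLogger helpers this patch deletes make its shape easy to guess; a plausible sketch of the consolidated helper (assumed, not copied from the commit):

package log

import (
	"io"

	"github.com/sirupsen/logrus"
)

// Null returns a logger entry that discards everything written to it.
func Null() *logrus.Entry {
	logger := logrus.New()
	logger.SetOutput(io.Discard)
	return logrus.NewEntry(logger)
}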
diff --git a/cmd/opm/render/cmd.go b/cmd/opm/render/cmd.go
index 88c706933..4cdd5e584 100644
--- a/cmd/opm/render/cmd.go
+++ b/cmd/opm/render/cmd.go
@@ -94,9 +94,3 @@ those images actually existing.
Available template variables are:
 	cmd.Long += "\n" + sqlite.DeprecationMessage
 	return cmd
 }
-
-func nullLogger() *logrus.Entry {
-	logger := logrus.New()
-	logger.SetOutput(io.Discard)
-	return logrus.NewEntry(logger)
-}
diff --git a/cmd/opm/serve/serve.go b/cmd/opm/serve/serve.go
index f9e35102e..234ea1a48 100644
--- a/cmd/opm/serve/serve.go
+++ b/cmd/opm/serve/serve.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"io"
 	"net"
 	"net/http"
 	endpoint "net/http/pprof"
@@ -111,8 +110,6 @@ func (s *serve) run(ctx context.Context) error {
 		s.logger.WithError(err).Warn("unable to write default nsswitch config")
 	}

-	s.logger = s.logger.WithFields(logrus.Fields{"configs": s.configDir, "port": s.port})
-
 	if s.cacheDir == "" && s.cacheEnforceIntegrity {
 		return fmt.Errorf("--cache-dir must be specified with --cache-enforce-integrity")
 	}
@@ -124,24 +121,26 @@ func (s *serve) run(ctx context.Context) error {
 		}
 		defer os.RemoveAll(s.cacheDir)
 	}
+	s.logger = s.logger.WithFields(logrus.Fields{
+		"configs": s.configDir,
+		"cache":   s.cacheDir,
+	})

-	store, err := cache.New(s.cacheDir)
+	store, err := cache.New(s.cacheDir, cache.WithLog(s.logger))
 	if err != nil {
 		return err
 	}
-	if storeCloser, ok := store.(io.Closer); ok {
-		defer storeCloser.Close()
-	}
+	defer store.Close()
 	if s.cacheEnforceIntegrity {
-		if err := store.CheckIntegrity(os.DirFS(s.configDir)); err != nil {
-			return err
+		if err := store.CheckIntegrity(ctx, os.DirFS(s.configDir)); err != nil {
+			return fmt.Errorf("integrity check failed: %v", err)
 		}
-		if err := store.Load(); err != nil {
-			return err
+		if err := store.Load(ctx); err != nil {
+			return fmt.Errorf("failed to load cache: %v", err)
 		}
 	} else {
 		if err := cache.LoadOrRebuild(ctx, store, os.DirFS(s.configDir)); err != nil {
-			return err
+			return fmt.Errorf("failed to load or rebuild cache: %v", err)
 		}
 	}

@@ -149,6 +148,8 @@ func (s *serve) run(ctx context.Context) error {
 		return nil
 	}

+	s.logger = s.logger.WithFields(logrus.Fields{"port": s.port})
+
 	lis, err := net.Listen("tcp", ":"+s.port)
 	if err != nil {
 		return fmt.Errorf("failed to listen: %s", err)
diff --git a/go.mod b/go.mod
index 54ff6fc07..b1323fe17 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/operator-framework/operator-registry
 go 1.22

 require (
+	github.com/akrylysov/pogreb v0.10.2
 	github.com/blang/semver/v4 v4.0.0
 	github.com/containerd/containerd v1.7.16
 	github.com/containers/common v0.58.2
@@ -12,6 +13,7 @@
 	github.com/docker/cli v26.1.1+incompatible
 	github.com/golang-migrate/migrate/v4 v4.17.1
 	github.com/golang/mock v1.6.0
+	github.com/golang/protobuf v1.5.4
 	github.com/google/go-cmp v0.6.0
 	github.com/grpc-ecosystem/grpc-health-probe v0.4.26
 	github.com/h2non/filetype v1.1.3
@@ -31,6 +33,7 @@
 	github.com/spf13/cobra v1.8.0
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.9.0
+	github.com/tidwall/btree v1.7.0
 	go.etcd.io/bbolt v1.3.10
 	golang.org/x/mod v0.17.0
 	golang.org/x/net v0.25.0
@@ -100,7 +103,6 @@
 	github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
-	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/cel-go v0.17.7 // indirect
 	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
diff --git a/go.sum b/go.sum
index da0be1a5b..d44296204 100644
--- a/go.sum
+++ b/go.sum
@@ -13,6 +13,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
 github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
 github.com/Microsoft/hcsshim v0.12.0-rc.3 h1:5GNGrobGs/sN/0nFO21W9k4lFn+iXXZAE8fCZbmdRak=
 github.com/Microsoft/hcsshim v0.12.0-rc.3/go.mod h1:WuNfcaYNaw+KpCEsZCIM6HCEmu0c5HfXpi+dDSmveP0=
+github.com/akrylysov/pogreb v0.10.2 h1:e6PxmeyEhWyi2AKOBIJzAEi4HkiC+lKyCocRGlnDi78=
+github.com/akrylysov/pogreb v0.10.2/go.mod h1:pNs6QmpQ1UlTJKDezuRWmaqkgUE2TuU0YTWyqJZ7+lI=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0=
@@ -372,6 +374,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI=
 github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
+github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
+github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
 github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8=
 github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
 github.com/vbatts/tar-split v0.11.5 h1:3bHCTIheBm1qFTcgh9oPu+nNBtX+XJIupG/vacinCts=
diff --git a/pkg/cache/bundle_key.go b/pkg/cache/bundle_key.go
new file mode 100644
index 000000000..dba87c266
--- /dev/null
+++ b/pkg/cache/bundle_key.go
@@ -0,0 +1,47 @@
+package cache
+
+import (
+	"github.com/tidwall/btree"
+)
+
+type bundleKey struct {
+	PackageName string
+	ChannelName string
+	Name        string
+}
+
+func bundleKeyComparator(a, b bundleKey) bool {
+	if a.ChannelName != b.ChannelName {
+		return a.ChannelName < b.ChannelName
+	}
+	if a.PackageName != b.PackageName {
+		return a.PackageName < b.PackageName
+	}
+	return a.Name < b.Name
+}
+
+type bundleKeys struct {
+	t *btree.BTreeG[bundleKey]
+}
+
+func newBundleKeys() bundleKeys {
+	return bundleKeys{btree.NewBTreeG[bundleKey](bundleKeyComparator)}
+}
+
+func (b bundleKeys) Set(k bundleKey) {
+	b.t.Set(k)
+}
+
+func (b bundleKeys) Len() int {
+	return b.t.Len()
+}
+
+func (b bundleKeys) Walk(f func(k bundleKey) error) error {
+	it := b.t.Iter()
+	for it.Next() {
+		if err := f(it.Item()); err != nil {
+			return err
+		}
+	}
+	return nil
+}
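bundle_key.go gives every backend a shared, ordered index of (package, channel, bundle) keys: the tidwall/btree comparator sorts by channel, then package, then name, which is what lets SendBundles stream bundles in a stable order without holding them all in memory. A usage sketch — this lives inside package cache, since all of these identifiers are unexported, and exampleWalkOrder is a hypothetical helper (fmt assumed imported):

func exampleWalkOrder() error {
	keys := newBundleKeys()
	keys.Set(bundleKey{PackageName: "etcd", ChannelName: "alpha", Name: "etcdoperator.v0.9.4"})
	keys.Set(bundleKey{PackageName: "etcd", ChannelName: "alpha", Name: "etcdoperator.v0.9.0"})

	// Walk visits keys in bundleKeyComparator order: channel, then package, then name,
	// so v0.9.0 prints before v0.9.4 regardless of insertion order.
	return keys.Walk(func(k bundleKey) error {
		fmt.Printf("%s/%s/%s\n", k.PackageName, k.ChannelName, k.Name)
		return nil
	})
}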
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index faf65d993..7ea1f3fd5 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -4,56 +4,389 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"io/fs"
 	"os"
 	"path/filepath"
+	"runtime"
+	"strings"
+	"sync"

-	"k8s.io/apimachinery/pkg/util/sets"
+	"github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"

+	"github.com/operator-framework/operator-registry/alpha/declcfg"
 	"github.com/operator-framework/operator-registry/pkg/api"
+	"github.com/operator-framework/operator-registry/pkg/lib/log"
 	"github.com/operator-framework/operator-registry/pkg/registry"
 )

 type Cache interface {
 	registry.GRPCQuery

-	CheckIntegrity(fbc fs.FS) error
+	CheckIntegrity(ctx context.Context, fbc fs.FS) error
 	Build(ctx context.Context, fbc fs.FS) error
-	Load() error
+	Load(ctx context.Context) error
+	Close() error
 }

-func LoadOrRebuild(ctx context.Context, c Cache, fbc fs.FS) error {
-	if err := c.CheckIntegrity(fbc); err != nil {
-		if err := c.Build(ctx, fbc); err != nil {
-			return err
-		}
+type backend interface {
+	Name() string
+	IsCachePresent() bool
+
+	Init() error
+	Open() error
+	Close() error
+
+	GetPackageIndex(context.Context) (packageIndex, error)
+	PutPackageIndex(context.Context, packageIndex) error
+
+	SendBundles(context.Context, registry.BundleSender) error
+	GetBundle(context.Context, bundleKey) (*api.Bundle, error)
+	PutBundle(context.Context, bundleKey, *api.Bundle) error
+
+	GetDigest(context.Context) (string, error)
+	ComputeDigest(context.Context, fs.FS) (string, error)
+	PutDigest(context.Context, string) error
+}
+
+type CacheOptions struct {
+	Log *logrus.Entry
+}
+
+func WithLog(log *logrus.Entry) CacheOption {
+	return func(o *CacheOptions) {
+		o.Log = log
 	}
-	return c.Load()
 }

+type CacheOption func(*CacheOptions)
+
 // New creates a new Cache. It chooses a cache implementation based
 // on the files it finds in the cache directory, with a preference for the
-// latest iteration of the cache implementation. It returns an error if
-// cacheDir exists and contains unexpected files.
-func New(cacheDir string) (Cache, error) {
+// latest iteration of the cache implementation. If the cache directory
+// is non-empty and a supported cache format is not found, an error is returned.
+func New(cacheDir string, cacheOpts ...CacheOption) (Cache, error) {
+	opts := &CacheOptions{
+		Log: log.Null(),
+	}
+	for _, opt := range cacheOpts {
+		opt(opts)
+	}
+	cacheBackend, err := getDefaultBackend(cacheDir, opts.Log)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := cacheBackend.Open(); err != nil {
+		return nil, fmt.Errorf("open cache: %v", err)
+	}
+	return &cache{backend: cacheBackend, log: opts.Log}, nil
+}
+
+func getDefaultBackend(cacheDir string, log *logrus.Entry) (backend, error) {
 	entries, err := os.ReadDir(cacheDir)
 	if err != nil && !errors.Is(err, os.ErrNotExist) {
 		return nil, fmt.Errorf("detect cache format: read cache directory: %v", err)
 	}

-	jsonCache := sets.NewString(jsonDir, jsonDigestFile)
-	found := sets.NewString()
-	for _, e := range entries {
-		found.Insert(e.Name())
+	backends := []backend{
+		newPogrebV1Backend(cacheDir),
+		newJSONBackend(cacheDir),
 	}

-	// Preference (and currently only supported) is the JSON-based cache implementation.
-	if found.IsSuperset(jsonCache) || len(entries) == 0 {
-		return NewJSON(cacheDir), nil
+	if len(entries) == 0 {
+		log.WithField("backend", backends[0].Name()).Info("cache directory is empty, using preferred backend")
+		return backends[0], nil
+	}
+
+	for _, backend := range backends {
+		if backend.IsCachePresent() {
+			log.WithField("backend", backend.Name()).Info("found existing cache contents")
+			return backend, nil
+		}
 	}

 	// Anything else is unexpected.
-	return nil, fmt.Errorf("cache directory has unexpected contents")
+	entryNames := make([]string, 0, len(entries))
+	for _, entry := range entries {
+		if entry.Name() == "." {
+			continue
+		}
+		entryNames = append(entryNames, entry.Name())
+	}
+	return nil, fmt.Errorf("cache directory has unexpected contents: %v", strings.Join(entryNames, ","))
+}
+
+func LoadOrRebuild(ctx context.Context, c Cache, fbc fs.FS) error {
+	if err := c.CheckIntegrity(ctx, fbc); err != nil {
+		if err := c.Build(ctx, fbc); err != nil {
+			return fmt.Errorf("failed to rebuild cache: %v", err)
+		}
+	}
+	return c.Load(ctx)
+}
+
+var _ Cache = &cache{}
+
+type cache struct {
+	backend backend
+	log     *logrus.Entry
+	packageIndex
+}
+
+type bundleStreamTransformer func(*api.Bundle)
+type transformingBundleSender struct {
+	stream      registry.BundleSender
+	transformer bundleStreamTransformer
+}
+
+func (t *transformingBundleSender) Send(b *api.Bundle) error {
+	t.transformer(b)
+	return t.stream.Send(b)
+}
+
+type sliceBundleSender []*api.Bundle
+
+func (s *sliceBundleSender) Send(b *api.Bundle) error {
+	*s = append(*s, b)
+	return nil
+}
+
+func (c *cache) SendBundles(ctx context.Context, stream registry.BundleSender) error {
+	transform := func(bundle *api.Bundle) {
+		if bundle.BundlePath != "" {
+			// The SQLite-based server
+			// configures its querier to
+			// omit these fields when
+			// key path is set.
+			bundle.CsvJson = ""
+			bundle.Object = nil
+		}
+	}
+	return c.backend.SendBundles(ctx, &transformingBundleSender{stream, transform})
+}
+
+func (c *cache) ListBundles(ctx context.Context) ([]*api.Bundle, error) {
+	var bundleSender sliceBundleSender
+	if err := c.SendBundles(ctx, &bundleSender); err != nil {
+		return nil, err
+	}
+	return bundleSender, nil
+}
+
+func (c *cache) getTrimmedBundle(ctx context.Context, key bundleKey) (*api.Bundle, error) {
+	apiBundle, err := c.backend.GetBundle(ctx, key)
+	if err != nil {
+		return nil, err
+	}
+	apiBundle.Replaces = ""
+	apiBundle.Skips = nil
+	return apiBundle, nil
+}
+
+func (c *cache) GetBundle(ctx context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) {
+	pkg, ok := c.packageIndex[pkgName]
+	if !ok {
+		return nil, fmt.Errorf("package %q not found", pkgName)
+	}
+	ch, ok := pkg.Channels[channelName]
+	if !ok {
+		return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName)
+	}
+	b, ok := ch.Bundles[csvName]
+	if !ok {
+		return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", pkgName, channelName, csvName)
+	}
+	return c.getTrimmedBundle(ctx, bundleKey{pkg.Name, ch.Name, b.Name})
+}
+
+func (c *cache) GetBundleForChannel(ctx context.Context, pkgName string, channelName string) (*api.Bundle, error) {
+	return c.packageIndex.GetBundleForChannel(ctx, c.getTrimmedBundle, pkgName, channelName)
+}
+
+func (c *cache) GetBundleThatReplaces(ctx context.Context, name, pkgName, channelName string) (*api.Bundle, error) {
+	return c.packageIndex.GetBundleThatReplaces(ctx, c.getTrimmedBundle, name, pkgName, channelName)
+}
+
+func (c *cache) GetChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) {
+	return c.packageIndex.GetChannelEntriesThatProvide(ctx, c.backend.GetBundle, group, version, kind)
+}
+
+func (c *cache) GetLatestChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) {
+	return c.packageIndex.GetLatestChannelEntriesThatProvide(ctx, c.backend.GetBundle, group, version, kind)
+}
+
+func (c *cache) GetBundleThatProvides(ctx context.Context, group, version, kind string) (*api.Bundle, error) {
+	return c.packageIndex.GetBundleThatProvides(ctx, c, group, version, kind)
+}
+
+func (c *cache) CheckIntegrity(ctx context.Context, fbc fs.FS) error {
+	existingDigest, err := c.backend.GetDigest(ctx)
+	if err != nil {
+		return fmt.Errorf("read existing cache digest: %v", err)
+	}
+	computedDigest, err := c.backend.ComputeDigest(ctx, fbc)
+	if err != nil {
+		return fmt.Errorf("compute digest: %v", err)
+	}
+	if existingDigest != computedDigest {
+		c.log.WithField("existingDigest", existingDigest).WithField("computedDigest", computedDigest).Warn("cache requires rebuild")
+		return fmt.Errorf("cache requires rebuild: cache reports digest as %q, but computed digest is %q", existingDigest, computedDigest)
+	}
+	return nil
+}
+
+func (c *cache) Build(ctx context.Context, fbcFsys fs.FS) error {
+	// ensure that generated cache is available to all future users
+	oldUmask := umask(000)
+	defer umask(oldUmask)
+
+	c.log.Info("building cache")
+
+	if err := c.backend.Init(); err != nil {
+		return fmt.Errorf("init cache: %v", err)
+	}
+
+	tmpFile, err := os.CreateTemp("", "opm-cache-build-*.json")
+	if err != nil {
+		return err
+	}
+	defer func() {
+		tmpFile.Close()
+		os.Remove(tmpFile.Name())
+	}()
+
+	var (
+		concurrency      = runtime.NumCPU()
+		byPackageReaders = map[string][]io.Reader{}
+		walkMu           sync.Mutex
+		offset           int64
+	)
+	if err := declcfg.WalkMetasFS(ctx, fbcFsys, func(path string, meta *declcfg.Meta, err error) error {
+		if err != nil {
+			return err
+		}
+		packageName := meta.Package
+		if meta.Schema == declcfg.SchemaPackage {
+			packageName = meta.Name
+		}
+
+		walkMu.Lock()
+		defer walkMu.Unlock()
+		if _, err := tmpFile.Write(meta.Blob); err != nil {
+			return err
+		}
+		sr := io.NewSectionReader(tmpFile, offset, int64(len(meta.Blob)))
+		byPackageReaders[packageName] = append(byPackageReaders[packageName], sr)
+		offset += int64(len(meta.Blob))
+		return nil
+	}, declcfg.WithConcurrency(concurrency)); err != nil {
+		return err
+	}
+	if err := tmpFile.Sync(); err != nil {
+		return err
+	}
+
+	eg, egCtx := errgroup.WithContext(ctx)
+	pkgNameChan := make(chan string, concurrency)
+	eg.Go(func() error {
+		defer close(pkgNameChan)
+		for pkgName := range byPackageReaders {
+			select {
+			case <-egCtx.Done():
+				return egCtx.Err()
+			case pkgNameChan <- pkgName:
+			}
+		}
+		return nil
+	})
+
+	var (
+		pkgs   = packageIndex{}
+		pkgsMu sync.Mutex
+	)
+	for i := 0; i < concurrency; i++ {
+		eg.Go(func() error {
+			for {
+				select {
+				case <-egCtx.Done():
+					return egCtx.Err()
+				case pkgName, ok := <-pkgNameChan:
+					if !ok {
+						return nil
+					}
+					pkgIndex, err := c.processPackage(egCtx, io.MultiReader(byPackageReaders[pkgName]...))
+					if err != nil {
+						return fmt.Errorf("process package %q: %v", pkgName, err)
+					}
+
+					pkgsMu.Lock()
+					pkgs[pkgName] = pkgIndex[pkgName]
+					pkgsMu.Unlock()
+				}
+			}
+		})
+	}
+	if err := eg.Wait(); err != nil {
+		return fmt.Errorf("build package index: %v", err)
+	}
+
+	if err := c.backend.PutPackageIndex(ctx, pkgs); err != nil {
+		return fmt.Errorf("store package index: %v", err)
+	}
+
+	digest, err := c.backend.ComputeDigest(ctx, fbcFsys)
+	if err != nil {
+		return fmt.Errorf("compute digest: %v", err)
+	}
+	if err := c.backend.PutDigest(ctx, digest); err != nil {
+		return fmt.Errorf("store digest: %v", err)
+	}
+	return nil
+}
+
+func (c *cache) processPackage(ctx context.Context, reader io.Reader) (packageIndex, error) {
+	pkgFbc, err := declcfg.LoadReader(reader)
+	if err != nil {
+		return nil, err
+	}
+	pkgModel, err := declcfg.ConvertToModel(*pkgFbc)
+	if err != nil {
+		return nil, err
+	}
+	pkgIndex, err := packagesFromModel(pkgModel)
+	if err != nil {
+		return nil, err
+	}
+	for _, p := range pkgModel {
+		for _, ch := range p.Channels {
+			for _, b := range ch.Bundles {
+				apiBundle, err := api.ConvertModelBundleToAPIBundle(*b)
+				if err != nil {
+					return nil, err
+				}
+				if err := c.backend.PutBundle(ctx, bundleKey{p.Name, ch.Name, b.Name}, apiBundle); err != nil {
+					return nil, fmt.Errorf("store bundle %q: %v", b.Name, err)
+				}
+			}
+		}
+	}
+	return pkgIndex, nil
+}
+
+func (c *cache) Load(ctx context.Context) error {
+	pi, err := c.backend.GetPackageIndex(ctx)
+	if err != nil {
+		return fmt.Errorf("get package index: %v", err)
+	}
+	c.packageIndex = pi
+	return nil
+}
+
+func (c *cache) Close() error {
+	return c.backend.Close()
 }

 func ensureEmptyDir(dir string, mode os.FileMode) error {
@@ -72,8 +405,20 @@ func ensureEmptyDir(dir string, mode os.FileMode) error {
 	return nil
 }

-func doesBundleProvide(ctx context.Context, c Cache, pkgName, chName, bundleName, group, version, kind string) (bool, error) {
-	apiBundle, err := c.GetBundle(ctx, pkgName, chName, bundleName)
+func readDigestFile(digestFile string) (string, error) {
+	existingDigestBytes, err := os.ReadFile(digestFile)
+	if err != nil {
+		return "", err
+	}
+	return strings.TrimSpace(string(existingDigestBytes)), nil
+}
+
+func writeDigestFile(file string, digest string, mode os.FileMode) error {
+	return os.WriteFile(file, []byte(digest), mode)
+}
+
+func doesBundleProvide(ctx context.Context, getBundle getBundleFunc, pkgName, chName, bundleName, group, version, kind string) (bool, error) {
+	apiBundle, err := getBundle(ctx, bundleKey{pkgName, chName, bundleName})
 	if err != nil {
 		return false, fmt.Errorf("get bundle %q: %v", bundleName, err)
 	}
@@ -84,21 +429,3 @@ func doesBundleProvide(ctx context.Context, c Cache, pkgName, chName, bundleName
 	}
 	return false, nil
 }
-
-type sliceBundleSender []*api.Bundle
-
-func (s *sliceBundleSender) Send(b *api.Bundle) error {
-	*s = append(*s, b)
-	return nil
-}
-
-func listBundles(ctx context.Context, c Cache) ([]*api.Bundle, error) {
-	var bundleSender sliceBundleSender
-
-	err := c.SendBundles(ctx, &bundleSender)
-	if err != nil {
-		return nil, err
-	}
-
-	return bundleSender, nil
-}
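Taken together with the serve.go changes above, the new public surface is small: construct with New (optionally passing a logger), rebuild only when the digest check fails, query, and close on shutdown. A condensed sketch of that lifecycle — cacheDir and configDir are placeholders, and the etcd bundle coordinates are just the ones used in this patch's tests:

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/sirupsen/logrus"

	"github.com/operator-framework/operator-registry/pkg/cache"
)

func run(ctx context.Context, cacheDir, configDir string, logger *logrus.Entry) error {
	store, err := cache.New(cacheDir, cache.WithLog(logger))
	if err != nil {
		return err
	}
	defer store.Close()

	// Rebuilds only when the stored digest no longer matches the FBC contents.
	if err := cache.LoadOrRebuild(ctx, store, os.DirFS(configDir)); err != nil {
		return fmt.Errorf("failed to load or rebuild cache: %v", err)
	}

	b, err := store.GetBundle(ctx, "etcd", "singlenamespace-alpha", "etcdoperator.v0.9.4")
	if err != nil {
		return err
	}
	fmt.Println(b.CsvName)
	return nil
}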
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index c48f85e74..7a779719d 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -8,206 +8,229 @@ import (

 	"github.com/stretchr/testify/require"

+	"github.com/operator-framework/operator-registry/pkg/lib/log"
 	"github.com/operator-framework/operator-registry/pkg/registry"
 )

 func TestCache_GetBundle(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		b, err := testQuerier.GetBundle(context.TODO(), "etcd", "singlenamespace-alpha", "etcdoperator.v0.9.4")
-		require.NoError(t, err)
-		require.Equal(t, b.PackageName, "etcd")
-		require.Equal(t, b.ChannelName, "singlenamespace-alpha")
-		require.Equal(t, b.CsvName, "etcdoperator.v0.9.4")
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			b, err := testQuerier.GetBundle(context.TODO(), "etcd", "singlenamespace-alpha", "etcdoperator.v0.9.4")
+			require.NoError(t, err)
+			require.Equal(t, b.PackageName, "etcd")
+			require.Equal(t, b.ChannelName, "singlenamespace-alpha")
+			require.Equal(t, b.CsvName, "etcdoperator.v0.9.4")
+		})
 	}
 }

 func TestCache_GetBundleForChannel(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		b, err := testQuerier.GetBundleForChannel(context.TODO(), "etcd", "singlenamespace-alpha")
-		require.NoError(t, err)
-		require.NotNil(t, b)
-		require.Equal(t, b.PackageName, "etcd")
-		require.Equal(t, b.ChannelName, "singlenamespace-alpha")
-		require.Equal(t, b.CsvName, "etcdoperator.v0.9.4")
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			b, err := testQuerier.GetBundleForChannel(context.TODO(), "etcd", "singlenamespace-alpha")
+
+			require.NoError(t, err)
+			require.NotNil(t, b)
+			require.Equal(t, b.PackageName, "etcd")
+			require.Equal(t, b.ChannelName, "singlenamespace-alpha")
+			require.Equal(t, b.CsvName, "etcdoperator.v0.9.4")
+		})
 	}
 }

 func TestCache_GetBundleThatProvides(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		b, err := testQuerier.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup")
-		require.NoError(t, err)
-		require.NotNil(t, b)
-		require.Equal(t, b.PackageName, "etcd")
-		require.Equal(t, b.ChannelName, "singlenamespace-alpha")
-		require.Equal(t, b.CsvName, "etcdoperator.v0.9.4")
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			b, err := testQuerier.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup")
+			require.NoError(t, err)
+			require.NotNil(t, b)
+			require.Equal(t, b.PackageName, "etcd")
+			require.Equal(t, b.ChannelName, "singlenamespace-alpha")
+			require.Equal(t, b.CsvName, "etcdoperator.v0.9.4")
+		})
 	}
 }

 func TestCache_GetBundleThatReplaces(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		b, err := testQuerier.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "singlenamespace-alpha")
-		require.NoError(t, err)
-		require.NotNil(t, b)
-		require.Equal(t, b.PackageName, "etcd")
-		require.Equal(t, b.ChannelName, "singlenamespace-alpha")
-		require.Equal(t, b.CsvName, "etcdoperator.v0.9.2")
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			b, err := testQuerier.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "singlenamespace-alpha")
+			require.NoError(t, err)
+			require.NotNil(t, b)
+			require.Equal(t, b.PackageName, "etcd")
+			require.Equal(t, b.ChannelName, "singlenamespace-alpha")
+			require.Equal(t, b.CsvName, "etcdoperator.v0.9.2")
+		})
 	}
 }

 func TestCache_GetChannelEntriesThatProvide(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		entries, err := testQuerier.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup")
-		require.NoError(t, err)
-		require.NotNil(t, entries)
-		require.ElementsMatch(t, []*registry.ChannelEntry{
-			{
-				PackageName: "etcd",
-				ChannelName: "singlenamespace-alpha",
-				BundleName:  "etcdoperator.v0.9.0",
-				Replaces:    "",
-			},
-			{
-				PackageName: "etcd",
-				ChannelName: "singlenamespace-alpha",
-				BundleName:  "etcdoperator.v0.9.4",
-				Replaces:    "etcdoperator.v0.9.2",
-			},
-			{
-				PackageName: "etcd",
-				ChannelName: "clusterwide-alpha",
-				BundleName:  "etcdoperator.v0.9.0",
-				Replaces:    "",
-			},
-			{
-				PackageName: "etcd",
-				ChannelName: "clusterwide-alpha",
-				BundleName:  "etcdoperator.v0.9.2-clusterwide",
-				Replaces:    "etcdoperator.v0.9.0",
-			},
-			{
-				PackageName: "etcd",
-				ChannelName: "clusterwide-alpha",
-				BundleName:  "etcdoperator.v0.9.2-clusterwide",
-				Replaces:    "etcdoperator.v0.6.1",
-			},
-			{
-				PackageName: "etcd",
-				ChannelName: "clusterwide-alpha",
-				BundleName:  "etcdoperator.v0.9.4-clusterwide",
-				Replaces:    "etcdoperator.v0.9.2-clusterwide",
-			},
-		}, entries)
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			entries, err := testQuerier.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup")
+			require.NoError(t, err)
+			require.NotNil(t, entries)
+			require.ElementsMatch(t, []*registry.ChannelEntry{
+				{
+					PackageName: "etcd",
+					ChannelName: "singlenamespace-alpha",
+					BundleName:  "etcdoperator.v0.9.0",
+					Replaces:    "",
+				},
+				{
+					PackageName: "etcd",
+					ChannelName: "singlenamespace-alpha",
+					BundleName:  "etcdoperator.v0.9.4",
+					Replaces:    "etcdoperator.v0.9.2",
+				},
+				{
+					PackageName: "etcd",
+					ChannelName: "clusterwide-alpha",
+					BundleName:  "etcdoperator.v0.9.0",
+					Replaces:    "",
+				},
+				{
+					PackageName: "etcd",
+					ChannelName: "clusterwide-alpha",
+					BundleName:  "etcdoperator.v0.9.2-clusterwide",
+					Replaces:    "etcdoperator.v0.9.0",
+				},
+				{
+					PackageName: "etcd",
+					ChannelName: "clusterwide-alpha",
+					BundleName:  "etcdoperator.v0.9.2-clusterwide",
+					Replaces:    "etcdoperator.v0.6.1",
+				},
+				{
+					PackageName: "etcd",
+					ChannelName: "clusterwide-alpha",
+					BundleName:  "etcdoperator.v0.9.4-clusterwide",
+					Replaces:    "etcdoperator.v0.9.2-clusterwide",
+				},
+			}, entries)
+		})
 	}
 }

 func TestCache_GetChannelEntriesThatReplace(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		entries, err := testQuerier.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0")
-		require.NoError(t, err)
-		require.NotNil(t, entries)
-		require.ElementsMatch(t, []*registry.ChannelEntry{
-			{
-				PackageName: "etcd",
-				ChannelName: "singlenamespace-alpha",
-				BundleName:  "etcdoperator.v0.9.2",
-				Replaces:    "etcdoperator.v0.9.0",
-			},
-			{
-				PackageName: "etcd",
-				ChannelName: "clusterwide-alpha",
-				BundleName:  "etcdoperator.v0.9.2-clusterwide",
-				Replaces:    "etcdoperator.v0.9.0",
-			},
-		}, entries)
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			entries, err := testQuerier.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0")
+			require.NoError(t, err)
+			require.NotNil(t, entries)
+			require.ElementsMatch(t, []*registry.ChannelEntry{
+				{
+					PackageName: "etcd",
+					ChannelName: "singlenamespace-alpha",
+					BundleName:  "etcdoperator.v0.9.2",
+					Replaces:    "etcdoperator.v0.9.0",
+				},
+				{
+					PackageName: "etcd",
+					ChannelName: "clusterwide-alpha",
+					BundleName:  "etcdoperator.v0.9.2-clusterwide",
+					Replaces:    "etcdoperator.v0.9.0",
+				},
+			}, entries)
+		})
 	}
 }

 func TestCache_GetLatestChannelEntriesThatProvide(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		entries, err := testQuerier.GetLatestChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup")
-		require.NoError(t, err)
-		require.NotNil(t, entries)
-		require.ElementsMatch(t, []*registry.ChannelEntry{
-			{
-				PackageName: "etcd",
-				ChannelName: "singlenamespace-alpha",
-				BundleName:  "etcdoperator.v0.9.4",
-				Replaces:    "etcdoperator.v0.9.2",
-			},
-			{
-				PackageName: "etcd",
-				ChannelName: "clusterwide-alpha",
-				BundleName:  "etcdoperator.v0.9.4-clusterwide",
-				Replaces:    "etcdoperator.v0.9.2-clusterwide",
-			},
-		}, entries)
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			entries, err := testQuerier.GetLatestChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup")
+			require.NoError(t, err)
+			require.NotNil(t, entries)
+			require.ElementsMatch(t, []*registry.ChannelEntry{
+				{
+					PackageName: "etcd",
+					ChannelName: "singlenamespace-alpha",
+					BundleName:  "etcdoperator.v0.9.4",
+					Replaces:    "etcdoperator.v0.9.2",
+				},
+				{
+					PackageName: "etcd",
+					ChannelName: "clusterwide-alpha",
+					BundleName:  "etcdoperator.v0.9.4-clusterwide",
+					Replaces:    "etcdoperator.v0.9.2-clusterwide",
+				},
+			}, entries)
+		})
 	}
 }

 func TestCache_GetPackage(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		p, err := testQuerier.GetPackage(context.TODO(), "etcd")
-		require.NoError(t, err)
-		require.NotNil(t, p)
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			p, err := testQuerier.GetPackage(context.TODO(), "etcd")
+			require.NoError(t, err)
+			require.NotNil(t, p)

-		expected := &registry.PackageManifest{
-			PackageName:        "etcd",
-			DefaultChannelName: "singlenamespace-alpha",
-			Channels: []registry.PackageChannel{
-				{
-					Name:           "singlenamespace-alpha",
-					CurrentCSVName: "etcdoperator.v0.9.4",
-				},
-				{
-					Name:           "clusterwide-alpha",
-					CurrentCSVName: "etcdoperator.v0.9.4-clusterwide",
+			expected := &registry.PackageManifest{
+				PackageName:        "etcd",
+				DefaultChannelName: "singlenamespace-alpha",
+				Channels: []registry.PackageChannel{
+					{
+						Name:           "singlenamespace-alpha",
+						CurrentCSVName: "etcdoperator.v0.9.4",
+					},
+					{
+						Name:           "clusterwide-alpha",
+						CurrentCSVName: "etcdoperator.v0.9.4-clusterwide",
+					},
+					{
+						Name:           "alpha",
+						CurrentCSVName: "etcdoperator-community.v0.6.1",
+					},
 				},
-				{
-					Name:           "alpha",
-					CurrentCSVName: "etcdoperator-community.v0.6.1",
-				},
-			},
-		}
+			}

-		require.ElementsMatch(t, expected.Channels, p.Channels)
-		expected.Channels, p.Channels = nil, nil
-		require.Equal(t, expected, p)
+			require.ElementsMatch(t, expected.Channels, p.Channels)
+			expected.Channels, p.Channels = nil, nil
+			require.Equal(t, expected, p)
+		})
 	}
 }

 func TestCache_ListBundles(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		bundles, err := testQuerier.ListBundles(context.TODO())
-		require.NoError(t, err)
-		require.NotNil(t, bundles)
-		require.Len(t, bundles, 12)
-		for _, b := range bundles {
-			require.Zero(t, b.CsvJson)
-			require.Zero(t, b.Object)
-		}
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			bundles, err := testQuerier.ListBundles(context.TODO())
+			require.NoError(t, err)
+			require.NotNil(t, bundles)
+			require.Len(t, bundles, 12)
+			for _, b := range bundles {
+				require.Zero(t, b.CsvJson)
+				require.Zero(t, b.Object)
+			}
+		})
 	}
 }

 func TestCache_ListPackages(t *testing.T) {
-	for _, testQuerier := range genTestCaches(t, validFS) {
-		packages, err := testQuerier.ListPackages(context.TODO())
-		require.NoError(t, err)
-		require.NotNil(t, packages)
-		require.Equal(t, 2, len(packages))
+	for name, testQuerier := range genTestCaches(t, validFS) {
+		t.Run(name, func(t *testing.T) {
+			packages, err := testQuerier.ListPackages(context.TODO())
+			require.NoError(t, err)
+			require.NotNil(t, packages)
+			require.Equal(t, 2, len(packages))
+		})
 	}
 }

-func genTestCaches(t *testing.T, fbcFS fs.FS) []Cache {
+func genTestCaches(t *testing.T, fbcFS fs.FS) map[string]Cache {
 	t.Helper()

-	caches := []Cache{
-		NewJSON(t.TempDir()),
+	caches := map[string]Cache{
+		"json":      &cache{backend: newJSONBackend(t.TempDir()), log: log.Null()},
+		"pogreb.v1": &cache{backend: newPogrebV1Backend(t.TempDir()), log: log.Null()},
 	}

 	for _, c := range caches {
 		err := c.Build(context.Background(), fbcFS)
 		require.NoError(t, err)
-		err = c.Load()
+		err = c.Load(context.Background())
 		require.NoError(t, err)
 	}
b/pkg/cache/json.go @@ -10,198 +10,139 @@ import ( "io/fs" "os" "path/filepath" - "sort" - "strings" - "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/sirupsen/logrus" + "github.com/operator-framework/operator-registry/pkg/api" "github.com/operator-framework/operator-registry/pkg/registry" - "github.com/sirupsen/logrus" ) -var _ Cache = &JSON{} - -type JSON struct { - baseDir string +var _ backend = &jsonBackend{} - packageIndex - apiBundles map[apiBundleKey]string +func newJSONBackend(baseDir string) *jsonBackend { + return &jsonBackend{ + baseDir: baseDir, + bundles: newBundleKeys(), + } } const ( jsonCacheModeDir = 0750 jsonCacheModeFile = 0640 -) -type apiBundleKey struct { - pkgName string - chName string - name string -} + jsonDigestFile = "digest" + jsonDir = "cache" + jsonPackagesFile = jsonDir + string(filepath.Separator) + "packages.json" +) -func (q *JSON) loadAPIBundle(k apiBundleKey) (*api.Bundle, error) { - filename, ok := q.apiBundles[k] - if !ok { - return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", k.pkgName, k.chName, k.name) - } - d, err := os.ReadFile(filename) - if err != nil { - return nil, err - } - var b api.Bundle - if err := json.Unmarshal(d, &b); err != nil { - return nil, err - } - return &b, nil +type jsonBackend struct { + baseDir string + bundles bundleKeys } -func (q *JSON) ListBundles(ctx context.Context) ([]*api.Bundle, error) { - return listBundles(ctx, q) +func (q *jsonBackend) Name() string { + return "json" } -func (q *JSON) SendBundles(_ context.Context, s registry.BundleSender) error { - var keys []apiBundleKey - for _, pkg := range q.packageIndex { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - keys = append(keys, apiBundleKey{pkg.Name, ch.Name, b.Name}) - } - } +func (q *jsonBackend) IsCachePresent() bool { + entries, err := os.ReadDir(q.baseDir) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return false } - sort.Slice(keys, func(i, j int) bool { - if keys[i].chName != keys[j].chName { - return keys[i].chName < keys[j].chName - } - if keys[i].pkgName != keys[j].pkgName { - return keys[i].pkgName < keys[j].pkgName - } - return keys[i].name < keys[j].name - }) - var files []*os.File - var readers []io.Reader - for _, key := range keys { - filename, ok := q.apiBundles[key] - if !ok { - return fmt.Errorf("package %q, channel %q, key %q not found", key.pkgName, key.chName, key.name) - } - file, err := os.Open(filename) - if err != nil { - return fmt.Errorf("failed to open file for package %q, channel %q, key %q: %w", key.pkgName, key.chName, key.name, err) - } - files = append(files, file) - readers = append(readers, file) - } - defer func() { - for _, file := range files { - if err := file.Close(); err != nil { - logrus.WithError(err).WithField("file", file.Name()).Warn("could not close file") - } - } - }() - multiReader := io.MultiReader(readers...) - decoder := json.NewDecoder(multiReader) - index := 0 - for { - var bundle api.Bundle - if err := decoder.Decode(&bundle); err == io.EOF { - break - } else if err != nil { - return fmt.Errorf("failed to decode file for package %q, channel %q, key %q: %w", keys[index].pkgName, keys[index].chName, keys[index].name, err) + var hasDir, hasDigest bool + for _, entry := range entries { + if entry.IsDir() && entry.Name() == jsonDir { + hasDir = true } - if bundle.BundlePath != "" { - // The SQLite-based server - // configures its querier to - // omit these fields when - // key path is set. 
- bundle.CsvJson = "" - bundle.Object = nil + if entry.Name() == jsonDigestFile { + hasDigest = true } - if err := s.Send(&bundle); err != nil { - return err - } - index += 1 } - return nil + return hasDir && hasDigest } -func (q *JSON) GetBundle(_ context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) { - pkg, ok := q.packageIndex[pkgName] - if !ok { - return nil, fmt.Errorf("package %q not found", pkgName) - } - ch, ok := pkg.Channels[channelName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) - } - b, ok := ch.Bundles[csvName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", pkgName, channelName, csvName) +func (q *jsonBackend) Init() error { + if err := ensureEmptyDir(filepath.Join(q.baseDir, jsonDir), jsonCacheModeDir); err != nil { + return fmt.Errorf("failed to ensure JSON cache directory: %v", err) } - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) - if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) + if err := os.RemoveAll(filepath.Join(q.baseDir, jsonDigestFile)); err != nil { + return fmt.Errorf("failed to remove existing JSON digest file: %v", err) } - - // unset Replaces and Skips (sqlite query does not populate these fields) - apiBundle.Replaces = "" - apiBundle.Skips = nil - return apiBundle, nil -} - -func (q *JSON) GetBundleForChannel(ctx context.Context, pkgName string, channelName string) (*api.Bundle, error) { - return q.packageIndex.GetBundleForChannel(ctx, q, pkgName, channelName) + q.bundles = newBundleKeys() + return nil } -func (q *JSON) GetBundleThatReplaces(ctx context.Context, name, pkgName, channelName string) (*api.Bundle, error) { - return q.packageIndex.GetBundleThatReplaces(ctx, q, name, pkgName, channelName) +func (q *jsonBackend) Open() error { + return nil } -func (q *JSON) GetChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) { - return q.packageIndex.GetChannelEntriesThatProvide(ctx, q, group, version, kind) +func (q *jsonBackend) Close() error { + return nil } -func (q *JSON) GetLatestChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) { - return q.packageIndex.GetLatestChannelEntriesThatProvide(ctx, q, group, version, kind) +func (q *jsonBackend) GetPackageIndex(_ context.Context) (packageIndex, error) { + packagesData, err := os.ReadFile(filepath.Join(q.baseDir, jsonPackagesFile)) + if err != nil { + return nil, err + } + var pi packageIndex + if err := json.Unmarshal(packagesData, &pi); err != nil { + return nil, err + } + for _, pkg := range pi { + for _, ch := range pkg.Channels { + for _, b := range ch.Bundles { + q.bundles.Set(bundleKey{PackageName: pkg.Name, ChannelName: ch.Name, Name: b.Name}) + } + } + } + return pi, nil } -func (q *JSON) GetBundleThatProvides(ctx context.Context, group, version, kind string) (*api.Bundle, error) { - return q.packageIndex.GetBundleThatProvides(ctx, q, group, version, kind) +func (q *jsonBackend) PutPackageIndex(_ context.Context, pi packageIndex) error { + packageJson, err := json.Marshal(pi) + if err != nil { + return err + } + if err := os.WriteFile(filepath.Join(q.baseDir, jsonPackagesFile), packageJson, jsonCacheModeFile); err != nil { + return err + } + return nil } -func NewJSON(baseDir string) *JSON { - return &JSON{baseDir: baseDir} +func (q *jsonBackend) bundleFile(in bundleKey) string { + return filepath.Join(q.baseDir, 
jsonDir, fmt.Sprintf("%s_%s_%s.json", in.PackageName, in.ChannelName, in.Name)) } -const ( - jsonDigestFile = "digest" - jsonDir = "cache" - packagesFile = jsonDir + string(filepath.Separator) + "packages.json" -) - -func (q *JSON) CheckIntegrity(fbcFsys fs.FS) error { - existingDigest, err := q.existingDigest() +func (q *jsonBackend) GetBundle(_ context.Context, key bundleKey) (*api.Bundle, error) { + d, err := os.ReadFile(q.bundleFile(key)) if err != nil { - return fmt.Errorf("read existing cache digest: %v", err) + return nil, err } - computedDigest, err := q.computeDigest(fbcFsys) + var b api.Bundle + if err := json.Unmarshal(d, &b); err != nil { + return nil, err + } + return &b, nil +} + +func (q *jsonBackend) PutBundle(_ context.Context, key bundleKey, bundle *api.Bundle) error { + d, err := json.Marshal(bundle) if err != nil { - return fmt.Errorf("compute digest: %v", err) + return err } - if existingDigest != computedDigest { - return fmt.Errorf("cache requires rebuild: cache reports digest as %q, but computed digest is %q", existingDigest, computedDigest) + if err := os.WriteFile(q.bundleFile(key), d, jsonCacheModeFile); err != nil { + return err } + q.bundles.Set(key) return nil } -func (q *JSON) existingDigest() (string, error) { - existingDigestBytes, err := os.ReadFile(filepath.Join(q.baseDir, jsonDigestFile)) - if err != nil { - return "", err - } - return strings.TrimSpace(string(existingDigestBytes)), nil +func (q *jsonBackend) GetDigest(_ context.Context) (string, error) { + return readDigestFile(filepath.Join(q.baseDir, jsonDigestFile)) } -func (q *JSON) computeDigest(fbcFsys fs.FS) (string, error) { +func (q *jsonBackend) ComputeDigest(_ context.Context, fbcFsys fs.FS) (string, error) { // We are not sensitive to the size of this buffer, we just need it to be shared. // For simplicity, do the same as io.Copy() would. 
buf := make([]byte, 32*1024) @@ -218,86 +159,47 @@ func (q *JSON) computeDigest(fbcFsys fs.FS) (string, error) { return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil } -func (q *JSON) Build(ctx context.Context, fbcFsys fs.FS) error { - // ensure that generated cache is available to all future users - oldUmask := umask(000) - defer umask(oldUmask) - - if err := ensureEmptyDir(q.baseDir, jsonCacheModeDir); err != nil { - return fmt.Errorf("ensure clean base directory: %v", err) - } - if err := ensureEmptyDir(filepath.Join(q.baseDir, jsonDir), jsonCacheModeDir); err != nil { - return fmt.Errorf("ensure clean base directory: %v", err) - } - - fbc, err := declcfg.LoadFS(ctx, fbcFsys) - if err != nil { - return err - } - fbcModel, err := declcfg.ConvertToModel(*fbc) - if err != nil { - return err - } - - pkgs, err := packagesFromModel(fbcModel) - if err != nil { - return err - } - - packageJson, err := json.Marshal(pkgs) - if err != nil { - return err - } - if err := os.WriteFile(filepath.Join(q.baseDir, packagesFile), packageJson, jsonCacheModeFile); err != nil { - return err - } - - q.apiBundles = map[apiBundleKey]string{} - for _, p := range fbcModel { - for _, ch := range p.Channels { - for _, b := range ch.Bundles { - apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) - if err != nil { - return err - } - jsonBundle, err := json.Marshal(apiBundle) - if err != nil { - return err - } - filename := filepath.Join(q.baseDir, jsonDir, fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) - if err := os.WriteFile(filename, jsonBundle, jsonCacheModeFile); err != nil { - return err - } - q.apiBundles[apiBundleKey{p.Name, ch.Name, b.Name}] = filename - } - } - } - digest, err := q.computeDigest(fbcFsys) - if err != nil { - return err - } - if err := os.WriteFile(filepath.Join(q.baseDir, jsonDigestFile), []byte(digest), jsonCacheModeFile); err != nil { - return err - } - return nil +func (q *jsonBackend) PutDigest(_ context.Context, digest string) error { + return writeDigestFile(filepath.Join(q.baseDir, jsonDigestFile), digest, jsonCacheModeFile) } -func (q *JSON) Load() error { - packagesData, err := os.ReadFile(filepath.Join(q.baseDir, packagesFile)) - if err != nil { - return err - } - if err := json.Unmarshal(packagesData, &q.packageIndex); err != nil { +func (q *jsonBackend) SendBundles(_ context.Context, s registry.BundleSender) error { + keys := make([]bundleKey, 0, q.bundles.Len()) + files := make([]*os.File, 0, q.bundles.Len()) + readers := make([]io.Reader, 0, q.bundles.Len()) + if err := q.bundles.Walk(func(key bundleKey) error { + file, err := os.Open(q.bundleFile(key)) + if err != nil { + return fmt.Errorf("failed to open file for package %q, channel %q, key %q: %w", key.PackageName, key.ChannelName, key.Name, err) + } + keys = append(keys, key) + files = append(files, file) + readers = append(readers, file) + return nil + }); err != nil { return err } - q.apiBundles = map[apiBundleKey]string{} - for _, p := range q.packageIndex { - for _, ch := range p.Channels { - for _, b := range ch.Bundles { - filename := filepath.Join(q.baseDir, jsonDir, fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) - q.apiBundles[apiBundleKey{pkgName: p.Name, chName: ch.Name, name: b.Name}] = filename + defer func() { + for _, file := range files { + if err := file.Close(); err != nil { + logrus.WithError(err).WithField("file", file.Name()).Warn("could not close file") } } + }() + multiReader := io.MultiReader(readers...) 
+	decoder := json.NewDecoder(multiReader)
+	index := 0
+	for {
+		var bundle api.Bundle
+		if err := decoder.Decode(&bundle); err == io.EOF {
+			break
+		} else if err != nil {
+			return fmt.Errorf("failed to decode file for package %q, channel %q, key %q: %w", keys[index].PackageName, keys[index].ChannelName, keys[index].Name, err)
+		}
+		if err := s.Send(&bundle); err != nil {
+			return err
+		}
+		index++
 	}
 	return nil
 }
diff --git a/pkg/cache/json_test.go b/pkg/cache/json_test.go
index af8282646..9cb31a435 100644
--- a/pkg/cache/json_test.go
+++ b/pkg/cache/json_test.go
@@ -8,14 +8,16 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/require"
+
+	"github.com/operator-framework/operator-registry/pkg/lib/log"
 )
 
 func TestJSON_StableDigest(t *testing.T) {
 	cacheDir := t.TempDir()
-	c := NewJSON(cacheDir)
+	c := &cache{backend: newJSONBackend(cacheDir), log: log.Null()}
 	require.NoError(t, c.Build(context.Background(), validFS))
 
-	actualDigest, err := c.existingDigest()
+	actualDigest, err := c.backend.GetDigest(context.Background())
 	require.NoError(t, err)
 
 	// NOTE: The entire purpose of this test is to ensure that we don't change the cache
@@ -94,7 +96,7 @@ func TestJSON_CheckIntegrity(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			cacheDir := t.TempDir()
-			c := NewJSON(cacheDir)
+			c := &cache{backend: newJSONBackend(cacheDir), log: log.Null()}
 
 			if tc.build {
 				require.NoError(t, c.Build(context.Background(), tc.fbcFS))
@@ -102,7 +104,7 @@
 			if tc.mod != nil {
 				require.NoError(t, tc.mod(&tc, cacheDir))
 			}
-			tc.expect(t, c.CheckIntegrity(tc.fbcFS))
+			tc.expect(t, c.CheckIntegrity(context.Background(), tc.fbcFS))
 		})
 	}
 }
diff --git a/pkg/cache/pkgs.go b/pkg/cache/pkgs.go
index 593d32410..e590823b4 100644
--- a/pkg/cache/pkgs.go
+++ b/pkg/cache/pkgs.go
@@ -67,7 +67,9 @@ func (pkgs packageIndex) GetChannelEntriesThatReplace(_ context.Context, name st
 	return entries, nil
 }
 
-func (pkgs packageIndex) GetBundleForChannel(ctx context.Context, c Cache, pkgName string, channelName string) (*api.Bundle, error) {
+type getBundleFunc func(context.Context, bundleKey) (*api.Bundle, error)
+
+func (pkgs packageIndex) GetBundleForChannel(ctx context.Context, getBundle getBundleFunc, pkgName string, channelName string) (*api.Bundle, error) {
 	pkg, ok := pkgs[pkgName]
 	if !ok {
 		return nil, fmt.Errorf("package %q not found", pkgName)
@@ -76,10 +78,10 @@ func (pkgs packageIndex) GetBundleForChannel(ctx context.Context, c Cache, pkgNa
 	if !ok {
 		return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName)
 	}
-	return c.GetBundle(ctx, pkg.Name, ch.Name, ch.Head)
+	return getBundle(ctx, bundleKey{pkg.Name, ch.Name, ch.Head})
 }
 
-func (pkgs packageIndex) GetBundleThatReplaces(ctx context.Context, c Cache, name, pkgName, channelName string) (*api.Bundle, error) {
+func (pkgs packageIndex) GetBundleThatReplaces(ctx context.Context, getBundle getBundleFunc, name, pkgName, channelName string) (*api.Bundle, error) {
 	pkg, ok := pkgs[pkgName]
 	if !ok {
 		return nil, fmt.Errorf("package %s not found", pkgName)
@@ -95,19 +97,19 @@ func (pkgs packageIndex) GetBundleThatReplaces(ctx context.Context, c Cache, nam
 	// implementation to be non-deterministic as well.
 	for _, b := range ch.Bundles {
 		if bundleReplaces(b, name) {
-			return c.GetBundle(ctx, pkg.Name, ch.Name, b.Name)
+			return getBundle(ctx, bundleKey{pkg.Name, ch.Name, b.Name})
 		}
 	}
 	return nil, fmt.Errorf("no entry found for package %q, channel %q", pkgName, channelName)
 }
 
-func (pkgs packageIndex) GetChannelEntriesThatProvide(ctx context.Context, c Cache, group, version, kind string) ([]*registry.ChannelEntry, error) {
+func (pkgs packageIndex) GetChannelEntriesThatProvide(ctx context.Context, getBundle getBundleFunc, group, version, kind string) ([]*registry.ChannelEntry, error) {
 	var entries []*registry.ChannelEntry
 	for _, pkg := range pkgs {
 		for _, ch := range pkg.Channels {
 			for _, b := range ch.Bundles {
-				provides, err := doesBundleProvide(ctx, c, b.Package, b.Channel, b.Name, group, version, kind)
+				provides, err := doesBundleProvide(ctx, getBundle, b.Package, b.Channel, b.Name, group, version, kind)
 				if err != nil {
 					return nil, err
 				}
@@ -137,13 +139,13 @@ func (pkgs packageIndex) GetChannelEntriesThatProvide(ctx context.Context, c Cac
 // ---
 // Separate, but possibly related, I noticed there are several channels in the channel entry
 // table who's minimum depth is 1. What causes 1 to be minimum depth in some cases and 0 in others?
-func (pkgs packageIndex) GetLatestChannelEntriesThatProvide(ctx context.Context, c Cache, group, version, kind string) ([]*registry.ChannelEntry, error) {
+func (pkgs packageIndex) GetLatestChannelEntriesThatProvide(ctx context.Context, getBundle getBundleFunc, group, version, kind string) ([]*registry.ChannelEntry, error) {
 	var entries []*registry.ChannelEntry
 	for _, pkg := range pkgs {
 		for _, ch := range pkg.Channels {
 			b := ch.Bundles[ch.Head]
 
-			provides, err := doesBundleProvide(ctx, c, b.Package, b.Channel, b.Name, group, version, kind)
+			provides, err := doesBundleProvide(ctx, getBundle, b.Package, b.Channel, b.Name, group, version, kind)
 			if err != nil {
 				return nil, err
 			}
diff --git a/pkg/cache/pogrebv1.go b/pkg/cache/pogrebv1.go
new file mode 100644
index 000000000..616ba720e
--- /dev/null
+++ b/pkg/cache/pogrebv1.go
@@ -0,0 +1,227 @@
+package cache
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"hash/fnv"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"sort"
+
+	"github.com/akrylysov/pogreb"
+	pogrebfs "github.com/akrylysov/pogreb/fs"
+	"github.com/golang/protobuf/proto"
+
+	"github.com/operator-framework/operator-registry/alpha/declcfg"
+	"github.com/operator-framework/operator-registry/pkg/api"
+	"github.com/operator-framework/operator-registry/pkg/registry"
+)
+
+var _ backend = &pogrebV1Backend{}
+
+func newPogrebV1Backend(baseDir string) *pogrebV1Backend {
+	return &pogrebV1Backend{
+		baseDir: baseDir,
+		bundles: newBundleKeys(),
+	}
+}
+
+const (
+	pogrebV1CacheModeDir  = 0750
+	pogrebV1CacheModeFile = 0640
+
+	pogrebV1CacheDir = "pogreb.v1"
+	pogrebDigestFile = pogrebV1CacheDir + "/digest"
+	pogrebDbDir      = pogrebV1CacheDir + "/db"
+)
+
+type pogrebV1Backend struct {
+	baseDir string
+	db      *pogreb.DB
+	bundles bundleKeys
+}
+
+func (q *pogrebV1Backend) Name() string {
+	return pogrebV1CacheDir
+}
+
+func (q *pogrebV1Backend) IsCachePresent() bool {
+	entries, err := os.ReadDir(q.baseDir)
+	if err != nil && !errors.Is(err, os.ErrNotExist) {
+		return false
+	}
+	for _, entry := range entries {
+		if entry.IsDir() && entry.Name() == pogrebV1CacheDir {
+			return true
+		}
+	}
+	return false
+}
+
+func (q *pogrebV1Backend) Init() error {
+	if err := q.Close(); err != nil {
+		return fmt.Errorf("failed to close existing DB: %v", err)
+	}
+	if err := ensureEmptyDir(filepath.Join(q.baseDir, pogrebV1CacheDir), pogrebV1CacheModeDir); err != nil {
+		return fmt.Errorf("ensure empty cache directory: %v", err)
+	}
+	q.bundles = newBundleKeys()
+	return q.Open()
+}
+
+func (q *pogrebV1Backend) Open() error {
+	db, err := pogreb.Open(filepath.Join(q.baseDir, pogrebDbDir), &pogreb.Options{FileSystem: pogrebfs.OSMMap})
+	if err != nil {
+		return err
+	}
+	q.db = db
+	return nil
+}
+
+func (q *pogrebV1Backend) Close() error {
+	if q.db == nil {
+		return nil
+	}
+	return q.db.Close()
+}
+
+func (q *pogrebV1Backend) GetPackageIndex(_ context.Context) (packageIndex, error) {
+	packagesData, err := q.db.Get([]byte("packages.json"))
+	if err != nil {
+		return nil, err
+	}
+	var pi packageIndex
+	if err := json.Unmarshal(packagesData, &pi); err != nil {
+		return nil, err
+	}
+	for _, pkg := range pi {
+		for _, ch := range pkg.Channels {
+			for _, b := range ch.Bundles {
+				q.bundles.Set(bundleKey{PackageName: pkg.Name, ChannelName: ch.Name, Name: b.Name})
+			}
+		}
+	}
+	return pi, nil
+}
+
+func (q *pogrebV1Backend) PutPackageIndex(_ context.Context, index packageIndex) error {
+	packageJson, err := json.Marshal(index)
+	if err != nil {
+		return err
+	}
+	return q.db.Put([]byte("packages.json"), packageJson)
+}
+
+func (q *pogrebV1Backend) dbKey(in bundleKey) []byte {
+	return []byte(fmt.Sprintf("bundles/%s/%s/%s", in.PackageName, in.ChannelName, in.Name))
+}
+
+func (q *pogrebV1Backend) GetBundle(_ context.Context, key bundleKey) (*api.Bundle, error) {
+	d, err := q.db.Get(q.dbKey(key))
+	if err != nil {
+		return nil, err
+	}
+	var b api.Bundle
+	if err := proto.Unmarshal(d, &b); err != nil {
+		return nil, err
+	}
+	return &b, nil
+}
+
+func (q *pogrebV1Backend) PutBundle(_ context.Context, key bundleKey, bundle *api.Bundle) error {
+	d, err := proto.Marshal(bundle)
+	if err != nil {
+		return err
+	}
+	if err := q.db.Put(q.dbKey(key), d); err != nil {
+		return err
+	}
+	q.bundles.Set(key)
+	return nil
+}
+
+func (q *pogrebV1Backend) GetDigest(_ context.Context) (string, error) {
+	return readDigestFile(filepath.Join(q.baseDir, pogrebDigestFile))
+}
+
+func (q *pogrebV1Backend) orderedKeys() ([]string, error) {
+	it := q.db.Items()
+	keys := make([]string, 0, q.db.Count())
+	for {
+		k, _, err := it.Next()
+		if errors.Is(err, pogreb.ErrIterationDone) {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+		keys = append(keys, string(k))
+	}
+	sort.Strings(keys)
+	return keys, nil
+}
+
+func (q *pogrebV1Backend) writeKeyValue(w io.Writer, k []byte) error {
+	v, err := q.db.Get(k)
+	if err != nil {
+		return err
+	}
+	if _, err := w.Write(k); err != nil {
+		return err
+	}
+	if _, err := w.Write(v); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (q *pogrebV1Backend) ComputeDigest(ctx context.Context, fbcFsys fs.FS) (string, error) {
+	computedHasher := fnv.New64a()
+
+	// Use concurrency=1 to ensure deterministic ordering of meta blobs.
+	loadOpts := []declcfg.LoadOption{declcfg.WithConcurrency(1)}
+	if err := declcfg.WalkMetasFS(ctx, fbcFsys, func(path string, meta *declcfg.Meta, err error) error {
+		if err != nil {
+			return err
+		}
+		if _, err := computedHasher.Write(meta.Blob); err != nil {
+			return err
+		}
+		return nil
+	}, loadOpts...); err != nil {
+		return "", err
+	}
+
+	orderedKeys, err := q.orderedKeys()
+	if err != nil {
+		return "", err
+	}
+	for _, dbKey := range orderedKeys {
+		if err := q.writeKeyValue(computedHasher, []byte(dbKey)); err != nil {
+			return "", err
+		}
+	}
+	return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil
+}
+
+func (q *pogrebV1Backend) PutDigest(_ context.Context, digest string) error {
+	return writeDigestFile(filepath.Join(q.baseDir, pogrebDigestFile), digest, pogrebV1CacheModeFile)
+}
+
+func (q *pogrebV1Backend) SendBundles(_ context.Context, s registry.BundleSender) error {
+	return q.bundles.Walk(func(key bundleKey) error {
+		bundleData, err := q.db.Get(q.dbKey(key))
+		if err != nil {
+			return fmt.Errorf("failed to get data for package %q, channel %q, key %q: %w", key.PackageName, key.ChannelName, key.Name, err)
+		}
+		var bundle api.Bundle
+		if err := proto.Unmarshal(bundleData, &bundle); err != nil {
+			return fmt.Errorf("failed to decode data for package %q, channel %q, key %q: %w", key.PackageName, key.ChannelName, key.Name, err)
+		}
+		return s.Send(&bundle)
+	})
+}
diff --git a/pkg/cache/pogrebv1_test.go b/pkg/cache/pogrebv1_test.go
new file mode 100644
index 000000000..f084509fe
--- /dev/null
+++ b/pkg/cache/pogrebv1_test.go
@@ -0,0 +1,109 @@
+package cache
+
+import (
+	"context"
+	"io/fs"
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/operator-framework/operator-registry/pkg/api"
+	"github.com/operator-framework/operator-registry/pkg/lib/log"
+)
+
+func TestPogrebV1_StableDigest(t *testing.T) {
+	cacheDir := t.TempDir()
+	c := &cache{backend: newPogrebV1Backend(cacheDir), log: log.Null()}
+	require.NoError(t, c.Build(context.Background(), validFS))
+
+	actualDigest, err := c.backend.GetDigest(context.Background())
+	require.NoError(t, err)
+
+	// NOTE: The entire purpose of this test is to ensure that we don't change the cache
+	// implementation and inadvertently invalidate existing caches.
+	//
+	// Therefore, DO NOT CHANGE the expected digest value here unless validFS also
+	// changes.
+	//
+	// If validFS needs to change, DO NOT CHANGE the pogreb cache implementation
+	// in the same pull request.
+	require.Equal(t, "485a767449dd66d4", actualDigest)
+}
+
+func TestPogrebV1_CheckIntegrity(t *testing.T) {
+	type testCase struct {
+		name   string
+		build  bool
+		fbcFS  fs.FS
+		mod    func(t *testing.T, tc *testCase, cacheDir string, backend backend)
+		expect func(t *testing.T, err error)
+	}
+	testCases := []testCase{
+		{
+			name:  "non-existent cache dir",
+			fbcFS: validFS,
+			mod: func(t *testing.T, tc *testCase, cacheDir string, _ backend) {
+				require.NoError(t, os.RemoveAll(cacheDir))
+			},
+			expect: func(t *testing.T, err error) {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), "read existing cache digest")
+			},
+		},
+		{
+			name:  "empty cache dir",
+			fbcFS: validFS,
+			expect: func(t *testing.T, err error) {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), "read existing cache digest")
+			},
+		},
+		{
+			name:  "valid cache dir",
+			build: true,
+			fbcFS: validFS,
+			expect: func(t *testing.T, err error) {
+				require.NoError(t, err)
+			},
+		},
+		{
+			name:  "different FBC",
+			build: true,
+			fbcFS: validFS,
+			mod: func(t *testing.T, tc *testCase, _ string, _ backend) {
+				tc.fbcFS = badBundleFS
+			},
+			expect: func(t *testing.T, err error) {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), "cache requires rebuild")
+			},
+		},
+		{
+			name:  "different cache",
+			build: true,
+			fbcFS: validFS,
+			mod: func(t *testing.T, tc *testCase, cacheDir string, b backend) {
+				require.NoError(t, b.PutBundle(context.Background(), bundleKey{"foo", "bar", "baz"}, &api.Bundle{PackageName: "foo", ChannelName: "bar", CsvName: "baz"}))
+			},
+			expect: func(t *testing.T, err error) {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), "cache requires rebuild")
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			cacheDir := t.TempDir()
+			c := &cache{backend: newPogrebV1Backend(cacheDir), log: log.Null()}
+
+			if tc.build {
+				require.NoError(t, c.Build(context.Background(), tc.fbcFS))
+			}
+			if tc.mod != nil {
+				tc.mod(t, &tc, cacheDir, c.backend)
+			}
+			tc.expect(t, c.CheckIntegrity(context.Background(), tc.fbcFS))
+		})
+	}
+}
diff --git a/pkg/lib/log/null.go b/pkg/lib/log/null.go
new file mode 100644
index 000000000..9b7886c0c
--- /dev/null
+++ b/pkg/lib/log/null.go
@@ -0,0 +1,13 @@
+package log
+
+import (
+	"io"
+
+	"github.com/sirupsen/logrus"
+)
+
+func Null() *logrus.Entry {
+	l := logrus.New()
+	l.SetOutput(io.Discard)
+	return logrus.NewEntry(l)
+}
diff --git a/pkg/lib/registry/registry_test.go b/pkg/lib/registry/registry_test.go
index 13e72f700..71092faf8 100644
--- a/pkg/lib/registry/registry_test.go
+++ b/pkg/lib/registry/registry_test.go
@@ -102,7 +102,7 @@ func newCache(t *testing.T, bundles []*model.Bundle) cache.Cache {
 	require.NoError(t, err)
 
 	require.NoError(t, reg.Build(context.Background(), os.DirFS(fbcDir)))
-	require.NoError(t, reg.Load())
+	require.NoError(t, reg.Load(context.Background()))
 	return reg
 }
 
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index 4c0dab760..cdd5c33ac 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -32,11 +32,11 @@ const (
 	dbAddress = "localhost" + dbPort
 	dbName    = "test.db"
 
-	jsonCachePort    = ":50053"
-	jsonCacheAddress = "localhost" + jsonCachePort
+	cachePort    = ":50053"
+	cacheAddress = "localhost" + cachePort
 
-	jsonDeprecationCachePort    = ":50054"
-	jsonDeprecationCacheAddress = "localhost" + jsonDeprecationCachePort
+	deprecationCachePort    = ":50054"
+	deprecationCacheAddress = "localhost" + deprecationCachePort
 )
 
 func createDBStore(dbPath string) *sqlite.SQLQuerier {
@@ -69,23 +69,29 @@ func createDBStore(dbPath string) *sqlite.SQLQuerier {
 	return store
 }
 
-func fbcJsonCache(catalogDir, cacheDir string) (fbccache.Cache, error) {
-	store := fbccache.NewJSON(cacheDir)
+func fbcCache(catalogDir, cacheDir string) (fbccache.Cache, error) {
+	store, err := fbccache.New(cacheDir)
+	if err != nil {
+		return nil, err
+	}
 	if err := store.Build(context.Background(), os.DirFS(catalogDir)); err != nil {
 		return nil, err
 	}
-	if err := store.Load(); err != nil {
+	if err := store.Load(context.Background()); err != nil {
 		return nil, err
 	}
 	return store, nil
 }
 
-func fbcJsonCacheFromFs(catalogFS fs.FS, cacheDir string) (fbccache.Cache, error) {
-	store := fbccache.NewJSON(cacheDir)
+func fbcCacheFromFs(catalogFS fs.FS, cacheDir string) (fbccache.Cache, error) {
+	store, err := fbccache.New(cacheDir)
+	if err != nil {
+		return nil, err
+	}
 	if err := store.Build(context.Background(), catalogFS); err != nil {
 		return nil, err
 	}
-	if err := store.Load(); err != nil {
+	if err := store.Load(context.Background()); err != nil {
 		return nil, err
 	}
 	return store, nil
@@ -124,17 +130,17 @@ func TestMain(m *testing.M) {
 
 	grpcServer := server(dbStore)
 
-	fbcJsonStore, err := fbcJsonCache(fbcDir, filepath.Join(tmpDir, "json-cache"))
+	fbcStore, err := fbcCache(fbcDir, filepath.Join(tmpDir, "cache"))
 	if err != nil {
-		logrus.Fatalf("failed to create json cache: %v", err)
+		logrus.Fatalf("failed to create cache: %v", err)
 	}
-	fbcServerSimple := server(fbcJsonStore)
+	fbcServerSimple := server(fbcStore)
 
-	fbcJsonDeprecationStore, err := fbcJsonCacheFromFs(validFS, filepath.Join(tmpDir, "json-deprecation-cache"))
+	fbcDeprecationStore, err := fbcCacheFromFs(validFS, filepath.Join(tmpDir, "deprecation-cache"))
 	if err != nil {
-		logrus.Fatalf("failed to create json deprecation cache: %v", err)
+		logrus.Fatalf("failed to create deprecation cache: %v", err)
 	}
-	fbcServerDeprecations := server(fbcJsonDeprecationStore)
+	fbcServerDeprecations := server(fbcDeprecationStore)
 
 	go func() {
 		lis, err := net.Listen("tcp", dbPort)
@@ -146,21 +152,21 @@ func TestMain(m *testing.M) {
 		}
 	}()
 	go func() {
-		lis, err := net.Listen("tcp", jsonCachePort)
+		lis, err := net.Listen("tcp", cachePort)
 		if err != nil {
 			logrus.Fatalf("failed to listen: %v", err)
 		}
 		if err := fbcServerSimple.Serve(lis); err != nil {
-			logrus.Fatalf("failed to serve fbc json cache: %v", err)
+			logrus.Fatalf("failed to serve fbc cache: %v", err)
 		}
 	}()
 	go func() {
-		lis, err := net.Listen("tcp", jsonDeprecationCacheAddress)
+		lis, err := net.Listen("tcp", deprecationCacheAddress)
 		if err != nil {
 			logrus.Fatalf("failed to listen: %v", err)
 		}
 		if err := fbcServerDeprecations.Serve(lis); err != nil {
-			logrus.Fatalf("failed to serve fbc json cache: %v", err)
+			logrus.Fatalf("failed to serve fbc cache: %v", err)
 		}
 	}()
 	exit := m.Run()
@@ -186,8 +192,8 @@ func TestListPackages(t *testing.T) {
 	)
 
 	t.Run("Sqlite", testListPackages(dbAddress, listPackagesExpected))
-	t.Run("FBCJsonCache", testListPackages(jsonCacheAddress, listPackagesExpected))
-	t.Run("FBCJsonCacheWithDeprecations", testListPackages(jsonDeprecationCacheAddress, listPackagesExpectedDep))
+	t.Run("FBCCache", testListPackages(cacheAddress, listPackagesExpected))
+	t.Run("FBCCacheWithDeprecations", testListPackages(deprecationCacheAddress, listPackagesExpectedDep))
 }
 
 func testListPackages(addr string, expected []string) func(*testing.T) {
@@ -262,8 +268,8 @@ func TestGetPackage(t *testing.T) {
 		}
 	)
 	t.Run("Sqlite", testGetPackage(dbAddress, getPackageExpected))
-	t.Run("FBCJsonCache", testGetPackage(jsonCacheAddress, getPackageExpected))
-	t.Run("FBCJsonCacheWithDeprecations", testGetPackage(jsonDeprecationCacheAddress, getPackageExpectedDep))
+	t.Run("FBCCache", testGetPackage(cacheAddress, getPackageExpected))
+	t.Run("FBCCacheWithDeprecations", testGetPackage(deprecationCacheAddress, getPackageExpectedDep))
 }
 
 func testGetPackage(addr string, expected *api.Package) func(*testing.T) {
@@ -314,8 +320,8 @@ func TestGetBundle(t *testing.T) {
 		}
 	)
 	t.Run("Sqlite", testGetBundle(dbAddress, etcdoperator_v0_9_2("alpha", false, false, includeManifestsAll)))
-	t.Run("FBCJsonCache", testGetBundle(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
-	t.Run("FBCJsonCacheWithDeprecations", testGetBundle(jsonDeprecationCacheAddress, cockroachBundle))
+	t.Run("FBCCache", testGetBundle(cacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
+	t.Run("FBCCacheWithDeprecations", testGetBundle(deprecationCacheAddress, cockroachBundle))
 }
 
 func testGetBundle(addr string, expected *api.Bundle) func(*testing.T) {
@@ -338,7 +344,7 @@ func TestGetBundleForChannel(t *testing.T) {
 			CsvJson: b.CsvJson + "\n",
 		}))
 	}
-	t.Run("FBCJsonCache", testGetBundleForChannel(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
+	t.Run("FBCCache", testGetBundleForChannel(cacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
 }
 
 func testGetBundleForChannel(addr string, expected *api.Bundle) func(*testing.T) {
@@ -386,8 +392,8 @@ func TestGetChannelEntriesThatReplace(t *testing.T) {
 	)
 
 	t.Run("Sqlite", testGetChannelEntriesThatReplace(dbAddress, getChannelEntriesThatReplaceExpected))
-	t.Run("FBCJsonCache", testGetChannelEntriesThatReplace(jsonCacheAddress, getChannelEntriesThatReplaceExpected))
-	t.Run("FBCJsonCacheWithDeprecations", testGetChannelEntriesThatReplace(jsonDeprecationCacheAddress, getChannelEntriesThatReplaceExpectedDep))
+	t.Run("FBCCache", testGetChannelEntriesThatReplace(cacheAddress, getChannelEntriesThatReplaceExpected))
+	t.Run("FBCCacheWithDeprecations", testGetChannelEntriesThatReplace(deprecationCacheAddress, getChannelEntriesThatReplaceExpectedDep))
 }
 
 func testGetChannelEntriesThatReplace(addr string, expected []*api.ChannelEntry) func(*testing.T) {
@@ -443,7 +449,7 @@ func testGetChannelEntriesThatReplace(addr string, expected []*api.ChannelEntry)
 
 func TestGetBundleThatReplaces(t *testing.T) {
 	t.Run("Sqlite", testGetBundleThatReplaces(dbAddress, etcdoperator_v0_9_2("alpha", false, false, includeManifestsAll)))
-	t.Run("FBCJsonCache", testGetBundleThatReplaces(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
+	t.Run("FBCCache", testGetBundleThatReplaces(cacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
 }
 
 func testGetBundleThatReplaces(addr string, expected *api.Bundle) func(*testing.T) {
@@ -459,7 +465,7 @@ func testGetBundleThatReplaces(addr string, expected *api.Bundle) func(*testing.
 
 func TestGetBundleThatReplacesSynthetic(t *testing.T) {
 	t.Run("Sqlite", testGetBundleThatReplacesSynthetic(dbAddress, etcdoperator_v0_9_2("alpha", false, false, includeManifestsAll)))
-	t.Run("FBCJsonCache", testGetBundleThatReplacesSynthetic(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
+	t.Run("FBCCache", testGetBundleThatReplacesSynthetic(cacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
 }
 
 func testGetBundleThatReplacesSynthetic(addr string, expected *api.Bundle) func(*testing.T) {
@@ -476,7 +482,7 @@ func testGetBundleThatReplacesSynthetic(addr string, expected *api.Bundle) func(
 
 func TestGetChannelEntriesThatProvide(t *testing.T) {
 	t.Run("Sqlite", testGetChannelEntriesThatProvide(dbAddress))
-	t.Run("FBCJsonCache", testGetChannelEntriesThatProvide(jsonCacheAddress))
+	t.Run("FBCCache", testGetChannelEntriesThatProvide(cacheAddress))
 }
 
 func testGetChannelEntriesThatProvide(addr string) func(t *testing.T) {
@@ -593,7 +599,7 @@ func testGetChannelEntriesThatProvide(addr string) func(t *testing.T) {
 
 func TestGetLatestChannelEntriesThatProvide(t *testing.T) {
 	t.Run("Sqlite", testGetLatestChannelEntriesThatProvide(dbAddress))
-	t.Run("FBCJsonCache", testGetLatestChannelEntriesThatProvide(jsonCacheAddress))
+	t.Run("FBCCache", testGetLatestChannelEntriesThatProvide(cacheAddress))
 }
 
 func testGetLatestChannelEntriesThatProvide(addr string) func(t *testing.T) {
@@ -669,7 +675,7 @@ func testGetLatestChannelEntriesThatProvide(addr string) func(t *testing.T) {
 
 func TestGetDefaultBundleThatProvides(t *testing.T) {
 	t.Run("Sqlite", testGetDefaultBundleThatProvides(dbAddress, etcdoperator_v0_9_2("alpha", false, false, includeManifestsAll)))
-	t.Run("FBCJsonCache", testGetDefaultBundleThatProvides(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
+	t.Run("FBCCache", testGetDefaultBundleThatProvides(cacheAddress, etcdoperator_v0_9_2("alpha", false, true, includeManifestsCSVOnly)))
 }
 
 func testGetDefaultBundleThatProvides(addr string, expected *api.Bundle) func(*testing.T) {
@@ -687,7 +693,7 @@ func TestListBundles(t *testing.T) {
 	t.Run("Sqlite", testListBundles(dbAddress,
 		etcdoperator_v0_9_2("alpha", true, false, includeManifestsNone),
 		etcdoperator_v0_9_2("stable", true, false, includeManifestsNone)))
-	t.Run("FBCJsonCache", testListBundles(jsonCacheAddress,
+	t.Run("FBCCache", testListBundles(cacheAddress,
 		etcdoperator_v0_9_2("alpha", true, true, includeManifestsNone),
 		etcdoperator_v0_9_2("stable", true, true, includeManifestsNone)))
 }
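
Addendum (editor's illustrations, not part of the diff above):

The server test helpers exercise the new entry points: fbccache.New replaces fbccache.NewJSON, and Build/Load now take a context. The sketch below strings those calls together the way the helpers do. It is a minimal sketch, not code from this patch: the directory paths are placeholders, the rebuild-on-integrity-failure flow is an assumption about how a server would wire these calls together, and it assumes CheckIntegrity is exposed on the exported Cache interface as the cache tests exercise it on the concrete type. Per the patch title, New is expected to prefer the pogreb.v1 backend for fresh caches.

	package main

	import (
		"context"
		"log"
		"os"

		fbccache "github.com/operator-framework/operator-registry/pkg/cache"
	)

	func main() {
		ctx := context.Background()

		// Placeholder paths; a real caller would use its configured
		// cache directory and FBC root.
		store, err := fbccache.New("/tmp/opm-cache")
		if err != nil {
			log.Fatal(err)
		}
		fbcFS := os.DirFS("/var/lib/catalog")

		// Assumed flow: rebuild only when the stored digest no longer
		// matches the digest computed from the FBC contents.
		if err := store.CheckIntegrity(ctx, fbcFS); err != nil {
			if err := store.Build(ctx, fbcFS); err != nil {
				log.Fatal(err)
			}
		}
		if err := store.Load(ctx); err != nil {
			log.Fatal(err)
		}
	}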
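pogrebV1Backend.ComputeDigest hashes two streams into one FNV-64a digest: the FBC meta blobs, walked with WithConcurrency(1) so they arrive in a deterministic order, followed by the database key/value pairs in sorted key order. The standalone sketch below reproduces the first half with the same declcfg.WalkMetasFS and hash/fnv calls the patch uses; the catalog path is a placeholder, and this is an illustration of the technique rather than the backend's full digest.

	package main

	import (
		"context"
		"fmt"
		"hash/fnv"
		"log"
		"os"

		"github.com/operator-framework/operator-registry/alpha/declcfg"
	)

	func main() {
		hasher := fnv.New64a()
		catalogFS := os.DirFS("/var/lib/catalog") // placeholder FBC root

		err := declcfg.WalkMetasFS(context.Background(), catalogFS,
			func(path string, meta *declcfg.Meta, err error) error {
				if err != nil {
					return err
				}
				// Feed each meta blob into the shared hasher.
				_, err = hasher.Write(meta.Blob)
				return err
			},
			// Concurrency 1 keeps the blob ordering deterministic,
			// which is what makes the digest stable across runs.
			declcfg.WithConcurrency(1),
		)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%x\n", hasher.Sum(nil))
	}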
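jsonBackend.SendBundles streams every per-bundle JSON file through a single json.Decoder layered over an io.MultiReader, rather than reading and unmarshaling each file into memory up front. A toy sketch of that pattern, with in-memory readers standing in for the on-disk bundle files:

	package main

	import (
		"encoding/json"
		"fmt"
		"io"
		"strings"
	)

	func main() {
		// Stand-ins for the per-bundle JSON files on disk.
		parts := []io.Reader{
			strings.NewReader(`{"name":"a"}`),
			strings.NewReader(`{"name":"b"}`),
		}

		// One decoder consumes the concatenated streams; Decode returns
		// io.EOF once every underlying reader is exhausted.
		dec := json.NewDecoder(io.MultiReader(parts...))
		for {
			var v struct{ Name string }
			if err := dec.Decode(&v); err == io.EOF {
				break
			} else if err != nil {
				panic(err)
			}
			fmt.Println(v.Name)
		}
	}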