pkg/cache: add preferred pogreb database cache impl (#1278)
* declcfg: concurrently load and process files in WalkMetasFS

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>

* pkg/cache: add preferred pogreb database cache impl

* refactoring to avoid code duplication

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>

* pkg/cache: memory efficient cache building

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>

* cmd/opm/serve: improve logging related to caches

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>

---------

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
joelanford committed May 14, 2024
1 parent 6bb0a3e commit 5b5181b
Showing 19 changed files with 1,266 additions and 633 deletions.
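The headline change adds a cache backend built on the pogreb embedded key/value store. The pkg/cache files themselves are not among the excerpts below; as rough orientation only, here is a minimal sketch of the upstream github.com/akrylysov/pogreb API that such a backend builds on. The database path and keys are hypothetical and not taken from this commit.

package main

import (
	"fmt"
	"log"

	"github.com/akrylysov/pogreb"
)

func main() {
	// Open (or create) an on-disk pogreb database; the path is arbitrary here.
	db, err := pogreb.Open("/tmp/catalog-cache.pogreb", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Store and retrieve a value by key; a catalog cache would persist
	// serialized API responses in a similar way.
	if err := db.Put([]byte("example-key"), []byte(`{"example":"value"}`)); err != nil {
		log.Fatal(err)
	}
	val, err := db.Get([]byte("example-key"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(val))
}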
11 changes: 2 additions & 9 deletions alpha/action/render.go
@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
@@ -17,7 +16,6 @@ import (
"github.com/h2non/filetype"
"github.com/h2non/filetype/matchers"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/operator-framework/operator-registry/alpha/declcfg"
@@ -26,6 +24,7 @@ import (
"github.com/operator-framework/operator-registry/pkg/image"
"github.com/operator-framework/operator-registry/pkg/image/containerdregistry"
"github.com/operator-framework/operator-registry/pkg/lib/bundle"
"github.com/operator-framework/operator-registry/pkg/lib/log"
"github.com/operator-framework/operator-registry/pkg/registry"
"github.com/operator-framework/operator-registry/pkg/sqlite"
)
@@ -61,12 +60,6 @@ type Render struct {
skipSqliteDeprecationLog bool
}

func nullLogger() *logrus.Entry {
logger := logrus.New()
logger.SetOutput(io.Discard)
return logrus.NewEntry(logger)
}

func (r Render) Run(ctx context.Context) (*declcfg.DeclarativeConfig, error) {
if r.skipSqliteDeprecationLog {
// exhaust once with a no-op function.
@@ -119,7 +112,7 @@ func (r Render) createRegistry() (*containerdregistry.Registry, error) {
// The containerd registry impl is somewhat verbose, even on the happy path,
// so discard all logger logs. Any important failures will be returned from
// registry methods and eventually logged as fatal errors.
containerdregistry.WithLog(nullLogger()),
containerdregistry.WithLog(log.Null()),
)
if err != nil {
return nil, err
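The nullLogger helper deleted above is replaced by log.Null() from the newly imported pkg/lib/log package. That package is not shown in this excerpt; based on the deleted helper, the shared function presumably looks roughly like this:

package log

import (
	"io"

	"github.com/sirupsen/logrus"
)

// Null returns a logrus entry that discards everything logged to it,
// mirroring the nullLogger helper removed from render.go.
func Null() *logrus.Entry {
	logger := logrus.New()
	logger.SetOutput(io.Discard)
	return logrus.NewEntry(logger)
}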
263 changes: 123 additions & 140 deletions alpha/declcfg/load.go
@@ -11,11 +11,12 @@ import (
"sync"

"github.com/joelanford/ignore"
"github.com/operator-framework/api/pkg/operators"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/yaml"

"github.com/operator-framework/api/pkg/operators"

"github.com/operator-framework/operator-registry/alpha/property"
)

@@ -25,22 +26,42 @@ const (

type WalkMetasFSFunc func(path string, meta *Meta, err error) error

func WalkMetasFS(root fs.FS, walkFn WalkMetasFSFunc) error {
return walkFiles(root, func(root fs.FS, path string, err error) error {
if err != nil {
return walkFn(path, nil, err)
}
// WalkMetasFS walks the filesystem rooted at root and calls walkFn for each individual meta object found in the root.
// By default, WalkMetasFS invokes walkFn concurrently, so walkFn must be safe for concurrent use.
// To avoid concurrent invocations of walkFn, pass WithConcurrency(1).
func WalkMetasFS(ctx context.Context, root fs.FS, walkFn WalkMetasFSFunc, opts ...LoadOption) error {
if root == nil {
return fmt.Errorf("no declarative config filesystem provided")
}

f, err := root.Open(path)
if err != nil {
return walkFn(path, nil, err)
}
defer f.Close()
options := LoadOptions{
concurrency: runtime.NumCPU(),
}
for _, opt := range opts {
opt(&options)
}

return WalkMetasReader(f, func(meta *Meta, err error) error {
return walkFn(path, meta, err)
})
pathChan := make(chan string, options.concurrency)

// Create an errgroup to manage goroutines. The context is canceled when any
// goroutine returns an error. Goroutines should check the context
// to see if they should return early (in the case of another goroutine
// returning an error).
eg, ctx := errgroup.WithContext(ctx)

// Walk the FS and send paths to a channel for parsing.
eg.Go(func() error {
return sendPaths(ctx, root, pathChan)
})

// Parse paths concurrently until pathChan is closed or the context is canceled.
for i := 0; i < options.concurrency; i++ {
eg.Go(func() error {
return parseMetaPaths(ctx, root, pathChan, walkFn, options)
})
}
return eg.Wait()
}

type WalkMetasReaderFunc func(meta *Meta, err error) error
Expand Down Expand Up @@ -126,59 +147,16 @@ func WithConcurrency(concurrency int) LoadOption {
// If LoadFS encounters an error loading or parsing any file, the error will be
// immediately returned.
func LoadFS(ctx context.Context, root fs.FS, opts ...LoadOption) (*DeclarativeConfig, error) {
if root == nil {
return nil, fmt.Errorf("no declarative config filesystem provided")
}

options := LoadOptions{
concurrency: runtime.NumCPU(),
}
for _, opt := range opts {
opt(&options)
}

var (
fcfg = &DeclarativeConfig{}
pathChan = make(chan string, options.concurrency)
cfgChan = make(chan *DeclarativeConfig, options.concurrency)
)

// Create an errgroup to manage goroutines. The context is closed when any
// goroutine returns an error. Goroutines should check the context
// to see if they should return early (in the case of another goroutine
// returning an error).
eg, ctx := errgroup.WithContext(ctx)

// Walk the FS and send paths to a channel for parsing.
eg.Go(func() error {
return sendPaths(ctx, root, pathChan)
})

// Parse paths concurrently. The waitgroup ensures that all paths are parsed
// before the cfgChan is closed.
var wg sync.WaitGroup
for i := 0; i < options.concurrency; i++ {
wg.Add(1)
eg.Go(func() error {
defer wg.Done()
return parsePaths(ctx, root, pathChan, cfgChan)
})
}

// Merge parsed configs into a single config.
eg.Go(func() error {
return mergeCfgs(ctx, cfgChan, fcfg)
})

// Wait for all path parsing goroutines to finish before closing cfgChan.
wg.Wait()
close(cfgChan)

// Wait for all goroutines to finish.
if err := eg.Wait(); err != nil {
builder := fbcBuilder{}
if err := WalkMetasFS(ctx, root, func(path string, meta *Meta, err error) error {
if err != nil {
return err
}
return builder.addMeta(meta)
}, opts...); err != nil {
return nil, err
}
return fcfg, nil
return &builder.cfg, nil
}

func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
@@ -196,7 +174,7 @@ func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
})
}

func parsePaths(ctx context.Context, root fs.FS, pathChan <-chan string, cfgChan chan<- *DeclarativeConfig) error {
func parseMetaPaths(ctx context.Context, root fs.FS, pathChan <-chan string, walkFn WalkMetasFSFunc, options LoadOptions) error {
for {
select {
case <-ctx.Done(): // don't block on receiving from pathChan
@@ -205,51 +183,35 @@ func parsePaths(ctx context.Context, root fs.FS, pathChan <-chan string, cfgChan
if !ok {
return nil
}
cfg, err := LoadFile(root, path)
file, err := root.Open(path)
if err != nil {
return err
}
select {
case cfgChan <- cfg:
case <-ctx.Done(): // don't block on sending to cfgChan
return ctx.Err()
if err := WalkMetasReader(file, func(meta *Meta, err error) error {
return walkFn(path, meta, err)
}); err != nil {
return err
}
}
}
}

func mergeCfgs(ctx context.Context, cfgChan <-chan *DeclarativeConfig, fcfg *DeclarativeConfig) error {
for {
select {
case <-ctx.Done(): // don't block on receiving from cfgChan
return ctx.Err()
case cfg, ok := <-cfgChan:
if !ok {
return nil
}
fcfg.Merge(cfg)
func readBundleObjects(b *Bundle) error {
var obj property.BundleObject
for i, props := range b.Properties {
if props.Type != property.TypeBundleObject {
continue
}
}
}

func readBundleObjects(bundles []Bundle) error {
for bi, b := range bundles {
var obj property.BundleObject
for i, props := range b.Properties {
if props.Type != property.TypeBundleObject {
continue
}
if err := json.Unmarshal(props.Value, &obj); err != nil {
return fmt.Errorf("package %q, bundle %q: parse property at index %d as bundle object: %v", b.Package, b.Name, i, err)
}
objJson, err := yaml.ToJSON(obj.Data)
if err != nil {
return fmt.Errorf("package %q, bundle %q: convert bundle object property at index %d to JSON: %v", b.Package, b.Name, i, err)
}
bundles[bi].Objects = append(bundles[bi].Objects, string(objJson))
if err := json.Unmarshal(props.Value, &obj); err != nil {
return fmt.Errorf("package %q, bundle %q: parse property at index %d as bundle object: %v", b.Package, b.Name, i, err)
}
bundles[bi].CsvJSON = extractCSV(bundles[bi].Objects)
objJson, err := yaml.ToJSON(obj.Data)
if err != nil {
return fmt.Errorf("package %q, bundle %q: convert bundle object property at index %d to JSON: %v", b.Package, b.Name, i, err)
}
b.Objects = append(b.Objects, string(objJson))
}
b.CsvJSON = extractCSV(b.Objects)
return nil
}

@@ -268,52 +230,16 @@ func extractCSV(objs []string) string {

// LoadReader reads yaml or json from the passed in io.Reader and unmarshals it into a DeclarativeConfig struct.
func LoadReader(r io.Reader) (*DeclarativeConfig, error) {
cfg := &DeclarativeConfig{}

if err := WalkMetasReader(r, func(in *Meta, err error) error {
builder := fbcBuilder{}
if err := WalkMetasReader(r, func(meta *Meta, err error) error {
if err != nil {
return err
}
switch in.Schema {
case SchemaPackage:
var p Package
if err := json.Unmarshal(in.Blob, &p); err != nil {
return fmt.Errorf("parse package: %v", err)
}
cfg.Packages = append(cfg.Packages, p)
case SchemaChannel:
var c Channel
if err := json.Unmarshal(in.Blob, &c); err != nil {
return fmt.Errorf("parse channel: %v", err)
}
cfg.Channels = append(cfg.Channels, c)
case SchemaBundle:
var b Bundle
if err := json.Unmarshal(in.Blob, &b); err != nil {
return fmt.Errorf("parse bundle: %v", err)
}
cfg.Bundles = append(cfg.Bundles, b)
case SchemaDeprecation:
var d Deprecation
if err := json.Unmarshal(in.Blob, &d); err != nil {
return fmt.Errorf("parse deprecation: %w", err)
}
cfg.Deprecations = append(cfg.Deprecations, d)
case "":
return fmt.Errorf("object '%s' is missing root schema field", string(in.Blob))
default:
cfg.Others = append(cfg.Others, *in)
}
return nil
return builder.addMeta(meta)
}); err != nil {
return nil, err
}

if err := readBundleObjects(cfg.Bundles); err != nil {
return nil, fmt.Errorf("read bundle objects: %v", err)
}

return cfg, nil
return &builder.cfg, nil
}

// LoadFile will unmarshall declarative config components from a single filename provided in 'path'
@@ -332,3 +258,60 @@ func LoadFile(root fs.FS, path string) (*DeclarativeConfig, error) {

return cfg, nil
}

type fbcBuilder struct {
cfg DeclarativeConfig

packagesMu sync.Mutex
channelsMu sync.Mutex
bundlesMu sync.Mutex
deprecationsMu sync.Mutex
othersMu sync.Mutex
}

func (c *fbcBuilder) addMeta(in *Meta) error {
switch in.Schema {
case SchemaPackage:
var p Package
if err := json.Unmarshal(in.Blob, &p); err != nil {
return fmt.Errorf("parse package: %v", err)
}
c.packagesMu.Lock()
c.cfg.Packages = append(c.cfg.Packages, p)
c.packagesMu.Unlock()
case SchemaChannel:
var ch Channel
if err := json.Unmarshal(in.Blob, &ch); err != nil {
return fmt.Errorf("parse channel: %v", err)
}
c.channelsMu.Lock()
c.cfg.Channels = append(c.cfg.Channels, ch)
c.channelsMu.Unlock()
case SchemaBundle:
var b Bundle
if err := json.Unmarshal(in.Blob, &b); err != nil {
return fmt.Errorf("parse bundle: %v", err)
}
if err := readBundleObjects(&b); err != nil {
return fmt.Errorf("read bundle objects: %v", err)
}
c.bundlesMu.Lock()
c.cfg.Bundles = append(c.cfg.Bundles, b)
c.bundlesMu.Unlock()
case SchemaDeprecation:
var d Deprecation
if err := json.Unmarshal(in.Blob, &d); err != nil {
return fmt.Errorf("parse deprecation: %w", err)
}
c.deprecationsMu.Lock()
c.cfg.Deprecations = append(c.cfg.Deprecations, d)
c.deprecationsMu.Unlock()
case "":
return fmt.Errorf("object '%s' is missing root schema field", string(in.Blob))
default:
c.othersMu.Lock()
c.cfg.Others = append(c.cfg.Others, *in)
c.othersMu.Unlock()
}
return nil
}
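The load.go changes above move the concurrent walking logic from LoadFS into WalkMetasFS and funnel every parsed meta through the mutex-guarded fbcBuilder, so LoadFS and LoadReader no longer duplicate the schema switch. A minimal usage sketch of the new WalkMetasFS signature follows; it is not part of the diff, and the ./catalog path is hypothetical.

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/operator-framework/operator-registry/alpha/declcfg"
)

func main() {
	root := os.DirFS("./catalog") // hypothetical file-based catalog directory

	// WalkMetasFS invokes the callback concurrently by default; pass
	// declcfg.WithConcurrency(1) if the callback is not safe for concurrent use.
	err := declcfg.WalkMetasFS(context.Background(), root, func(path string, meta *declcfg.Meta, err error) error {
		if err != nil {
			return err
		}
		fmt.Printf("%s: schema=%s package=%s\n", path, meta.Schema, meta.Package)
		return nil
	}, declcfg.WithConcurrency(1))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}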