Skip to content

Commit

Permalink
declcfg/load: improvements (#1106)
Browse files Browse the repository at this point in the history
* *: thread through context into FBC load

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>

* declcfg/load: allow configuring the parallelism

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>

---------

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
  • Loading branch information
stevekuznetsov authored Jun 6, 2023
1 parent b51aaf0 commit 647537d
Show file tree
Hide file tree
Showing 17 changed files with 79 additions and 53 deletions.
4 changes: 2 additions & 2 deletions alpha/action/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (r Render) renderReference(ctx context.Context, ref string) (*declcfg.Decla
if !r.AllowedRefMask.Allowed(RefDCDir) {
return nil, fmt.Errorf("cannot render declarative config directory: %w", ErrNotAllowed)
}
return declcfg.LoadFS(os.DirFS(ref))
return declcfg.LoadFS(ctx, os.DirFS(ref))
} else {
// The only supported file type is an sqlite DB file,
// since declarative configs will be in a directory.
Expand Down Expand Up @@ -169,7 +169,7 @@ func (r Render) imageToDeclcfg(ctx context.Context, imageRef string) (*declcfg.D
if !r.AllowedRefMask.Allowed(RefDCImage) {
return nil, fmt.Errorf("cannot render declarative config image: %w", ErrNotAllowed)
}
cfg, err = declcfg.LoadFS(os.DirFS(filepath.Join(tmpDir, configsDir)))
cfg, err = declcfg.LoadFS(ctx, os.DirFS(filepath.Join(tmpDir, configsDir)))
if err != nil {
return nil, err
}
Expand Down
29 changes: 23 additions & 6 deletions alpha/declcfg/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,29 +109,46 @@ func walkFiles(root fs.FS, fn func(root fs.FS, path string, err error) error) er
})
}

type LoadOptions struct {
concurrency int
}

type LoadOption func(*LoadOptions)

func WithConcurrency(concurrency int) LoadOption {
return func(opts *LoadOptions) {
opts.concurrency = concurrency
}
}

// LoadFS loads a declarative config from the provided root FS. LoadFS walks the
// filesystem from root and uses a gitignore-style filename matcher to skip files
// that match patterns found in .indexignore files found throughout the filesystem.
// If LoadFS encounters an error loading or parsing any file, the error will be
// immediately returned.
func LoadFS(root fs.FS) (*DeclarativeConfig, error) {
func LoadFS(ctx context.Context, root fs.FS, opts ...LoadOption) (*DeclarativeConfig, error) {
if root == nil {
return nil, fmt.Errorf("no declarative config filesystem provided")
}

concurrency := runtime.NumCPU()
options := LoadOptions{
concurrency: runtime.NumCPU(),
}
for _, opt := range opts {
opt(&options)
}

var (
fcfg = &DeclarativeConfig{}
pathChan = make(chan string, concurrency)
cfgChan = make(chan *DeclarativeConfig, concurrency)
pathChan = make(chan string, options.concurrency)
cfgChan = make(chan *DeclarativeConfig, options.concurrency)
)

// Create an errgroup to manage goroutines. The context is closed when any
// goroutine returns an error. Goroutines should check the context
// to see if they should return early (in the case of another goroutine
// returning an error).
eg, ctx := errgroup.WithContext(context.Background())
eg, ctx := errgroup.WithContext(ctx)

// Walk the FS and send paths to a channel for parsing.
eg.Go(func() error {
Expand All @@ -141,7 +158,7 @@ func LoadFS(root fs.FS) (*DeclarativeConfig, error) {
// Parse paths concurrently. The waitgroup ensures that all paths are parsed
// before the cfgChan is closed.
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
for i := 0; i < options.concurrency; i++ {
wg.Add(1)
eg.Go(func() error {
defer wg.Done()
Expand Down
28 changes: 17 additions & 11 deletions alpha/declcfg/load_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package declcfg_test

import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"os"
"runtime"
"testing"

"github.com/blang/semver/v4"
Expand All @@ -20,18 +22,22 @@ func BenchmarkLoadFS(b *testing.B) {
fbc := generateFBC(b, 300, 450, 3000)
b.ResetTimer()

for i := 0; i < b.N; i++ {
b.StopTimer()
tempDir := b.TempDir()
if err := declcfg.WriteFS(*fbc, tempDir, declcfg.WriteJSON, ".json"); err != nil {
b.Error(err)
}
b.StartTimer()
for _, n := range []int{1, runtime.NumCPU(), 2 * runtime.NumCPU()} {
b.Run(fmt.Sprintf("%d routines", n), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
tempDir := b.TempDir()
if err := declcfg.WriteFS(*fbc, tempDir, declcfg.WriteJSON, ".json"); err != nil {
b.Error(err)
}
b.StartTimer()

_, err := declcfg.LoadFS(os.DirFS(tempDir))
if err != nil {
b.Error(err)
}
_, err := declcfg.LoadFS(context.Background(), os.DirFS(tempDir), declcfg.WithConcurrency(n))
if err != nil {
b.Error(err)
}
}
})
}
}

Expand Down
3 changes: 2 additions & 1 deletion alpha/declcfg/load_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package declcfg

import (
"context"
"encoding/json"
"io/fs"
"os"
Expand Down Expand Up @@ -338,7 +339,7 @@ func TestLoadFS(t *testing.T) {

for _, s := range specs {
t.Run(s.name, func(t *testing.T) {
cfg, err := LoadFS(s.fsys)
cfg, err := LoadFS(context.Background(), s.fsys)
s.assertion(t, err)
if err == nil {
require.NotNil(t, cfg)
Expand Down
22 changes: 11 additions & 11 deletions alpha/template/composite/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type BuilderConfig struct {

type Builder interface {
Build(ctx context.Context, reg image.Registry, dir string, td TemplateDefinition) error
Validate(dir string) error
Validate(ctx context.Context, dir string) error
}

type BasicBuilder struct {
Expand Down Expand Up @@ -94,8 +94,8 @@ func (bb *BasicBuilder) Build(ctx context.Context, reg image.Registry, dir strin
return build(dcfg, destPath, bb.builderCfg.OutputType)
}

func (bb *BasicBuilder) Validate(dir string) error {
return validate(bb.builderCfg, dir)
func (bb *BasicBuilder) Validate(ctx context.Context, dir string) error {
return validate(ctx, bb.builderCfg, dir)
}

type SemverBuilder struct {
Expand Down Expand Up @@ -156,8 +156,8 @@ func (sb *SemverBuilder) Build(ctx context.Context, reg image.Registry, dir stri
return build(dcfg, destPath, sb.builderCfg.OutputType)
}

func (sb *SemverBuilder) Validate(dir string) error {
return validate(sb.builderCfg, dir)
func (sb *SemverBuilder) Validate(ctx context.Context, dir string) error {
return validate(ctx, sb.builderCfg, dir)
}

type RawBuilder struct {
Expand Down Expand Up @@ -216,8 +216,8 @@ func (rb *RawBuilder) Build(ctx context.Context, _ image.Registry, dir string, t
return build(dcfg, destPath, rb.builderCfg.OutputType)
}

func (rb *RawBuilder) Validate(dir string) error {
return validate(rb.builderCfg, dir)
func (rb *RawBuilder) Validate(ctx context.Context, dir string) error {
return validate(ctx, rb.builderCfg, dir)
}

type CustomBuilder struct {
Expand Down Expand Up @@ -285,8 +285,8 @@ func (cb *CustomBuilder) Build(ctx context.Context, reg image.Registry, dir stri
return build(dcfg, destPath, cb.builderCfg.OutputType)
}

func (cb *CustomBuilder) Validate(dir string) error {
return validate(cb.builderCfg, dir)
func (cb *CustomBuilder) Validate(ctx context.Context, dir string) error {
return validate(ctx, cb.builderCfg, dir)
}

func writeDeclCfg(dcfg declcfg.DeclarativeConfig, w io.Writer, output string) error {
Expand All @@ -300,7 +300,7 @@ func writeDeclCfg(dcfg declcfg.DeclarativeConfig, w io.Writer, output string) er
}
}

func validate(builderCfg BuilderConfig, dir string) error {
func validate(ctx context.Context, builderCfg BuilderConfig, dir string) error {

path := path.Join(builderCfg.WorkingDir, dir)
s, err := os.Stat(path)
Expand All @@ -311,7 +311,7 @@ func validate(builderCfg BuilderConfig, dir string) error {
return fmt.Errorf("%q is not a directory", path)
}

if err := config.Validate(os.DirFS(path)); err != nil {
if err := config.Validate(ctx, os.DirFS(path)); err != nil {
return fmt.Errorf("validation failure in path %q: %v", path, err)
}
return nil
Expand Down
10 changes: 5 additions & 5 deletions alpha/template/composite/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestBasicBuilder(t *testing.T) {
tc.buildAssertions(t, outPath, buildErr)

if tc.validate {
validateErr := tc.basicBuilder.Validate(outDir)
validateErr := tc.basicBuilder.Validate(context.Background(), outDir)
tc.validateAssertions(t, validateErr)
}
})
Expand Down Expand Up @@ -706,7 +706,7 @@ func TestSemverBuilder(t *testing.T) {
tc.buildAssertions(t, outPath, buildErr)

if tc.validate {
validateErr := tc.semverBuilder.Validate(outDir)
validateErr := tc.semverBuilder.Validate(context.Background(), outDir)
tc.validateAssertions(t, validateErr)
}
})
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func TestRawBuilder(t *testing.T) {
tc.buildAssertions(t, outPath, buildErr)

if tc.validate {
validateErr := tc.rawBuilder.Validate(outDir)
validateErr := tc.rawBuilder.Validate(context.Background(), outDir)
tc.validateAssertions(t, validateErr)
}
})
Expand Down Expand Up @@ -1575,7 +1575,7 @@ func TestCustomBuilder(t *testing.T) {
tc.buildAssertions(t, outPath, buildErr)

if tc.validate {
validateErr := tc.customBuilder.Validate(outDir)
validateErr := tc.customBuilder.Validate(context.Background(), outDir)
tc.validateAssertions(t, validateErr)
}
})
Expand Down Expand Up @@ -1759,7 +1759,7 @@ const customBuiltFbcJson = `{
`

func TestValidateFailure(t *testing.T) {
err := validate(BuilderConfig{}, "")
err := validate(context.Background(), BuilderConfig{}, "")
require.Error(t, err)
require.Contains(t, err.Error(), "no such file or directory")
}
2 changes: 1 addition & 1 deletion alpha/template/composite/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (t *Template) Render(ctx context.Context, validate bool) error {

if validate {
// run the validation for the builder
err = builder.Validate(component.Destination.Path)
err = builder.Validate(ctx, component.Destination.Path)
if err != nil {
return fmt.Errorf("validating component %q: %w", component.Name, err)
}
Expand Down
2 changes: 1 addition & 1 deletion alpha/template/composite/composite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (tb *TestBuilder) Build(ctx context.Context, reg image.Registry, dir string
return nil
}

func (tb *TestBuilder) Validate(dir string) error {
func (tb *TestBuilder) Validate(ctx context.Context, dir string) error {
if tb.validateShouldError {
return fmt.Errorf("validate error!")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/opm/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *serve) run(ctx context.Context) error {
return err
}
} else {
if err := cache.LoadOrRebuild(store, os.DirFS(s.configDir)); err != nil {
if err := cache.LoadOrRebuild(ctx, store, os.DirFS(s.configDir)); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/opm/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewCmd() *cobra.Command {
Short: "Validate the declarative index config",
Long: "Validate the declarative config JSON file(s) in a given directory",
Args: cobra.ExactArgs(1),
RunE: func(_ *cobra.Command, args []string) error {
RunE: func(c *cobra.Command, args []string) error {
directory := args[0]
s, err := os.Stat(directory)
if err != nil {
Expand All @@ -27,7 +27,7 @@ func NewCmd() *cobra.Command {
return fmt.Errorf("%q is not a directory", directory)
}

if err := config.Validate(os.DirFS(directory)); err != nil {
if err := config.Validate(c.Context(), os.DirFS(directory)); err != nil {
logger.Fatal(err)
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type Cache interface {
registry.GRPCQuery

CheckIntegrity(fbc fs.FS) error
Build(fbc fs.FS) error
Build(ctx context.Context, fbc fs.FS) error
Load() error
}

func LoadOrRebuild(c Cache, fbc fs.FS) error {
func LoadOrRebuild(ctx context.Context, c Cache, fbc fs.FS) error {
if err := c.CheckIntegrity(fbc); err != nil {
if err := c.Build(fbc); err != nil {
if err := c.Build(ctx, fbc); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func genTestCaches(t *testing.T, fbcFS fs.FS) []Cache {
}

for _, c := range caches {
err := c.Build(fbcFS)
err := c.Build(context.Background(), fbcFS)
require.NoError(t, err)
err = c.Load()
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (q *JSON) computeDigest(fbcFsys fs.FS) (string, error) {
return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil
}

func (q *JSON) Build(fbcFsys fs.FS) error {
func (q *JSON) Build(ctx context.Context, fbcFsys fs.FS) error {
// ensure that generated cache is available to all future users
oldUmask := umask(000)
defer umask(oldUmask)
Expand All @@ -190,7 +190,7 @@ func (q *JSON) Build(fbcFsys fs.FS) error {
return fmt.Errorf("ensure clean base directory: %v", err)
}

fbc, err := declcfg.LoadFS(fbcFsys)
fbc, err := declcfg.LoadFS(ctx, fbcFsys)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/json_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"context"
"io/fs"
"os"
"path/filepath"
Expand All @@ -12,7 +13,7 @@ import (
func TestJSON_StableDigest(t *testing.T) {
cacheDir := t.TempDir()
c := NewJSON(cacheDir)
require.NoError(t, c.Build(validFS))
require.NoError(t, c.Build(context.Background(), validFS))

actualDigest, err := c.existingDigest()
require.NoError(t, err)
Expand Down Expand Up @@ -96,7 +97,7 @@ func TestJSON_CheckIntegrity(t *testing.T) {
c := NewJSON(cacheDir)

if tc.build {
require.NoError(t, c.Build(tc.fbcFS))
require.NoError(t, c.Build(context.Background(), tc.fbcFS))
}
if tc.mod != nil {
require.NoError(t, tc.mod(&tc, cacheDir))
Expand Down
5 changes: 3 additions & 2 deletions pkg/lib/config/validate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"io/fs"

"github.com/operator-framework/operator-registry/alpha/declcfg"
Expand All @@ -13,9 +14,9 @@ import (
// directory: a filesystem where declarative config file(s) exist
// Outputs:
// error: a wrapped error that contains a tree of error strings
func Validate(root fs.FS) error {
func Validate(ctx context.Context, root fs.FS) error {
// Load config files and convert them to declcfg objects
cfg, err := declcfg.LoadFS(root)
cfg, err := declcfg.LoadFS(ctx, root)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 647537d

Please sign in to comment.