Skip to content

Commit

Permalink
plugins/bundle: Avoid race during bundle reconfiguration and activation
Browse files Browse the repository at this point in the history
This change attempts to fix a race condition that could occur
when a reconfiguration on the bundle plugin occurs in parallel
with the activation of a downloaded bundle.

One scenario where this could occur is when the discovery plugin
attempts to reconfigure the bundle plugin and concurrently a bundle gets
downloaded and needs to be activated. A reconfig operation will perform
a write on the plugin's config. During a bundle activation, the plugin's
config is read. Currently we hold a lock when the config is being updated.
This change locks the config while reading thereby avoiding the race.

Fixes: #6849

Co-authored-by: Pushpalanka Jayawardhana <pushpalanka.jayawardhana@zalando.de>
Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar and Pushpalanka committed Aug 14, 2024
1 parent 35b07b5 commit 5c0f300
Showing 1 changed file with 52 additions and 20 deletions.
72 changes: 52 additions & 20 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Plugin struct {
downloaders map[string]Loader
logger logging.Logger
mtx sync.Mutex
cfgMtx sync.Mutex
cfgMtx sync.RWMutex
ready bool
bundlePersistPath string
stopped bool
Expand Down Expand Up @@ -149,16 +149,17 @@ func (p *Plugin) Stop(ctx context.Context) {
func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) {
// Reconfiguring should not occur in parallel, lock to ensure
// nothing swaps underneath us with the current p.config and the updated one.
// Use p.cfgMtx instead of p.mtx so as to not block any bundle downloads/activations
// Use p.cfgMtx instead of p.mtx to not block any bundle downloads/activations
// that are in progress. We upgrade to p.mtx locking after stopping downloaders.
p.cfgMtx.Lock()
defer p.cfgMtx.Unlock()

// Look for any bundles that have had their config changed, are new, or have been removed
newConfig := config.(*Config)
newBundles, updatedBundles, deletedBundles := p.configDelta(newConfig)
p.config = *newConfig

p.cfgMtx.Unlock()

if len(updatedBundles) == 0 && len(newBundles) == 0 && len(deletedBundles) == 0 {
// no relevant config changes
return
Expand Down Expand Up @@ -213,7 +214,8 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) {

readyNow := p.ready

for name, source := range p.config.Bundles {
bundles := p.getBundlesCpy()
for name, source := range bundles {
_, updated := updatedBundles[name]
_, isNew := newBundles[name]

Expand All @@ -225,7 +227,7 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) {
p.log(name).Info("Bundle loader configuration changed. Restarting bundle loader.")
}

downloader := p.newDownloader(name, source)
downloader := p.newDownloader(name, source, bundles)

etag := p.readBundleEtagFromStore(ctx, name)
downloader.SetCache(etag)
Expand Down Expand Up @@ -310,14 +312,20 @@ func (p *Plugin) UnregisterBulkListener(name interface{}) {

// Config returns the plugins current configuration
func (p *Plugin) Config() *Config {
return &p.config
p.cfgMtx.RLock()
defer p.cfgMtx.RUnlock()
return &Config{
Name: p.config.Name,
Bundles: p.getBundlesCpy(),
}
}

func (p *Plugin) initDownloaders(ctx context.Context) {
bundles := p.getBundlesCpy()

// Initialize a downloader for each bundle configured.
for name, source := range p.config.Bundles {
downloader := p.newDownloader(name, source)
for name, source := range bundles {
downloader := p.newDownloader(name, source, bundles)

etag := p.readBundleEtagFromStore(ctx, name)
downloader.SetCache(etag)
Expand Down Expand Up @@ -351,8 +359,14 @@ func (p *Plugin) loadAndActivateBundlesFromDisk(ctx context.Context) {

persistedBundles := map[string]*bundle.Bundle{}

for name, src := range p.config.Bundles {
if p.persistBundle(name) {
bundles := p.getBundlesCpy()

p.cfgMtx.RLock()
isMultiBundle := p.config.IsMultiBundle()
p.cfgMtx.RUnlock()

for name, src := range bundles {
if p.persistBundle(name, bundles) {
b, err := p.loadBundleFromDisk(p.bundlePersistPath, name, src)
if err != nil {
p.log(name).Error("Failed to load bundle from disk: %v", err)
Expand All @@ -379,7 +393,7 @@ func (p *Plugin) loadAndActivateBundlesFromDisk(ctx context.Context) {
p.status[name].Metrics = metrics.New()
p.status[name].Type = b.Type()

err := p.activate(ctx, name, b)
err := p.activate(ctx, name, b, isMultiBundle)
if err != nil {
p.log(name).Error("Bundle activation failed: %v", err)
p.status[name].SetError(err)
Expand All @@ -401,7 +415,7 @@ func (p *Plugin) loadAndActivateBundlesFromDisk(ctx context.Context) {
}
}

func (p *Plugin) newDownloader(name string, source *Source) Loader {
func (p *Plugin) newDownloader(name string, source *Source, bundles map[string]*Source) Loader {

if u, err := url.Parse(source.Resource); err == nil {
switch u.Scheme {
Expand Down Expand Up @@ -433,14 +447,14 @@ func (p *Plugin) newDownloader(name string, source *Source) Loader {
WithCallback(callback).
WithBundleVerificationConfig(source.Signing).
WithSizeLimitBytes(source.SizeLimitBytes).
WithBundlePersistence(p.persistBundle(name)).
WithBundlePersistence(p.persistBundle(name, bundles)).
WithBundleParserOpts(p.manager.ParserOptions())
}
return download.New(conf, client, path).
WithCallback(callback).
WithBundleVerificationConfig(source.Signing).
WithSizeLimitBytes(source.SizeLimitBytes).
WithBundlePersistence(p.persistBundle(name)).
WithBundlePersistence(p.persistBundle(name, bundles)).
WithLazyLoadingMode(true).
WithBundleName(name).
WithBundleParserOpts(p.manager.ParserOptions())
Expand Down Expand Up @@ -499,7 +513,11 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
p.status[name].Metrics.Timer(metrics.RegoLoadBundles).Start()
defer p.status[name].Metrics.Timer(metrics.RegoLoadBundles).Stop()

if err := p.activate(ctx, name, u.Bundle); err != nil {
p.cfgMtx.RLock()
isMultiBundle := p.config.IsMultiBundle()
p.cfgMtx.RUnlock()

if err := p.activate(ctx, name, u.Bundle, isMultiBundle); err != nil {
p.log(name).Error("Bundle activation failed: %v", err)
p.status[name].SetError(err)
if !p.stopped {
Expand All @@ -509,7 +527,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
return
}

if u.Bundle.Type() == bundle.SnapshotBundleType && p.persistBundle(name) {
if u.Bundle.Type() == bundle.SnapshotBundleType && p.persistBundle(name, p.getBundlesCpy()) {
p.log(name).Debug("Persisting bundle to disk in progress.")

err := p.saveBundleToDisk(name, u.Raw)
Expand Down Expand Up @@ -568,7 +586,7 @@ func (p *Plugin) checkPluginReadiness() {
}
}

func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) error {
func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle, isMultiBundle bool) error {
p.log(name).Debug("Bundle activation in progress (%v). Opening storage transaction.", b.Manifest.Revision)

params := storage.WriteParams
Expand Down Expand Up @@ -621,7 +639,7 @@ func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) er
}
}

if p.config.IsMultiBundle() {
if isMultiBundle {
activateErr = bundle.Activate(opts)
} else {
activateErr = bundle.ActivateLegacy(opts)
Expand All @@ -642,8 +660,8 @@ func (p *Plugin) activate(ctx context.Context, name string, b *bundle.Bundle) er
return err
}

func (p *Plugin) persistBundle(name string) bool {
bundleSrc := p.config.Bundles[name]
func (p *Plugin) persistBundle(name string, bundles map[string]*Source) bool {
bundleSrc := bundles[name]

if bundleSrc == nil {
return false
Expand All @@ -654,6 +672,9 @@ func (p *Plugin) persistBundle(name string) bool {
// configDelta will return a map of new bundle sources, updated bundle sources, and a set of deleted bundle names
func (p *Plugin) configDelta(newConfig *Config) (map[string]*Source, map[string]*Source, map[string]struct{}) {
deletedBundles := map[string]struct{}{}

// p.cfgMtx lock held at calling site, so we don't need
// to get a copy of the bundles map here
for name := range p.config.Bundles {
deletedBundles[name] = struct{}{}
}
Expand Down Expand Up @@ -724,6 +745,17 @@ func (p *Plugin) getBundlePersistPath() (string, error) {
return filepath.Join(persistDir, "bundles"), nil
}

func (p *Plugin) getBundlesCpy() map[string]*Source {
p.cfgMtx.RLock()
defer p.cfgMtx.RUnlock()
bundlesCpy := map[string]*Source{}
for k, v := range p.config.Bundles {
v := *v
bundlesCpy[k] = &v
}
return bundlesCpy
}

type fileLoader struct {
name string
path string
Expand Down

0 comments on commit 5c0f300

Please sign in to comment.