Skip to content

Commit

Permalink
Update downloader's Etag based on bundle activation
Browse files Browse the repository at this point in the history
Earlier the Etag on the downloader would be updated unconditionally after every attempt to download a bundle. This could lead to a situation wherein a bundle fails to activate and would remain in an unactivated state since any subsequent downloads of the same version of the bundle would not trigger the activation process. This change attempts to resolve the issue by allowing the client to reset the Etag on the downloader incase of downloader errors and bundle activation failures. The drawback now is that we could end up re-downloading the same version of a bundle multiple times till it successfully activates. This situtation is likely to occur when using multiple bundle sources where a bundle may depend on some other. Generally using multiple bundle sources isn't recommended so the extra network traffic as a result of the re-downloads although not ideal may not too harmful.

Fixes #2220
Fixes #2279

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Apr 10, 2020
1 parent c5c8579 commit 78a6f18
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 33 deletions.
13 changes: 6 additions & 7 deletions docs/content/management.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ and data from multiple sources, you can implement your bundle service
to generate bundles that are scoped to a subset of OPA's policy and
data cache.

> We recommend that whenever possible, you implement policy and data
> 🚨 We recommend that whenever possible, you implement policy and data
> aggregation centrally, however, in some cases that's not possible
> (e.g., due to latency requirements.)
> (e.g., due to latency requirements.).
> When using multiple sources there are **no** ordering guarantees for which bundle loads first and
takes over some root. If multiple bundles conflict, but are loaded at different
times, OPA may go into an error state. It is highly recommended to use
the health check and include bundle state: [Monitoring OPA](../monitoring#health-checks)

To scope bundles to a subset of OPA's policy and data cache, include
a top-level `roots` key in the bundle that defines the roots of the
Expand Down Expand Up @@ -253,11 +257,6 @@ When OPA loads scoped bundles, it validates that:
If bundle validation fails, OPA will report the validation error via
the Status API.

> **Warning!** There are *no* ordering guarantees for which bundle loads first and
takes over some root. If multiple bundles conflict, but are loaded at different
times, OPA may go into an error state. It is highly recommended to use
the health check and include bundle state: [Monitoring OPA](#health-checks)

### Debugging Your Bundles

When you run OPA, you can provide bundle files over the command line. This
Expand Down
9 changes: 7 additions & 2 deletions download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (d *Downloader) WithLogAttrs(attrs [][2]string) *Downloader {
return d
}

// ClearCache resets the etag value on the downloader
func (d *Downloader) ClearCache() {
d.etag = ""
}

// Start tells the Downloader to begin downloading bundles.
func (d *Downloader) Start(ctx context.Context) {
go d.loop()
Expand Down Expand Up @@ -125,12 +130,12 @@ func (d *Downloader) oneShot(ctx context.Context) error {
m := metrics.New()
b, etag, err := d.download(ctx, m)

d.etag = etag

if d.f != nil {
d.f(ctx, Update{ETag: etag, Bundle: b, Error: err, Metrics: m})
}

d.etag = etag

return err
}

Expand Down
84 changes: 68 additions & 16 deletions download/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"fmt"

"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -38,31 +39,62 @@ func TestStartStop(t *testing.T) {
d.Stop(ctx)
}

func TestEtagCaching(t *testing.T) {
func TestEtagCachingLifecycle(t *testing.T) {

ctx := context.Background()
fixture := newTestFixture(t)
fixture.server.expEtag = "some etag value"
fixture.d = New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(fixture.oneShot)
defer fixture.server.stop()

updates := []Update{}
// check etag on the downloader is empty
if fixture.d.etag != "" {
t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag)
}

d := New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(func(ctx context.Context, u Update) {
updates = append(updates, u)
})
// simulate successful bundle activation and check updated etag on the downloader
fixture.server.expEtag = "some etag value"
err := fixture.d.oneShot(ctx)
if err != nil {
t.Fatal("Unexpected:", err)
} else if len(fixture.updates) != 1 {
t.Fatal("expected update")
} else if fixture.d.etag != fixture.server.expEtag {
t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag)
}

err := d.oneShot(ctx)
// simulate downloader error and check etag is cleared
fixture.server.expCode = 500
err = fixture.d.oneShot(ctx)
if err == nil {
t.Fatal("Expected error but got nil")
} else if len(fixture.updates) != 2 {
t.Fatal("expected update")
} else if fixture.d.etag != "" {
t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag)
}

// simulate successful bundle activation and check updated etag on the downloader
fixture.server.expCode = 0
fixture.server.expEtag = "some new etag value"
err = fixture.d.oneShot(ctx)
if err != nil {
t.Fatal("Unexpected:", err)
} else if len(updates) != 1 || updates[0].ETag != "some etag value" {
} else if len(fixture.updates) != 3 {
t.Fatal("expected update")
} else if fixture.d.etag != fixture.server.expEtag {
t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag)
}

err = d.oneShot(ctx)
// simulate bundle activation error and check etag is cleared
fixture.mockBundleActivationError = true
fixture.server.expEtag = "some newer etag value"
err = fixture.d.oneShot(ctx)
if err != nil {
t.Fatal("Unexpected:", err)
} else if len(updates) != 2 || updates[1].Bundle != nil {
t.Fatal("expected no change")
} else if len(fixture.updates) != 4 {
t.Fatal("expected update")
} else if fixture.d.etag != "" {
t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag)
}
}

Expand Down Expand Up @@ -112,9 +144,11 @@ func TestFailureUnexpected(t *testing.T) {
}

type testFixture struct {
d *Downloader
client rest.Client
server *testServer
d *Downloader
client rest.Client
server *testServer
updates []Update
mockBundleActivationError bool
}

func newTestFixture(t *testing.T) testFixture {
Expand Down Expand Up @@ -162,10 +196,27 @@ func newTestFixture(t *testing.T) testFixture {
}

return testFixture{
client: tc,
server: &ts,
client: tc,
server: &ts,
updates: []Update{},
}
}

func (t *testFixture) oneShot(ctx context.Context, u Update) {

t.updates = append(t.updates, u)

if u.Error != nil {
t.d.ClearCache()
return
}

if u.Bundle != nil {
if t.mockBundleActivationError {
t.d.ClearCache()
return
}
}
}

type testServer struct {
Expand Down Expand Up @@ -201,6 +252,7 @@ func (t *testServer) handle(w http.ResponseWriter, r *http.Request) {
if t.expEtag != "" {
etag := r.Header.Get("If-None-Match")
if etag == t.expEtag {
w.Header().Add("Etag", t.expEtag)
w.WriteHeader(304)
return
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
if u.Error != nil {
p.logError(name, "Bundle download failed: %v", u.Error)
p.status[name].SetError(u.Error)
p.downloaders[name].ClearCache()
return
}

Expand All @@ -305,6 +306,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
if err := p.activate(ctx, name, u.Bundle); err != nil {
p.logError(name, "Bundle activation failed: %v", err)
p.status[name].SetError(err)
p.downloaders[name].ClearCache()
return
}

Expand Down
32 changes: 24 additions & 8 deletions plugins/bundle/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestPluginOneShot(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName, Metrics: metrics.New()}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

ensurePluginState(t, plugin, plugins.StateNotReady)

Expand Down Expand Up @@ -94,6 +95,7 @@ func TestPluginOneShotCompileError(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

ensurePluginState(t, plugin, plugins.StateNotReady)

Expand Down Expand Up @@ -180,6 +182,7 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

ensurePluginState(t, plugin, plugins.StateNotReady)

Expand Down Expand Up @@ -261,6 +264,7 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) {

for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}

// Start with non-conflicting updates
Expand Down Expand Up @@ -319,11 +323,12 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) {
func TestPluginOneShotActivationPrefixMatchingRoots(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleNames := []string{"test-bundle1", "test-bundle2"}

for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}

plugin.oneShot(ctx, bundleNames[0], download.Update{Bundle: &bundle.Bundle{
Expand Down Expand Up @@ -372,6 +377,7 @@ func TestPluginListener(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)
ch := make(chan Status)

plugin.Register("test", func(status Status) {
Expand Down Expand Up @@ -461,9 +467,10 @@ func validateStatus(t *testing.T, actual Status, expected string, expectStatusEr
func TestPluginListenerErrorClearedOn304(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)
ch := make(chan Status)

plugin.Register("test", func(status Status) {
Expand Down Expand Up @@ -507,14 +514,15 @@ func TestPluginListenerErrorClearedOn304(t *testing.T) {
func TestPluginBulkListener(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleNames := []string{
"b1",
"b2",
"b3",
}
for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}
bulkChan := make(chan map[string]*Status)

Expand Down Expand Up @@ -679,14 +687,15 @@ func TestPluginBulkListener(t *testing.T) {
func TestPluginBulkListenerStatusCopyOnly(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleNames := []string{
"b1",
"b2",
"b3",
}
for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}
bulkChan := make(chan map[string]*Status)

Expand Down Expand Up @@ -730,9 +739,10 @@ func TestPluginActivateScopedBundle(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

// Transact test data and policies that represent data coming from
// _outside_ the bundle. The test will verify that data _outside_
Expand Down Expand Up @@ -847,9 +857,10 @@ func TestPluginSetCompilerOnContext(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

module := `
package test
Expand Down Expand Up @@ -1106,9 +1117,10 @@ func TestPluginRequestVsDownloadTimestamp(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

b := &bundle.Bundle{}
b.Manifest.Init()
Expand Down Expand Up @@ -1149,9 +1161,10 @@ func TestUpgradeLegacyBundleToMuiltiBundleSameBundle(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

// Start with a "legacy" style config for a single bundle
plugin.config = Config{
Expand Down Expand Up @@ -1246,6 +1259,7 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) {
}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

tsURLBase := "/opa-test/"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -1321,6 +1335,8 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) {
},
}

delete(plugin.downloaders, bundleName)
plugin.downloaders["b2"] = download.New(download.Config{}, plugin.manager.Client(""), "b2")
plugin.Reconfigure(ctx, multiBundleConf)

module = "package a.c\n\nbar=1"
Expand Down
2 changes: 2 additions & 0 deletions plugins/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {
if u.Error != nil {
c.logError("Discovery download failed: %v", u.Error)
c.status.SetError(u.Error)
c.downloader.ClearCache()
return
}

Expand All @@ -143,6 +144,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {
if err := c.reconfigure(ctx, u); err != nil {
c.logError("Discovery reconfiguration error occurred: %v", err)
c.status.SetError(err)
c.downloader.ClearCache()
return
}

Expand Down

0 comments on commit 78a6f18

Please sign in to comment.