Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update downloader's Etag based on bundle activation #2286

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions docs/content/management.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ and data from multiple sources, you can implement your bundle service
to generate bundles that are scoped to a subset of OPA's policy and
data cache.

> We recommend that whenever possible, you implement policy and data
> 🚨 We recommend that whenever possible, you implement policy and data
> aggregation centrally, however, in some cases that's not possible
> (e.g., due to latency requirements.)
> (e.g., due to latency requirements.).
> When using multiple sources there are **no** ordering guarantees for which bundle loads first and
takes over some root. If multiple bundles conflict, but are loaded at different
times, OPA may go into an error state. It is highly recommended to use
the health check and include bundle state: [Monitoring OPA](../monitoring#health-checks)

To scope bundles to a subset of OPA's policy and data cache, include
a top-level `roots` key in the bundle that defines the roots of the
Expand Down Expand Up @@ -253,11 +257,6 @@ When OPA loads scoped bundles, it validates that:
If bundle validation fails, OPA will report the validation error via
the Status API.

> **Warning!** There are *no* ordering guarantees for which bundle loads first and
takes over some root. If multiple bundles conflict, but are loaded at different
times, OPA may go into an error state. It is highly recommended to use
the health check and include bundle state: [Monitoring OPA](#health-checks)

### Debugging Your Bundles

When you run OPA, you can provide bundle files over the command line. This
Expand Down
9 changes: 7 additions & 2 deletions download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (d *Downloader) WithLogAttrs(attrs [][2]string) *Downloader {
return d
}

// ClearCache resets the etag value on the downloader
func (d *Downloader) ClearCache() {
d.etag = ""
}

// Start tells the Downloader to begin downloading bundles.
func (d *Downloader) Start(ctx context.Context) {
go d.loop()
Expand Down Expand Up @@ -125,12 +130,12 @@ func (d *Downloader) oneShot(ctx context.Context) error {
m := metrics.New()
b, etag, err := d.download(ctx, m)

d.etag = etag

if d.f != nil {
d.f(ctx, Update{ETag: etag, Bundle: b, Error: err, Metrics: m})
}

d.etag = etag
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud a little bit. I'm wondering if we are maybe leaking too much of the "downloader" details out by making the clients have to set the etag. From a high level I like that we can create a downloader and it will just like do the right thing with regards to cache controls. In this particular case we are kind of working around issues with bundle dependencies by manually setting them to trigger re-downloading and trying to re-activate the bundle.

I guess maybe what I'm leaning towards is having the downloader still automatically set the etag, but maybe just do it before calling the listener so they can clear it in their callback if they need to.

I'm wondering too if we maybe want to just have a like ClearCache() or Reset() or something a little bit more opaque instead of making the bundle and discovery plugins know that setting the etag to "" means it will download it again. I'm less sure about this direction though.. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patrick-east I like the idea of setting the etag in downloader and clearing it in the client callback if some error occurs. I was thinking if the callback could be updated to return an error and then the downloader would set/reset the etag accordingly. But your approach seems good.

So in the downloader we would be adding a ClearCache() api that would basically set the etag to "" , correct ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@patrick-east updated the code as per the above discussion.


return err
}

Expand Down
84 changes: 68 additions & 16 deletions download/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"fmt"

"net/http"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -38,31 +39,62 @@ func TestStartStop(t *testing.T) {
d.Stop(ctx)
}

func TestEtagCaching(t *testing.T) {
func TestEtagCachingLifecycle(t *testing.T) {

ctx := context.Background()
fixture := newTestFixture(t)
fixture.server.expEtag = "some etag value"
fixture.d = New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(fixture.oneShot)
defer fixture.server.stop()

updates := []Update{}
// check etag on the downloader is empty
if fixture.d.etag != "" {
t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag)
}

d := New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(func(ctx context.Context, u Update) {
updates = append(updates, u)
})
// simulate successful bundle activation and check updated etag on the downloader
fixture.server.expEtag = "some etag value"
err := fixture.d.oneShot(ctx)
if err != nil {
t.Fatal("Unexpected:", err)
} else if len(fixture.updates) != 1 {
t.Fatal("expected update")
} else if fixture.d.etag != fixture.server.expEtag {
t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag)
}

err := d.oneShot(ctx)
// simulate downloader error and check etag is cleared
fixture.server.expCode = 500
err = fixture.d.oneShot(ctx)
if err == nil {
t.Fatal("Expected error but got nil")
} else if len(fixture.updates) != 2 {
t.Fatal("expected update")
} else if fixture.d.etag != "" {
t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag)
}

// simulate successful bundle activation and check updated etag on the downloader
fixture.server.expCode = 0
fixture.server.expEtag = "some new etag value"
err = fixture.d.oneShot(ctx)
if err != nil {
t.Fatal("Unexpected:", err)
} else if len(updates) != 1 || updates[0].ETag != "some etag value" {
} else if len(fixture.updates) != 3 {
t.Fatal("expected update")
} else if fixture.d.etag != fixture.server.expEtag {
t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag)
}

err = d.oneShot(ctx)
// simulate bundle activation error and check etag is cleared
fixture.mockBundleActivationError = true
fixture.server.expEtag = "some newer etag value"
err = fixture.d.oneShot(ctx)
if err != nil {
t.Fatal("Unexpected:", err)
} else if len(updates) != 2 || updates[1].Bundle != nil {
t.Fatal("expected no change")
} else if len(fixture.updates) != 4 {
t.Fatal("expected update")
} else if fixture.d.etag != "" {
t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag)
}
}

Expand Down Expand Up @@ -112,9 +144,11 @@ func TestFailureUnexpected(t *testing.T) {
}

type testFixture struct {
d *Downloader
client rest.Client
server *testServer
d *Downloader
client rest.Client
server *testServer
updates []Update
mockBundleActivationError bool
}

func newTestFixture(t *testing.T) testFixture {
Expand Down Expand Up @@ -162,10 +196,27 @@ func newTestFixture(t *testing.T) testFixture {
}

return testFixture{
client: tc,
server: &ts,
client: tc,
server: &ts,
updates: []Update{},
}
}

func (t *testFixture) oneShot(ctx context.Context, u Update) {

t.updates = append(t.updates, u)

if u.Error != nil {
t.d.ClearCache()
return
}

if u.Bundle != nil {
if t.mockBundleActivationError {
t.d.ClearCache()
return
}
}
}

type testServer struct {
Expand Down Expand Up @@ -201,6 +252,7 @@ func (t *testServer) handle(w http.ResponseWriter, r *http.Request) {
if t.expEtag != "" {
etag := r.Header.Get("If-None-Match")
if etag == t.expEtag {
w.Header().Add("Etag", t.expEtag)
w.WriteHeader(304)
return
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
if u.Error != nil {
p.logError(name, "Bundle download failed: %v", u.Error)
p.status[name].SetError(u.Error)
p.downloaders[name].ClearCache()
return
}

Expand All @@ -305,6 +306,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) {
if err := p.activate(ctx, name, u.Bundle); err != nil {
p.logError(name, "Bundle activation failed: %v", err)
p.status[name].SetError(err)
p.downloaders[name].ClearCache()
return
}

Expand Down
32 changes: 24 additions & 8 deletions plugins/bundle/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestPluginOneShot(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName, Metrics: metrics.New()}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

ensurePluginState(t, plugin, plugins.StateNotReady)

Expand Down Expand Up @@ -94,6 +95,7 @@ func TestPluginOneShotCompileError(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

ensurePluginState(t, plugin, plugins.StateNotReady)

Expand Down Expand Up @@ -180,6 +182,7 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

ensurePluginState(t, plugin, plugins.StateNotReady)

Expand Down Expand Up @@ -261,6 +264,7 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) {

for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}

// Start with non-conflicting updates
Expand Down Expand Up @@ -319,11 +323,12 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) {
func TestPluginOneShotActivationPrefixMatchingRoots(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleNames := []string{"test-bundle1", "test-bundle2"}

for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}

plugin.oneShot(ctx, bundleNames[0], download.Update{Bundle: &bundle.Bundle{
Expand Down Expand Up @@ -372,6 +377,7 @@ func TestPluginListener(t *testing.T) {
plugin := New(&Config{}, manager)
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)
ch := make(chan Status)

plugin.Register("test", func(status Status) {
Expand Down Expand Up @@ -461,9 +467,10 @@ func validateStatus(t *testing.T, actual Status, expected string, expectStatusEr
func TestPluginListenerErrorClearedOn304(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)
ch := make(chan Status)

plugin.Register("test", func(status Status) {
Expand Down Expand Up @@ -507,14 +514,15 @@ func TestPluginListenerErrorClearedOn304(t *testing.T) {
func TestPluginBulkListener(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleNames := []string{
"b1",
"b2",
"b3",
}
for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}
bulkChan := make(chan map[string]*Status)

Expand Down Expand Up @@ -679,14 +687,15 @@ func TestPluginBulkListener(t *testing.T) {
func TestPluginBulkListenerStatusCopyOnly(t *testing.T) {
ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleNames := []string{
"b1",
"b2",
"b3",
}
for _, name := range bundleNames {
plugin.status[name] = &Status{Name: name}
plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name)
}
bulkChan := make(chan map[string]*Status)

Expand Down Expand Up @@ -730,9 +739,10 @@ func TestPluginActivateScopedBundle(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

// Transact test data and policies that represent data coming from
// _outside_ the bundle. The test will verify that data _outside_
Expand Down Expand Up @@ -847,9 +857,10 @@ func TestPluginSetCompilerOnContext(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

module := `
package test
Expand Down Expand Up @@ -1106,9 +1117,10 @@ func TestPluginRequestVsDownloadTimestamp(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

b := &bundle.Bundle{}
b.Manifest.Init()
Expand Down Expand Up @@ -1149,9 +1161,10 @@ func TestUpgradeLegacyBundleToMuiltiBundleSameBundle(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}}
plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

// Start with a "legacy" style config for a single bundle
plugin.config = Config{
Expand Down Expand Up @@ -1246,6 +1259,7 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) {
}
bundleName := "test-bundle"
plugin.status[bundleName] = &Status{Name: bundleName}
plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName)

tsURLBase := "/opa-test/"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -1321,6 +1335,8 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) {
},
}

delete(plugin.downloaders, bundleName)
plugin.downloaders["b2"] = download.New(download.Config{}, plugin.manager.Client(""), "b2")
plugin.Reconfigure(ctx, multiBundleConf)

module = "package a.c\n\nbar=1"
Expand Down
2 changes: 2 additions & 0 deletions plugins/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {
if u.Error != nil {
c.logError("Discovery download failed: %v", u.Error)
c.status.SetError(u.Error)
c.downloader.ClearCache()
return
}

Expand All @@ -143,6 +144,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {
if err := c.reconfigure(ctx, u); err != nil {
c.logError("Discovery reconfiguration error occurred: %v", err)
c.status.SetError(err)
c.downloader.ClearCache()
return
}

Expand Down