diff --git a/docs/content/management.md b/docs/content/management.md index 76f028d3df..c9efeaafd7 100644 --- a/docs/content/management.md +++ b/docs/content/management.md @@ -214,9 +214,13 @@ and data from multiple sources, you can implement your bundle service to generate bundles that are scoped to a subset of OPA's policy and data cache. -> We recommend that whenever possible, you implement policy and data +> 🚨 We recommend that whenever possible, you implement policy and data > aggregation centrally, however, in some cases that's not possible -> (e.g., due to latency requirements.) +> (e.g., due to latency requirements.). +> When using multiple sources there are **no** ordering guarantees for which bundle loads first and + takes over some root. If multiple bundles conflict, but are loaded at different + times, OPA may go into an error state. It is highly recommended to use + the health check and include bundle state: [Monitoring OPA](../monitoring#health-checks) To scope bundles to a subset of OPA's policy and data cache, include a top-level `roots` key in the bundle that defines the roots of the @@ -253,11 +257,6 @@ When OPA loads scoped bundles, it validates that: If bundle validation fails, OPA will report the validation error via the Status API. -> **Warning!** There are *no* ordering guarantees for which bundle loads first and - takes over some root. If multiple bundles conflict, but are loaded at different - times, OPA may go into an error state. It is highly recommended to use - the health check and include bundle state: [Monitoring OPA](#health-checks) - ### Debugging Your Bundles When you run OPA, you can provide bundle files over the command line. This diff --git a/download/download.go b/download/download.go index 7744bb458e..c387d967e3 100644 --- a/download/download.go +++ b/download/download.go @@ -73,6 +73,11 @@ func (d *Downloader) WithLogAttrs(attrs [][2]string) *Downloader { return d } +// ClearCache resets the etag value on the downloader +func (d *Downloader) ClearCache() { + d.etag = "" +} + // Start tells the Downloader to begin downloading bundles. func (d *Downloader) Start(ctx context.Context) { go d.loop() @@ -125,12 +130,12 @@ func (d *Downloader) oneShot(ctx context.Context) error { m := metrics.New() b, etag, err := d.download(ctx, m) + d.etag = etag + if d.f != nil { d.f(ctx, Update{ETag: etag, Bundle: b, Error: err, Metrics: m}) } - d.etag = etag - return err } diff --git a/download/download_test.go b/download/download_test.go index ec9a0281e7..e71ed64f98 100644 --- a/download/download_test.go +++ b/download/download_test.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "net/http/httptest" "strings" @@ -38,31 +39,62 @@ func TestStartStop(t *testing.T) { d.Stop(ctx) } -func TestEtagCaching(t *testing.T) { +func TestEtagCachingLifecycle(t *testing.T) { ctx := context.Background() fixture := newTestFixture(t) - fixture.server.expEtag = "some etag value" + fixture.d = New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(fixture.oneShot) defer fixture.server.stop() - updates := []Update{} + // check etag on the downloader is empty + if fixture.d.etag != "" { + t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag) + } - d := New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(func(ctx context.Context, u Update) { - updates = append(updates, u) - }) + // simulate successful bundle activation and check updated etag on the downloader + fixture.server.expEtag = "some etag value" + err := fixture.d.oneShot(ctx) + if err != nil { + t.Fatal("Unexpected:", err) + } else if len(fixture.updates) != 1 { + t.Fatal("expected update") + } else if fixture.d.etag != fixture.server.expEtag { + t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag) + } - err := d.oneShot(ctx) + // simulate downloader error and check etag is cleared + fixture.server.expCode = 500 + err = fixture.d.oneShot(ctx) + if err == nil { + t.Fatal("Expected error but got nil") + } else if len(fixture.updates) != 2 { + t.Fatal("expected update") + } else if fixture.d.etag != "" { + t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag) + } + + // simulate successful bundle activation and check updated etag on the downloader + fixture.server.expCode = 0 + fixture.server.expEtag = "some new etag value" + err = fixture.d.oneShot(ctx) if err != nil { t.Fatal("Unexpected:", err) - } else if len(updates) != 1 || updates[0].ETag != "some etag value" { + } else if len(fixture.updates) != 3 { t.Fatal("expected update") + } else if fixture.d.etag != fixture.server.expEtag { + t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag) } - err = d.oneShot(ctx) + // simulate bundle activation error and check etag is cleared + fixture.mockBundleActivationError = true + fixture.server.expEtag = "some newer etag value" + err = fixture.d.oneShot(ctx) if err != nil { t.Fatal("Unexpected:", err) - } else if len(updates) != 2 || updates[1].Bundle != nil { - t.Fatal("expected no change") + } else if len(fixture.updates) != 4 { + t.Fatal("expected update") + } else if fixture.d.etag != "" { + t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag) } } @@ -112,9 +144,11 @@ func TestFailureUnexpected(t *testing.T) { } type testFixture struct { - d *Downloader - client rest.Client - server *testServer + d *Downloader + client rest.Client + server *testServer + updates []Update + mockBundleActivationError bool } func newTestFixture(t *testing.T) testFixture { @@ -162,10 +196,27 @@ func newTestFixture(t *testing.T) testFixture { } return testFixture{ - client: tc, - server: &ts, + client: tc, + server: &ts, + updates: []Update{}, + } +} + +func (t *testFixture) oneShot(ctx context.Context, u Update) { + + t.updates = append(t.updates, u) + + if u.Error != nil { + t.d.ClearCache() + return } + if u.Bundle != nil { + if t.mockBundleActivationError { + t.d.ClearCache() + return + } + } } type testServer struct { @@ -201,6 +252,7 @@ func (t *testServer) handle(w http.ResponseWriter, r *http.Request) { if t.expEtag != "" { etag := r.Header.Get("If-None-Match") if etag == t.expEtag { + w.Header().Add("Etag", t.expEtag) w.WriteHeader(304) return } diff --git a/plugins/bundle/plugin.go b/plugins/bundle/plugin.go index de2634bec1..7d109bda51 100644 --- a/plugins/bundle/plugin.go +++ b/plugins/bundle/plugin.go @@ -291,6 +291,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { if u.Error != nil { p.logError(name, "Bundle download failed: %v", u.Error) p.status[name].SetError(u.Error) + p.downloaders[name].ClearCache() return } @@ -305,6 +306,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { if err := p.activate(ctx, name, u.Bundle); err != nil { p.logError(name, "Bundle activation failed: %v", err) p.status[name].SetError(err) + p.downloaders[name].ClearCache() return } diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index a4dc06a278..da6cb637da 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -36,6 +36,7 @@ func TestPluginOneShot(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName, Metrics: metrics.New()} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ensurePluginState(t, plugin, plugins.StateNotReady) @@ -94,6 +95,7 @@ func TestPluginOneShotCompileError(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ensurePluginState(t, plugin, plugins.StateNotReady) @@ -180,6 +182,7 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ensurePluginState(t, plugin, plugins.StateNotReady) @@ -261,6 +264,7 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } // Start with non-conflicting updates @@ -319,11 +323,12 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { func TestPluginOneShotActivationPrefixMatchingRoots(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleNames := []string{"test-bundle1", "test-bundle2"} for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } plugin.oneShot(ctx, bundleNames[0], download.Update{Bundle: &bundle.Bundle{ @@ -372,6 +377,7 @@ func TestPluginListener(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ch := make(chan Status) plugin.Register("test", func(status Status) { @@ -461,9 +467,10 @@ func validateStatus(t *testing.T, actual Status, expected string, expectStatusEr func TestPluginListenerErrorClearedOn304(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ch := make(chan Status) plugin.Register("test", func(status Status) { @@ -507,7 +514,7 @@ func TestPluginListenerErrorClearedOn304(t *testing.T) { func TestPluginBulkListener(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleNames := []string{ "b1", "b2", @@ -515,6 +522,7 @@ func TestPluginBulkListener(t *testing.T) { } for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } bulkChan := make(chan map[string]*Status) @@ -679,7 +687,7 @@ func TestPluginBulkListener(t *testing.T) { func TestPluginBulkListenerStatusCopyOnly(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleNames := []string{ "b1", "b2", @@ -687,6 +695,7 @@ func TestPluginBulkListenerStatusCopyOnly(t *testing.T) { } for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } bulkChan := make(chan map[string]*Status) @@ -730,9 +739,10 @@ func TestPluginActivateScopedBundle(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) // Transact test data and policies that represent data coming from // _outside_ the bundle. The test will verify that data _outside_ @@ -847,9 +857,10 @@ func TestPluginSetCompilerOnContext(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) module := ` package test @@ -1106,9 +1117,10 @@ func TestPluginRequestVsDownloadTimestamp(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) b := &bundle.Bundle{} b.Manifest.Init() @@ -1149,9 +1161,10 @@ func TestUpgradeLegacyBundleToMuiltiBundleSameBundle(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) // Start with a "legacy" style config for a single bundle plugin.config = Config{ @@ -1246,6 +1259,7 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) { } bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) tsURLBase := "/opa-test/" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -1321,6 +1335,8 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) { }, } + delete(plugin.downloaders, bundleName) + plugin.downloaders["b2"] = download.New(download.Config{}, plugin.manager.Client(""), "b2") plugin.Reconfigure(ctx, multiBundleConf) module = "package a.c\n\nbar=1" diff --git a/plugins/discovery/discovery.go b/plugins/discovery/discovery.go index d31a970518..03f6e9ebc7 100644 --- a/plugins/discovery/discovery.go +++ b/plugins/discovery/discovery.go @@ -134,6 +134,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) { if u.Error != nil { c.logError("Discovery download failed: %v", u.Error) c.status.SetError(u.Error) + c.downloader.ClearCache() return } @@ -143,6 +144,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) { if err := c.reconfigure(ctx, u); err != nil { c.logError("Discovery reconfiguration error occurred: %v", err) c.status.SetError(err) + c.downloader.ClearCache() return }