From fec62e852209731f872995106df56f34b6c740a9 Mon Sep 17 00:00:00 2001 From: Ashutosh Narkar Date: Thu, 9 Apr 2020 12:10:15 -0700 Subject: [PATCH] Update downloader's Etag based on bundle activation Earlier the Etag on the downloader would be updated unconditionally after every attempt to download a bundle. This could lead to a situation wherein a bundle fails to activate and would remain in an unactivated state since any subsequent downloads of the same version of the bundle would not trigger the activation process. This change attempts to resolve the issue by allowing the client to reset the Etag on the downloader incase of downloader errors and bundle activation failures. The drawback now is that we could end up re-downloading the same version of a bundle multiple times till it successfully activates. This situtation is likely to occur when using multiple bundle sources where a bundle may depend on some other. Generally using multiple bundle sources isn't recommended so the extra network traffic as a result of the re-downloads although not ideal may not too harmful. Fixes #2220 Fixes #2279 Signed-off-by: Ashutosh Narkar --- docs/content/management.md | 13 +++--- download/download.go | 9 +++- download/download_test.go | 84 +++++++++++++++++++++++++++------- plugins/bundle/plugin.go | 2 + plugins/bundle/plugin_test.go | 32 +++++++++---- plugins/discovery/discovery.go | 2 + 6 files changed, 109 insertions(+), 33 deletions(-) diff --git a/docs/content/management.md b/docs/content/management.md index 76f028d3df..c9efeaafd7 100644 --- a/docs/content/management.md +++ b/docs/content/management.md @@ -214,9 +214,13 @@ and data from multiple sources, you can implement your bundle service to generate bundles that are scoped to a subset of OPA's policy and data cache. -> We recommend that whenever possible, you implement policy and data +> 🚨 We recommend that whenever possible, you implement policy and data > aggregation centrally, however, in some cases that's not possible -> (e.g., due to latency requirements.) +> (e.g., due to latency requirements.). +> When using multiple sources there are **no** ordering guarantees for which bundle loads first and + takes over some root. If multiple bundles conflict, but are loaded at different + times, OPA may go into an error state. It is highly recommended to use + the health check and include bundle state: [Monitoring OPA](../monitoring#health-checks) To scope bundles to a subset of OPA's policy and data cache, include a top-level `roots` key in the bundle that defines the roots of the @@ -253,11 +257,6 @@ When OPA loads scoped bundles, it validates that: If bundle validation fails, OPA will report the validation error via the Status API. -> **Warning!** There are *no* ordering guarantees for which bundle loads first and - takes over some root. If multiple bundles conflict, but are loaded at different - times, OPA may go into an error state. It is highly recommended to use - the health check and include bundle state: [Monitoring OPA](#health-checks) - ### Debugging Your Bundles When you run OPA, you can provide bundle files over the command line. This diff --git a/download/download.go b/download/download.go index 7744bb458e..c387d967e3 100644 --- a/download/download.go +++ b/download/download.go @@ -73,6 +73,11 @@ func (d *Downloader) WithLogAttrs(attrs [][2]string) *Downloader { return d } +// ClearCache resets the etag value on the downloader +func (d *Downloader) ClearCache() { + d.etag = "" +} + // Start tells the Downloader to begin downloading bundles. func (d *Downloader) Start(ctx context.Context) { go d.loop() @@ -125,12 +130,12 @@ func (d *Downloader) oneShot(ctx context.Context) error { m := metrics.New() b, etag, err := d.download(ctx, m) + d.etag = etag + if d.f != nil { d.f(ctx, Update{ETag: etag, Bundle: b, Error: err, Metrics: m}) } - d.etag = etag - return err } diff --git a/download/download_test.go b/download/download_test.go index ec9a0281e7..e71ed64f98 100644 --- a/download/download_test.go +++ b/download/download_test.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "net/http/httptest" "strings" @@ -38,31 +39,62 @@ func TestStartStop(t *testing.T) { d.Stop(ctx) } -func TestEtagCaching(t *testing.T) { +func TestEtagCachingLifecycle(t *testing.T) { ctx := context.Background() fixture := newTestFixture(t) - fixture.server.expEtag = "some etag value" + fixture.d = New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(fixture.oneShot) defer fixture.server.stop() - updates := []Update{} + // check etag on the downloader is empty + if fixture.d.etag != "" { + t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag) + } - d := New(Config{}, fixture.client, "/bundles/test/bundle1").WithCallback(func(ctx context.Context, u Update) { - updates = append(updates, u) - }) + // simulate successful bundle activation and check updated etag on the downloader + fixture.server.expEtag = "some etag value" + err := fixture.d.oneShot(ctx) + if err != nil { + t.Fatal("Unexpected:", err) + } else if len(fixture.updates) != 1 { + t.Fatal("expected update") + } else if fixture.d.etag != fixture.server.expEtag { + t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag) + } - err := d.oneShot(ctx) + // simulate downloader error and check etag is cleared + fixture.server.expCode = 500 + err = fixture.d.oneShot(ctx) + if err == nil { + t.Fatal("Expected error but got nil") + } else if len(fixture.updates) != 2 { + t.Fatal("expected update") + } else if fixture.d.etag != "" { + t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag) + } + + // simulate successful bundle activation and check updated etag on the downloader + fixture.server.expCode = 0 + fixture.server.expEtag = "some new etag value" + err = fixture.d.oneShot(ctx) if err != nil { t.Fatal("Unexpected:", err) - } else if len(updates) != 1 || updates[0].ETag != "some etag value" { + } else if len(fixture.updates) != 3 { t.Fatal("expected update") + } else if fixture.d.etag != fixture.server.expEtag { + t.Fatalf("Expected downloader ETag %v but got %v", fixture.server.expEtag, fixture.d.etag) } - err = d.oneShot(ctx) + // simulate bundle activation error and check etag is cleared + fixture.mockBundleActivationError = true + fixture.server.expEtag = "some newer etag value" + err = fixture.d.oneShot(ctx) if err != nil { t.Fatal("Unexpected:", err) - } else if len(updates) != 2 || updates[1].Bundle != nil { - t.Fatal("expected no change") + } else if len(fixture.updates) != 4 { + t.Fatal("expected update") + } else if fixture.d.etag != "" { + t.Fatalf("Expected empty downloader ETag but got %v", fixture.d.etag) } } @@ -112,9 +144,11 @@ func TestFailureUnexpected(t *testing.T) { } type testFixture struct { - d *Downloader - client rest.Client - server *testServer + d *Downloader + client rest.Client + server *testServer + updates []Update + mockBundleActivationError bool } func newTestFixture(t *testing.T) testFixture { @@ -162,10 +196,27 @@ func newTestFixture(t *testing.T) testFixture { } return testFixture{ - client: tc, - server: &ts, + client: tc, + server: &ts, + updates: []Update{}, + } +} + +func (t *testFixture) oneShot(ctx context.Context, u Update) { + + t.updates = append(t.updates, u) + + if u.Error != nil { + t.d.ClearCache() + return } + if u.Bundle != nil { + if t.mockBundleActivationError { + t.d.ClearCache() + return + } + } } type testServer struct { @@ -201,6 +252,7 @@ func (t *testServer) handle(w http.ResponseWriter, r *http.Request) { if t.expEtag != "" { etag := r.Header.Get("If-None-Match") if etag == t.expEtag { + w.Header().Add("Etag", t.expEtag) w.WriteHeader(304) return } diff --git a/plugins/bundle/plugin.go b/plugins/bundle/plugin.go index de2634bec1..7d109bda51 100644 --- a/plugins/bundle/plugin.go +++ b/plugins/bundle/plugin.go @@ -291,6 +291,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { if u.Error != nil { p.logError(name, "Bundle download failed: %v", u.Error) p.status[name].SetError(u.Error) + p.downloaders[name].ClearCache() return } @@ -305,6 +306,7 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { if err := p.activate(ctx, name, u.Bundle); err != nil { p.logError(name, "Bundle activation failed: %v", err) p.status[name].SetError(err) + p.downloaders[name].ClearCache() return } diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index a4dc06a278..da6cb637da 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -36,6 +36,7 @@ func TestPluginOneShot(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName, Metrics: metrics.New()} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ensurePluginState(t, plugin, plugins.StateNotReady) @@ -94,6 +95,7 @@ func TestPluginOneShotCompileError(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ensurePluginState(t, plugin, plugins.StateNotReady) @@ -180,6 +182,7 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ensurePluginState(t, plugin, plugins.StateNotReady) @@ -261,6 +264,7 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } // Start with non-conflicting updates @@ -319,11 +323,12 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { func TestPluginOneShotActivationPrefixMatchingRoots(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleNames := []string{"test-bundle1", "test-bundle2"} for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } plugin.oneShot(ctx, bundleNames[0], download.Update{Bundle: &bundle.Bundle{ @@ -372,6 +377,7 @@ func TestPluginListener(t *testing.T) { plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ch := make(chan Status) plugin.Register("test", func(status Status) { @@ -461,9 +467,10 @@ func validateStatus(t *testing.T, actual Status, expected string, expectStatusEr func TestPluginListenerErrorClearedOn304(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) ch := make(chan Status) plugin.Register("test", func(status Status) { @@ -507,7 +514,7 @@ func TestPluginListenerErrorClearedOn304(t *testing.T) { func TestPluginBulkListener(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleNames := []string{ "b1", "b2", @@ -515,6 +522,7 @@ func TestPluginBulkListener(t *testing.T) { } for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } bulkChan := make(chan map[string]*Status) @@ -679,7 +687,7 @@ func TestPluginBulkListener(t *testing.T) { func TestPluginBulkListenerStatusCopyOnly(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleNames := []string{ "b1", "b2", @@ -687,6 +695,7 @@ func TestPluginBulkListenerStatusCopyOnly(t *testing.T) { } for _, name := range bundleNames { plugin.status[name] = &Status{Name: name} + plugin.downloaders[name] = download.New(download.Config{}, plugin.manager.Client(""), name) } bulkChan := make(chan map[string]*Status) @@ -730,9 +739,10 @@ func TestPluginActivateScopedBundle(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) // Transact test data and policies that represent data coming from // _outside_ the bundle. The test will verify that data _outside_ @@ -847,9 +857,10 @@ func TestPluginSetCompilerOnContext(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) module := ` package test @@ -1106,9 +1117,10 @@ func TestPluginRequestVsDownloadTimestamp(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) b := &bundle.Bundle{} b.Manifest.Init() @@ -1149,9 +1161,10 @@ func TestUpgradeLegacyBundleToMuiltiBundleSameBundle(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}, downloaders: map[string]*download.Downloader{}} bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) // Start with a "legacy" style config for a single bundle plugin.config = Config{ @@ -1246,6 +1259,7 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) { } bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + plugin.downloaders[bundleName] = download.New(download.Config{}, plugin.manager.Client(""), bundleName) tsURLBase := "/opa-test/" ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -1321,6 +1335,8 @@ func TestUpgradeLegacyBundleToMuiltiBundleNewBundles(t *testing.T) { }, } + delete(plugin.downloaders, bundleName) + plugin.downloaders["b2"] = download.New(download.Config{}, plugin.manager.Client(""), "b2") plugin.Reconfigure(ctx, multiBundleConf) module = "package a.c\n\nbar=1" diff --git a/plugins/discovery/discovery.go b/plugins/discovery/discovery.go index d31a970518..03f6e9ebc7 100644 --- a/plugins/discovery/discovery.go +++ b/plugins/discovery/discovery.go @@ -134,6 +134,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) { if u.Error != nil { c.logError("Discovery download failed: %v", u.Error) c.status.SetError(u.Error) + c.downloader.ClearCache() return } @@ -143,6 +144,7 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) { if err := c.reconfigure(ctx, u); err != nil { c.logError("Discovery reconfiguration error occurred: %v", err) c.status.SetError(err) + c.downloader.ClearCache() return }