diff --git a/docs/content/extensions.md b/docs/content/extensions.md index d20cc315f7..02ef6fee06 100644 --- a/docs/content/extensions.md +++ b/docs/content/extensions.md @@ -186,6 +186,15 @@ You can register your factory with OPA by calling [github.com/open-policy-agent/opa/runtime#RegisterPlugin](https://godoc.org/github.com/open-policy-agent/opa/runtime#RegisterPlugin) inside your main function. +### Plugin Status +The plugin may (optionally) report its current status to the plugin Manager via the `plugins.Manager#UpdatePluginStatus` +API. + +> If no status is provided the plugin is assumed to be working OK. + +Typically the plugin should report `StateNotReady` at creation time and update to `StateOK` (or `StateErr`) when +appropriate. + ### Putting It Together The example below shows how you can implement a custom [Decision Logger](../management/#decision-logs) @@ -196,39 +205,45 @@ import ( "github.com/open-policy-agent/opa/plugins/logs" ) +const PluginName = "println_decision_logger" + type Config struct { Stderr bool `json:"stderr"` // false => stdout, true => stderr } type PrintlnLogger struct { - mtx sync.Mutex - config Config + manager *plugins.Manager + mtx sync.Mutex + config Config } func (p *PrintlnLogger) Start(ctx context.Context) error { - // No-op. + p.manager.UpdatePluginStatus(PluginName, &plugins.Status{State: plugins.StateOK}) return nil } func (p *PrintlnLogger) Stop(ctx context.Context) { - // No-op. + p.manager.UpdatePluginStatus(PluginName, &plugins.Status{State: plugins.StateNotReady}) } func (p *PrintlnLogger) Reconfigure(ctx context.Context, config interface{}) { - p.mtx.Lock() - defer p.mtx.Unlock() - p.config = config.(Config) + p.mtx.Lock() + defer p.mtx.Unlock() + p.config = config.(Config) } func (p *PrintlnLogger) Log(ctx context.Context, event logs.EventV1) error { - p.mtx.Lock() - defer p.mtx.Unlock() - w := os.Stdout - if p.config.Stderr { - w = os.Stderr - } - fmt.Fprintln(w, event) // ignoring errors! 
- return nil + p.mtx.Lock() + defer p.mtx.Unlock() + w := os.Stdout + if p.config.Stderr { + w = os.Stderr + } + _, err := fmt.Fprintln(w, event) + if err != nil { + p.manager.UpdatePluginStatus(PluginName, &plugins.Status{State: plugins.StateErr}) + } + return nil } ``` @@ -242,9 +257,13 @@ import ( type Factory struct{} -func (Factory) New(_ *plugins.Manager, config interface{}) plugins.Plugin { +func (Factory) New(m *plugins.Manager, config interface{}) plugins.Plugin { + + m.UpdatePluginStatus(PluginName, &plugins.Status{State: plugins.StateNotReady}) + return &PrintlnLogger{ - config: config.(Config), + manager: m, + config: config.(Config), } } @@ -264,7 +283,7 @@ import ( ) func main() { - runtime.RegisterPlugin("println_decision_logger", Factory{}) + runtime.RegisterPlugin(PluginName, Factory{}) if err := cmd.RootCommand.Execute(); err != nil { fmt.Println(err) diff --git a/docs/content/kubernetes-tutorial.md b/docs/content/kubernetes-tutorial.md index 9464a37a46..011f80a778 100644 --- a/docs/content/kubernetes-tutorial.md +++ b/docs/content/kubernetes-tutorial.md @@ -187,7 +187,7 @@ spec: name: opa-server readinessProbe: httpGet: - path: /health + path: /health?plugins&bundle scheme: HTTPS port: 443 initialDelaySeconds: 3 diff --git a/docs/content/management.md b/docs/content/management.md index 666eb674b3..d5584de76a 100644 --- a/docs/content/management.md +++ b/docs/content/management.md @@ -430,9 +430,12 @@ OPA can periodically report status updates to remote HTTP servers. The updates contain status information for OPA itself as well as the [Bundles](#bundles) that have been downloaded and activated. -OPA sends status reports whenever bundles are downloaded and activated. If -the bundle download or activation fails for any reason, the status update -will include error information describing the failure. 
+OPA sends status reports whenever one of the following happens: + +* Bundles are downloaded and activated -- If the bundle download or activation fails for any reason, the status update + will include error information describing the failure. This includes Discovery bundles. +* A plugin state has changed -- All plugin status is reported, and an update to any plugin will + trigger a Status API report which contains the latest state. The status updates will include a set of labels that uniquely identify the OPA instance. OPA automatically includes an `id` value in the label set that @@ -457,23 +460,34 @@ on the agent, updates will be sent to `/status`. ```json { - "labels": { - "app": "my-example-app", - "id": "1780d507-aea2-45cc-ae50-fa153c8e4a5a", - "version": "{{< current_version >}}" + "labels": { + "app": "my-example-app", + "id": "1780d507-aea2-45cc-ae50-fa153c8e4a5a", + "version": "{{< current_version >}}" + }, + "bundles": { + "http/example/authz": { + "active_revision": "ABC", + "last_successful_download": "2018-01-01T00:00:00.000Z", + "last_successful_activation": "2018-01-01T00:00:00.000Z", + "metrics": { + "timer_rego_data_parse_ns": 12345, + "timer_rego_module_compile_ns": 12345, + "timer_rego_module_parse_ns": 12345 + } + } + }, + "plugins": { + "bundle": { + "state": "OK" }, - "bundles": { - "http/example/authz": { - "active_revision": "TODO", - "last_successful_download": "2018-01-01T00:00:00.000Z", - "last_successful_activation": "2018-01-01T00:00:00.000Z", - "metrics": { - "timer_rego_data_parse_ns": 12345, - "timer_rego_module_compile_ns": 12345, - "timer_rego_module_parse_ns": 12345 - } - } + "discovery": { + "state": "OK" }, + "status": { + "state": "OK" + } + }, "metrics": { "prometheus": { "go_gc_duration_seconds": { @@ -609,6 +623,8 @@ Status updates contain the following fields: | `discovery.active_revision` | `string` | Opaque revision identifier of the last successful discovery activation. 
| | `discovery.last_successful_download` | `string` | RFC3339 timestamp of last successful discovery bundle download. | | `discovery.last_successful_activation` | `string` | RFC3339 timestamp of last successful discovery bundle activation. | +| `plugins` | `object` | A set of objects describing the state of configured plugins in OPA's runtime. | +| `plugins[_].state` | `string` | The state of each plugin. | | `metrics.prometheus` | `object` | Global performance metrics for the OPA instance. | If the bundle download or activation failed, the status update will contain diff --git a/docs/content/monitoring.md b/docs/content/monitoring.md index 0af8367f95..cac7a8adac 100644 --- a/docs/content/monitoring.md +++ b/docs/content/monitoring.md @@ -30,3 +30,8 @@ scrape_configs: OPA exposes a `/health` API endpoint that can be used to perform health checks. See [Health API](../rest-api#health-api) for details. + +### Status API + +OPA provides a plugin which can push status to a remote service. +See [Status API](../management#status) for details. \ No newline at end of file diff --git a/docs/content/rest-api.md b/docs/content/rest-api.md index 8b1ff02fd1..8fa7e77306 100644 --- a/docs/content/rest-api.md +++ b/docs/content/rest-api.md @@ -2045,15 +2045,18 @@ that the server is operational. Optionally it can account for bundle activation (useful for "ready" checks at startup). #### Query Parameters -`bundle` - Boolean parameter to account for bundle activation status in response. +`bundles` - Boolean parameter to account for bundle activation status in response. This includes + any discovery bundles or bundles defined in the loaded discovery configuration. +`plugins` - Boolean parameter to account for plugin status in response. #### Status Codes -- **200** - OPA service is healthy. If `bundle=true` then all configured bundles have - been activated. -- **500** - OPA service is not healthy. If `bundle=true` this can mean any of the configured - bundles have not yet been activated. 
+- **200** - OPA service is healthy. If the `bundles` option is specified then all configured bundles have + been activated. If the `plugins` option is specified then all plugins are in an OK state. +- **500** - OPA service is not healthy. If the `bundles` option is specified this can mean any of the configured + bundles have not yet been activated. If the `plugins` option is specified then at least one + plugin is in a non-OK state. -> *Note*: The bundle activation check is only for initial startup. Subsequent downloads +> *Note*: The bundle activation check is only for initial bundle activation. Subsequent downloads will not affect the health check. The [Status](../management/#status) API should be used for more fine-grained bundle status monitoring. @@ -2064,7 +2067,12 @@ GET /health HTTP/1.1 #### Example Request (bundle activation) ```http -GET /health?bundle=true HTTP/1.1 +GET /health?bundles HTTP/1.1 +``` + +#### Example Request (plugin status) +```http +GET /health?plugins HTTP/1.1 ``` #### Healthy Response diff --git a/plugins/bundle/plugin.go b/plugins/bundle/plugin.go index ec4f799cd1..b8856f93fb 100644 --- a/plugins/bundle/plugin.go +++ b/plugins/bundle/plugin.go @@ -11,6 +11,7 @@ import ( "fmt" "reflect" "sync" + "time" "github.com/sirupsen/logrus" @@ -36,6 +37,7 @@ type Plugin struct { mtx sync.Mutex cfgMtx sync.Mutex legacyConfig bool + ready bool } // New returns a new Plugin with the given config. 
@@ -53,8 +55,10 @@ func New(parsedConfig *Config, manager *plugins.Manager) *Plugin { status: initialStatus, downloaders: make(map[string]*download.Downloader), etags: make(map[string]string), + ready: false, } - p.initDownloaders() + + manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) return p } @@ -75,6 +79,7 @@ func Lookup(manager *plugins.Manager) *Plugin { func (p *Plugin) Start(ctx context.Context) error { p.mtx.Lock() defer p.mtx.Unlock() + p.initDownloaders() for name, dl := range p.downloaders { p.logInfo(name, "Starting bundle downloader.") dl.Start(ctx) @@ -160,6 +165,8 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) { panic(errors.New("Unable deactivate bundle: " + err.Error())) } + readyNow := p.ready + for name, source := range p.config.Bundles { _, updated := updatedBundles[name] _, isNew := newBundles[name] @@ -173,8 +180,15 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) { } p.downloaders[name] = p.newDownloader(name, source) p.downloaders[name].Start(ctx) + readyNow = false } } + + if !readyNow { + p.ready = false + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) + } + } // Register a listener to receive status updates. The name must be comparable. @@ -298,6 +312,23 @@ func (p *Plugin) process(ctx context.Context, name string, u download.Update) { p.logInfo(name, "Bundle downloaded and activated successfully.") } p.etags[name] = u.ETag + + // If the plugin wasn't ready yet then check if we are now after activating this bundle. + if !p.ready { + readyNow := true // optimistically + for _, status := range p.status { + if len(status.Errors) > 0 || (status.LastSuccessfulActivation == time.Time{}) { + readyNow = false // Not ready yet, check again on next bundle activation. 
+ break + } + } + + if readyNow { + p.ready = true + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) + } + + } return } diff --git a/plugins/bundle/plugin_test.go b/plugins/bundle/plugin_test.go index ad96922afd..f2b4eac489 100644 --- a/plugins/bundle/plugin_test.go +++ b/plugins/bundle/plugin_test.go @@ -31,10 +31,12 @@ func TestPluginOneShot(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName, Metrics: metrics.New()} + ensurePluginState(t, plugin, plugins.StateNotReady) + module := "package foo\n\ncorge=1" b := bundle.Bundle{ @@ -53,6 +55,8 @@ func TestPluginOneShot(t *testing.T) { plugin.oneShot(ctx, bundleName, download.Update{Bundle: &b, Metrics: metrics.New()}) + ensurePluginState(t, plugin, plugins.StateOK) + txn := storage.NewTransactionOrDie(ctx, manager.Store) defer manager.Store.Abort(ctx, txn) @@ -85,9 +89,12 @@ func TestPluginOneShotCompileError(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + + ensurePluginState(t, plugin, plugins.StateNotReady) + raw1 := "package foo\n\np[x] { x = 1 }" b1 := &bundle.Bundle{ @@ -104,6 +111,8 @@ func TestPluginOneShotCompileError(t *testing.T) { b1.Manifest.Init() plugin.oneShot(ctx, bundleName, download.Update{Bundle: b1, Metrics: metrics.New()}) + ensurePluginState(t, plugin, plugins.StateOK) + b2 := &bundle.Bundle{ Data: map[string]interface{}{"a": "b"}, Modules: []bundle.ModuleFile{ @@ -116,6 +125,9 @@ func TestPluginOneShotCompileError(t *testing.T) { b2.Manifest.Init() plugin.oneShot(ctx, bundleName, download.Update{Bundle: 
b2}) + + ensurePluginState(t, plugin, plugins.StateOK) + txn := storage.NewTransactionOrDie(ctx, manager.Store) _, err := manager.Store.GetPolicy(ctx, txn, filepath.Join(bundleName, "/example.rego")) @@ -143,6 +155,8 @@ func TestPluginOneShotCompileError(t *testing.T) { b3.Manifest.Init() plugin.oneShot(ctx, bundleName, download.Update{Bundle: b3}) + ensurePluginState(t, plugin, plugins.StateOK) + txn = storage.NewTransactionOrDie(ctx, manager.Store) _, err = manager.Store.GetPolicy(ctx, txn, filepath.Join(bundleName, "/example.rego")) @@ -161,10 +175,12 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} + ensurePluginState(t, plugin, plugins.StateNotReady) + module1 := `package example p = 1` @@ -185,6 +201,8 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) { b1.Manifest.Init() plugin.oneShot(ctx, bundleName, download.Update{Bundle: &b1}) + ensurePluginState(t, plugin, plugins.StateOK) + module2 := `package example p = 2` @@ -205,6 +223,8 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) { b2.Manifest.Init() plugin.oneShot(ctx, bundleName, download.Update{Bundle: &b2}) + ensurePluginState(t, plugin, plugins.StateOK) + err := storage.Txn(ctx, manager.Store, storage.TransactionParams{}, func(txn storage.Transaction) error { ids, err := manager.Store.ListPolicies(ctx, txn) if err != nil { @@ -231,7 +251,10 @@ func TestPluginOneShotActivationRemovesOld(t *testing.T) { func TestPluginOneShotActivationConflictingRoots(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := New(&Config{}, manager) + + ensurePluginState(t, plugin, plugins.StateNotReady) + 
bundleNames := []string{"test-bundle1", "test-bundle2", "test-bundle3"} for _, name := range bundleNames { @@ -245,14 +268,18 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { }, }}) + ensurePluginState(t, plugin, plugins.StateNotReady) + plugin.oneShot(ctx, bundleNames[1], download.Update{Bundle: &bundle.Bundle{ Manifest: bundle.Manifest{ Roots: &[]string{"a/c"}, }, }}) + ensurePluginState(t, plugin, plugins.StateNotReady) + // ensure that both bundles are *not* in error status - ensureBundleOverlapStatus(t, &plugin, bundleNames, []bool{false, false, false}) + ensureBundleOverlapStatus(t, plugin, bundleNames, []bool{false, false, false}) // Add a third bundle that conflicts with one plugin.oneShot(ctx, bundleNames[2], download.Update{Bundle: &bundle.Bundle{ @@ -261,8 +288,10 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { }, }}) + ensurePluginState(t, plugin, plugins.StateNotReady) + // ensure that both in the conflict go into error state - ensureBundleOverlapStatus(t, &plugin, bundleNames, []bool{false, false, true}) + ensureBundleOverlapStatus(t, plugin, bundleNames, []bool{false, false, true}) // Update to fix conflict plugin.oneShot(ctx, bundleNames[2], download.Update{Bundle: &bundle.Bundle{ @@ -271,7 +300,8 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { }, }}) - ensureBundleOverlapStatus(t, &plugin, bundleNames, []bool{false, false, false}) + ensurePluginState(t, plugin, plugins.StateOK) + ensureBundleOverlapStatus(t, plugin, bundleNames, []bool{false, false, false}) // Ensure empty roots conflict with all roots plugin.oneShot(ctx, bundleNames[2], download.Update{Bundle: &bundle.Bundle{ @@ -280,7 +310,8 @@ func TestPluginOneShotActivationConflictingRoots(t *testing.T) { }, }}) - ensureBundleOverlapStatus(t, &plugin, bundleNames, []bool{false, false, true}) + ensurePluginState(t, plugin, plugins.StateOK) + ensureBundleOverlapStatus(t, plugin, bundleNames, []bool{false, false, true}) } func 
TestPluginOneShotActivationPrefixMatchingRoots(t *testing.T) { @@ -336,7 +367,7 @@ func TestPluginListener(t *testing.T) { ctx := context.Background() manager := getTestManager() - plugin := Plugin{manager: manager, status: map[string]*Status{}, etags: map[string]string{}} + plugin := New(&Config{}, manager) bundleName := "test-bundle" plugin.status[bundleName] = &Status{Name: bundleName} ch := make(chan Status) @@ -393,7 +424,7 @@ func TestPluginListener(t *testing.T) { Parsed: ast.MustParseModule(module), } - // Test that new update is successful. + // Test that the new update is successful. go plugin.oneShot(ctx, bundleName, download.Update{Bundle: &b}) s3 := <-ch @@ -900,6 +931,20 @@ func TestPluginReconfigure(t *testing.T) { var delay int64 = 10 baseConf := download.Config{Polling: download.PollingConfig{MinDelaySeconds: &delay, MaxDelaySeconds: &delay}} + // Expect the plugin to emit a "not ready" status update each time we change the configuration + updateCount := 0 + manager.RegisterPluginStatusListener(t.Name(), func(status map[string]*plugins.Status) { + updateCount++ + bStatus, ok := status[Name] + if !ok { + t.Errorf("Expected to find status for %s in plugin status update, got: %+v", Name, status) + } + + if bStatus.State != plugins.StateNotReady { + t.Errorf("Expected plugin status update to have state = %s, got %s", plugins.StateNotReady, bStatus.State) + } + }) + // Note: test stages are accumulating state with reconfigures between them, the order does matter! // Each stage defines the new config, side effects are validated. 
stages := []struct { @@ -920,7 +965,7 @@ func TestPluginReconfigure(t *testing.T) { }, }, { - name: "switch to mutli-bundle", + name: "switch to multi-bundle", cfg: &Config{ Bundles: map[string]*Source{ "b1": {Config: baseConf, Service: serviceName, Resource: "/bundles/bundle.tar.gz"}, @@ -1050,6 +1095,9 @@ func TestPluginReconfigure(t *testing.T) { } }) } + if len(stages) != updateCount { + t.Fatalf("Expected to have recieved %d updates, got %d", len(stages), updateCount) + } } func TestUpgradeLegacyBundleToMuiltiBundleSameBundle(t *testing.T) { @@ -1314,3 +1362,15 @@ func validateStoreState(ctx context.Context, t *testing.T, store storage.Store, t.Fatal(err) } } + +func ensurePluginState(t *testing.T, p *Plugin, state plugins.State) { + t.Helper() + status, ok := p.manager.PluginStatus()[Name] + if !ok { + t.Fatalf("Expected to find state for %s, found nil", Name) + return + } + if status.State != state { + t.Fatalf("Unexpected status state found in plugin manager for %s:\n\n\tFound:%+v\n\n\tExpected: %s", Name, status.State, state) + } +} diff --git a/plugins/discovery/discovery.go b/plugins/discovery/discovery.go index 9430f50601..d31a970518 100644 --- a/plugins/discovery/discovery.go +++ b/plugins/discovery/discovery.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "github.com/open-policy-agent/opa/metrics" @@ -26,6 +27,9 @@ import ( "github.com/open-policy-agent/opa/storage/inmem" ) +// Name is the discovery plugin name that will be registered with the plugin manager. +const Name = "discovery" + // Discovery implements configuration discovery for OPA. When discovery is // started it will periodically download a configuration bundle and try to // reconfigure the OPA. 
@@ -37,6 +41,7 @@ type Discovery struct { status *bundle.Status // discovery status etag string // discovery bundle etag for caching purposes metrics metrics.Metrics + readyOnce sync.Once } // Factories provides a set of factory functions to use for @@ -86,6 +91,7 @@ func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error) Name: *config.Name, } + manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) return result, nil } @@ -93,6 +99,9 @@ func New(manager *plugins.Manager, opts ...func(*Discovery)) (*Discovery, error) func (c *Discovery) Start(ctx context.Context) error { if c.downloader != nil { c.downloader.Start(ctx) + } else { + // If there is no dynamic discovery then update the status to OK. + c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) } return nil } @@ -102,6 +111,8 @@ func (c *Discovery) Stop(ctx context.Context) { if c.downloader != nil { c.downloader.Stop(ctx) } + + c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) } // Reconfigure is a no-op on discovery. @@ -137,6 +148,12 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) { c.status.SetError(nil) c.status.SetActivateSuccess(u.Bundle.Manifest.Revision) + + // On the first activation success mark the plugin as being in OK state + c.readyOnce.Do(func() { + c.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) + }) + if u.ETag != "" { c.logInfo("Discovery update processed successfully. Etag updated to %v.", u.ETag) } else { @@ -410,12 +427,8 @@ func registerBundleStatusUpdates(m *plugins.Manager) { // Depending on how the plugin was configured we will want to use different listeners // for backwards compatibility. 
if !bp.Config().IsMultiBundle() { - bp.Register(pluginlistener(status.Name), func(s bundle.Status) { - sp.UpdateBundleStatus(s) - }) + bp.Register(pluginlistener(status.Name), sp.UpdateBundleStatus) } else { - bp.RegisterBulkListener(pluginlistener(status.Name), func(s map[string]*bundle.Status) { - sp.BulkUpdateBundleStatus(s) - }) + bp.RegisterBulkListener(pluginlistener(status.Name), sp.BulkUpdateBundleStatus) } } diff --git a/plugins/discovery/discovery_test.go b/plugins/discovery/discovery_test.go index afe9d57aba..11af39af77 100644 --- a/plugins/discovery/discovery_test.go +++ b/plugins/discovery/discovery_test.go @@ -372,12 +372,14 @@ func TestStatusUpdates(t *testing.T) { for !ok && time.Since(t0) < time.Second { updates = ts.Updates() - ok = len(updates) == 5 && - updates[0].Discovery.ActiveRevision == "test-revision-1" && updates[0].Discovery.Code == "" && - updates[1].Discovery.ActiveRevision == "test-revision-1" && updates[1].Discovery.Code == "bundle_error" && - updates[2].Discovery.ActiveRevision == "test-revision-2" && updates[2].Discovery.Code == "" && - updates[3].Discovery.ActiveRevision == "test-revision-2" && updates[3].Discovery.Code == "bundle_error" && - updates[4].Discovery.ActiveRevision == "test-revision-2" && updates[4].Discovery.Code == "" + ok = len(updates) == 7 && + updates[0].Plugins["discovery"].State == plugins.StateNotReady && updates[0].Plugins["status"].State == plugins.StateOK && + updates[1].Plugins["discovery"].State == plugins.StateOK && updates[1].Plugins["status"].State == plugins.StateOK && + updates[2].Plugins["discovery"].State == plugins.StateOK && updates[2].Discovery.ActiveRevision == "test-revision-1" && updates[2].Discovery.Code == "" && + updates[3].Plugins["discovery"].State == plugins.StateOK && updates[3].Discovery.ActiveRevision == "test-revision-1" && updates[3].Discovery.Code == "bundle_error" && + updates[4].Plugins["discovery"].State == plugins.StateOK && updates[4].Discovery.ActiveRevision == 
"test-revision-2" && updates[4].Discovery.Code == "" && + updates[5].Plugins["discovery"].State == plugins.StateOK && updates[5].Discovery.ActiveRevision == "test-revision-2" && updates[5].Discovery.Code == "bundle_error" && + updates[6].Plugins["discovery"].State == plugins.StateOK && updates[6].Discovery.ActiveRevision == "test-revision-2" && updates[6].Discovery.Code == "" } if !ok { diff --git a/plugins/logs/plugin.go b/plugins/logs/plugin.go index 43738b59f3..529e7a0d7d 100644 --- a/plugins/logs/plugin.go +++ b/plugins/logs/plugin.go @@ -234,6 +234,8 @@ func New(parsedConfig *Config, manager *plugins.Manager) *Plugin { manager.RegisterCompilerTrigger(plugin.compilerUpdated) + manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) + return plugin } @@ -252,6 +254,7 @@ func Lookup(manager *plugins.Manager) *Plugin { func (p *Plugin) Start(ctx context.Context) error { p.logInfo("Starting decision logger.") go p.loop() + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) return nil } @@ -261,6 +264,7 @@ func (p *Plugin) Stop(ctx context.Context) { done := make(chan struct{}) p.stop <- done _ = <-done + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) } // Log appends a decision log event to the buffer for uploading. 
diff --git a/plugins/logs/plugin_test.go b/plugins/logs/plugin_test.go index 1959571f05..6cecf5d8ed 100644 --- a/plugins/logs/plugin_test.go +++ b/plugins/logs/plugin_test.go @@ -446,6 +446,8 @@ func TestPluginReconfigure(t *testing.T) { t.Fatal(err) } + ensurePluginState(t, fixture.plugin, plugins.StateOK) + minDelay := 2 maxDelay := 3 @@ -460,7 +462,10 @@ func TestPluginReconfigure(t *testing.T) { config, _ := ParseConfig(pluginConfig, fixture.manager.Services(), nil) fixture.plugin.Reconfigure(ctx, config) + ensurePluginState(t, fixture.plugin, plugins.StateOK) + fixture.plugin.Stop(ctx) + ensurePluginState(t, fixture.plugin, plugins.StateNotReady) actualMin := time.Duration(*fixture.plugin.config.Reporting.MinDelaySeconds) / time.Nanosecond expectedMin := time.Duration(minDelay) * time.Second @@ -887,8 +892,14 @@ func newTestFixture(t *testing.T) testFixture { config, _ := ParseConfig([]byte(pluginConfig), manager.Services(), nil) + if s, ok := manager.PluginStatus()[Name]; ok { + t.Fatalf("Unexpected status found in plugin manager for %s: %+v", Name, s) + } + p := New(config, manager) + ensurePluginState(t, p, plugins.StateNotReady) + return testFixture{ manager: manager, plugin: p, @@ -1026,3 +1037,15 @@ func getWellKnownMetrics() metrics.Metrics { m.Counter("test_counter").Incr() return m } + +func ensurePluginState(t *testing.T, p *Plugin, state plugins.State) { + t.Helper() + status, ok := p.manager.PluginStatus()[Name] + if !ok { + t.Fatalf("Expected to find state for %s, found nil", Name) + return + } + if status.State != state { + t.Fatalf("Unexpected status state found in plugin manager for %s:\n\n\tFound:%+v\n\n\tExpected: %s", Name, status.State, plugins.StateOK) + } +} diff --git a/plugins/plugins.go b/plugins/plugins.go index ac5a1ef229..6fe185d049 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -60,6 +60,10 @@ import ( // 1. Cast the config value to it's own type // 2. Instantiate a plugin object // 3. Return the plugin object +// 4. 
Update status via `plugins.Manager#UpdatePluginStatus` +// +// After a plugin has been created subsequent status updates can be +// sent anytime the plugin enters a ready or error state. type Factory interface { Validate(manager *Manager, config []byte) (interface{}, error) New(manager *Manager, config interface{}) Plugin @@ -83,6 +87,32 @@ type Plugin interface { Reconfigure(ctx context.Context, config interface{}) } +// State defines the state that a Plugin instance is currently +// in with pre-defined states. +type State string + +const ( + // StateNotReady indicates that the Plugin is not in an error state, but isn't + // ready for normal operation yet. This should only happen at + // initialization time. + StateNotReady State = "NOT_READY" + + // StateOK signifies that the Plugin is operating normally. + StateOK State = "OK" + + // StateErr indicates that the Plugin is in an error state and should not + // be considered as functional. + StateErr State = "ERROR" +) + +// Status has a Plugin's current status. +type Status struct { + State State `json:"state"` +} + +// StatusListener defines a handler to register for status updates. +type StatusListener func(status map[string]*Status) + // Manager implements lifecycle management of plugins and gives plugins access // to engine-wide components like storage. 
type Manager struct { @@ -91,12 +121,14 @@ type Manager struct { Info *ast.Term ID string - compiler *ast.Compiler - compilerMux sync.RWMutex - services map[string]rest.Client - plugins []namedplugin - registeredTriggers []func(txn storage.Transaction) - mtx sync.Mutex + compiler *ast.Compiler + compilerMux sync.RWMutex + services map[string]rest.Client + plugins []namedplugin + registeredTriggers []func(txn storage.Transaction) + mtx sync.Mutex + pluginStatus map[string]*Status + pluginStatusListeners map[string]StatusListener } type managerContextKey string @@ -147,10 +179,12 @@ func New(raw []byte, id string, store storage.Store, opts ...func(*Manager)) (*M } m := &Manager{ - Store: store, - Config: parsedConfig, - ID: id, - services: services, + Store: store, + Config: parsedConfig, + ID: id, + services: services, + pluginStatus: map[string]*Status{}, + pluginStatusListeners: map[string]StatusListener{}, } for _, f := range opts { @@ -176,6 +210,9 @@ func (m *Manager) Register(name string, plugin Plugin) { name: name, plugin: plugin, }) + if _, ok := m.pluginStatus[name]; !ok { + m.pluginStatus[name] = nil + } } // Plugins returns the list of plugins registered with the manager. 
@@ -241,19 +278,21 @@ func (m *Manager) Start(ctx context.Context) error { return err } - if err := func() error { + var toStart []Plugin + + func() { m.mtx.Lock() defer m.mtx.Unlock() - - for _, p := range m.plugins { - if err := p.plugin.Start(ctx); err != nil { - return err - } + toStart = make([]Plugin, len(m.plugins)) + for i := range m.plugins { + toStart[i] = m.plugins[i].plugin } + }() - return nil - }(); err != nil { - return err + for i := range toStart { + if err := toStart[i].Start(ctx); err != nil { + return err + } } config := storage.TriggerConfig{OnCommit: m.onCommit} @@ -266,10 +305,19 @@ func (m *Manager) Start(ctx context.Context) error { // Stop stops the manager, stopping all the plugins registered with it func (m *Manager) Stop(ctx context.Context) { - m.mtx.Lock() - defer m.mtx.Unlock() - for _, p := range m.plugins { - p.plugin.Stop(ctx) + var toStop []Plugin + + func() { + m.mtx.Lock() + defer m.mtx.Unlock() + toStop = make([]Plugin, len(m.plugins)) + for i := range m.plugins { + toStop[i] = m.plugins[i].plugin + } + }() + + for i := range toStop { + toStop[i].Stop(ctx) } } @@ -289,6 +337,70 @@ func (m *Manager) Reconfigure(config *config.Config) error { return nil } +// PluginStatus returns the current statuses of any plugins registered. +func (m *Manager) PluginStatus() map[string]*Status { + m.mtx.Lock() + defer m.mtx.Unlock() + + return m.copyPluginStatus() +} + +// RegisterPluginStatusListener registers a StatusListener to be +// called when plugin status updates occur. +func (m *Manager) RegisterPluginStatusListener(name string, listener StatusListener) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.pluginStatusListeners[name] = listener +} + +// UnregisterPluginStatusListener removes a StatusListener registered with the +// same name. 
+func (m *Manager) UnregisterPluginStatusListener(name string) { + m.mtx.Lock() + defer m.mtx.Unlock() + + delete(m.pluginStatusListeners, name) +} + +// UpdatePluginStatus updates a named plugin's status. Any registered +// listeners will be called with a copy of the new state of all +// plugins. +func (m *Manager) UpdatePluginStatus(pluginName string, status *Status) { + + var toNotify map[string]StatusListener + var statuses map[string]*Status + + func() { + m.mtx.Lock() + defer m.mtx.Unlock() + m.pluginStatus[pluginName] = status + toNotify = make(map[string]StatusListener, len(m.pluginStatusListeners)) + for k, v := range m.pluginStatusListeners { + toNotify[k] = v + } + statuses = m.copyPluginStatus() + }() + + for _, l := range toNotify { + l(statuses) + } +} + +func (m *Manager) copyPluginStatus() map[string]*Status { + statusCpy := map[string]*Status{} + for k, v := range m.pluginStatus { + var cpy *Status + if v != nil { + cpy = &Status{ + State: v.State, + } + } + statusCpy[k] = cpy + } + return statusCpy +} + func (m *Manager) onCommit(ctx context.Context, txn storage.Transaction, event storage.TriggerEvent) { if event.PolicyChanged() { @@ -336,11 +448,15 @@ func loadCompilerFromStore(ctx context.Context, store storage.Store, txn storage // Client returns a client for communicating with a remote service. func (m *Manager) Client(name string) rest.Client { + m.mtx.Lock() + defer m.mtx.Unlock() return m.services[name] } // Services returns a list of services that m can provide clients for. func (m *Manager) Services() []string { + m.mtx.Lock() + defer m.mtx.Unlock() s := make([]string, 0, len(m.services)) for name := range m.services { s = append(s, name) diff --git a/plugins/plugins_test.go b/plugins/plugins_test.go new file mode 100644 index 0000000000..36c6e4d8ef --- /dev/null +++ b/plugins/plugins_test.go @@ -0,0 +1,116 @@ +// Copyright 2020 The OPA Authors. All rights reserved. 
+// Use of this source code is governed by an Apache2 +// license that can be found in the LICENSE file. + +package plugins + +import ( + "context" + "reflect" + "testing" + + "github.com/open-policy-agent/opa/storage/inmem" +) + +func TestManagerPluginStatusListener(t *testing.T) { + m, err := New([]byte{}, "test", inmem.New()) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + // Start by registering a single listener and validate that it was registered correctly + var l1Status map[string]*Status + m.RegisterPluginStatusListener("l1", func(status map[string]*Status) { + l1Status = status + }) + if len(m.pluginStatusListeners) != 1 || m.pluginStatusListeners["l1"] == nil { + t.Fatalf("Expected a single listener named 'l1' got: %+v", m.pluginStatusListeners) + } + + // Register a second one, validate both are there + var l2Status map[string]*Status + m.RegisterPluginStatusListener("l2", func(status map[string]*Status) { + l2Status = status + }) + if len(m.pluginStatusListeners) != 2 || m.pluginStatusListeners["l2"] == nil { + t.Fatalf("Expected a two listeners named 'l1' and 'l2' got: %+v", m.pluginStatusListeners) + } + + // Ensure starting statuses are empty by default + currentStatus := m.PluginStatus() + if len(currentStatus) != 0 { + t.Fatalf("Expected 0 statuses in current plugin status map, got: %+v", currentStatus) + } + + // Push an update to a plugin, ensure current status is reflected and listeners were called + m.UpdatePluginStatus("p1", &Status{State: StateOK}) + currentStatus = m.PluginStatus() + if len(currentStatus) != 1 || currentStatus["p1"].State != StateOK { + t.Fatalf("Expected 1 statuses in current plugin status map with state OK, got: %+v", currentStatus) + } + if !reflect.DeepEqual(currentStatus, l1Status) || !reflect.DeepEqual(l1Status, l2Status) { + t.Fatalf("Unexpected status in updates:\n\n\texpecting: %+v\n\n\tgot: l1: %+v l2: %+v\n", currentStatus, l1Status, l2Status) + } + + // Unregister the first listener, ensure it 
is removed + m.UnregisterPluginStatusListener("l1") + if len(m.pluginStatusListeners) != 1 || m.pluginStatusListeners["l2"] == nil { + t.Fatalf("Expected a single listeners named 'l2' got: %+v", m.pluginStatusListeners) + } + + // Send another update, ensure the status is ok and the remaining listener is still called + m.UpdatePluginStatus("p2", &Status{State: StateErr}) + currentStatus = m.PluginStatus() + if len(currentStatus) != 2 || currentStatus["p1"].State != StateOK || currentStatus["p2"].State != StateErr { + t.Fatalf("Unexpected current plugin status, got: %+v", currentStatus) + } + if !reflect.DeepEqual(currentStatus, l2Status) { + t.Fatalf("Unexpected status in updates:\n\n\texpecting: %+v\n\n\tgot: %+v\n", currentStatus, l2Status) + } + + // Unregister the last listener + m.UnregisterPluginStatusListener("l2") + if len(m.pluginStatusListeners) != 0 { + t.Fatalf("Expected zero listeners got: %+v", m.pluginStatusListeners) + } + + // Ensure updates can still be sent with no listeners + m.UpdatePluginStatus("p2", &Status{State: StateOK}) + currentStatus = m.PluginStatus() + if len(currentStatus) != 2 || currentStatus["p1"].State != StateOK || currentStatus["p2"].State != StateOK { + t.Fatalf("Unexpected current plugin status, got: %+v", currentStatus) + } +} + +func TestPluginStatusUpdateOnStartAndStop(t *testing.T) { + m, err := New([]byte{}, "test", inmem.New()) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + m.Register("p1", &testPlugin{m}) + + err = m.Start(context.Background()) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + m.Stop(context.Background()) +} + +type testPlugin struct { + m *Manager +} + +func (p *testPlugin) Start(ctx context.Context) error { + p.m.UpdatePluginStatus("p1", &Status{State: StateOK}) + return nil +} + +func (p *testPlugin) Stop(ctx context.Context) { + p.m.UpdatePluginStatus("p1", &Status{State: StateNotReady}) +} + +func (p *testPlugin) Reconfigure(ctx context.Context, config 
interface{}) { + p.m.UpdatePluginStatus("p1", &Status{State: StateNotReady}) +} diff --git a/plugins/status/plugin.go b/plugins/status/plugin.go index aef999dc79..29467c7e42 100644 --- a/plugins/status/plugin.go +++ b/plugins/status/plugin.go @@ -24,11 +24,12 @@ import ( // UpdateRequestV1 represents the status update message that OPA sends to // remote HTTP endpoints. type UpdateRequestV1 struct { - Labels map[string]string `json:"labels"` - Bundle *bundle.Status `json:"bundle,omitempty"` // Deprecated: Use bulk `bundles` status updates instead - Bundles map[string]*bundle.Status `json:"bundles,omitempty"` - Discovery *bundle.Status `json:"discovery,omitempty"` - Metrics map[string]interface{} `json:"metrics,omitempty"` + Labels map[string]string `json:"labels"` + Bundle *bundle.Status `json:"bundle,omitempty"` // Deprecated: Use bulk `bundles` status updates instead + Bundles map[string]*bundle.Status `json:"bundles,omitempty"` + Discovery *bundle.Status `json:"discovery,omitempty"` + Metrics map[string]interface{} `json:"metrics,omitempty"` + Plugins map[string]*plugins.Status `json:"plugins,omitempty"` } // Plugin implements status reporting. Updates can be triggered by the caller. @@ -44,6 +45,8 @@ type Plugin struct { stop chan chan struct{} reconfig chan interface{} metrics metrics.Metrics + lastPluginStatuses map[string]*plugins.Status + pluginStatusCh chan map[string]*plugins.Status } // Config contains configuration for the plugin. @@ -106,15 +109,20 @@ func ParseConfig(config []byte, services []string) (*Config, error) { // New returns a new Plugin with the given config. 
func New(parsedConfig *Config, manager *plugins.Manager) *Plugin { - return &Plugin{ - manager: manager, - config: *parsedConfig, - bundleCh: make(chan bundle.Status), - bulkBundleCh: make(chan map[string]*bundle.Status), - discoCh: make(chan bundle.Status), - stop: make(chan chan struct{}), - reconfig: make(chan interface{}), + p := &Plugin{ + manager: manager, + config: *parsedConfig, + bundleCh: make(chan bundle.Status), + bulkBundleCh: make(chan map[string]*bundle.Status), + discoCh: make(chan bundle.Status), + stop: make(chan chan struct{}), + reconfig: make(chan interface{}), + pluginStatusCh: make(chan map[string]*plugins.Status), } + + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) + + return p } // WithMetrics sets the global metrics provider to be used by the plugin. @@ -137,16 +145,28 @@ func Lookup(manager *plugins.Manager) *Plugin { // Start starts the plugin. func (p *Plugin) Start(ctx context.Context) error { p.logInfo("Starting status reporter.") + go p.loop() + + // Setup a listener for plugin statuses, but only after starting the loop + // to prevent blocking threads pushing the plugin updates. + p.manager.RegisterPluginStatusListener(Name, p.UpdatePluginStatus) + + // Set the status plugin's status to OK now that everything is registered and + // the loop is running. This will trigger an update on the listener with the + // current status of all the other plugins too. + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK}) return nil } // Stop stops the plugin. func (p *Plugin) Stop(ctx context.Context) { p.logInfo("Stopping status reporter.") + p.manager.UnregisterPluginStatusListener(Name) done := make(chan struct{}) p.stop <- done _ = <-done + p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady}) } // UpdateBundleStatus notifies the plugin that the policy bundle was updated. 
@@ -165,6 +185,11 @@ func (p *Plugin) UpdateDiscoveryStatus(status bundle.Status) { p.discoCh <- status } +// UpdatePluginStatus notifies the plugin that a plugin status was updated. +func (p *Plugin) UpdatePluginStatus(status map[string]*plugins.Status) { + p.pluginStatusCh <- status +} + // Reconfigure notifies the plugin with a new configuration. func (p *Plugin) Reconfigure(_ context.Context, config interface{}) { p.reconfig <- config @@ -176,6 +201,14 @@ func (p *Plugin) loop() { for { select { + case statuses := <-p.pluginStatusCh: + p.lastPluginStatuses = statuses + err := p.oneShot(ctx) + if err != nil { + p.logError("%v.", err) + } else { + p.logInfo("Status update sent successfully in response to plugin update.") + } case statuses := <-p.bulkBundleCh: p.lastBundleStatuses = statuses err := p.oneShot(ctx) @@ -219,6 +252,7 @@ func (p *Plugin) oneShot(ctx context.Context) error { Discovery: p.lastDiscoStatus, Bundle: p.lastBundleStatus, Bundles: p.lastBundleStatuses, + Plugins: p.lastPluginStatuses, } if p.metrics != nil { diff --git a/plugins/status/plugin_test.go b/plugins/status/plugin_test.go index bddf4a0805..fac28e5e95 100644 --- a/plugins/status/plugin_test.go +++ b/plugins/status/plugin_test.go @@ -40,9 +40,8 @@ func TestPluginStart(t *testing.T) { fixture.plugin.Start(ctx) defer fixture.plugin.Stop(ctx) - status := testStatus() - - fixture.plugin.UpdateBundleStatus(*status) + // Start will trigger a status update when the plugin state switches + // from "not ready" to "ok". 
result := <-fixture.server.ch exp := UpdateRequestV1{ @@ -51,12 +50,25 @@ func TestPluginStart(t *testing.T) { "app": "example-app", "version": version.Version, }, - Bundle: status, + Plugins: map[string]*plugins.Status{ + "status": {State: plugins.StateOK}, + }, } if !reflect.DeepEqual(result, exp) { t.Fatalf("Expected: %v but got: %v", exp, result) } + + status := testStatus() + + fixture.plugin.UpdateBundleStatus(*status) + result = <-fixture.server.ch + + exp.Bundle = status + + if !reflect.DeepEqual(result, exp) { + t.Fatalf("Expected: %v but got: %v", exp, result) + } } func TestPluginStartBulkUpdate(t *testing.T) { @@ -70,9 +82,8 @@ func TestPluginStartBulkUpdate(t *testing.T) { fixture.plugin.Start(ctx) defer fixture.plugin.Stop(ctx) - status := testStatus() - - fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{status.Name: status}) + // Start will trigger a status update when the plugin state switches + // from "not ready" to "ok". result := <-fixture.server.ch exp := UpdateRequestV1{ @@ -81,9 +92,18 @@ func TestPluginStartBulkUpdate(t *testing.T) { "app": "example-app", "version": version.Version, }, - Bundles: map[string]*bundle.Status{status.Name: status}, + Plugins: map[string]*plugins.Status{ + "status": {State: plugins.StateOK}, + }, } + status := testStatus() + + fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{status.Name: status}) + result = <-fixture.server.ch + + exp.Bundles = map[string]*bundle.Status{status.Name: status} + if !reflect.DeepEqual(result, exp) { t.Fatalf("Expected: %v but got: %v", exp, result) } @@ -100,6 +120,9 @@ func TestPluginStartBulkUpdateMultiple(t *testing.T) { fixture.plugin.Start(ctx) defer fixture.plugin.Stop(ctx) + // Ignore the plugin updating its status (tested elsewhere) + <-fixture.server.ch + statuses := map[string]*bundle.Status{} tDownload, _ := time.Parse("2018-01-01T00:00:00.0000000Z", time.RFC3339Nano) tActivate, _ := time.Parse("2018-01-01T00:00:01.0000000Z", time.RFC3339Nano) 
@@ -152,6 +175,9 @@ func TestPluginStartDiscovery(t *testing.T) { fixture.plugin.Start(ctx) defer fixture.plugin.Stop(ctx) + // Ignore the plugin updating its status (tested elsewhere) + <-fixture.server.ch + status := testStatus() fixture.plugin.UpdateDiscoveryStatus(*status) @@ -164,6 +190,9 @@ func TestPluginStartDiscovery(t *testing.T) { "version": version.Version, }, Discovery: status, + Plugins: map[string]*plugins.Status{ + "status": {State: plugins.StateOK}, + }, } if !reflect.DeepEqual(result, exp) { @@ -241,6 +270,9 @@ func TestMetrics(t *testing.T) { fixture.plugin.Start(ctx) defer fixture.plugin.Stop(ctx) + // Ignore the plugin updating its status (tested elsewhere) + <-fixture.server.ch + status := testStatus() fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{"bundle": status}) diff --git a/server/server.go b/server/server.go index fade60b99f..d0a9a0f3ed 100644 --- a/server/server.go +++ b/server/server.go @@ -110,8 +110,6 @@ type Server struct { pprofEnabled bool runtime *ast.Term httpListeners []httpListener - bundleStatuses map[string]*bundlePlugin.Status - bundleStatusMtx sync.RWMutex metrics Metrics defaultDecisionPath string } @@ -180,18 +178,6 @@ func (s *Server) Init(ctx context.Context) (*Server, error) { s.partials = map[string]rego.PartialResult{} s.preparedEvalQueries = newCache(pqMaxCacheSize) - bp := bundlePlugin.Lookup(s.manager) - if bp != nil { - - // initialize statuses to empty defaults for server /health check - s.bundleStatuses = map[string]*bundlePlugin.Status{} - for bundleName := range bp.Config().Bundles { - s.bundleStatuses[bundleName] = &bundlePlugin.Status{Name: bundleName} - } - - bp.RegisterBulkListener("REST API Server", s.updateBundleStatus) - } - // Check if there is a bundle revision available at the legacy storage path rev, err := bundle.LegacyReadRevisionFromStore(ctx, s.store, txn) if err == nil && rev != "" { @@ -871,12 +857,6 @@ func (s *Server) getCachedPreparedEvalQuery(key string, m 
metrics.Metrics) (*reg return nil, false } -func (s *Server) updateBundleStatus(status map[string]*bundlePlugin.Status) { - s.bundleStatusMtx.Lock() - defer s.bundleStatusMtx.Unlock() - s.bundleStatuses = status -} - func (s *Server) canEval(ctx context.Context) bool { // Create very simple query that binds a single variable. eval := rego.New(rego.Compiler(s.getCompiler()), @@ -886,7 +866,7 @@ func (s *Server) canEval(ctx context.Context) bool { if err != nil { return false } - type emptyObject struct{} + v, ok := rs[0].Bindings["x"] if ok { jsonNumber, ok := v.(json.Number) @@ -897,15 +877,22 @@ func (s *Server) canEval(ctx context.Context) bool { return false } -func (s *Server) bundlesActivated() bool { - s.bundleStatusMtx.RLock() - defer s.bundleStatusMtx.RUnlock() +func (s *Server) bundlesReady(pluginStatuses map[string]*plugins.Status) bool { - for _, status := range s.bundleStatuses { - // Ensure that all of the bundle statuses have an activation time set on them - if (status.LastSuccessfulActivation == time.Time{}) { - return false - } + // Look for a discovery plugin first, if it exists and isn't ready + // then don't bother with the others. + // Note: use "discovery" instead of `discovery.Name` to avoid import + // cycle problems.. + dpStatus, ok := pluginStatuses["discovery"] + if ok && dpStatus != nil && (dpStatus.State != plugins.StateOK) { + return false + } + + // The bundle plugin won't return "OK" until the first activation + // of each configured bundle. 
+ bpStatus, ok := pluginStatuses[bundlePlugin.Name] + if ok && bpStatus != nil && (bpStatus.State != plugins.StateOK) { + return false } return true @@ -913,23 +900,51 @@ func (s *Server) bundlesActivated() bool { func (s *Server) unversionedGetHealth(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - includeBundleStatus := getBoolParam(r.URL, types.ParamBundleActivationV1, false) + includeBundleStatus := getBoolParam(r.URL, types.ParamBundleActivationV1, true) || + getBoolParam(r.URL, types.ParamBundlesActivationV1, true) + includePluginStatus := getBoolParam(r.URL, types.ParamPluginsV1, true) // Ensure the server can evaluate a simple query - type emptyObject struct{} if !s.canEval(ctx) { - writer.JSON(w, http.StatusInternalServerError, emptyObject{}, false) + writeHealthResponse(w, errors.New("unable to perform evaluation"), struct{}{}) return } + pluginStatuses := s.manager.PluginStatus() + // Ensure that bundles (if configured, and requested to be included in the result) - // have been activated successfully. - if includeBundleStatus && s.hasBundle() && !s.bundlesActivated() { - writer.JSON(w, http.StatusInternalServerError, emptyObject{}, false) + // have been activated successfully. This will include discovery bundles as well as + // normal bundles that are configured. + if includeBundleStatus && !s.bundlesReady(pluginStatuses) { + // For backwards compatibility we don't return a payload with statuses for the bundle endpoint + writeHealthResponse(w, errors.New("not all configured bundles have been activated"), struct{}{}) return } - writer.JSON(w, http.StatusOK, emptyObject{}, false) + if includePluginStatus { + // Ensure that all plugins (if requested to be included in the result) have an OK status. NOTE(review): Manager.Register records a nil *Status for plugins that have not reported yet, so nil entries must be skipped (treated as OK) before reading status.State or this handler panics. 
+ hasErr := false + for _, status := range pluginStatuses { + if status.State != plugins.StateOK { + hasErr = true + break + } + } + if hasErr { + writeHealthResponse(w, errors.New("not all plugins in OK state"), struct{}{}) + return + } + } + writeHealthResponse(w, nil, struct{}{}) +} + +func writeHealthResponse(w http.ResponseWriter, err error, payload interface{}) { + status := http.StatusOK + if err != nil { + status = http.StatusInternalServerError + } + + writer.JSON(w, status, payload, false) } func (s *Server) v1CompilePost(w http.ResponseWriter, r *http.Request) { @@ -2128,10 +2143,6 @@ func (s *Server) getProvenance() *types.ProvenanceV1 { return p } -func (s *Server) hasBundle() bool { - return bundlePlugin.Lookup(s.manager) != nil || s.legacyRevision != "" -} - func (s *Server) hasLegacyBundle() bool { bp := bundlePlugin.Lookup(s.manager) return s.legacyRevision != "" || (bp != nil && !bp.Config().IsMultiBundle()) diff --git a/server/server_test.go b/server/server_test.go index 033067cdca..354cca15e1 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -67,186 +67,392 @@ func TestUnversionedGetHealthBundleNoBundleSet(t *testing.T) { f := newFixture(t) - req := newReqUnversioned(http.MethodGet, "/health?bundle=true", "") + req := newReqUnversioned(http.MethodGet, "/health?bundles=true", "") if err := f.executeRequest(req, 200, `{}`); err != nil { t.Fatalf("Unexpected error while health check: %v", err) } } -func TestUnversionedGetHealthCheckBundleActivationSingle(t *testing.T) { +func TestUnversionedGetHealthCheckOnlyBundlePlugin(t *testing.T) { f := newFixture(t) - bundleName := "test-bundle" // Initialize the server as if a bundle plugin was // configured on the manager. 
- f.server.manager.Register(pluginBundle.Name, &pluginBundle.Plugin{}) - f.server.bundleStatuses = map[string]*pluginBundle.Status{ - bundleName: &pluginBundle.Status{Name: bundleName}, - } + f.server.manager.UpdatePluginStatus("bundle", &plugins.Status{State: plugins.StateNotReady}) // The bundle hasn't been activated yet, expect the health check to fail - req := newReqUnversioned(http.MethodGet, "/health?bundle=true", "") + req := newReqUnversioned(http.MethodGet, "/health?bundles=true", "") if err := f.executeRequest(req, 500, `{}`); err != nil { t.Fatal(err) } // Set the bundle to be activated. - status := map[string]*pluginBundle.Status{ - bundleName: &pluginBundle.Status{}, - } - status[bundleName].SetActivateSuccess("") - f.server.updateBundleStatus(status) + f.server.manager.UpdatePluginStatus("bundle", &plugins.Status{State: plugins.StateOK}) // The heath check should now respond as healthy - req = newReqUnversioned(http.MethodGet, "/health?bundle=true", "") + req = newReqUnversioned(http.MethodGet, "/health?bundles=true", "") if err := f.executeRequest(req, 200, `{}`); err != nil { t.Fatal(err) } } -func TestUnversionedGetHealthCheckBundleActivationSingleLegacy(t *testing.T) { - - // Initialize the server as if there is no bundle plugin +func TestUnversionedGetHealthCheckDiscoveryWithBundle(t *testing.T) { f := newFixture(t) - ctx := context.Background() + // Initialize the server as if a discovery bundle is configured + f.server.manager.UpdatePluginStatus("discovery", &plugins.Status{State: plugins.StateNotReady}) - err := storage.Txn(ctx, f.server.store, storage.WriteParams, func(txn storage.Transaction) error { - return bundle.LegacyWriteManifestToStore(ctx, f.server.store, txn, bundle.Manifest{ - Revision: "a", - }) - }) + // The discovery bundle hasn't been activated yet, expect the health check to fail + req := newReqUnversioned(http.MethodGet, "/health?bundles=true", "") + if err := f.executeRequest(req, 500, `{}`); err != nil { + t.Fatal(err) + } 
- if err != nil { - t.Fatalf("Unexpected error: %s", err) + // Set the bundle to be not ready (plugin configured and created, but hasn't activated all bundles yet). + f.server.manager.UpdatePluginStatus("discovery", &plugins.Status{State: plugins.StateOK}) + f.server.manager.UpdatePluginStatus("bundle", &plugins.Status{State: plugins.StateNotReady}) + + // The discovery bundle is OK, but the newly configured bundle hasn't been activated yet, expect the health check to fail + req = newReqUnversioned(http.MethodGet, "/health?bundles=true", "") + if err := f.executeRequest(req, 500, `{}`); err != nil { + t.Fatal(err) } + // Set the bundle to be activated. + f.server.manager.UpdatePluginStatus("bundle", &plugins.Status{State: plugins.StateOK}) + // The heath check should now respond as healthy - req := newReqUnversioned(http.MethodGet, "/health?bundle=true", "") + req = newReqUnversioned(http.MethodGet, "/health?bundles=true", "") if err := f.executeRequest(req, 200, `{}`); err != nil { t.Fatal(err) } } -func TestUnversionedGetHealthCheckBundleActivationMulti(t *testing.T) { +func TestUnversionedGetHealthCheckBundleActivationSingleLegacy(t *testing.T) { + + // Initialize the server as if there is no bundle plugin f := newFixture(t) - // Initialize the server as if a bundle plugin was - // configured on the manager. 
- bp := pluginBundle.New(&pluginBundle.Config{Bundles: map[string]*pluginBundle.Source{ - "b1": {Service: "s1", Resource: "bundle.tar.gz"}, - "b2": {Service: "s2", Resource: "bundle.tar.gz"}, - "b3": {Service: "s3", Resource: "bundle.tar.gz"}, - }}, f.server.manager) - f.server.manager.Register(pluginBundle.Name, bp) - f.server.bundleStatuses = map[string]*pluginBundle.Status{ - "b1": {Name: "b1"}, - "b2": {Name: "b2"}, - "b3": {Name: "b3"}, - } + ctx := context.Background() - // No bundle has been activated yet, expect the health check to fail + // The server doesn't know about any bundles, so return a healthy status req := newReqUnversioned(http.MethodGet, "/health?bundle=true", "") - if err := f.executeRequest(req, 500, `{}`); err != nil { + if err := f.executeRequest(req, 200, `{}`); err != nil { t.Fatal(err) } - // Set one bundle to be activated - update := map[string]*pluginBundle.Status{ - "b1": {Name: "b1"}, - "b2": {Name: "b2"}, - "b3": {Name: "b3"}, - } - update["b2"].SetActivateSuccess("A") - f.server.updateBundleStatus(update) + err := storage.Txn(ctx, f.server.store, storage.WriteParams, func(txn storage.Transaction) error { + return bundle.LegacyWriteManifestToStore(ctx, f.server.store, txn, bundle.Manifest{ + Revision: "a", + }) + }) - // The heath check should still respond as unhealthy - req = newReqUnversioned(http.MethodGet, "/health?bundle=true", "") - if err := f.executeRequest(req, 500, `{}`); err != nil { - t.Fatal(err) + if err != nil { + t.Fatalf("Unexpected error: %s", err) } - // Activate all the bundles - update["b1"].SetActivateSuccess("B") - update["b3"].SetActivateSuccess("C") - f.server.updateBundleStatus(update) - - // The heath check should succeed now + // The heath check still respond as healthy with a legacy bundle found in storage req = newReqUnversioned(http.MethodGet, "/health?bundle=true", "") if err := f.executeRequest(req, 200, `{}`); err != nil { t.Fatal(err) } } -func TestInitWithBundlePlugin(t *testing.T) { - store := 
inmem.New() - m, err := plugins.New([]byte{}, "test", store) - if err != nil { - t.Fatalf("Unexpected error creating plugin manager: %s", err.Error()) - } +func TestBundlesReady(t *testing.T) { - bundleName := "test-bundle" - bundleConf := &pluginBundle.Config{ - Name: bundleName, - Service: "s1", - Bundles: map[string]*pluginBundle.Source{"b1": {}}, + cases := []struct { + note string + status map[string]*plugins.Status + ready bool + }{ + { + note: "nil status", + status: nil, + ready: true, + }, + { + note: "empty status", + status: map[string]*plugins.Status{}, + ready: true, + }, + { + note: "discovery not ready - bundle missing", + status: map[string]*plugins.Status{ + "discovery": {State: plugins.StateNotReady}, + }, + ready: false, + }, + { + note: "discovery ok - bundle missing", + status: map[string]*plugins.Status{ + "discovery": {State: plugins.StateOK}, + }, + ready: true, // bundles aren't enabled, only discovery plugin configured + }, + { + note: "discovery missing - bundle not ready", + status: map[string]*plugins.Status{ + "bundle": {State: plugins.StateNotReady}, + }, + ready: false, + }, + { + note: "discovery missing - bundle ok", + status: map[string]*plugins.Status{ + "bundle": {State: plugins.StateOK}, + }, + ready: true, // discovery isn't enabled, only bundle plugin configured + }, + { + note: "discovery not ready - bundle not ready", + status: map[string]*plugins.Status{ + "discovery": {State: plugins.StateNotReady}, + "bundle": {State: plugins.StateNotReady}, + }, + ready: false, + }, + { + note: "discovery ok - bundle not ready", + status: map[string]*plugins.Status{ + "discovery": {State: plugins.StateOK}, + "bundle": {State: plugins.StateNotReady}, + }, + ready: false, + }, + { + note: "discovery not ready - bundle ok", + status: map[string]*plugins.Status{ + "discovery": {State: plugins.StateNotReady}, + "bundle": {State: plugins.StateOK}, + }, + ready: false, + }, + { + note: "discovery ok - bundle ok", + status: 
map[string]*plugins.Status{ + "discovery": {State: plugins.StateOK}, + "bundle": {State: plugins.StateOK}, + }, + ready: true, + }, } - m.Register(pluginBundle.Name, pluginBundle.New(bundleConf, m)) - - server, err := New(). - WithStore(store). - WithManager(m). - Init(context.Background()) + for _, tc := range cases { + t.Run(tc.note, func(t *testing.T) { + f := newFixture(t) - if err != nil { - t.Fatalf("Unexpected error initializing server: %s", err.Error()) + actual := f.server.bundlesReady(tc.status) + if actual != tc.ready { + t.Errorf("Expected %t got %t", tc.ready, actual) + } + }) } +} - if !server.hasBundle() { - t.Error("server.hasBundle should be true") - } +func TestUnversionedGetHealthCheckDiscoveryWithPlugins(t *testing.T) { - isActivated := server.bundlesActivated() - if isActivated { - t.Error("bundle should not be initialized to activated status") - } -} + // Use the same server through the cases, the status updates apply incrementally to it. + f := newFixture(t) -func TestInitWithBundlePluginMultiBundle(t *testing.T) { - store := inmem.New() - m, err := plugins.New([]byte{}, "test", store) - if err != nil { - t.Fatalf("Unexpected error creating plugin manager: %s", err.Error()) + cases := []struct { + note string + statusUpdates map[string]*plugins.Status + exp int + }{ + { + note: "no plugins configured", + statusUpdates: nil, + exp: 200, + }, + { + note: "one plugin configured - not ready", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateNotReady}, + }, + exp: 500, + }, + { + note: "one plugin configured - ready", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + }, + exp: 200, + }, + { + note: "one plugin configured - error state", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateErr}, + }, + exp: 500, + }, + { + note: "one plugin configured - recovered from error", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + }, + exp: 200, + 
}, + { + note: "add second plugin - not ready", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + "p2": {State: plugins.StateNotReady}, + }, + exp: 500, + }, + { + note: "add third plugin - not ready", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + "p2": {State: plugins.StateNotReady}, + "p3": {State: plugins.StateNotReady}, + }, + exp: 500, + }, + { + note: "mixed states - not ready", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + "p2": {State: plugins.StateErr}, + "p3": {State: plugins.StateNotReady}, + }, + exp: 500, + }, + { + note: "mixed states - still not ready", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + "p2": {State: plugins.StateErr}, + "p3": {State: plugins.StateOK}, + }, + exp: 500, + }, + { + note: "all plugins ready", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + "p2": {State: plugins.StateOK}, + "p3": {State: plugins.StateOK}, + }, + exp: 200, + }, + { + note: "one plugins fails", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateErr}, + "p2": {State: plugins.StateOK}, + "p3": {State: plugins.StateOK}, + }, + exp: 500, + }, + + { + note: "all plugins ready - recovery", + statusUpdates: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + "p2": {State: plugins.StateOK}, + "p3": {State: plugins.StateOK}, + }, + exp: 200, + }, } - bundleConf := &pluginBundle.Config{Bundles: map[string]*pluginBundle.Source{ - "b1": {}, - "b2": {}, - "b3": {}, - }} + for _, tc := range cases { + t.Run(tc.note, func(t *testing.T) { + for name, status := range tc.statusUpdates { + f.server.manager.UpdatePluginStatus(name, status) + } - m.Register(pluginBundle.Name, pluginBundle.New(bundleConf, m)) + req := newReqUnversioned(http.MethodGet, "/health?plugins", "") + if err := f.executeRequest(req, tc.exp, `{}`); err != nil { + t.Fatal(err) + } + }) + } +} - server, err := 
New(). - WithStore(store). - WithManager(m). - Init(context.Background()) +func TestUnversionedGetHealthCheckBundleAndPlugins(t *testing.T) { - if err != nil { - t.Fatalf("Unexpected error initializing server: %s", err.Error()) + cases := []struct { + note string + statuses map[string]*plugins.Status + exp int + }{ + { + note: "no plugins configured", + statuses: nil, + exp: 200, + }, + { + note: "only bundle plugin configured - not ready", + statuses: map[string]*plugins.Status{ + "bundle": {State: plugins.StateNotReady}, + }, + exp: 500, + }, + { + note: "only bundle plugin configured - ok", + statuses: map[string]*plugins.Status{ + "bundle": {State: plugins.StateOK}, + }, + exp: 200, + }, + { + note: "only custom plugin configured - not ready", + statuses: map[string]*plugins.Status{ + "p1": {State: plugins.StateNotReady}, + }, + exp: 500, + }, + { + note: "only custom plugin configured - ok", + statuses: map[string]*plugins.Status{ + "p1": {State: plugins.StateOK}, + }, + exp: 200, + }, + { + note: "both configured - bundle not ready", + statuses: map[string]*plugins.Status{ + "bundle": {State: plugins.StateNotReady}, + "p1": {State: plugins.StateOK}, + }, + exp: 500, + }, + { + note: "both configured - custom plugin not ready", + statuses: map[string]*plugins.Status{ + "bundle": {State: plugins.StateOK}, + "p1": {State: plugins.StateNotReady}, + }, + exp: 500, + }, + { + note: "both configured - both ready", + statuses: map[string]*plugins.Status{ + "bundle": {State: plugins.StateOK}, + "p1": {State: plugins.StateOK}, + }, + exp: 200, + }, } - if !server.hasBundle() { - t.Error("server.hasBundle should be true") - } + for _, tc := range cases { + t.Run(tc.note, func(t *testing.T) { + f := newFixture(t) - isActivated := server.bundlesActivated() - if isActivated { - t.Error("bundle should not be initialized to activated") + for name, status := range tc.statuses { + f.server.manager.UpdatePluginStatus(name, status) + } + + req := 
newReqUnversioned(http.MethodGet, "/health?plugins&bundles", "") + if err := f.executeRequest(req, tc.exp, `{}`); err != nil { + t.Fatal(err) + } + }) } } @@ -1680,9 +1886,6 @@ func TestDataProvenanceSingleBundle(t *testing.T) { // Initialize as if a bundle plugin is running bp := pluginBundle.New(&pluginBundle.Config{Name: "b1"}, f.server.manager) f.server.manager.Register(pluginBundle.Name, bp) - f.server.bundleStatuses = map[string]*pluginBundle.Status{ - "b1": {Name: "b1"}, - } req := newReqV1(http.MethodPost, "/data?provenance", "") f.reset() @@ -1791,11 +1994,6 @@ func TestDataProvenanceMultiBundle(t *testing.T) { }}, f.server.manager) f.server.manager.Register(pluginBundle.Name, bp) - f.server.bundleStatuses = map[string]*pluginBundle.Status{ - "b1": {Name: "b1"}, - "b2": {Name: "b2"}, - } - req := newReqV1(http.MethodPost, "/data?provenance", "") f.reset() f.server.Handler.ServeHTTP(f.recorder, req) diff --git a/server/types/types.go b/server/types/types.go index 3d6fc47d67..d2b48e22cb 100644 --- a/server/types/types.go +++ b/server/types/types.go @@ -427,7 +427,18 @@ const ( // ParamBundleActivationV1 defines the name of the HTTP URL parameter that // indicates the client wants to include bundle activation in the results // of the health API. + // Deprecated: Use ParamBundlesActivationV1 instead. ParamBundleActivationV1 = "bundle" + + // ParamBundlesActivationV1 defines the name of the HTTP URL parameter that + // indicates the client wants to include bundle activation in the results + // of the health API. + ParamBundlesActivationV1 = "bundles" + + // ParamPluginsV1 defines the name of the HTTP URL parameter that + // indicates the client wants to include plugin status in the results + // of the health API. + ParamPluginsV1 = "plugins" ) // BadRequestErr represents an error condition raised if the caller passes