Skip to content

Commit

Permalink
plugins/status: Update to report plugin statuses
Browse files Browse the repository at this point in the history
This adds in the status updates for the status plugin (reported to the
plugin manager) as well as hooking the status plugin up as a listener
for *all* plugin status updates to include in the status API events.

Signed-off-by: Patrick East <east.patrick@gmail.com>
  • Loading branch information
patrick-east authored and tsandall committed Feb 7, 2020
1 parent ff90014 commit 766fd8f
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 39 deletions.
52 changes: 34 additions & 18 deletions docs/content/management.md
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,12 @@ OPA can periodically report status updates to remote HTTP servers. The
updates contain status information for OPA itself as well as the
[Bundles](#bundles) that have been downloaded and activated.

OPA sends status reports whenever bundles are downloaded and activated. If
the bundle download or activation fails for any reason, the status update
will include error information describing the failure.
OPA sends status reports whenever one of the following happens:

* Bundles are downloaded and activated -- If the bundle download or activation fails for any reason, the status update
will include error information describing the failure. This includes Discovery bundles.
* A plugin state has changed -- All plugin status is reported, and an update to any plugin will
trigger a Status API report which contains the latest state.

The status updates will include a set of labels that uniquely identify the
OPA instance. OPA automatically includes an `id` value in the label set that
Expand All @@ -457,23 +460,34 @@ on the agent, updates will be sent to `/status`.

```json
{
"labels": {
"app": "my-example-app",
"id": "1780d507-aea2-45cc-ae50-fa153c8e4a5a",
"version": "{{< current_version >}}"
"labels": {
"app": "my-example-app",
"id": "1780d507-aea2-45cc-ae50-fa153c8e4a5a",
"version": "{{< current_version >}}"
},
"bundles": {
"http/example/authz": {
"active_revision": "ABC",
"last_successful_download": "2018-01-01T00:00:00.000Z",
"last_successful_activation": "2018-01-01T00:00:00.000Z",
"metrics": {
"timer_rego_data_parse_ns": 12345,
"timer_rego_module_compile_ns": 12345,
"timer_rego_module_parse_ns": 12345
}
}
},
"plugins": {
"bundle": {
"state": "OK"
},
"bundles": {
"http/example/authz": {
"active_revision": "TODO",
"last_successful_download": "2018-01-01T00:00:00.000Z",
"last_successful_activation": "2018-01-01T00:00:00.000Z",
"metrics": {
"timer_rego_data_parse_ns": 12345,
"timer_rego_module_compile_ns": 12345,
"timer_rego_module_parse_ns": 12345
}
}
"discovery": {
"state": "OK"
},
"status": {
"state": "OK"
}
},
"metrics": {
"prometheus": {
"go_gc_duration_seconds": {
Expand Down Expand Up @@ -609,6 +623,8 @@ Status updates contain the following fields:
| `discovery.active_revision` | `string` | Opaque revision identifier of the last successful discovery activation. |
| `discovery.last_successful_download` | `string` | RFC3339 timestamp of last successful discovery bundle download. |
| `discovery.last_successful_activation` | `string` | RFC3339 timestamp of last successful discovery bundle activation. |
| `plugins` | `object` | A set of objects describing the state of configured plugins in OPA's runtime. |
| `plugins[_].state | `string` | The state of each plugin. |
| `metrics.prometheus` | `object` | Global performance metrics for the OPA instance. |

If the bundle download or activation failed, the status update will contain
Expand Down
60 changes: 47 additions & 13 deletions plugins/status/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
// UpdateRequestV1 represents the status update message that OPA sends to
// remote HTTP endpoints.
type UpdateRequestV1 struct {
Labels map[string]string `json:"labels"`
Bundle *bundle.Status `json:"bundle,omitempty"` // Deprecated: Use bulk `bundles` status updates instead
Bundles map[string]*bundle.Status `json:"bundles,omitempty"`
Discovery *bundle.Status `json:"discovery,omitempty"`
Metrics map[string]interface{} `json:"metrics,omitempty"`
Labels map[string]string `json:"labels"`
Bundle *bundle.Status `json:"bundle,omitempty"` // Deprecated: Use bulk `bundles` status updates instead
Bundles map[string]*bundle.Status `json:"bundles,omitempty"`
Discovery *bundle.Status `json:"discovery,omitempty"`
Metrics map[string]interface{} `json:"metrics,omitempty"`
Plugins map[string]*plugins.Status `json:"plugins,omitempty"`
}

// Plugin implements status reporting. Updates can be triggered by the caller.
Expand All @@ -44,6 +45,8 @@ type Plugin struct {
stop chan chan struct{}
reconfig chan interface{}
metrics metrics.Metrics
lastPluginStatuses map[string]*plugins.Status
pluginStatusCh chan map[string]*plugins.Status
}

// Config contains configuration for the plugin.
Expand Down Expand Up @@ -106,15 +109,20 @@ func ParseConfig(config []byte, services []string) (*Config, error) {

// New returns a new Plugin with the given config.
func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {
return &Plugin{
manager: manager,
config: *parsedConfig,
bundleCh: make(chan bundle.Status),
bulkBundleCh: make(chan map[string]*bundle.Status),
discoCh: make(chan bundle.Status),
stop: make(chan chan struct{}),
reconfig: make(chan interface{}),
p := &Plugin{
manager: manager,
config: *parsedConfig,
bundleCh: make(chan bundle.Status),
bulkBundleCh: make(chan map[string]*bundle.Status),
discoCh: make(chan bundle.Status),
stop: make(chan chan struct{}),
reconfig: make(chan interface{}),
pluginStatusCh: make(chan map[string]*plugins.Status),
}

p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady})

return p
}

// WithMetrics sets the global metrics provider to be used by the plugin.
Expand All @@ -137,16 +145,28 @@ func Lookup(manager *plugins.Manager) *Plugin {
// Start starts the plugin.
func (p *Plugin) Start(ctx context.Context) error {
p.logInfo("Starting status reporter.")

go p.loop()

// Setup a listener for plugin statuses, but only after starting the loop
// to prevent blocking threads pushing the plugin updates.
p.manager.RegisterPluginStatusListener(Name, p.UpdatePluginStatus)

// Set the status plugin's status to OK now that everything is registered and
// the loop is running. This will trigger an update on the listener with the
// current status of all the other plugins too.
p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateOK})
return nil
}

// Stop stops the plugin.
func (p *Plugin) Stop(ctx context.Context) {
p.logInfo("Stopping status reporter.")
p.manager.UnregisterPluginStatusListener(Name)
done := make(chan struct{})
p.stop <- done
_ = <-done
p.manager.UpdatePluginStatus(Name, &plugins.Status{State: plugins.StateNotReady})
}

// UpdateBundleStatus notifies the plugin that the policy bundle was updated.
Expand All @@ -165,6 +185,11 @@ func (p *Plugin) UpdateDiscoveryStatus(status bundle.Status) {
p.discoCh <- status
}

// UpdatePluginStatus notifies the plugin that a plugin status was updated.
func (p *Plugin) UpdatePluginStatus(status map[string]*plugins.Status) {
p.pluginStatusCh <- status
}

// Reconfigure notifies the plugin with a new configuration.
func (p *Plugin) Reconfigure(_ context.Context, config interface{}) {
p.reconfig <- config
Expand All @@ -176,6 +201,14 @@ func (p *Plugin) loop() {

for {
select {
case statuses := <-p.pluginStatusCh:
p.lastPluginStatuses = statuses
err := p.oneShot(ctx)
if err != nil {
p.logError("%v.", err)
} else {
p.logInfo("Status update sent successfully in response to plugin update.")
}
case statuses := <-p.bulkBundleCh:
p.lastBundleStatuses = statuses
err := p.oneShot(ctx)
Expand Down Expand Up @@ -219,6 +252,7 @@ func (p *Plugin) oneShot(ctx context.Context) error {
Discovery: p.lastDiscoStatus,
Bundle: p.lastBundleStatus,
Bundles: p.lastBundleStatuses,
Plugins: p.lastPluginStatuses,
}

if p.metrics != nil {
Expand Down
48 changes: 40 additions & 8 deletions plugins/status/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func TestPluginStart(t *testing.T) {
fixture.plugin.Start(ctx)
defer fixture.plugin.Stop(ctx)

status := testStatus()

fixture.plugin.UpdateBundleStatus(*status)
// Start will trigger a status update when the plugin state switches
// from "not ready" to "ok".
result := <-fixture.server.ch

exp := UpdateRequestV1{
Expand All @@ -51,12 +50,25 @@ func TestPluginStart(t *testing.T) {
"app": "example-app",
"version": version.Version,
},
Bundle: status,
Plugins: map[string]*plugins.Status{
"status": {State: plugins.StateOK},
},
}

if !reflect.DeepEqual(result, exp) {
t.Fatalf("Expected: %v but got: %v", exp, result)
}

status := testStatus()

fixture.plugin.UpdateBundleStatus(*status)
result = <-fixture.server.ch

exp.Bundle = status

if !reflect.DeepEqual(result, exp) {
t.Fatalf("Expected: %v but got: %v", exp, result)
}
}

func TestPluginStartBulkUpdate(t *testing.T) {
Expand All @@ -70,9 +82,8 @@ func TestPluginStartBulkUpdate(t *testing.T) {
fixture.plugin.Start(ctx)
defer fixture.plugin.Stop(ctx)

status := testStatus()

fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{status.Name: status})
// Start will trigger a status update when the plugin state switches
// from "not ready" to "ok".
result := <-fixture.server.ch

exp := UpdateRequestV1{
Expand All @@ -81,9 +92,18 @@ func TestPluginStartBulkUpdate(t *testing.T) {
"app": "example-app",
"version": version.Version,
},
Bundles: map[string]*bundle.Status{status.Name: status},
Plugins: map[string]*plugins.Status{
"status": {State: plugins.StateOK},
},
}

status := testStatus()

fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{status.Name: status})
result = <-fixture.server.ch

exp.Bundles = map[string]*bundle.Status{status.Name: status}

if !reflect.DeepEqual(result, exp) {
t.Fatalf("Expected: %v but got: %v", exp, result)
}
Expand All @@ -100,6 +120,9 @@ func TestPluginStartBulkUpdateMultiple(t *testing.T) {
fixture.plugin.Start(ctx)
defer fixture.plugin.Stop(ctx)

// Ignore the plugin updating its status (tested elsewhere)
<-fixture.server.ch

statuses := map[string]*bundle.Status{}
tDownload, _ := time.Parse("2018-01-01T00:00:00.0000000Z", time.RFC3339Nano)
tActivate, _ := time.Parse("2018-01-01T00:00:01.0000000Z", time.RFC3339Nano)
Expand Down Expand Up @@ -152,6 +175,9 @@ func TestPluginStartDiscovery(t *testing.T) {
fixture.plugin.Start(ctx)
defer fixture.plugin.Stop(ctx)

// Ignore the plugin updating its status (tested elsewhere)
<-fixture.server.ch

status := testStatus()

fixture.plugin.UpdateDiscoveryStatus(*status)
Expand All @@ -164,6 +190,9 @@ func TestPluginStartDiscovery(t *testing.T) {
"version": version.Version,
},
Discovery: status,
Plugins: map[string]*plugins.Status{
"status": {State: plugins.StateOK},
},
}

if !reflect.DeepEqual(result, exp) {
Expand Down Expand Up @@ -241,6 +270,9 @@ func TestMetrics(t *testing.T) {
fixture.plugin.Start(ctx)
defer fixture.plugin.Stop(ctx)

// Ignore the plugin updating its status (tested elsewhere)
<-fixture.server.ch

status := testStatus()

fixture.plugin.BulkUpdateBundleStatus(map[string]*bundle.Status{"bundle": status})
Expand Down

0 comments on commit 766fd8f

Please sign in to comment.