Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additive updates to services when discovery enabled #2318

Merged
merged 1 commit into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/content/management.md
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,8 @@ configuration labels or environment variables.

### Limitations

The discovery feature cannot be used to dynamically modify `services`, `labels`
and `discovery`. This means that these configuration settings should be included
in the bootup configuration file provided to OPA.
In practice, discovery services do not change frequently. These configuration sections are treated as
immutable to avoid accidental configuration errors rendering OPA unable to discover a new configuration.
If the discovered configuration changes the `discovery` or `labels` sections,
those changes are ignored. If the discovered configuration changes the discovery service,
an error will be logged.
48 changes: 48 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2020 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

// Package config implements helper functions to parse OPA's configuration.
package config

import (
"encoding/json"

"github.com/open-policy-agent/opa/plugins/rest"
"github.com/open-policy-agent/opa/util"
)

// ParseServicesConfig returns a set of named service clients. The service
// clients can be specified either as an array or as a map. Some systems (e.g.,
// Helm) do not have proper support for configuration values nested under
// arrays, so just support both here.
func ParseServicesConfig(raw json.RawMessage) (map[string]rest.Client, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a side note, but I imagine us having to change this API slightly in the future when we introduce support for other bundle download protocols like Git. In that case, the service will no longer be a rest.Client. No changes required at this time since this is just an internal API.


services := map[string]rest.Client{}

var arr []json.RawMessage
var obj map[string]json.RawMessage

if err := util.Unmarshal(raw, &arr); err == nil {
for _, s := range arr {
client, err := rest.New(s)
if err != nil {
return nil, err
}
services[client.Service()] = client
}
} else if util.Unmarshal(raw, &obj) == nil {
for k := range obj {
client, err := rest.New(obj[k], rest.Name(k))
if err != nil {
return nil, err
}
services[client.Service()] = client
}
} else {
// Return error from array decode as that is the default format.
return nil, err
}

return services, nil
}
40 changes: 26 additions & 14 deletions plugins/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
bundleApi "github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/download"
cfg "github.com/open-policy-agent/opa/internal/config"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/plugins/bundle"
"github.com/open-policy-agent/opa/plugins/logs"
Expand Down Expand Up @@ -174,20 +175,11 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {

func (c *Discovery) reconfigure(ctx context.Context, u download.Update) error {

config, ps, err := processBundle(ctx, c.manager, c.factories, u.Bundle, c.config.query, c.metrics)
ps, err := processBundle(ctx, c.manager, c.factories, u.Bundle, c.config.query, c.config.service, c.metrics)
if err != nil {
return err
}

if err := c.manager.Reconfigure(config); err != nil {
return err
}

// TODO(tsandall): we don't currently support changes to discovery
// configuration. These changes are risky because errors would be
// unrecoverable (without keeping track of changes and rolling back...)

// TODO(tsandall): add protection against discovery -service- changing.
for _, p := range ps.Start {
if err := p.Start(ctx); err != nil {
return err
Expand Down Expand Up @@ -220,15 +212,35 @@ func (c *Discovery) logrusFields() logrus.Fields {
}
}

func processBundle(ctx context.Context, manager *plugins.Manager, factories map[string]plugins.Factory, b *bundleApi.Bundle, query string, m metrics.Metrics) (*config.Config, *pluginSet, error) {
func processBundle(ctx context.Context, manager *plugins.Manager, factories map[string]plugins.Factory, b *bundleApi.Bundle, query, service string, m metrics.Metrics) (*pluginSet, error) {

config, err := evaluateBundle(ctx, manager.ID, manager.Info, b, query)
if err != nil {
return nil, nil, err
return nil, err
}

// Note: We don't currently support changes to the discovery
// configuration. These changes are risky because errors would be
// unrecoverable (without keeping track of changes and rolling back...)

// check for updates to the discovery service
services, err := cfg.ParseServicesConfig(config.Services)
if err != nil {
return nil, err
}

if client, ok := services[service]; ok {
dClient := manager.Client(service)
if !client.Config().Equal(dClient.Config()) {
return nil, fmt.Errorf("updates to the discovery service are not allowed")
}
}

if err := manager.Reconfigure(config); err != nil {
return nil, err
}

ps, err := getPluginSet(factories, manager, config, m)
return config, ps, err
return getPluginSet(factories, manager, config, m)
}

func evaluateBundle(ctx context.Context, id string, info *ast.Term, b *bundleApi.Bundle, query string) (*config.Config, error) {
Expand Down
227 changes: 224 additions & 3 deletions plugins/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestProcessBundle(t *testing.T) {
}
`)

_, ps, err := processBundle(ctx, manager, nil, initialBundle, "data.config", nil)
ps, err := processBundle(ctx, manager, nil, initialBundle, "data.config", "default", nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -139,7 +139,7 @@ func TestProcessBundle(t *testing.T) {
}
`)

_, ps, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", nil)
ps, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", "default", nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -156,7 +156,7 @@ func TestProcessBundle(t *testing.T) {
}
`)

_, _, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", nil)
_, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", "default", nil)
if err == nil {
t.Fatal("Expected error but got success")
}
Expand Down Expand Up @@ -274,6 +274,227 @@ func TestReconfigure(t *testing.T) {

}

func TestReconfigureWithUpdates(t *testing.T) {

ctx := context.Background()

manager, err := plugins.New([]byte(`{
"labels": {"x": "y"},
"services": {
"localhost": {
"url": "http://localhost:9999"
}
},
"discovery": {"name": "config"},
}`), "test-id", inmem.New())
if err != nil {
t.Fatal(err)
}

disco, err := New(manager)
if err != nil {
t.Fatal(err)
}

initialBundle := makeDataBundle(1, `
{
"config": {
"bundle": {"name": "test1"},
"status": {},
"decision_logs": {}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: initialBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

originalConfig := disco.config
// update the discovery configuration and check
// the boot configuration is not overwritten
updatedBundle := makeDataBundle(2, `
{
"config": {
"discovery": {
"name": "config",
"decision": "/foo/bar"
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

if !reflect.DeepEqual(originalConfig, disco.config) {
t.Fatal("Discovery configuration updated")
}

// no update to the discovery configuration and check no error generated
updatedBundle = makeDataBundle(3, `
{
"config": {
"discovery": {
"name": "config"
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

if !reflect.DeepEqual(originalConfig, disco.config) {
t.Fatal("Discovery configuration updated")
}

// update the discovery service and check that error generated
updatedBundle = makeDataBundle(4, `
{
"config": {
"services": {
"localhost": {
"url": "http://localhost:9999",
"credentials": {"bearer": {"token": "blah"}}
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err == nil {
t.Fatal("Expected error but got nil")
}

expectedErrMsg := "updates to the discovery service are not allowed"
if err.Error() != expectedErrMsg {
t.Fatalf("Expected error message: %v but got: %v", expectedErrMsg, err.Error())
}

// no update to the discovery service and check no error generated
updatedBundle = makeDataBundle(5, `
{
"config": {
"services": {
"localhost": {
"url": "http://localhost:9999"
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

// add a new service and a new bundle
updatedBundle = makeDataBundle(6, `
{
"config": {
"services": {
"acmecorp": {
"url": "http://localhost:8181"
}
},
"bundles": {
"authz": {
"service": "acmecorp"
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

if len(disco.manager.Services()) != 2 {
t.Fatalf("Expected two services but got %v\n", len(disco.manager.Services()))
}

bPlugin := bundle.Lookup(disco.manager)
config := bPlugin.Config()
expected := "acmecorp"
if config.Bundles["authz"].Service != expected {
t.Fatalf("Expected service %v for bundle authz but got %v", expected, config.Bundles["authz"].Service)
}

// update existing bundle's config and add a new bundle
updatedBundle = makeDataBundle(7, `
{
"config": {
"bundles": {
"authz": {
"service": "localhost",
"resource": "foo/bar"
},
"main": {
"resource": "baz/bar"
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

bPlugin = bundle.Lookup(disco.manager)
config = bPlugin.Config()
expectedSvc := "localhost"
if config.Bundles["authz"].Service != expectedSvc {
t.Fatalf("Expected service %v for bundle authz but got %v", expectedSvc, config.Bundles["authz"].Service)
}

expectedRes := "foo/bar"
if config.Bundles["authz"].Resource != expectedRes {
t.Fatalf("Expected resource %v for bundle authz but got %v", expectedRes, config.Bundles["authz"].Resource)
}

expectedSvcs := map[string]bool{"localhost": true, "acmecorp": true}
if _, ok := expectedSvcs[config.Bundles["main"].Service]; !ok {
t.Fatalf("Expected service for bundle main to be one of [%v, %v] but got %v", "localhost", "acmecorp", config.Bundles["main"].Service)
}

// update existing (non-discovery)service's config
updatedBundle = makeDataBundle(8, `
{
"config": {
"services": {
"acmecorp": {
"url": "http://localhost:8181",
"credentials": {"bearer": {"token": "blah"}}
}
},
"bundles": {
"authz": {
"service": "localhost",
"resource": "foo/bar"
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
}

type testServer struct {
t *testing.T
mtx sync.Mutex
Expand Down
Loading