Skip to content

Commit

Permalink
plugins: Additive updates to services when discovery enabled
Browse files Browse the repository at this point in the history
Earlier with discovery enabled updates to the 'services' configuration
was not allowed to protect against accidental changes to the discovery service
itself. Since adding new services could be useful, this change allows modifications
to the 'services' configuration. The only exception is that the service used to download
the discovery bundle cannot be modified.

Fixes #2058

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Apr 17, 2020
1 parent 71454de commit d67c0f7
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 20 deletions.
5 changes: 2 additions & 3 deletions docs/content/management.md
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,5 @@ configuration labels or environment variables.

### Limitations

The discovery feature cannot be used to dynamically modify `services`, `labels`
and `discovery`. This means that these configuration settings should be included
in the bootup configuration file provided to OPA.
The discovery feature cannot be used to dynamically modify `labels` and `discovery`. This means that these configuration settings should be included
in the bootup configuration file provided to OPA. However, it can be used to modify `services` with the exception of the service used to download the discovery bundle.
34 changes: 20 additions & 14 deletions plugins/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,11 @@ func (c *Discovery) processUpdate(ctx context.Context, u download.Update) {

func (c *Discovery) reconfigure(ctx context.Context, u download.Update) error {

config, ps, err := processBundle(ctx, c.manager, c.factories, u.Bundle, c.config.query, c.metrics)
ps, err := processBundle(ctx, c.manager, c.factories, u.Bundle, c.config.query, c.config.service, c.metrics)
if err != nil {
return err
}

if err := c.manager.Reconfigure(config); err != nil {
return err
}

// TODO(tsandall): we don't currently support changes to discovery
// configuration. These changes are risky because errors would be
// unrecoverable (without keeping track of changes and rolling back...)

// TODO(tsandall): add protection against discovery -service- changing.
for _, p := range ps.Start {
if err := p.Start(ctx); err != nil {
return err
Expand Down Expand Up @@ -220,15 +211,30 @@ func (c *Discovery) logrusFields() logrus.Fields {
}
}

func processBundle(ctx context.Context, manager *plugins.Manager, factories map[string]plugins.Factory, b *bundleApi.Bundle, query string, m metrics.Metrics) (*config.Config, *pluginSet, error) {
func processBundle(ctx context.Context, manager *plugins.Manager, factories map[string]plugins.Factory, b *bundleApi.Bundle, query, service string, m metrics.Metrics) (*pluginSet, error) {

config, err := evaluateBundle(ctx, manager.ID, manager.Info, b, query)
if err != nil {
return nil, nil, err
return nil, err
}

// check for updates to the discovery configuration
if config.Discovery != nil {
return nil, fmt.Errorf("updates to the discovery configuration are not allowed")
}

client := manager.Client(service)

if err := manager.Reconfigure(config); err != nil {
return nil, err
}

// check for updates to the discovery service
if !client.Equal(manager.Client(service)) {
return nil, fmt.Errorf("updates to the discovery service are not allowed")
}

ps, err := getPluginSet(factories, manager, config, m)
return config, ps, err
return getPluginSet(factories, manager, config, m)
}

func evaluateBundle(ctx context.Context, id string, info *ast.Term, b *bundleApi.Bundle, query string) (*config.Config, error) {
Expand Down
189 changes: 186 additions & 3 deletions plugins/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestProcessBundle(t *testing.T) {
}
`)

_, ps, err := processBundle(ctx, manager, nil, initialBundle, "data.config", nil)
ps, err := processBundle(ctx, manager, nil, initialBundle, "data.config", "default", nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -139,7 +139,7 @@ func TestProcessBundle(t *testing.T) {
}
`)

_, ps, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", nil)
ps, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", "default", nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -156,7 +156,7 @@ func TestProcessBundle(t *testing.T) {
}
`)

_, _, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", nil)
_, err = processBundle(ctx, manager, nil, updatedBundle, "data.config", "default", nil)
if err == nil {
t.Fatal("Expected error but got success")
}
Expand Down Expand Up @@ -274,6 +274,189 @@ func TestReconfigure(t *testing.T) {

}

func TestReconfigureWithUpdates(t *testing.T) {

ctx := context.Background()

manager, err := plugins.New([]byte(`{
"labels": {"x": "y"},
"services": {
"localhost": {
"url": "http://localhost:9999"
}
},
"discovery": {"name": "config"},
}`), "test-id", inmem.New())
if err != nil {
t.Fatal(err)
}

disco, err := New(manager)
if err != nil {
t.Fatal(err)
}

initialBundle := makeDataBundle(1, `
{
"config": {
"bundle": {"name": "test1"},
"status": {},
"decision_logs": {}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: initialBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

// update the discovery configuration
updatedBundle := makeDataBundle(2, `
{
"config": {
"discovery": {
"name": "config",
"decision": "/foo/bar"
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err == nil {
t.Fatal("Expected error but got nil")
}

expectedErrMsg := "updates to the discovery configuration are not allowed"
if err.Error() != expectedErrMsg {
t.Fatalf("Expected error message: %v but got: %v", expectedErrMsg, err.Error())
}

// update the discovery service
updatedBundle = makeDataBundle(3, `
{
"config": {
"services": {
"localhost": {
"url": "http://localhost:9999",
"credentials": {"bearer": {"token": "blah"}}
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err == nil {
t.Fatal("Expected error but got nil")
}

expectedErrMsg = "updates to the discovery service are not allowed"
if err.Error() != expectedErrMsg {
t.Fatalf("Expected error message: %v but got: %v", expectedErrMsg, err.Error())
}

// add a new service and a new bundle
updatedBundle = makeDataBundle(4, `
{
"config": {
"services": {
"acmecorp": {
"url": "http://localhost:8181"
}
},
"bundles": {
"authz": {
"service": "acmecorp"
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

if len(disco.manager.Services()) != 2 {
t.Fatalf("Expected two services but got %v\n", len(disco.manager.Services()))
}

bPlugin := bundle.Lookup(disco.manager)
config := bPlugin.Config()
expected := "acmecorp"
if config.Bundles["authz"].Service != expected {
t.Fatalf("Expected service %v for bundle authz but got %v", expected, config.Bundles["authz"].Service)
}

// update existing bundle's config and add a new bundle
updatedBundle = makeDataBundle(5, `
{
"config": {
"bundles": {
"authz": {
"service": "localhost",
"resource": "foo/bar"
},
"main": {
"resource": "baz/bar"
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}

bPlugin = bundle.Lookup(disco.manager)
config = bPlugin.Config()
expectedSvc := "localhost"
if config.Bundles["authz"].Service != expectedSvc {
t.Fatalf("Expected service %v for bundle authz but got %v", expectedSvc, config.Bundles["authz"].Service)
}

expectedRes := "foo/bar"
if config.Bundles["authz"].Resource != expectedRes {
t.Fatalf("Expected resource %v for bundle authz but got %v", expectedRes, config.Bundles["authz"].Resource)
}

expectedSvcs := map[string]bool{"localhost": true, "acmecorp": true}
if _, ok := expectedSvcs[config.Bundles["authz"].Service]; !ok {
t.Fatalf("Expected service for bundle main to be one of [%v, %v] but got %v", "localhost", "acmecorp", config.Bundles["authz"].Service)
}

// update existing (non-discovery)service's config
updatedBundle = makeDataBundle(6, `
{
"config": {
"services": {
"acmecorp": {
"url": "http://localhost:8181",
"credentials": {"bearer": {"token": "blah"}}
}
},
"bundles": {
"authz": {
"service": "localhost",
"resource": "foo/bar"
}
}
}
}
`)

err = disco.reconfigure(ctx, download.Update{Bundle: updatedBundle})
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
}

type testServer struct {
t *testing.T
mtx sync.Mutex
Expand Down
5 changes: 5 additions & 0 deletions plugins/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func (c Client) WithBytes(body []byte) Client {
return c
}

// Equal returns true if this client is equal to the other.
func (c Client) Equal(other Client) bool {
return reflect.DeepEqual(c.config, other.config)
}

// Do executes a request using the client.
func (c Client) Do(ctx context.Context, method, path string) (*http.Response, error) {

Expand Down

0 comments on commit d67c0f7

Please sign in to comment.