Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix performance impact of bundle activation on policy queries #1516

Merged
merged 2 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions plugins/bundle/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ func (p *Plugin) process(ctx context.Context, u download.Update) {
func (p *Plugin) activate(ctx context.Context, b *bundle.Bundle) error {
p.logDebug("Bundle activation in progress. Opening storage transaction.")

return storage.Txn(ctx, p.manager.Store, storage.WriteParams, func(txn storage.Transaction) error {
params := storage.WriteParams
params.Context = storage.NewContext()

return storage.Txn(ctx, p.manager.Store, params, func(txn storage.Transaction) error {
p.logDebug("Opened storage transaction (%v).", txn.ID())
defer p.logDebug("Closing storage transaction (%v).", txn.ID())

Expand All @@ -207,7 +210,8 @@ func (p *Plugin) activate(ctx context.Context, b *bundle.Bundle) error {
return err
}

if err := p.erasePolicies(ctx, txn, erase); err != nil {
remaining, err := p.erasePolicies(ctx, txn, erase)
if err != nil {
return err
}

Expand All @@ -217,14 +221,17 @@ func (p *Plugin) activate(ctx context.Context, b *bundle.Bundle) error {
return err
}

if err := p.writeModules(ctx, txn, b.Modules); err != nil {
compiler, err := p.writeModules(ctx, txn, b.Modules, remaining)
if err != nil {
return err
}

if err := manifest.Write(ctx, p.manager.Store, txn, b.Manifest); err != nil {
return err
}

plugins.SetCompilerOnContext(params.Context, compiler)

return nil
})
}
Expand All @@ -246,34 +253,44 @@ func (p *Plugin) eraseData(ctx context.Context, txn storage.Transaction, roots m
return nil
}

func (p *Plugin) erasePolicies(ctx context.Context, txn storage.Transaction, roots map[string]struct{}) error {
func (p *Plugin) erasePolicies(ctx context.Context, txn storage.Transaction, roots map[string]struct{}) (map[string]*ast.Module, error) {

ids, err := p.manager.Store.ListPolicies(ctx, txn)
if err != nil {
return err
return nil, err
}

remaining := map[string]*ast.Module{}

for _, id := range ids {
bs, err := p.manager.Store.GetPolicy(ctx, txn, id)
if err != nil {
return err
return nil, err
}
module, err := ast.ParseModule(id, string(bs))
if err != nil {
return err
return nil, err
}
path, err := module.Package.Path.Ptr()
if err != nil {
return err
return nil, err
}
deleted := false
for root := range roots {
if strings.HasPrefix(path, root) {
if err := p.manager.Store.DeletePolicy(ctx, txn, id); err != nil {
return err
return nil, err
}
deleted = true
break
}
}
if !deleted {
remaining[id] = module
}
}
return nil

return remaining, nil
}

func (p *Plugin) writeData(ctx context.Context, txn storage.Transaction, roots []string, data map[string]interface{}) error {
Expand All @@ -296,22 +313,25 @@ func (p *Plugin) writeData(ctx context.Context, txn storage.Transaction, roots [
return nil
}

func (p *Plugin) writeModules(ctx context.Context, txn storage.Transaction, files []bundle.ModuleFile) error {
func (p *Plugin) writeModules(ctx context.Context, txn storage.Transaction, files []bundle.ModuleFile, remaining map[string]*ast.Module) (*ast.Compiler, error) {
modules := map[string]*ast.Module{}
for name, module := range remaining {
modules[name] = module
}
for _, file := range files {
modules[file.Path] = file.Parsed
}
compiler := ast.NewCompiler().
WithPathConflictsCheck(storage.NonEmpty(ctx, p.manager.Store, txn))
if compiler.Compile(modules); compiler.Failed() {
return compiler.Errors
return nil, compiler.Errors
}
for _, file := range files {
if err := p.manager.Store.UpsertPolicy(ctx, txn, file.Path, file.Raw); err != nil {
return err
return nil, err
}
}
return nil
return compiler, nil
}

func (p *Plugin) logError(fmt string, a ...interface{}) {
Expand Down
90 changes: 88 additions & 2 deletions plugins/bundle/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/download"
"github.com/open-policy-agent/opa/internal/manifest"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
Expand Down Expand Up @@ -430,8 +431,10 @@ func TestPluginActivateScopedBundle(t *testing.T) {
t.Fatal(err)
}

// Activate a bundle that is scoped to a/a3 ad a/a6.
module = "package a.a4\n\nbar=1"
// Activate a bundle that is scoped to a/a3 ad a/a6. Include a function
// inside package a.a4 that we can depend on outside of the bundle scope to
// exercise the compile check with remaining modules.
module = "package a.a4\n\nbar=1\n\nfunc(x) = x"

b = bundle.Bundle{
Manifest: bundle.Manifest{Revision: "quickbrownfaux", Roots: &[]string{"a/a3", "a/a4"}},
Expand Down Expand Up @@ -483,6 +486,89 @@ func TestPluginActivateScopedBundle(t *testing.T) {
t.Fatal(err)
}

// Upsert policy outside of bundle scope that depends on bundle.
if err := storage.Txn(ctx, manager.Store, storage.WriteParams, func(txn storage.Transaction) error {
return manager.Store.UpsertPolicy(ctx, txn, "not_scoped", []byte("package not_scoped\np { data.a.a4.func(1) = 1 }"))
}); err != nil {
t.Fatal(err)
}

b = bundle.Bundle{
Manifest: bundle.Manifest{Revision: "quickbrownfaux-2", Roots: &[]string{"a/a3", "a/a4"}},
Data: map[string]interface{}{},
Modules: []bundle.ModuleFile{},
}

b.Manifest.Init()
plugin.oneShot(ctx, download.Update{Bundle: &b})

// Ensure bundle activation failed by checking that previous revision is
// still active.
if err := storage.Txn(ctx, manager.Store, storage.TransactionParams{}, func(txn storage.Transaction) error {
revision, err := manifest.ReadBundleRevision(ctx, manager.Store, txn)
if err != nil {
return err
}
if revision != "quickbrownfaux" {
return fmt.Errorf("Expected revision to be quickbrownfaux but got: %v", revision)
}
return nil
}); err != nil {
t.Fatal(err)
}
}

func TestPluginSetCompilerOnContext(t *testing.T) {

ctx := context.Background()
manager := getTestManager()
plugin := Plugin{manager: manager, status: &Status{}}

module := `
package test

p = 1
`

b := bundle.Bundle{
Manifest: bundle.Manifest{Revision: "quickbrownfaux"},
Data: map[string]interface{}{},
Modules: []bundle.ModuleFile{
bundle.ModuleFile{
Path: "/test.rego",
Parsed: ast.MustParseModule(module),
Raw: []byte(module),
},
},
}

b.Manifest.Init()

events := []storage.TriggerEvent{}

if err := storage.Txn(ctx, manager.Store, storage.WriteParams, func(txn storage.Transaction) error {
manager.Store.Register(ctx, txn, storage.TriggerConfig{
OnCommit: func(ctx context.Context, txn storage.Transaction, event storage.TriggerEvent) {
events = append(events, event)
},
})
return nil
}); err != nil {
t.Fatal(err)
}

plugin.oneShot(ctx, download.Update{Bundle: &b})

exp := ast.MustParseModule(module)

// Expect two events. One for trigger registration, one for policy update.
if len(events) != 2 {
t.Fatalf("Expected 2 events but got: %+v", events)
} else if compiler := plugins.GetCompilerOnContext(events[1].Context); compiler == nil {
t.Fatalf("Expected compiler on 2nd event but got: %+v", events)
} else if !compiler.Modules["/test.rego"].Equal(exp) {
t.Fatalf("Expected module on compiler but got: %v", compiler.Modules)
}
}

func getTestManager() *plugins.Manager {
Expand Down
34 changes: 33 additions & 1 deletion plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ type Manager struct {
mtx sync.Mutex
}

type managerContextKey string

const managerCompilerContextKey = managerContextKey("compiler")

// SetCompilerOnContext puts the compiler into the storage context. Calling this
// function before commiting updated policies to storage allows the manager to
// skip parsing and compiling of modules. Instead, the manager will use the
// compiler that was stored on the context.
func SetCompilerOnContext(context *storage.Context, compiler *ast.Compiler) {
context.Put(managerCompilerContextKey, compiler)
}

// GetCompilerOnContext gets the compiler cached on the storage context.
func GetCompilerOnContext(context *storage.Context) *ast.Compiler {
compiler, ok := context.Get(managerCompilerContextKey).(*ast.Compiler)
if !ok {
return nil
}
return compiler
}

type namedplugin struct {
name string
plugin Plugin
Expand Down Expand Up @@ -270,8 +291,19 @@ func (m *Manager) Reconfigure(config *config.Config) error {

func (m *Manager) onCommit(ctx context.Context, txn storage.Transaction, event storage.TriggerEvent) {
if event.PolicyChanged() {
compiler, _ := loadCompilerFromStore(ctx, m.Store, txn)

var compiler *ast.Compiler

// If the context does not contain the compiler fallback to loading the
// compiler from the store. Currently the bundle plugin sets the
// compiler on the context but the server does not (nor would users
// implementing their own policy loading.)
if compiler = GetCompilerOnContext(event.Context); compiler == nil {
compiler, _ = loadCompilerFromStore(ctx, m.Store, txn)
}

m.setCompiler(compiler)

for _, f := range m.registeredTriggers {
f(txn)
}
Expand Down
4 changes: 3 additions & 1 deletion storage/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@ type handle struct {

func (db *store) NewTransaction(ctx context.Context, params ...storage.TransactionParams) (storage.Transaction, error) {
var write bool
var context *storage.Context
if len(params) > 0 {
write = params[0].Write
context = params[0].Context
}
xid := atomic.AddUint64(&db.xid, uint64(1))
if write {
db.wmu.Lock()
} else {
db.rmu.RLock()
}
return newTransaction(xid, write, db), nil
return newTransaction(xid, write, context, db), nil
}

func (db *store) Commit(ctx context.Context, txn storage.Transaction) error {
Expand Down
29 changes: 29 additions & 0 deletions storage/inmem/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,35 @@ func TestInMemoryTriggersUnregister(t *testing.T) {
}
}

func TestInMemoryContext(t *testing.T) {

ctx := context.Background()
store := New()
params := storage.WriteParams
params.Context = storage.NewContext()
params.Context.Put("foo", "bar")

txn, err := store.NewTransaction(ctx, params)
if err != nil {
t.Fatal(err)
}

store.Register(ctx, txn, storage.TriggerConfig{
OnCommit: func(ctx context.Context, txn storage.Transaction, event storage.TriggerEvent) {
if event.Context.Get("foo") != "bar" {
t.Fatalf("Expected foo/bar in context but got: %+v", event.Context)
} else if event.Context.Get("deadbeef") != nil {
t.Fatalf("Got unexpected deadbeef value in context: %+v", event.Context)
}
},
})

if err := store.Commit(ctx, txn); err != nil {
t.Fatal(err)
}

}

func loadExpectedResult(input string) interface{} {
if len(input) == 0 {
return nil
Expand Down
5 changes: 4 additions & 1 deletion storage/inmem/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,22 @@ type transaction struct {
db *store
updates *list.List
policies map[string]policyUpdate
context *storage.Context
}

type policyUpdate struct {
value []byte
remove bool
}

func newTransaction(xid uint64, write bool, db *store) *transaction {
func newTransaction(xid uint64, write bool, context *storage.Context, db *store) *transaction {
return &transaction{
xid: xid,
write: write,
db: db,
policies: map[string]policyUpdate{},
updates: list.New(),
context: context,
}
}

Expand Down Expand Up @@ -139,6 +141,7 @@ func (txn *transaction) updateRoot(op storage.PatchOp, value interface{}) error
}

func (txn *transaction) Commit() (result storage.TriggerEvent) {
result.Context = txn.context
for curr := txn.updates.Front(); curr != nil; curr = curr.Next() {
action := curr.Value.(*update)
updated := action.Apply(txn.db.data)
Expand Down
Loading