diff --git a/plugins/discovery/discovery.go b/plugins/discovery/discovery.go index c0b77a08009..8b04e61b596 100644 --- a/plugins/discovery/discovery.go +++ b/plugins/discovery/discovery.go @@ -213,6 +213,14 @@ func (c *Discovery) RegisterListener(name interface{}, f func(bundle.Status)) { c.listeners[name] = f } +// Unregister a listener to stop receiving status updates. +func (c *Discovery) Unregister(name interface{}) { + c.listenersMtx.Lock() + defer c.listenersMtx.Unlock() + + delete(c.listeners, name) +} + func (c *Discovery) getBundlePersistPath() (string, error) { persistDir, err := c.manager.Config.GetPersistenceDirectory() if err != nil { diff --git a/plugins/discovery/discovery_test.go b/plugins/discovery/discovery_test.go index 1b7c05d5c4e..948f041a457 100644 --- a/plugins/discovery/discovery_test.go +++ b/plugins/discovery/discovery_test.go @@ -28,6 +28,7 @@ import ( "github.com/open-policy-agent/opa/logging/test" "github.com/open-policy-agent/opa/metrics" "github.com/open-policy-agent/opa/plugins" + "github.com/open-policy-agent/opa/plugins/bundle" bundlePlugin "github.com/open-policy-agent/opa/plugins/bundle" "github.com/open-policy-agent/opa/plugins/logs" "github.com/open-policy-agent/opa/plugins/status" @@ -3765,6 +3766,54 @@ func TestPluginManualTriggerLifecycle(t *testing.T) { fixture.testDiscoReconfigurationScenario(ctx, m) } +func TestListeners(t *testing.T) { + manager, err := plugins.New([]byte(`{ + "labels": {"x": "y"}, + "services": { + "localhost": { + "url": "http://localhost:9999" + } + }, + "discovery": {"name": "config", "persist": true}, + }`), "test-id", inmem.New()) + if err != nil { + t.Fatal(err) + } + + testPlugin := &reconfigureTestPlugin{counts: map[string]int{}} + testFactory := testFactory{p: testPlugin} + + disco, err := New(manager, Factories(map[string]plugins.Factory{"test_plugin": testFactory})) + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + ensurePluginState(t, disco, plugins.StateNotReady) + + var status *bundle.Status + disco.RegisterListener("testlistener", func(s bundle.Status) { + status = &s + }) + + // simulate a bundle download error + disco.oneShot(ctx, download.Update{Error: fmt.Errorf("unknown error")}) + + if status == nil { + t.Fatalf("Expected discovery listener to receive status but was nil") + } + + status = nil + disco.Unregister("testlistener") + + // simulate a bundle download error + disco.oneShot(ctx, download.Update{Error: fmt.Errorf("unknown error")}) + if status != nil { + t.Fatalf("Expected discovery listener to be removed but received %v", status) + } +} + type testFixture struct { manager *plugins.Manager plugin *Discovery