diff --git a/policy/file/source.go b/policy/file/source.go index 6b593da9..e1e970cc 100644 --- a/policy/file/source.go +++ b/policy/file/source.go @@ -104,8 +104,11 @@ func (s *Source) ReloadIDsMonitor() { <-s.reloadCompleteCh } -// MonitorPolicy satisfies the MonitorPolicy function of the policy.Source -// interface. +// MonitorPolicy reads policy from a file on disk and writes it to req.ResultCh. +// Any error doing so will be sent to req.ErrCh. +// If MonitorPolicy receives on s.ReloadCh, it will re-check the file on disk, +// and write again to req.ResultCh if the policy has changed. +// Note: MonitorPolicy should only return when ctx is done. func (s *Source) MonitorPolicy(ctx context.Context, req policy.MonitorPolicyReq) { // Close channels when done with the monitoring loop. @@ -134,12 +137,12 @@ func (s *Source) MonitorPolicy(ctx context.Context, req policy.MonitorPolicyReq) p, _, err := s.handleIndividualPolicyRead(req.ID, file, name) if err != nil { policy.HandleSourceError(s.Name(), fmt.Errorf("failed to get policy %s: %w", req.ID, err), req.ErrCh) - return + } else { + s.policyMap[req.ID] = &filePolicy{file: file, name: name, policy: p} + // We must send to ResultCh each time a Handler invokes this method, + // or the Handler will error "failed to read policy in time" + req.ResultCh <- *p } - s.policyMap[req.ID] = &filePolicy{file: file, name: name, policy: p} - // We must send to ResultCh each time a Handler invokes this method, - // or the Handler will error "failed to read policy in time" - req.ResultCh <- *p } s.policyMapLock.Unlock() diff --git a/policy/file/source_test.go b/policy/file/source_test.go index 5baf4bfd..09d8bcea 100644 --- a/policy/file/source_test.go +++ b/policy/file/source_test.go @@ -160,3 +160,53 @@ func TestSource_MonitorPolicy(t *testing.T) { cancel() } } + +func TestSource_MonitorPolicy_ContinueOnError(t *testing.T) { + // start with a happy source + src, ids := testFileSource(t, "./test-fixtures") + pid := ids[0] + + // then break it + src.policyMap[pid].file = "/nowhere" + + errCh := make(chan error) + resultCh := make(chan sdk.ScalingPolicy) + reloadCh := make(chan struct{}, 1) + req := policy.MonitorPolicyReq{ + ID: pid, + ErrCh: errCh, + ResultCh: resultCh, + ReloadCh: reloadCh, + } + + // done chan separate from ctx to ensure the goroutine completes + done := make(chan struct{}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + src.MonitorPolicy(ctx, req) + + select { + case <-ctx.Done(): + t.Logf("good, context done") + default: + t.Error("MonitorPolicy should only return if the context is done") + } + + close(done) + }() + + select { + case <-time.After(time.Millisecond * 200): + t.Fatal("no error in time") + case err := <-errCh: + t.Logf("good show, got error as expected: %v", err) + case i := <-resultCh: + t.Fatalf("not expecting a policy, got: %+v", i) + } + + cancel() + <-done +}