Skip to content

Commit

Permalink
only return when context is done
Browse files Browse the repository at this point in the history
  • Loading branch information
gulducat committed Oct 10, 2023
1 parent fbf36af commit 30ee9bb
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
17 changes: 10 additions & 7 deletions policy/file/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ func (s *Source) ReloadIDsMonitor() {
<-s.reloadCompleteCh
}

// MonitorPolicy satisfies the MonitorPolicy function of the policy.Source
// interface.
// MonitorPolicy reads policy from a file on disk and writes it to req.ResultCh.
// Any error doing so will be sent to req.ErrCh.
// If MonitorPolicy receives on s.ReloadCh, it will re-check the file on disk,
// and write again to req.ResultCh if the policy has changed.
// Note: MonitorPolicy should only return when ctx is done.
func (s *Source) MonitorPolicy(ctx context.Context, req policy.MonitorPolicyReq) {

// Close channels when done with the monitoring loop.
Expand Down Expand Up @@ -134,12 +137,12 @@ func (s *Source) MonitorPolicy(ctx context.Context, req policy.MonitorPolicyReq)
p, _, err := s.handleIndividualPolicyRead(req.ID, file, name)
if err != nil {
policy.HandleSourceError(s.Name(), fmt.Errorf("failed to get policy %s: %w", req.ID, err), req.ErrCh)
return
} else {
s.policyMap[req.ID] = &filePolicy{file: file, name: name, policy: p}
// We must send to ResultCh each time a Handler invokes this method,
// or the Handler will error "failed to read policy in time"
req.ResultCh <- *p
}
s.policyMap[req.ID] = &filePolicy{file: file, name: name, policy: p}
// We must send to ResultCh each time a Handler invokes this method,
// or the Handler will error "failed to read policy in time"
req.ResultCh <- *p
}
s.policyMapLock.Unlock()

Expand Down
50 changes: 50 additions & 0 deletions policy/file/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,53 @@ func TestSource_MonitorPolicy(t *testing.T) {
cancel()
}
}

func TestSource_MonitorPolicy_ContinueOnError(t *testing.T) {
// start with a happy source
src, ids := testFileSource(t, "./test-fixtures")
pid := ids[0]

// then break it
src.policyMap[pid].file = "/nowhere"

errCh := make(chan error)
resultCh := make(chan sdk.ScalingPolicy)
reloadCh := make(chan struct{}, 1)
req := policy.MonitorPolicyReq{
ID: pid,
ErrCh: errCh,
ResultCh: resultCh,
ReloadCh: reloadCh,
}

// done chan separate from ctx to ensure the goroutine completes
done := make(chan struct{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
src.MonitorPolicy(ctx, req)

select {
case <-ctx.Done():
t.Logf("good, context done")
default:
t.Error("MonitorPolicy should only return if the context is done")
}

close(done)
}()

select {
case <-time.After(time.Millisecond * 200):
t.Fatal("no error in time")
case err := <-errCh:
t.Logf("good show, got error as expected: %v", err)
case i := <-resultCh:
t.Fatalf("not expecting a policy, got: %+v", i)
}

cancel()
<-done
}

0 comments on commit 30ee9bb

Please sign in to comment.