Skip to content

Commit

Permalink
Add ErrSink
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Mar 22, 2022
1 parent 4da2435 commit 1ae122f
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 12 deletions.
5 changes: 4 additions & 1 deletion cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const defaultRefreshWindow = 15 * time.Minute
// number by specifying the `httprc.WithFetchWorkerCount`
func New(ctx context.Context, options ...ConstructorOption) *Cache {
var refreshWindow time.Duration
var errSink ErrSink
var nfetchers int
for _, option := range options {
//nolint:forcetypeassert
Expand All @@ -52,6 +53,8 @@ func New(ctx context.Context, options ...ConstructorOption) *Cache {
refreshWindow = option.Value().(time.Duration)
case identFetchWorkerCount{}:
nfetchers = option.Value().(int)
case identErrSink{}:
errSink = option.Value().(ErrSink)
}
}

Expand All @@ -64,7 +67,7 @@ func New(ctx context.Context, options ...ConstructorOption) *Cache {
}

fetch := newFetcher(ctx, nfetchers)
queue := newQueue(ctx, refreshWindow, fetch)
queue := newQueue(ctx, refreshWindow, fetch, errSink)

return &Cache{
queue: queue,
Expand Down
43 changes: 40 additions & 3 deletions httprc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/lestrrat-go/httprc"
"github.com/stretchr/testify/assert"
)

type dummyErrSink struct {
mu sync.RWMutex
errors []error
}

func (d *dummyErrSink) Error(err error) {
d.mu.Lock()
defer d.mu.Unlock()
d.errors = append(d.errors, err)
}

func TestCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -24,23 +36,30 @@ func TestCache(t *testing.T) {
default:
}

t.Logf("HTTP handler")
called++
w.Header().Set(`Cache-Control`, fmt.Sprintf(`max-age=%d`, 3))
w.WriteHeader(http.StatusOK)
}))
c := httprc.New(ctx, httprc.WithRefreshWindow(time.Second))

errSink := &dummyErrSink{}
c := httprc.New(ctx,
httprc.WithRefreshWindow(time.Second),
httprc.WithErrSink(errSink),
)

c.Register(srv.URL, httprc.WithHTTPClient(srv.Client()), httprc.WithMinRefreshInterval(time.Second))
if !assert.True(t, c.IsRegistered(srv.URL)) {
return
}

for i := 0; i < 3; i++ {
_, err := c.Get(ctx, srv.URL)
v, err := c.Get(ctx, srv.URL)
if !assert.NoError(t, err, `c.Get should succeed`) {
return
}
if !assert.IsType(t, []byte(nil), v, `c.Get should return []byte`) {
return
}
}
if !assert.Equal(t, 1, called, `there should only be one fetch request`) {
return
Expand All @@ -57,5 +76,23 @@ func TestCache(t *testing.T) {
return
}

if !assert.True(t, len(errSink.errors) == 0) {
return
}

c.Register(srv.URL,
httprc.WithHTTPClient(srv.Client()),
httprc.WithMinRefreshInterval(time.Second),
httprc.WithTransformer(httprc.TransformFunc(func(_ string, _ *http.Response) (interface{}, error) {
return nil, fmt.Errorf(`dummy error`)
})),
)

_, _ = c.Get(ctx, srv.URL)
time.Sleep(3 * time.Second)

if !assert.True(t, len(errSink.errors) > 0) {
return
}
cancel()
}
3 changes: 3 additions & 0 deletions options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ options:
- ident: RefreshWindow
interface: ConstructorOption
argument_type: time.Duration
- ident: ErrSink
interface: ConstructorOption
argument_type: ErrSink
11 changes: 10 additions & 1 deletion options_gen.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is auto-generated by internal/cmd/genoptions/main.go. DO NOT EDIT
// This file is auto-generated by github.com/lestrrat-go/option/cmd/genoptions. DO NOT EDIT

package httprc

Expand Down Expand Up @@ -32,13 +32,18 @@ type registerOption struct {

func (*registerOption) registerOption() {}

type identErrSink struct{}
type identFetchWorkerCount struct{}
type identHTTPClient struct{}
type identMinRefreshInterval struct{}
type identRefreshInterval struct{}
type identRefreshWindow struct{}
type identTransformer struct{}

func (identErrSink) String() string {
return "WithErrSink"
}

func (identFetchWorkerCount) String() string {
return "WithFetchWorkerCount"
}
Expand All @@ -63,6 +68,10 @@ func (identTransformer) String() string {
return "WithTransformer"
}

func WithErrSink(v ErrSink) ConstructorOption {
return &constructorOption{option.New(identErrSink{}, v)}
}

func WithFetchWorkerCount(v int) ConstructorOption {
return &constructorOption{option.New(identFetchWorkerCount{}, v)}
}
Expand Down
1 change: 1 addition & 0 deletions options_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

func TestOptionIdent(t *testing.T) {
require.Equal(t, "WithErrSink", identErrSink{}.String())
require.Equal(t, "WithFetchWorkerCount", identFetchWorkerCount{}.String())
require.Equal(t, "WithHTTPClient", identHTTPClient{}.String())
require.Equal(t, "WithMinRefreshInterval", identMinRefreshInterval{}.String())
Expand Down
26 changes: 19 additions & 7 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ import (
"github.com/lestrrat-go/httpcc"
)

// ErrSink is an abstraction that allows users to consume errors
// produced while the cache queue is running.
type ErrSink interface {
// Error accepts errors produced during the cache queue's execution.
// The method should never block, otherwise the fetch loop may be
// paused for a prolonged amount of time.
Error(error)
}

// Transformer is responsible for converting an HTTP response
// into an appropriate form of your choosing.
type Transformer interface {
Expand Down Expand Up @@ -118,7 +127,7 @@ type queue struct {
list []*rqentry
}

func newQueue(ctx context.Context, window time.Duration, fetch *fetcher) *queue {
func newQueue(ctx context.Context, window time.Duration, fetch *fetcher, errSink ErrSink) *queue {
fetchLocker := &sync.Mutex{}
rq := &queue{
windowSize: window,
Expand All @@ -127,7 +136,7 @@ func newQueue(ctx context.Context, window time.Duration, fetch *fetcher) *queue
registry: make(map[string]*entry),
}

go rq.refreshLoop(ctx)
go rq.refreshLoop(ctx, errSink)

return rq
}
Expand Down Expand Up @@ -191,7 +200,7 @@ func (q *queue) IsRegistered(u string) bool {
return ok
}

func (q *queue) fetchLoop(ctx context.Context) {
func (q *queue) fetchLoop(ctx context.Context, errSink ErrSink) {
for {
q.fetchCond.L.Lock()
for len(q.fetchQueue) <= 0 {
Expand All @@ -218,18 +227,21 @@ func (q *queue) fetchLoop(ctx context.Context) {
if !ok {
continue
}
// TODO: send to error sink
_ = q.fetchAndStore(ctx, e)
if err := q.fetchAndStore(ctx, e); err != nil {
if errSink != nil {
errSink.Error(err)
}
}
}
}
}

// This loop is responsible for periodically updating the cached content
func (q *queue) refreshLoop(ctx context.Context) {
func (q *queue) refreshLoop(ctx context.Context, errSink ErrSink) {
// Tick every q.windowSize duration.
ticker := time.NewTicker(q.windowSize)

go q.fetchLoop(ctx)
go q.fetchLoop(ctx, errSink)
defer q.fetchCond.Signal()

for {
Expand Down

0 comments on commit 1ae122f

Please sign in to comment.