Skip to content

Commit

Permalink
Do not restrict to Cache API, allow Fetch API (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat authored Mar 23, 2022
1 parent 476de12 commit d6908ad
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 100 deletions.
36 changes: 7 additions & 29 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,6 @@ import (
"time"
)

// Whitelist is an interface for a set of URL whitelists. When provided
// to fetching operations, URLs are checked against this object, and
// the object must return true for a URL to be fetched.
type Whitelist interface {
// IsAllowed reports whether the given URL may be fetched.
IsAllowed(string) bool
}

// WhitelistFunc is a httprc.Whitelist object based on a function.
// You can perform any sort of check against the given URL to determine
// if it can be fetched or not.
type WhitelistFunc func(string) bool

// IsAllowed calls the underlying function with u and returns its result.
func (w WhitelistFunc) IsAllowed(u string) bool {
return w(u)
}

// HTTPClient is an abstraction over the HTTP client (for example
// *http.Client) that is used to fetch resources.
type HTTPClient interface {
Expand Down Expand Up @@ -70,35 +54,29 @@ const defaultRefreshWindow = 15 * time.Minute
//
// Internally the HTTP fetching is done using a pool of HTTP fetch
// workers. The default number of workers is 3. You may change this
// number by specifying the `httprc.WithFetchWorkerCount`
func New(ctx context.Context, options ...ConstructorOption) *Cache {
// number by specifying the `httprc.WithFetcherWorkerCount` option.
func NewCache(ctx context.Context, options ...CacheOption) *Cache {
var refreshWindow time.Duration
var errSink ErrSink
var nfetchers int
var wl Whitelist
var fetcherOptions []FetcherOption
for _, option := range options {
//nolint:forcetypeassert
switch option.Ident() {
case identRefreshWindow{}:
refreshWindow = option.Value().(time.Duration)
case identFetchWorkerCount{}:
nfetchers = option.Value().(int)
case identFetcherWorkerCount{}, identWhitelist{}:
fetcherOptions = append(fetcherOptions, option)
case identErrSink{}:
errSink = option.Value().(ErrSink)
case identWhitelist{}:
wl = option.Value().(Whitelist)
}
}

if refreshWindow < time.Second {
refreshWindow = defaultRefreshWindow
}

if nfetchers < 1 {
nfetchers = 3
}

fetch := newFetcher(ctx, nfetchers)
fetch := NewFetcher(ctx, fetcherOptions...)
queue := newQueue(ctx, refreshWindow, fetch, errSink)

return &Cache{
Expand Down Expand Up @@ -162,7 +140,7 @@ func (c *Cache) Get(ctx context.Context, u string) (interface{}, error) {
// has this entry been fetched?
if !e.hasBeenFetched() {
if err := c.queue.fetchAndStore(ctx, e); err != nil {
return nil, fmt.Errorf(`failed to fetch %q: %w`, e.request.URL, err)
return nil, fmt.Errorf(`failed to fetch %q: %w`, u, err)
}
}

Expand Down
139 changes: 114 additions & 25 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,109 @@ package httprc

import (
"context"
"fmt"
"net/http"
"sync"
)

// fetchRequest is a set of data that can be used to make an HTTP
// request.
type fetchRequest struct {
// mu guards the fields of this struct; the fetch worker copies them
// out while holding the lock before doing any work.
mu sync.Mutex
mu sync.RWMutex

// Client contains the HTTP Client that can be used to make a
// client contains the HTTP Client that can be used to make a
// request. By setting a custom *http.Client, you can for example
// provide a custom http.Transport
//
// If not specified, http.DefaultClient will be used.
Client HTTPClient
client HTTPClient

// URL contains the URL to be fetched
URL string
// wl is an optional per-request whitelist. When non-nil, the worker
// checks the URL against it before fetching.
wl Whitelist

// url contains the URL to be fetched
url string

// reply is a field that is only used by the internals of the fetcher
// it is used to return the result of fetching
reply chan *fetchResult
}

// fetchResult carries the outcome of a single fetch from a worker back
// to the caller: either the HTTP response or the error that occurred.
type fetchResult struct {
Response *http.Response
Error error
// mu guards res and err; the caller reads both under RLock.
mu sync.RWMutex
// res is the response returned by the HTTP client, if any.
res *http.Response
// err is the error from the HTTP client or from a whitelist rejection.
err error
}

// reply delivers fr on the given channel and then closes the channel.
//
// If ctx is done before the value can be sent, the channel is left
// untouched (not closed) and ctx.Err() is returned instead.
func (fr *fetchResult) reply(ctx context.Context, reply chan *fetchResult) error {
	select {
	case reply <- fr:
		// Delivered; closing signals that no further results will follow.
		close(reply)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fetcher hands fetch requests over to the fetch worker goroutines.
type fetcher struct {
// requests is the channel the workers receive fetch requests from.
requests chan *fetchRequest
}

func newFetcher(ctx context.Context, nworkers int) *fetcher {
// Fetcher performs HTTP fetches on behalf of the caller.
//
// Because the interface includes an unexported method, it can only be
// implemented by types declared in this package.
type Fetcher interface {
// Fetch fetches the given URL and returns the HTTP response.
Fetch(context.Context, string, ...FetchOption) (*http.Response, error)
// fetch is the internal variant that takes a prepared *fetchRequest.
fetch(context.Context, *fetchRequest) (*http.Response, error)
}

// NewFetcher creates a Fetcher and starts its fetch worker goroutines.
//
// Recognized options are the worker count and a whitelist that is handed
// to every worker. Each worker stops once ctx is done.
func NewFetcher(ctx context.Context, options ...FetcherOption) Fetcher {
var nworkers int
var wl Whitelist
for _, option := range options {
//nolint:forcetypeassert
switch option.Ident() {
case identFetcherWorkerCount{}:
nworkers = option.Value().(int)
case identWhitelist{}:
wl = option.Value().(Whitelist)
}
}

// Fall back to the default of 3 workers when the count is unset or invalid.
if nworkers < 1 {
nworkers = 3
}

// All workers receive from the same unbuffered channel.
incoming := make(chan *fetchRequest)
for i := 0; i < nworkers; i++ {
go runFetchWorker(ctx, incoming)
go runFetchWorker(ctx, incoming, wl)
}
return &fetcher{
requests: incoming,
}
}

// Fetch requests that a HTTP request be made on behalf of the caller,
// and returns the http.Response object.
func (f *fetcher) Fetch(ctx context.Context, req *fetchRequest) (*http.Response, error) {
reply := make(chan *fetchResult)
// Fetch asks the fetcher to retrieve u and returns the resulting
// http.Response.
//
// An HTTP client and/or a whitelist may be supplied through options;
// options with any other identifier are ignored. The assembled request
// is then passed to the unexported fetch method.
func (f *fetcher) Fetch(ctx context.Context, u string, options ...FetchOption) (*http.Response, error) {
	request := &fetchRequest{url: u}
	for _, opt := range options {
		//nolint:forcetypeassert
		switch opt.Ident() {
		case identHTTPClient{}:
			request.client = opt.Value().(HTTPClient)
		case identWhitelist{}:
			request.wl = opt.Value().(Whitelist)
		}
	}
	return f.fetch(ctx, request)
}

// fetch (unexported) is the main fetching implementation.
// it allows the caller to reuse the same *fetchRequest object
func (f *fetcher) fetch(ctx context.Context, req *fetchRequest) (*http.Response, error) {
reply := make(chan *fetchResult, 1)
req.mu.Lock()
req.reply = reply
req.mu.Unlock()
Expand All @@ -65,29 +121,62 @@ func (f *fetcher) Fetch(ctx context.Context, req *fetchRequest) (*http.Response,
case <-ctx.Done():
return nil, ctx.Err()
case fr := <-reply:
return fr.Response, fr.Error
fr.mu.RLock()
res := fr.res
err := fr.err
fr.mu.RUnlock()
return res, err
}
}

func runFetchWorker(ctx context.Context, incoming chan *fetchRequest) {
func runFetchWorker(ctx context.Context, incoming chan *fetchRequest, wl Whitelist) {
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case req := <-incoming:
req.mu.RLock()
reply := req.reply
client := req.client
if client == nil {
client = http.DefaultClient
}
url := req.url
reqwl := req.wl
req.mu.RUnlock()

var wls []Whitelist
for _, v := range []Whitelist{wl, reqwl} {
if v != nil {
wls = append(wls, v)
}
}

if len(wls) > 0 {
for _, wl := range wls {
if !wl.IsAllowed(url) {
r := &fetchResult{
err: fmt.Errorf(`fetching url %q not allowed based on whitelist`, url),
}
if err := r.reply(ctx, reply); err != nil {
break LOOP
}
return
}
}
}

// The body is handled by the consumer of the fetcher
//nolint:bodyclose
res, err := req.Client.Get(req.URL)
r := &fetchResult{Response: res, Error: err}
select {
case <-ctx.Done():
res, err := client.Get(url)
r := &fetchResult{
res: res,
err: err,
}
if err := r.reply(ctx, reply); err != nil {
break LOOP
case req.reply <- r:
}
req.mu.Lock()
close(req.reply)
req.mu.Unlock()
}
}
}
4 changes: 2 additions & 2 deletions httprc_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
goodbyeWorld = `Goodbye World!`
)

func Example() {
func ExampleCache() {
var mu sync.RWMutex

msg := helloWorld
Expand All @@ -37,7 +37,7 @@ func Example() {
fmt.Printf("%s\n", err)
})

c := httprc.New(ctx,
c := httprc.NewCache(ctx,
httprc.WithErrSink(errSink),
httprc.WithRefreshWindow(time.Second), // force checks every second
)
Expand Down
6 changes: 1 addition & 5 deletions httprc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/stretchr/testify/assert"
)

// TestWhitelist is an empty placeholder; it currently makes no assertions.
func TestWhitelist(t *testing.T) {

}

type dummyErrSink struct {
mu sync.RWMutex
errors []error
Expand Down Expand Up @@ -55,7 +51,7 @@ func TestCache(t *testing.T) {
}))

errSink := &dummyErrSink{}
c := httprc.New(ctx,
c := httprc.NewCache(ctx,
httprc.WithRefreshWindow(time.Second),
httprc.WithErrSink(errSink),
)
Expand Down
32 changes: 23 additions & 9 deletions options.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,36 @@ interfaces:
- name: RegisterOption
comment: |
RegisterOption describes options that can be passed to `(httprc.Cache).Register()`
- name: ConstructorOption
- name: CacheOption
comment: |
ConstructorOption desribes options that can be passed to `New()`
CacheOption describes options that can be passed to `NewCache()`
- name: FetcherOption
methods:
- cacheOption
comment: |
FetcherOption describes options that can be passed to `httprc.NewFetcher()`
- name: FetchOption
comment: |
FetchOption describes options that can be passed to `(httprc.Fetcher).Fetch()`
options:
- ident: FetchWorkerCount
interface: ConstructorOption
- ident: FetcherWorkerCount
interface: FetcherOption
argument_type: int
comment: |
WithFetcherWorkerCount specifies the number of HTTP fetch workers that are spawned
in the backend. By default 3 workers are spawned.
- ident: Whitelist
interface: ConstructorOption
interface: FetcherOption
argument_type: Whitelist
comment: |
WithWhitelist specifies the Whitelist object that can control which URLs can be
registered to the cache.
WithWhitelist specifies the Whitelist object that can control which URLs are
allowed to be processed.
It can be passed to `httprc.NewCache` as a whitelist applied to all
URLs that are fetched by the cache, or it can be passed on a per-URL
basis using `(httprc.Cache).Register()`. If both are specified,
the url must fulfill _both_ the cache-wide whitelist and the per-URL
whitelist.
- ident: Transformer
interface: RegisterOption
argument_type: Transformer
Expand Down Expand Up @@ -71,7 +85,7 @@ options:
be considered a DoS attack, and there is no backoff mechanism for failed
attempts.
- ident: RefreshWindow
interface: ConstructorOption
interface: CacheOption
argument_type: time.Duration
comment: |
WithRefreshWindow specifies the interval between checks for refreshes.
Expand All @@ -86,7 +100,7 @@ options:
be considered a DoS attack, and there is no backoff mechanism for failed
attempts.
- ident: ErrSink
interface: ConstructorOption
interface: CacheOption
argument_type: ErrSink
comment: |
WithErrSink specifies the `httprc.ErrSink` object that handles errors
Expand Down
Loading

0 comments on commit d6908ad

Please sign in to comment.