Skip to content

Commit

Permalink
Add refresh, add synchronous requests
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Sep 27, 2024
1 parent e074b5f commit 840600a
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 161 deletions.
149 changes: 5 additions & 144 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package httprc

import (
"context"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -51,152 +50,12 @@ func NewClient(options ...NewClientOption) *Client {
}
}

// Controller It is responsible for accepting
// a set of Syncer objects and processing them periodically in resource-specific intervals.
type Controller struct {
cancel context.CancelFunc
check *time.Ticker
// incoming accepts new control requests from external sources
incoming chan ctrlRequest
// outgoing sends Syncer objects to the worker pool
outgoing chan Resource
items []Resource
tickDuration time.Duration
shutdown chan struct{}
}

// Shutdown stops the client and all associated goroutines, and waits for them
// to finish. If the context is canceled, the function will return immediately:
// there fore you should not use the context you used to start the client (because
// presumably it's already canceled).
//
// Waiting for the client shutdown will also ensure that all sinks are properly
// flushed.
func (c *Controller) Shutdown(ctx context.Context) error {
c.cancel()

select {
case <-ctx.Done():
return ctx.Err()
case <-c.shutdown:
return nil
}
}

const (
addResource = iota
rmResource
)

type ctrlRequest struct {
op int
reply chan error
resource Resource
}

// AddResource adds a new resource to the controller. If the resource already
// exists, it will return an error.
func (c *Controller) AddResource(r Resource) error {
reply := make(chan error, 1)
c.incoming <- ctrlRequest{
op: addResource,
reply: reply,
resource: r,
}
return <-reply
}

// RemoveResource removes a resource from the controller. If the resource does
// not exist, it will return an error.
func (c *Controller) RemoveResource(s Resource) {
reply := make(chan error, 1)
c.incoming <- ctrlRequest{
op: rmResource,
reply: reply,
resource: s,
}
<-reply
}

func (c *Controller) handleRequest(req ctrlRequest) {
defer close(req.reply)
switch req.op {
case addResource:
r := req.resource
for _, item := range c.items {
if item.URL() == r.URL() {
// Already exists
req.reply <- errResourceAlreadyExists
return
}
}

c.items = append(c.items, r)
req.reply <- nil

// force the next check to happen immediately
if d := r.ConstantInterval(); d > 0 {
c.tickDuration = d
} else if d := r.MinimumInterval(); d > 0 {
c.tickDuration = d
}

c.check.Reset(time.Nanosecond)
case rmResource:
r := req.resource
minInterval := oneDay
loc := -1
for i, item := range c.items {
if d := item.MinimumInterval(); d < minInterval {
minInterval = d
}

if item.URL() == r.URL() {
loc = i
}
}

if loc < 0 {
req.reply <- errResourceNotFound
return
}

c.items = slices.Delete(c.items, loc, loc+1)
req.reply <- nil

c.check.Reset(minInterval)
}
}

func (c *Controller) loop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case req := <-c.incoming:
c.handleRequest(req)
case t := <-c.check.C:
// Always reset the ticker because the previous tick
// could have arrived by way of a forced tick
c.check.Reset(c.tickDuration)
for _, item := range c.items {
if item.IsBusy() || item.Next().After(t) {
continue
}
item.SetBusy(true)
c.outgoing <- item
}
case <-ctx.Done():
return
}
}
}

// Start sets the client into motion. It will start a number of worker goroutines,
// and return a Controller object that you can use to control the execution of
// the client.
//
// If you attempt to call Start more than once, it will return an error.
func (c *Client) Start(octx context.Context) (*Controller, error) {
func (c *Client) Start(octx context.Context) (Controller, error) {
c.mu.Lock()
if c.running {
c.mu.Unlock()
Expand Down Expand Up @@ -242,15 +101,17 @@ func (c *Client) Start(octx context.Context) (*Controller, error) {

incoming := make(chan ctrlRequest, c.numWorkers)
outgoing := make(chan Resource, c.numWorkers)
syncoutgoing := make(chan synchronousRequest, c.numWorkers)
wg.Add(c.numWorkers)
for range c.numWorkers {
go worker(ctx, &wg, outgoing, errSink, traceSink)
go worker(ctx, &wg, outgoing, syncoutgoing, errSink, traceSink)
}

tickDuration := oneDay
ctrl := &Controller{
ctrl := &controller{
cancel: cancel,
outgoing: outgoing,
syncoutgoing: syncoutgoing,
incoming: incoming,
tickDuration: tickDuration,
check: time.NewTicker(tickDuration),
Expand Down
6 changes: 1 addition & 5 deletions client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ func ExampleClient() {
// dangling goroutines hanging around when you exit. For example, if you
// are running tests to check for goroutine leaks, you should call this
// function before the end of your test.
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = ctrl.Shutdown(ctx)
}()
defer ctrl.Shutdown(time.Second)

// Create a new resource that is synchronized every so often
r, err := httprc.NewResource[HelloWorld](srv.URL, httprc.JSONTransformer[HelloWorld]())
Expand Down
Loading

0 comments on commit 840600a

Please sign in to comment.