Skip to content

Commit

Permalink
Add sinks to receive error/trace, and proxy to serialize access
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Sep 27, 2024
1 parent 44e23ec commit eae2f03
Show file tree
Hide file tree
Showing 10 changed files with 481 additions and 16 deletions.
87 changes: 80 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,39 @@ import (
"slices"
"sync"
"time"

"github.com/lestrrat-go/httprc/v3/errsink"
"github.com/lestrrat-go/httprc/v3/proxysink"
"github.com/lestrrat-go/httprc/v3/tracesink"
)

// Client is the main entry point for the httprc package.
type Client struct {
mu sync.Mutex
numWorkers int
running bool
errSink ErrorSink
traceSink TraceSink
}

const DefaultWorkers = 5
const oneDay = 24 * time.Hour

func NewClient(options ...NewClientOption) *Client {
//nolint:stylecheck
var errSink ErrorSink = errsink.NewNop()
//nolint:stylecheck
var traceSink TraceSink = tracesink.NewNop()
numWorkers := DefaultWorkers
//nolint:forcetypeassert
for _, option := range options {
switch option.Ident() {
case identWorkers{}:
numWorkers = option.Value().(int)
case identErrorSink{}:
errSink = option.Value().(ErrorSink)
case identTraceSink{}:
traceSink = option.Value().(TraceSink)
}
}

Expand All @@ -32,6 +46,8 @@ func NewClient(options ...NewClientOption) *Client {
}
return &Client{
numWorkers: numWorkers,
errSink: errSink,
traceSink: traceSink,
}
}

Expand All @@ -46,10 +62,25 @@ type Controller struct {
outgoing chan Resource
items []Resource
tickDuration time.Duration
shutdown chan struct{}
}

func (c *Controller) Stop() {
// Shutdown stops the client and all associated goroutines, and waits for them
// to finish. If the context is canceled, the function will return immediately:
// there fore you should not use the context you used to start the client (because
// presumably it's already canceled).
//
// Waiting for the client shutdown will also ensure that all sinks are properly
// flushed.
func (c *Controller) Shutdown(ctx context.Context) error {
c.cancel()

select {
case <-ctx.Done():
return ctx.Err()
case <-c.shutdown:
return nil
}
}

const (
Expand Down Expand Up @@ -137,7 +168,8 @@ func (c *Controller) handleRequest(req ctrlRequest) {
}
}

func (c *Controller) loop(ctx context.Context) {
func (c *Controller) loop(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case req := <-c.incoming:
Expand All @@ -159,12 +191,12 @@ func (c *Controller) loop(ctx context.Context) {
}
}

// Run sets the client into motion. It will start a number of worker goroutines,
// Start sets the client into motion. It will start a number of worker goroutines,
// and return a Controller object that you can use to control the execution of
// the client.
//
// If you attempt to call Run more than once, it will return an error.
func (c *Client) Run(octx context.Context) (*Controller, error) {
// If you attempt to call Start more than once, it will return an error.
func (c *Client) Start(octx context.Context) (*Controller, error) {
c.mu.Lock()
if c.running {
c.mu.Unlock()
Expand All @@ -173,12 +205,46 @@ func (c *Client) Run(octx context.Context) (*Controller, error) {
c.running = true
c.mu.Unlock()

// DON'T CANCEL THIS IN THIS METHOD! It's the responsibility of the
// controller to cancel this context.
ctx, cancel := context.WithCancel(octx)

var wg sync.WaitGroup

// start proxy goroutines that will accept sink requests
// and forward them to the appropriate sink
var errSink ErrorSink
if _, ok := c.errSink.(errsink.Nop); ok {
errSink = c.errSink
} else {
proxy := proxysink.New[error](c.errSink)
wg.Add(1)
go func(wg *sync.WaitGroup, proxy *proxysink.Proxy[error]) {
defer wg.Done()
proxy.Run(ctx)
}(&wg, proxy)

errSink = proxy
}

var traceSink TraceSink
if _, ok := c.traceSink.(tracesink.Nop); ok {
traceSink = c.traceSink
} else {
proxy := proxysink.New[string](c.traceSink)
wg.Add(1)
go func(wg *sync.WaitGroup, proxy *proxysink.Proxy[string]) {
defer wg.Done()
proxy.Run(ctx)
}(&wg, proxy)
traceSink = proxy
}

incoming := make(chan ctrlRequest, c.numWorkers)
outgoing := make(chan Resource, c.numWorkers)
wg.Add(c.numWorkers)
for range c.numWorkers {
go worker(ctx, outgoing)
go worker(ctx, &wg, outgoing, errSink, traceSink)
}

tickDuration := oneDay
Expand All @@ -188,8 +254,15 @@ func (c *Client) Run(octx context.Context) (*Controller, error) {
incoming: incoming,
tickDuration: tickDuration,
check: time.NewTicker(tickDuration),
shutdown: make(chan struct{}),
}
go ctrl.loop(ctx)
wg.Add(1)
go ctrl.loop(ctx, &wg)

go func(wg *sync.WaitGroup, ch chan struct{}) {
wg.Wait()
close(ch)
}(&wg, ctrl.shutdown)

return ctrl, nil
}
36 changes: 32 additions & 4 deletions client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,37 @@ func ExampleClient() {
json.NewEncoder(w).Encode(map[string]string{"hello": "world"})
}))

var options []httprc.NewClientOption
// If you would like to handle errors from asynchronous workers, you can specify a error sink.
// This is disabled in this example because the trace logs are dynamic
// and thus would interfere with the runnable example test.
// options = append(options, httprc.WithErrorSink(errsink.NewSlog(slog.New(slog.NewJSONHandler(os.Stdout, nil)))))

// If you would like to see the trace logs, you can specify a trace sink.
// This is disabled in this example because the trace logs are dynamic
// and thus would interfere with the runnable example test.
// options = append(options, httprc.WithTraceSink(tracesink.NewSlog(slog.New(slog.NewJSONHandler(os.Stdout, nil)))))

// Create a new client
cl := httprc.NewClient()
cl := httprc.NewClient(options...)

// Start the client, and obtain a Controller object
ctrl, err := cl.Run(ctx)
ctrl, err := cl.Start(ctx)
if err != nil {
fmt.Println(err.Error())
return
}
// The following is required if you want to make sure that there are no
// dangling goroutines hanging around when you exit. For example, if you
// are running tests to check for goroutine leaks, you should call this
// function before the end of your test.
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = ctrl.Shutdown(ctx)
}()

// Create a new resource that is synchronized every so often
r, err := httprc.NewResource[HelloWorld](srv.URL, httprc.JSONTransformer[HelloWorld]())
if err != nil {
fmt.Println(err.Error())
Expand All @@ -42,8 +63,15 @@ func ExampleClient() {
// Add the resource to the controller, so that it starts fetching
ctrl.AddResource(r)

time.Sleep(1 * time.Second)

{
tctx, tcancel := context.WithTimeout(ctx, time.Second)
defer tcancel()
if err := r.Ready(tctx); err != nil {
fmt.Println(err.Error())
return
}
}
time.Sleep(time.Second)
m := r.Resource()
fmt.Println(m.Hello)
// OUTPUT:
Expand Down
42 changes: 42 additions & 0 deletions errsink/errsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package errsink

import (
"context"
"log/slog"
)

type Interface interface {
Put(context.Context, error)
}

// Nop is an ErrorSink that does nothing. It does not require
// any initialization, so the zero value can be used.
type Nop struct{}

// NewNop returns a new NopErrorSink object. The constructor
// is provided for consistency.
func NewNop() Interface {
return Nop{}
}

// Put for NopErrorSink does nothing.
func (Nop) Put(context.Context, error) {}

type SlogLogger interface {
Log(context.Context, slog.Level, string, ...any)
}

type slogSink struct {
logger SlogLogger
}

// NewSlog returns a new ErrorSink that logs errors using the provided slog.Logger
func NewSlog(l SlogLogger) Interface {
return &slogSink{
logger: l,
}
}

func (s *slogSink) Put(ctx context.Context, v error) {
s.logger.Log(ctx, slog.LevelError, v.Error())
}
16 changes: 15 additions & 1 deletion httprc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ import (
"context"
"net/http"
"time"

"github.com/lestrrat-go/httprc/v3/errsink"
"github.com/lestrrat-go/httprc/v3/tracesink"
)

// ErrorSink is an interface that abstracts a sink for errors.
type ErrorSink = errsink.Interface

type TraceSink = tracesink.Interface

// HTTPClient is an interface that abstracts a "net/http".Client, so that
// users can provide their own implementation of the HTTP client, if need be.
type HTTPClient interface {
Expand All @@ -26,7 +34,13 @@ func (f TransformFunc[T]) Transform(ctx context.Context, res *http.Response) (T,
}

// Resource is a single resource that can be retrieved via HTTP, and (possibly) transformed
// into an arbitrary object type. See ResourceBase for a generic implementation.
// into an arbitrary object type.
//
// Realistically, there is no need for third-parties to implement this interface. This exists
// to provide a way to aggregate `httprc.ResourceBase` objects with different specialized types
// into a single collection.
//
// See ResourceBase for details
type Resource interface {
Get(any) error
Next() time.Time
Expand Down
Loading

0 comments on commit eae2f03

Please sign in to comment.