Skip to content

Commit

Permalink
refactor remote client
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Oct 6, 2022
1 parent d852a92 commit 7d26053
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 110 deletions.
199 changes: 108 additions & 91 deletions internal/pkg/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package remote

import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"
Expand All @@ -27,27 +30,25 @@ const (
retryOnBadConnTimeout = 5 * time.Minute
)

type requestFunc func(string, string, url.Values, io.Reader) (*http.Request, error)
type wrapperFunc func(rt http.RoundTripper) (http.RoundTripper, error)

type requestClient struct {
host string
request requestFunc
client http.Client
lastUsed time.Time
lastErr error
lastErrOcc time.Time
}

// Client wraps an http.Client and takes care of making the raw calls, the client should
// stay simple and specificals should be implemented in external action instead of adding new methods
// to the client. For authenticated calls or sending fields on every request, create customer RoundTripper
// implementations that will take care of the boiler plates.
// Client wraps a http.Client and takes care of making the raw calls, the client should
// stay simple and specifics should be implemented in external action instead of adding new methods
// to the client. For authenticated calls or sending fields on every request, create a custom RoundTripper
// implementation that will take care of the boiler plate.
type Client struct {
log *logger.Logger
lock sync.Mutex
clients []*requestClient
config Config
log *logger.Logger
clientMu sync.Mutex
clients []*requestClient
config Config
}

// NewConfigFromURL returns a Config based on a received host.
Expand Down Expand Up @@ -100,11 +101,11 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client

hosts := cfg.GetHosts()
hostCount := len(hosts)
log.With("hosts", hosts).Debugf("creating remote client with %d hosts",
hostCount, hosts)
log.With("hosts", hosts).Debugf(
"creating remote client with %d hosts", hostCount, hosts)
clients := make([]*requestClient, hostCount)
for i, host := range hosts {
connStr, err := urlutil.MakeURL(string(cfg.Protocol), p, host, 0)
baseURL, err := urlutil.MakeURL(string(cfg.Protocol), p, host, 0)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}
Expand All @@ -129,11 +130,9 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client
Timeout: cfg.Transport.Timeout,
}

basePath := prefixRequestFactory(connStr)
clients[i] = &requestClient{
host: connStr,
request: basePath,
client: httpClient,
host: baseURL,
client: httpClient,
}
}

Expand Down Expand Up @@ -162,50 +161,58 @@ func (c *Client) Send(
}

c.log.Debugf("Request method: %s, path: %s, reqID: %s", method, path, reqID)
c.lock.Lock()
defer c.lock.Unlock()
requester := c.nextRequester()
c.clientMu.Lock()
defer c.clientMu.Unlock()

req, err := requester.request(method, path, params, body)
if err != nil {
return nil, errors.Wrapf(err, "fail to create HTTP request using method %s to %s", method, path)
}
var err error
var req *http.Request
var resp *http.Response

// Add generals headers to the request, we are dealing exclusively with JSON.
// Content-Type / Accepted type can be overridden by the caller.
req.Header.Set("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
// This header should be specific to fleet-server or remove it
req.Header.Set("kbn-xsrf", "1") // Without this Kibana will refuse to answer the request.
c.sortClients()
for i, requester := range c.clients {
req, err = requester.newRequest(method, path, params, body)
if err != nil {
return nil, errors.Wrapf(err, "fail to create HTTP request using method %s to %s", method, path)
}

// If available, add the request id as an HTTP header
if reqID != "" {
req.Header.Add("X-Request-ID", reqID)
}
// Add generals headers to the request, we are dealing exclusively with JSON.
// Content-Type / Accepted type can be overridden by the caller.
req.Header.Set("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
// This header should be specific to fleet-server or remove it
req.Header.Set("kbn-xsrf", "1") // Without this Kibana will refuse to answer the request.

// copy headers.
for header, values := range headers {
for _, v := range values {
req.Header.Add(header, v)
// If available, add the request id as an HTTP header
if reqID != "" {
req.Header.Add("X-Request-ID", reqID)
}
}

requester.lastUsed = time.Now().UTC()
// copy headers.
for header, values := range headers {
for _, v := range values {
req.Header.Add(header, v)
}
}

resp, err := requester.client.Do(req.WithContext(ctx))
if err != nil {
requester.lastErr = err
requester.lastErrOcc = time.Now().UTC()
requester.lastUsed = time.Now().UTC()

resp, err = requester.client.Do(req.WithContext(ctx))
if err != nil {
requester.lastErr = err
requester.lastErrOcc = time.Now().UTC()

// Using debug level as the caller is responsible to choose how to
// handle failed attempts.
c.log.Debugf("requester to host %s errored", requester.host)
return resp, err
// Using debug level as the error is only relevant if all clients fail.
c.log.With("error", err).Debugf("requester %d/%d to host %s errored",
i, len(c.clients), requester.host)
continue
}

requester.lastErr = nil
requester.lastErrOcc = time.Time{}
return resp, nil
}

requester.lastErr = nil
requester.lastErrOcc = time.Time{}
return resp, nil
return nil, fmt.Errorf("all hosts failed, last error: %w", err)
}

// URI returns the remote URI.
Expand All @@ -218,65 +225,75 @@ func (c *Client) URI() string {
func new(
log *logger.Logger,
cfg Config,
httpClients ...*requestClient,
clients ...*requestClient,
) (*Client, error) {
// Shuffle so all the agents don't access the hosts in the same order
rand.Shuffle(len(clients), func(i, j int) {
clients[i], clients[j] = clients[j], clients[i]
})

c := &Client{
log: log,
clients: httpClients,
clients: clients,
config: cfg,
}
return c, nil
}

// nextRequester returns the requester to use.
// sortClients returns the requester to use.
//
// It excludes clients that have errored in the last 5 minutes.
func (c *Client) nextRequester() *requestClient {
var selected *requestClient

func (c *Client) sortClients() {
now := time.Now().UTC()
for _, requester := range c.clients {
if requester.lastErr != nil && now.Sub(requester.lastErrOcc) > retryOnBadConnTimeout {
requester.lastErr = nil
requester.lastErrOcc = time.Time{}

// Less reports whether the element with index i
// must sort before the element with index j.

sort.Slice(c.clients, func(i, j int) bool {
// First, set them good if the timout has elapsed
if c.clients[i].lastErr != nil &&
now.Sub(c.clients[i].lastErrOcc) > retryOnBadConnTimeout {
c.clients[i].lastErr = nil
c.clients[i].lastErrOcc = time.Time{}
}
if requester.lastErr != nil {
continue
if c.clients[j].lastErr != nil &&
now.Sub(c.clients[j].lastErrOcc) > retryOnBadConnTimeout {
c.clients[j].lastErr = nil
c.clients[j].lastErrOcc = time.Time{}
}
if requester.lastUsed.IsZero() {
// never been used, instant winner!
selected = requester
break

// Pick not yet used first, but if both haven't been used yet,
// we return false to comply with the sort.Interface definition.
if c.clients[i].lastUsed.IsZero() &&
c.clients[j].lastUsed.IsZero() {
return false
}
if selected == nil {
selected = requester
continue

// Pick not yet used first
if c.clients[i].lastUsed.IsZero() {
return true
}
if requester.lastUsed.Before(selected.lastUsed) {
selected = requester

// If none has errors, pick the last used
// Then, the one without errors
if c.clients[i].lastErr == nil &&
c.clients[j].lastErr == nil {
return c.clients[i].lastUsed.Before(c.clients[j].lastUsed)
}
}

if selected == nil {
// all are erroring; select the oldest one that errored
for _, requester := range c.clients {
if selected == nil {
selected = requester
continue
}
if requester.lastErrOcc.Before(selected.lastErrOcc) {
selected = requester
}
// Then, the one without error
if c.clients[i].lastErr == nil {
return true
}
}

return selected
// Lastly, the one that errored first
return c.clients[i].lastUsed.Before(c.clients[j].lastUsed)
})
}

func prefixRequestFactory(URL string) requestFunc {
return func(method, path string, params url.Values, body io.Reader) (*http.Request, error) {
path = strings.TrimPrefix(path, "/")
newPath := strings.Join([]string{URL, path, "?", params.Encode()}, "")
return http.NewRequest(method, newPath, body) //nolint:noctx // keep old behaviour
}
func (r requestClient) newRequest(method string, path string, params url.Values, body io.Reader) (*http.Request, error) {
path = strings.TrimPrefix(path, "/")
newPath := strings.Join([]string{r.host, path, "?", params.Encode()}, "")

return http.NewRequest(method, newPath, body)
}
Loading

0 comments on commit 7d26053

Please sign in to comment.