Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make policy change handler try all fleet hosts before failing #1329

Merged
merged 14 commits into from
Oct 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"sort"
"time"

Expand Down Expand Up @@ -142,30 +143,35 @@ func (h *PolicyChange) handleFleetServerHosts(ctx context.Context, c *config.Con
err, "fail to create API client with updated hosts",
errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Client.Hosts))
}

ctx, cancel := context.WithTimeout(ctx, apiStatusTimeout)
defer cancel()
resp, err := client.Send(ctx, "GET", "/api/status", nil, nil, nil)

resp, err := client.Send(ctx, http.MethodGet, "/api/status", nil, nil, nil)
if err != nil {
return errors.New(
err, "fail to communicate with updated API client hosts",
err, "fail to communicate with Fleet Server API client hosts",
errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Client.Hosts))
}

// discard body for proper cancellation and connection reuse
_, _ = io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

reader, err := fleetToReader(h.agentInfo, h.config)
if err != nil {
return errors.New(
err, "fail to persist updated API client hosts",
err, "fail to persist new Fleet Server API client hosts",
errors.TypeUnexpected, errors.M("hosts", h.config.Fleet.Client.Hosts))
}

err = h.store.Save(reader)
if err != nil {
return errors.New(
err, "fail to persist updated API client hosts",
err, "fail to persist new Fleet Server API client hosts",
errors.TypeFilesystem, errors.M("hosts", h.config.Fleet.Client.Hosts))
}

for _, setter := range h.setters {
setter.SetClient(client)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/fleetapi/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewWithConfig(log *logger.Logger, cfg remote.Config) (*remote.Client, error

// ExtractError extracts error from a fleet-server response
func ExtractError(resp io.Reader) error {
// Lets try to extract a high level fleet-server error.
// Let's try to extract a high level fleet-server error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤣

e := &struct {
StatusCode int `json:"statusCode"`
Error string `json:"error"`
Expand Down
218 changes: 126 additions & 92 deletions internal/pkg/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package remote

import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"
Expand All @@ -17,6 +20,7 @@ import (

urlutil "github.com/elastic/elastic-agent-libs/kibana"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"

"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/id"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand All @@ -26,26 +30,27 @@ const (
retryOnBadConnTimeout = 5 * time.Minute
)

type requestFunc func(string, string, url.Values, io.Reader) (*http.Request, error)
var errRequestFailed = errors.New("request failed")

type wrapperFunc func(rt http.RoundTripper) (http.RoundTripper, error)

type requestClient struct {
request requestFunc
host string
client http.Client
lastUsed time.Time
lastErr error
lastErrOcc time.Time
}

// Client wraps an http.Client and takes care of making the raw calls, the client should
// stay simple and specificals should be implemented in external action instead of adding new methods
// to the client. For authenticated calls or sending fields on every request, create customer RoundTripper
// implementations that will take care of the boiler plates.
// Client wraps a http.Client and takes care of making the raw calls, the client should
// stay simple and specifics should be implemented in external action instead of adding new methods
// to the client. For authenticated calls or sending fields on every request, create a custom RoundTripper
// implementation that will take care of the boilerplate.
type Client struct {
log *logger.Logger
lock sync.Mutex
clients []*requestClient
config Config
log *logger.Logger
clientLock sync.Mutex
clients []*requestClient
config Config
}

// NewConfigFromURL returns a Config based on a received host.
Expand Down Expand Up @@ -97,9 +102,12 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client
}

hosts := cfg.GetHosts()
clients := make([]*requestClient, len(hosts))
for i, host := range cfg.GetHosts() {
connStr, err := urlutil.MakeURL(string(cfg.Protocol), p, host, 0)
hostCount := len(hosts)
log.With("hosts", hosts).Debugf(
"creating remote client with %d hosts", hostCount)
clients := make([]*requestClient, hostCount)
for i, host := range hosts {
baseURL, err := urlutil.MakeURL(string(cfg.Protocol), p, host, 0)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}
Expand All @@ -125,17 +133,17 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client
}

clients[i] = &requestClient{
request: prefixRequestFactory(connStr),
client: httpClient,
host: baseURL,
client: httpClient,
}
}

return new(log, cfg, clients...)
return newClient(log, cfg, clients...)
}

// Send executes a direct calls against the API, the method will takes cares of cloning
// also add necessary headers for likes: "Content-Type", "Accept", and "kbn-xsrf".
// No assumptions is done on the response concerning the received format, this will be the responsibility
// Send executes a direct call against the API, the method will take care of cloning and
// also adding the necessary headers like: "Content-Type", "Accept", and "kbn-xsrf".
// No assumptions are done on the response concerning the received format, this will be the responsibility
// of the implementation to correctly unpack any received data.
//
// NOTE:
Expand All @@ -155,45 +163,60 @@ func (c *Client) Send(
}

c.log.Debugf("Request method: %s, path: %s, reqID: %s", method, path, reqID)
c.lock.Lock()
defer c.lock.Unlock()
requester := c.nextRequester()
c.clientLock.Lock()
defer c.clientLock.Unlock()

req, err := requester.request(method, path, params, body)
if err != nil {
return nil, errors.Wrapf(err, "fail to create HTTP request using method %s to %s", method, path)
}
var resp *http.Response
multiErr := errRequestFailed

// Add generals headers to the request, we are dealing exclusively with JSON.
// Content-Type / Accepted type can be override from the called.
req.Header.Set("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
// This header should be specific to fleet-server or remove it
req.Header.Set("kbn-xsrf", "1") // Without this Kibana will refuse to answer the request.
c.sortClients()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure we should shuffle arrays with each request, it adds unnecessary CPU cycles. maybe it would be better to shuffle on error

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorting on each request ensures we keep the previous behavior, preferring the no-error, last-used host for the next request. Besides, the number of fleet-server hosts is so small that the overhead of sorting them is negligible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could possibly optimize skipping sorting if already known to be sorted before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it needs to be sorted every time because the sorting conditions change. Every time a host is used it loses priority, thus a new sorting is required.

It isn't much different from the original code, which would do an extensive search on each call to Send. Besides, the host list is so small that the impact on performance is negligible.

for i, requester := range c.clients {
req, err := requester.newRequest(method, path, params, body)
if err != nil {
return nil, errors.Wrapf(err, "fail to create HTTP request using method %s to %s", method, path)
}

// If available, add the request id as an HTTP header
if reqID != "" {
req.Header.Add("X-Request-ID", reqID)
}
// Add generals headers to the request, we are dealing exclusively with JSON.
// Content-Type / Accepted type can be overridden by the caller.
req.Header.Set("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
// This header should be specific to fleet-server or remove it
req.Header.Set("kbn-xsrf", "1") // Without this Kibana will refuse to answer the request.

// copy headers.
for header, values := range headers {
for _, v := range values {
req.Header.Add(header, v)
// If available, add the request id as an HTTP header
if reqID != "" {
req.Header.Add("X-Request-ID", reqID)
}
}

requester.lastUsed = time.Now().UTC()
// copy headers.
for header, values := range headers {
for _, v := range values {
req.Header.Add(header, v)
}
}

requester.lastUsed = time.Now().UTC()

resp, err = requester.client.Do(req.WithContext(ctx))
if err != nil {
requester.lastErr = err
requester.lastErrOcc = time.Now().UTC()

msg := fmt.Sprintf("requester %d/%d to host %s errored",
i, len(c.clients), requester.host)
multiErr = fmt.Errorf("%s: %s: %s", msg, err, multiErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this feels hacky, i'd prefer https://github.com/hashicorp/go-multierror


// Using debug level as the error is only relevant if all clients fail.
c.log.With("error", err).Debugf(msg)
continue
}

resp, err := requester.client.Do(req.WithContext(ctx))
if err != nil {
requester.lastErr = err
requester.lastErrOcc = time.Now().UTC()
} else {
requester.lastErr = nil
requester.lastErrOcc = time.Time{}
return resp, nil
}
return resp, err

return nil, fmt.Errorf("all hosts failed: %w", multiErr)
}

// URI returns the remote URI.
Expand All @@ -202,67 +225,78 @@ func (c *Client) URI() string {
return string(c.config.Protocol) + "://" + host + "/" + c.config.Path
}

// new creates new API client.
func new(
// newClient creates a new API client.
func newClient(
log *logger.Logger,
cfg Config,
httpClients ...*requestClient,
clients ...*requestClient,
) (*Client, error) {
// Shuffle so all the agents don't access the hosts in the same order
rand.Shuffle(len(clients), func(i, j int) {
clients[i], clients[j] = clients[j], clients[i]
})

c := &Client{
log: log,
clients: httpClients,
clients: clients,
config: cfg,
}
return c, nil
}

// nextRequester returns the requester to use.
//
// It excludes clients that have errored in the last 5 minutes.
func (c *Client) nextRequester() *requestClient {
var selected *requestClient

// sortClients sort the clients according to the following priority:
// - never used
// - without errors, last used first when more than one does not have errors
// - last errored.
// It also removes the last error after retryOnBadConnTimeout has elapsed.
func (c *Client) sortClients() {
now := time.Now().UTC()
for _, requester := range c.clients {
if requester.lastErr != nil && now.Sub(requester.lastErrOcc) > retryOnBadConnTimeout {
requester.lastErr = nil
requester.lastErrOcc = time.Time{}

sort.Slice(c.clients, func(i, j int) bool {
// First, set them good if the timeout has elapsed
if c.clients[i].lastErr != nil &&
now.Sub(c.clients[i].lastErrOcc) > retryOnBadConnTimeout {
c.clients[i].lastErr = nil
c.clients[i].lastErrOcc = time.Time{}
}
if requester.lastErr != nil {
continue
if c.clients[j].lastErr != nil &&
now.Sub(c.clients[j].lastErrOcc) > retryOnBadConnTimeout {
c.clients[j].lastErr = nil
c.clients[j].lastErrOcc = time.Time{}
}
if requester.lastUsed.IsZero() {
// never been used, instant winner!
selected = requester
break

// Pick not yet used first, but if both haven't been used yet,
// we return false to comply with the sort.Interface definition.
if c.clients[i].lastUsed.IsZero() &&
c.clients[j].lastUsed.IsZero() {
return false
}
if selected == nil {
selected = requester
continue

// Pick not yet used first
if c.clients[i].lastUsed.IsZero() {
return true
}
if requester.lastUsed.Before(selected.lastUsed) {
selected = requester

// If none has errors, pick the last used
// Then, the one without errors
if c.clients[i].lastErr == nil &&
c.clients[j].lastErr == nil {
return c.clients[i].lastUsed.Before(c.clients[j].lastUsed)
}
}
if selected == nil {
// all are erroring; select the oldest one that errored
for _, requester := range c.clients {
if selected == nil {
selected = requester
continue
}
if requester.lastErrOcc.Before(selected.lastErrOcc) {
selected = requester
}

// Then, the one without error
if c.clients[i].lastErr == nil {
return true
}
}
return selected

// Lastly, the one that errored last
return c.clients[i].lastUsed.Before(c.clients[j].lastUsed)
})
}

func prefixRequestFactory(URL string) requestFunc {
return func(method, path string, params url.Values, body io.Reader) (*http.Request, error) {
path = strings.TrimPrefix(path, "/")
newPath := strings.Join([]string{URL, path, "?", params.Encode()}, "")
return http.NewRequest(method, newPath, body) //nolint:noctx // keep old behaviour
}
// newRequest builds an *http.Request aimed at this requester's host.
//
// One leading "/" is stripped from path, and the result is appended directly
// to r.host. The "?" separator and the encoded params always follow, so the
// "?" is present even when params encodes to an empty string. The request is
// created without a context; any error from http.NewRequest is returned as-is.
func (r requestClient) newRequest(method string, path string, params url.Values, body io.Reader) (*http.Request, error) {
	endpoint := r.host + strings.TrimPrefix(path, "/") + "?" + params.Encode()
	return http.NewRequest(method, endpoint, body)
}
Loading