Add a healthcheck endpoint on the ingesters that distributors can use #741

Merged
3 changes: 2 additions & 1 deletion Gopkg.lock

96 changes: 41 additions & 55 deletions pkg/distributor/distributor.go
@@ -6,7 +6,6 @@ import (
"flag"
"fmt"
"hash/fnv"
"io"
"sync"
"sync/atomic"
"time"
@@ -44,12 +43,12 @@ var (
// Distributor is a storage.SampleAppender and a client.Querier which
// forwards appends and queries to individual ingesters.
type Distributor struct {
cfg Config
ring ring.ReadRing
clientsMtx sync.RWMutex
clients map[string]client.IngesterClient
quit chan struct{}
done chan struct{}
cfg Config
ring ring.ReadRing
clientsMtx sync.RWMutex
clientCache *ingester_client.IngesterClientCache
quit chan struct{}
done chan struct{}

billingClient *billing.Client

@@ -73,11 +72,12 @@ type Config struct {
BillingConfig billing.Config
IngesterClientConfig ingester_client.Config

ReplicationFactor int
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
IngestionBurstSize int
ReplicationFactor int
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
IngestionBurstSize int
HealthCheckIngesters bool

// for testing
ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error)
@@ -93,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
flag.BoolVar(&cfg.HealthCheckIngesters, "distributor.health-check-ingesters", false, "Run a health check on each ingester client during the cleanup period.")
Contributor

can we say "during periodic cleanup"? At first I thought "the cleanup period" was maybe a period during shutdown when we clean up.

}

// New constructs a new Distributor
@@ -116,7 +117,7 @@ func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
d := &Distributor{
cfg: cfg,
ring: ring,
clients: map[string]client.IngesterClient{},
clientCache: ingester_client.NewIngesterClientCache(cfg.ingesterClientFactory, cfg.IngesterClientConfig),
quit: make(chan struct{}),
done: make(chan struct{}),
billingClient: billingClient,
@@ -170,6 +171,9 @@ func (d *Distributor) Run() {
select {
case <-cleanupClients.C:
d.removeStaleIngesterClients()
if d.cfg.HealthCheckIngesters {
d.healthCheckAndRemoveIngesters()
}
case <-d.quit:
close(d.done)
return
@@ -184,52 +188,34 @@ func (d *Distributor) Stop() {
}

func (d *Distributor) removeStaleIngesterClients() {
d.clientsMtx.Lock()
defer d.clientsMtx.Unlock()
Contributor
:-( I prefer the defer way, mainly because it makes the lock more robust to future modifications.

As the remote timeout is set to 2s, and the removeStaleIngesterClients function only runs every 15s, do we really need the extra goroutines and waitgroup?

Contributor Author
@csmarchbanks Mar 12, 2018
I also prefer the defer way. Since we have the mutex, even a single 2s timeout would block any other rules from getting evaluated during that time, which concerned me.

I could get rid of the wait group, but keep the goroutines. That should be OK since all the healthchecks should be done after at most 2s, and the cleanup period is 15s by default. What do you think of that?

Contributor
Of course (d'oh).

How about making it all nice and inline, synchronous code that builds a new clients dict without holding the lock, and then replaces the old one under the lock?

Contributor Author
I split the logic for removing stale ingester clients and healthchecking the ingester clients. I think everything is properly deferred now. Let me know what you think!


ingesters := map[string]struct{}{}
for _, ing := range d.ring.GetAll() {
ingesters[ing.Addr] = struct{}{}
}

for addr, client := range d.clients {
for _, addr := range d.clientCache.RegisteredAddresses() {
Contributor
Seems we are doing two kinds of cleanup now: removing cache entries which are "stale" because the ring no longer references an ingester at that address, and removing entries which fail healthcheck. Do we still need the first one?

Contributor Author
We could get away with only the healthcheck loop (after everyone has upgraded their ingesters and turned the flag on). However, I like the difference in expected behavior/logging from removing stale ingesters (expected) vs. failing ingesters (unexpected).

if _, ok := ingesters[addr]; ok {
continue
}
level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr)
delete(d.clients, addr)

// Do the gRPC closing in the background since it might take a while and
// we're holding a mutex.
go func(addr string, closer io.Closer) {
if err := closer.Close(); err != nil {
level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err)
}
}(addr, client.(io.Closer))
d.clientCache.RemoveClientFor(addr)
}
}
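
For comparison, here is a rough sketch of the "build a new clients dict and swap it" variant floated in the review thread above, written against the original map-based fields (clients, clientsMtx, and the io import that this PR removes). It is hypothetical and not the code that was merged; the copy is done under the lock so that clients added concurrently by getClientFor are not dropped, and the potentially slow gRPC Close calls happen only after the lock is released, so no extra goroutines or wait group are needed.

// Hypothetical sketch only: the "rebuild the map and swap it" variant
// discussed in the review thread, not the approach merged in this PR.
func (d *Distributor) removeStaleIngesterClientsBySwap() {
	live := map[string]struct{}{}
	for _, ing := range d.ring.GetAll() {
		live[ing.Addr] = struct{}{}
	}

	var stale []io.Closer
	d.clientsMtx.Lock()
	fresh := make(map[string]client.IngesterClient, len(d.clients))
	for addr, c := range d.clients {
		if _, ok := live[addr]; ok {
			fresh[addr] = c
			continue
		}
		level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr)
		if closer, ok := c.(io.Closer); ok {
			stale = append(stale, closer)
		}
	}
	d.clients = fresh
	d.clientsMtx.Unlock()

	// Close stale connections only after releasing the lock, since closing a
	// gRPC connection can block.
	for _, c := range stale {
		if err := c.Close(); err != nil {
			level.Error(util.Logger).Log("msg", "error closing connection to ingester", "err", err)
		}
	}
}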

func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.IngesterClient, error) {
d.clientsMtx.RLock()
client, ok := d.clients[ingester.Addr]
d.clientsMtx.RUnlock()
if ok {
return client, nil
}

d.clientsMtx.Lock()
defer d.clientsMtx.Unlock()
client, ok = d.clients[ingester.Addr]
if ok {
return client, nil
}

client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.IngesterClientConfig)
if err != nil {
return nil, err
func (d *Distributor) healthCheckAndRemoveIngesters() {
for _, addr := range d.clientCache.RegisteredAddresses() {
client, err := d.clientCache.GetClientFor(addr)
if err != nil {
Contributor
This can only happen due to some race between RegisteredAddresses() and GetClientFor() - maybe change the cache API to remove the possibility? Or move this whole loop into the cache?

Contributor Author
I refactored this to the pool, but it still has an if around it in case someone deletes the entry from the pool while a previous healthcheck is happening. Right now that shouldn't happen, but that would be an annoying bug to run into if things change.

// if there is no client, don't need to health check it
level.Warn(util.Logger).Log("msg", "could not create client for", "addr", addr)
continue
}
err = ingester_client.HealthCheck(client, d.cfg.RemoteTimeout)
if err != nil {
level.Warn(util.Logger).Log("msg", "removing ingester failing healthcheck", "addr", addr, "reason", err)
d.clientCache.RemoveClientFor(addr)
}
}
d.clients[ingester.Addr] = client
return client, nil
}

func tokenForLabels(userID string, labels []client.LabelPair) (uint32, error) {
@@ -412,7 +398,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
}

func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, samples []*sampleTracker) error {
c, err := d.getClientFor(ingester)
c, err := d.clientCache.GetClientFor(ingester.Addr)
if err != nil {
return err
}
@@ -449,7 +435,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .

metricNameMatcher, _, ok := util.ExtractMetricNameMatcherFromMatchers(matchers)

req, err := util.ToQueryRequest(from, to, matchers)
req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}
@@ -529,7 +515,7 @@ func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.Inge
}

func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *client.QueryRequest) (model.Matrix, error) {
client, err := d.getClientFor(ing)
client, err := d.clientCache.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}
@@ -541,7 +527,7 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc,
return nil, err
}

return util.FromQueryResponse(resp), nil
return ingester_client.FromQueryResponse(resp), nil
}

// forAllIngesters runs f, in parallel, for all ingesters
@@ -550,7 +536,7 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
go func(ingester *ring.IngesterDesc) {
client, err := d.getClientFor(ingester)
client, err := d.clientCache.GetClientFor(ingester.Addr)
if err != nil {
errs <- err
return
@@ -609,7 +595,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
req, err := util.ToMetricsForLabelMatchersRequest(from, through, matchers)
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
if err != nil {
return nil, err
}
@@ -623,7 +609,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

metrics := map[model.Fingerprint]model.Metric{}
for _, resp := range resps {
ms := util.FromMetricsForLabelMatchersResponse(resp.(*client.MetricsForLabelMatchersResponse))
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp.(*client.MetricsForLabelMatchersResponse))
for _, m := range ms {
metrics[m.Fingerprint()] = m
}
@@ -677,7 +663,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
// Not using d.forAllIngesters(), so we can fail after first error.
ingesters := d.ring.GetAll()
for _, ingester := range ingesters {
client, err := d.getClientFor(ingester)
client, err := d.clientCache.GetClientFor(ingester.Addr)
if err != nil {
return nil, err
}
@@ -736,6 +722,6 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
numClientsDesc,
prometheus.GaugeValue,
float64(len(d.clients)),
float64(d.clientCache.Count()),
)
}
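
The ingester_client.HealthCheck helper called from healthCheckAndRemoveIngesters is added elsewhere in this PR and does not appear in this hunk. A minimal sketch of such a helper might look like the following; it assumes the ingester client also exposes the standard gRPC health Check RPC (which the mockIngester in cache_test.go below suggests), and the signature here is illustrative rather than the merged one.

package client

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/health/grpc_health_v1"
)

// HealthCheck is a hypothetical sketch: it runs the gRPC health Check RPC
// against an ingester with a timeout and treats any status other than
// SERVING as a failure.
func HealthCheck(client grpc_health_v1.HealthClient, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
	if err != nil {
		return err
	}
	if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
		return fmt.Errorf("failing healthcheck status: %s", resp.Status)
	}
	return nil
}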
89 changes: 89 additions & 0 deletions pkg/ingester/client/cache.go
@@ -0,0 +1,89 @@
package client

import (
io "io"
"sync"

"github.com/go-kit/kit/log/level"
"github.com/weaveworks/cortex/pkg/util"
)

// Factory defines the signature for an ingester client factory
type Factory func(addr string, cfg Config) (IngesterClient, error)

// IngesterClientCache holds a cache of ingester clients
type IngesterClientCache struct {
Contributor
Is "cache" the right word? Elsewhere I've seen this called a "connection pool", except we have max one connection per endpoint. Maybe just some more explanation of the intended uses would help.

Contributor Author
I like IngesterPool, I will change it to that, and add some more explanations.

sync.RWMutex
clients map[string]IngesterClient

ingesterClientFactory Factory
ingesterClientConfig Config
}

// NewIngesterClientCache creates a new cache
func NewIngesterClientCache(factory Factory, config Config) *IngesterClientCache {
return &IngesterClientCache{
clients: map[string]IngesterClient{},
ingesterClientFactory: factory,
ingesterClientConfig: config,
}
}

// GetClientFor gets the client for the specified address. If it does not exist it will make a new client
// at that address
func (cache *IngesterClientCache) GetClientFor(addr string) (IngesterClient, error) {
cache.RLock()
client, ok := cache.clients[addr]
cache.RUnlock()
if ok {
return client, nil
}

cache.Lock()
defer cache.Unlock()
client, ok = cache.clients[addr]
if ok {
return client, nil
}

client, err := cache.ingesterClientFactory(addr, cache.ingesterClientConfig)
if err != nil {
return nil, err
}
cache.clients[addr] = client
return client, nil
}

// RemoveClientFor removes the client with the specified address
func (cache *IngesterClientCache) RemoveClientFor(addr string) {
cache.Lock()
defer cache.Unlock()
client, ok := cache.clients[addr]
if ok {
delete(cache.clients, addr)
// Close in the background since this operation may take a while and we hold the mutex
go func(addr string, closer io.Closer) {
if err := closer.Close(); err != nil {
level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err)
}
}(addr, client.(io.Closer))
}
}

// RegisteredAddresses returns all the addresses that a client is cached for
func (cache *IngesterClientCache) RegisteredAddresses() []string {
result := []string{}
cache.RLock()
defer cache.RUnlock()
for addr := range cache.clients {
result = append(result, addr)
}
return result
}

// Count returns how many clients are in the cache
func (cache *IngesterClientCache) Count() int {
cache.RLock()
defer cache.RUnlock()
return len(cache.clients)
}
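
The review thread on distributor.go suggests moving the health-check loop into this type. A hypothetical sketch of that could look like the code below; the method name is illustrative, not necessarily the merged API, and it assumes a "time" import plus an IngesterClient that satisfies the gRPC health client interface, as in the HealthCheck sketch above.

// CleanUnhealthy is a hypothetical sketch of a pool-level cleanup method, as
// discussed in the review comments on distributor.go.
func (cache *IngesterClientCache) CleanUnhealthy(timeout time.Duration) {
	for _, addr := range cache.RegisteredAddresses() {
		client, err := cache.GetClientFor(addr)
		if err != nil {
			// GetClientFor (re)creates missing clients via the factory, so an
			// error here means the factory failed and there is nothing to check.
			level.Warn(util.Logger).Log("msg", "could not get client for healthcheck", "addr", addr, "err", err)
			continue
		}
		if err := HealthCheck(client, timeout); err != nil {
			level.Warn(util.Logger).Log("msg", "removing ingester failing healthcheck", "addr", addr, "reason", err)
			cache.RemoveClientFor(addr)
		}
	}
}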
62 changes: 62 additions & 0 deletions pkg/ingester/client/cache_test.go
@@ -0,0 +1,62 @@
package client

import (
fmt "fmt"
"testing"

"google.golang.org/grpc/health/grpc_health_v1"
)

func (i mockIngester) Close() error {
return nil
}

func TestIngesterCache(t *testing.T) {
buildCount := 0
factory := func(addr string, _ Config) (IngesterClient, error) {
if addr == "bad" {
return nil, fmt.Errorf("Fail")
}
buildCount++
return mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}
cache := NewIngesterClientCache(factory, Config{})

cache.GetClientFor("1")
if buildCount != 1 {
t.Errorf("Did not create client")
}

cache.GetClientFor("1")
if buildCount != 1 {
t.Errorf("Created client that should have been cached")
}

cache.GetClientFor("2")
if cache.Count() != 2 {
t.Errorf("Expected Count() = 2, got %d", cache.Count())
}

cache.RemoveClientFor("1")
if cache.Count() != 1 {
t.Errorf("Expected Count() = 1, got %d", cache.Count())
}

cache.GetClientFor("1")
if buildCount != 3 || cache.Count() != 2 {
t.Errorf("Did not re-create client correctly")
}

_, err := cache.GetClientFor("bad")
if err == nil {
t.Errorf("Bad create should have thrown an error")
}
if cache.Count() != 2 {
t.Errorf("Bad create should not have been added to cache")
}

addrs := cache.RegisteredAddresses()
if len(addrs) != cache.Count() {
t.Errorf("Lengths of registered addresses and cache.Count() do not match")
}
}