From da2cc6ae14720f8588088055164cf65d63c82e30 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Sun, 11 Mar 2018 06:54:04 -0600 Subject: [PATCH 1/9] Add a healthcheck endpoint on the ingesters that distributors can use --- pkg/distributor/distributor.go | 42 ++++++++++++++++++++++++++------ pkg/ingester/client/cortex.proto | 14 +++++++++++ pkg/ingester/ingester.go | 5 ++++ 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 54f810b69e..5dde11cc66 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -184,16 +184,17 @@ func (d *Distributor) Stop() { } func (d *Distributor) removeStaleIngesterClients() { - d.clientsMtx.Lock() - defer d.clientsMtx.Unlock() - ingesters := map[string]struct{}{} for _, ing := range d.ring.GetAll() { ingesters[ing.Addr] = struct{}{} } + wg := sync.WaitGroup{} + d.clientsMtx.Lock() for addr, client := range d.clients { if _, ok := ingesters[addr]; ok { + wg.Add(1) + go d.healthCheckAndRemoveIngester(addr, client, &wg) continue } level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr) @@ -201,12 +202,37 @@ func (d *Distributor) removeStaleIngesterClients() { // Do the gRPC closing in the background since it might take a while and // we're holding a mutex. - go func(addr string, closer io.Closer) { - if err := closer.Close(); err != nil { - level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err) - } - }(addr, client.(io.Closer)) + go closeClient(addr, client.(io.Closer)) + } + + // Make sure we are done healthchecking before returning. But want to unlock the mutex first + d.clientsMtx.Unlock() + wg.Wait() +} + +func closeClient(addr string, closer io.Closer) { + if err := closer.Close(); err != nil { + level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err) + } +} + +func (d *Distributor) healthCheckAndRemoveIngester(addr string, client client.IngesterClient, wg *sync.WaitGroup) { + level.Debug(util.Logger).Log("msg", "healthchecking ingester client", "addr", addr) + + ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) + ctx = user.InjectOrgID(ctx, "0") + resp, err := client.Check(ctx, &ingester_client.HealthCheckRequest{}) + cancel() + if err != nil || resp.Status != ingester_client.SERVING { + level.Warn(util.Logger).Log("msg", "removing ingester client failing healthcheck", "addr", addr, "reason", err) + + d.clientsMtx.Lock() + delete(d.clients, addr) + d.clientsMtx.Unlock() + + go closeClient(addr, client.(io.Closer)) } + wg.Done() } func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.IngesterClient, error) { diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto index e4d3146538..0623b9bfc8 100644 --- a/pkg/ingester/client/cortex.proto +++ b/pkg/ingester/client/cortex.proto @@ -19,6 +19,7 @@ service Ingester { // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server). 
rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {}; + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); } message WriteRequest { @@ -132,3 +133,16 @@ message LabelMatcher { string name = 2; string value = 3; } + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d050fc4bba..83ba1b7859 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -495,6 +495,11 @@ func (i *Ingester) AllUserStats(ctx old_ctx.Context, req *client.UserStatsReques return response, nil } +// Check implements the grpc healthcheck +func (i *Ingester) Check(ctx old_ctx.Context, req *client.HealthCheckRequest) (*client.HealthCheckResponse, error) { + return &client.HealthCheckResponse{Status: client.SERVING}, nil +} + // Describe implements prometheus.Collector. func (i *Ingester) Describe(ch chan<- *prometheus.Desc) { ch <- memorySeriesDesc From 9238ce316838998c2591df01b49bf87df152f8b0 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Mon, 12 Mar 2018 13:31:20 -0600 Subject: [PATCH 2/9] defer cancel for healthcheck timeout --- pkg/distributor/distributor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 5dde11cc66..94db3ae139 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -220,9 +220,9 @@ func (d *Distributor) healthCheckAndRemoveIngester(addr string, client client.In level.Debug(util.Logger).Log("msg", "healthchecking ingester client", "addr", addr) ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) + defer cancel() ctx = user.InjectOrgID(ctx, "0") resp, err := client.Check(ctx, &ingester_client.HealthCheckRequest{}) - cancel() if err != nil || resp.Status != ingester_client.SERVING { level.Warn(util.Logger).Log("msg", "removing ingester client failing healthcheck", "addr", addr, "reason", err) From ddbdcc235cc28ba0a84a768f35e4ae444ee5236a Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Mon, 12 Mar 2018 14:20:42 -0600 Subject: [PATCH 3/9] Vendor in health protobuf stuff --- Gopkg.lock | 3 +- pkg/distributor/distributor.go | 5 +- pkg/ingester/client/cortex.proto | 16 +- pkg/ingester/ingester.go | 5 +- .../grpc/health/grpc_health_v1/health.pb.go | 190 ++++++++++++++++++ .../grpc/health/grpc_health_v1/health.proto | 34 ++++ 6 files changed, 234 insertions(+), 19 deletions(-) create mode 100644 vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go create mode 100644 vendor/google.golang.org/grpc/health/grpc_health_v1/health.proto diff --git a/Gopkg.lock b/Gopkg.lock index 0ce747fe6c..a755b26bc6 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1028,6 +1028,7 @@ "encoding/gzip", "grpclb/grpc_lb_v1/messages", "grpclog", + "health/grpc_health_v1", "internal", "keepalive", "metadata", @@ -1198,6 +1199,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "64ccb3a589c4a999f4c271cbeba54180e7a44eb18a74e52b0d3fcc375e85c660" + inputs-digest = "266378fab402345b841032ba476ff68df68885c4c41b6acd2f9e403f5806b271" solver-name = "gps-cdcl" solver-version = 1 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 94db3ae139..bb2d03f799 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -28,6 +28,7 @@ import ( ingester_client 
"github.com/weaveworks/cortex/pkg/ingester/client" "github.com/weaveworks/cortex/pkg/ring" "github.com/weaveworks/cortex/pkg/util" + "google.golang.org/grpc/health/grpc_health_v1" ) var errIngestionRateLimitExceeded = errors.New("ingestion rate limit exceeded") @@ -222,8 +223,8 @@ func (d *Distributor) healthCheckAndRemoveIngester(addr string, client client.In ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) defer cancel() ctx = user.InjectOrgID(ctx, "0") - resp, err := client.Check(ctx, &ingester_client.HealthCheckRequest{}) - if err != nil || resp.Status != ingester_client.SERVING { + resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { level.Warn(util.Logger).Log("msg", "removing ingester client failing healthcheck", "addr", addr, "reason", err) d.clientsMtx.Lock() diff --git a/pkg/ingester/client/cortex.proto b/pkg/ingester/client/cortex.proto index 0623b9bfc8..425e2dba33 100644 --- a/pkg/ingester/client/cortex.proto +++ b/pkg/ingester/client/cortex.proto @@ -5,6 +5,7 @@ package cortex; option go_package = "client"; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "google.golang.org/grpc/health/grpc_health_v1/health.proto"; option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; @@ -19,7 +20,7 @@ service Ingester { // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server). rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {}; - rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + rpc Check(grpc.health.v1.HealthCheckRequest) returns (grpc.health.v1.HealthCheckResponse); } message WriteRequest { @@ -133,16 +134,3 @@ message LabelMatcher { string name = 2; string value = 3; } - -message HealthCheckRequest { - string service = 1; -} - -message HealthCheckResponse { - enum ServingStatus { - UNKNOWN = 0; - SERVING = 1; - NOT_SERVING = 2; - } - ServingStatus status = 1; -} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 83ba1b7859..ccdea8db91 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -11,6 +11,7 @@ import ( // Needed for gRPC compatibility. old_ctx "golang.org/x/net/context" + "google.golang.org/grpc/health/grpc_health_v1" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -496,8 +497,8 @@ func (i *Ingester) AllUserStats(ctx old_ctx.Context, req *client.UserStatsReques } // Check implements the grpc healthcheck -func (i *Ingester) Check(ctx old_ctx.Context, req *client.HealthCheckRequest) (*client.HealthCheckResponse, error) { - return &client.HealthCheckResponse{Status: client.SERVING}, nil +func (i *Ingester) Check(ctx old_ctx.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } // Describe implements prometheus.Collector. diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go new file mode 100644 index 0000000000..fdcbb9e0b7 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -0,0 +1,190 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: grpc_health_v1/health.proto + +/* +Package grpc_health_v1 is a generated protocol buffer package. 
+ +It is generated from these files: + grpc_health_v1/health.proto + +It has these top-level messages: + HealthCheckRequest + HealthCheckResponse +*/ +package grpc_health_v1 + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type HealthCheckResponse_ServingStatus int32 + +const ( + HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 + HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 + HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 +) + +var HealthCheckResponse_ServingStatus_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", +} +var HealthCheckResponse_ServingStatus_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, +} + +func (x HealthCheckResponse_ServingStatus) String() string { + return proto.EnumName(HealthCheckResponse_ServingStatus_name, int32(x)) +} +func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) { + return fileDescriptor0, []int{1, 0} +} + +type HealthCheckRequest struct { + Service string `protobuf:"bytes,1,opt,name=service" json:"service,omitempty"` +} + +func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } +func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) } +func (*HealthCheckRequest) ProtoMessage() {} +func (*HealthCheckRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *HealthCheckRequest) GetService() string { + if m != nil { + return m.Service + } + return "" +} + +type HealthCheckResponse struct { + Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"` +} + +func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } +func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) } +func (*HealthCheckResponse) ProtoMessage() {} +func (*HealthCheckResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus { + if m != nil { + return m.Status + } + return HealthCheckResponse_UNKNOWN +} + +func init() { + proto.RegisterType((*HealthCheckRequest)(nil), "grpc.health.v1.HealthCheckRequest") + proto.RegisterType((*HealthCheckResponse)(nil), "grpc.health.v1.HealthCheckResponse") + proto.RegisterEnum("grpc.health.v1.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. 
+const _ = grpc.SupportPackageIsVersion4 + +// Client API for Health service + +type HealthClient interface { + Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) +} + +type healthClient struct { + cc *grpc.ClientConn +} + +func NewHealthClient(cc *grpc.ClientConn) HealthClient { + return &healthClient{cc} +} + +func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { + out := new(HealthCheckResponse) + err := grpc.Invoke(ctx, "/grpc.health.v1.Health/Check", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Health service + +type HealthServer interface { + Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) +} + +func RegisterHealthServer(s *grpc.Server, srv HealthServer) { + s.RegisterService(&_Health_serviceDesc, srv) +} + +func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HealthCheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HealthServer).Check(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.health.v1.Health/Check", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Health_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.health.v1.Health", + HandlerType: (*HealthServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Check", + Handler: _Health_Check_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "grpc_health_v1/health.proto", +} + +func init() { proto.RegisterFile("grpc_health_v1/health.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 213 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4e, 0x2f, 0x2a, 0x48, + 0x8e, 0xcf, 0x48, 0x4d, 0xcc, 0x29, 0xc9, 0x88, 0x2f, 0x33, 0xd4, 0x87, 0xb0, 0xf4, 0x0a, 0x8a, + 0xf2, 0x4b, 0xf2, 0x85, 0xf8, 0x40, 0x92, 0x7a, 0x50, 0xa1, 0x32, 0x43, 0x25, 0x3d, 0x2e, 0x21, + 0x0f, 0x30, 0xc7, 0x39, 0x23, 0x35, 0x39, 0x3b, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, + 0x82, 0x8b, 0xbd, 0x38, 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, + 0x08, 0xc6, 0x55, 0x9a, 0xc3, 0xc8, 0x25, 0x8c, 0xa2, 0xa1, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, + 0xc8, 0x93, 0x8b, 0xad, 0xb8, 0x24, 0xb1, 0xa4, 0xb4, 0x18, 0xac, 0x81, 0xcf, 0xc8, 0x50, 0x0f, + 0xd5, 0x22, 0x3d, 0x2c, 0x9a, 0xf4, 0x82, 0x41, 0x86, 0xe6, 0xa5, 0x07, 0x83, 0x35, 0x06, 0x41, + 0x0d, 0x50, 0xb2, 0xe2, 0xe2, 0x45, 0x91, 0x10, 0xe2, 0xe6, 0x62, 0x0f, 0xf5, 0xf3, 0xf6, 0xf3, + 0x0f, 0xf7, 0x13, 0x60, 0x00, 0x71, 0x82, 0x5d, 0x83, 0xc2, 0x3c, 0xfd, 0xdc, 0x05, 0x18, 0x85, + 0xf8, 0xb9, 0xb8, 0xfd, 0xfc, 0x43, 0xe2, 0x61, 0x02, 0x4c, 0x46, 0x51, 0x5c, 0x6c, 0x10, 0x8b, + 0x84, 0x02, 0xb8, 0x58, 0xc1, 0x96, 0x09, 0x29, 0xe1, 0x75, 0x09, 0xd8, 0xbf, 0x52, 0xca, 0x44, + 0xb8, 0x36, 0x89, 0x0d, 0x1c, 0x82, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x53, 0x2b, 0x65, + 0x20, 0x60, 0x01, 0x00, 0x00, +} diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.proto b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.proto new file mode 100644 index 0000000000..6072fdc3b8 --- /dev/null +++ 
b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.proto @@ -0,0 +1,34 @@ +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} + +service Health{ + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); +} From f641950d0c0a63f19a3f2fa6abb219cf53a64fce Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Mon, 12 Mar 2018 16:44:08 -0600 Subject: [PATCH 4/9] Split removeStaleIngesterClients and healtCheckAndRemoveIngesters to two methods --- pkg/distributor/distributor.go | 44 ++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index bb2d03f799..eaf0852f6e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -171,6 +171,7 @@ func (d *Distributor) Run() { select { case <-cleanupClients.C: d.removeStaleIngesterClients() + d.healthCheckAndRemoveIngesters() case <-d.quit: close(d.done) return @@ -185,17 +186,16 @@ func (d *Distributor) Stop() { } func (d *Distributor) removeStaleIngesterClients() { + d.clientsMtx.Lock() + defer d.clientsMtx.Unlock() + ingesters := map[string]struct{}{} for _, ing := range d.ring.GetAll() { ingesters[ing.Addr] = struct{}{} } - wg := sync.WaitGroup{} - d.clientsMtx.Lock() for addr, client := range d.clients { if _, ok := ingesters[addr]; ok { - wg.Add(1) - go d.healthCheckAndRemoveIngester(addr, client, &wg) continue } level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr) @@ -205,10 +205,6 @@ func (d *Distributor) removeStaleIngesterClients() { // we're holding a mutex. go closeClient(addr, client.(io.Closer)) } - - // Make sure we are done healthchecking before returning. 
But want to unlock the mutex first - d.clientsMtx.Unlock() - wg.Wait() } func closeClient(addr string, closer io.Closer) { @@ -217,23 +213,35 @@ func closeClient(addr string, closer io.Closer) { } } -func (d *Distributor) healthCheckAndRemoveIngester(addr string, client client.IngesterClient, wg *sync.WaitGroup) { - level.Debug(util.Logger).Log("msg", "healthchecking ingester client", "addr", addr) +func (d *Distributor) healthCheckAndRemoveIngesters() { + ingesters := d.ring.GetAll() + for _, ingester := range ingesters { + d.healthCheckAndRemoveIngester(ingester) + } +} + +func (d *Distributor) healthCheckAndRemoveIngester(ingester *ring.IngesterDesc) { + client, err := d.getClientFor(ingester) + if err != nil { + d.removeClientFor(ingester, err) + } ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) defer cancel() ctx = user.InjectOrgID(ctx, "0") + resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { - level.Warn(util.Logger).Log("msg", "removing ingester client failing healthcheck", "addr", addr, "reason", err) - - d.clientsMtx.Lock() - delete(d.clients, addr) - d.clientsMtx.Unlock() - - go closeClient(addr, client.(io.Closer)) + d.removeClientFor(ingester, err) + go closeClient(ingester.Addr, client.(io.Closer)) } - wg.Done() +} + +func (d *Distributor) removeClientFor(ingester *ring.IngesterDesc, err error) { + level.Warn(util.Logger).Log("msg", "removing ingester client", "addr", ingester.Addr, "reason", err) + d.clientsMtx.Lock() + defer d.clientsMtx.Unlock() + delete(d.clients, ingester.Addr) } func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.IngesterClient, error) { From 2c4426d27e37a96df398e286cbbc271a6db3dc5f Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 13 Mar 2018 13:21:42 -0600 Subject: [PATCH 5/9] Add config for turning on health check behavior --- pkg/distributor/distributor.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index eaf0852f6e..641f5284f6 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -20,6 +20,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/cortex/pkg/prom1/storage/metric" + "google.golang.org/grpc/health/grpc_health_v1" billing "github.com/weaveworks/billing-client" "github.com/weaveworks/common/instrument" @@ -28,7 +29,6 @@ import ( ingester_client "github.com/weaveworks/cortex/pkg/ingester/client" "github.com/weaveworks/cortex/pkg/ring" "github.com/weaveworks/cortex/pkg/util" - "google.golang.org/grpc/health/grpc_health_v1" ) var errIngestionRateLimitExceeded = errors.New("ingestion rate limit exceeded") @@ -74,11 +74,12 @@ type Config struct { BillingConfig billing.Config IngesterClientConfig ingester_client.Config - ReplicationFactor int - RemoteTimeout time.Duration - ClientCleanupPeriod time.Duration - IngestionRateLimit float64 - IngestionBurstSize int + ReplicationFactor int + RemoteTimeout time.Duration + ClientCleanupPeriod time.Duration + IngestionRateLimit float64 + IngestionBurstSize int + HealthCheckIngesters bool // for testing ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error) @@ -94,6 +95,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 
15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") + flag.BoolVar(&cfg.HealthCheckIngesters, "distributor.health-check-ingesters", false, "Run a health check on each ingester client during the cleanup period.") } // New constructs a new Distributor @@ -171,7 +173,9 @@ func (d *Distributor) Run() { select { case <-cleanupClients.C: d.removeStaleIngesterClients() - d.healthCheckAndRemoveIngesters() + if d.cfg.HealthCheckIngesters { + d.healthCheckAndRemoveIngesters() + } case <-d.quit: close(d.done) return @@ -223,7 +227,7 @@ func (d *Distributor) healthCheckAndRemoveIngesters() { func (d *Distributor) healthCheckAndRemoveIngester(ingester *ring.IngesterDesc) { client, err := d.getClientFor(ingester) if err != nil { - d.removeClientFor(ingester, err) + return } ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) From 4f2addba9f57e2f46c1f30af09656b2907a46b2d Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 13 Mar 2018 15:28:02 -0600 Subject: [PATCH 6/9] Refactored distributor client cache to ingester/client/IngesterClientCache --- pkg/distributor/distributor.go | 113 +++++-------------- pkg/ingester/client/cache.go | 89 +++++++++++++++ pkg/{util => ingester/client}/compat.go | 89 ++++++++------- pkg/{util => ingester/client}/compat_test.go | 2 +- pkg/ingester/client/healthcheck.go | 26 +++++ pkg/ingester/ingester.go | 10 +- pkg/ingester/ingester_claim.go | 2 +- pkg/ingester/ingester_lifecycle.go | 2 +- pkg/ingester/ingester_lifecycle_test.go | 8 +- pkg/ingester/ingester_test.go | 31 ++--- pkg/querier/querier.go | 4 +- pkg/ruler/compat.go | 3 +- 12 files changed, 220 insertions(+), 159 deletions(-) create mode 100644 pkg/ingester/client/cache.go rename pkg/{util => ingester/client}/compat.go (66%) rename pkg/{util => ingester/client}/compat_test.go (99%) create mode 100644 pkg/ingester/client/healthcheck.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 641f5284f6..a2b11564a9 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -6,7 +6,6 @@ import ( "flag" "fmt" "hash/fnv" - "io" "sync" "sync/atomic" "time" @@ -20,7 +19,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/cortex/pkg/prom1/storage/metric" - "google.golang.org/grpc/health/grpc_health_v1" billing "github.com/weaveworks/billing-client" "github.com/weaveworks/common/instrument" @@ -45,12 +43,12 @@ var ( // Distributor is a storage.SampleAppender and a client.Querier which // forwards appends and queries to individual ingesters. 
type Distributor struct { - cfg Config - ring ring.ReadRing - clientsMtx sync.RWMutex - clients map[string]client.IngesterClient - quit chan struct{} - done chan struct{} + cfg Config + ring ring.ReadRing + clientsMtx sync.RWMutex + clientCache *ingester_client.IngesterClientCache + quit chan struct{} + done chan struct{} billingClient *billing.Client @@ -119,7 +117,7 @@ func New(cfg Config, ring ring.ReadRing) (*Distributor, error) { d := &Distributor{ cfg: cfg, ring: ring, - clients: map[string]client.IngesterClient{}, + clientCache: ingester_client.NewIngesterClientCache(cfg.ingesterClientFactory, cfg.IngesterClientConfig), quit: make(chan struct{}), done: make(chan struct{}), billingClient: billingClient, @@ -190,85 +188,34 @@ func (d *Distributor) Stop() { } func (d *Distributor) removeStaleIngesterClients() { - d.clientsMtx.Lock() - defer d.clientsMtx.Unlock() - ingesters := map[string]struct{}{} for _, ing := range d.ring.GetAll() { ingesters[ing.Addr] = struct{}{} } - for addr, client := range d.clients { + for _, addr := range d.clientCache.RegisteredAddresses() { if _, ok := ingesters[addr]; ok { continue } level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr) - delete(d.clients, addr) - - // Do the gRPC closing in the background since it might take a while and - // we're holding a mutex. - go closeClient(addr, client.(io.Closer)) - } -} - -func closeClient(addr string, closer io.Closer) { - if err := closer.Close(); err != nil { - level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err) + d.clientCache.RemoveClientFor(addr) } } func (d *Distributor) healthCheckAndRemoveIngesters() { - ingesters := d.ring.GetAll() - for _, ingester := range ingesters { - d.healthCheckAndRemoveIngester(ingester) - } -} - -func (d *Distributor) healthCheckAndRemoveIngester(ingester *ring.IngesterDesc) { - client, err := d.getClientFor(ingester) - if err != nil { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) - defer cancel() - ctx = user.InjectOrgID(ctx, "0") - - resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) - if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { - d.removeClientFor(ingester, err) - go closeClient(ingester.Addr, client.(io.Closer)) - } -} - -func (d *Distributor) removeClientFor(ingester *ring.IngesterDesc, err error) { - level.Warn(util.Logger).Log("msg", "removing ingester client", "addr", ingester.Addr, "reason", err) - d.clientsMtx.Lock() - defer d.clientsMtx.Unlock() - delete(d.clients, ingester.Addr) -} - -func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.IngesterClient, error) { - d.clientsMtx.RLock() - client, ok := d.clients[ingester.Addr] - d.clientsMtx.RUnlock() - if ok { - return client, nil - } - - d.clientsMtx.Lock() - defer d.clientsMtx.Unlock() - client, ok = d.clients[ingester.Addr] - if ok { - return client, nil - } - - client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.IngesterClientConfig) - if err != nil { - return nil, err + for _, addr := range d.clientCache.RegisteredAddresses() { + client, err := d.clientCache.GetClientFor(addr) + if err != nil { + // if there is no client, don't need to health check it + level.Warn(util.Logger).Log("msg", "could not create client for", "addr", addr) + continue + } + err = ingester_client.HealthCheck(client, d.cfg.RemoteTimeout) + if err != nil { + level.Warn(util.Logger).Log("msg", "removing ingester failing 
healtcheck", "addr", addr, "reason", err) + d.clientCache.RemoveClientFor(addr) + } } - d.clients[ingester.Addr] = client - return client, nil } func tokenForLabels(userID string, labels []client.LabelPair) (uint32, error) { @@ -451,7 +398,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe } func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, samples []*sampleTracker) error { - c, err := d.getClientFor(ingester) + c, err := d.clientCache.GetClientFor(ingester.Addr) if err != nil { return err } @@ -488,7 +435,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . metricNameMatcher, _, ok := util.ExtractMetricNameMatcherFromMatchers(matchers) - req, err := util.ToQueryRequest(from, to, matchers) + req, err := ingester_client.ToQueryRequest(from, to, matchers) if err != nil { return err } @@ -568,7 +515,7 @@ func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.Inge } func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *client.QueryRequest) (model.Matrix, error) { - client, err := d.getClientFor(ing) + client, err := d.clientCache.GetClientFor(ing.Addr) if err != nil { return nil, err } @@ -580,7 +527,7 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, return nil, err } - return util.FromQueryResponse(resp), nil + return ingester_client.FromQueryResponse(resp), nil } // forAllIngesters runs f, in parallel, for all ingesters @@ -589,7 +536,7 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{} ingesters := d.ring.GetAll() for _, ingester := range ingesters { go func(ingester *ring.IngesterDesc) { - client, err := d.getClientFor(ingester) + client, err := d.clientCache.GetClientFor(ingester.Addr) if err != nil { errs <- err return @@ -648,7 +595,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod // MetricsForLabelMatchers gets the metrics that match said matchers func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { - req, err := util.ToMetricsForLabelMatchersRequest(from, through, matchers) + req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers) if err != nil { return nil, err } @@ -662,7 +609,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through metrics := map[model.Fingerprint]model.Metric{} for _, resp := range resps { - ms := util.FromMetricsForLabelMatchersResponse(resp.(*client.MetricsForLabelMatchersResponse)) + ms := ingester_client.FromMetricsForLabelMatchersResponse(resp.(*client.MetricsForLabelMatchersResponse)) for _, m := range ms { metrics[m.Fingerprint()] = m } @@ -716,7 +663,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) { // Not using d.forAllIngesters(), so we can fail after first error. 
ingesters := d.ring.GetAll() for _, ingester := range ingesters { - client, err := d.getClientFor(ingester) + client, err := d.clientCache.GetClientFor(ingester.Addr) if err != nil { return nil, err } @@ -775,6 +722,6 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( numClientsDesc, prometheus.GaugeValue, - float64(len(d.clients)), + float64(d.clientCache.Count()), ) } diff --git a/pkg/ingester/client/cache.go b/pkg/ingester/client/cache.go new file mode 100644 index 0000000000..0fa5b6948b --- /dev/null +++ b/pkg/ingester/client/cache.go @@ -0,0 +1,89 @@ +package client + +import ( + io "io" + "sync" + + "github.com/go-kit/kit/log/level" + "github.com/weaveworks/cortex/pkg/util" +) + +// Factory defines the signature for an ingester client factory +type Factory func(addr string, cfg Config) (IngesterClient, error) + +// IngesterClientCache holds a cache of ingester clients +type IngesterClientCache struct { + sync.RWMutex + clients map[string]IngesterClient + + ingesterClientFactory Factory + ingesterClientConfig Config +} + +// NewIngesterClientCache creates a new cache +func NewIngesterClientCache(factory Factory, config Config) *IngesterClientCache { + return &IngesterClientCache{ + clients: map[string]IngesterClient{}, + ingesterClientFactory: factory, + ingesterClientConfig: config, + } +} + +// GetClientFor gets the client for the specified address. If it does not exist it will make a new client +// at that address +func (cache *IngesterClientCache) GetClientFor(addr string) (IngesterClient, error) { + cache.RLock() + client, ok := cache.clients[addr] + cache.RUnlock() + if ok { + return client, nil + } + + cache.Lock() + defer cache.Unlock() + client, ok = cache.clients[addr] + if ok { + return client, nil + } + + client, err := cache.ingesterClientFactory(addr, cache.ingesterClientConfig) + if err != nil { + return nil, err + } + cache.clients[addr] = client + return client, nil +} + +// RemoveClientFor removes the client with the specified address +func (cache *IngesterClientCache) RemoveClientFor(addr string) { + cache.Lock() + defer cache.Unlock() + client, ok := cache.clients[addr] + if ok { + delete(cache.clients, addr) + // Close in the background since this operation may take awhile and we have a mutex + go func(addr string, closer io.Closer) { + if err := closer.Close(); err != nil { + level.Error(util.Logger).Log("msg", "error closing connection to ingester", "ingester", addr, "err", err) + } + }(addr, client.(io.Closer)) + } +} + +// RegisteredAddresses returns all the addresses that a client is cached for +func (cache *IngesterClientCache) RegisteredAddresses() []string { + result := []string{} + cache.RLock() + defer cache.RUnlock() + for addr := range cache.clients { + result = append(result, addr) + } + return result +} + +// Count returns how many clients are in the cache +func (cache *IngesterClientCache) Count() int { + cache.RLock() + defer cache.RUnlock() + return len(cache.clients) +} diff --git a/pkg/util/compat.go b/pkg/ingester/client/compat.go similarity index 66% rename from pkg/util/compat.go rename to pkg/ingester/client/compat.go index 04f41575a6..d4acda7aa1 100644 --- a/pkg/util/compat.go +++ b/pkg/ingester/client/compat.go @@ -1,15 +1,14 @@ -package util +package client import ( "fmt" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/weaveworks/cortex/pkg/ingester/client" ) // FromWriteRequest converts a WriteRequest proto into an array of samples. 
-func FromWriteRequest(req *client.WriteRequest) []model.Sample { +func FromWriteRequest(req *WriteRequest) []model.Sample { // Just guess that there is one sample per timeseries samples := make([]model.Sample, 0, len(req.Timeseries)) for _, ts := range req.Timeseries { @@ -25,15 +24,15 @@ func FromWriteRequest(req *client.WriteRequest) []model.Sample { } // ToWriteRequest converts an array of samples into a WriteRequest proto. -func ToWriteRequest(samples []model.Sample) *client.WriteRequest { - req := &client.WriteRequest{ - Timeseries: make([]client.TimeSeries, 0, len(samples)), +func ToWriteRequest(samples []model.Sample) *WriteRequest { + req := &WriteRequest{ + Timeseries: make([]TimeSeries, 0, len(samples)), } for _, s := range samples { - ts := client.TimeSeries{ + ts := TimeSeries{ Labels: ToLabelPairs(s.Metric), - Samples: []client.Sample{ + Samples: []Sample{ { Value: float64(s.Value), TimestampMs: int64(s.Timestamp), @@ -47,13 +46,13 @@ func ToWriteRequest(samples []model.Sample) *client.WriteRequest { } // ToQueryRequest builds a QueryRequest proto. -func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*client.QueryRequest, error) { +func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequest, error) { ms, err := toLabelMatchers(matchers) if err != nil { return nil, err } - return &client.QueryRequest{ + return &QueryRequest{ StartTimestampMs: int64(from), EndTimestampMs: int64(to), Matchers: ms, @@ -61,7 +60,7 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*client.Qu } // FromQueryRequest unpacks a QueryRequest proto. -func FromQueryRequest(req *client.QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) { +func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) { matchers, err := fromLabelMatchers(req.Matchers) if err != nil { return 0, 0, nil, err @@ -72,15 +71,15 @@ func FromQueryRequest(req *client.QueryRequest) (model.Time, model.Time, []*labe } // ToQueryResponse builds a QueryResponse proto. -func ToQueryResponse(matrix model.Matrix) *client.QueryResponse { - resp := &client.QueryResponse{} +func ToQueryResponse(matrix model.Matrix) *QueryResponse { + resp := &QueryResponse{} for _, ss := range matrix { - ts := client.TimeSeries{ + ts := TimeSeries{ Labels: ToLabelPairs(ss.Metric), - Samples: make([]client.Sample, 0, len(ss.Values)), + Samples: make([]Sample, 0, len(ss.Values)), } for _, s := range ss.Values { - ts.Samples = append(ts.Samples, client.Sample{ + ts.Samples = append(ts.Samples, Sample{ Value: float64(s.Value), TimestampMs: int64(s.Timestamp), }) @@ -91,7 +90,7 @@ func ToQueryResponse(matrix model.Matrix) *client.QueryResponse { } // FromQueryResponse unpacks a QueryResponse proto. 
-func FromQueryResponse(resp *client.QueryResponse) model.Matrix { +func FromQueryResponse(resp *QueryResponse) model.Matrix { m := make(model.Matrix, 0, len(resp.Timeseries)) for _, ts := range resp.Timeseries { var ss model.SampleStream @@ -110,21 +109,21 @@ func FromQueryResponse(resp *client.QueryResponse) model.Matrix { } // ToMetricsForLabelMatchersRequest builds a MetricsForLabelMatchersRequest proto -func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Matcher) (*client.MetricsForLabelMatchersRequest, error) { +func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) { ms, err := toLabelMatchers(matchers) if err != nil { return nil, err } - return &client.MetricsForLabelMatchersRequest{ + return &MetricsForLabelMatchersRequest{ StartTimestampMs: int64(from), EndTimestampMs: int64(to), - MatchersSet: []*client.LabelMatchers{{Matchers: ms}}, + MatchersSet: []*LabelMatchers{{Matchers: ms}}, }, nil } // FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto -func FromMetricsForLabelMatchersRequest(req *client.MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) { +func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) { matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet)) for _, matchers := range req.MatchersSet { matchers, err := fromLabelMatchers(matchers.Matchers) @@ -139,12 +138,12 @@ func FromMetricsForLabelMatchersRequest(req *client.MetricsForLabelMatchersReque } // ToMetricsForLabelMatchersResponse builds a MetricsForLabelMatchersResponse proto -func ToMetricsForLabelMatchersResponse(metrics []model.Metric) *client.MetricsForLabelMatchersResponse { - resp := &client.MetricsForLabelMatchersResponse{ - Metric: make([]*client.Metric, 0, len(metrics)), +func ToMetricsForLabelMatchersResponse(metrics []model.Metric) *MetricsForLabelMatchersResponse { + resp := &MetricsForLabelMatchersResponse{ + Metric: make([]*Metric, 0, len(metrics)), } for _, metric := range metrics { - resp.Metric = append(resp.Metric, &client.Metric{ + resp.Metric = append(resp.Metric, &Metric{ Labels: ToLabelPairs(metric), }) } @@ -152,7 +151,7 @@ func ToMetricsForLabelMatchersResponse(metrics []model.Metric) *client.MetricsFo } // FromMetricsForLabelMatchersResponse unpacks a MetricsForLabelMatchersResponse proto -func FromMetricsForLabelMatchersResponse(resp *client.MetricsForLabelMatchersResponse) []model.Metric { +func FromMetricsForLabelMatchersResponse(resp *MetricsForLabelMatchersResponse) []model.Metric { metrics := []model.Metric{} for _, m := range resp.Metric { metrics = append(metrics, FromLabelPairs(m.Labels)) @@ -160,23 +159,23 @@ func FromMetricsForLabelMatchersResponse(resp *client.MetricsForLabelMatchersRes return metrics } -func toLabelMatchers(matchers []*labels.Matcher) ([]*client.LabelMatcher, error) { - result := make([]*client.LabelMatcher, 0, len(matchers)) +func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) { + result := make([]*LabelMatcher, 0, len(matchers)) for _, matcher := range matchers { - var mType client.MatchType + var mType MatchType switch matcher.Type { case labels.MatchEqual: - mType = client.EQUAL + mType = EQUAL case labels.MatchNotEqual: - mType = client.NOT_EQUAL + mType = NOT_EQUAL case labels.MatchRegexp: - mType = client.REGEX_MATCH + mType = REGEX_MATCH case labels.MatchNotRegexp: - mType = 
client.REGEX_NO_MATCH + mType = REGEX_NO_MATCH default: return nil, fmt.Errorf("invalid matcher type") } - result = append(result, &client.LabelMatcher{ + result = append(result, &LabelMatcher{ Type: mType, Name: string(matcher.Name), Value: string(matcher.Value), @@ -185,18 +184,18 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*client.LabelMatcher, error) return result, nil } -func fromLabelMatchers(matchers []*client.LabelMatcher) ([]*labels.Matcher, error) { +func fromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { result := make([]*labels.Matcher, 0, len(matchers)) for _, matcher := range matchers { var mtype labels.MatchType switch matcher.Type { - case client.EQUAL: + case EQUAL: mtype = labels.MatchEqual - case client.NOT_EQUAL: + case NOT_EQUAL: mtype = labels.MatchNotEqual - case client.REGEX_MATCH: + case REGEX_MATCH: mtype = labels.MatchRegexp - case client.REGEX_NO_MATCH: + case REGEX_NO_MATCH: mtype = labels.MatchNotRegexp default: return nil, fmt.Errorf("invalid matcher type") @@ -210,11 +209,11 @@ func fromLabelMatchers(matchers []*client.LabelMatcher) ([]*labels.Matcher, erro return result, nil } -// ToLabelPairs builds a []client.LabelPair from a model.Metric -func ToLabelPairs(metric model.Metric) []client.LabelPair { - labelPairs := make([]client.LabelPair, 0, len(metric)) +// ToLabelPairs builds a []LabelPair from a model.Metric +func ToLabelPairs(metric model.Metric) []LabelPair { + labelPairs := make([]LabelPair, 0, len(metric)) for k, v := range metric { - labelPairs = append(labelPairs, client.LabelPair{ + labelPairs = append(labelPairs, LabelPair{ Name: []byte(k), Value: []byte(v), }) @@ -222,8 +221,8 @@ func ToLabelPairs(metric model.Metric) []client.LabelPair { return labelPairs } -// FromLabelPairs unpack a []client.LabelPair to a model.Metric -func FromLabelPairs(labelPairs []client.LabelPair) model.Metric { +// FromLabelPairs unpack a []LabelPair to a model.Metric +func FromLabelPairs(labelPairs []LabelPair) model.Metric { metric := make(model.Metric, len(labelPairs)) for _, l := range labelPairs { metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) diff --git a/pkg/util/compat_test.go b/pkg/ingester/client/compat_test.go similarity index 99% rename from pkg/util/compat_test.go rename to pkg/ingester/client/compat_test.go index 2e6d24c577..5bdb04c4f2 100644 --- a/pkg/util/compat_test.go +++ b/pkg/ingester/client/compat_test.go @@ -1,4 +1,4 @@ -package util +package client import ( "fmt" diff --git a/pkg/ingester/client/healthcheck.go b/pkg/ingester/client/healthcheck.go new file mode 100644 index 0000000000..0cc022fea1 --- /dev/null +++ b/pkg/ingester/client/healthcheck.go @@ -0,0 +1,26 @@ +package client + +import ( + fmt "fmt" + "time" + + "github.com/weaveworks/common/user" + "golang.org/x/net/context" + "google.golang.org/grpc/health/grpc_health_v1" +) + +// HealthCheck will check if the client is still healthy, returning an error if it is not +func HealthCheck(client IngesterClient, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ctx = user.InjectOrgID(ctx, "0") + + resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil { + return err + } + if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { + return fmt.Errorf("Failing healthcheck status: %s", resp.Status) + } + return nil +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ccdea8db91..7f95e7cdde 100644 --- a/pkg/ingester/ingester.go 
+++ b/pkg/ingester/ingester.go @@ -297,7 +297,7 @@ func New(cfg Config, chunkStore ChunkStore) (*Ingester, error) { // Push implements client.IngesterServer func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.WriteResponse, error) { var lastPartialErr error - samples := util.FromWriteRequest(req) + samples := client.FromWriteRequest(req) samples: for j := range samples { @@ -366,7 +366,7 @@ func (i *Ingester) append(ctx context.Context, sample *model.Sample) error { // Query implements service.IngesterServer func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client.QueryResponse, error) { - start, end, matchers, err := util.FromQueryRequest(req) + start, end, matchers, err := client.FromQueryRequest(req) if err != nil { return nil, err } @@ -376,7 +376,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client return nil, err } - return util.ToQueryResponse(matrix), nil + return client.ToQueryResponse(matrix), nil } func (i *Ingester) query(ctx context.Context, from, through model.Time, matchers []*labels.Matcher) (model.Matrix, error) { @@ -435,7 +435,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr } // TODO Right now we ignore start and end. - _, _, matchersSet, err := util.FromMetricsForLabelMatchersRequest(req) + _, _, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req) if err != nil { return nil, err } @@ -457,7 +457,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr result = append(result, metric) } - return util.ToMetricsForLabelMatchersResponse(result), nil + return client.ToMetricsForLabelMatchersResponse(result), nil } // UserStats returns ingestion statistics for the current user. 
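With the conversion helpers relocated into pkg/ingester/client, a caller converts between Prometheus model types and the ingester protos without going through pkg/util. Below is a minimal sketch of that round trip against an IngesterClient; the package name, the pushAndQuery helper, the org ID and the sample values are illustrative, and in the distributor the `ing` value would come from the client cache via GetClientFor.

```go
package example

import (
	"context"
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/weaveworks/common/user"
	"github.com/weaveworks/cortex/pkg/ingester/client"
)

// pushAndQuery writes one sample and reads it back, using the relocated
// ToWriteRequest / ToQueryRequest / FromQueryResponse helpers.
func pushAndQuery(ing client.IngesterClient) error {
	ctx := user.InjectOrgID(context.Background(), "userA")

	// model.Sample -> WriteRequest proto.
	samples := []model.Sample{{
		Metric:    model.Metric{model.MetricNameLabel: "foo"},
		Timestamp: model.TimeFromUnix(123),
		Value:     456,
	}}
	if _, err := ing.Push(ctx, client.ToWriteRequest(samples)); err != nil {
		return err
	}

	// Matchers -> QueryRequest proto; QueryResponse proto -> model.Matrix.
	matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
	if err != nil {
		return err
	}
	req, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher})
	if err != nil {
		return err
	}
	resp, err := ing.Query(ctx, req)
	if err != nil {
		return err
	}
	fmt.Println(client.FromQueryResponse(resp))
	return nil
}
```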
diff --git a/pkg/ingester/ingester_claim.go b/pkg/ingester/ingester_claim.go index 118e1c02f7..28e703950c 100644 --- a/pkg/ingester/ingester_claim.go +++ b/pkg/ingester/ingester_claim.go @@ -75,7 +75,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e fromIngesterID = wireSeries.FromIngesterId level.Info(util.Logger).Log("msg", "processing TransferChunks request from ingester", "ingester", fromIngesterID) } - metric := util.FromLabelPairs(wireSeries.Labels) + metric := client.FromLabelPairs(wireSeries.Labels) userCtx := user.InjectOrgID(stream.Context(), wireSeries.UserId) descs, err := fromWireChunks(wireSeries.Chunks) if err != nil { diff --git a/pkg/ingester/ingester_lifecycle.go b/pkg/ingester/ingester_lifecycle.go index e82e77c43e..1c61e7ffd9 100644 --- a/pkg/ingester/ingester_lifecycle.go +++ b/pkg/ingester/ingester_lifecycle.go @@ -384,7 +384,7 @@ func (i *Ingester) transferChunks() error { err = stream.Send(&client.TimeSeriesChunk{ FromIngesterId: i.id, UserId: userID, - Labels: util.ToLabelPairs(pair.series.metric), + Labels: client.ToLabelPairs(pair.series.metric), Chunks: chunks, }) state.fpLocker.Unlock(pair.fp) diff --git a/pkg/ingester/ingester_lifecycle_test.go b/pkg/ingester/ingester_lifecycle_test.go index 58ffbf25b2..6248f1b323 100644 --- a/pkg/ingester/ingester_lifecycle_test.go +++ b/pkg/ingester/ingester_lifecycle_test.go @@ -100,7 +100,7 @@ func TestIngesterTransfer(t *testing.T) { } ) ctx := user.InjectOrgID(context.Background(), userID) - _, err = ing1.Push(ctx, util.ToWriteRequest([]model.Sample{ + _, err = ing1.Push(ctx, client.ToWriteRequest([]model.Sample{ { Metric: m, Timestamp: ts, @@ -131,7 +131,7 @@ func TestIngesterTransfer(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo") require.NoError(t, err) - request, err := util.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher}) + request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher}) require.NoError(t, err) response, err := ing2.Query(ctx, request) @@ -139,7 +139,7 @@ func TestIngesterTransfer(t *testing.T) { assert.Equal(t, &client.QueryResponse{ Timeseries: []client.TimeSeries{ { - Labels: util.ToLabelPairs(m), + Labels: client.ToLabelPairs(m), Samples: []client.Sample{ { Value: 456, @@ -286,7 +286,7 @@ func TestIngesterFlush(t *testing.T) { } ) ctx := user.InjectOrgID(context.Background(), userID) - _, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{ + _, err = ing.Push(ctx, client.ToWriteRequest([]model.Sample{ { Metric: m, Timestamp: ts, diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index e8cd810bfc..a1a2d31469 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -18,6 +18,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/ingester/client" "github.com/weaveworks/cortex/pkg/util" ) @@ -130,7 +131,7 @@ func TestIngesterAppend(t *testing.T) { // Append samples. 
for _, userID := range userIDs { ctx := user.InjectOrgID(context.Background(), userID) - _, err = ing.Push(ctx, util.ToWriteRequest(matrixToSamples(testData[userID]))) + _, err = ing.Push(ctx, client.ToWriteRequest(matrixToSamples(testData[userID]))) require.NoError(t, err) } @@ -140,13 +141,13 @@ func TestIngesterAppend(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchRegexp, model.JobLabel, ".+") require.NoError(t, err) - req, err := util.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) + req, err := client.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) require.NoError(t, err) resp, err := ing.Query(ctx, req) require.NoError(t, err) - res := util.FromQueryResponse(resp) + res := client.FromQueryResponse(resp) sort.Sort(res) assert.Equal(t, testData[userID], res) } @@ -218,11 +219,11 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) { // Append only one series first, expect no error. ctx := user.InjectOrgID(context.Background(), userID) - _, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample1})) + _, err = ing.Push(ctx, client.ToWriteRequest([]model.Sample{sample1})) require.NoError(t, err) // Append to two series, expect series-exceeded error. - _, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample2, sample3})) + _, err = ing.Push(ctx, client.ToWriteRequest([]model.Sample{sample2, sample3})) if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests { t.Fatalf("expected error about exceeding metrics per user, got %v", err) } @@ -231,13 +232,13 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric") require.NoError(t, err) - req, err := util.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) + req, err := client.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) require.NoError(t, err) resp, err := ing.Query(ctx, req) require.NoError(t, err) - res := util.FromQueryResponse(resp) + res := client.FromQueryResponse(resp) sort.Sort(res) expected := model.Matrix{ @@ -289,11 +290,11 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) { // Append only one series first, expect no error. ctx := user.InjectOrgID(context.Background(), userID) - _, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample1})) + _, err = ing.Push(ctx, client.ToWriteRequest([]model.Sample{sample1})) require.NoError(t, err) // Append to two series, expect series-exceeded error. 
- _, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample2, sample3})) + _, err = ing.Push(ctx, client.ToWriteRequest([]model.Sample{sample2, sample3})) if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests { t.Fatalf("expected error about exceeding series per metric, got %v", err) } @@ -302,13 +303,13 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric") require.NoError(t, err) - req, err := util.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) + req, err := client.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) require.NoError(t, err) resp, err := ing.Query(ctx, req) require.NoError(t, err) - res := util.FromQueryResponse(resp) + res := client.FromQueryResponse(resp) sort.Sort(res) expected := model.Matrix{ @@ -360,11 +361,11 @@ func TestIngesterRejectOldSamples(t *testing.T) { // Append recent sample, expect no error. userID := "1" ctx := user.InjectOrgID(context.Background(), userID) - _, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample1})) + _, err = ing.Push(ctx, client.ToWriteRequest([]model.Sample{sample1})) require.NoError(t, err) // Append old sample, expect bad request error. - _, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample2, sample3})) + _, err = ing.Push(ctx, client.ToWriteRequest([]model.Sample{sample2, sample3})) if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusBadRequest { t.Fatalf("expected error about old samples not accepted, got %v", err) } @@ -373,13 +374,13 @@ func TestIngesterRejectOldSamples(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "testmetric") require.NoError(t, err) - req, err := util.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) + req, err := client.ToQueryRequest(model.Earliest, model.Latest, []*labels.Matcher{matcher}) require.NoError(t, err) resp, err := ing.Query(ctx, req) require.NoError(t, err) - res := util.FromQueryResponse(resp) + res := client.FromQueryResponse(resp) sort.Sort(res) // Expect recent sample including partial but no old sample. diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f04d120a4a..cb5859d13a 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -143,7 +143,7 @@ func (q MergeQueryable) RemoteReadHandler(w http.ResponseWriter, r *http.Request errors := make(chan error) for i, qr := range req.Queries { go func(i int, qr *client.QueryRequest) { - from, to, matchers, err := util.FromQueryRequest(qr) + from, to, matchers, err := client.FromQueryRequest(qr) if err != nil { errors <- err return @@ -161,7 +161,7 @@ func (q MergeQueryable) RemoteReadHandler(w http.ResponseWriter, r *http.Request return } - resp.Results[i] = util.ToQueryResponse(matrix) + resp.Results[i] = client.ToQueryResponse(matrix) errors <- nil }(i, qr) } diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 12b5beae2d..476a8fe66e 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -8,7 +8,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/weaveworks/cortex/pkg/ingester/client" - "github.com/weaveworks/cortex/pkg/util" ) // Pusher is an ingester server that accepts pushes. 
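Because the Check handler added to the ingester takes and returns the vendored grpc_health_v1 messages, it also satisfies the grpc_health_v1.HealthServer interface from the generated package above. The patch itself only exposes Check through the cortex Ingester service; purely as an illustration of that interface, the same implementation could additionally be registered with the standard gRPC health service. The serveWithHealth helper below is hypothetical.

```go
package example

import (
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/weaveworks/cortex/pkg/ingester"
)

// serveWithHealth registers the ingester as the standard gRPC health service
// (possible because Ingester.Check matches grpc_health_v1.HealthServer) and
// starts serving on the given listener.
func serveWithHealth(lis net.Listener, ing *ingester.Ingester) error {
	s := grpc.NewServer()
	grpc_health_v1.RegisterHealthServer(s, ing)
	return s.Serve(lis)
}
```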
@@ -52,7 +51,7 @@ func (a *appendableAppender) AddFast(l labels.Labels, ref uint64, t int64, v flo } func (a *appendableAppender) Commit() error { - if _, err := a.pusher.Push(a.ctx, util.ToWriteRequest(a.samples)); err != nil { + if _, err := a.pusher.Push(a.ctx, client.ToWriteRequest(a.samples)); err != nil { return err } a.samples = nil From 41475c6cd0a6d2cb4dac618f40c86e8780f00bb5 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Tue, 13 Mar 2018 18:08:38 -0600 Subject: [PATCH 7/9] Add some cache and healtcheck tests --- pkg/ingester/client/cache_test.go | 62 +++++++++++++++++++++++++ pkg/ingester/client/healthcheck_test.go | 45 ++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 pkg/ingester/client/cache_test.go create mode 100644 pkg/ingester/client/healthcheck_test.go diff --git a/pkg/ingester/client/cache_test.go b/pkg/ingester/client/cache_test.go new file mode 100644 index 0000000000..a7b14c2f3e --- /dev/null +++ b/pkg/ingester/client/cache_test.go @@ -0,0 +1,62 @@ +package client + +import ( + fmt "fmt" + "testing" + + "google.golang.org/grpc/health/grpc_health_v1" +) + +func (i mockIngester) Close() error { + return nil +} + +func TestIngesterCache(t *testing.T) { + buildCount := 0 + factory := func(addr string, _ Config) (IngesterClient, error) { + if addr == "bad" { + return nil, fmt.Errorf("Fail") + } + buildCount++ + return mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}, nil + } + cache := NewIngesterClientCache(factory, Config{}) + + cache.GetClientFor("1") + if buildCount != 1 { + t.Errorf("Did not create client") + } + + cache.GetClientFor("1") + if buildCount != 1 { + t.Errorf("Created client that should have been cached") + } + + cache.GetClientFor("2") + if cache.Count() != 2 { + t.Errorf("Expected Count() = 2, got %d", cache.Count()) + } + + cache.RemoveClientFor("1") + if cache.Count() != 1 { + t.Errorf("Expected Count() = 1, got %d", cache.Count()) + } + + cache.GetClientFor("1") + if buildCount != 3 || cache.Count() != 2 { + t.Errorf("Did not re-create client correctly") + } + + _, err := cache.GetClientFor("bad") + if err == nil { + t.Errorf("Bad create should have thrown an error") + } + if cache.Count() != 2 { + t.Errorf("Bad create should not have been added to cache") + } + + addrs := cache.RegisteredAddresses() + if len(addrs) != cache.Count() { + t.Errorf("Lengths of registered addresses and cache.Count() do not match") + } +} diff --git a/pkg/ingester/client/healthcheck_test.go b/pkg/ingester/client/healthcheck_test.go new file mode 100644 index 0000000000..0885dcce84 --- /dev/null +++ b/pkg/ingester/client/healthcheck_test.go @@ -0,0 +1,45 @@ +package client + +import ( + fmt "fmt" + "testing" + "time" + + "golang.org/x/net/context" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" +) + +type mockIngester struct { + IngesterClient + happy bool + status grpc_health_v1.HealthCheckResponse_ServingStatus +} + +func (i mockIngester) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) { + if !i.happy { + return nil, fmt.Errorf("Fail") + } + return &grpc_health_v1.HealthCheckResponse{Status: i.status}, nil +} + +func TestHealthCheck(t *testing.T) { + tcs := []struct { + ingester mockIngester + hasError bool + }{ + {mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_UNKNOWN}, true}, + {mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}, false}, + 
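The two new test files above exercise the exported HealthCheck helper through a mockIngester. For context, this is roughly how that helper is meant to be combined with the client cache, mirroring the distributor's cleanup path; the checkAndEvict wrapper itself is a hypothetical sketch, not code from this series:

    package client

    import "time"

    // checkAndEvict probes every cached ingester client and evicts the ones that
    // fail their healthcheck, the same pattern the distributor follows when
    // -distributor.health-check-ingesters is enabled.
    func checkAndEvict(cache *IngesterClientCache, timeout time.Duration) {
        for _, addr := range cache.RegisteredAddresses() {
            c, err := cache.GetClientFor(addr)
            if err != nil {
                continue // could not build a client for this address, skip the check
            }
            if err := HealthCheck(c, timeout); err != nil {
                cache.RemoveClientFor(addr)
            }
        }
    }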
From 343da84c2061b4ddbe9e3868296944af9c80e504 Mon Sep 17 00:00:00 2001
From: Chris Marchbanks
Date: Fri, 16 Mar 2018 13:25:58 -0600
Subject: [PATCH 8/9] Renaming cache -> pool

---
 pkg/distributor/distributor.go             | 36 ++++++-------
 pkg/ingester/client/{cache.go => pool.go}  | 54 +++++++++----------
 .../client/{cache_test.go => pool_test.go} |  2 +-
 3 files changed, 46 insertions(+), 46 deletions(-)
 rename pkg/ingester/client/{cache.go => pool.go} (57%)
 rename pkg/ingester/client/{cache_test.go => pool_test.go} (96%)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index a2b11564a9..b3740594f6 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -43,12 +43,12 @@ var (
 // Distributor is a storage.SampleAppender and a client.Querier which
 // forwards appends and queries to individual ingesters.
 type Distributor struct {
-	cfg         Config
-	ring        ring.ReadRing
-	clientsMtx  sync.RWMutex
-	clientCache *ingester_client.IngesterClientCache
-	quit        chan struct{}
-	done        chan struct{}
+	cfg          Config
+	ring         ring.ReadRing
+	clientsMtx   sync.RWMutex
+	ingesterPool *ingester_client.IngesterPool
+	quit         chan struct{}
+	done         chan struct{}
 
 	billingClient *billing.Client
 
@@ -93,7 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
 	flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
 	flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
-	flag.BoolVar(&cfg.HealthCheckIngesters, "distributor.health-check-ingesters", false, "Run a health check on each ingester client during the cleanup period.")
+	flag.BoolVar(&cfg.HealthCheckIngesters, "distributor.health-check-ingesters", false, "Run a health check on each ingester client during periodic cleanup.")
 }
 
 // New constructs a new Distributor
@@ -117,7 +117,7 @@ func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
 	d := &Distributor{
 		cfg:           cfg,
 		ring:          ring,
-		clientCache:   ingester_client.NewIngesterClientCache(cfg.ingesterClientFactory, cfg.IngesterClientConfig),
+		ingesterPool:  ingester_client.NewIngesterPool(cfg.ingesterClientFactory, cfg.IngesterClientConfig),
 		quit:          make(chan struct{}),
 		done:          make(chan struct{}),
 		billingClient: billingClient,
@@ -193,18 +193,18 @@ func (d *Distributor) removeStaleIngesterClients() {
 		ingesters[ing.Addr] = struct{}{}
 	}
 
-	for _, addr := range d.clientCache.RegisteredAddresses() {
+	for _, addr := range d.ingesterPool.RegisteredAddresses() {
 		if _, ok := ingesters[addr]; ok {
 			continue
 		}
 		level.Info(util.Logger).Log("msg", "removing stale ingester client", "addr", addr)
-		d.clientCache.RemoveClientFor(addr)
+		d.ingesterPool.RemoveClientFor(addr)
 	}
 }
 
 func (d *Distributor) healthCheckAndRemoveIngesters() {
-	for _, addr := range d.clientCache.RegisteredAddresses() {
-		client, err := d.clientCache.GetClientFor(addr)
+	for _, addr := range d.ingesterPool.RegisteredAddresses() {
+		client, err := d.ingesterPool.GetClientFor(addr)
 		if err != nil {
 			// if there is no client, don't need to health check it
 			level.Warn(util.Logger).Log("msg", "could not create client for", "addr", addr)
@@ -213,7 +213,7 @@ func (d *Distributor) healthCheckAndRemoveIngesters() {
 		err = ingester_client.HealthCheck(client, d.cfg.RemoteTimeout)
 		if err != nil {
 			level.Warn(util.Logger).Log("msg", "removing ingester failing healtcheck", "addr", addr, "reason", err)
-			d.clientCache.RemoveClientFor(addr)
+			d.ingesterPool.RemoveClientFor(addr)
 		}
 	}
 }
@@ -398,7 +398,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
 }
 
 func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, samples []*sampleTracker) error {
-	c, err := d.clientCache.GetClientFor(ingester.Addr)
+	c, err := d.ingesterPool.GetClientFor(ingester.Addr)
 	if err != nil {
 		return err
 	}
@@ -515,7 +515,7 @@ func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.Inge
 }
 
 func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *client.QueryRequest) (model.Matrix, error) {
-	client, err := d.clientCache.GetClientFor(ing.Addr)
+	client, err := d.ingesterPool.GetClientFor(ing.Addr)
 	if err != nil {
 		return nil, err
 	}
@@ -536,7 +536,7 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}
 	ingesters := d.ring.GetAll()
 	for _, ingester := range ingesters {
 		go func(ingester *ring.IngesterDesc) {
-			client, err := d.clientCache.GetClientFor(ingester.Addr)
+			client, err := d.ingesterPool.GetClientFor(ingester.Addr)
 			if err != nil {
 				errs <- err
 				return
 			}
@@ -663,7 +663,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
 	// Not using d.forAllIngesters(), so we can fail after first error.
 	ingesters := d.ring.GetAll()
 	for _, ingester := range ingesters {
-		client, err := d.clientCache.GetClientFor(ingester.Addr)
+		client, err := d.ingesterPool.GetClientFor(ingester.Addr)
 		if err != nil {
 			return nil, err
 		}
@@ -722,6 +722,6 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
 	ch <- prometheus.MustNewConstMetric(
 		numClientsDesc,
 		prometheus.GaugeValue,
-		float64(d.clientCache.Count()),
+		float64(d.ingesterPool.Count()),
 	)
 }
diff --git a/pkg/ingester/client/cache.go b/pkg/ingester/client/pool.go
similarity index 57%
rename from pkg/ingester/client/cache.go
rename to pkg/ingester/client/pool.go
index 0fa5b6948b..f281600a61 100644
--- a/pkg/ingester/client/cache.go
+++ b/pkg/ingester/client/pool.go
@@ -11,8 +11,8 @@ import (
 // Factory defines the signature for an ingester client factory
 type Factory func(addr string, cfg Config) (IngesterClient, error)
 
-// IngesterClientCache holds a cache of ingester clients
-type IngesterClientCache struct {
+// IngesterPool holds a cache of ingester clients
+type IngesterPool struct {
 	sync.RWMutex
 	clients map[string]IngesterClient
 
@@ -20,9 +20,9 @@ type IngesterClientCache struct {
 	ingesterClientFactory Factory
 	ingesterClientConfig  Config
 }
-// NewIngesterClientCache creates a new cache
-func NewIngesterClientCache(factory Factory, config Config) *IngesterClientCache {
-	return &IngesterClientCache{
+// NewIngesterPool creates a new cache
+func NewIngesterPool(factory Factory, config Config) *IngesterPool {
+	return &IngesterPool{
 		clients:               map[string]IngesterClient{},
 		ingesterClientFactory: factory,
 		ingesterClientConfig:  config,
@@ -31,36 +31,36 @@ func NewIngesterClientCache(factory Factory, config Config) *IngesterClientCache
 
 // GetClientFor gets the client for the specified address. If it does not exist it will make a new client
 // at that address
-func (cache *IngesterClientCache) GetClientFor(addr string) (IngesterClient, error) {
-	cache.RLock()
-	client, ok := cache.clients[addr]
-	cache.RUnlock()
+func (pool *IngesterPool) GetClientFor(addr string) (IngesterClient, error) {
+	pool.RLock()
+	client, ok := pool.clients[addr]
+	pool.RUnlock()
 	if ok {
 		return client, nil
 	}
 
-	cache.Lock()
-	defer cache.Unlock()
-	client, ok = cache.clients[addr]
+	pool.Lock()
+	defer pool.Unlock()
+	client, ok = pool.clients[addr]
 	if ok {
 		return client, nil
 	}
 
-	client, err := cache.ingesterClientFactory(addr, cache.ingesterClientConfig)
+	client, err := pool.ingesterClientFactory(addr, pool.ingesterClientConfig)
 	if err != nil {
 		return nil, err
 	}
-	cache.clients[addr] = client
+	pool.clients[addr] = client
 	return client, nil
 }
 
 // RemoveClientFor removes the client with the specified address
-func (cache *IngesterClientCache) RemoveClientFor(addr string) {
-	cache.Lock()
-	defer cache.Unlock()
-	client, ok := cache.clients[addr]
+func (pool *IngesterPool) RemoveClientFor(addr string) {
+	pool.Lock()
+	defer pool.Unlock()
+	client, ok := pool.clients[addr]
 	if ok {
-		delete(cache.clients, addr)
+		delete(pool.clients, addr)
 		// Close in the background since this operation may take awhile and we have a mutex
 		go func(addr string, closer io.Closer) {
 			if err := closer.Close(); err != nil {
@@ -71,19 +71,19 @@ func (cache *IngesterClientCache) RemoveClientFor(addr string) {
 }
 
 // RegisteredAddresses returns all the addresses that a client is cached for
-func (cache *IngesterClientCache) RegisteredAddresses() []string {
+func (pool *IngesterPool) RegisteredAddresses() []string {
 	result := []string{}
-	cache.RLock()
-	defer cache.RUnlock()
-	for addr := range cache.clients {
+	pool.RLock()
+	defer pool.RUnlock()
+	for addr := range pool.clients {
 		result = append(result, addr)
 	}
 	return result
 }
 
 // Count returns how many clients are in the cache
-func (cache *IngesterClientCache) Count() int {
-	cache.RLock()
-	defer cache.RUnlock()
-	return len(cache.clients)
+func (pool *IngesterPool) Count() int {
+	pool.RLock()
+	defer pool.RUnlock()
+	return len(pool.clients)
 }
diff --git a/pkg/ingester/client/cache_test.go b/pkg/ingester/client/pool_test.go
similarity index 96%
rename from pkg/ingester/client/cache_test.go
rename to pkg/ingester/client/pool_test.go
index a7b14c2f3e..b37f087b2d 100644
--- a/pkg/ingester/client/cache_test.go
+++ b/pkg/ingester/client/pool_test.go
@@ -20,7 +20,7 @@ func TestIngesterCache(t *testing.T) {
 		buildCount++
 		return mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
 	}
-	cache := NewIngesterClientCache(factory, Config{})
+	cache := NewIngesterPool(factory, Config{})
 
 	cache.GetClientFor("1")
 	if buildCount != 1 {

From 72930ea5e3c2aec6d7dc95ce0623edf8ae8e3644 Mon Sep 17 00:00:00 2001
From: Chris Marchbanks
Date: Fri, 16 Mar 2018 15:15:32 -0600
Subject: [PATCH 9/9] Move CleanUnhealthy to the ingester client package, add tests

---
 pkg/distributor/distributor.go          | 20 +-----
 pkg/ingester/client/healthcheck.go      | 26 -------
 pkg/ingester/client/healthcheck_test.go | 45 ------------
 pkg/ingester/client/pool.go             | 51 ++++++++++++--
 pkg/ingester/client/pool_test.go        | 93 +++++++++++++++++++++----
 5 files changed, 127 insertions(+), 108 deletions(-)
 delete mode 100644 pkg/ingester/client/healthcheck.go
 delete mode 100644 pkg/ingester/client/healthcheck_test.go

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index b3740594f6..9e2efae532 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -117,7 +117,7 @@ func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
 	d := &Distributor{
 		cfg:           cfg,
 		ring:          ring,
-		ingesterPool:  ingester_client.NewIngesterPool(cfg.ingesterClientFactory, cfg.IngesterClientConfig),
+		ingesterPool:  ingester_client.NewIngesterPool(cfg.ingesterClientFactory, cfg.IngesterClientConfig, cfg.RemoteTimeout),
 		quit:          make(chan struct{}),
 		done:          make(chan struct{}),
 		billingClient: billingClient,
@@ -172,7 +172,7 @@ func (d *Distributor) Run() {
 		case <-cleanupClients.C:
 			d.removeStaleIngesterClients()
 			if d.cfg.HealthCheckIngesters {
-				d.healthCheckAndRemoveIngesters()
+				d.ingesterPool.CleanUnhealthy()
 			}
 		case <-d.quit:
 			close(d.done)
@@ -202,22 +202,6 @@ func (d *Distributor) removeStaleIngesterClients() {
 	}
 }
 
-func (d *Distributor) healthCheckAndRemoveIngesters() {
-	for _, addr := range d.ingesterPool.RegisteredAddresses() {
-		client, err := d.ingesterPool.GetClientFor(addr)
-		if err != nil {
-			// if there is no client, don't need to health check it
-			level.Warn(util.Logger).Log("msg", "could not create client for", "addr", addr)
-			continue
-		}
-		err = ingester_client.HealthCheck(client, d.cfg.RemoteTimeout)
-		if err != nil {
-			level.Warn(util.Logger).Log("msg", "removing ingester failing healtcheck", "addr", addr, "reason", err)
-			d.ingesterPool.RemoveClientFor(addr)
-		}
-	}
-}
-
 func tokenForLabels(userID string, labels []client.LabelPair) (uint32, error) {
 	for _, label := range labels {
 		if label.Name.Equal(labelNameBytes) {
diff --git a/pkg/ingester/client/healthcheck.go b/pkg/ingester/client/healthcheck.go
deleted file mode 100644
index 0cc022fea1..0000000000
--- a/pkg/ingester/client/healthcheck.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package client
-
-import (
-	fmt "fmt"
-	"time"
-
-	"github.com/weaveworks/common/user"
-	"golang.org/x/net/context"
-	"google.golang.org/grpc/health/grpc_health_v1"
-)
-
-// HealthCheck will check if the client is still healthy, returning an error if it is not
-func HealthCheck(client IngesterClient, timeout time.Duration) error {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
-	ctx = user.InjectOrgID(ctx, "0")
-
-	resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
-	if err != nil {
-		return err
-	}
-	if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
-		return fmt.Errorf("Failing healthcheck status: %s", resp.Status)
-	}
-	return nil
-}
diff --git a/pkg/ingester/client/healthcheck_test.go b/pkg/ingester/client/healthcheck_test.go
deleted file mode 100644
index 0885dcce84..0000000000
--- a/pkg/ingester/client/healthcheck_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package client
-
-import (
-	fmt "fmt"
-	"testing"
-	"time"
-
-	"golang.org/x/net/context"
-	grpc "google.golang.org/grpc"
-	"google.golang.org/grpc/health/grpc_health_v1"
-)
-
-type mockIngester struct {
-	IngesterClient
-	happy  bool
-	status grpc_health_v1.HealthCheckResponse_ServingStatus
-}
-
-func (i mockIngester) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
-	if !i.happy {
-		return nil, fmt.Errorf("Fail")
-	}
-	return &grpc_health_v1.HealthCheckResponse{Status: i.status}, nil
-}
-
-func TestHealthCheck(t *testing.T) {
-	tcs := []struct {
-		ingester mockIngester
-		hasError bool
-	}{
-		{mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_UNKNOWN}, true},
-		{mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}, false},
-		{mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, true},
-		{mockIngester{happy: false, status: grpc_health_v1.HealthCheckResponse_UNKNOWN}, true},
-		{mockIngester{happy: false, status: grpc_health_v1.HealthCheckResponse_SERVING}, true},
-		{mockIngester{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, true},
-	}
-	for _, tc := range tcs {
-		err := HealthCheck(tc.ingester, 50*time.Millisecond)
-		hasError := err != nil
-		if hasError != tc.hasError {
-			t.Errorf("Expected error: %t, error: %v", tc.hasError, err)
-		}
-	}
-}
diff --git a/pkg/ingester/client/pool.go b/pkg/ingester/client/pool.go
index f281600a61..d81749c1ad 100644
--- a/pkg/ingester/client/pool.go
+++ b/pkg/ingester/client/pool.go
@@ -1,11 +1,16 @@
 package client
 
 import (
+	fmt "fmt"
 	io "io"
 	"sync"
+	"time"
 
 	"github.com/go-kit/kit/log/level"
+	"github.com/weaveworks/common/user"
 	"github.com/weaveworks/cortex/pkg/util"
+	context "golang.org/x/net/context"
+	grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
 )
 
 // Factory defines the signature for an ingester client factory
@@ -18,23 +23,30 @@ type IngesterPool struct {
 	ingesterClientFactory Factory
 	ingesterClientConfig  Config
+	healthCheckTimeout    time.Duration
 }
-// NewIngesterPool creates a new cache
-func NewIngesterPool(factory Factory, config Config) *IngesterPool {
+// NewIngesterPool creates a new cache
+func NewIngesterPool(factory Factory, config Config, healthCheckTimeout time.Duration) *IngesterPool {
 	return &IngesterPool{
 		clients:               map[string]IngesterClient{},
 		ingesterClientFactory: factory,
 		ingesterClientConfig:  config,
+		healthCheckTimeout:    healthCheckTimeout,
 	}
 }
 
+func (pool *IngesterPool) fromCache(addr string) (IngesterClient, bool) {
+	pool.RLock()
+	defer pool.RUnlock()
+	client, ok := pool.clients[addr]
+	return client, ok
+}
+
 // GetClientFor gets the client for the specified address. If it does not exist it will make a new client
 // at that address
 func (pool *IngesterPool) GetClientFor(addr string) (IngesterClient, error) {
-	pool.RLock()
-	client, ok := pool.clients[addr]
-	pool.RUnlock()
+	client, ok := pool.fromCache(addr)
 	if ok {
 		return client, nil
 	}
@@ -87,3 +99,34 @@ func (pool *IngesterPool) Count() int {
 	defer pool.RUnlock()
 	return len(pool.clients)
 }
+
+// CleanUnhealthy loops through all ingesters and deletes any that fail a healthcheck.
+func (pool *IngesterPool) CleanUnhealthy() {
+	for _, addr := range pool.RegisteredAddresses() {
+		client, ok := pool.fromCache(addr)
+		// not ok means someone removed a client between the start of this loop and now
+		if ok {
+			err := healthCheck(client, pool.healthCheckTimeout)
+			if err != nil {
+				level.Warn(util.Logger).Log("msg", "removing ingester failing healthcheck", "addr", addr, "reason", err)
+				pool.RemoveClientFor(addr)
+			}
+		}
+	}
+}
+
+// healthCheck will check if the client is still healthy, returning an error if it is not
+func healthCheck(client IngesterClient, timeout time.Duration) error {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	ctx = user.InjectOrgID(ctx, "0")
+
+	resp, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+	if err != nil {
+		return err
+	}
+	if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
+		return fmt.Errorf("Failing healthcheck status: %s", resp.Status)
+	}
+	return nil
+}
diff --git a/pkg/ingester/client/pool_test.go b/pkg/ingester/client/pool_test.go
index b37f087b2d..5e18a2c382 100644
--- a/pkg/ingester/client/pool_test.go
+++ b/pkg/ingester/client/pool_test.go
@@ -3,14 +3,51 @@ package client
 
 import (
 	fmt "fmt"
 	"testing"
+	"time"
 
+	context "golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
 	"google.golang.org/grpc/health/grpc_health_v1"
 )
 
+type mockIngester struct {
+	IngesterClient
+	happy  bool
+	status grpc_health_v1.HealthCheckResponse_ServingStatus
+}
+
+func (i mockIngester) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
+	if !i.happy {
+		return nil, fmt.Errorf("Fail")
+	}
+	return &grpc_health_v1.HealthCheckResponse{Status: i.status}, nil
+}
+
 func (i mockIngester) Close() error {
 	return nil
 }
 
+func TestHealthCheck(t *testing.T) {
+	tcs := []struct {
+		ingester mockIngester
+		hasError bool
+	}{
+		{mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_UNKNOWN}, true},
+		{mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}, false},
+		{mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, true},
+		{mockIngester{happy: false, status: grpc_health_v1.HealthCheckResponse_UNKNOWN}, true},
+		{mockIngester{happy: false, status: grpc_health_v1.HealthCheckResponse_SERVING}, true},
+		{mockIngester{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, true},
+	}
+	for _, tc := range tcs {
+		err := healthCheck(tc.ingester, 50*time.Millisecond)
+		hasError := err != nil
+		if hasError != tc.hasError {
+			t.Errorf("Expected error: %t, error: %v", tc.hasError, err)
+		}
+	}
+}
+
 func TestIngesterCache(t *testing.T) {
 	buildCount := 0
 	factory := func(addr string, _ Config) (IngesterClient, error) {
@@ -20,7 +57,7 @@ func TestIngesterCache(t *testing.T) {
 		buildCount++
 		return mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
 	}
-	cache := NewIngesterPool(factory, Config{})
+	pool := NewIngesterPool(factory, Config{}, 50*time.Millisecond)
 
-	cache.GetClientFor("1")
+	pool.GetClientFor("1")
 	if buildCount != 1 {
 		t.Errorf("Did not create client")
 	}
 
-	cache.GetClientFor("1")
+	pool.GetClientFor("1")
 	if buildCount != 1 {
 		t.Errorf("Created client that should have been cached")
 	}
 
-	cache.GetClientFor("2")
-	if cache.Count() != 2 {
-		t.Errorf("Expected Count() = 2, got %d", cache.Count())
+	pool.GetClientFor("2")
+	if pool.Count() != 2 {
+		t.Errorf("Expected Count() = 2, got %d", pool.Count())
 	}
 
-	cache.RemoveClientFor("1")
-	if cache.Count() != 1 {
-		t.Errorf("Expected Count() = 1, got %d", cache.Count())
+	pool.RemoveClientFor("1")
+	if pool.Count() != 1 {
+		t.Errorf("Expected Count() = 1, got %d", pool.Count())
 	}
 
-	cache.GetClientFor("1")
-	if buildCount != 3 || cache.Count() != 2 {
+	pool.GetClientFor("1")
+	if buildCount != 3 || pool.Count() != 2 {
 		t.Errorf("Did not re-create client correctly")
 	}
 
-	_, err := cache.GetClientFor("bad")
+	_, err := pool.GetClientFor("bad")
 	if err == nil {
 		t.Errorf("Bad create should have thrown an error")
 	}
-	if cache.Count() != 2 {
+	if pool.Count() != 2 {
 		t.Errorf("Bad create should not have been added to cache")
 	}
 
-	addrs := cache.RegisteredAddresses()
-	if len(addrs) != cache.Count() {
+	addrs := pool.RegisteredAddresses()
+	if len(addrs) != pool.Count() {
 		t.Errorf("Lengths of registered addresses and cache.Count() do not match")
 	}
 }
+
+func TestCleanUnhealthy(t *testing.T) {
+	goodAddrs := []string{"good1", "good2"}
+	badAddrs := []string{"bad1", "bad2"}
+	clients := map[string]IngesterClient{}
+	for _, addr := range goodAddrs {
+		clients[addr] = mockIngester{happy: true, status: grpc_health_v1.HealthCheckResponse_SERVING}
+	}
+	for _, addr := range badAddrs {
+		clients[addr] = mockIngester{happy: false, status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}
+	}
+	pool := &IngesterPool{
+		clients: clients,
+	}
+	pool.CleanUnhealthy()
+	for _, addr := range badAddrs {
+		if _, ok := pool.clients[addr]; ok {
+			t.Errorf("Found bad ingester after clean: %s\n", addr)
+		}
+	}
+	for _, addr := range goodAddrs {
+		if _, ok := pool.clients[addr]; !ok {
+			t.Errorf("Could not find good ingester after clean: %s\n", addr)
+		}
+	}
+}
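Taken together, patches 8 and 9 leave the distributor holding an IngesterPool that owns client caching, health probing, and eviction. A minimal sketch of how a caller could wire it up; the stub factory, the address, and the intervals below are illustrative only, not part of this series:

    package main

    import (
        "time"

        "github.com/weaveworks/cortex/pkg/ingester/client"
    )

    func main() {
        // Stub factory; the distributor passes in its real gRPC ingester client factory here.
        factory := func(addr string, cfg client.Config) (client.IngesterClient, error) {
            return nil, nil // hypothetical: dial addr and return the generated IngesterClient
        }

        // The third argument is the per-probe timeout used by CleanUnhealthy (added in patch 9).
        pool := client.NewIngesterPool(factory, client.Config{}, 1*time.Second)

        // Mirrors the distributor's cleanup loop when -distributor.health-check-ingesters is set:
        // periodically drop clients whose ingester does not report SERVING.
        go func() {
            for range time.Tick(15 * time.Second) {
                pool.CleanUnhealthy()
            }
        }()

        // Callers keep using GetClientFor and RemoveClientFor exactly as before the rename.
        if c, err := pool.GetClientFor("ingester-1:9095"); err == nil { // hypothetical address
            _ = c
        }
    }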