From 1410808ee9f20917476fabaa78aa8849ba7c7d20 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 30 Nov 2022 16:04:40 +0530 Subject: [PATCH 1/5] use grpc for communicating with compactor for query time filtering of data requested for deletion (#7804) **What this PR does / why we need it**: Add grpc support to the compactor for getting delete requests and cache gen numbers for query time filtering. Since these requests are internal to Loki, it is better to use grpc instead of HTTP, the same as all the other internal requests we do in Loki. I have added a new config for accepting the grpc address of the compactor. I tried having just the existing config and detecting whether it points at a grpc server, but it was hard to do that reliably, considering the different deployment modes we support. I think it is safe to keep the existing config as it is and eventually deprecate it. **Checklist** - [x] Documentation added - [x] Tests updated - [x] `CHANGELOG.md` updated --- CHANGELOG.md | 1 + docs/sources/configuration/_index.md | 4 + pkg/loki/common/common.go | 4 + pkg/loki/loki.go | 53 +- pkg/loki/modules.go | 71 +- .../queryrangebase/results_cache.go | 1 + .../indexshipper/compactor/client/grpc.go | 100 ++ .../compactor/client/grpc/grpc.pb.go | 1549 +++++++++++++++++ .../compactor/client/grpc/grpc.proto | 29 + .../indexshipper/compactor/client/http.go | 148 ++ .../indexshipper/compactor/compactor.go | 29 +- .../compactor/compactor_client.go | 42 - .../deletion/delete_requests_client.go | 90 +- .../deletion/delete_requests_client_test.go | 59 +- .../deletion/delete_requests_manager_test.go | 6 + .../deletion/grpc_request_handler.go | 94 + .../deletion/grpc_request_handler_test.go | 223 +++ .../compactor/deletion/metrics.go | 12 +- .../generationnumber/gennumber_client.go | 81 - .../generationnumber/gennumber_client_test.go | 40 - .../generationnumber/gennumber_loader.go | 9 + .../generationnumber/gennumber_loader_test.go | 2 + 22 files changed, 2321 insertions(+), 326 deletions(-) create mode 100644 pkg/storage/stores/indexshipper/compactor/client/grpc.go create mode 100644 pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go create mode 100644 pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto create mode 100644 pkg/storage/stores/indexshipper/compactor/client/http.go delete mode 100644 pkg/storage/stores/indexshipper/compactor/compactor_client.go create mode 100644 pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go create mode 100644 pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go delete mode 100644 pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go delete mode 100644 pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 45b2e7b73a5e..999f88e60f09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [7731](https://github.com/grafana/loki/pull/7731) **bitkill**: Add healthchecks to the docker-compose example. * [7759](https://github.com/grafana/loki/pull/7759) **kavirajk**: Improve error message for loading config with ENV variables. * [7785](https://github.com/grafana/loki/pull/7785) **dannykopping**: Add query blocker for queries and rules. +* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion.
##### Fixes diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index da5d39a76eb5..43e052a13416 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2911,6 +2911,10 @@ This way, one doesn't have to replicate configuration in multiple places. # CLI flag: -common.compactor-address [compactor_address: | default = ""] +# Address and port number where the compactor grpc requests are being served. +# CLI flag: -common.compactor-grpc-address +[compactor_grpc_address: | default = ""] + ## analytics The `analytics` block configures the reporting of Loki analytics to grafana.com. diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 721ffed9d12e..85173444f3ad 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -43,6 +43,9 @@ type Config struct { // CompactorAddress is the http address of the compactor in the form http://host:port CompactorAddress string `yaml:"compactor_address"` + + // CompactorGRPCAddress is the grpc address of the compactor in the form host:port + CompactorGRPCAddress string `yaml:"compactor_grpc_address"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -57,6 +60,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") + f.StringVar(&c.CompactorGRPCAddress, "common.compactor-grpc-address", "", "the grpc address of the compactor in the form host:port") } type Storage struct { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 0ee5ddc8ba0d..4ac0eedf37f9 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -46,6 +46,7 @@ import ( "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" @@ -70,30 +71,31 @@ type Config struct { UseBufferedLogger bool `yaml:"use_buffered_logger"` UseSyncLogger bool `yaml:"use_sync_logger"` - Common common.Config `yaml:"common,omitempty"` - Server server.Config `yaml:"server,omitempty"` - InternalServer internalserver.Config `yaml:"internal_server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - CompactorClient compactor.ClientConfig `yaml:"compactor_client,omitempty"` - IngesterClient client.Config `yaml:"ingester_client,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage_config,omitempty"` - IndexGateway indexgateway.Config `yaml:"index_gateway"` - ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` - SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"` - LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` - TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"` - Worker worker.Config `yaml:"frontend_worker,omitempty"` - Frontend lokifrontend.Config `yaml:"frontend,omitempty"` - Ruler ruler.Config `yaml:"ruler,omitempty"` - QueryRange queryrange.Config
`yaml:"query_range,omitempty"` - RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` - MemberlistKV memberlist.KVConfig `yaml:"memberlist"` - Tracing tracing.Config `yaml:"tracing"` - CompactorConfig compactor.Config `yaml:"compactor,omitempty"` - QueryScheduler scheduler.Config `yaml:"query_scheduler"` - UsageReport usagestats.Config `yaml:"analytics"` + Common common.Config `yaml:"common,omitempty"` + Server server.Config `yaml:"server,omitempty"` + InternalServer internalserver.Config `yaml:"internal_server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + CompactorHTTPClient compactor_client.HTTPConfig `yaml:"compactor_client,omitempty"` + CompactorGRPCClient compactor_client.GRPCConfig `yaml:"compactor_grpc_client,omitempty"` + IngesterClient client.Config `yaml:"ingester_client,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage_config,omitempty"` + IndexGateway indexgateway.Config `yaml:"index_gateway"` + ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` + SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"` + LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` + TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"` + Worker worker.Config `yaml:"frontend_worker,omitempty"` + Frontend lokifrontend.Config `yaml:"frontend,omitempty"` + Ruler ruler.Config `yaml:"ruler,omitempty"` + QueryRange queryrange.Config `yaml:"query_range,omitempty"` + RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` + MemberlistKV memberlist.KVConfig `yaml:"memberlist"` + Tracing tracing.Config `yaml:"tracing"` + CompactorConfig compactor.Config `yaml:"compactor,omitempty"` + QueryScheduler scheduler.Config `yaml:"query_scheduler"` + UsageReport usagestats.Config `yaml:"analytics"` } // RegisterFlags registers flag. 
@@ -116,7 +118,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Common.RegisterFlags(f) c.Distributor.RegisterFlags(f) c.Querier.RegisterFlags(f) - c.CompactorClient.RegisterFlags(f) + c.CompactorHTTPClient.RegisterFlags(f) + c.CompactorGRPCClient.RegisterFlags(f) c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) c.StorageConfig.RegisterFlags(f) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5a9eb2bec442..467ec947ce48 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -12,8 +12,6 @@ import ( "strings" "time" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" - "github.com/NYTimes/gziphandler" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -43,6 +41,7 @@ import ( "github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb" "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/ruler" base_ruler "github.com/grafana/loki/pkg/ruler/base" "github.com/grafana/loki/pkg/runtime" @@ -54,6 +53,8 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber" "github.com/grafana/loki/pkg/storage/stores/series/index" @@ -677,45 +678,53 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) { var client generationnumber.CacheGenClient if t.supportIndexDeleteRequest() { - compactorAddress, err := t.compactorAddress() - if err != nil { - return nil, err - } - - httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) + compactorAddress, isGRPCAddress, err := t.compactorAddress() if err != nil { return nil, err } - client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient) - if err != nil { - return nil, err + reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "cache_gen", "client_type": t.Cfg.Target.String()}, prometheus.DefaultRegisterer) + if isGRPCAddress { + client, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg) + if err != nil { + return nil, err + } + } else { + client, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient) + if err != nil { + return nil, err + } } } t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer) - return services.NewIdleService(nil, nil), nil + return services.NewIdleService(nil, func(failureCase error) error { + t.cacheGenerationLoader.Stop() + return nil + }), nil } func (t *Loki) supportIndexDeleteRequest() bool { return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) } -func (t *Loki) compactorAddress() (string, error) { +// compactorAddress returns the configured address of the compactor. +// It prefers grpc address over http. 
If the address is grpc then the bool would be true otherwise false +func (t *Loki) compactorAddress() (string, bool, error) { if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) { // In single binary or read modes, this module depends on Server - proto := "http" - if len(t.Cfg.Server.HTTPTLSConfig.TLSCertPath) > 0 && len(t.Cfg.Server.HTTPTLSConfig.TLSKeyPath) > 0 { - proto = "https" - } - return fmt.Sprintf("%s://%s:%d", proto, t.Cfg.Server.HTTPListenAddress, t.Cfg.Server.HTTPListenPort), nil + return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil } - if t.Cfg.Common.CompactorAddress == "" { - return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured") + if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" { + return "", false, errors.New("query filtering for deletes requires 'compactor_grpc_address' or 'compactor_address' to be configured") } - return t.Cfg.Common.CompactorAddress, nil + if t.Cfg.Common.CompactorGRPCAddress != "" { + return t.Cfg.Common.CompactorGRPCAddress, true, nil + } + + return t.Cfg.Common.CompactorAddress, false, nil } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { @@ -1012,6 +1021,7 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)) t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)) t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)) + grpc.RegisterCompactorServer(t.Server.GRPC, t.compactor.DeleteRequestsGRPCHandler) } return t.compactor, nil @@ -1128,17 +1138,26 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri return deletion.NewNoOpDeleteRequestsStore(), nil } - compactorAddress, err := t.compactorAddress() + compactorAddress, isGRPCAddress, err := t.compactorAddress() if err != nil { return nil, err } - httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) - if err != nil { - return nil, err + reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "delete_requests", "client_type": clientType}, prometheus.DefaultRegisterer) + var compactorClient deletion.CompactorClient + if isGRPCAddress { + compactorClient, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg) + if err != nil { + return nil, err + } + } else { + compactorClient, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient) + if err != nil { + return nil, err + } } - client, err := deletion.NewDeleteRequestsClient(compactorAddress, httpClient, t.deleteClientMetrics, clientType) + client, err := deletion.NewDeleteRequestsClient(compactorClient, t.deleteClientMetrics, clientType) if err != nil { return nil, err } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index 35d1b12d2152..5ce907177535 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -68,6 +68,7 @@ func NewResultsCacheMetrics(registerer prometheus.Registerer) *ResultsCacheMetri type CacheGenNumberLoader interface { 
GetResultsCacheGenNumber(tenantIDs []string) string + Stop() } // ResultsCacheConfig is the config for the results cache. diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc.go b/pkg/storage/stores/indexshipper/compactor/client/grpc.go new file mode 100644 index 000000000000..a538ee45d126 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc.go @@ -0,0 +1,100 @@ +package client + +import ( + "context" + "flag" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/prometheus/common/model" + "github.com/weaveworks/common/user" + + "github.com/grafana/dskit/grpcclient" + deletion_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" + "google.golang.org/grpc" +) + +type GRPCConfig struct { + GRPCClientConfig grpcclient.Config `yaml:",inline"` +} + +// RegisterFlags registers flags. +func (cfg *GRPCConfig) RegisterFlags(f *flag.FlagSet) { + cfg.GRPCClientConfig.RegisterFlagsWithPrefix("", f) +} + +type compactorGRPCClient struct { + cfg GRPCConfig + + GRPCClientRequestDuration *prometheus.HistogramVec + conn *grpc.ClientConn + grpcClient deletion_grpc.CompactorClient +} + +// NewGRPCClient supports only methods which are used for internal communication of Loki like +// loading delete requests and cache gen numbers for query time filtering. +func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (deletion.CompactorClient, error) { + client := &compactorGRPCClient{ + cfg: cfg, + GRPCClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_compactor", + Name: "grpc_request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using compactor GRPC client", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + } + + dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(client.GRPCClientRequestDuration)) + if err != nil { + return nil, err + } + + client.conn, err = grpc.Dial(addr, dialOpts...) 
+ if err != nil { + return nil, err + } + + client.grpcClient = deletion_grpc.NewCompactorClient(client.conn) + return client, nil +} + +func (s *compactorGRPCClient) Stop() { + s.conn.Close() +} + +func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) { + ctx = user.InjectOrgID(ctx, userID) + grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{}) + if err != nil { + return nil, err + } + + deleteRequests := make([]deletion.DeleteRequest, len(grpcResp.DeleteRequests)) + for i, dr := range grpcResp.DeleteRequests { + deleteRequests[i] = deletion.DeleteRequest{ + RequestID: dr.RequestID, + StartTime: model.Time(dr.StartTime), + EndTime: model.Time(dr.EndTime), + Query: dr.Query, + Status: deletion.DeleteRequestStatus(dr.Status), + CreatedAt: model.Time(dr.CreatedAt), + } + } + + return deleteRequests, nil +} + +func (s *compactorGRPCClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + ctx = user.InjectOrgID(ctx, userID) + grpcResp, err := s.grpcClient.GetCacheGenNumbers(ctx, &deletion_grpc.GetCacheGenNumbersRequest{}) + if err != nil { + return "", err + } + + return grpcResp.ResultsCacheGen, nil +} + +func (s *compactorGRPCClient) Name() string { + return "grpc_client" +} diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go new file mode 100644 index 000000000000..533a2ecfd631 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go @@ -0,0 +1,1549 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto + +package grpc + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type GetDeleteRequestsRequest struct { +} + +func (m *GetDeleteRequestsRequest) Reset() { *m = GetDeleteRequestsRequest{} } +func (*GetDeleteRequestsRequest) ProtoMessage() {} +func (*GetDeleteRequestsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{0} +} +func (m *GetDeleteRequestsRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetDeleteRequestsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetDeleteRequestsRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetDeleteRequestsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetDeleteRequestsRequest.Merge(m, src) +} +func (m *GetDeleteRequestsRequest) XXX_Size() int { + return m.Size() +} +func (m *GetDeleteRequestsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetDeleteRequestsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetDeleteRequestsRequest proto.InternalMessageInfo + +type GetDeleteRequestsResponse struct { + DeleteRequests []*DeleteRequest `protobuf:"bytes,1,rep,name=deleteRequests,proto3" json:"deleteRequests,omitempty"` +} + +func (m *GetDeleteRequestsResponse) Reset() { *m = GetDeleteRequestsResponse{} } +func (*GetDeleteRequestsResponse) ProtoMessage() {} +func (*GetDeleteRequestsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{1} +} +func (m *GetDeleteRequestsResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetDeleteRequestsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetDeleteRequestsResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetDeleteRequestsResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetDeleteRequestsResponse.Merge(m, src) +} +func (m *GetDeleteRequestsResponse) XXX_Size() int { + return m.Size() +} +func (m *GetDeleteRequestsResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetDeleteRequestsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetDeleteRequestsResponse proto.InternalMessageInfo + +func (m *GetDeleteRequestsResponse) GetDeleteRequests() []*DeleteRequest { + if m != nil { + return m.DeleteRequests + } + return nil +} + +type DeleteRequest struct { + RequestID string `protobuf:"bytes,1,opt,name=requestID,proto3" json:"requestID,omitempty"` + StartTime int64 `protobuf:"varint,2,opt,name=startTime,proto3" json:"startTime,omitempty"` + EndTime int64 `protobuf:"varint,3,opt,name=endTime,proto3" json:"endTime,omitempty"` + Query string `protobuf:"bytes,4,opt,name=query,proto3" json:"query,omitempty"` + Status string `protobuf:"bytes,5,opt,name=status,proto3" json:"status,omitempty"` + CreatedAt int64 `protobuf:"varint,6,opt,name=createdAt,proto3" json:"createdAt,omitempty"` +} + +func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } +func (*DeleteRequest) ProtoMessage() {} +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{2} +} +func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic 
{ + return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DeleteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteRequest.Merge(m, src) +} +func (m *DeleteRequest) XXX_Size() int { + return m.Size() +} +func (m *DeleteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo + +func (m *DeleteRequest) GetRequestID() string { + if m != nil { + return m.RequestID + } + return "" +} + +func (m *DeleteRequest) GetStartTime() int64 { + if m != nil { + return m.StartTime + } + return 0 +} + +func (m *DeleteRequest) GetEndTime() int64 { + if m != nil { + return m.EndTime + } + return 0 +} + +func (m *DeleteRequest) GetQuery() string { + if m != nil { + return m.Query + } + return "" +} + +func (m *DeleteRequest) GetStatus() string { + if m != nil { + return m.Status + } + return "" +} + +func (m *DeleteRequest) GetCreatedAt() int64 { + if m != nil { + return m.CreatedAt + } + return 0 +} + +type GetCacheGenNumbersRequest struct { +} + +func (m *GetCacheGenNumbersRequest) Reset() { *m = GetCacheGenNumbersRequest{} } +func (*GetCacheGenNumbersRequest) ProtoMessage() {} +func (*GetCacheGenNumbersRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{3} +} +func (m *GetCacheGenNumbersRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetCacheGenNumbersRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetCacheGenNumbersRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetCacheGenNumbersRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetCacheGenNumbersRequest.Merge(m, src) +} +func (m *GetCacheGenNumbersRequest) XXX_Size() int { + return m.Size() +} +func (m *GetCacheGenNumbersRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetCacheGenNumbersRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetCacheGenNumbersRequest proto.InternalMessageInfo + +type GetCacheGenNumbersResponse struct { + ResultsCacheGen string `protobuf:"bytes,1,opt,name=resultsCacheGen,proto3" json:"resultsCacheGen,omitempty"` +} + +func (m *GetCacheGenNumbersResponse) Reset() { *m = GetCacheGenNumbersResponse{} } +func (*GetCacheGenNumbersResponse) ProtoMessage() {} +func (*GetCacheGenNumbersResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{4} +} +func (m *GetCacheGenNumbersResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetCacheGenNumbersResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetCacheGenNumbersResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetCacheGenNumbersResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetCacheGenNumbersResponse.Merge(m, src) +} +func (m *GetCacheGenNumbersResponse) XXX_Size() int { + return m.Size() +} +func (m *GetCacheGenNumbersResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetCacheGenNumbersResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetCacheGenNumbersResponse proto.InternalMessageInfo + +func 
(m *GetCacheGenNumbersResponse) GetResultsCacheGen() string { + if m != nil { + return m.ResultsCacheGen + } + return "" +} + +func init() { + proto.RegisterType((*GetDeleteRequestsRequest)(nil), "grpc.GetDeleteRequestsRequest") + proto.RegisterType((*GetDeleteRequestsResponse)(nil), "grpc.GetDeleteRequestsResponse") + proto.RegisterType((*DeleteRequest)(nil), "grpc.DeleteRequest") + proto.RegisterType((*GetCacheGenNumbersRequest)(nil), "grpc.GetCacheGenNumbersRequest") + proto.RegisterType((*GetCacheGenNumbersResponse)(nil), "grpc.GetCacheGenNumbersResponse") +} + +func init() { + proto.RegisterFile("pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto", fileDescriptor_af4e385854eeb93d) +} + +var fileDescriptor_af4e385854eeb93d = []byte{ + // 391 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xbb, 0x4e, 0xe3, 0x40, + 0x14, 0xf5, 0x6c, 0x1e, 0xab, 0xcc, 0x6a, 0x77, 0xc5, 0x80, 0xd0, 0x60, 0xd0, 0x60, 0xb9, 0x72, + 0x15, 0x4b, 0x81, 0x8e, 0x06, 0x48, 0x44, 0x44, 0x43, 0x61, 0x45, 0x82, 0xd6, 0xb1, 0xaf, 0x12, + 0x8b, 0xc4, 0x76, 0x66, 0xc6, 0x12, 0x74, 0x7c, 0x02, 0x9f, 0x41, 0x4b, 0xcb, 0x17, 0x50, 0xa6, + 0x4c, 0x49, 0x9c, 0x86, 0x32, 0x9f, 0x80, 0x32, 0x76, 0x12, 0xe5, 0xd5, 0x78, 0xe6, 0x9e, 0x73, + 0xe6, 0x3e, 0xce, 0x35, 0xbe, 0x8c, 0x1f, 0x3b, 0xb6, 0x90, 0x11, 0x77, 0x3b, 0xa0, 0x4e, 0x10, + 0x76, 0x10, 0xfa, 0xf0, 0x24, 0xba, 0x41, 0x1c, 0x03, 0xb7, 0xbd, 0xa8, 0x1f, 0xbb, 0x9e, 0x8c, + 0xb8, 0xed, 0xf5, 0x02, 0x08, 0xa5, 0xdd, 0xe1, 0xb1, 0xa7, 0x3e, 0xd5, 0x98, 0x47, 0x32, 0x22, + 0xc5, 0xd9, 0xdd, 0xd4, 0x31, 0x6d, 0x82, 0x6c, 0x40, 0x0f, 0x24, 0x38, 0x30, 0x48, 0x40, 0x48, + 0x91, 0x9f, 0xe6, 0x03, 0x3e, 0xda, 0xc2, 0x89, 0x38, 0x0a, 0x05, 0x90, 0x0b, 0xfc, 0xcf, 0x5f, + 0x61, 0x28, 0x32, 0x0a, 0xd6, 0x9f, 0xda, 0x7e, 0x55, 0xd5, 0x58, 0x79, 0xe5, 0xac, 0x49, 0xcd, + 0x77, 0x84, 0xff, 0xae, 0x28, 0xc8, 0x09, 0xae, 0xf0, 0xec, 0x7a, 0xdb, 0xa0, 0xc8, 0x40, 0x56, + 0xc5, 0x59, 0x02, 0x33, 0x56, 0x48, 0x97, 0xcb, 0x56, 0xd0, 0x07, 0xfa, 0xcb, 0x40, 0x56, 0xc1, + 0x59, 0x02, 0x84, 0xe2, 0xdf, 0x10, 0xfa, 0x8a, 0x2b, 0x28, 0x6e, 0x1e, 0x92, 0x03, 0x5c, 0x1a, + 0x24, 0xc0, 0x9f, 0x69, 0x51, 0x65, 0xcc, 0x02, 0x72, 0x88, 0xcb, 0x42, 0xba, 0x32, 0x11, 0xb4, + 0xa4, 0xe0, 0x3c, 0x9a, 0x55, 0xf1, 0x38, 0xb8, 0x12, 0xfc, 0x2b, 0x49, 0xcb, 0x59, 0x95, 0x05, + 0x60, 0x1e, 0x2b, 0x37, 0xea, 0xae, 0xd7, 0x85, 0x26, 0x84, 0x77, 0x49, 0xbf, 0x0d, 0x7c, 0x61, + 0xd5, 0x0d, 0xd6, 0xb7, 0x91, 0xb9, 0x57, 0x16, 0xfe, 0xcf, 0x41, 0x24, 0x3d, 0x29, 0xe6, 0x8a, + 0x7c, 0xc4, 0x75, 0xb8, 0xf6, 0x81, 0x70, 0xa5, 0x3e, 0xdf, 0x1c, 0x69, 0xe1, 0xbd, 0x8d, 0x05, + 0x10, 0x96, 0x19, 0xbc, 0x6b, 0x6b, 0xfa, 0xe9, 0x4e, 0x3e, 0xef, 0xe6, 0x1e, 0x93, 0xcd, 0x5e, + 0xc9, 0xf2, 0xd9, 0xf6, 0x11, 0x75, 0x63, 0xb7, 0x20, 0x4b, 0x7c, 0x7d, 0x3e, 0x1c, 0x33, 0x6d, + 0x34, 0x66, 0xda, 0x74, 0xcc, 0xd0, 0x4b, 0xca, 0xd0, 0x5b, 0xca, 0xd0, 0x67, 0xca, 0xd0, 0x30, + 0x65, 0xe8, 0x2b, 0x65, 0xe8, 0x3b, 0x65, 0xda, 0x34, 0x65, 0xe8, 0x75, 0xc2, 0xb4, 0xe1, 0x84, + 0x69, 0xa3, 0x09, 0xd3, 0xda, 0x65, 0xf5, 0x3b, 0x9e, 0xfd, 0x04, 0x00, 0x00, 0xff, 0xff, 0x37, + 0x5f, 0x43, 0x54, 0xd2, 0x02, 0x00, 0x00, +} + +func (this *GetDeleteRequestsRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetDeleteRequestsRequest) + if !ok { + that2, ok := that.(GetDeleteRequestsRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == 
nil { + return false + } + return true +} +func (this *GetDeleteRequestsResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetDeleteRequestsResponse) + if !ok { + that2, ok := that.(GetDeleteRequestsResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.DeleteRequests) != len(that1.DeleteRequests) { + return false + } + for i := range this.DeleteRequests { + if !this.DeleteRequests[i].Equal(that1.DeleteRequests[i]) { + return false + } + } + return true +} +func (this *DeleteRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*DeleteRequest) + if !ok { + that2, ok := that.(DeleteRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.RequestID != that1.RequestID { + return false + } + if this.StartTime != that1.StartTime { + return false + } + if this.EndTime != that1.EndTime { + return false + } + if this.Query != that1.Query { + return false + } + if this.Status != that1.Status { + return false + } + if this.CreatedAt != that1.CreatedAt { + return false + } + return true +} +func (this *GetCacheGenNumbersRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetCacheGenNumbersRequest) + if !ok { + that2, ok := that.(GetCacheGenNumbersRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *GetCacheGenNumbersResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetCacheGenNumbersResponse) + if !ok { + that2, ok := that.(GetCacheGenNumbersResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.ResultsCacheGen != that1.ResultsCacheGen { + return false + } + return true +} +func (this *GetDeleteRequestsRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&grpc.GetDeleteRequestsRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetDeleteRequestsResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpc.GetDeleteRequestsResponse{") + if this.DeleteRequests != nil { + s = append(s, "DeleteRequests: "+fmt.Sprintf("%#v", this.DeleteRequests)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *DeleteRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 10) + s = append(s, "&grpc.DeleteRequest{") + s = append(s, "RequestID: "+fmt.Sprintf("%#v", this.RequestID)+",\n") + s = append(s, "StartTime: "+fmt.Sprintf("%#v", this.StartTime)+",\n") + s = append(s, "EndTime: "+fmt.Sprintf("%#v", this.EndTime)+",\n") + s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n") + s = append(s, "Status: "+fmt.Sprintf("%#v", this.Status)+",\n") + s = append(s, "CreatedAt: "+fmt.Sprintf("%#v", this.CreatedAt)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetCacheGenNumbersRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, 
"&grpc.GetCacheGenNumbersRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetCacheGenNumbersResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpc.GetCacheGenNumbersResponse{") + s = append(s, "ResultsCacheGen: "+fmt.Sprintf("%#v", this.ResultsCacheGen)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringGrpc(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// CompactorClient is the client API for Compactor service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type CompactorClient interface { + GetDeleteRequests(ctx context.Context, in *GetDeleteRequestsRequest, opts ...grpc.CallOption) (*GetDeleteRequestsResponse, error) + GetCacheGenNumbers(ctx context.Context, in *GetCacheGenNumbersRequest, opts ...grpc.CallOption) (*GetCacheGenNumbersResponse, error) +} + +type compactorClient struct { + cc *grpc.ClientConn +} + +func NewCompactorClient(cc *grpc.ClientConn) CompactorClient { + return &compactorClient{cc} +} + +func (c *compactorClient) GetDeleteRequests(ctx context.Context, in *GetDeleteRequestsRequest, opts ...grpc.CallOption) (*GetDeleteRequestsResponse, error) { + out := new(GetDeleteRequestsResponse) + err := c.cc.Invoke(ctx, "/grpc.Compactor/GetDeleteRequests", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *compactorClient) GetCacheGenNumbers(ctx context.Context, in *GetCacheGenNumbersRequest, opts ...grpc.CallOption) (*GetCacheGenNumbersResponse, error) { + out := new(GetCacheGenNumbersResponse) + err := c.cc.Invoke(ctx, "/grpc.Compactor/GetCacheGenNumbers", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CompactorServer is the server API for Compactor service. +type CompactorServer interface { + GetDeleteRequests(context.Context, *GetDeleteRequestsRequest) (*GetDeleteRequestsResponse, error) + GetCacheGenNumbers(context.Context, *GetCacheGenNumbersRequest) (*GetCacheGenNumbersResponse, error) +} + +// UnimplementedCompactorServer can be embedded to have forward compatible implementations. 
+type UnimplementedCompactorServer struct { +} + +func (*UnimplementedCompactorServer) GetDeleteRequests(ctx context.Context, req *GetDeleteRequestsRequest) (*GetDeleteRequestsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetDeleteRequests not implemented") +} +func (*UnimplementedCompactorServer) GetCacheGenNumbers(ctx context.Context, req *GetCacheGenNumbersRequest) (*GetCacheGenNumbersResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetCacheGenNumbers not implemented") +} + +func RegisterCompactorServer(s *grpc.Server, srv CompactorServer) { + s.RegisterService(&_Compactor_serviceDesc, srv) +} + +func _Compactor_GetDeleteRequests_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetDeleteRequestsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CompactorServer).GetDeleteRequests(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.Compactor/GetDeleteRequests", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CompactorServer).GetDeleteRequests(ctx, req.(*GetDeleteRequestsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Compactor_GetCacheGenNumbers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetCacheGenNumbersRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CompactorServer).GetCacheGenNumbers(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.Compactor/GetCacheGenNumbers", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CompactorServer).GetCacheGenNumbers(ctx, req.(*GetCacheGenNumbersRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Compactor_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.Compactor", + HandlerType: (*CompactorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetDeleteRequests", + Handler: _Compactor_GetDeleteRequests_Handler, + }, + { + MethodName: "GetCacheGenNumbers", + Handler: _Compactor_GetCacheGenNumbers_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto", +} + +func (m *GetDeleteRequestsRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetDeleteRequestsRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetDeleteRequestsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *GetDeleteRequestsResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetDeleteRequestsResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetDeleteRequestsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if 
len(m.DeleteRequests) > 0 { + for iNdEx := len(m.DeleteRequests) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.DeleteRequests[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintGrpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *DeleteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DeleteRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DeleteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.CreatedAt != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.CreatedAt)) + i-- + dAtA[i] = 0x30 + } + if len(m.Status) > 0 { + i -= len(m.Status) + copy(dAtA[i:], m.Status) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.Status))) + i-- + dAtA[i] = 0x2a + } + if len(m.Query) > 0 { + i -= len(m.Query) + copy(dAtA[i:], m.Query) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.Query))) + i-- + dAtA[i] = 0x22 + } + if m.EndTime != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.EndTime)) + i-- + dAtA[i] = 0x18 + } + if m.StartTime != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.StartTime)) + i-- + dAtA[i] = 0x10 + } + if len(m.RequestID) > 0 { + i -= len(m.RequestID) + copy(dAtA[i:], m.RequestID) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.RequestID))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *GetCacheGenNumbersRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetCacheGenNumbersRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetCacheGenNumbersRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *GetCacheGenNumbersResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetCacheGenNumbersResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetCacheGenNumbersResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ResultsCacheGen) > 0 { + i -= len(m.ResultsCacheGen) + copy(dAtA[i:], m.ResultsCacheGen) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.ResultsCacheGen))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintGrpc(dAtA []byte, offset int, v uint64) int { + offset -= sovGrpc(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *GetDeleteRequestsRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GetDeleteRequestsResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.DeleteRequests) > 0 { + for _, e := range m.DeleteRequests { + l = e.Size() + n += 1 + l + sovGrpc(uint64(l)) + } + } + return n +} + +func (m *DeleteRequest) Size() (n int) { + if m == nil 
{ + return 0 + } + var l int + _ = l + l = len(m.RequestID) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + if m.StartTime != 0 { + n += 1 + sovGrpc(uint64(m.StartTime)) + } + if m.EndTime != 0 { + n += 1 + sovGrpc(uint64(m.EndTime)) + } + l = len(m.Query) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + l = len(m.Status) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + if m.CreatedAt != 0 { + n += 1 + sovGrpc(uint64(m.CreatedAt)) + } + return n +} + +func (m *GetCacheGenNumbersRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GetCacheGenNumbersResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ResultsCacheGen) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + return n +} + +func sovGrpc(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozGrpc(x uint64) (n int) { + return sovGrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *GetDeleteRequestsRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetDeleteRequestsRequest{`, + `}`, + }, "") + return s +} +func (this *GetDeleteRequestsResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForDeleteRequests := "[]*DeleteRequest{" + for _, f := range this.DeleteRequests { + repeatedStringForDeleteRequests += strings.Replace(f.String(), "DeleteRequest", "DeleteRequest", 1) + "," + } + repeatedStringForDeleteRequests += "}" + s := strings.Join([]string{`&GetDeleteRequestsResponse{`, + `DeleteRequests:` + repeatedStringForDeleteRequests + `,`, + `}`, + }, "") + return s +} +func (this *DeleteRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DeleteRequest{`, + `RequestID:` + fmt.Sprintf("%v", this.RequestID) + `,`, + `StartTime:` + fmt.Sprintf("%v", this.StartTime) + `,`, + `EndTime:` + fmt.Sprintf("%v", this.EndTime) + `,`, + `Query:` + fmt.Sprintf("%v", this.Query) + `,`, + `Status:` + fmt.Sprintf("%v", this.Status) + `,`, + `CreatedAt:` + fmt.Sprintf("%v", this.CreatedAt) + `,`, + `}`, + }, "") + return s +} +func (this *GetCacheGenNumbersRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetCacheGenNumbersRequest{`, + `}`, + }, "") + return s +} +func (this *GetCacheGenNumbersResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetCacheGenNumbersResponse{`, + `ResultsCacheGen:` + fmt.Sprintf("%v", this.ResultsCacheGen) + `,`, + `}`, + }, "") + return s +} +func valueToStringGrpc(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *GetDeleteRequestsRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetDeleteRequestsRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetDeleteRequestsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) 
+ if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetDeleteRequestsResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetDeleteRequestsResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetDeleteRequestsResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DeleteRequests", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DeleteRequests = append(m.DeleteRequests, &DeleteRequest{}) + if err := m.DeleteRequests[len(m.DeleteRequests)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DeleteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DeleteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DeleteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RequestID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + 
return fmt.Errorf("proto: wrong wireType = %d for field StartTime", wireType) + } + m.StartTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndTime", wireType) + } + m.EndTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Query = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Status = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CreatedAt", wireType) + } + m.CreatedAt = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.CreatedAt |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetCacheGenNumbersRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetCacheGenNumbersRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetCacheGenNumbersRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + 
skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetCacheGenNumbersResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetCacheGenNumbersResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetCacheGenNumbersResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResultsCacheGen", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ResultsCacheGen = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipGrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthGrpc + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + 
innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipGrpc(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthGrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowGrpc = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto new file mode 100644 index 000000000000..52c7c462364c --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package grpc; + +service Compactor { + rpc GetDeleteRequests(GetDeleteRequestsRequest) returns (GetDeleteRequestsResponse); + rpc GetCacheGenNumbers(GetCacheGenNumbersRequest) returns (GetCacheGenNumbersResponse); +} + +message GetDeleteRequestsRequest {} + +message GetDeleteRequestsResponse { + repeated DeleteRequest deleteRequests = 1; +} + +message DeleteRequest { + string requestID = 1; + int64 startTime = 2; + int64 endTime = 3; + string query = 4; + string status = 5; + int64 createdAt = 6; +} + +message GetCacheGenNumbersRequest {} + +message GetCacheGenNumbersResponse { + string resultsCacheGen = 1; +} diff --git a/pkg/storage/stores/indexshipper/compactor/client/http.go b/pkg/storage/stores/indexshipper/compactor/client/http.go new file mode 100644 index 000000000000..edfbeb6f6990 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/http.go @@ -0,0 +1,148 @@ +package client + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "github.com/go-kit/log/level" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/grafana/loki/pkg/util/log" + "io" + "net/http" + "net/url" + "time" + + "github.com/grafana/dskit/crypto/tls" +) + +const ( + orgHeaderKey = "X-Scope-OrgID" + getDeletePath = "/loki/api/v1/delete" + cacheGenNumPath = "/loki/api/v1/cache/generation_numbers" +) + +type HTTPConfig struct { + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *HTTPConfig) RegisterFlags(f *flag.FlagSet) { + prefix := "boltdb.shipper.compactor.client" + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, + "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) +} + +type compactorHTTPClient struct { + httpClient *http.Client + + deleteRequestsURL string + cacheGenURL string +} + +// NewHTTPClient creates a client which talks to compactor over HTTP. +// It uses provided TLS config which creating HTTP client. 
+func NewHTTPClient(addr string, cfg HTTPConfig) (*compactorHTTPClient, error) { + u, err := url.Parse(addr) + if err != nil { + level.Error(log.Logger).Log("msg", "error parsing url", "err", err) + return nil, err + } + u.Path = getDeletePath + deleteRequestsURL := u.String() + + u.Path = cacheGenNumPath + cacheGenURL := u.String() + + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 250 + transport.MaxIdleConnsPerHost = 250 + + if cfg.TLSEnabled { + tlsCfg, err := cfg.TLS.GetTLSConfig() + if err != nil { + return nil, err + } + + transport.TLSClientConfig = tlsCfg + } + + return &compactorHTTPClient{ + httpClient: &http.Client{Timeout: 5 * time.Second, Transport: transport}, + deleteRequestsURL: deleteRequestsURL, + cacheGenURL: cacheGenURL, + }, nil +} + +func (c *compactorHTTPClient) Name() string { + return "http_client" +} + +func (c *compactorHTTPClient) Stop() {} + +func (c *compactorHTTPClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.deleteRequestsURL, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + + if resp.StatusCode/100 != 2 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + var deleteRequests []deletion.DeleteRequest + if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return nil, err + } + + return deleteRequests, nil +} + +func (c *compactorHTTPClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.cacheGenURL, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + + var genNumber string + if err := json.NewDecoder(resp.Body).Decode(&genNumber); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return "", err + } + + return genNumber, err +} diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index a75437c0a2af..ae79832e7efb 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -145,19 +145,20 @@ func (cfg *Config) Validate() error { type Compactor struct { services.Service - cfg Config - indexStorageClient shipper_storage.Client 
- tableMarker retention.TableMarker - sweeper *retention.Sweeper - deleteRequestsStore deletion.DeleteRequestsStore - DeleteRequestsHandler *deletion.DeleteRequestHandler - deleteRequestsManager *deletion.DeleteRequestsManager - expirationChecker retention.ExpirationChecker - metrics *metrics - running bool - wg sync.WaitGroup - indexCompactors map[string]IndexCompactor - schemaConfig config.SchemaConfig + cfg Config + indexStorageClient shipper_storage.Client + tableMarker retention.TableMarker + sweeper *retention.Sweeper + deleteRequestsStore deletion.DeleteRequestsStore + DeleteRequestsHandler *deletion.DeleteRequestHandler + DeleteRequestsGRPCHandler *deletion.GRPCRequestHandler + deleteRequestsManager *deletion.DeleteRequestsManager + expirationChecker retention.ExpirationChecker + metrics *metrics + running bool + wg sync.WaitGroup + indexCompactors map[string]IndexCompactor + schemaConfig config.SchemaConfig // Ring used for running a single compactor ringLifecycler *ring.BasicLifecycler @@ -285,6 +286,8 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Over r, ) + c.DeleteRequestsGRPCHandler = deletion.NewGRPCRequestHandler(c.deleteRequestsStore, limits) + c.deleteRequestsManager = deletion.NewDeleteRequestsManager( c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, diff --git a/pkg/storage/stores/indexshipper/compactor/compactor_client.go b/pkg/storage/stores/indexshipper/compactor/compactor_client.go deleted file mode 100644 index 63468568dc41..000000000000 --- a/pkg/storage/stores/indexshipper/compactor/compactor_client.go +++ /dev/null @@ -1,42 +0,0 @@ -package compactor - -import ( - "flag" - "net/http" - "time" - - "github.com/grafana/dskit/crypto/tls" -) - -// Config for compactor's generation-number client -type ClientConfig struct { - TLSEnabled bool `yaml:"tls_enabled"` - TLS tls.ClientConfig `yaml:",inline"` -} - -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) { - prefix := "boltdb.shipper.compactor.client" - f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, - "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") - cfg.TLS.RegisterFlagsWithPrefix(prefix, f) -} - -// NewDeleteHTTPClient return a pointer to a http client instance based on the -// delete client tls settings. 
-func NewCompactorHTTPClient(cfg ClientConfig) (*http.Client, error) { - transport := http.DefaultTransport.(*http.Transport).Clone() - transport.MaxIdleConns = 250 - transport.MaxIdleConnsPerHost = 250 - - if cfg.TLSEnabled { - tlsCfg, err := cfg.TLS.GetTLSConfig() - if err != nil { - return nil, err - } - - transport.TLSClientConfig = tlsCfg - } - - return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go index 3e8639e50c4c..ca479eb41cc1 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go @@ -2,24 +2,19 @@ package deletion import ( "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" "sync" "time" "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/loki/pkg/util/log" ) -const ( - orgHeaderKey = "X-Scope-OrgID" - getDeletePath = "/loki/api/v1/delete" -) +type CompactorClient interface { + GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) + GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) + Name() string + Stop() +} type DeleteRequestsClient interface { GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) @@ -27,9 +22,8 @@ type DeleteRequestsClient interface { } type deleteRequestsClient struct { - url string - httpClient httpClient - mu sync.RWMutex + compactorClient CompactorClient + mu sync.RWMutex cache map[string][]DeleteRequest cacheDuration time.Duration @@ -40,10 +34,6 @@ type deleteRequestsClient struct { stopChan chan struct{} } -type httpClient interface { - Do(*http.Request) (*http.Response, error) -} - type DeleteRequestsStoreOption func(c *deleteRequestsClient) func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { @@ -52,22 +42,14 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { } } -func NewDeleteRequestsClient(addr string, c httpClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { - u, err := url.Parse(addr) - if err != nil { - level.Error(log.Logger).Log("msg", "error parsing url", "err", err) - return nil, err - } - u.Path = getDeletePath - +func NewDeleteRequestsClient(compactorClient CompactorClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { client := &deleteRequestsClient{ - url: u.String(), - httpClient: c, - cacheDuration: 5 * time.Minute, - cache: make(map[string][]DeleteRequest), - clientType: clientType, - metrics: deleteClientMetrics, - stopChan: make(chan struct{}), + compactorClient: compactorClient, + cacheDuration: 5 * time.Minute, + cache: make(map[string][]DeleteRequest), + clientType: clientType, + metrics: deleteClientMetrics, + stopChan: make(chan struct{}), } for _, o := range opts { @@ -83,10 +65,10 @@ func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, return cachedRequests, nil } - c.metrics.deleteRequestsLookupsTotal.With(prometheus.Labels{"client_type": c.clientType}).Inc() - requests, err := c.getRequestsFromServer(ctx, userID) + c.metrics.deleteRequestsLookupsTotal.Inc() + requests, err := 
c.compactorClient.GetAllDeleteRequestsForUser(ctx, userID) if err != nil { - c.metrics.deleteRequestsLookupsFailedTotal.With(prometheus.Labels{"client_type": c.clientType}).Inc() + c.metrics.deleteRequestsLookupsFailedTotal.Inc() return nil, err } @@ -126,7 +108,7 @@ func (c *deleteRequestsClient) updateCache() { newCache := make(map[string][]DeleteRequest) for _, userID := range userIDs { - deleteReq, err := c.getRequestsFromServer(context.Background(), userID) + deleteReq, err := c.compactorClient.GetAllDeleteRequestsForUser(context.Background(), userID) if err != nil { level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) continue @@ -150,37 +132,3 @@ func (c *deleteRequestsClient) currentUserIDs() []string { return userIDs } - -func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - - req.Header.Set(orgHeaderKey, userID) - - resp, err := c.httpClient.Do(req) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - defer func() { - _, _ = io.Copy(io.Discard, resp.Body) - _ = resp.Body.Close() - }() - - if resp.StatusCode/100 != 2 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - - var deleteRequests []DeleteRequest - if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { - level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) - return nil, err - } - - return deleteRequests, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go index 2f9834c3ea46..cac79b924ccb 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go @@ -2,9 +2,6 @@ package deletion import ( "context" - "io" - "net/http" - "strings" "testing" "time" @@ -15,9 +12,16 @@ import ( func TestGetCacheGenNumberForUser(t *testing.T) { deleteClientMetrics := NewDeleteRequestClientMetrics(prometheus.DefaultRegisterer) - t.Run("it requests results from the api", func(t *testing.T) { - httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} - client, err := NewDeleteRequestsClient("http://test-server", httpClient, deleteClientMetrics, "test_client") + t.Run("it requests results from the compactor client", func(t *testing.T) { + compactorClient := mockCompactorClient{ + delRequests: []DeleteRequest{ + { + RequestID: "test-request", + }, + }, + } + + client, err := NewDeleteRequestsClient(&compactorClient, deleteClientMetrics, "test_client") require.Nil(t, err) deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") @@ -25,22 +29,28 @@ func TestGetCacheGenNumberForUser(t *testing.T) { require.Len(t, deleteRequests, 1) require.Equal(t, "test-request", deleteRequests[0].RequestID) - - require.Equal(t, "http://test-server/loki/api/v1/delete", httpClient.req.URL.String()) - require.Equal(t, http.MethodGet, httpClient.req.Method) - require.Equal(t, "userID", 
httpClient.req.Header.Get("X-Scope-OrgID")) }) t.Run("it caches the results", func(t *testing.T) { - httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} - client, err := NewDeleteRequestsClient("http://test-server", httpClient, deleteClientMetrics, "test_client", WithRequestClientCacheDuration(100*time.Millisecond)) + compactorClient := mockCompactorClient{ + delRequests: []DeleteRequest{ + { + RequestID: "test-request", + }, + }, + } + client, err := NewDeleteRequestsClient(&compactorClient, deleteClientMetrics, "test_client", WithRequestClientCacheDuration(100*time.Millisecond)) require.Nil(t, err) deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") require.Nil(t, err) require.Equal(t, "test-request", deleteRequests[0].RequestID) - httpClient.ret = `[{"request_id":"different"}]` + compactorClient.delRequests = []DeleteRequest{ + { + RequestID: "different", + }, + } deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") require.Nil(t, err) @@ -56,16 +66,21 @@ func TestGetCacheGenNumberForUser(t *testing.T) { }) } -type mockHTTPClient struct { - ret string - req *http.Request +type mockCompactorClient struct { + delRequests []DeleteRequest + cacheGenNum string +} + +func (m *mockCompactorClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { + return m.delRequests, nil } -func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { - c.req = req +func (m *mockCompactorClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + return m.cacheGenNum, nil +} - return &http.Response{ - StatusCode: 200, - Body: io.NopCloser(strings.NewReader(c.ret)), - }, nil +func (m *mockCompactorClient) Name() string { + return "" } + +func (m *mockCompactorClient) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go index 7c992e6cdd56..f24c3c51139f 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go @@ -679,6 +679,8 @@ type mockDeleteRequestsStore struct { getAllUser string getAllResult []DeleteRequest getAllErr error + + genNumber string } func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) { @@ -708,3 +710,7 @@ func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Contex m.getAllUser = userID return m.getAllResult, m.getAllErr } + +func (m *mockDeleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + return m.genNumber, m.getErr +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go new file mode 100644 index 000000000000..15b5452e89d5 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go @@ -0,0 +1,94 @@ +package deletion + +import ( + "context" + "sort" + + "github.com/go-kit/log/level" + "github.com/grafana/dskit/tenant" + "github.com/pkg/errors" + + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + util_log "github.com/grafana/loki/pkg/util/log" +) + +type GRPCRequestHandler struct { + deleteRequestsStore DeleteRequestsStore + limits Limits +} 
+ +func NewGRPCRequestHandler(deleteRequestsStore DeleteRequestsStore, limits Limits) *GRPCRequestHandler { + return &GRPCRequestHandler{ + deleteRequestsStore: deleteRequestsStore, + limits: limits, + } +} + +func (g *GRPCRequestHandler) GetDeleteRequests(ctx context.Context, _ *grpc.GetDeleteRequestsRequest) (*grpc.GetDeleteRequestsResponse, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + hasDelete, err := validDeletionLimit(g.limits, userID) + if err != nil { + return nil, err + } + + if !hasDelete { + return nil, errors.New(deletionNotAvailableMsg) + } + + deleteGroups, err := g.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + deletesPerRequest := partitionByRequestID(deleteGroups) + deleteRequests := mergeDeletes(deletesPerRequest) + + sort.Slice(deleteRequests, func(i, j int) bool { + return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt + }) + + resp := grpc.GetDeleteRequestsResponse{ + DeleteRequests: make([]*grpc.DeleteRequest, len(deleteRequests)), + } + for i, dr := range deleteRequests { + resp.DeleteRequests[i] = &grpc.DeleteRequest{ + RequestID: dr.RequestID, + StartTime: int64(dr.StartTime), + EndTime: int64(dr.EndTime), + Query: dr.Query, + Status: string(dr.Status), + CreatedAt: int64(dr.CreatedAt), + } + } + + return &resp, nil +} + +func (g *GRPCRequestHandler) GetCacheGenNumbers(ctx context.Context, _ *grpc.GetCacheGenNumbersRequest) (*grpc.GetCacheGenNumbersResponse, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + hasDelete, err := validDeletionLimit(g.limits, userID) + if err != nil { + return nil, err + } + + if !hasDelete { + return nil, errors.New(deletionNotAvailableMsg) + } + + cacheGenNumber, err := g.deleteRequestsStore.GetCacheGenerationNumber(ctx, userID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting cache generation number", "err", err) + return nil, err + } + + return &grpc.GetCacheGenNumbersResponse{ResultsCacheGen: cacheGenNumber}, nil +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go new file mode 100644 index 000000000000..3851bbea795a --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go @@ -0,0 +1,223 @@ +package deletion + +import ( + "context" + "errors" + "net" + "testing" + "time" + + "github.com/grafana/dskit/tenant" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + compactor_client_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode" +) + +func server(t *testing.T, ctx context.Context, h *GRPCRequestHandler) (compactor_client_grpc.CompactorClient, func()) { + buffer := 101024 * 1024 + lis := bufconn.Listen(buffer) + + baseServer := grpc.NewServer(grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return 
middleware.StreamServerUserHeaderInterceptor(srv, ss, info, handler) + }), grpc.ChainUnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return middleware.ServerUserHeaderInterceptor(ctx, req, info, handler) + })) + + compactor_client_grpc.RegisterCompactorServer(baseServer, h) + go func() { + require.NoError(t, baseServer.Serve(lis)) + }() + + conn, err := grpc.DialContext(ctx, "", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + closer := func() { + require.NoError(t, lis.Close()) + baseServer.Stop() + } + + client := compactor_client_grpc.NewCompactorClient(conn) + + return client, closer +} + +func grpcDeleteRequestsToDeleteRequests(requests []*compactor_client_grpc.DeleteRequest) []DeleteRequest { + resp := make([]DeleteRequest, len(requests)) + for i, grpcReq := range requests { + resp[i] = DeleteRequest{ + RequestID: grpcReq.RequestID, + StartTime: model.Time(grpcReq.StartTime), + EndTime: model.Time(grpcReq.EndTime), + Query: grpcReq.Query, + Status: DeleteRequestStatus(grpcReq.Status), + CreatedAt: model.Time(grpcReq.CreatedAt), + } + } + + return resp +} + +func TestGRPCGetDeleteRequests(t *testing.T) { + t.Run("it gets all the delete requests for the user", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{{RequestID: "test-request-1", Status: StatusReceived}, {RequestID: "test-request-2", Status: StatusReceived}} + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, store.getAllResult, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("it merges requests with the same requestID", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, StartTime: now, EndTime: now.Add(time.Hour)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(2 * time.Hour), EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(time.Hour), EndTime: now.Add(2 * time.Hour)}, + } + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, []DeleteRequest{ + {RequestID: "test-request-1", Status: StatusReceived, CreatedAt: now, StartTime: now, EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", Status: StatusReceived, 
CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, + }, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("it only considers a request processed if all it's subqueries are processed", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusReceived}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + } + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: "66% Complete"}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + }, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllErr = errors.New("something bad") + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + _, err = grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "something bad", sts.Message()) + }) + + t.Run("validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + _, err := grpcClient.GetDeleteRequests(context.Background(), &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "no org id", sts.Message()) + }) + }) +} + +func TestGRPCGetCacheGenNumbers(t *testing.T) { + t.Run("get gen number", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.genNumber = "123" + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, 
err := grpcClient.GetCacheGenNumbers(ctx, &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.NoError(t, err) + require.Equal(t, store.genNumber, resp.ResultsCacheGen) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getErr = errors.New("something bad") + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + _, err = grpcClient.GetCacheGenNumbers(ctx, &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "something bad", sts.Message()) + }) + + t.Run("validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + _, err := grpcClient.GetCacheGenNumbers(context.Background(), &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "no org id", sts.Message()) + }) + }) +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go index 4da42c7384d0..b9489e5f23b8 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go @@ -6,24 +6,24 @@ import ( ) type DeleteRequestClientMetrics struct { - deleteRequestsLookupsTotal *prometheus.CounterVec - deleteRequestsLookupsFailedTotal *prometheus.CounterVec + deleteRequestsLookupsTotal prometheus.Counter + deleteRequestsLookupsFailedTotal prometheus.Counter } func NewDeleteRequestClientMetrics(r prometheus.Registerer) *DeleteRequestClientMetrics { m := DeleteRequestClientMetrics{} - m.deleteRequestsLookupsTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + m.deleteRequestsLookupsTotal = promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: "loki", Name: "delete_request_lookups_total", Help: "Number times the client has looked up delete requests", - }, []string{"client_type"}) + }) - m.deleteRequestsLookupsFailedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + m.deleteRequestsLookupsFailedTotal = promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: "loki", Name: "delete_request_lookups_failed_total", Help: "Number times the client has failed to look up delete requests", - }, []string{"client_type"}) + }) return &m } diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go deleted file mode 100644 index 7acb6d9bd619..000000000000 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go +++ /dev/null @@ -1,81 +0,0 @@ -package generationnumber - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/url" - - "github.com/go-kit/log/level" - - "github.com/grafana/loki/pkg/util/log" -) - -const ( - orgHeaderKey = "X-Scope-OrgID" - cacheGenNumPath = "/loki/api/v1/cache/generation_numbers" -) - -type CacheGenClient interface { - GetCacheGenerationNumber(ctx context.Context, userID 
string) (string, error) - Name() string -} - -type genNumberClient struct { - url string - httpClient doer -} - -type doer interface { - Do(*http.Request) (*http.Response, error) -} - -func NewGenNumberClient(addr string, c doer) (CacheGenClient, error) { - u, err := url.Parse(addr) - if err != nil { - level.Error(log.Logger).Log("msg", "error parsing url", "err", err) - return nil, err - } - u.Path = cacheGenNumPath - - return &genNumberClient{ - url: u.String(), - httpClient: c, - }, nil -} - -func (c *genNumberClient) Name() string { - return "gen_number_client" -} - -func (c *genNumberClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - - req.Header.Set(orgHeaderKey, userID) - - resp, err := c.httpClient.Do(req) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - - var genNumber string - if err := json.NewDecoder(resp.Body).Decode(&genNumber); err != nil { - level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) - return "", err - } - - return genNumber, err -} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go deleted file mode 100644 index edd711f06dbf..000000000000 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package generationnumber - -import ( - "context" - "io" - "net/http" - "strings" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestGetCacheGenNumberForUser(t *testing.T) { - httpClient := &mockHTTPClient{ret: `"42"`} - client, err := NewGenNumberClient("http://test-server", httpClient) - require.Nil(t, err) - - cacheGenNumber, err := client.GetCacheGenerationNumber(context.Background(), "userID") - require.Nil(t, err) - - require.Equal(t, "42", cacheGenNumber) - - require.Equal(t, "http://test-server/loki/api/v1/cache/generation_numbers", httpClient.req.URL.String()) - require.Equal(t, http.MethodGet, httpClient.req.Method) - require.Equal(t, "userID", httpClient.req.Header.Get("X-Scope-OrgID")) -} - -type mockHTTPClient struct { - ret string - req *http.Request -} - -func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { - c.req = req - - return &http.Response{ - StatusCode: 200, - Body: io.NopCloser(strings.NewReader(c.ret)), - }, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go index 55267987cf0c..d41b54d76270 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go @@ -16,6 +16,12 @@ import ( const reloadDuration = 5 * time.Minute +type CacheGenClient interface { + GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) + Name() string + Stop() +} + 
type GenNumberLoader struct { numberGetter CacheGenClient numbers map[string]string @@ -140,6 +146,7 @@ func (l *GenNumberLoader) getCacheGenNumber(userID string) string { func (l *GenNumberLoader) Stop() { close(l.quit) + l.numberGetter.Stop() } type noopNumberGetter struct{} @@ -151,3 +158,5 @@ func (g *noopNumberGetter) GetCacheGenerationNumber(_ context.Context, _ string) func (g *noopNumberGetter) Name() string { return "noop-getter" } + +func (g *noopNumberGetter) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go index 7c5581a8b181..afa59a12b47b 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go @@ -52,3 +52,5 @@ func (g *mockGenNumberClient) GetCacheGenerationNumber(ctx context.Context, user func (g *mockGenNumberClient) Name() string { return "" } + +func (g *mockGenNumberClient) Stop() {} From 5dc60567da1faa1d7c1d1d61fd9e80f51f331c2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Otto=20Kr=C3=B6pke?= Date: Wed, 30 Nov 2022 11:41:09 +0100 Subject: [PATCH 2/5] [Helm] Add support for azure blob storage (#7500) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **What this PR does / why we need it**: Support Azure Blob Storage in helm chart. See https://grafana.com/docs/loki/latest/storage/#azure-storage-account-1 **Which issue(s) this PR fixes**: **Special notes for your reviewer**: **Checklist** - [ ] Reviewed the `CONTRIBUTING.md` guide - [x] Documentation added - [ ] Tests updated - [x] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` Signed-off-by: Jan-Otto Kröpke --- docs/sources/installation/helm/reference.md | 18 +++++++++- production/helm/loki/CHANGELOG.md | 4 +++ production/helm/loki/Chart.yaml | 2 +- production/helm/loki/README.md | 2 +- production/helm/loki/templates/_helpers.tpl | 36 +++++++++++++++++-- .../helm/loki/templates/serviceaccount.yaml | 6 ++-- production/helm/loki/values.yaml | 18 +++++----- 7 files changed, 71 insertions(+), 15 deletions(-) diff --git a/docs/sources/installation/helm/reference.md b/docs/sources/installation/helm/reference.md index 2f5d26b42ac4..4f0846d245ce 100644 --- a/docs/sources/installation/helm/reference.md +++ b/docs/sources/installation/helm/reference.md @@ -70,7 +70,7 @@ null string
-"{{- if .Values.enterprise.adminApi.enabled }}\n{{- if or .Values.minio.enabled (eq .Values.loki.storage.type \"s3\") (eq .Values.loki.storage.type \"gcs\") }}\nadmin_client:\n  storage:\n    s3:\n      bucket_name: {{ .Values.loki.storage.bucketNames.admin }}\n{{- end }}\n{{- end }}\nauth:\n  type: {{ .Values.enterprise.adminApi.enabled | ternary \"enterprise\" \"trust\" }}\nauth_enabled: {{ .Values.loki.auth_enabled }}\ncluster_name: {{ include \"loki.clusterName\" . }}\nlicense:\n  path: /etc/loki/license/license.jwt\n"
+"{{- if .Values.enterprise.adminApi.enabled }}\n{{- if or .Values.minio.enabled (eq .Values.loki.storage.type \"s3\") (eq .Values.loki.storage.type \"gcs\") (eq .Values.loki.storage.type \"azure\") }}\nadmin_client:\n  storage:\n    s3:\n      bucket_name: {{ .Values.loki.storage.bucketNames.admin }}\n{{- end }}\n{{- end }}\nauth:\n  type: {{ .Values.enterprise.adminApi.enabled | ternary \"enterprise\" \"trust\" }}\nauth_enabled: {{ .Values.loki.auth_enabled }}\ncluster_name: {{ include \"loki.clusterName\" . }}\nlicense:\n  path: /etc/loki/license/license.jwt\n"
 
@@ -1501,6 +1501,13 @@ null Storage config. Providing this will automatically populate all necessary storage configs in the templated config.
 {
+  "azure": {
+    "accountKey": null,
+    "accountName": null,
+    "requestTimeout": null,
+    "useManagedIdentity": false,
+    "userAssignedId": null
+  },
   "bucketNames": {
     "admin": "admin",
     "chunks": "chunks",
@@ -2592,6 +2599,15 @@ true
 			
 []
 
+		<tr>
+			<td>serviceAccount.labels</td>
+			<td>object</td>
+			<td>Labels for the service account</td>
+			<td><pre lang="json">
+{}
+</pre>
+</td>
+		</tr>
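For reference, a minimal user-side `values.yaml` sketch that exercises the Azure options documented in the table above. It is not part of the patch; the account name, key, and container names are placeholders, and the remaining keys mirror the chart defaults added in this change:

```yaml
loki:
  storage:
    type: azure
    bucketNames:
      chunks: loki-chunks   # placeholder container names
      ruler: loki-ruler
      admin: loki-admin
    azure:
      accountName: mystorageaccount        # placeholder
      accountKey: "<storage-account-key>"  # placeholder; leave unset when using managed identity
      useManagedIdentity: false
      userAssignedId: null
      requestTimeout: null
```

With `useManagedIdentity: true` (and optionally a `userAssignedId`), `accountKey` can be left unset, since `_helpers.tpl` only renders `account_key` when a value is provided.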
diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index 98cd1079bb03..fa50b731c5dd 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -11,6 +11,10 @@ Entries should be ordered as follows: Entries should include a reference to the pull request that introduced the change. +## 3.5.0 + +- [FEATURE] Add support for azure blob storage + ## 3.4.3 - [ENHANCEMENT] Allow to change Loki `-target` argument diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index 258ad62dd05a..943c0ad74c80 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -4,7 +4,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.6.1 -version: 3.4.3 +version: 3.5.0 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index d8f07175cbf9..e59f9b978e9e 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 3.4.3](https://img.shields.io/badge/Version-3.4.3-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.6.1](https://img.shields.io/badge/AppVersion-2.6.1-informational?style=flat-square) +![Version: 3.5.0](https://img.shields.io/badge/Version-3.5.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.6.1](https://img.shields.io/badge/AppVersion-2.6.1-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/templates/_helpers.tpl b/production/helm/loki/templates/_helpers.tpl index 7f268081ae87..f2b797146baa 100644 --- a/production/helm/loki/templates/_helpers.tpl +++ b/production/helm/loki/templates/_helpers.tpl @@ -204,7 +204,23 @@ gcs: bucket_name: {{ $.Values.loki.storage.bucketNames.chunks }} chunk_buffer_size: {{ .chunkBufferSize }} request_timeout: {{ .requestTimeout }} - enable_http2: {{ .enableHttp2}} + enable_http2: {{ .enableHttp2 }} +{{- end -}} +{{- else if eq .Values.loki.storage.type "azure" -}} +{{- with .Values.loki.storage.azure }} +azure: + account_name: {{ .accountName }} + {{- with .accountKey }} + account_key: {{ . }} + {{- end }} + container_name: {{ $.Values.loki.storage.bucketNames.chunks }} + use_managed_identity: {{ .useManagedIdentity }} + {{- with .userAssignedId }} + user_assigned_id: {{ . }} + {{- end }} + {{- with .requestTimeout }} + request_timeout: {{ . }} + {{- end }} {{- end -}} {{- else -}} {{- with .Values.loki.storage.filesystem }} @@ -253,7 +269,23 @@ gcs: bucket_name: {{ $.Values.loki.storage.bucketNames.ruler }} chunk_buffer_size: {{ .chunkBufferSize }} request_timeout: {{ .requestTimeout }} - enable_http2: {{ .enableHttp2}} + enable_http2: {{ .enableHttp2 }} +{{- end -}} +{{- else if eq .Values.loki.storage.type "azure" -}} +{{- with .Values.loki.storage.azure }} +azure: + account_name: {{ .accountName }} + {{- with .accountKey }} + account_key: {{ . }} + {{- end }} + container_name: {{ $.Values.loki.storage.bucketNames.ruler }} + use_managed_identity: {{ .useManagedIdentity }} + {{- with .userAssignedId }} + user_assigned_id: {{ . }} + {{- end }} + {{- with .requestTimeout }} + request_timeout: {{ . 
}} + {{- end }} {{- end -}} {{- end -}} {{- end -}} diff --git a/production/helm/loki/templates/serviceaccount.yaml b/production/helm/loki/templates/serviceaccount.yaml index 436ff14a572a..5734c012d55c 100644 --- a/production/helm/loki/templates/serviceaccount.yaml +++ b/production/helm/loki/templates/serviceaccount.yaml @@ -5,9 +5,12 @@ metadata: name: {{ include "loki.serviceAccountName" . }} labels: {{- include "loki.labels" . | nindent 4 }} + {{- with .Values.serviceAccount.labels }} + {{- toYaml . | nindent 4 }} + {{- end }} {{- with .Values.serviceAccount.annotations }} annotations: - {{- toYaml . | nindent 4 }} + {{- tpl (toYaml . | nindent 4) $ }} {{- end }} automountServiceAccountToken: {{ .Values.serviceAccount.automountServiceAccountToken }} {{- with .Values.serviceAccount.imagePullSecrets }} @@ -15,4 +18,3 @@ imagePullSecrets: {{- toYaml . | nindent 2 }} {{- end }} {{- end }} - diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml index 0af1434b6c9d..fa5cfc4888b2 100644 --- a/production/helm/loki/values.yaml +++ b/production/helm/loki/values.yaml @@ -123,13 +123,7 @@ loki: configs: - from: 2022-01-11 store: boltdb-shipper - {{- if eq .Values.loki.storage.type "s3" }} - object_store: s3 - {{- else if eq .Values.loki.storage.type "gcs" }} - object_store: gcs - {{- else }} - object_store: filesystem - {{- end }} + object_store: {{ .Values.loki.storage.type }} schema: v12 index: prefix: loki_index_ @@ -219,6 +213,12 @@ loki: chunkBufferSize: 0 requestTimeout: "0s" enableHttp2: true + azure: + accountName: null + accountKey: null + useManagedIdentity: false + userAssignedId: null + requestTimeout: null filesystem: chunks_directory: /var/loki/chunks rules_directory: /var/loki/rules @@ -301,7 +301,7 @@ enterprise: # enterprise specific sections of the config.yaml file config: | {{- if .Values.enterprise.adminApi.enabled }} - {{- if or .Values.minio.enabled (eq .Values.loki.storage.type "s3") (eq .Values.loki.storage.type "gcs") }} + {{- if or .Values.minio.enabled (eq .Values.loki.storage.type "s3") (eq .Values.loki.storage.type "gcs") (eq .Values.loki.storage.type "azure") }} admin_client: storage: s3: @@ -539,6 +539,8 @@ serviceAccount: imagePullSecrets: [] # -- Annotations for the service account annotations: {} + # -- Labels for the service account + labels: {} # -- Set this toggle to false to opt out of automounting API credentials for the service account automountServiceAccountToken: true From 89d81020ce810b63122cda86ff257fa3314c6009 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 30 Nov 2022 17:02:11 +0530 Subject: [PATCH 3/5] fix lint issues from PR 7804 (#7814) **What this PR does / why we need it**: I had enabled auto-merge in PR #7804, but somehow it still merged the PR without all the checks passing. This PR fixes the failing lint and tests. 
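Since this follow-up also touches the new compactor client packages from PR 7804, here is a minimal sketch (not part of the patch) of how the pluggable `deletion.CompactorClient` is consumed after the refactor. It only uses constructors that appear in these diffs; the compactor address, tenant ID, and the `"querier"` client-type label are placeholders:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
	"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
)

func main() {
	// Build the HTTP flavour of the compactor client; after this series it
	// satisfies the deletion.CompactorClient interface. The address is a placeholder.
	cc, err := compactor_client.NewHTTPClient("http://compactor:3100", compactor_client.HTTPConfig{})
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Stop()

	metrics := deletion.NewDeleteRequestClientMetrics(prometheus.DefaultRegisterer)

	// The delete-requests client no longer builds its own HTTP requests; it
	// delegates to whichever CompactorClient it is given, with an optional
	// cache duration for the per-tenant delete-request cache.
	drc, err := deletion.NewDeleteRequestsClient(cc, metrics, "querier",
		deletion.WithRequestClientCacheDuration(5*time.Minute))
	if err != nil {
		log.Fatal(err)
	}

	// "tenant-a" is a placeholder tenant/org ID.
	reqs, err := drc.GetAllDeleteRequestsForUser(context.Background(), "tenant-a")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("compactor returned %d delete requests", len(reqs))
}
```

The gRPC variant added in `client/grpc.go` is intended to plug into the same `NewDeleteRequestsClient` constructor; only the `CompactorClient` implementation changes.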
--- .../pkg/promtail/targets/cloudflare/fields.go | 2 +- .../lokipush/pushtargetmanager_test.go | 2 +- .../loki_micro_services_delete_test.go | 1 + pkg/logql/syntax/lex.go | 4 ++-- pkg/logqlmodel/logqlmodel.go | 3 ++- pkg/logqlmodel/metadata/context_test.go | 3 ++- .../queryrangebase/middleware_test.go | 2 ++ .../queryrangebase/results_cache_test.go | 2 ++ .../indexshipper/compactor/client/grpc.go | 9 +++++---- .../indexshipper/compactor/client/http.go | 9 +++++---- .../compactor/deletion/delete_request.go | 5 +++-- .../deletion/delete_requests_client.go | 1 + .../deletion/grpc_request_handler_test.go | 20 +++++++++---------- .../generationnumber/gennumber_loader.go | 1 + pkg/util/marshal/marshal.go | 2 +- 15 files changed, 39 insertions(+), 27 deletions(-) diff --git a/clients/pkg/promtail/targets/cloudflare/fields.go b/clients/pkg/promtail/targets/cloudflare/fields.go index c5189d7264fe..93f7195fe456 100644 --- a/clients/pkg/promtail/targets/cloudflare/fields.go +++ b/clients/pkg/promtail/targets/cloudflare/fields.go @@ -31,7 +31,7 @@ var ( allFields = append(extendedFields, []string{ "BotScore", "BotScoreSrc", "ClientRequestBytes", "ClientSrcPort", "ClientXRequestedWith", "CacheTieredFill", "EdgeResponseCompressionRatio", "EdgeServerIP", "FirewallMatchesSources", "FirewallMatchesActions", "FirewallMatchesRuleIDs", "OriginResponseBytes", "OriginResponseTime", "ClientDeviceType", "WAFFlags", "WAFMatchedVar", "EdgeColoID", - "RequestHeaders", "ResponseHeaders", + "RequestHeaders", "ResponseHeaders", }...) ) diff --git a/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go b/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go index 2c6267ac7216..0770ce6fcaec 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go +++ b/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go @@ -68,7 +68,7 @@ func Test_validateJobName(t *testing.T) { }, }, }, - wantErr: false, + wantErr: false, expectedJob: "job_1_2_3_4_job", }, } diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index c7b632a59ad2..f74ee0cfdd22 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/loki/integration/client" "github.com/grafana/loki/integration/cluster" + "github.com/grafana/loki/pkg/storage" ) diff --git a/pkg/logql/syntax/lex.go b/pkg/logql/syntax/lex.go index 3a01bbd07938..cc4c0be4d9f1 100644 --- a/pkg/logql/syntax/lex.go +++ b/pkg/logql/syntax/lex.go @@ -66,8 +66,8 @@ var tokens = map[string]int{ OpParserTypePattern: PATTERN, // fmt - OpFmtLabel: LABEL_FMT, - OpFmtLine: LINE_FMT, + OpFmtLabel: LABEL_FMT, + OpFmtLine: LINE_FMT, // filter functions OpFilterIP: IP, diff --git a/pkg/logqlmodel/logqlmodel.go b/pkg/logqlmodel/logqlmodel.go index 2c431aeb54d7..2e09bf0b877a 100644 --- a/pkg/logqlmodel/logqlmodel.go +++ b/pkg/logqlmodel/logqlmodel.go @@ -1,9 +1,10 @@ package logqlmodel import ( - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" "github.com/prometheus/prometheus/promql/parser" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel/stats" ) diff --git a/pkg/logqlmodel/metadata/context_test.go b/pkg/logqlmodel/metadata/context_test.go index 459fda2afd61..256abdb18ef7 100644 --- a/pkg/logqlmodel/metadata/context_test.go +++ b/pkg/logqlmodel/metadata/context_test.go @@ -5,8 +5,9 
@@ import ( "errors" "testing" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" ) func TestHeaders(t *testing.T) { diff --git a/pkg/querier/queryrange/queryrangebase/middleware_test.go b/pkg/querier/queryrange/queryrangebase/middleware_test.go index 6b3b5e64b951..c910c70823b3 100644 --- a/pkg/querier/queryrange/queryrangebase/middleware_test.go +++ b/pkg/querier/queryrange/queryrangebase/middleware_test.go @@ -31,3 +31,5 @@ type fakeGenNumberLoader struct { func (l *fakeGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string { return l.genNumber } + +func (l *fakeGenNumberLoader) Stop() {} diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index ae68de041b5e..3892b15f325c 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -1070,3 +1070,5 @@ func newMockCacheGenNumberLoader() CacheGenNumberLoader { func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string { return "" } + +func (l mockCacheGenNumberLoader) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc.go b/pkg/storage/stores/indexshipper/compactor/client/grpc.go index a538ee45d126..d18bca5139e6 100644 --- a/pkg/storage/stores/indexshipper/compactor/client/grpc.go +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc.go @@ -3,16 +3,17 @@ package client import ( "context" "flag" - "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" - "github.com/prometheus/common/model" - "github.com/weaveworks/common/user" "github.com/grafana/dskit/grpcclient" - deletion_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "github.com/weaveworks/common/instrument" + "github.com/weaveworks/common/user" "google.golang.org/grpc" + + deletion_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" ) type GRPCConfig struct { diff --git a/pkg/storage/stores/indexshipper/compactor/client/http.go b/pkg/storage/stores/indexshipper/compactor/client/http.go index edfbeb6f6990..817892df2c61 100644 --- a/pkg/storage/stores/indexshipper/compactor/client/http.go +++ b/pkg/storage/stores/indexshipper/compactor/client/http.go @@ -5,15 +5,16 @@ import ( "encoding/json" "flag" "fmt" - "github.com/go-kit/log/level" - "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" - "github.com/grafana/loki/pkg/util/log" "io" "net/http" "net/url" "time" + "github.com/go-kit/log/level" "github.com/grafana/dskit/crypto/tls" + + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/grafana/loki/pkg/util/log" ) const ( @@ -44,7 +45,7 @@ type compactorHTTPClient struct { // NewHTTPClient creates a client which talks to compactor over HTTP. // It uses provided TLS config which creating HTTP client. 
-func NewHTTPClient(addr string, cfg HTTPConfig) (*compactorHTTPClient, error) { +func NewHTTPClient(addr string, cfg HTTPConfig) (deletion.CompactorClient, error) { u, err := url.Parse(addr) if err != nil { level.Error(log.Logger).Log("msg", "error parsing url", "err", err) diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go index 0e51430730d2..f8d8d349d750 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go @@ -2,12 +2,13 @@ package deletion import ( "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/util/filter" util_log "github.com/grafana/loki/pkg/util/log" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" ) type DeleteRequest struct { diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go index ca479eb41cc1..f9ed88527200 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/grafana/loki/pkg/util/log" ) diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go index 3851bbea795a..8d9483a70048 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go @@ -21,7 +21,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode" ) -func server(t *testing.T, ctx context.Context, h *GRPCRequestHandler) (compactor_client_grpc.CompactorClient, func()) { +func server(t *testing.T, h *GRPCRequestHandler) (compactor_client_grpc.CompactorClient, func()) { buffer := 101024 * 1024 lis := bufconn.Listen(buffer) @@ -36,7 +36,7 @@ func server(t *testing.T, ctx context.Context, h *GRPCRequestHandler) (compactor require.NoError(t, baseServer.Serve(lis)) }() - conn, err := grpc.DialContext(ctx, "", + conn, err := grpc.DialContext(context.Background(), "", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return lis.Dial() }), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -73,7 +73,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { store := &mockDeleteRequestsStore{} store.getAllResult = []DeleteRequest{{RequestID: "test-request-1", Status: StatusReceived}, {RequestID: "test-request-2", Status: StatusReceived}} h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) @@ -95,7 +95,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(time.Hour), EndTime: now.Add(2 * time.Hour)}, } h := NewGRPCRequestHandler(store, &fakeLimits{mode: 
deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) @@ -123,7 +123,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, } h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) @@ -144,7 +144,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { store := &mockDeleteRequestsStore{} store.getAllErr = errors.New("something bad") h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) @@ -161,7 +161,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { t.Run("validation", func(t *testing.T) { t.Run("no org id", func(t *testing.T) { h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() _, err := grpcClient.GetDeleteRequests(context.Background(), &compactor_client_grpc.GetDeleteRequestsRequest{}) @@ -177,7 +177,7 @@ func TestGRPCGetCacheGenNumbers(t *testing.T) { store := &mockDeleteRequestsStore{} store.genNumber = "123" h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) @@ -194,7 +194,7 @@ func TestGRPCGetCacheGenNumbers(t *testing.T) { store := &mockDeleteRequestsStore{} store.getErr = errors.New("something bad") h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) @@ -211,7 +211,7 @@ func TestGRPCGetCacheGenNumbers(t *testing.T) { t.Run("validation", func(t *testing.T) { t.Run("no org id", func(t *testing.T) { h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) - grpcClient, closer := server(t, context.Background(), h) + grpcClient, closer := server(t, h) defer closer() _, err := grpcClient.GetCacheGenNumbers(context.Background(), &compactor_client_grpc.GetCacheGenNumbersRequest{}) diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go index d41b54d76270..63e1904e0204 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go @@ -39,6 +39,7 @@ func NewGenNumberLoader(g CacheGenClient, registerer prometheus.Registerer) *Gen numberGetter: g, numbers: make(map[string]string), metrics: newGenLoaderMetrics(registerer), + quit: make(chan struct{}), } go 
l.loop()
diff --git a/pkg/util/marshal/marshal.go b/pkg/util/marshal/marshal.go
index 59a504a85ccd..a14429e8e944 100644
--- a/pkg/util/marshal/marshal.go
+++ b/pkg/util/marshal/marshal.go
@@ -23,7 +23,7 @@ func WriteQueryResponseJSON(v logqlmodel.Result, w io.Writer) error {
 	defer jsoniter.ConfigFastest.ReturnStream(s)
 	err := EncodeResult(v, s)
 	if err != nil {
-		return fmt.Errorf("could not write JSON repsonse: %w", err)
+		return fmt.Errorf("could not write JSON response: %w", err)
 	}
 	s.WriteRaw("\n")
 	return s.Flush()

From b5563cee08aad61d95b19595393fcca6f3dd67dc Mon Sep 17 00:00:00 2001
From: Anders Bennedsgaard
Date: Wed, 30 Nov 2022 12:36:29 +0100
Subject: [PATCH 4/5] [Helm] Remove unused value and set default values explicitly (#7576)

**What this PR does / why we need it**: `monitoring.alerts` is not used in the Loki Helm chart, so this PR removes it. Furthermore, some values are referenced in the templates but not defined in the values file; this PR adds their default values explicitly.

**Which issue(s) this PR fixes**: Fixes #7575

**Special notes for your reviewer**:
https://github.com/AndersBennedsgaard/loki/blob/main/production/helm/loki/templates/single-binary/pdb.yaml#L15 should probably also be updated, since `podDisruptionBudget` is not referenced in the values file. However, I am unsure what default values should be used.
https://github.com/AndersBennedsgaard/loki/blob/main/production/helm/loki/templates/networkpolicy.yaml#L124 should probably also be updated, since `loki.rulerSelectorLabels` is not a valid helper function. However, I am unsure which selector labels to use, especially since it seems like `ruler` is not used. Perhaps any reference to `ruler` should be removed?

**Checklist**
- [x] Reviewed the `CONTRIBUTING.md` guide
- [x] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md`
---
 docs/sources/installation/helm/reference.md | 94 +++++++++++--------
 .../provisioner/job-provisioner.yaml | 8 +-
 .../loki/templates/tokengen/job-tokengen.yaml | 4 +-
 production/helm/loki/values.yaml | 26 +++--
 4 files changed, 74 insertions(+), 58 deletions(-)

diff --git a/docs/sources/installation/helm/reference.md b/docs/sources/installation/helm/reference.md
index 4f0846d245ce..47c5869b92b0 100644
--- a/docs/sources/installation/helm/reference.md
+++ b/docs/sources/installation/helm/reference.md
@@ -81,12 +81,21 @@ null
 false
 
+ + + + enterprise.externalConfigName + string + Name of the external config secret to use +
+""
+
enterprise.externalLicenseName string - Name of external licesne secret to use + Name of external license secret to use
 null
 
@@ -157,6 +166,7 @@ null "annotations": {}, "enabled": true, "env": [], + "extraVolumeMounts": [], "image": { "pullPolicy": "IfNotPresent", "registry": "docker.io", @@ -202,6 +212,15 @@ true
 []
 
+ + + + enterprise.provisioner.extraVolumeMounts + list + Volume mounts to add to the provisioner pods +
+[]
+
@@ -318,6 +337,7 @@ null "extraVolumeMounts": [], "extraVolumes": [], "labels": {}, + "priorityClassName": "", "securityContext": { "fsGroup": 10001, "runAsGroup": 10001, @@ -400,6 +420,15 @@ true
 {}
 
+ + + + enterprise.tokengen.priorityClassName + string + The name of the PriorityClass for tokengen Pods +
+""
+
@@ -712,6 +741,15 @@ false } ]
+ + + + gateway.ingress.ingressClassName + string + Ingress Class Name. MAY be required for Kubernetes versions >= 1.18 +
+""
+
@@ -1030,6 +1068,15 @@ false
 "loki.example.com"
 
+ + + + ingress.ingressClassName + string + +
+""
+
@@ -1192,6 +1239,15 @@ false
 "/loki/api/v1/push"
 
+ + + + ingress.tls + list + +
+[]
+
@@ -1644,42 +1700,6 @@ false "rootUser": "enterprise-logs" } - - - - monitoring.alerts.annotations - object - Additional annotations for the alerts PrometheusRule resource -
-{}
-
- - - - monitoring.alerts.enabled - bool - If enabled, create PrometheusRule resource with Loki alerting rules -
-true
-
- - - - monitoring.alerts.labels - object - Additional labels for the alerts PrometheusRule resource -
-{}
-
- - - - monitoring.alerts.namespace - string - Alternative namespace to create alerting rules PrometheusRule resource in -
-null
-
diff --git a/production/helm/loki/templates/provisioner/job-provisioner.yaml b/production/helm/loki/templates/provisioner/job-provisioner.yaml index b05112ea7b7b..06dcd2fad964 100644 --- a/production/helm/loki/templates/provisioner/job-provisioner.yaml +++ b/production/helm/loki/templates/provisioner/job-provisioner.yaml @@ -71,8 +71,8 @@ spec: -token=canary {{- end }} volumeMounts: - {{- if .Values.enterprise.provisioner.extraVolumeMounts }} - {{ toYaml .Values.enterprise.provisioner.extraVolumeMounts | nindent 12 }} + {{- with .Values.enterprise.provisioner.extraVolumeMounts }} + {{ toYaml . | nindent 12 }} {{- end }} - name: bootstrap mountPath: /bootstrap @@ -102,8 +102,8 @@ spec: --from-literal=password="$(cat /bootstrap/token-canary)" {{- end }} volumeMounts: - {{- if .Values.enterprise.provisioner.extraVolumeMounts }} - {{ toYaml .Values.enterprise.provisioner.extraVolumeMounts | nindent 12 }} + {{- with .Values.enterprise.provisioner.extraVolumeMounts }} + {{ toYaml . | nindent 12 }} {{- end }} - name: bootstrap mountPath: /bootstrap diff --git a/production/helm/loki/templates/tokengen/job-tokengen.yaml b/production/helm/loki/templates/tokengen/job-tokengen.yaml index 489e5500030a..5e8561f3c939 100644 --- a/production/helm/loki/templates/tokengen/job-tokengen.yaml +++ b/production/helm/loki/templates/tokengen/job-tokengen.yaml @@ -31,8 +31,8 @@ spec: {{- toYaml . | nindent 8 }} {{- end }} spec: - {{- if .Values.enterprise.tokengen.priorityClassName }} - priorityClassName: {{ .Values.enterprise.tokengen.priorityClassName }} + {{- with .Values.enterprise.tokengen.priorityClassName }} + priorityClassName: {{ . }} {{- end }} securityContext: {{- toYaml .Values.enterprise.tokengen.securityContext | nindent 8 }} diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml index fa5cfc4888b2..cd0455fcfc1f 100644 --- a/production/helm/loki/values.yaml +++ b/production/helm/loki/values.yaml @@ -290,9 +290,12 @@ enterprise: # -- Set to true when providing an external license useExternalLicense: false - # -- Name of external licesne secret to use + # -- Name of external license secret to use externalLicenseName: null + # -- Name of the external config secret to use + externalConfigName: "" + # -- If enabled, the correct admin_client storage will be configured. If disabled while running enterprise, # make sure auth is set to `type: trust`, or that `auth_enabled` is set to `false`. 
adminApi: @@ -359,6 +362,8 @@ enterprise: fsGroup: 10001 # -- Environment variables from secrets or configmaps to add to the tokengen pods extraEnvFrom: [] + # -- The name of the PriorityClass for tokengen Pods + priorityClassName: "" # -- Configuration for `provisioner` target provisioner: @@ -393,6 +398,8 @@ enterprise: tag: null # -- Docker image pull policy pullPolicy: IfNotPresent + # -- Volume mounts to add to the provisioner pods + extraVolumeMounts: [] nginxConfig: file: | @@ -609,17 +616,6 @@ monitoring: # - record: node_namespace_pod_container:container_cpu_usage_seconds_total:sum_rate # expr: sum(rate(container_cpu_usage_seconds_total[1m])) by (node, namespace, pod, container) - # Alerting rules for monitoring Loki - alerts: - # -- If enabled, create PrometheusRule resource with Loki alerting rules - enabled: true - # -- Alternative namespace to create alerting rules PrometheusRule resource in - namespace: null - # -- Additional annotations for the alerts PrometheusRule resource - annotations: {} - # -- Additional labels for the alerts PrometheusRule resource - labels: {} - # ServiceMonitor configuration serviceMonitor: # -- If enabled, ServiceMonitor resources for Prometheus Operator are created @@ -939,7 +935,7 @@ singleBinary: # You'll need to supply authn configuration for your ingress controller. ingress: enabled: false - # ingressClassName: nginx + ingressClassName: "" annotations: {} # nginx.ingress.kubernetes.io/auth-type: basic # nginx.ingress.kubernetes.io/auth-secret: loki-distributed-basic-auth @@ -971,7 +967,7 @@ ingress: hosts: - loki.example.com -# tls: + tls: [] # - hosts: # - loki.example.com # secretName: loki-distributed-tls @@ -1073,7 +1069,7 @@ gateway: # -- Specifies whether an ingress for the gateway should be created enabled: false # -- Ingress Class Name. 
MAY be required for Kubernetes versions >= 1.18 - # ingressClassName: nginx + ingressClassName: "" # -- Annotations for the gateway ingress annotations: {} # -- Hosts configuration for the gateway ingress From 54e6c19fc69be7f1b7d755361c3cf2a28aaa2b55 Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Wed, 30 Nov 2022 14:23:04 +0100 Subject: [PATCH 5/5] operator: Apply delete client changes for compat with release-2.7.x (#7815) --- operator/CHANGELOG.md | 1 + operator/internal/manifests/internal/config/build_test.go | 3 +-- operator/internal/manifests/internal/config/loki-config.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/operator/CHANGELOG.md b/operator/CHANGELOG.md index 9fd8987d7c92..189c7e65e82d 100644 --- a/operator/CHANGELOG.md +++ b/operator/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +- [7815](https://github.com/grafana/loki/pull/7815) **periklis**: Apply delete client changes for compat with release-2.7.x - [7809](https://github.com/grafana/loki/pull/7809) **xperimental**: Fix histogram-based alerting rules - [7808](https://github.com/grafana/loki/pull/7808) **xperimental**: Replace fifocache usage by embedded_cache - [7753](https://github.com/grafana/loki/pull/7753) **periklis**: Check for mandatory CA configmap name in ObjectStorageTLS spec diff --git a/operator/internal/manifests/internal/config/build_test.go b/operator/internal/manifests/internal/config/build_test.go index ec8f78efaeda..283a61be2fcb 100644 --- a/operator/internal/manifests/internal/config/build_test.go +++ b/operator/internal/manifests/internal/config/build_test.go @@ -2440,7 +2440,7 @@ querier: max_concurrent: 2 query_ingesters_within: 3h tail_max_duration: 1h -compactor_client: +delete_client: tls_enabled: true tls_cert_path: /var/run/tls/http/tls.crt tls_key_path: /var/run/tls/http/tls.key @@ -2631,7 +2631,6 @@ overrides: } cfg, rCfg, err := Build(opts) require.NoError(t, err) - t.Log(string(cfg)) require.YAMLEq(t, expCfg, string(cfg)) require.YAMLEq(t, expRCfg, string(rCfg)) } diff --git a/operator/internal/manifests/internal/config/loki-config.yaml b/operator/internal/manifests/internal/config/loki-config.yaml index d27054efcd00..0bfd2fc45354 100644 --- a/operator/internal/manifests/internal/config/loki-config.yaml +++ b/operator/internal/manifests/internal/config/loki-config.yaml @@ -181,7 +181,7 @@ querier: tail_max_duration: 1h max_concurrent: {{ .MaxConcurrent.AvailableQuerierCPUCores }} {{- if .Gates.HTTPEncryption }} -compactor_client: +delete_client: tls_enabled: true tls_cert_path: {{ .TLS.Paths.HTTP.Certificate }} tls_key_path: {{ .TLS.Paths.HTTP.Key }}
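
Note: as the two hunks above show, this change only renames a key — the delete-request client settings the operator renders now live under the `delete_client` key that Loki release-2.7.x expects, rather than `compactor_client`, and the hunks suggest it sits at the top level of the config alongside `querier`. A minimal sketch of the rendered block, reusing the illustrative TLS paths from the `build_test.go` fixture above:

```yaml
# Sketch of the operator-rendered config after the rename (not a complete Loki config).
# The certificate paths mirror the test fixture above and are illustrative only.
querier:
  tail_max_duration: 1h
  max_concurrent: 2
delete_client:
  tls_enabled: true
  tls_cert_path: /var/run/tls/http/tls.crt
  tls_key_path: /var/run/tls/http/tls.key
```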