From 1410808ee9f20917476fabaa78aa8849ba7c7d20 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 30 Nov 2022 16:04:40 +0530 Subject: [PATCH] use grpc for communicating with compactor for query time filtering of data requested for deletion (#7804) **What this PR does / why we need it**: Add grpc support to compactor for getting delete requests and gen number for query time filtering. Since these requests are internal to Loki, it would be good to use grpc instead of HTTP same as all the internal requests we do in Loki. I have added a new config for accepting the grpc address of the compactor. I tried having just the existing config and detecting if it is a grpc server, but it was hard to do it reliably, considering the different deployment modes we support. I think it is safe to keep it the same and eventually deprecate the existing config. **Checklist** - [x] Documentation added - [x] Tests updated - [x] `CHANGELOG.md` updated --- CHANGELOG.md | 1 + docs/sources/configuration/_index.md | 4 + pkg/loki/common/common.go | 4 + pkg/loki/loki.go | 53 +- pkg/loki/modules.go | 71 +- .../queryrangebase/results_cache.go | 1 + .../indexshipper/compactor/client/grpc.go | 100 ++ .../compactor/client/grpc/grpc.pb.go | 1549 +++++++++++++++++ .../compactor/client/grpc/grpc.proto | 29 + .../indexshipper/compactor/client/http.go | 148 ++ .../indexshipper/compactor/compactor.go | 29 +- .../compactor/compactor_client.go | 42 - .../deletion/delete_requests_client.go | 90 +- .../deletion/delete_requests_client_test.go | 59 +- .../deletion/delete_requests_manager_test.go | 6 + .../deletion/grpc_request_handler.go | 94 + .../deletion/grpc_request_handler_test.go | 223 +++ .../compactor/deletion/metrics.go | 12 +- .../generationnumber/gennumber_client.go | 81 - .../generationnumber/gennumber_client_test.go | 40 - .../generationnumber/gennumber_loader.go | 9 + .../generationnumber/gennumber_loader_test.go | 2 + 22 files changed, 2321 insertions(+), 326 deletions(-) create mode 100644 pkg/storage/stores/indexshipper/compactor/client/grpc.go create mode 100644 pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go create mode 100644 pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto create mode 100644 pkg/storage/stores/indexshipper/compactor/client/http.go delete mode 100644 pkg/storage/stores/indexshipper/compactor/compactor_client.go create mode 100644 pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go create mode 100644 pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go delete mode 100644 pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go delete mode 100644 pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 45b2e7b73a5ee..999f88e60f094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [7731](https://github.com/grafana/loki/pull/7731) **bitkill**: Add healthchecks to the docker-compose example. * [7759](https://github.com/grafana/loki/pull/7759) **kavirajk**: Improve error message for loading config with ENV variables. * [7785](https://github.com/grafana/loki/pull/7785) **dannykopping**: Add query blocker for queries and rules. +* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion. ##### Fixes diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index da5d39a76eb59..43e052a13416d 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2911,6 +2911,10 @@ This way, one doesn't have to replicate configuration in multiple places. # CLI flag: -common.compactor-address [compactor_address: | default = ""] +# Address and port number where the compactor grpc requests are being served. +# CLI flag: -common.compactor-grpc-address +[compactor_grpc_address: | default = ""] + ## analytics The `analytics` block configures the reporting of Loki analytics to grafana.com. diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 721ffed9d12e3..85173444f3ad9 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -43,6 +43,9 @@ type Config struct { // CompactorAddress is the http address of the compactor in the form http://host:port CompactorAddress string `yaml:"compactor_address"` + + // CompactorAddress is the grpc address of the compactor in the form host:port + CompactorGRPCAddress string `yaml:"compactor_grpc_address"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -57,6 +60,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") + f.StringVar(&c.CompactorGRPCAddress, "common.compactor-grpc-address", "", "the grpc address of the compactor in the form host:port") } type Storage struct { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 0ee5ddc8ba0dd..4ac0eedf37f97 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -46,6 +46,7 @@ import ( "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" @@ -70,30 +71,31 @@ type Config struct { UseBufferedLogger bool `yaml:"use_buffered_logger"` UseSyncLogger bool `yaml:"use_sync_logger"` - Common common.Config `yaml:"common,omitempty"` - Server server.Config `yaml:"server,omitempty"` - InternalServer internalserver.Config `yaml:"internal_server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - CompactorClient compactor.ClientConfig `yaml:"compactor_client,omitempty"` - IngesterClient client.Config `yaml:"ingester_client,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage_config,omitempty"` - IndexGateway indexgateway.Config `yaml:"index_gateway"` - ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` - SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"` - LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` - TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"` - Worker worker.Config `yaml:"frontend_worker,omitempty"` - Frontend lokifrontend.Config `yaml:"frontend,omitempty"` - Ruler ruler.Config `yaml:"ruler,omitempty"` - QueryRange queryrange.Config `yaml:"query_range,omitempty"` - RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` - MemberlistKV memberlist.KVConfig `yaml:"memberlist"` - Tracing tracing.Config `yaml:"tracing"` - CompactorConfig compactor.Config `yaml:"compactor,omitempty"` - QueryScheduler scheduler.Config `yaml:"query_scheduler"` - UsageReport usagestats.Config `yaml:"analytics"` + Common common.Config `yaml:"common,omitempty"` + Server server.Config `yaml:"server,omitempty"` + InternalServer internalserver.Config `yaml:"internal_server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + CompactorHTTPClient compactor_client.HTTPConfig `yaml:"compactor_client,omitempty"` + CompactorGRPCClient compactor_client.GRPCConfig `yaml:"compactor_grpc_client,omitempty"` + IngesterClient client.Config `yaml:"ingester_client,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage_config,omitempty"` + IndexGateway indexgateway.Config `yaml:"index_gateway"` + ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` + SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"` + LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` + TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"` + Worker worker.Config `yaml:"frontend_worker,omitempty"` + Frontend lokifrontend.Config `yaml:"frontend,omitempty"` + Ruler ruler.Config `yaml:"ruler,omitempty"` + QueryRange queryrange.Config `yaml:"query_range,omitempty"` + RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` + MemberlistKV memberlist.KVConfig `yaml:"memberlist"` + Tracing tracing.Config `yaml:"tracing"` + CompactorConfig compactor.Config `yaml:"compactor,omitempty"` + QueryScheduler scheduler.Config `yaml:"query_scheduler"` + UsageReport usagestats.Config `yaml:"analytics"` } // RegisterFlags registers flag. @@ -116,7 +118,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Common.RegisterFlags(f) c.Distributor.RegisterFlags(f) c.Querier.RegisterFlags(f) - c.CompactorClient.RegisterFlags(f) + c.CompactorHTTPClient.RegisterFlags(f) + c.CompactorGRPCClient.RegisterFlags(f) c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) c.StorageConfig.RegisterFlags(f) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5a9eb2bec442c..467ec947ce48e 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -12,8 +12,6 @@ import ( "strings" "time" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" - "github.com/NYTimes/gziphandler" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -43,6 +41,7 @@ import ( "github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb" "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/ruler" base_ruler "github.com/grafana/loki/pkg/ruler/base" "github.com/grafana/loki/pkg/runtime" @@ -54,6 +53,8 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber" "github.com/grafana/loki/pkg/storage/stores/series/index" @@ -677,45 +678,53 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) { var client generationnumber.CacheGenClient if t.supportIndexDeleteRequest() { - compactorAddress, err := t.compactorAddress() - if err != nil { - return nil, err - } - - httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) + compactorAddress, isGRPCAddress, err := t.compactorAddress() if err != nil { return nil, err } - client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient) - if err != nil { - return nil, err + reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "cache_gen", "client_type": t.Cfg.Target.String()}, prometheus.DefaultRegisterer) + if isGRPCAddress { + client, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg) + if err != nil { + return nil, err + } + } else { + client, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient) + if err != nil { + return nil, err + } } } t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer) - return services.NewIdleService(nil, nil), nil + return services.NewIdleService(nil, func(failureCase error) error { + t.cacheGenerationLoader.Stop() + return nil + }), nil } func (t *Loki) supportIndexDeleteRequest() bool { return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) } -func (t *Loki) compactorAddress() (string, error) { +// compactorAddress returns the configured address of the compactor. +// It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false +func (t *Loki) compactorAddress() (string, bool, error) { if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) { // In single binary or read modes, this module depends on Server - proto := "http" - if len(t.Cfg.Server.HTTPTLSConfig.TLSCertPath) > 0 && len(t.Cfg.Server.HTTPTLSConfig.TLSKeyPath) > 0 { - proto = "https" - } - return fmt.Sprintf("%s://%s:%d", proto, t.Cfg.Server.HTTPListenAddress, t.Cfg.Server.HTTPListenPort), nil + return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil } - if t.Cfg.Common.CompactorAddress == "" { - return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured") + if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" { + return "", false, errors.New("query filtering for deletes requires 'compactor_grpc_address' or 'compactor_address' to be configured") } - return t.Cfg.Common.CompactorAddress, nil + if t.Cfg.Common.CompactorGRPCAddress != "" { + return t.Cfg.Common.CompactorGRPCAddress, true, nil + } + + return t.Cfg.Common.CompactorAddress, false, nil } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { @@ -1012,6 +1021,7 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)) t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)) t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)) + grpc.RegisterCompactorServer(t.Server.GRPC, t.compactor.DeleteRequestsGRPCHandler) } return t.compactor, nil @@ -1128,17 +1138,26 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri return deletion.NewNoOpDeleteRequestsStore(), nil } - compactorAddress, err := t.compactorAddress() + compactorAddress, isGRPCAddress, err := t.compactorAddress() if err != nil { return nil, err } - httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) - if err != nil { - return nil, err + reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "delete_requests", "client_type": clientType}, prometheus.DefaultRegisterer) + var compactorClient deletion.CompactorClient + if isGRPCAddress { + compactorClient, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg) + if err != nil { + return nil, err + } + } else { + compactorClient, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient) + if err != nil { + return nil, err + } } - client, err := deletion.NewDeleteRequestsClient(compactorAddress, httpClient, t.deleteClientMetrics, clientType) + client, err := deletion.NewDeleteRequestsClient(compactorClient, t.deleteClientMetrics, clientType) if err != nil { return nil, err } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index 35d1b12d21524..5ce9071775359 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -68,6 +68,7 @@ func NewResultsCacheMetrics(registerer prometheus.Registerer) *ResultsCacheMetri type CacheGenNumberLoader interface { GetResultsCacheGenNumber(tenantIDs []string) string + Stop() } // ResultsCacheConfig is the config for the results cache. diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc.go b/pkg/storage/stores/indexshipper/compactor/client/grpc.go new file mode 100644 index 0000000000000..a538ee45d1260 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc.go @@ -0,0 +1,100 @@ +package client + +import ( + "context" + "flag" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/prometheus/common/model" + "github.com/weaveworks/common/user" + + "github.com/grafana/dskit/grpcclient" + deletion_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" + "google.golang.org/grpc" +) + +type GRPCConfig struct { + GRPCClientConfig grpcclient.Config `yaml:",inline"` +} + +// RegisterFlags registers flags. +func (cfg *GRPCConfig) RegisterFlags(f *flag.FlagSet) { + cfg.GRPCClientConfig.RegisterFlagsWithPrefix("", f) +} + +type compactorGRPCClient struct { + cfg GRPCConfig + + GRPCClientRequestDuration *prometheus.HistogramVec + conn *grpc.ClientConn + grpcClient deletion_grpc.CompactorClient +} + +// NewGRPCClient supports only methods which are used for internal communication of Loki like +// loading delete requests and cache gen numbers for query time filtering. +func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (deletion.CompactorClient, error) { + client := &compactorGRPCClient{ + cfg: cfg, + GRPCClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_compactor", + Name: "grpc_request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using compactor GRPC client", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + } + + dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(client.GRPCClientRequestDuration)) + if err != nil { + return nil, err + } + + client.conn, err = grpc.Dial(addr, dialOpts...) + if err != nil { + return nil, err + } + + client.grpcClient = deletion_grpc.NewCompactorClient(client.conn) + return client, nil +} + +func (s *compactorGRPCClient) Stop() { + s.conn.Close() +} + +func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) { + ctx = user.InjectOrgID(ctx, userID) + grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{}) + if err != nil { + return nil, err + } + + deleteRequests := make([]deletion.DeleteRequest, len(grpcResp.DeleteRequests)) + for i, dr := range grpcResp.DeleteRequests { + deleteRequests[i] = deletion.DeleteRequest{ + RequestID: dr.RequestID, + StartTime: model.Time(dr.StartTime), + EndTime: model.Time(dr.EndTime), + Query: dr.Query, + Status: deletion.DeleteRequestStatus(dr.Status), + CreatedAt: model.Time(dr.CreatedAt), + } + } + + return deleteRequests, nil +} + +func (s *compactorGRPCClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + ctx = user.InjectOrgID(ctx, userID) + grpcResp, err := s.grpcClient.GetCacheGenNumbers(ctx, &deletion_grpc.GetCacheGenNumbersRequest{}) + if err != nil { + return "", err + } + + return grpcResp.ResultsCacheGen, nil +} + +func (s *compactorGRPCClient) Name() string { + return "grpc_client" +} diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go new file mode 100644 index 0000000000000..533a2ecfd6316 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go @@ -0,0 +1,1549 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto + +package grpc + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type GetDeleteRequestsRequest struct { +} + +func (m *GetDeleteRequestsRequest) Reset() { *m = GetDeleteRequestsRequest{} } +func (*GetDeleteRequestsRequest) ProtoMessage() {} +func (*GetDeleteRequestsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{0} +} +func (m *GetDeleteRequestsRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetDeleteRequestsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetDeleteRequestsRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetDeleteRequestsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetDeleteRequestsRequest.Merge(m, src) +} +func (m *GetDeleteRequestsRequest) XXX_Size() int { + return m.Size() +} +func (m *GetDeleteRequestsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetDeleteRequestsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetDeleteRequestsRequest proto.InternalMessageInfo + +type GetDeleteRequestsResponse struct { + DeleteRequests []*DeleteRequest `protobuf:"bytes,1,rep,name=deleteRequests,proto3" json:"deleteRequests,omitempty"` +} + +func (m *GetDeleteRequestsResponse) Reset() { *m = GetDeleteRequestsResponse{} } +func (*GetDeleteRequestsResponse) ProtoMessage() {} +func (*GetDeleteRequestsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{1} +} +func (m *GetDeleteRequestsResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetDeleteRequestsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetDeleteRequestsResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetDeleteRequestsResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetDeleteRequestsResponse.Merge(m, src) +} +func (m *GetDeleteRequestsResponse) XXX_Size() int { + return m.Size() +} +func (m *GetDeleteRequestsResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetDeleteRequestsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetDeleteRequestsResponse proto.InternalMessageInfo + +func (m *GetDeleteRequestsResponse) GetDeleteRequests() []*DeleteRequest { + if m != nil { + return m.DeleteRequests + } + return nil +} + +type DeleteRequest struct { + RequestID string `protobuf:"bytes,1,opt,name=requestID,proto3" json:"requestID,omitempty"` + StartTime int64 `protobuf:"varint,2,opt,name=startTime,proto3" json:"startTime,omitempty"` + EndTime int64 `protobuf:"varint,3,opt,name=endTime,proto3" json:"endTime,omitempty"` + Query string `protobuf:"bytes,4,opt,name=query,proto3" json:"query,omitempty"` + Status string `protobuf:"bytes,5,opt,name=status,proto3" json:"status,omitempty"` + CreatedAt int64 `protobuf:"varint,6,opt,name=createdAt,proto3" json:"createdAt,omitempty"` +} + +func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } +func (*DeleteRequest) ProtoMessage() {} +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{2} +} +func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DeleteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteRequest.Merge(m, src) +} +func (m *DeleteRequest) XXX_Size() int { + return m.Size() +} +func (m *DeleteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo + +func (m *DeleteRequest) GetRequestID() string { + if m != nil { + return m.RequestID + } + return "" +} + +func (m *DeleteRequest) GetStartTime() int64 { + if m != nil { + return m.StartTime + } + return 0 +} + +func (m *DeleteRequest) GetEndTime() int64 { + if m != nil { + return m.EndTime + } + return 0 +} + +func (m *DeleteRequest) GetQuery() string { + if m != nil { + return m.Query + } + return "" +} + +func (m *DeleteRequest) GetStatus() string { + if m != nil { + return m.Status + } + return "" +} + +func (m *DeleteRequest) GetCreatedAt() int64 { + if m != nil { + return m.CreatedAt + } + return 0 +} + +type GetCacheGenNumbersRequest struct { +} + +func (m *GetCacheGenNumbersRequest) Reset() { *m = GetCacheGenNumbersRequest{} } +func (*GetCacheGenNumbersRequest) ProtoMessage() {} +func (*GetCacheGenNumbersRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{3} +} +func (m *GetCacheGenNumbersRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetCacheGenNumbersRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetCacheGenNumbersRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetCacheGenNumbersRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetCacheGenNumbersRequest.Merge(m, src) +} +func (m *GetCacheGenNumbersRequest) XXX_Size() int { + return m.Size() +} +func (m *GetCacheGenNumbersRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetCacheGenNumbersRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetCacheGenNumbersRequest proto.InternalMessageInfo + +type GetCacheGenNumbersResponse struct { + ResultsCacheGen string `protobuf:"bytes,1,opt,name=resultsCacheGen,proto3" json:"resultsCacheGen,omitempty"` +} + +func (m *GetCacheGenNumbersResponse) Reset() { *m = GetCacheGenNumbersResponse{} } +func (*GetCacheGenNumbersResponse) ProtoMessage() {} +func (*GetCacheGenNumbersResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{4} +} +func (m *GetCacheGenNumbersResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetCacheGenNumbersResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetCacheGenNumbersResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetCacheGenNumbersResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetCacheGenNumbersResponse.Merge(m, src) +} +func (m *GetCacheGenNumbersResponse) XXX_Size() int { + return m.Size() +} +func (m *GetCacheGenNumbersResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetCacheGenNumbersResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetCacheGenNumbersResponse proto.InternalMessageInfo + +func (m *GetCacheGenNumbersResponse) GetResultsCacheGen() string { + if m != nil { + return m.ResultsCacheGen + } + return "" +} + +func init() { + proto.RegisterType((*GetDeleteRequestsRequest)(nil), "grpc.GetDeleteRequestsRequest") + proto.RegisterType((*GetDeleteRequestsResponse)(nil), "grpc.GetDeleteRequestsResponse") + proto.RegisterType((*DeleteRequest)(nil), "grpc.DeleteRequest") + proto.RegisterType((*GetCacheGenNumbersRequest)(nil), "grpc.GetCacheGenNumbersRequest") + proto.RegisterType((*GetCacheGenNumbersResponse)(nil), "grpc.GetCacheGenNumbersResponse") +} + +func init() { + proto.RegisterFile("pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto", fileDescriptor_af4e385854eeb93d) +} + +var fileDescriptor_af4e385854eeb93d = []byte{ + // 391 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xbb, 0x4e, 0xe3, 0x40, + 0x14, 0xf5, 0x6c, 0x1e, 0xab, 0xcc, 0x6a, 0x77, 0xc5, 0x80, 0xd0, 0x60, 0xd0, 0x60, 0xb9, 0x72, + 0x15, 0x4b, 0x81, 0x8e, 0x06, 0x48, 0x44, 0x44, 0x43, 0x61, 0x45, 0x82, 0xd6, 0xb1, 0xaf, 0x12, + 0x8b, 0xc4, 0x76, 0x66, 0xc6, 0x12, 0x74, 0x7c, 0x02, 0x9f, 0x41, 0x4b, 0xcb, 0x17, 0x50, 0xa6, + 0x4c, 0x49, 0x9c, 0x86, 0x32, 0x9f, 0x80, 0x32, 0x76, 0x12, 0xe5, 0xd5, 0x78, 0xe6, 0x9e, 0x73, + 0xe6, 0x3e, 0xce, 0x35, 0xbe, 0x8c, 0x1f, 0x3b, 0xb6, 0x90, 0x11, 0x77, 0x3b, 0xa0, 0x4e, 0x10, + 0x76, 0x10, 0xfa, 0xf0, 0x24, 0xba, 0x41, 0x1c, 0x03, 0xb7, 0xbd, 0xa8, 0x1f, 0xbb, 0x9e, 0x8c, + 0xb8, 0xed, 0xf5, 0x02, 0x08, 0xa5, 0xdd, 0xe1, 0xb1, 0xa7, 0x3e, 0xd5, 0x98, 0x47, 0x32, 0x22, + 0xc5, 0xd9, 0xdd, 0xd4, 0x31, 0x6d, 0x82, 0x6c, 0x40, 0x0f, 0x24, 0x38, 0x30, 0x48, 0x40, 0x48, + 0x91, 0x9f, 0xe6, 0x03, 0x3e, 0xda, 0xc2, 0x89, 0x38, 0x0a, 0x05, 0x90, 0x0b, 0xfc, 0xcf, 0x5f, + 0x61, 0x28, 0x32, 0x0a, 0xd6, 0x9f, 0xda, 0x7e, 0x55, 0xd5, 0x58, 0x79, 0xe5, 0xac, 0x49, 0xcd, + 0x77, 0x84, 0xff, 0xae, 0x28, 0xc8, 0x09, 0xae, 0xf0, 0xec, 0x7a, 0xdb, 0xa0, 0xc8, 0x40, 0x56, + 0xc5, 0x59, 0x02, 0x33, 0x56, 0x48, 0x97, 0xcb, 0x56, 0xd0, 0x07, 0xfa, 0xcb, 0x40, 0x56, 0xc1, + 0x59, 0x02, 0x84, 0xe2, 0xdf, 0x10, 0xfa, 0x8a, 0x2b, 0x28, 0x6e, 0x1e, 0x92, 0x03, 0x5c, 0x1a, + 0x24, 0xc0, 0x9f, 0x69, 0x51, 0x65, 0xcc, 0x02, 0x72, 0x88, 0xcb, 0x42, 0xba, 0x32, 0x11, 0xb4, + 0xa4, 0xe0, 0x3c, 0x9a, 0x55, 0xf1, 0x38, 0xb8, 0x12, 0xfc, 0x2b, 0x49, 0xcb, 0x59, 0x95, 0x05, + 0x60, 0x1e, 0x2b, 0x37, 0xea, 0xae, 0xd7, 0x85, 0x26, 0x84, 0x77, 0x49, 0xbf, 0x0d, 0x7c, 0x61, + 0xd5, 0x0d, 0xd6, 0xb7, 0x91, 0xb9, 0x57, 0x16, 0xfe, 0xcf, 0x41, 0x24, 0x3d, 0x29, 0xe6, 0x8a, + 0x7c, 0xc4, 0x75, 0xb8, 0xf6, 0x81, 0x70, 0xa5, 0x3e, 0xdf, 0x1c, 0x69, 0xe1, 0xbd, 0x8d, 0x05, + 0x10, 0x96, 0x19, 0xbc, 0x6b, 0x6b, 0xfa, 0xe9, 0x4e, 0x3e, 0xef, 0xe6, 0x1e, 0x93, 0xcd, 0x5e, + 0xc9, 0xf2, 0xd9, 0xf6, 0x11, 0x75, 0x63, 0xb7, 0x20, 0x4b, 0x7c, 0x7d, 0x3e, 0x1c, 0x33, 0x6d, + 0x34, 0x66, 0xda, 0x74, 0xcc, 0xd0, 0x4b, 0xca, 0xd0, 0x5b, 0xca, 0xd0, 0x67, 0xca, 0xd0, 0x30, + 0x65, 0xe8, 0x2b, 0x65, 0xe8, 0x3b, 0x65, 0xda, 0x34, 0x65, 0xe8, 0x75, 0xc2, 0xb4, 0xe1, 0x84, + 0x69, 0xa3, 0x09, 0xd3, 0xda, 0x65, 0xf5, 0x3b, 0x9e, 0xfd, 0x04, 0x00, 0x00, 0xff, 0xff, 0x37, + 0x5f, 0x43, 0x54, 0xd2, 0x02, 0x00, 0x00, +} + +func (this *GetDeleteRequestsRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetDeleteRequestsRequest) + if !ok { + that2, ok := that.(GetDeleteRequestsRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *GetDeleteRequestsResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetDeleteRequestsResponse) + if !ok { + that2, ok := that.(GetDeleteRequestsResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.DeleteRequests) != len(that1.DeleteRequests) { + return false + } + for i := range this.DeleteRequests { + if !this.DeleteRequests[i].Equal(that1.DeleteRequests[i]) { + return false + } + } + return true +} +func (this *DeleteRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*DeleteRequest) + if !ok { + that2, ok := that.(DeleteRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.RequestID != that1.RequestID { + return false + } + if this.StartTime != that1.StartTime { + return false + } + if this.EndTime != that1.EndTime { + return false + } + if this.Query != that1.Query { + return false + } + if this.Status != that1.Status { + return false + } + if this.CreatedAt != that1.CreatedAt { + return false + } + return true +} +func (this *GetCacheGenNumbersRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetCacheGenNumbersRequest) + if !ok { + that2, ok := that.(GetCacheGenNumbersRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *GetCacheGenNumbersResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetCacheGenNumbersResponse) + if !ok { + that2, ok := that.(GetCacheGenNumbersResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.ResultsCacheGen != that1.ResultsCacheGen { + return false + } + return true +} +func (this *GetDeleteRequestsRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&grpc.GetDeleteRequestsRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetDeleteRequestsResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpc.GetDeleteRequestsResponse{") + if this.DeleteRequests != nil { + s = append(s, "DeleteRequests: "+fmt.Sprintf("%#v", this.DeleteRequests)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *DeleteRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 10) + s = append(s, "&grpc.DeleteRequest{") + s = append(s, "RequestID: "+fmt.Sprintf("%#v", this.RequestID)+",\n") + s = append(s, "StartTime: "+fmt.Sprintf("%#v", this.StartTime)+",\n") + s = append(s, "EndTime: "+fmt.Sprintf("%#v", this.EndTime)+",\n") + s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n") + s = append(s, "Status: "+fmt.Sprintf("%#v", this.Status)+",\n") + s = append(s, "CreatedAt: "+fmt.Sprintf("%#v", this.CreatedAt)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetCacheGenNumbersRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&grpc.GetCacheGenNumbersRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetCacheGenNumbersResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpc.GetCacheGenNumbersResponse{") + s = append(s, "ResultsCacheGen: "+fmt.Sprintf("%#v", this.ResultsCacheGen)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringGrpc(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// CompactorClient is the client API for Compactor service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type CompactorClient interface { + GetDeleteRequests(ctx context.Context, in *GetDeleteRequestsRequest, opts ...grpc.CallOption) (*GetDeleteRequestsResponse, error) + GetCacheGenNumbers(ctx context.Context, in *GetCacheGenNumbersRequest, opts ...grpc.CallOption) (*GetCacheGenNumbersResponse, error) +} + +type compactorClient struct { + cc *grpc.ClientConn +} + +func NewCompactorClient(cc *grpc.ClientConn) CompactorClient { + return &compactorClient{cc} +} + +func (c *compactorClient) GetDeleteRequests(ctx context.Context, in *GetDeleteRequestsRequest, opts ...grpc.CallOption) (*GetDeleteRequestsResponse, error) { + out := new(GetDeleteRequestsResponse) + err := c.cc.Invoke(ctx, "/grpc.Compactor/GetDeleteRequests", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *compactorClient) GetCacheGenNumbers(ctx context.Context, in *GetCacheGenNumbersRequest, opts ...grpc.CallOption) (*GetCacheGenNumbersResponse, error) { + out := new(GetCacheGenNumbersResponse) + err := c.cc.Invoke(ctx, "/grpc.Compactor/GetCacheGenNumbers", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CompactorServer is the server API for Compactor service. +type CompactorServer interface { + GetDeleteRequests(context.Context, *GetDeleteRequestsRequest) (*GetDeleteRequestsResponse, error) + GetCacheGenNumbers(context.Context, *GetCacheGenNumbersRequest) (*GetCacheGenNumbersResponse, error) +} + +// UnimplementedCompactorServer can be embedded to have forward compatible implementations. +type UnimplementedCompactorServer struct { +} + +func (*UnimplementedCompactorServer) GetDeleteRequests(ctx context.Context, req *GetDeleteRequestsRequest) (*GetDeleteRequestsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetDeleteRequests not implemented") +} +func (*UnimplementedCompactorServer) GetCacheGenNumbers(ctx context.Context, req *GetCacheGenNumbersRequest) (*GetCacheGenNumbersResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetCacheGenNumbers not implemented") +} + +func RegisterCompactorServer(s *grpc.Server, srv CompactorServer) { + s.RegisterService(&_Compactor_serviceDesc, srv) +} + +func _Compactor_GetDeleteRequests_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetDeleteRequestsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CompactorServer).GetDeleteRequests(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.Compactor/GetDeleteRequests", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CompactorServer).GetDeleteRequests(ctx, req.(*GetDeleteRequestsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Compactor_GetCacheGenNumbers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetCacheGenNumbersRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CompactorServer).GetCacheGenNumbers(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.Compactor/GetCacheGenNumbers", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CompactorServer).GetCacheGenNumbers(ctx, req.(*GetCacheGenNumbersRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Compactor_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.Compactor", + HandlerType: (*CompactorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetDeleteRequests", + Handler: _Compactor_GetDeleteRequests_Handler, + }, + { + MethodName: "GetCacheGenNumbers", + Handler: _Compactor_GetCacheGenNumbers_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto", +} + +func (m *GetDeleteRequestsRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetDeleteRequestsRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetDeleteRequestsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *GetDeleteRequestsResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetDeleteRequestsResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetDeleteRequestsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.DeleteRequests) > 0 { + for iNdEx := len(m.DeleteRequests) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.DeleteRequests[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintGrpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *DeleteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DeleteRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DeleteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.CreatedAt != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.CreatedAt)) + i-- + dAtA[i] = 0x30 + } + if len(m.Status) > 0 { + i -= len(m.Status) + copy(dAtA[i:], m.Status) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.Status))) + i-- + dAtA[i] = 0x2a + } + if len(m.Query) > 0 { + i -= len(m.Query) + copy(dAtA[i:], m.Query) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.Query))) + i-- + dAtA[i] = 0x22 + } + if m.EndTime != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.EndTime)) + i-- + dAtA[i] = 0x18 + } + if m.StartTime != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.StartTime)) + i-- + dAtA[i] = 0x10 + } + if len(m.RequestID) > 0 { + i -= len(m.RequestID) + copy(dAtA[i:], m.RequestID) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.RequestID))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *GetCacheGenNumbersRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetCacheGenNumbersRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetCacheGenNumbersRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *GetCacheGenNumbersResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetCacheGenNumbersResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetCacheGenNumbersResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ResultsCacheGen) > 0 { + i -= len(m.ResultsCacheGen) + copy(dAtA[i:], m.ResultsCacheGen) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.ResultsCacheGen))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintGrpc(dAtA []byte, offset int, v uint64) int { + offset -= sovGrpc(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *GetDeleteRequestsRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GetDeleteRequestsResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.DeleteRequests) > 0 { + for _, e := range m.DeleteRequests { + l = e.Size() + n += 1 + l + sovGrpc(uint64(l)) + } + } + return n +} + +func (m *DeleteRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.RequestID) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + if m.StartTime != 0 { + n += 1 + sovGrpc(uint64(m.StartTime)) + } + if m.EndTime != 0 { + n += 1 + sovGrpc(uint64(m.EndTime)) + } + l = len(m.Query) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + l = len(m.Status) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + if m.CreatedAt != 0 { + n += 1 + sovGrpc(uint64(m.CreatedAt)) + } + return n +} + +func (m *GetCacheGenNumbersRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GetCacheGenNumbersResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ResultsCacheGen) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + return n +} + +func sovGrpc(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozGrpc(x uint64) (n int) { + return sovGrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *GetDeleteRequestsRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetDeleteRequestsRequest{`, + `}`, + }, "") + return s +} +func (this *GetDeleteRequestsResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForDeleteRequests := "[]*DeleteRequest{" + for _, f := range this.DeleteRequests { + repeatedStringForDeleteRequests += strings.Replace(f.String(), "DeleteRequest", "DeleteRequest", 1) + "," + } + repeatedStringForDeleteRequests += "}" + s := strings.Join([]string{`&GetDeleteRequestsResponse{`, + `DeleteRequests:` + repeatedStringForDeleteRequests + `,`, + `}`, + }, "") + return s +} +func (this *DeleteRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DeleteRequest{`, + `RequestID:` + fmt.Sprintf("%v", this.RequestID) + `,`, + `StartTime:` + fmt.Sprintf("%v", this.StartTime) + `,`, + `EndTime:` + fmt.Sprintf("%v", this.EndTime) + `,`, + `Query:` + fmt.Sprintf("%v", this.Query) + `,`, + `Status:` + fmt.Sprintf("%v", this.Status) + `,`, + `CreatedAt:` + fmt.Sprintf("%v", this.CreatedAt) + `,`, + `}`, + }, "") + return s +} +func (this *GetCacheGenNumbersRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetCacheGenNumbersRequest{`, + `}`, + }, "") + return s +} +func (this *GetCacheGenNumbersResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetCacheGenNumbersResponse{`, + `ResultsCacheGen:` + fmt.Sprintf("%v", this.ResultsCacheGen) + `,`, + `}`, + }, "") + return s +} +func valueToStringGrpc(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *GetDeleteRequestsRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetDeleteRequestsRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetDeleteRequestsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetDeleteRequestsResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetDeleteRequestsResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetDeleteRequestsResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DeleteRequests", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DeleteRequests = append(m.DeleteRequests, &DeleteRequest{}) + if err := m.DeleteRequests[len(m.DeleteRequests)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DeleteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DeleteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DeleteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RequestID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartTime", wireType) + } + m.StartTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndTime", wireType) + } + m.EndTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Query = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Status = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CreatedAt", wireType) + } + m.CreatedAt = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.CreatedAt |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetCacheGenNumbersRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetCacheGenNumbersRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetCacheGenNumbersRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetCacheGenNumbersResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetCacheGenNumbersResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetCacheGenNumbersResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResultsCacheGen", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ResultsCacheGen = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipGrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthGrpc + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipGrpc(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthGrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowGrpc = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto new file mode 100644 index 0000000000000..52c7c462364c5 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package grpc; + +service Compactor { + rpc GetDeleteRequests(GetDeleteRequestsRequest) returns (GetDeleteRequestsResponse); + rpc GetCacheGenNumbers(GetCacheGenNumbersRequest) returns (GetCacheGenNumbersResponse); +} + +message GetDeleteRequestsRequest {} + +message GetDeleteRequestsResponse { + repeated DeleteRequest deleteRequests = 1; +} + +message DeleteRequest { + string requestID = 1; + int64 startTime = 2; + int64 endTime = 3; + string query = 4; + string status = 5; + int64 createdAt = 6; +} + +message GetCacheGenNumbersRequest {} + +message GetCacheGenNumbersResponse { + string resultsCacheGen = 1; +} diff --git a/pkg/storage/stores/indexshipper/compactor/client/http.go b/pkg/storage/stores/indexshipper/compactor/client/http.go new file mode 100644 index 0000000000000..edfbeb6f69901 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/http.go @@ -0,0 +1,148 @@ +package client + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "github.com/go-kit/log/level" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/grafana/loki/pkg/util/log" + "io" + "net/http" + "net/url" + "time" + + "github.com/grafana/dskit/crypto/tls" +) + +const ( + orgHeaderKey = "X-Scope-OrgID" + getDeletePath = "/loki/api/v1/delete" + cacheGenNumPath = "/loki/api/v1/cache/generation_numbers" +) + +type HTTPConfig struct { + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *HTTPConfig) RegisterFlags(f *flag.FlagSet) { + prefix := "boltdb.shipper.compactor.client" + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, + "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) +} + +type compactorHTTPClient struct { + httpClient *http.Client + + deleteRequestsURL string + cacheGenURL string +} + +// NewHTTPClient creates a client which talks to compactor over HTTP. +// It uses provided TLS config which creating HTTP client. +func NewHTTPClient(addr string, cfg HTTPConfig) (*compactorHTTPClient, error) { + u, err := url.Parse(addr) + if err != nil { + level.Error(log.Logger).Log("msg", "error parsing url", "err", err) + return nil, err + } + u.Path = getDeletePath + deleteRequestsURL := u.String() + + u.Path = cacheGenNumPath + cacheGenURL := u.String() + + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 250 + transport.MaxIdleConnsPerHost = 250 + + if cfg.TLSEnabled { + tlsCfg, err := cfg.TLS.GetTLSConfig() + if err != nil { + return nil, err + } + + transport.TLSClientConfig = tlsCfg + } + + return &compactorHTTPClient{ + httpClient: &http.Client{Timeout: 5 * time.Second, Transport: transport}, + deleteRequestsURL: deleteRequestsURL, + cacheGenURL: cacheGenURL, + }, nil +} + +func (c *compactorHTTPClient) Name() string { + return "http_client" +} + +func (c *compactorHTTPClient) Stop() {} + +func (c *compactorHTTPClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.deleteRequestsURL, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + + if resp.StatusCode/100 != 2 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + var deleteRequests []deletion.DeleteRequest + if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return nil, err + } + + return deleteRequests, nil +} + +func (c *compactorHTTPClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.cacheGenURL, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + + var genNumber string + if err := json.NewDecoder(resp.Body).Decode(&genNumber); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return "", err + } + + return genNumber, err +} diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index a75437c0a2afb..ae79832e7efb0 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -145,19 +145,20 @@ func (cfg *Config) Validate() error { type Compactor struct { services.Service - cfg Config - indexStorageClient shipper_storage.Client - tableMarker retention.TableMarker - sweeper *retention.Sweeper - deleteRequestsStore deletion.DeleteRequestsStore - DeleteRequestsHandler *deletion.DeleteRequestHandler - deleteRequestsManager *deletion.DeleteRequestsManager - expirationChecker retention.ExpirationChecker - metrics *metrics - running bool - wg sync.WaitGroup - indexCompactors map[string]IndexCompactor - schemaConfig config.SchemaConfig + cfg Config + indexStorageClient shipper_storage.Client + tableMarker retention.TableMarker + sweeper *retention.Sweeper + deleteRequestsStore deletion.DeleteRequestsStore + DeleteRequestsHandler *deletion.DeleteRequestHandler + DeleteRequestsGRPCHandler *deletion.GRPCRequestHandler + deleteRequestsManager *deletion.DeleteRequestsManager + expirationChecker retention.ExpirationChecker + metrics *metrics + running bool + wg sync.WaitGroup + indexCompactors map[string]IndexCompactor + schemaConfig config.SchemaConfig // Ring used for running a single compactor ringLifecycler *ring.BasicLifecycler @@ -285,6 +286,8 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Over r, ) + c.DeleteRequestsGRPCHandler = deletion.NewGRPCRequestHandler(c.deleteRequestsStore, limits) + c.deleteRequestsManager = deletion.NewDeleteRequestsManager( c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, diff --git a/pkg/storage/stores/indexshipper/compactor/compactor_client.go b/pkg/storage/stores/indexshipper/compactor/compactor_client.go deleted file mode 100644 index 63468568dc415..0000000000000 --- a/pkg/storage/stores/indexshipper/compactor/compactor_client.go +++ /dev/null @@ -1,42 +0,0 @@ -package compactor - -import ( - "flag" - "net/http" - "time" - - "github.com/grafana/dskit/crypto/tls" -) - -// Config for compactor's generation-number client -type ClientConfig struct { - TLSEnabled bool `yaml:"tls_enabled"` - TLS tls.ClientConfig `yaml:",inline"` -} - -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) { - prefix := "boltdb.shipper.compactor.client" - f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, - "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") - cfg.TLS.RegisterFlagsWithPrefix(prefix, f) -} - -// NewDeleteHTTPClient return a pointer to a http client instance based on the -// delete client tls settings. -func NewCompactorHTTPClient(cfg ClientConfig) (*http.Client, error) { - transport := http.DefaultTransport.(*http.Transport).Clone() - transport.MaxIdleConns = 250 - transport.MaxIdleConnsPerHost = 250 - - if cfg.TLSEnabled { - tlsCfg, err := cfg.TLS.GetTLSConfig() - if err != nil { - return nil, err - } - - transport.TLSClientConfig = tlsCfg - } - - return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go index 3e8639e50c4c7..ca479eb41cc1b 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go @@ -2,24 +2,19 @@ package deletion import ( "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" "sync" "time" "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/loki/pkg/util/log" ) -const ( - orgHeaderKey = "X-Scope-OrgID" - getDeletePath = "/loki/api/v1/delete" -) +type CompactorClient interface { + GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) + GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) + Name() string + Stop() +} type DeleteRequestsClient interface { GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) @@ -27,9 +22,8 @@ type DeleteRequestsClient interface { } type deleteRequestsClient struct { - url string - httpClient httpClient - mu sync.RWMutex + compactorClient CompactorClient + mu sync.RWMutex cache map[string][]DeleteRequest cacheDuration time.Duration @@ -40,10 +34,6 @@ type deleteRequestsClient struct { stopChan chan struct{} } -type httpClient interface { - Do(*http.Request) (*http.Response, error) -} - type DeleteRequestsStoreOption func(c *deleteRequestsClient) func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { @@ -52,22 +42,14 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { } } -func NewDeleteRequestsClient(addr string, c httpClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { - u, err := url.Parse(addr) - if err != nil { - level.Error(log.Logger).Log("msg", "error parsing url", "err", err) - return nil, err - } - u.Path = getDeletePath - +func NewDeleteRequestsClient(compactorClient CompactorClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { client := &deleteRequestsClient{ - url: u.String(), - httpClient: c, - cacheDuration: 5 * time.Minute, - cache: make(map[string][]DeleteRequest), - clientType: clientType, - metrics: deleteClientMetrics, - stopChan: make(chan struct{}), + compactorClient: compactorClient, + cacheDuration: 5 * time.Minute, + cache: make(map[string][]DeleteRequest), + clientType: clientType, + metrics: deleteClientMetrics, + stopChan: make(chan struct{}), } for _, o := range opts { @@ -83,10 +65,10 @@ func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, return cachedRequests, nil } - c.metrics.deleteRequestsLookupsTotal.With(prometheus.Labels{"client_type": c.clientType}).Inc() - requests, err := c.getRequestsFromServer(ctx, userID) + c.metrics.deleteRequestsLookupsTotal.Inc() + requests, err := c.compactorClient.GetAllDeleteRequestsForUser(ctx, userID) if err != nil { - c.metrics.deleteRequestsLookupsFailedTotal.With(prometheus.Labels{"client_type": c.clientType}).Inc() + c.metrics.deleteRequestsLookupsFailedTotal.Inc() return nil, err } @@ -126,7 +108,7 @@ func (c *deleteRequestsClient) updateCache() { newCache := make(map[string][]DeleteRequest) for _, userID := range userIDs { - deleteReq, err := c.getRequestsFromServer(context.Background(), userID) + deleteReq, err := c.compactorClient.GetAllDeleteRequestsForUser(context.Background(), userID) if err != nil { level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) continue @@ -150,37 +132,3 @@ func (c *deleteRequestsClient) currentUserIDs() []string { return userIDs } - -func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - - req.Header.Set(orgHeaderKey, userID) - - resp, err := c.httpClient.Do(req) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - defer func() { - _, _ = io.Copy(io.Discard, resp.Body) - _ = resp.Body.Close() - }() - - if resp.StatusCode/100 != 2 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - - var deleteRequests []DeleteRequest - if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { - level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) - return nil, err - } - - return deleteRequests, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go index 2f9834c3ea46d..cac79b924ccb0 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go @@ -2,9 +2,6 @@ package deletion import ( "context" - "io" - "net/http" - "strings" "testing" "time" @@ -15,9 +12,16 @@ import ( func TestGetCacheGenNumberForUser(t *testing.T) { deleteClientMetrics := NewDeleteRequestClientMetrics(prometheus.DefaultRegisterer) - t.Run("it requests results from the api", func(t *testing.T) { - httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} - client, err := NewDeleteRequestsClient("http://test-server", httpClient, deleteClientMetrics, "test_client") + t.Run("it requests results from the compactor client", func(t *testing.T) { + compactorClient := mockCompactorClient{ + delRequests: []DeleteRequest{ + { + RequestID: "test-request", + }, + }, + } + + client, err := NewDeleteRequestsClient(&compactorClient, deleteClientMetrics, "test_client") require.Nil(t, err) deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") @@ -25,22 +29,28 @@ func TestGetCacheGenNumberForUser(t *testing.T) { require.Len(t, deleteRequests, 1) require.Equal(t, "test-request", deleteRequests[0].RequestID) - - require.Equal(t, "http://test-server/loki/api/v1/delete", httpClient.req.URL.String()) - require.Equal(t, http.MethodGet, httpClient.req.Method) - require.Equal(t, "userID", httpClient.req.Header.Get("X-Scope-OrgID")) }) t.Run("it caches the results", func(t *testing.T) { - httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} - client, err := NewDeleteRequestsClient("http://test-server", httpClient, deleteClientMetrics, "test_client", WithRequestClientCacheDuration(100*time.Millisecond)) + compactorClient := mockCompactorClient{ + delRequests: []DeleteRequest{ + { + RequestID: "test-request", + }, + }, + } + client, err := NewDeleteRequestsClient(&compactorClient, deleteClientMetrics, "test_client", WithRequestClientCacheDuration(100*time.Millisecond)) require.Nil(t, err) deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") require.Nil(t, err) require.Equal(t, "test-request", deleteRequests[0].RequestID) - httpClient.ret = `[{"request_id":"different"}]` + compactorClient.delRequests = []DeleteRequest{ + { + RequestID: "different", + }, + } deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") require.Nil(t, err) @@ -56,16 +66,21 @@ func TestGetCacheGenNumberForUser(t *testing.T) { }) } -type mockHTTPClient struct { - ret string - req *http.Request +type mockCompactorClient struct { + delRequests []DeleteRequest + cacheGenNum string +} + +func (m *mockCompactorClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { + return m.delRequests, nil } -func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { - c.req = req +func (m *mockCompactorClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + return m.cacheGenNum, nil +} - return &http.Response{ - StatusCode: 200, - Body: io.NopCloser(strings.NewReader(c.ret)), - }, nil +func (m *mockCompactorClient) Name() string { + return "" } + +func (m *mockCompactorClient) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go index 7c992e6cdd569..f24c3c51139f7 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go @@ -679,6 +679,8 @@ type mockDeleteRequestsStore struct { getAllUser string getAllResult []DeleteRequest getAllErr error + + genNumber string } func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) { @@ -708,3 +710,7 @@ func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Contex m.getAllUser = userID return m.getAllResult, m.getAllErr } + +func (m *mockDeleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + return m.genNumber, m.getErr +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go new file mode 100644 index 0000000000000..15b5452e89d5a --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go @@ -0,0 +1,94 @@ +package deletion + +import ( + "context" + "sort" + + "github.com/go-kit/log/level" + "github.com/grafana/dskit/tenant" + "github.com/pkg/errors" + + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + util_log "github.com/grafana/loki/pkg/util/log" +) + +type GRPCRequestHandler struct { + deleteRequestsStore DeleteRequestsStore + limits Limits +} + +func NewGRPCRequestHandler(deleteRequestsStore DeleteRequestsStore, limits Limits) *GRPCRequestHandler { + return &GRPCRequestHandler{ + deleteRequestsStore: deleteRequestsStore, + limits: limits, + } +} + +func (g *GRPCRequestHandler) GetDeleteRequests(ctx context.Context, _ *grpc.GetDeleteRequestsRequest) (*grpc.GetDeleteRequestsResponse, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + hasDelete, err := validDeletionLimit(g.limits, userID) + if err != nil { + return nil, err + } + + if !hasDelete { + return nil, errors.New(deletionNotAvailableMsg) + } + + deleteGroups, err := g.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + deletesPerRequest := partitionByRequestID(deleteGroups) + deleteRequests := mergeDeletes(deletesPerRequest) + + sort.Slice(deleteRequests, func(i, j int) bool { + return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt + }) + + resp := grpc.GetDeleteRequestsResponse{ + DeleteRequests: make([]*grpc.DeleteRequest, len(deleteRequests)), + } + for i, dr := range deleteRequests { + resp.DeleteRequests[i] = &grpc.DeleteRequest{ + RequestID: dr.RequestID, + StartTime: int64(dr.StartTime), + EndTime: int64(dr.EndTime), + Query: dr.Query, + Status: string(dr.Status), + CreatedAt: int64(dr.CreatedAt), + } + } + + return &resp, nil +} + +func (g *GRPCRequestHandler) GetCacheGenNumbers(ctx context.Context, _ *grpc.GetCacheGenNumbersRequest) (*grpc.GetCacheGenNumbersResponse, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + hasDelete, err := validDeletionLimit(g.limits, userID) + if err != nil { + return nil, err + } + + if !hasDelete { + return nil, errors.New(deletionNotAvailableMsg) + } + + cacheGenNumber, err := g.deleteRequestsStore.GetCacheGenerationNumber(ctx, userID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting cache generation number", "err", err) + return nil, err + } + + return &grpc.GetCacheGenNumbersResponse{ResultsCacheGen: cacheGenNumber}, nil +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go new file mode 100644 index 0000000000000..3851bbea795ae --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go @@ -0,0 +1,223 @@ +package deletion + +import ( + "context" + "errors" + "net" + "testing" + "time" + + "github.com/grafana/dskit/tenant" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + compactor_client_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode" +) + +func server(t *testing.T, ctx context.Context, h *GRPCRequestHandler) (compactor_client_grpc.CompactorClient, func()) { + buffer := 101024 * 1024 + lis := bufconn.Listen(buffer) + + baseServer := grpc.NewServer(grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return middleware.StreamServerUserHeaderInterceptor(srv, ss, info, handler) + }), grpc.ChainUnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return middleware.ServerUserHeaderInterceptor(ctx, req, info, handler) + })) + + compactor_client_grpc.RegisterCompactorServer(baseServer, h) + go func() { + require.NoError(t, baseServer.Serve(lis)) + }() + + conn, err := grpc.DialContext(ctx, "", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + closer := func() { + require.NoError(t, lis.Close()) + baseServer.Stop() + } + + client := compactor_client_grpc.NewCompactorClient(conn) + + return client, closer +} + +func grpcDeleteRequestsToDeleteRequests(requests []*compactor_client_grpc.DeleteRequest) []DeleteRequest { + resp := make([]DeleteRequest, len(requests)) + for i, grpcReq := range requests { + resp[i] = DeleteRequest{ + RequestID: grpcReq.RequestID, + StartTime: model.Time(grpcReq.StartTime), + EndTime: model.Time(grpcReq.EndTime), + Query: grpcReq.Query, + Status: DeleteRequestStatus(grpcReq.Status), + CreatedAt: model.Time(grpcReq.CreatedAt), + } + } + + return resp +} + +func TestGRPCGetDeleteRequests(t *testing.T) { + t.Run("it gets all the delete requests for the user", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{{RequestID: "test-request-1", Status: StatusReceived}, {RequestID: "test-request-2", Status: StatusReceived}} + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, store.getAllResult, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("it merges requests with the same requestID", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, StartTime: now, EndTime: now.Add(time.Hour)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(2 * time.Hour), EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(time.Hour), EndTime: now.Add(2 * time.Hour)}, + } + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, []DeleteRequest{ + {RequestID: "test-request-1", Status: StatusReceived, CreatedAt: now, StartTime: now, EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", Status: StatusReceived, CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, + }, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("it only considers a request processed if all it's subqueries are processed", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusReceived}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + } + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: "66% Complete"}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + }, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllErr = errors.New("something bad") + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + _, err = grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "something bad", sts.Message()) + }) + + t.Run("validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + _, err := grpcClient.GetDeleteRequests(context.Background(), &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "no org id", sts.Message()) + }) + }) +} + +func TestGRPCGetCacheGenNumbers(t *testing.T) { + t.Run("get gen number", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.genNumber = "123" + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetCacheGenNumbers(ctx, &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.NoError(t, err) + require.Equal(t, store.genNumber, resp.ResultsCacheGen) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getErr = errors.New("something bad") + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + _, err = grpcClient.GetCacheGenNumbers(ctx, &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "something bad", sts.Message()) + }) + + t.Run("validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, context.Background(), h) + defer closer() + + _, err := grpcClient.GetCacheGenNumbers(context.Background(), &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "no org id", sts.Message()) + }) + }) +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go index 4da42c7384d0c..b9489e5f23b8d 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go @@ -6,24 +6,24 @@ import ( ) type DeleteRequestClientMetrics struct { - deleteRequestsLookupsTotal *prometheus.CounterVec - deleteRequestsLookupsFailedTotal *prometheus.CounterVec + deleteRequestsLookupsTotal prometheus.Counter + deleteRequestsLookupsFailedTotal prometheus.Counter } func NewDeleteRequestClientMetrics(r prometheus.Registerer) *DeleteRequestClientMetrics { m := DeleteRequestClientMetrics{} - m.deleteRequestsLookupsTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + m.deleteRequestsLookupsTotal = promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: "loki", Name: "delete_request_lookups_total", Help: "Number times the client has looked up delete requests", - }, []string{"client_type"}) + }) - m.deleteRequestsLookupsFailedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + m.deleteRequestsLookupsFailedTotal = promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: "loki", Name: "delete_request_lookups_failed_total", Help: "Number times the client has failed to look up delete requests", - }, []string{"client_type"}) + }) return &m } diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go deleted file mode 100644 index 7acb6d9bd6193..0000000000000 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go +++ /dev/null @@ -1,81 +0,0 @@ -package generationnumber - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/url" - - "github.com/go-kit/log/level" - - "github.com/grafana/loki/pkg/util/log" -) - -const ( - orgHeaderKey = "X-Scope-OrgID" - cacheGenNumPath = "/loki/api/v1/cache/generation_numbers" -) - -type CacheGenClient interface { - GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) - Name() string -} - -type genNumberClient struct { - url string - httpClient doer -} - -type doer interface { - Do(*http.Request) (*http.Response, error) -} - -func NewGenNumberClient(addr string, c doer) (CacheGenClient, error) { - u, err := url.Parse(addr) - if err != nil { - level.Error(log.Logger).Log("msg", "error parsing url", "err", err) - return nil, err - } - u.Path = cacheGenNumPath - - return &genNumberClient{ - url: u.String(), - httpClient: c, - }, nil -} - -func (c *genNumberClient) Name() string { - return "gen_number_client" -} - -func (c *genNumberClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - - req.Header.Set(orgHeaderKey, userID) - - resp, err := c.httpClient.Do(req) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - - var genNumber string - if err := json.NewDecoder(resp.Body).Decode(&genNumber); err != nil { - level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) - return "", err - } - - return genNumber, err -} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go deleted file mode 100644 index edd711f06dbfd..0000000000000 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package generationnumber - -import ( - "context" - "io" - "net/http" - "strings" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestGetCacheGenNumberForUser(t *testing.T) { - httpClient := &mockHTTPClient{ret: `"42"`} - client, err := NewGenNumberClient("http://test-server", httpClient) - require.Nil(t, err) - - cacheGenNumber, err := client.GetCacheGenerationNumber(context.Background(), "userID") - require.Nil(t, err) - - require.Equal(t, "42", cacheGenNumber) - - require.Equal(t, "http://test-server/loki/api/v1/cache/generation_numbers", httpClient.req.URL.String()) - require.Equal(t, http.MethodGet, httpClient.req.Method) - require.Equal(t, "userID", httpClient.req.Header.Get("X-Scope-OrgID")) -} - -type mockHTTPClient struct { - ret string - req *http.Request -} - -func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { - c.req = req - - return &http.Response{ - StatusCode: 200, - Body: io.NopCloser(strings.NewReader(c.ret)), - }, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go index 55267987cf0c3..d41b54d762708 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go @@ -16,6 +16,12 @@ import ( const reloadDuration = 5 * time.Minute +type CacheGenClient interface { + GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) + Name() string + Stop() +} + type GenNumberLoader struct { numberGetter CacheGenClient numbers map[string]string @@ -140,6 +146,7 @@ func (l *GenNumberLoader) getCacheGenNumber(userID string) string { func (l *GenNumberLoader) Stop() { close(l.quit) + l.numberGetter.Stop() } type noopNumberGetter struct{} @@ -151,3 +158,5 @@ func (g *noopNumberGetter) GetCacheGenerationNumber(_ context.Context, _ string) func (g *noopNumberGetter) Name() string { return "noop-getter" } + +func (g *noopNumberGetter) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go index 7c5581a8b1811..afa59a12b47b5 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go @@ -52,3 +52,5 @@ func (g *mockGenNumberClient) GetCacheGenerationNumber(ctx context.Context, user func (g *mockGenNumberClient) Name() string { return "" } + +func (g *mockGenNumberClient) Stop() {}