diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e48f39bac15a..b09673ddcbf22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,10 +45,16 @@ Check the history of the branch FIXME. #### Loki +##### Enhancements + +* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion. + ##### Fixes + * [7453](https://github.com/grafana/loki/pull/7453) **periklis**: Add single compactor http client for delete and gennumber clients ##### Changes + * [7877](https://github.com/grafana/loki/pull/7877) **trevorwhitney**: Due to a known bug with experimental new delete mode feature, the default delete mode has been changed to `filter-only`. ## 2.7.0 diff --git a/clients/pkg/promtail/targets/cloudflare/fields.go b/clients/pkg/promtail/targets/cloudflare/fields.go index c5189d7264fe2..93f7195fe456b 100644 --- a/clients/pkg/promtail/targets/cloudflare/fields.go +++ b/clients/pkg/promtail/targets/cloudflare/fields.go @@ -31,7 +31,7 @@ var ( allFields = append(extendedFields, []string{ "BotScore", "BotScoreSrc", "ClientRequestBytes", "ClientSrcPort", "ClientXRequestedWith", "CacheTieredFill", "EdgeResponseCompressionRatio", "EdgeServerIP", "FirewallMatchesSources", "FirewallMatchesActions", "FirewallMatchesRuleIDs", "OriginResponseBytes", "OriginResponseTime", "ClientDeviceType", "WAFFlags", "WAFMatchedVar", "EdgeColoID", - "RequestHeaders", "ResponseHeaders", + "RequestHeaders", "ResponseHeaders", }...) ) diff --git a/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go b/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go index 2c6267ac7216e..0770ce6fcaecb 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go +++ b/clients/pkg/promtail/targets/lokipush/pushtargetmanager_test.go @@ -68,7 +68,7 @@ func Test_validateJobName(t *testing.T) { }, }, }, - wantErr: false, + wantErr: false, expectedJob: "job_1_2_3_4_job", }, } diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 373c5c09bf92c..7ff566087bb91 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2911,6 +2911,10 @@ This way, one doesn't have to replicate configuration in multiple places. # CLI flag: -common.compactor-address [compactor_address: <string> | default = ""] +# Address and port number where the compactor grpc requests are being served. +# CLI flag: -common.compactor-grpc-address +[compactor_grpc_address: <string> | default = ""] + ## analytics The `analytics` block configures the reporting of Loki analytics to grafana.com. 
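A minimal sketch of how the two compactor addresses documented above sit under the `common` block; the host names and ports are hypothetical, and only the key names, CLI flags, and the prefer-gRPC-over-HTTP behavior come from this change:

```yaml
common:
  # Existing HTTP address of the compactor (CLI flag: -common.compactor-address).
  compactor_address: "http://loki-compactor.example:3100"
  # New gRPC address (CLI flag: -common.compactor-grpc-address). When both are set,
  # the gRPC address is preferred for delete-request and cache-generation lookups.
  compactor_grpc_address: "loki-compactor.example:9095"
```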
diff --git a/pkg/logqlmodel/logqlmodel.go b/pkg/logqlmodel/logqlmodel.go index 2c431aeb54d78..2e09bf0b877a5 100644 --- a/pkg/logqlmodel/logqlmodel.go +++ b/pkg/logqlmodel/logqlmodel.go @@ -1,9 +1,10 @@ package logqlmodel import ( - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" "github.com/prometheus/prometheus/promql/parser" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel/stats" ) diff --git a/pkg/logqlmodel/metadata/context_test.go b/pkg/logqlmodel/metadata/context_test.go index 459fda2afd613..256abdb18ef77 100644 --- a/pkg/logqlmodel/metadata/context_test.go +++ b/pkg/logqlmodel/metadata/context_test.go @@ -5,8 +5,9 @@ import ( "errors" "testing" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" ) func TestHeaders(t *testing.T) { diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 721ffed9d12e3..85173444f3ad9 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -43,6 +43,9 @@ type Config struct { // CompactorAddress is the http address of the compactor in the form http://host:port CompactorAddress string `yaml:"compactor_address"` + + // CompactorAddress is the grpc address of the compactor in the form host:port + CompactorGRPCAddress string `yaml:"compactor_grpc_address"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -57,6 +60,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") + f.StringVar(&c.CompactorGRPCAddress, "common.compactor-grpc-address", "", "the grpc address of the compactor in the form host:port") } type Storage struct { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index a667be14d7fd4..2e0c7f6169280 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -45,6 +45,7 @@ import ( "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" @@ -69,30 +70,31 @@ type Config struct { UseBufferedLogger bool `yaml:"use_buffered_logger"` UseSyncLogger bool `yaml:"use_sync_logger"` - Common common.Config `yaml:"common,omitempty"` - Server server.Config `yaml:"server,omitempty"` - InternalServer internalserver.Config `yaml:"internal_server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - CompactorClient compactor.ClientConfig `yaml:"delete_client,omitempty"` - IngesterClient client.Config `yaml:"ingester_client,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage_config,omitempty"` - IndexGateway indexgateway.Config `yaml:"index_gateway"` - ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` - SchemaConfig config.SchemaConfig 
`yaml:"schema_config,omitempty"` - LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` - TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"` - Worker worker.Config `yaml:"frontend_worker,omitempty"` - Frontend lokifrontend.Config `yaml:"frontend,omitempty"` - Ruler ruler.Config `yaml:"ruler,omitempty"` - QueryRange queryrange.Config `yaml:"query_range,omitempty"` - RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` - MemberlistKV memberlist.KVConfig `yaml:"memberlist"` - Tracing tracing.Config `yaml:"tracing"` - CompactorConfig compactor.Config `yaml:"compactor,omitempty"` - QueryScheduler scheduler.Config `yaml:"query_scheduler"` - UsageReport usagestats.Config `yaml:"analytics"` + Common common.Config `yaml:"common,omitempty"` + Server server.Config `yaml:"server,omitempty"` + InternalServer internalserver.Config `yaml:"internal_server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + CompactorHTTPClient compactor_client.HTTPConfig `yaml:"compactor_client,omitempty"` + CompactorGRPCClient compactor_client.GRPCConfig `yaml:"compactor_grpc_client,omitempty"` + IngesterClient client.Config `yaml:"ingester_client,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage_config,omitempty"` + IndexGateway indexgateway.Config `yaml:"index_gateway"` + ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` + SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"` + LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` + TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"` + Worker worker.Config `yaml:"frontend_worker,omitempty"` + Frontend lokifrontend.Config `yaml:"frontend,omitempty"` + Ruler ruler.Config `yaml:"ruler,omitempty"` + QueryRange queryrange.Config `yaml:"query_range,omitempty"` + RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` + MemberlistKV memberlist.KVConfig `yaml:"memberlist"` + Tracing tracing.Config `yaml:"tracing"` + CompactorConfig compactor.Config `yaml:"compactor,omitempty"` + QueryScheduler scheduler.Config `yaml:"query_scheduler"` + UsageReport usagestats.Config `yaml:"analytics"` } // RegisterFlags registers flag. 
@@ -115,7 +117,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Common.RegisterFlags(f) c.Distributor.RegisterFlags(f) c.Querier.RegisterFlags(f) - c.CompactorClient.RegisterFlags(f) + c.CompactorHTTPClient.RegisterFlags(f) + c.CompactorGRPCClient.RegisterFlags(f) c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) c.StorageConfig.RegisterFlags(f) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index d547cc131bb94..f91eacc5b7336 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -12,8 +12,6 @@ import ( "strings" "time" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" - "github.com/NYTimes/gziphandler" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -43,6 +41,7 @@ import ( "github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb" "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/ruler" base_ruler "github.com/grafana/loki/pkg/ruler/base" "github.com/grafana/loki/pkg/runtime" @@ -54,6 +53,8 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber" "github.com/grafana/loki/pkg/storage/stores/series/index" @@ -676,41 +677,53 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) { var client generationnumber.CacheGenClient if t.supportIndexDeleteRequest() { - compactorAddress, err := t.compactorAddress() - if err != nil { - return nil, err - } - - httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) + compactorAddress, isGRPCAddress, err := t.compactorAddress() if err != nil { return nil, err } - client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient) - if err != nil { - return nil, err + reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "cache_gen", "client_type": t.Cfg.Target.String()}, prometheus.DefaultRegisterer) + if isGRPCAddress { + client, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg) + if err != nil { + return nil, err + } + } else { + client, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient) + if err != nil { + return nil, err + } } } t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer) - return services.NewIdleService(nil, nil), nil + return services.NewIdleService(nil, func(failureCase error) error { + t.cacheGenerationLoader.Stop() + return nil + }), nil } func (t *Loki) supportIndexDeleteRequest() bool { return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) } -func (t *Loki) compactorAddress() (string, error) { +// compactorAddress returns the configured address of the compactor. +// It prefers grpc address over http. 
If the address is grpc then the bool would be true otherwise false +func (t *Loki) compactorAddress() (string, bool, error) { if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) { // In single binary or read modes, this module depends on Server - return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil + return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil } - if t.Cfg.Common.CompactorAddress == "" { - return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured") + if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" { + return "", false, errors.New("query filtering for deletes requires 'compactor_grpc_address' or 'compactor_address' to be configured") } - return t.Cfg.Common.CompactorAddress, nil + if t.Cfg.Common.CompactorGRPCAddress != "" { + return t.Cfg.Common.CompactorGRPCAddress, true, nil + } + + return t.Cfg.Common.CompactorAddress, false, nil } func (t *Loki) initQueryFrontend() (_ services.Service, err error) { @@ -1007,6 +1020,7 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)) t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)) t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)) + grpc.RegisterCompactorServer(t.Server.GRPC, t.compactor.DeleteRequestsGRPCHandler) } return t.compactor, nil @@ -1123,17 +1137,26 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri return deletion.NewNoOpDeleteRequestsStore(), nil } - compactorAddress, err := t.compactorAddress() + compactorAddress, isGRPCAddress, err := t.compactorAddress() if err != nil { return nil, err } - httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) - if err != nil { - return nil, err + reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "delete_requests", "client_type": clientType}, prometheus.DefaultRegisterer) + var compactorClient deletion.CompactorClient + if isGRPCAddress { + compactorClient, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg) + if err != nil { + return nil, err + } + } else { + compactorClient, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient) + if err != nil { + return nil, err + } } - client, err := deletion.NewDeleteRequestsClient(compactorAddress, httpClient, t.deleteClientMetrics, clientType) + client, err := deletion.NewDeleteRequestsClient(compactorClient, t.deleteClientMetrics, clientType) if err != nil { return nil, err } diff --git a/pkg/querier/queryrange/queryrangebase/middleware_test.go b/pkg/querier/queryrange/queryrangebase/middleware_test.go index 6b3b5e64b9511..c910c70823b33 100644 --- a/pkg/querier/queryrange/queryrangebase/middleware_test.go +++ b/pkg/querier/queryrange/queryrangebase/middleware_test.go @@ -31,3 +31,5 @@ type fakeGenNumberLoader struct { func (l *fakeGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string { return l.genNumber } + +func (l *fakeGenNumberLoader) Stop() {} diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go 
index 24a0cfb7d0a1c..ac2462f966862 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -68,6 +68,7 @@ func NewResultsCacheMetrics(registerer prometheus.Registerer) *ResultsCacheMetri type CacheGenNumberLoader interface { GetResultsCacheGenNumber(tenantIDs []string) string + Stop() } // ResultsCacheConfig is the config for the results cache. diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index e73989b2e3fd6..9938affdc1984 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -1054,3 +1054,5 @@ func newMockCacheGenNumberLoader() CacheGenNumberLoader { func (mockCacheGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string { return "" } + +func (l mockCacheGenNumberLoader) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc.go b/pkg/storage/stores/indexshipper/compactor/client/grpc.go new file mode 100644 index 0000000000000..d18bca5139e6d --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc.go @@ -0,0 +1,101 @@ +package client + +import ( + "context" + "flag" + + "github.com/grafana/dskit/grpcclient" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/weaveworks/common/instrument" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + + deletion_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" +) + +type GRPCConfig struct { + GRPCClientConfig grpcclient.Config `yaml:",inline"` +} + +// RegisterFlags registers flags. +func (cfg *GRPCConfig) RegisterFlags(f *flag.FlagSet) { + cfg.GRPCClientConfig.RegisterFlagsWithPrefix("", f) +} + +type compactorGRPCClient struct { + cfg GRPCConfig + + GRPCClientRequestDuration *prometheus.HistogramVec + conn *grpc.ClientConn + grpcClient deletion_grpc.CompactorClient +} + +// NewGRPCClient supports only methods which are used for internal communication of Loki like +// loading delete requests and cache gen numbers for query time filtering. +func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (deletion.CompactorClient, error) { + client := &compactorGRPCClient{ + cfg: cfg, + GRPCClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_compactor", + Name: "grpc_request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using compactor GRPC client", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + } + + dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(client.GRPCClientRequestDuration)) + if err != nil { + return nil, err + } + + client.conn, err = grpc.Dial(addr, dialOpts...) 
+ if err != nil { + return nil, err + } + + client.grpcClient = deletion_grpc.NewCompactorClient(client.conn) + return client, nil +} + +func (s *compactorGRPCClient) Stop() { + s.conn.Close() +} + +func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) { + ctx = user.InjectOrgID(ctx, userID) + grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{}) + if err != nil { + return nil, err + } + + deleteRequests := make([]deletion.DeleteRequest, len(grpcResp.DeleteRequests)) + for i, dr := range grpcResp.DeleteRequests { + deleteRequests[i] = deletion.DeleteRequest{ + RequestID: dr.RequestID, + StartTime: model.Time(dr.StartTime), + EndTime: model.Time(dr.EndTime), + Query: dr.Query, + Status: deletion.DeleteRequestStatus(dr.Status), + CreatedAt: model.Time(dr.CreatedAt), + } + } + + return deleteRequests, nil +} + +func (s *compactorGRPCClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + ctx = user.InjectOrgID(ctx, userID) + grpcResp, err := s.grpcClient.GetCacheGenNumbers(ctx, &deletion_grpc.GetCacheGenNumbersRequest{}) + if err != nil { + return "", err + } + + return grpcResp.ResultsCacheGen, nil +} + +func (s *compactorGRPCClient) Name() string { + return "grpc_client" +} diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go new file mode 100644 index 0000000000000..533a2ecfd6316 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.pb.go @@ -0,0 +1,1549 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto + +package grpc + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type GetDeleteRequestsRequest struct { +} + +func (m *GetDeleteRequestsRequest) Reset() { *m = GetDeleteRequestsRequest{} } +func (*GetDeleteRequestsRequest) ProtoMessage() {} +func (*GetDeleteRequestsRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{0} +} +func (m *GetDeleteRequestsRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetDeleteRequestsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetDeleteRequestsRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetDeleteRequestsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetDeleteRequestsRequest.Merge(m, src) +} +func (m *GetDeleteRequestsRequest) XXX_Size() int { + return m.Size() +} +func (m *GetDeleteRequestsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetDeleteRequestsRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetDeleteRequestsRequest proto.InternalMessageInfo + +type GetDeleteRequestsResponse struct { + DeleteRequests []*DeleteRequest `protobuf:"bytes,1,rep,name=deleteRequests,proto3" json:"deleteRequests,omitempty"` +} + +func (m *GetDeleteRequestsResponse) Reset() { *m = GetDeleteRequestsResponse{} } +func (*GetDeleteRequestsResponse) ProtoMessage() {} +func (*GetDeleteRequestsResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{1} +} +func (m *GetDeleteRequestsResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetDeleteRequestsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetDeleteRequestsResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetDeleteRequestsResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetDeleteRequestsResponse.Merge(m, src) +} +func (m *GetDeleteRequestsResponse) XXX_Size() int { + return m.Size() +} +func (m *GetDeleteRequestsResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetDeleteRequestsResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetDeleteRequestsResponse proto.InternalMessageInfo + +func (m *GetDeleteRequestsResponse) GetDeleteRequests() []*DeleteRequest { + if m != nil { + return m.DeleteRequests + } + return nil +} + +type DeleteRequest struct { + RequestID string `protobuf:"bytes,1,opt,name=requestID,proto3" json:"requestID,omitempty"` + StartTime int64 `protobuf:"varint,2,opt,name=startTime,proto3" json:"startTime,omitempty"` + EndTime int64 `protobuf:"varint,3,opt,name=endTime,proto3" json:"endTime,omitempty"` + Query string `protobuf:"bytes,4,opt,name=query,proto3" json:"query,omitempty"` + Status string `protobuf:"bytes,5,opt,name=status,proto3" json:"status,omitempty"` + CreatedAt int64 `protobuf:"varint,6,opt,name=createdAt,proto3" json:"createdAt,omitempty"` +} + +func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } +func (*DeleteRequest) ProtoMessage() {} +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{2} +} +func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic 
{ + return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DeleteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteRequest.Merge(m, src) +} +func (m *DeleteRequest) XXX_Size() int { + return m.Size() +} +func (m *DeleteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo + +func (m *DeleteRequest) GetRequestID() string { + if m != nil { + return m.RequestID + } + return "" +} + +func (m *DeleteRequest) GetStartTime() int64 { + if m != nil { + return m.StartTime + } + return 0 +} + +func (m *DeleteRequest) GetEndTime() int64 { + if m != nil { + return m.EndTime + } + return 0 +} + +func (m *DeleteRequest) GetQuery() string { + if m != nil { + return m.Query + } + return "" +} + +func (m *DeleteRequest) GetStatus() string { + if m != nil { + return m.Status + } + return "" +} + +func (m *DeleteRequest) GetCreatedAt() int64 { + if m != nil { + return m.CreatedAt + } + return 0 +} + +type GetCacheGenNumbersRequest struct { +} + +func (m *GetCacheGenNumbersRequest) Reset() { *m = GetCacheGenNumbersRequest{} } +func (*GetCacheGenNumbersRequest) ProtoMessage() {} +func (*GetCacheGenNumbersRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{3} +} +func (m *GetCacheGenNumbersRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetCacheGenNumbersRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetCacheGenNumbersRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetCacheGenNumbersRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetCacheGenNumbersRequest.Merge(m, src) +} +func (m *GetCacheGenNumbersRequest) XXX_Size() int { + return m.Size() +} +func (m *GetCacheGenNumbersRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GetCacheGenNumbersRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GetCacheGenNumbersRequest proto.InternalMessageInfo + +type GetCacheGenNumbersResponse struct { + ResultsCacheGen string `protobuf:"bytes,1,opt,name=resultsCacheGen,proto3" json:"resultsCacheGen,omitempty"` +} + +func (m *GetCacheGenNumbersResponse) Reset() { *m = GetCacheGenNumbersResponse{} } +func (*GetCacheGenNumbersResponse) ProtoMessage() {} +func (*GetCacheGenNumbersResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_af4e385854eeb93d, []int{4} +} +func (m *GetCacheGenNumbersResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GetCacheGenNumbersResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_GetCacheGenNumbersResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *GetCacheGenNumbersResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetCacheGenNumbersResponse.Merge(m, src) +} +func (m *GetCacheGenNumbersResponse) XXX_Size() int { + return m.Size() +} +func (m *GetCacheGenNumbersResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetCacheGenNumbersResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetCacheGenNumbersResponse proto.InternalMessageInfo + +func 
(m *GetCacheGenNumbersResponse) GetResultsCacheGen() string { + if m != nil { + return m.ResultsCacheGen + } + return "" +} + +func init() { + proto.RegisterType((*GetDeleteRequestsRequest)(nil), "grpc.GetDeleteRequestsRequest") + proto.RegisterType((*GetDeleteRequestsResponse)(nil), "grpc.GetDeleteRequestsResponse") + proto.RegisterType((*DeleteRequest)(nil), "grpc.DeleteRequest") + proto.RegisterType((*GetCacheGenNumbersRequest)(nil), "grpc.GetCacheGenNumbersRequest") + proto.RegisterType((*GetCacheGenNumbersResponse)(nil), "grpc.GetCacheGenNumbersResponse") +} + +func init() { + proto.RegisterFile("pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto", fileDescriptor_af4e385854eeb93d) +} + +var fileDescriptor_af4e385854eeb93d = []byte{ + // 391 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xbb, 0x4e, 0xe3, 0x40, + 0x14, 0xf5, 0x6c, 0x1e, 0xab, 0xcc, 0x6a, 0x77, 0xc5, 0x80, 0xd0, 0x60, 0xd0, 0x60, 0xb9, 0x72, + 0x15, 0x4b, 0x81, 0x8e, 0x06, 0x48, 0x44, 0x44, 0x43, 0x61, 0x45, 0x82, 0xd6, 0xb1, 0xaf, 0x12, + 0x8b, 0xc4, 0x76, 0x66, 0xc6, 0x12, 0x74, 0x7c, 0x02, 0x9f, 0x41, 0x4b, 0xcb, 0x17, 0x50, 0xa6, + 0x4c, 0x49, 0x9c, 0x86, 0x32, 0x9f, 0x80, 0x32, 0x76, 0x12, 0xe5, 0xd5, 0x78, 0xe6, 0x9e, 0x73, + 0xe6, 0x3e, 0xce, 0x35, 0xbe, 0x8c, 0x1f, 0x3b, 0xb6, 0x90, 0x11, 0x77, 0x3b, 0xa0, 0x4e, 0x10, + 0x76, 0x10, 0xfa, 0xf0, 0x24, 0xba, 0x41, 0x1c, 0x03, 0xb7, 0xbd, 0xa8, 0x1f, 0xbb, 0x9e, 0x8c, + 0xb8, 0xed, 0xf5, 0x02, 0x08, 0xa5, 0xdd, 0xe1, 0xb1, 0xa7, 0x3e, 0xd5, 0x98, 0x47, 0x32, 0x22, + 0xc5, 0xd9, 0xdd, 0xd4, 0x31, 0x6d, 0x82, 0x6c, 0x40, 0x0f, 0x24, 0x38, 0x30, 0x48, 0x40, 0x48, + 0x91, 0x9f, 0xe6, 0x03, 0x3e, 0xda, 0xc2, 0x89, 0x38, 0x0a, 0x05, 0x90, 0x0b, 0xfc, 0xcf, 0x5f, + 0x61, 0x28, 0x32, 0x0a, 0xd6, 0x9f, 0xda, 0x7e, 0x55, 0xd5, 0x58, 0x79, 0xe5, 0xac, 0x49, 0xcd, + 0x77, 0x84, 0xff, 0xae, 0x28, 0xc8, 0x09, 0xae, 0xf0, 0xec, 0x7a, 0xdb, 0xa0, 0xc8, 0x40, 0x56, + 0xc5, 0x59, 0x02, 0x33, 0x56, 0x48, 0x97, 0xcb, 0x56, 0xd0, 0x07, 0xfa, 0xcb, 0x40, 0x56, 0xc1, + 0x59, 0x02, 0x84, 0xe2, 0xdf, 0x10, 0xfa, 0x8a, 0x2b, 0x28, 0x6e, 0x1e, 0x92, 0x03, 0x5c, 0x1a, + 0x24, 0xc0, 0x9f, 0x69, 0x51, 0x65, 0xcc, 0x02, 0x72, 0x88, 0xcb, 0x42, 0xba, 0x32, 0x11, 0xb4, + 0xa4, 0xe0, 0x3c, 0x9a, 0x55, 0xf1, 0x38, 0xb8, 0x12, 0xfc, 0x2b, 0x49, 0xcb, 0x59, 0x95, 0x05, + 0x60, 0x1e, 0x2b, 0x37, 0xea, 0xae, 0xd7, 0x85, 0x26, 0x84, 0x77, 0x49, 0xbf, 0x0d, 0x7c, 0x61, + 0xd5, 0x0d, 0xd6, 0xb7, 0x91, 0xb9, 0x57, 0x16, 0xfe, 0xcf, 0x41, 0x24, 0x3d, 0x29, 0xe6, 0x8a, + 0x7c, 0xc4, 0x75, 0xb8, 0xf6, 0x81, 0x70, 0xa5, 0x3e, 0xdf, 0x1c, 0x69, 0xe1, 0xbd, 0x8d, 0x05, + 0x10, 0x96, 0x19, 0xbc, 0x6b, 0x6b, 0xfa, 0xe9, 0x4e, 0x3e, 0xef, 0xe6, 0x1e, 0x93, 0xcd, 0x5e, + 0xc9, 0xf2, 0xd9, 0xf6, 0x11, 0x75, 0x63, 0xb7, 0x20, 0x4b, 0x7c, 0x7d, 0x3e, 0x1c, 0x33, 0x6d, + 0x34, 0x66, 0xda, 0x74, 0xcc, 0xd0, 0x4b, 0xca, 0xd0, 0x5b, 0xca, 0xd0, 0x67, 0xca, 0xd0, 0x30, + 0x65, 0xe8, 0x2b, 0x65, 0xe8, 0x3b, 0x65, 0xda, 0x34, 0x65, 0xe8, 0x75, 0xc2, 0xb4, 0xe1, 0x84, + 0x69, 0xa3, 0x09, 0xd3, 0xda, 0x65, 0xf5, 0x3b, 0x9e, 0xfd, 0x04, 0x00, 0x00, 0xff, 0xff, 0x37, + 0x5f, 0x43, 0x54, 0xd2, 0x02, 0x00, 0x00, +} + +func (this *GetDeleteRequestsRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetDeleteRequestsRequest) + if !ok { + that2, ok := that.(GetDeleteRequestsRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == 
nil { + return false + } + return true +} +func (this *GetDeleteRequestsResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetDeleteRequestsResponse) + if !ok { + that2, ok := that.(GetDeleteRequestsResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.DeleteRequests) != len(that1.DeleteRequests) { + return false + } + for i := range this.DeleteRequests { + if !this.DeleteRequests[i].Equal(that1.DeleteRequests[i]) { + return false + } + } + return true +} +func (this *DeleteRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*DeleteRequest) + if !ok { + that2, ok := that.(DeleteRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.RequestID != that1.RequestID { + return false + } + if this.StartTime != that1.StartTime { + return false + } + if this.EndTime != that1.EndTime { + return false + } + if this.Query != that1.Query { + return false + } + if this.Status != that1.Status { + return false + } + if this.CreatedAt != that1.CreatedAt { + return false + } + return true +} +func (this *GetCacheGenNumbersRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetCacheGenNumbersRequest) + if !ok { + that2, ok := that.(GetCacheGenNumbersRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *GetCacheGenNumbersResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*GetCacheGenNumbersResponse) + if !ok { + that2, ok := that.(GetCacheGenNumbersResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.ResultsCacheGen != that1.ResultsCacheGen { + return false + } + return true +} +func (this *GetDeleteRequestsRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&grpc.GetDeleteRequestsRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetDeleteRequestsResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpc.GetDeleteRequestsResponse{") + if this.DeleteRequests != nil { + s = append(s, "DeleteRequests: "+fmt.Sprintf("%#v", this.DeleteRequests)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *DeleteRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 10) + s = append(s, "&grpc.DeleteRequest{") + s = append(s, "RequestID: "+fmt.Sprintf("%#v", this.RequestID)+",\n") + s = append(s, "StartTime: "+fmt.Sprintf("%#v", this.StartTime)+",\n") + s = append(s, "EndTime: "+fmt.Sprintf("%#v", this.EndTime)+",\n") + s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n") + s = append(s, "Status: "+fmt.Sprintf("%#v", this.Status)+",\n") + s = append(s, "CreatedAt: "+fmt.Sprintf("%#v", this.CreatedAt)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetCacheGenNumbersRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, 
"&grpc.GetCacheGenNumbersRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *GetCacheGenNumbersResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpc.GetCacheGenNumbersResponse{") + s = append(s, "ResultsCacheGen: "+fmt.Sprintf("%#v", this.ResultsCacheGen)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringGrpc(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// CompactorClient is the client API for Compactor service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type CompactorClient interface { + GetDeleteRequests(ctx context.Context, in *GetDeleteRequestsRequest, opts ...grpc.CallOption) (*GetDeleteRequestsResponse, error) + GetCacheGenNumbers(ctx context.Context, in *GetCacheGenNumbersRequest, opts ...grpc.CallOption) (*GetCacheGenNumbersResponse, error) +} + +type compactorClient struct { + cc *grpc.ClientConn +} + +func NewCompactorClient(cc *grpc.ClientConn) CompactorClient { + return &compactorClient{cc} +} + +func (c *compactorClient) GetDeleteRequests(ctx context.Context, in *GetDeleteRequestsRequest, opts ...grpc.CallOption) (*GetDeleteRequestsResponse, error) { + out := new(GetDeleteRequestsResponse) + err := c.cc.Invoke(ctx, "/grpc.Compactor/GetDeleteRequests", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *compactorClient) GetCacheGenNumbers(ctx context.Context, in *GetCacheGenNumbersRequest, opts ...grpc.CallOption) (*GetCacheGenNumbersResponse, error) { + out := new(GetCacheGenNumbersResponse) + err := c.cc.Invoke(ctx, "/grpc.Compactor/GetCacheGenNumbers", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CompactorServer is the server API for Compactor service. +type CompactorServer interface { + GetDeleteRequests(context.Context, *GetDeleteRequestsRequest) (*GetDeleteRequestsResponse, error) + GetCacheGenNumbers(context.Context, *GetCacheGenNumbersRequest) (*GetCacheGenNumbersResponse, error) +} + +// UnimplementedCompactorServer can be embedded to have forward compatible implementations. 
+type UnimplementedCompactorServer struct { +} + +func (*UnimplementedCompactorServer) GetDeleteRequests(ctx context.Context, req *GetDeleteRequestsRequest) (*GetDeleteRequestsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetDeleteRequests not implemented") +} +func (*UnimplementedCompactorServer) GetCacheGenNumbers(ctx context.Context, req *GetCacheGenNumbersRequest) (*GetCacheGenNumbersResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetCacheGenNumbers not implemented") +} + +func RegisterCompactorServer(s *grpc.Server, srv CompactorServer) { + s.RegisterService(&_Compactor_serviceDesc, srv) +} + +func _Compactor_GetDeleteRequests_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetDeleteRequestsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CompactorServer).GetDeleteRequests(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.Compactor/GetDeleteRequests", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CompactorServer).GetDeleteRequests(ctx, req.(*GetDeleteRequestsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Compactor_GetCacheGenNumbers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetCacheGenNumbersRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CompactorServer).GetCacheGenNumbers(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.Compactor/GetCacheGenNumbers", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CompactorServer).GetCacheGenNumbers(ctx, req.(*GetCacheGenNumbersRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Compactor_serviceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.Compactor", + HandlerType: (*CompactorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetDeleteRequests", + Handler: _Compactor_GetDeleteRequests_Handler, + }, + { + MethodName: "GetCacheGenNumbers", + Handler: _Compactor_GetCacheGenNumbers_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto", +} + +func (m *GetDeleteRequestsRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetDeleteRequestsRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetDeleteRequestsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *GetDeleteRequestsResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetDeleteRequestsResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetDeleteRequestsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if 
len(m.DeleteRequests) > 0 { + for iNdEx := len(m.DeleteRequests) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.DeleteRequests[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintGrpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *DeleteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DeleteRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DeleteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.CreatedAt != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.CreatedAt)) + i-- + dAtA[i] = 0x30 + } + if len(m.Status) > 0 { + i -= len(m.Status) + copy(dAtA[i:], m.Status) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.Status))) + i-- + dAtA[i] = 0x2a + } + if len(m.Query) > 0 { + i -= len(m.Query) + copy(dAtA[i:], m.Query) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.Query))) + i-- + dAtA[i] = 0x22 + } + if m.EndTime != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.EndTime)) + i-- + dAtA[i] = 0x18 + } + if m.StartTime != 0 { + i = encodeVarintGrpc(dAtA, i, uint64(m.StartTime)) + i-- + dAtA[i] = 0x10 + } + if len(m.RequestID) > 0 { + i -= len(m.RequestID) + copy(dAtA[i:], m.RequestID) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.RequestID))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *GetCacheGenNumbersRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetCacheGenNumbersRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetCacheGenNumbersRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *GetCacheGenNumbersResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GetCacheGenNumbersResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *GetCacheGenNumbersResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.ResultsCacheGen) > 0 { + i -= len(m.ResultsCacheGen) + copy(dAtA[i:], m.ResultsCacheGen) + i = encodeVarintGrpc(dAtA, i, uint64(len(m.ResultsCacheGen))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintGrpc(dAtA []byte, offset int, v uint64) int { + offset -= sovGrpc(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *GetDeleteRequestsRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GetDeleteRequestsResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.DeleteRequests) > 0 { + for _, e := range m.DeleteRequests { + l = e.Size() + n += 1 + l + sovGrpc(uint64(l)) + } + } + return n +} + +func (m *DeleteRequest) Size() (n int) { + if m == nil 
{ + return 0 + } + var l int + _ = l + l = len(m.RequestID) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + if m.StartTime != 0 { + n += 1 + sovGrpc(uint64(m.StartTime)) + } + if m.EndTime != 0 { + n += 1 + sovGrpc(uint64(m.EndTime)) + } + l = len(m.Query) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + l = len(m.Status) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + if m.CreatedAt != 0 { + n += 1 + sovGrpc(uint64(m.CreatedAt)) + } + return n +} + +func (m *GetCacheGenNumbersRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GetCacheGenNumbersResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ResultsCacheGen) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + return n +} + +func sovGrpc(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozGrpc(x uint64) (n int) { + return sovGrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *GetDeleteRequestsRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetDeleteRequestsRequest{`, + `}`, + }, "") + return s +} +func (this *GetDeleteRequestsResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForDeleteRequests := "[]*DeleteRequest{" + for _, f := range this.DeleteRequests { + repeatedStringForDeleteRequests += strings.Replace(f.String(), "DeleteRequest", "DeleteRequest", 1) + "," + } + repeatedStringForDeleteRequests += "}" + s := strings.Join([]string{`&GetDeleteRequestsResponse{`, + `DeleteRequests:` + repeatedStringForDeleteRequests + `,`, + `}`, + }, "") + return s +} +func (this *DeleteRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DeleteRequest{`, + `RequestID:` + fmt.Sprintf("%v", this.RequestID) + `,`, + `StartTime:` + fmt.Sprintf("%v", this.StartTime) + `,`, + `EndTime:` + fmt.Sprintf("%v", this.EndTime) + `,`, + `Query:` + fmt.Sprintf("%v", this.Query) + `,`, + `Status:` + fmt.Sprintf("%v", this.Status) + `,`, + `CreatedAt:` + fmt.Sprintf("%v", this.CreatedAt) + `,`, + `}`, + }, "") + return s +} +func (this *GetCacheGenNumbersRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetCacheGenNumbersRequest{`, + `}`, + }, "") + return s +} +func (this *GetCacheGenNumbersResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&GetCacheGenNumbersResponse{`, + `ResultsCacheGen:` + fmt.Sprintf("%v", this.ResultsCacheGen) + `,`, + `}`, + }, "") + return s +} +func valueToStringGrpc(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *GetDeleteRequestsRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetDeleteRequestsRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetDeleteRequestsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) 
+ if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetDeleteRequestsResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetDeleteRequestsResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetDeleteRequestsResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DeleteRequests", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DeleteRequests = append(m.DeleteRequests, &DeleteRequest{}) + if err := m.DeleteRequests[len(m.DeleteRequests)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DeleteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DeleteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DeleteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RequestID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RequestID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + 
return fmt.Errorf("proto: wrong wireType = %d for field StartTime", wireType) + } + m.StartTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndTime", wireType) + } + m.EndTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Query = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Status = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CreatedAt", wireType) + } + m.CreatedAt = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.CreatedAt |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetCacheGenNumbersRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetCacheGenNumbersRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetCacheGenNumbersRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + 
skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GetCacheGenNumbersResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GetCacheGenNumbersResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GetCacheGenNumbersResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ResultsCacheGen", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ResultsCacheGen = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipGrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthGrpc + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + 
innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipGrpc(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthGrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowGrpc = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto new file mode 100644 index 0000000000000..52c7c462364c5 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/grpc/grpc.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package grpc; + +service Compactor { + rpc GetDeleteRequests(GetDeleteRequestsRequest) returns (GetDeleteRequestsResponse); + rpc GetCacheGenNumbers(GetCacheGenNumbersRequest) returns (GetCacheGenNumbersResponse); +} + +message GetDeleteRequestsRequest {} + +message GetDeleteRequestsResponse { + repeated DeleteRequest deleteRequests = 1; +} + +message DeleteRequest { + string requestID = 1; + int64 startTime = 2; + int64 endTime = 3; + string query = 4; + string status = 5; + int64 createdAt = 6; +} + +message GetCacheGenNumbersRequest {} + +message GetCacheGenNumbersResponse { + string resultsCacheGen = 1; +} diff --git a/pkg/storage/stores/indexshipper/compactor/client/http.go b/pkg/storage/stores/indexshipper/compactor/client/http.go new file mode 100644 index 0000000000000..817892df2c61f --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/client/http.go @@ -0,0 +1,149 @@ +package client + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/dskit/crypto/tls" + + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" + "github.com/grafana/loki/pkg/util/log" +) + +const ( + orgHeaderKey = "X-Scope-OrgID" + getDeletePath = "/loki/api/v1/delete" + cacheGenNumPath = "/loki/api/v1/cache/generation_numbers" +) + +type HTTPConfig struct { + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *HTTPConfig) RegisterFlags(f *flag.FlagSet) { + prefix := "boltdb.shipper.compactor.client" + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, + "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) +} + +type compactorHTTPClient struct { + httpClient *http.Client + + deleteRequestsURL string + cacheGenURL string +} + +// NewHTTPClient creates a client which talks to compactor over HTTP. +// It uses provided TLS config which creating HTTP client. 
+func NewHTTPClient(addr string, cfg HTTPConfig) (deletion.CompactorClient, error) { + u, err := url.Parse(addr) + if err != nil { + level.Error(log.Logger).Log("msg", "error parsing url", "err", err) + return nil, err + } + u.Path = getDeletePath + deleteRequestsURL := u.String() + + u.Path = cacheGenNumPath + cacheGenURL := u.String() + + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 250 + transport.MaxIdleConnsPerHost = 250 + + if cfg.TLSEnabled { + tlsCfg, err := cfg.TLS.GetTLSConfig() + if err != nil { + return nil, err + } + + transport.TLSClientConfig = tlsCfg + } + + return &compactorHTTPClient{ + httpClient: &http.Client{Timeout: 5 * time.Second, Transport: transport}, + deleteRequestsURL: deleteRequestsURL, + cacheGenURL: cacheGenURL, + }, nil +} + +func (c *compactorHTTPClient) Name() string { + return "http_client" +} + +func (c *compactorHTTPClient) Stop() {} + +func (c *compactorHTTPClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.deleteRequestsURL, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + + if resp.StatusCode/100 != 2 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + var deleteRequests []deletion.DeleteRequest + if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return nil, err + } + + return deleteRequests, nil +} + +func (c *compactorHTTPClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.cacheGenURL, nil) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + + req.Header.Set(orgHeaderKey, userID) + + resp, err := c.httpClient.Do(req) + if err != nil { + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) + level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) + return "", err + } + + var genNumber string + if err := json.NewDecoder(resp.Body).Decode(&genNumber); err != nil { + level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) + return "", err + } + + return genNumber, err +} diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index a75437c0a2afb..ae79832e7efb0 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -145,19 +145,20 @@ func (cfg *Config) Validate() error { type Compactor struct { services.Service - cfg Config - indexStorageClient 
shipper_storage.Client - tableMarker retention.TableMarker - sweeper *retention.Sweeper - deleteRequestsStore deletion.DeleteRequestsStore - DeleteRequestsHandler *deletion.DeleteRequestHandler - deleteRequestsManager *deletion.DeleteRequestsManager - expirationChecker retention.ExpirationChecker - metrics *metrics - running bool - wg sync.WaitGroup - indexCompactors map[string]IndexCompactor - schemaConfig config.SchemaConfig + cfg Config + indexStorageClient shipper_storage.Client + tableMarker retention.TableMarker + sweeper *retention.Sweeper + deleteRequestsStore deletion.DeleteRequestsStore + DeleteRequestsHandler *deletion.DeleteRequestHandler + DeleteRequestsGRPCHandler *deletion.GRPCRequestHandler + deleteRequestsManager *deletion.DeleteRequestsManager + expirationChecker retention.ExpirationChecker + metrics *metrics + running bool + wg sync.WaitGroup + indexCompactors map[string]IndexCompactor + schemaConfig config.SchemaConfig // Ring used for running a single compactor ringLifecycler *ring.BasicLifecycler @@ -285,6 +286,8 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Over r, ) + c.DeleteRequestsGRPCHandler = deletion.NewGRPCRequestHandler(c.deleteRequestsStore, limits) + c.deleteRequestsManager = deletion.NewDeleteRequestsManager( c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, diff --git a/pkg/storage/stores/indexshipper/compactor/compactor_client.go b/pkg/storage/stores/indexshipper/compactor/compactor_client.go deleted file mode 100644 index e37cdcab5e446..0000000000000 --- a/pkg/storage/stores/indexshipper/compactor/compactor_client.go +++ /dev/null @@ -1,42 +0,0 @@ -package compactor - -import ( - "flag" - "net/http" - "time" - - "github.com/grafana/dskit/crypto/tls" -) - -// Config for compactor's delete and generation-number client -type ClientConfig struct { - TLSEnabled bool `yaml:"tls_enabled"` - TLS tls.ClientConfig `yaml:",inline"` -} - -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) { - prefix := "boltdb.shipper.compactor.delete_client" - f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, - "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") - cfg.TLS.RegisterFlagsWithPrefix(prefix, f) -} - -// NewDeleteHTTPClient return a pointer to a http client instance based on the -// delete client tls settings. 
-func NewCompactorHTTPClient(cfg ClientConfig) (*http.Client, error) { - transport := http.DefaultTransport.(*http.Transport).Clone() - transport.MaxIdleConns = 250 - transport.MaxIdleConnsPerHost = 250 - - if cfg.TLSEnabled { - tlsCfg, err := cfg.TLS.GetTLSConfig() - if err != nil { - return nil, err - } - - transport.TLSClientConfig = tlsCfg - } - - return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go index 3e8639e50c4c7..f9ed885272006 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go @@ -2,24 +2,20 @@ package deletion import ( "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" "sync" "time" "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/util/log" ) -const ( - orgHeaderKey = "X-Scope-OrgID" - getDeletePath = "/loki/api/v1/delete" -) +type CompactorClient interface { + GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) + GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) + Name() string + Stop() +} type DeleteRequestsClient interface { GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) @@ -27,9 +23,8 @@ type DeleteRequestsClient interface { } type deleteRequestsClient struct { - url string - httpClient httpClient - mu sync.RWMutex + compactorClient CompactorClient + mu sync.RWMutex cache map[string][]DeleteRequest cacheDuration time.Duration @@ -40,10 +35,6 @@ type deleteRequestsClient struct { stopChan chan struct{} } -type httpClient interface { - Do(*http.Request) (*http.Response, error) -} - type DeleteRequestsStoreOption func(c *deleteRequestsClient) func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { @@ -52,22 +43,14 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { } } -func NewDeleteRequestsClient(addr string, c httpClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { - u, err := url.Parse(addr) - if err != nil { - level.Error(log.Logger).Log("msg", "error parsing url", "err", err) - return nil, err - } - u.Path = getDeletePath - +func NewDeleteRequestsClient(compactorClient CompactorClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { client := &deleteRequestsClient{ - url: u.String(), - httpClient: c, - cacheDuration: 5 * time.Minute, - cache: make(map[string][]DeleteRequest), - clientType: clientType, - metrics: deleteClientMetrics, - stopChan: make(chan struct{}), + compactorClient: compactorClient, + cacheDuration: 5 * time.Minute, + cache: make(map[string][]DeleteRequest), + clientType: clientType, + metrics: deleteClientMetrics, + stopChan: make(chan struct{}), } for _, o := range opts { @@ -83,10 +66,10 @@ func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, return cachedRequests, nil } - c.metrics.deleteRequestsLookupsTotal.With(prometheus.Labels{"client_type": c.clientType}).Inc() - requests, err := c.getRequestsFromServer(ctx, userID) + c.metrics.deleteRequestsLookupsTotal.Inc() + requests, err := 
c.compactorClient.GetAllDeleteRequestsForUser(ctx, userID) if err != nil { - c.metrics.deleteRequestsLookupsFailedTotal.With(prometheus.Labels{"client_type": c.clientType}).Inc() + c.metrics.deleteRequestsLookupsFailedTotal.Inc() return nil, err } @@ -126,7 +109,7 @@ func (c *deleteRequestsClient) updateCache() { newCache := make(map[string][]DeleteRequest) for _, userID := range userIDs { - deleteReq, err := c.getRequestsFromServer(context.Background(), userID) + deleteReq, err := c.compactorClient.GetAllDeleteRequestsForUser(context.Background(), userID) if err != nil { level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) continue @@ -150,37 +133,3 @@ func (c *deleteRequestsClient) currentUserIDs() []string { return userIDs } - -func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - - req.Header.Set(orgHeaderKey, userID) - - resp, err := c.httpClient.Do(req) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - defer func() { - _, _ = io.Copy(io.Discard, resp.Body) - _ = resp.Body.Close() - }() - - if resp.StatusCode/100 != 2 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err) - return nil, err - } - - var deleteRequests []DeleteRequest - if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil { - level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) - return nil, err - } - - return deleteRequests, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go index 2f9834c3ea46d..cac79b924ccb0 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client_test.go @@ -2,9 +2,6 @@ package deletion import ( "context" - "io" - "net/http" - "strings" "testing" "time" @@ -15,9 +12,16 @@ import ( func TestGetCacheGenNumberForUser(t *testing.T) { deleteClientMetrics := NewDeleteRequestClientMetrics(prometheus.DefaultRegisterer) - t.Run("it requests results from the api", func(t *testing.T) { - httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} - client, err := NewDeleteRequestsClient("http://test-server", httpClient, deleteClientMetrics, "test_client") + t.Run("it requests results from the compactor client", func(t *testing.T) { + compactorClient := mockCompactorClient{ + delRequests: []DeleteRequest{ + { + RequestID: "test-request", + }, + }, + } + + client, err := NewDeleteRequestsClient(&compactorClient, deleteClientMetrics, "test_client") require.Nil(t, err) deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") @@ -25,22 +29,28 @@ func TestGetCacheGenNumberForUser(t *testing.T) { require.Len(t, deleteRequests, 1) require.Equal(t, "test-request", deleteRequests[0].RequestID) - - require.Equal(t, "http://test-server/loki/api/v1/delete", httpClient.req.URL.String()) - require.Equal(t, http.MethodGet, httpClient.req.Method) - require.Equal(t, "userID", 
httpClient.req.Header.Get("X-Scope-OrgID")) }) t.Run("it caches the results", func(t *testing.T) { - httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`} - client, err := NewDeleteRequestsClient("http://test-server", httpClient, deleteClientMetrics, "test_client", WithRequestClientCacheDuration(100*time.Millisecond)) + compactorClient := mockCompactorClient{ + delRequests: []DeleteRequest{ + { + RequestID: "test-request", + }, + }, + } + client, err := NewDeleteRequestsClient(&compactorClient, deleteClientMetrics, "test_client", WithRequestClientCacheDuration(100*time.Millisecond)) require.Nil(t, err) deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID") require.Nil(t, err) require.Equal(t, "test-request", deleteRequests[0].RequestID) - httpClient.ret = `[{"request_id":"different"}]` + compactorClient.delRequests = []DeleteRequest{ + { + RequestID: "different", + }, + } deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID") require.Nil(t, err) @@ -56,16 +66,21 @@ func TestGetCacheGenNumberForUser(t *testing.T) { }) } -type mockHTTPClient struct { - ret string - req *http.Request +type mockCompactorClient struct { + delRequests []DeleteRequest + cacheGenNum string +} + +func (m *mockCompactorClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { + return m.delRequests, nil } -func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { - c.req = req +func (m *mockCompactorClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + return m.cacheGenNum, nil +} - return &http.Response{ - StatusCode: 200, - Body: io.NopCloser(strings.NewReader(c.ret)), - }, nil +func (m *mockCompactorClient) Name() string { + return "" } + +func (m *mockCompactorClient) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go index b3c7793905ff6..5a9861a28edbf 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go @@ -501,6 +501,8 @@ type mockDeleteRequestsStore struct { getAllUser string getAllResult []DeleteRequest getAllErr error + + genNumber string } func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) { @@ -530,3 +532,7 @@ func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Contex m.getAllUser = userID return m.getAllResult, m.getAllErr } + +func (m *mockDeleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { + return m.genNumber, m.getErr +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go new file mode 100644 index 0000000000000..15b5452e89d5a --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler.go @@ -0,0 +1,94 @@ +package deletion + +import ( + "context" + "sort" + + "github.com/go-kit/log/level" + "github.com/grafana/dskit/tenant" + "github.com/pkg/errors" + + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + util_log "github.com/grafana/loki/pkg/util/log" +) + +type GRPCRequestHandler struct { + deleteRequestsStore DeleteRequestsStore + limits Limits 
+} + +func NewGRPCRequestHandler(deleteRequestsStore DeleteRequestsStore, limits Limits) *GRPCRequestHandler { + return &GRPCRequestHandler{ + deleteRequestsStore: deleteRequestsStore, + limits: limits, + } +} + +func (g *GRPCRequestHandler) GetDeleteRequests(ctx context.Context, _ *grpc.GetDeleteRequestsRequest) (*grpc.GetDeleteRequestsResponse, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + hasDelete, err := validDeletionLimit(g.limits, userID) + if err != nil { + return nil, err + } + + if !hasDelete { + return nil, errors.New(deletionNotAvailableMsg) + } + + deleteGroups, err := g.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting delete requests from the store", "err", err) + return nil, err + } + + deletesPerRequest := partitionByRequestID(deleteGroups) + deleteRequests := mergeDeletes(deletesPerRequest) + + sort.Slice(deleteRequests, func(i, j int) bool { + return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt + }) + + resp := grpc.GetDeleteRequestsResponse{ + DeleteRequests: make([]*grpc.DeleteRequest, len(deleteRequests)), + } + for i, dr := range deleteRequests { + resp.DeleteRequests[i] = &grpc.DeleteRequest{ + RequestID: dr.RequestID, + StartTime: int64(dr.StartTime), + EndTime: int64(dr.EndTime), + Query: dr.Query, + Status: string(dr.Status), + CreatedAt: int64(dr.CreatedAt), + } + } + + return &resp, nil +} + +func (g *GRPCRequestHandler) GetCacheGenNumbers(ctx context.Context, _ *grpc.GetCacheGenNumbersRequest) (*grpc.GetCacheGenNumbersResponse, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + hasDelete, err := validDeletionLimit(g.limits, userID) + if err != nil { + return nil, err + } + + if !hasDelete { + return nil, errors.New(deletionNotAvailableMsg) + } + + cacheGenNumber, err := g.deleteRequestsStore.GetCacheGenerationNumber(ctx, userID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting cache generation number", "err", err) + return nil, err + } + + return &grpc.GetCacheGenNumbersResponse{ResultsCacheGen: cacheGenNumber}, nil +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go new file mode 100644 index 0000000000000..8d9483a700484 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/grpc_request_handler_test.go @@ -0,0 +1,223 @@ +package deletion + +import ( + "context" + "errors" + "net" + "testing" + "time" + + "github.com/grafana/dskit/tenant" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/grpc/test/bufconn" + + compactor_client_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode" +) + +func server(t *testing.T, h *GRPCRequestHandler) (compactor_client_grpc.CompactorClient, func()) { + buffer := 101024 * 1024 + lis := bufconn.Listen(buffer) + + baseServer := grpc.NewServer(grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return middleware.StreamServerUserHeaderInterceptor(srv, 
ss, info, handler) + }), grpc.ChainUnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return middleware.ServerUserHeaderInterceptor(ctx, req, info, handler) + })) + + compactor_client_grpc.RegisterCompactorServer(baseServer, h) + go func() { + require.NoError(t, baseServer.Serve(lis)) + }() + + conn, err := grpc.DialContext(context.Background(), "", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return lis.Dial() + }), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + closer := func() { + require.NoError(t, lis.Close()) + baseServer.Stop() + } + + client := compactor_client_grpc.NewCompactorClient(conn) + + return client, closer +} + +func grpcDeleteRequestsToDeleteRequests(requests []*compactor_client_grpc.DeleteRequest) []DeleteRequest { + resp := make([]DeleteRequest, len(requests)) + for i, grpcReq := range requests { + resp[i] = DeleteRequest{ + RequestID: grpcReq.RequestID, + StartTime: model.Time(grpcReq.StartTime), + EndTime: model.Time(grpcReq.EndTime), + Query: grpcReq.Query, + Status: DeleteRequestStatus(grpcReq.Status), + CreatedAt: model.Time(grpcReq.CreatedAt), + } + } + + return resp +} + +func TestGRPCGetDeleteRequests(t *testing.T) { + t.Run("it gets all the delete requests for the user", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{{RequestID: "test-request-1", Status: StatusReceived}, {RequestID: "test-request-2", Status: StatusReceived}} + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, store.getAllResult, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("it merges requests with the same requestID", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, StartTime: now, EndTime: now.Add(time.Hour)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(2 * time.Hour), EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(time.Hour), EndTime: now.Add(2 * time.Hour)}, + } + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, []DeleteRequest{ + {RequestID: "test-request-1", Status: StatusReceived, CreatedAt: now, StartTime: now, EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", Status: StatusReceived, CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: 
now.Add(90 * time.Minute)}, + }, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("it only considers a request processed if all it's subqueries are processed", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusReceived}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + } + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.NoError(t, err) + require.ElementsMatch(t, []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: "66% Complete"}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + }, grpcDeleteRequestsToDeleteRequests(resp.DeleteRequests)) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllErr = errors.New("something bad") + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + _, err = grpcClient.GetDeleteRequests(ctx, &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "something bad", sts.Message()) + }) + + t.Run("validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + _, err := grpcClient.GetDeleteRequests(context.Background(), &compactor_client_grpc.GetDeleteRequestsRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "no org id", sts.Message()) + }) + }) +} + +func TestGRPCGetCacheGenNumbers(t *testing.T) { + t.Run("get gen number", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.genNumber = "123" + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + resp, err := grpcClient.GetCacheGenNumbers(ctx, &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.NoError(t, err) + require.Equal(t, store.genNumber, 
resp.ResultsCacheGen) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getErr = errors.New("something bad") + h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + ctx, _ := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), user1)) + orgID, err := tenant.TenantID(ctx) + require.NoError(t, err) + require.Equal(t, user1, orgID) + + _, err = grpcClient.GetCacheGenNumbers(ctx, &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "something bad", sts.Message()) + }) + + t.Run("validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + grpcClient, closer := server(t, h) + defer closer() + + _, err := grpcClient.GetCacheGenNumbers(context.Background(), &compactor_client_grpc.GetCacheGenNumbersRequest{}) + require.Error(t, err) + sts, _ := status.FromError(err) + require.Equal(t, "no org id", sts.Message()) + }) + }) +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go index 4da42c7384d0c..b9489e5f23b8d 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go @@ -6,24 +6,24 @@ import ( ) type DeleteRequestClientMetrics struct { - deleteRequestsLookupsTotal *prometheus.CounterVec - deleteRequestsLookupsFailedTotal *prometheus.CounterVec + deleteRequestsLookupsTotal prometheus.Counter + deleteRequestsLookupsFailedTotal prometheus.Counter } func NewDeleteRequestClientMetrics(r prometheus.Registerer) *DeleteRequestClientMetrics { m := DeleteRequestClientMetrics{} - m.deleteRequestsLookupsTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + m.deleteRequestsLookupsTotal = promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: "loki", Name: "delete_request_lookups_total", Help: "Number times the client has looked up delete requests", - }, []string{"client_type"}) + }) - m.deleteRequestsLookupsFailedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + m.deleteRequestsLookupsFailedTotal = promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: "loki", Name: "delete_request_lookups_failed_total", Help: "Number times the client has failed to look up delete requests", - }, []string{"client_type"}) + }) return &m } diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go deleted file mode 100644 index 7acb6d9bd6193..0000000000000 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client.go +++ /dev/null @@ -1,81 +0,0 @@ -package generationnumber - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/url" - - "github.com/go-kit/log/level" - - "github.com/grafana/loki/pkg/util/log" -) - -const ( - orgHeaderKey = "X-Scope-OrgID" - cacheGenNumPath = "/loki/api/v1/cache/generation_numbers" -) - -type CacheGenClient interface { - GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) - Name() string -} - -type genNumberClient struct { - url string - httpClient doer -} - -type doer interface { - Do(*http.Request) (*http.Response, error) -} - -func 
NewGenNumberClient(addr string, c doer) (CacheGenClient, error) { - u, err := url.Parse(addr) - if err != nil { - level.Error(log.Logger).Log("msg", "error parsing url", "err", err) - return nil, err - } - u.Path = cacheGenNumPath - - return &genNumberClient{ - url: u.String(), - httpClient: c, - }, nil -} - -func (c *genNumberClient) Name() string { - return "gen_number_client" -} - -func (c *genNumberClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - - req.Header.Set(orgHeaderKey, userID) - - resp, err := c.httpClient.Do(req) - if err != nil { - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - level.Error(log.Logger).Log("msg", "error getting cache gen numbers from the store", "err", err) - return "", err - } - - var genNumber string - if err := json.NewDecoder(resp.Body).Decode(&genNumber); err != nil { - level.Error(log.Logger).Log("msg", "error marshalling response", "err", err) - return "", err - } - - return genNumber, err -} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go deleted file mode 100644 index edd711f06dbfd..0000000000000 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_client_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package generationnumber - -import ( - "context" - "io" - "net/http" - "strings" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestGetCacheGenNumberForUser(t *testing.T) { - httpClient := &mockHTTPClient{ret: `"42"`} - client, err := NewGenNumberClient("http://test-server", httpClient) - require.Nil(t, err) - - cacheGenNumber, err := client.GetCacheGenerationNumber(context.Background(), "userID") - require.Nil(t, err) - - require.Equal(t, "42", cacheGenNumber) - - require.Equal(t, "http://test-server/loki/api/v1/cache/generation_numbers", httpClient.req.URL.String()) - require.Equal(t, http.MethodGet, httpClient.req.Method) - require.Equal(t, "userID", httpClient.req.Header.Get("X-Scope-OrgID")) -} - -type mockHTTPClient struct { - ret string - req *http.Request -} - -func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) { - c.req = req - - return &http.Response{ - StatusCode: 200, - Body: io.NopCloser(strings.NewReader(c.ret)), - }, nil -} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go index 55267987cf0c3..63e1904e02047 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go @@ -16,6 +16,12 @@ import ( const reloadDuration = 5 * time.Minute +type CacheGenClient interface { + GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) + Name() string + Stop() +} + type GenNumberLoader struct { numberGetter CacheGenClient numbers map[string]string @@ -33,6 +39,7 @@ func NewGenNumberLoader(g CacheGenClient, registerer prometheus.Registerer) *Gen 
numberGetter: g, numbers: make(map[string]string), metrics: newGenLoaderMetrics(registerer), + quit: make(chan struct{}), } go l.loop() @@ -140,6 +147,7 @@ func (l *GenNumberLoader) getCacheGenNumber(userID string) string { func (l *GenNumberLoader) Stop() { close(l.quit) + l.numberGetter.Stop() } type noopNumberGetter struct{} @@ -151,3 +159,5 @@ func (g *noopNumberGetter) GetCacheGenerationNumber(_ context.Context, _ string) func (g *noopNumberGetter) Name() string { return "noop-getter" } + +func (g *noopNumberGetter) Stop() {} diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go index 7c5581a8b1811..afa59a12b47b5 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader_test.go @@ -52,3 +52,5 @@ func (g *mockGenNumberClient) GetCacheGenerationNumber(ctx context.Context, user func (g *mockGenNumberClient) Name() string { return "" } + +func (g *mockGenNumberClient) Stop() {} diff --git a/pkg/util/marshal/marshal.go b/pkg/util/marshal/marshal.go index 3fbdb84dbdbc3..d4ea4e41272e5 100644 --- a/pkg/util/marshal/marshal.go +++ b/pkg/util/marshal/marshal.go @@ -3,6 +3,7 @@ package marshal import ( + "fmt" "io" "github.com/gorilla/websocket" @@ -20,7 +21,7 @@ import ( func WriteQueryResponseJSON(v logqlmodel.Result, w io.Writer) error { value, err := NewResultValue(v.Data) if err != nil { - return err + return fmt.Errorf("could not write JSON response: %w", err) } q := loghttp.QueryResponse{