Skip to content

Commit

Permalink
use grpc for communicating with compactor for query time filtering of…
Browse files Browse the repository at this point in the history
… data requested for deletion (#7804)

**What this PR does / why we need it**:
Add grpc support to compactor for getting delete requests and gen number
for query time filtering.
Since these requests are internal to Loki, it would be good to use grpc
instead of HTTP same as all the internal requests we do in Loki.

I have added a new config for accepting the grpc address of the
compactor. I tried having just the existing config and detecting if it
is a grpc server, but it was hard to do it reliably, considering the
different deployment modes we support. I think it is safe to keep it the
same and eventually deprecate the existing config.

**Checklist**
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
  • Loading branch information
sandeepsukhani authored Nov 30, 2022
1 parent d3615f7 commit 1410808
Show file tree
Hide file tree
Showing 22 changed files with 2,321 additions and 326 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [7731](https://github.com/grafana/loki/pull/7731) **bitkill**: Add healthchecks to the docker-compose example.
* [7759](https://github.com/grafana/loki/pull/7759) **kavirajk**: Improve error message for loading config with ENV variables.
* [7785](https://github.com/grafana/loki/pull/7785) **dannykopping**: Add query blocker for queries and rules.
* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion.

##### Fixes

Expand Down
4 changes: 4 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2911,6 +2911,10 @@ This way, one doesn't have to replicate configuration in multiple places.
# CLI flag: -common.compactor-address
[compactor_address: <string> | default = ""]

# Address and port number where the compactor grpc requests are being served.
# CLI flag: -common.compactor-grpc-address
[compactor_grpc_address: <string> | default = ""]

## analytics

The `analytics` block configures the reporting of Loki analytics to grafana.com.
Expand Down
4 changes: 4 additions & 0 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Config struct {

// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`

// CompactorAddress is the grpc address of the compactor in the form host:port
CompactorGRPCAddress string `yaml:"compactor_grpc_address"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -57,6 +60,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")

f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
f.StringVar(&c.CompactorGRPCAddress, "common.compactor-grpc-address", "", "the grpc address of the compactor in the form host:port")
}

type Storage struct {
Expand Down
53 changes: 28 additions & 25 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
Expand All @@ -70,30 +71,31 @@ type Config struct {
UseBufferedLogger bool `yaml:"use_buffered_logger"`
UseSyncLogger bool `yaml:"use_sync_logger"`

Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
CompactorClient compactor.ClientConfig `yaml:"compactor_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageReport usagestats.Config `yaml:"analytics"`
Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
CompactorHTTPClient compactor_client.HTTPConfig `yaml:"compactor_client,omitempty"`
CompactorGRPCClient compactor_client.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageReport usagestats.Config `yaml:"analytics"`
}

// RegisterFlags registers flag.
Expand All @@ -116,7 +118,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Common.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
c.Querier.RegisterFlags(f)
c.CompactorClient.RegisterFlags(f)
c.CompactorHTTPClient.RegisterFlags(f)
c.CompactorGRPCClient.RegisterFlags(f)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f)
c.StorageConfig.RegisterFlags(f)
Expand Down
71 changes: 45 additions & 26 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"strings"
"time"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -43,6 +41,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/ruler"
base_ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/runtime"
Expand All @@ -54,6 +53,8 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
compactor_client "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber"
"github.com/grafana/loki/pkg/storage/stores/series/index"
Expand Down Expand Up @@ -677,45 +678,53 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) {
var client generationnumber.CacheGenClient
if t.supportIndexDeleteRequest() {
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
compactorAddress, isGRPCAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient)
if err != nil {
return nil, err
reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "cache_gen", "client_type": t.Cfg.Target.String()}, prometheus.DefaultRegisterer)
if isGRPCAddress {
client, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
if err != nil {
return nil, err
}
} else {
client, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
if err != nil {
return nil, err
}
}
}

t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer)
return services.NewIdleService(nil, nil), nil
return services.NewIdleService(nil, func(failureCase error) error {
t.cacheGenerationLoader.Stop()
return nil
}), nil
}

func (t *Loki) supportIndexDeleteRequest() bool {
return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs)
}

func (t *Loki) compactorAddress() (string, error) {
// compactorAddress returns the configured address of the compactor.
// It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false
func (t *Loki) compactorAddress() (string, bool, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
proto := "http"
if len(t.Cfg.Server.HTTPTLSConfig.TLSCertPath) > 0 && len(t.Cfg.Server.HTTPTLSConfig.TLSKeyPath) > 0 {
proto = "https"
}
return fmt.Sprintf("%s://%s:%d", proto, t.Cfg.Server.HTTPListenAddress, t.Cfg.Server.HTTPListenPort), nil
return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil
}

if t.Cfg.Common.CompactorAddress == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
if t.Cfg.Common.CompactorAddress == "" && t.Cfg.Common.CompactorGRPCAddress == "" {
return "", false, errors.New("query filtering for deletes requires 'compactor_grpc_address' or 'compactor_address' to be configured")
}

return t.Cfg.Common.CompactorAddress, nil
if t.Cfg.Common.CompactorGRPCAddress != "" {
return t.Cfg.Common.CompactorGRPCAddress, true, nil
}

return t.Cfg.Common.CompactorAddress, false, nil
}

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
Expand Down Expand Up @@ -1012,6 +1021,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler))
grpc.RegisterCompactorServer(t.Server.GRPC, t.compactor.DeleteRequestsGRPCHandler)
}

return t.compactor, nil
Expand Down Expand Up @@ -1128,17 +1138,26 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri
return deletion.NewNoOpDeleteRequestsStore(), nil
}

compactorAddress, err := t.compactorAddress()
compactorAddress, isGRPCAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}

httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
reg := prometheus.WrapRegistererWith(prometheus.Labels{"for": "delete_requests", "client_type": clientType}, prometheus.DefaultRegisterer)
var compactorClient deletion.CompactorClient
if isGRPCAddress {
compactorClient, err = compactor_client.NewGRPCClient(compactorAddress, t.Cfg.CompactorGRPCClient, reg)
if err != nil {
return nil, err
}
} else {
compactorClient, err = compactor_client.NewHTTPClient(compactorAddress, t.Cfg.CompactorHTTPClient)
if err != nil {
return nil, err
}
}

client, err := deletion.NewDeleteRequestsClient(compactorAddress, httpClient, t.deleteClientMetrics, clientType)
client, err := deletion.NewDeleteRequestsClient(compactorClient, t.deleteClientMetrics, clientType)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/queryrange/queryrangebase/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewResultsCacheMetrics(registerer prometheus.Registerer) *ResultsCacheMetri

type CacheGenNumberLoader interface {
GetResultsCacheGenNumber(tenantIDs []string) string
Stop()
}

// ResultsCacheConfig is the config for the results cache.
Expand Down
100 changes: 100 additions & 0 deletions pkg/storage/stores/indexshipper/compactor/client/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package client

import (
"context"
"flag"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"

"github.com/grafana/dskit/grpcclient"
deletion_grpc "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/client/grpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
"google.golang.org/grpc"
)

type GRPCConfig struct {
GRPCClientConfig grpcclient.Config `yaml:",inline"`
}

// RegisterFlags registers flags.
func (cfg *GRPCConfig) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("", f)
}

type compactorGRPCClient struct {
cfg GRPCConfig

GRPCClientRequestDuration *prometheus.HistogramVec
conn *grpc.ClientConn
grpcClient deletion_grpc.CompactorClient
}

// NewGRPCClient supports only methods which are used for internal communication of Loki like
// loading delete requests and cache gen numbers for query time filtering.
func NewGRPCClient(addr string, cfg GRPCConfig, r prometheus.Registerer) (deletion.CompactorClient, error) {
client := &compactorGRPCClient{
cfg: cfg,
GRPCClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki_compactor",
Name: "grpc_request_duration_seconds",
Help: "Time (in seconds) spent serving requests when using compactor GRPC client",
Buckets: instrument.DefBuckets,
}, []string{"operation", "status_code"}),
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(client.GRPCClientRequestDuration))
if err != nil {
return nil, err
}

client.conn, err = grpc.Dial(addr, dialOpts...)
if err != nil {
return nil, err
}

client.grpcClient = deletion_grpc.NewCompactorClient(client.conn)
return client, nil
}

func (s *compactorGRPCClient) Stop() {
s.conn.Close()
}

func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) {
ctx = user.InjectOrgID(ctx, userID)
grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{})
if err != nil {
return nil, err
}

deleteRequests := make([]deletion.DeleteRequest, len(grpcResp.DeleteRequests))
for i, dr := range grpcResp.DeleteRequests {
deleteRequests[i] = deletion.DeleteRequest{
RequestID: dr.RequestID,
StartTime: model.Time(dr.StartTime),
EndTime: model.Time(dr.EndTime),
Query: dr.Query,
Status: deletion.DeleteRequestStatus(dr.Status),
CreatedAt: model.Time(dr.CreatedAt),
}
}

return deleteRequests, nil
}

func (s *compactorGRPCClient) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) {
ctx = user.InjectOrgID(ctx, userID)
grpcResp, err := s.grpcClient.GetCacheGenNumbers(ctx, &deletion_grpc.GetCacheGenNumbersRequest{})
if err != nil {
return "", err
}

return grpcResp.ResultsCacheGen, nil
}

func (s *compactorGRPCClient) Name() string {
return "grpc_client"
}
Loading

0 comments on commit 1410808

Please sign in to comment.