Skip to content

Commit

Permalink
Update cortex to 1 6 (#3131)
Browse files Browse the repository at this point in the history
* Update Cortex to recent master (1.5.0+, 35e698bb56d6).

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Ignore set scheduler address.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Make linter happy.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Upgrade cortex to 1.6

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Work around issues in go-openapi libraries

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Clean up go.mod and go.sum

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

Co-authored-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
MichelHollands and pstibrany authored Jan 7, 2021
1 parent cdf4a73 commit 4f62f36
Show file tree
Hide file tree
Showing 596 changed files with 53,000 additions and 10,107 deletions.
24 changes: 16 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ module github.com/grafana/loki
go 1.15

require (
github.com/NYTimes/gziphandler v1.1.1
github.com/aws/aws-lambda-go v1.17.0
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/bmatcuk/doublestar v1.2.2
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
github.com/cespare/xxhash/v2 v2.1.1
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.4.1-0.20201022071705-85942c5703cf
github.com/cortexproject/cortex v1.6.0
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v17.12.0-ce-rc1.0.20201009160326-9c15e82f19b0+incompatible
github.com/docker/go-metrics v0.0.0-20181218153428-b84716841b82 // indirect
Expand All @@ -37,30 +38,30 @@ require (
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.9.5
github.com/mitchellh/mapstructure v1.2.2
github.com/mitchellh/mapstructure v1.3.3
github.com/moby/term v0.0.0-20200915141129-7f0af18e79f2 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/opentracing/opentracing-go v1.2.0
// github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pierrec/lz4/v4 v4.0.2-0.20200813132121-22f5d580d5c4
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_golang v1.8.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.14.0
github.com/prometheus/prometheus v1.8.2-0.20201014093524-73e2ce1bd643
github.com/prometheus/common v0.15.0
github.com/prometheus/prometheus v1.8.2-0.20201119181812-c8f810083d3f
github.com/segmentio/fasthash v1.0.2
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546
github.com/stretchr/testify v1.6.1
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099
github.com/weaveworks/common v0.0.0-20201119133501-0619918236ec
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0
google.golang.org/grpc v1.32.0
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
google.golang.org/grpc v1.33.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.3.0
Expand All @@ -87,3 +88,10 @@ replace google.golang.org/grpc => google.golang.org/grpc v1.29.1
// Same as Cortex
// Using a 3rd-party branch for custom dialer - see https://github.com/bradfitz/gomemcache/pull/86
replace github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

// Fix errors like too many arguments in call to "github.com/go-openapi/errors".Required
// have (string, string)
// want (string, string, interface {})
replace github.com/go-openapi/errors => github.com/go-openapi/errors v0.19.4

replace github.com/go-openapi/validate => github.com/go-openapi/validate v0.19.8
109 changes: 80 additions & 29 deletions go.sum

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,11 @@ func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (r
return result, nil
}

func (r mockRing) GetAll(op ring.Operation) (ring.ReplicationSet, error) {
func (r mockRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) {
return r.GetReplicationSetForOperation(op)
}

func (r mockRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
MaxErrors: 1,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (w *WALCheckpointWriter) deleteCheckpoints(maxIndex int) (err error) {
}
}()

var errs tsdb_errors.MultiError
errs := tsdb_errors.NewMulti()

files, err := ioutil.ReadDir(w.segmentWAL.Dir())
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"fmt"
"net/http"

frontend "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/querier/worker"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"

"github.com/cortexproject/cortex/pkg/util/flagext"
Expand All @@ -15,7 +18,6 @@ import (
"github.com/weaveworks/common/signals"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
Expand Down Expand Up @@ -60,7 +62,7 @@ type Config struct {
SchemaConfig storage.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker frontend.WorkerConfig `yaml:"frontend_worker,omitempty"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
Expand Down
57 changes: 44 additions & 13 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"os"
"time"

"github.com/NYTimes/gziphandler"
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"

"github.com/grafana/loki/pkg/ruler/manager"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"

Expand All @@ -18,8 +23,7 @@ import (
cortex_storage "github.com/cortexproject/cortex/pkg/chunk/storage"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/cortex"
cortex_querier "github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/frontend"
cortex_querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
Expand Down Expand Up @@ -159,11 +163,23 @@ func (t *Loki) initDistributor() (services.Service, error) {
}

func (t *Loki) initQuerier() (services.Service, error) {
level.Debug(util.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker))
worker, err := frontend.NewWorker(t.cfg.Worker, cortex_querier.Config{MaxConcurrent: t.cfg.Querier.MaxConcurrent}, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util.Logger)
if err != nil {
return nil, err
var (
worker services.Service
err error
)

// NewQuerierWorker now expects Frontend (or Scheduler) address to be set. Loki only supports Frontend for now.
if t.cfg.Worker.FrontendAddress != "" {
// In case someone set scheduler address, we ignore it.
t.cfg.Worker.SchedulerAddress = ""
t.cfg.Worker.MaxConcurrentRequests = t.cfg.Querier.MaxConcurrent
level.Debug(util.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker))
worker, err = cortex_querier_worker.NewQuerierWorker(t.cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
}

if t.cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.cfg.Querier.IngesterQueryStoreMaxLookback = t.cfg.Ingester.QueryStoreMaxLookBackPeriod
}
Expand Down Expand Up @@ -345,12 +361,23 @@ type disabledShuffleShardingLimits struct{}
func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { return 0 }

func (t *Loki) initQueryFrontend() (_ services.Service, err error) {

level.Debug(util.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.cfg.Frontend))
t.frontend, err = frontend.New(t.cfg.Frontend.Config, disabledShuffleShardingLimits{}, util.Logger, prometheus.DefaultRegisterer)

roundTripper, frontendV1, _, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{
// Don't set FrontendV2 field to make sure that only frontendV1 can be initialized.
Handler: t.cfg.Frontend.Handler,
FrontendV1: t.cfg.Frontend.FrontendV1,
CompressResponses: t.cfg.Frontend.CompressResponses,
DownstreamURL: t.cfg.Frontend.DownstreamURL,
}, disabledShuffleShardingLimits{}, t.cfg.Server.GRPCListenPort, util.Logger, prometheus.DefaultRegisterer)
if err != nil {
return
return nil, err
}
t.frontend = frontendV1
if t.frontend != nil {
frontendv1pb.RegisterFrontendServer(t.Server.GRPC, t.frontend)
}

level.Debug(util.Logger).Log("msg", "initializing query range tripperware",
"config", fmt.Sprintf("%+v", t.cfg.QueryRange),
"limits", fmt.Sprintf("%+v", t.cfg.LimitsConfig),
Expand All @@ -367,16 +394,20 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
return
}
t.stopper = stopper
t.frontend.Wrap(tripperware)
frontend.RegisterFrontendServer(t.Server.GRPC, t.frontend)

frontendHandler := middleware.Merge(
roundTripper = tripperware(roundTripper)
frontendHandler := transport.NewHandler(t.cfg.Frontend.Handler, roundTripper, util.Logger, prometheus.DefaultRegisterer)
if t.cfg.Frontend.CompressResponses {
frontendHandler = gziphandler.GzipHandler(frontendHandler)
}

frontendHandler = middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
t.httpAuthMiddleware,
queryrange.StatsHTTPMiddleware,
serverutil.NewPrepopulateMiddleware(),
serverutil.ResponseJSONMiddleware(),
).Wrap(t.frontend.Handler())
).Wrap(frontendHandler)

var defaultHandler http.Handler
if t.cfg.Frontend.TailProxyURL != "" {
Expand Down
19 changes: 15 additions & 4 deletions pkg/lokifrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@ package lokifrontend
import (
"flag"

"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
v1 "github.com/cortexproject/cortex/pkg/frontend/v1"
)

type Config struct {
frontend.Config `yaml:",inline"`
TailProxyURL string `yaml:"tail_proxy_url"`
Handler transport.HandlerConfig `yaml:",inline"`
FrontendV1 v1.Config `yaml:",inline"`

CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`

TailProxyURL string `yaml:"tail_proxy_url"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Config.RegisterFlags(f)
cfg.Handler.RegisterFlags(f)
cfg.FrontendV1.RegisterFlags(f)

f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")

f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}
6 changes: 3 additions & 3 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryD
// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
replicationSet, err := q.ring.GetAll(ring.Read)
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

// Get the current replication set from the ring
replicationSet, err := q.ring.GetAll(ring.Read)
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (q *IngesterQuerier) Series(ctx context.Context, req *logproto.SeriesReques
}

func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
replicationSet, err := q.ring.GetAll(ring.Read)
replicationSet, err := q.ring.GetAllHealthy(ring.Read)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,11 @@ func (r *readRingMock) BatchGet(keys []uint32, op ring.Operation) ([]ring.Replic
return []ring.ReplicationSet{r.replicationSet}, nil
}

func (r *readRingMock) GetAll(op ring.Operation) (ring.ReplicationSet, error) {
func (r *readRingMock) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

func (r *readRingMock) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -45,7 +44,7 @@ func NewTripperware(
schema chunk.SchemaConfig,
minShardingLookback time.Duration,
registerer prometheus.Registerer,
) (frontend.Tripperware, Stopper, error) {
) (queryrange.Tripperware, Stopper, error) {
// Ensure that QuerySplitDuration uses configuration defaults.
// This avoids divide by zero errors when determining cache keys where user specific overrides don't exist.
limits = WithDefaultLimits(limits, cfg.Config)
Expand Down Expand Up @@ -222,8 +221,8 @@ func NewLogFilterTripperware(
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
) (frontend.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.LimitsMiddleware(limits)}
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByMetrics))
}
Expand Down Expand Up @@ -265,7 +264,7 @@ func NewSeriesTripperware(
instrumentMetrics *queryrange.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
splitByMetrics *SplitByMetrics,
) (frontend.Tripperware, error) {
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware,
Expand Down Expand Up @@ -295,7 +294,7 @@ func NewLabelsTripperware(
instrumentMetrics *queryrange.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
splitByMetrics *SplitByMetrics,
) (frontend.Tripperware, error) {
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware,
Expand Down Expand Up @@ -331,8 +330,8 @@ func NewMetricTripperware(
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
registerer prometheus.Registerer,
) (frontend.Tripperware, Stopper, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.LimitsMiddleware(limits)}
) (queryrange.Tripperware, Stopper, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(
queryRangeMiddleware,
Expand Down Expand Up @@ -406,7 +405,7 @@ func NewMetricTripperware(
// Finally, if the user selected any query range middleware, stitch it in.
if len(queryRangeMiddleware) > 0 {
rt := queryrange.NewRoundTripper(next, codec, queryRangeMiddleware...)
return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
if !strings.HasSuffix(r.URL.Path, "/query_range") {
return next.RoundTrip(r)
}
Expand Down
Loading

0 comments on commit 4f62f36

Please sign in to comment.