From d6d33871a2453de6ece21559e98b8b566aa0dee3 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 13 Oct 2021 20:24:04 -0400 Subject: [PATCH] update some of the forked code to use dskit grpcclient and grpcutil since #4312 was merged after the original fork was created. --- pkg/lokifrontend/frontend/v1/frontend.go | 5 +-- pkg/lokifrontend/frontend/v2/frontend.go | 7 ++-- pkg/querier/worker/scheduler_processor.go | 7 ++-- pkg/querier/worker/worker.go | 2 +- pkg/scheduler/scheduler.go | 7 ++-- pkg/util/httpgrpc/carrier.go | 40 +++++++++++++++++++++++ 6 files changed, 56 insertions(+), 12 deletions(-) create mode 100644 pkg/util/httpgrpc/carrier.go diff --git a/pkg/lokifrontend/frontend/v1/frontend.go b/pkg/lokifrontend/frontend/v1/frontend.go index ea02b5aedbf5..03149de22ce6 100644 --- a/pkg/lokifrontend/frontend/v1/frontend.go +++ b/pkg/lokifrontend/frontend/v1/frontend.go @@ -12,7 +12,6 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/grpcutil" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -22,6 +21,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" ) var ( @@ -153,7 +154,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) // Propagate trace context in gRPC too - this will be ignored if using HTTP. tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) if tracer != nil && span != nil { - carrier := (*grpcutil.HttpgrpcHeadersCarrier)(req) + carrier := (*lokigrpc.HttpgrpcHeadersCarrier)(req) err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier) if err != nil { return nil, err diff --git a/pkg/lokifrontend/frontend/v2/frontend.go b/pkg/lokifrontend/frontend/v2/frontend.go index 6a8de240fdb9..a69e456b1067 100644 --- a/pkg/lokifrontend/frontend/v2/frontend.go +++ b/pkg/lokifrontend/frontend/v2/frontend.go @@ -13,11 +13,10 @@ import ( "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/tenant" - "github.com/cortexproject/cortex/pkg/util/grpcclient" - "github.com/cortexproject/cortex/pkg/util/grpcutil" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/services" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -25,6 +24,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" "go.uber.org/atomic" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" ) // Config for a Frontend. @@ -162,7 +163,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) // Propagate trace context in gRPC too - this will be ignored if using HTTP. tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx) if tracer != nil && span != nil { - carrier := (*grpcutil.HttpgrpcHeadersCarrier)(req) + carrier := (*lokigrpc.HttpgrpcHeadersCarrier)(req) if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, carrier); err != nil { return nil, err } diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index eb5bb219a43b..5ad2e1e54a59 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -11,13 +11,12 @@ import ( querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/ring/client" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" - "github.com/cortexproject/cortex/pkg/util/grpcclient" - "github.com/cortexproject/cortex/pkg/util/grpcutil" util_log "github.com/cortexproject/cortex/pkg/util/log" cortex_middleware "github.com/cortexproject/cortex/pkg/util/middleware" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/services" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" @@ -28,6 +27,8 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" ) func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (*schedulerProcessor, []services.Service) { @@ -137,7 +138,7 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer tracer := opentracing.GlobalTracer() // Ignore errors here. If we cannot get parent span, we just don't create new one. - parentSpanContext, _ := grpcutil.GetParentSpanForRequest(tracer, request.HttpRequest) + parentSpanContext, _ := lokigrpc.GetParentSpanForRequest(tracer, request.HttpRequest) if parentSpanContext != nil { queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext)) defer queueSpan.Finish() diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 3bcce4ee7709..223da6aa5289 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -9,9 +9,9 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 4fb7d2a10e62..bcf57d131f8f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -14,11 +14,10 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/grpcclient" - "github.com/cortexproject/cortex/pkg/util/grpcutil" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" otgrpc "github.com/opentracing-contrib/go-grpc" @@ -30,6 +29,8 @@ import ( "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "google.golang.org/grpc" + + lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" ) var ( @@ -340,7 +341,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr // Extract tracing information from headers in HTTP request. FrontendContext doesn't have the correct tracing // information, since that is a long-running request. tracer := opentracing.GlobalTracer() - parentSpanContext, err := grpcutil.GetParentSpanForRequest(tracer, msg.HttpRequest) + parentSpanContext, err := lokigrpc.GetParentSpanForRequest(tracer, msg.HttpRequest) if err != nil { return err } diff --git a/pkg/util/httpgrpc/carrier.go b/pkg/util/httpgrpc/carrier.go new file mode 100644 index 000000000000..dd300070d6fe --- /dev/null +++ b/pkg/util/httpgrpc/carrier.go @@ -0,0 +1,40 @@ +package httpgrpc + +import ( + "github.com/opentracing/opentracing-go" + "github.com/weaveworks/common/httpgrpc" +) + +// Used to transfer trace information from/to HTTP request. +type HttpgrpcHeadersCarrier httpgrpc.HTTPRequest + +func (c *HttpgrpcHeadersCarrier) Set(key, val string) { + c.Headers = append(c.Headers, &httpgrpc.Header{ + Key: key, + Values: []string{val}, + }) +} + +func (c *HttpgrpcHeadersCarrier) ForeachKey(handler func(key, val string) error) error { + for _, h := range c.Headers { + for _, v := range h.Values { + if err := handler(h.Key, v); err != nil { + return err + } + } + } + return nil +} + +func GetParentSpanForRequest(tracer opentracing.Tracer, req *httpgrpc.HTTPRequest) (opentracing.SpanContext, error) { + if tracer == nil { + return nil, nil + } + + carrier := (*HttpgrpcHeadersCarrier)(req) + extracted, err := tracer.Extract(opentracing.HTTPHeaders, carrier) + if err == opentracing.ErrSpanContextNotFound { + err = nil + } + return extracted, err +}