Skip to content

Commit

Permalink
Add baggage propogation
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanhuhta committed Sep 29, 2024
1 parent 987df37 commit c794044
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/clientpool/ingester_client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (f *ingesterPoolFactory) FromInstance(inst ring.InstanceDesc) (ring_client.
return nil, err
}

httpClient := util.InstrumentedDefaultHTTPClient(util.WithTracingTransport())
httpClient := util.InstrumentedDefaultHTTPClient(util.WithTracingTransport(), util.WithBaggageTransport())
return &ingesterPoolClient{
IngesterServiceClient: ingesterv1connect.NewIngesterServiceClient(httpClient, "http://"+inst.Addr, f.options...),
HealthClient: grpc_health_v1.NewHealthClient(conn),
Expand Down
2 changes: 1 addition & 1 deletion pkg/clientpool/store_gateway_client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (f *storeGatewayPoolFactory) FromInstance(inst ring.InstanceDesc) (ring_cli
return nil, err
}

httpClient := util.InstrumentedDefaultHTTPClient(util.WithTracingTransport())
httpClient := util.InstrumentedDefaultHTTPClient(util.WithTracingTransport(), util.WithBaggageTransport())
return &storeGatewayPoolClient{
StoreGatewayServiceClient: storegatewayv1connect.NewStoreGatewayServiceClient(httpClient, "http://"+inst.Addr, f.options...),
HealthClient: grpc_health_v1.NewHealthClient(conn),
Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ func forAllIngesters[T any](ctx context.Context, ingesterQuerier *IngesterQuerie
if err != nil {
return nil, err
}
return forGivenReplicationSet(ctx, func(addr string) (IngesterQueryClient, error) {

clientFactoryFn := func(addr string) (IngesterQueryClient, error) {
client, err := ingesterQuerier.pool.GetClientFor(addr)
if err != nil {
return nil, err
}
return client.(IngesterQueryClient), nil
}, replicationSet, f)
}
return forGivenReplicationSet(ctx, clientFactoryFn, replicationSet, f)
}

// forAllPlannedIngesters runs f, in parallel, for all ingesters part of the plan
Expand Down
10 changes: 10 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/samber/lo"
"go.opentelemetry.io/otel/baggage"
"golang.org/x/sync/errgroup"

googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
Expand Down Expand Up @@ -281,6 +282,15 @@ func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.L
sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelNames")
defer sp.Finish()

header := req.Header()
fmt.Println("header:", header)

ctxBaggage := baggage.FromContext(ctx)
fmt.Println("baggage:", ctxBaggage)

parsedBaggage, e := baggage.Parse(header.Get("Baggage"))
fmt.Println("parsedBaggage:", parsedBaggage, "error:", e)

_, hasTimeRange := phlaremodel.GetTimeRange(req.Msg)
sp.LogFields(
otlog.Bool("legacy_request", !hasTimeRange),
Expand Down
7 changes: 6 additions & 1 deletion pkg/querier/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ func forGivenReplicationSet[Result any, Querier any](ctx context.Context, client
}

// forGivenPlan runs f, in parallel, for given plan.
func forGivenPlan[Result any, Querier any](ctx context.Context, plan map[string]*blockPlanEntry, clientFactory func(string) (Querier, error), replicationSet ring.ReplicationSet, f QueryReplicaWithHintsFn[Result, Querier]) ([]ResponseFromReplica[Result], error) {
func forGivenPlan[Result any, Querier any](
ctx context.Context,
plan map[string]*blockPlanEntry,
clientFactory func(string) (Querier, error),
replicationSet ring.ReplicationSet, f QueryReplicaWithHintsFn[Result, Querier],
) ([]ResponseFromReplica[Result], error) {
g, _ := errgroup.WithContext(ctx)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/clientpool/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (f *poolFactory) FromInstance(inst ring.InstanceDesc) (ring_client.PoolClie
return nil, err
}

httpClient := util.InstrumentedDefaultHTTPClient(util.WithTracingTransport())
httpClient := util.InstrumentedDefaultHTTPClient(util.WithTracingTransport(), util.WithBaggageTransport())
return &storeGatewayPoolClient{
StoreGatewayServiceClient: storegatewayv1connect.NewStoreGatewayServiceClient(httpClient, "http://"+inst.Addr, f.options...),
HealthClient: grpc_health_v1.NewHealthClient(conn),
Expand Down
22 changes: 22 additions & 0 deletions pkg/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/grafana/dskit/instrument"
"go.opentelemetry.io/otel/baggage"

"github.com/dustin/go-humanize"
"github.com/felixge/httpsnoop"
Expand Down Expand Up @@ -84,6 +85,27 @@ func WithTracingTransport() RoundTripperInstrumentFunc {
}
}

// WithBaggageTransport will set the Baggage header on the request if there is
// any baggage in the context and it was not already set.
func WithBaggageTransport() RoundTripperInstrumentFunc {
return func(next http.RoundTripper) http.RoundTripper {
return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
_, ok := req.Header["Baggage"]
if ok {
return next.RoundTrip(req)
}

b := baggage.FromContext(req.Context())
if b.Len() == 0 {
return next.RoundTrip(req)
}

req.Header.Set("Baggage", b.String())
return next.RoundTrip(req)
})
}
}

// WriteYAMLResponse writes some YAML as a HTTP response.
func WriteYAMLResponse(w http.ResponseWriter, v interface{}) {
// There is not standardised content-type for YAML, text/plain ensures the
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/grafana/dskit/middleware"
"github.com/grafana/pyroscope-go/x/k6"
"go.opentelemetry.io/otel/baggage"
)

// K6Middleware creates a middleware that extracts k6 load test labels from the
Expand All @@ -13,7 +14,22 @@ func K6Middleware() middleware.Interface {
return middleware.Func(func(h http.Handler) http.Handler {
next := k6.LabelsFromBaggageHandler(h)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = setBaggageRequestContext(r)
next.ServeHTTP(w, r)
})
})
}

// SetBaggageContext sets the Baggage in the request context if it exists as a
// header.
//
// TODO(bryan) Move this into the pyroscope-go/x/k6 package.
func setBaggageRequestContext(r *http.Request) *http.Request {
b, err := baggage.Parse(r.Header.Get("Baggage"))
if err != nil {
return r
}

ctx := baggage.ContextWithBaggage(r.Context(), b)
return r.WithContext(ctx)
}

0 comments on commit c794044

Please sign in to comment.