Shards Series API #3856

Merged (2 commits), Jul 7, 2021
pkg/loghttp/series.go (1 addition, 1 deletion)

```diff
@@ -41,8 +41,8 @@ func ParseSeriesQuery(r *http.Request) (*logproto.SeriesRequest, error) {
 		Start:  start,
 		End:    end,
 		Groups: deduped,
+		Shards: shards(r),
 	}, nil
-
 }
 
 func union(cols ...[]string) []string {
```
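The `shards(r)` helper referenced by the new field is not part of this hunk. A minimal sketch of what such a helper might do, assuming shard labels arrive as repeated `shards` query parameters (the name `shardsFromRequest` is hypothetical):

```go
package loghttp

import "net/http"

// shardsFromRequest is a hypothetical stand-in for the shards(r) helper
// used above: it collects every "shards" value from the request, so
// /loki/api/v1/series?shards=0_of_16&shards=1_of_16 would yield
// []string{"0_of_16", "1_of_16"}.
func shardsFromRequest(r *http.Request) []string {
	if err := r.ParseForm(); err != nil {
		return nil
	}
	return r.Form["shards"]
}
```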
pkg/logproto/logproto.pb.go (157 additions, 86 deletions)

Generated protobuf code; large diffs are not rendered by default.
pkg/logproto/logproto.proto (2 additions, 1 deletion)

```diff
@@ -111,6 +111,7 @@ message SeriesRequest {
   google.protobuf.Timestamp start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
   google.protobuf.Timestamp end = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
   repeated string groups = 3;
+  repeated string shards = 4 [(gogoproto.jsontag) = "shards,omitempty"];
 }
 
 message SeriesResponse {
@@ -163,4 +164,4 @@ message GetChunkIDsRequest {
 
 message GetChunkIDsResponse {
   repeated string chunkIDs = 1;
-}
+}
```
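Through gogoproto code generation, the new field surfaces as a plain `Shards []string` on the Go `SeriesRequest`. A small construction sketch; the `"<shard>_of_<total>"` label format is an assumption based on how the values read elsewhere in this PR:

```go
import (
	"time"

	"github.com/grafana/loki/pkg/logproto"
)

func exampleSeriesRequest() *logproto.SeriesRequest {
	return &logproto.SeriesRequest{
		Start:  time.Now().Add(-time.Hour),
		End:    time.Now(),
		Groups: []string{`{job="varlogs"}`},
		// Assumed "<shard>_of_<total>" labels; the field is dropped from
		// JSON entirely when empty, per the "shards,omitempty" tag above.
		Shards: []string{"0_of_16", "1_of_16"},
	}
}
```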
pkg/logql/shardmapper.go (6 additions, 8 deletions)

```diff
@@ -22,23 +22,22 @@ const (
 
 // ShardingMetrics is the metrics wrapper used in shard mapping
 type ShardingMetrics struct {
-	shards      *prometheus.CounterVec // sharded queries total, partitioned by (streams/metric)
+	Shards      *prometheus.CounterVec // sharded queries total, partitioned by (streams/metric)
+	ShardFactor prometheus.Histogram   // per request shard factor
 	parsed      *prometheus.CounterVec // parsed ASTs total, partitioned by (success/failure/noop)
-	shardFactor prometheus.Histogram   // per request shard factor
 }
 
 func NewShardingMetrics(registerer prometheus.Registerer) *ShardingMetrics {
-
 	return &ShardingMetrics{
-		shards: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
+		Shards: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Namespace: "loki",
 			Name:      "query_frontend_shards_total",
 		}, []string{"type"}),
 		parsed: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Namespace: "loki",
 			Name:      "query_frontend_sharding_parsed_queries_total",
 		}, []string{"type"}),
-		shardFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
+		ShardFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
 			Namespace: "loki",
 			Name:      "query_frontend_shard_factor",
 			Help:      "Number of shards per request",
@@ -67,14 +66,14 @@ type shardRecorder struct {
 // Add increments both the shard count and tracks it for the eventual histogram entry.
 func (r *shardRecorder) Add(x int, key string) {
 	r.total += x
-	r.shards.WithLabelValues(key).Add(float64(x))
+	r.Shards.WithLabelValues(key).Add(float64(x))
 }
 
 // Finish idempotently records a histogram entry with the total shard factor.
 func (r *shardRecorder) Finish() {
 	if !r.done {
 		r.done = true
-		r.shardFactor.Observe(float64(r.total))
+		r.ShardFactor.Observe(float64(r.total))
 	}
 }
@@ -203,7 +202,6 @@ func (m ShardMapper) mapSampleExpr(expr SampleExpr, r *shardRecorder) SampleExpr {
 // technically, std{dev,var} are also parallelizable if there is no cross-shard merging
 // in descendent nodes in the AST. This optimization is currently avoided for simplicity.
 func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *shardRecorder) (SampleExpr, error) {
-
 	// if this AST contains unshardable operations, don't shard this at this level,
 	// but attempt to shard a child node.
 	if !expr.Shardable() {
```
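The point of the renames is visibility: with `Shards` and `ShardFactor` exported, callers outside `logql` can record against the same metrics. A minimal usage sketch, assuming a registerer is at hand (the `"series"` label value is an assumption; the comments above only mention streams/metric):

```go
import (
	"github.com/grafana/loki/pkg/logql"
	"github.com/prometheus/client_golang/prometheus"
)

func recordSeriesShards() {
	metrics := logql.NewShardingMetrics(prometheus.DefaultRegisterer)

	// Record a 16-way split of a series request against the exported
	// counter and histogram.
	metrics.Shards.WithLabelValues("series").Add(16)
	metrics.ShardFactor.Observe(16)
}
```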
pkg/querier/querier.go (6 additions, 4 deletions)

```diff
@@ -396,7 +396,7 @@ func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest)
 	}
 
 	go func() {
-		storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups())
+		storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards)
 		if err != nil {
 			errs <- err
 			return
@@ -441,20 +441,21 @@ func (q *Querier) seriesForMatchers(
 	ctx context.Context,
 	from, through time.Time,
 	groups []string,
+	shards []string,
 ) ([]logproto.SeriesIdentifier, error) {
 
 	var results []logproto.SeriesIdentifier
 	// If no matchers were specified for the series query,
 	// we send a query with an empty matcher which will match every series.
 	if len(groups) == 0 {
 		var err error
-		results, err = q.seriesForMatcher(ctx, from, through, "")
+		results, err = q.seriesForMatcher(ctx, from, through, "", shards)
 		if err != nil {
 			return nil, err
 		}
 	} else {
 		for _, group := range groups {
-			ids, err := q.seriesForMatcher(ctx, from, through, group)
+			ids, err := q.seriesForMatcher(ctx, from, through, group, shards)
 			if err != nil {
 				return nil, err
 			}
@@ -465,14 +466,15 @@
 }
 
 // seriesForMatcher fetches series from the store for a given matcher
-func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string) ([]logproto.SeriesIdentifier, error) {
+func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string, shards []string) ([]logproto.SeriesIdentifier, error) {
 	ids, err := q.store.GetSeries(ctx, logql.SelectLogParams{
 		QueryRequest: &logproto.QueryRequest{
 			Selector:  matcher,
 			Limit:     1,
 			Start:     from,
 			End:       through,
 			Direction: logproto.FORWARD,
+			Shards:    shards,
 		},
 	})
 	if err != nil {
```
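Seen end to end, the shard labels are pass-through: `awaitSeries` forwards `req.Shards`, and every matcher group is fetched under the same shard restriction via `SelectLogParams`. A hedged sketch of a call from inside a `*Querier` method (matcher and shard values are illustrative only):

```go
// Fetch series for one matcher group, restricted to shard 3 of 16.
ids, err := q.seriesForMatcher(ctx, from, through, `{job="varlogs"}`, []string{"3_of_16"})
if err != nil {
	return nil, err
}
_ = ids
```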
pkg/querier/queryrange/codec.go (7 additions, 8 deletions)

```diff
@@ -108,6 +108,7 @@ func (r *LokiSeriesRequest) LogToSpan(sp opentracing.Span) {
 		otlog.String("matchers", strings.Join(r.GetMatch(), ",")),
 		otlog.String("start", timestamp.Time(r.GetStart()).String()),
 		otlog.String("end", timestamp.Time(r.GetEnd()).String()),
+		otlog.String("shards", strings.Join(r.GetShards(), ",")),
 	)
 }
 
@@ -196,7 +197,6 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) {
 	default:
 		return nil, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf("unknown request path: %s", r.URL.Path))
 	}
-
 }
 
 func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
@@ -235,7 +235,9 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
 			"end":     []string{fmt.Sprintf("%d", request.EndTs.UnixNano())},
 			"match[]": request.Match,
 		}
-
+		if len(request.Shards) > 0 {
+			params["shards"] = request.Shards
+		}
 		u := &url.URL{
 			Path:     "/loki/api/v1/series",
 			RawQuery: params.Encode(),
@@ -355,7 +357,6 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) {
 			return nil, httpgrpc.Errorf(http.StatusBadRequest, "unsupported response type")
 		}
 	}
-
 }
 
 func (Codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) {
@@ -482,7 +483,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
 				lokiSeriesData = append(lokiSeriesData, series)
 				uniqueSeries[series.String()] = struct{}{}
 			}
-
 		}
 	}
 
@@ -504,7 +504,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
 				names = append(names, labelName)
 				uniqueNames[labelName] = struct{}{}
 			}
-
 		}
 	}
 
@@ -520,7 +519,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
 
 // mergeOrderedNonOverlappingStreams merges a set of ordered, nonoverlapping responses by concatenating matching streams then running them through a heap to pull out limit values
 func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, direction logproto.Direction) []logproto.Stream {
-
 	var total int
 
 	// turn resps -> map[labels] []entries
@@ -612,7 +610,6 @@ func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, direction logproto.Direction) []logproto.Stream {
 	}
 
 	return results
-
 }
 
 func toProto(m loghttp.Matrix) []queryrange.SampleStream {
@@ -642,7 +639,6 @@ func (res LokiResponse) Count() int64 {
 		result += int64(len(s.Entries))
 	}
 	return result
-
 }
 
 type paramsWrapper struct {
@@ -658,12 +654,15 @@ func paramsFromRequest(req queryrange.Request) *paramsWrapper {
 func (p paramsWrapper) Query() string {
 	return p.LokiRequest.Query
 }
+
 func (p paramsWrapper) Start() time.Time {
 	return p.StartTs
 }
+
 func (p paramsWrapper) End() time.Time {
 	return p.EndTs
 }
+
 func (p paramsWrapper) Step() time.Duration {
 	return time.Duration(p.LokiRequest.Step * 1e6)
 }
```
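Putting the codec change together, a sharded series request re-encoded by `EncodeRequest` should yield a URL along these lines (a sketch with made-up timestamps; parameter ordering comes from `url.Values.Encode`, which sorts keys):

```go
import (
	"fmt"
	"net/url"
)

func exampleSeriesURL() {
	params := url.Values{
		"start":   []string{"1625097600000000000"},
		"end":     []string{"1625101200000000000"},
		"match[]": []string{`{job="varlogs"}`},
		"shards":  []string{"0_of_2", "1_of_2"},
	}
	u := url.URL{Path: "/loki/api/v1/series", RawQuery: params.Encode()}
	fmt.Println(u.String())
	// Prints something like:
	// /loki/api/v1/series?end=1625101200000000000&match%5B%5D=%7Bjob%3D%22varlogs%22%7D&shards=0_of_2&shards=1_of_2&start=1625097600000000000
}
```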