Skip to content

Commit

Permalink
feat: optimize series order
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Jun 7, 2024
1 parent 4837be6 commit 50cf911
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 58 deletions.
2 changes: 2 additions & 0 deletions cmd/pyroscope/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,8 @@ Usage of ./pyroscope:
[experimental] Set to true to enable profiling integration.
-usage-stats.enabled
Enable anonymous usage reporting. (default true)
-validation.enforce-labels-order
Enforce labels order optimization.
-validation.max-label-names-per-series int
Maximum number of label names per series. (default 30)
-validation.max-length-label-name int
Expand Down
2 changes: 2 additions & 0 deletions cmd/pyroscope/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ Usage of ./pyroscope:
Set to false to disable tracing. (default true)
-usage-stats.enabled
Enable anonymous usage reporting. (default true)
-validation.enforce-labels-order
Enforce labels order optimization.
-validation.max-label-names-per-series int
Maximum number of label names per series. (default 30)
-validation.max-length-label-name int
Expand Down
12 changes: 9 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type Limits interface {
MaxProfileStacktraceDepth(tenantID string) int
MaxProfileSymbolValueLength(tenantID string) int
MaxSessionsPerSeries(tenantID string) int
EnforceLabelsOrder(tenantID string) bool
validation.ProfileValidationLimits
aggregator.Limits
}
Expand Down Expand Up @@ -393,8 +394,9 @@ func (d *Distributor) sendAggregatedProfile(ctx context.Context, req *distributo

func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.PushRequest, tenantID string) (resp *connect.Response[pushv1.PushResponse], err error) {
// Reduce cardinality of session_id label.
maxSessionsPerSeries := d.limits.MaxSessionsPerSeries(tenantID)
for _, series := range req.Series {
series.Labels = d.limitMaxSessionsPerSeries(tenantID, series.Labels)
series.Labels = d.limitMaxSessionsPerSeries(maxSessionsPerSeries, series.Labels)
}

// Next we split profiles by labels.
Expand All @@ -414,7 +416,12 @@ func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.Pu

// Validate the labels again and generate tokens for shuffle sharding.
keys := make([]uint32, len(profileSeries))
enforceLabelsOrder := d.limits.EnforceLabelsOrder(tenantID)
for i, series := range profileSeries {
if enforceLabelsOrder {
labels := phlaremodel.Labels(series.Labels)
labels.Insert(phlaremodel.LabelNameOrder, phlaremodel.LabelOrderEnforced)
}
if err = validation.ValidateLabels(d.limits, tenantID, series.Labels); err != nil {
validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(req.TotalProfiles))
validation.DiscardedBytes.WithLabelValues(string(validation.ReasonOf(err)), tenantID).Add(float64(req.TotalBytesUncompressed))
Expand Down Expand Up @@ -666,8 +673,7 @@ func extractSampleSeries(req *distributormodel.PushRequest) []*distributormodel.
return profileSeries
}

func (d *Distributor) limitMaxSessionsPerSeries(tenantID string, labels phlaremodel.Labels) phlaremodel.Labels {
maxSessionsPerSeries := d.limits.MaxSessionsPerSeries(tenantID)
func (d *Distributor) limitMaxSessionsPerSeries(maxSessionsPerSeries int, labels phlaremodel.Labels) phlaremodel.Labels {
if maxSessionsPerSeries == 0 {
return labels.Delete(phlaremodel.LabelNameSessionID)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ func Test_Sessions_Limit(t *testing.T) {
}), nil, log.NewLogfmtLogger(os.Stdout))

require.NoError(t, err)
assert.Equal(t, tc.expectedLabels, d.limitMaxSessionsPerSeries("user-1", tc.seriesLabels))
limit := d.limits.MaxSessionsPerSeries("user-1")
assert.Equal(t, tc.expectedLabels, d.limitMaxSessionsPerSeries(limit, tc.seriesLabels))
})
}
}
Expand Down
108 changes: 79 additions & 29 deletions pkg/model/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,24 @@ import (
var seps = []byte{'\xff'}

const (
LabelNameProfileType = "__profile_type__"
LabelNameType = "__type__"
LabelNameUnit = "__unit__"
LabelNamePeriodType = "__period_type__"
LabelNamePeriodUnit = "__period_unit__"
LabelNameDelta = "__delta__"
LabelNameProfileName = pmodel.MetricNameLabel
LabelNameSessionID = "__session_id__"
LabelNameProfileType = "__profile_type__"
LabelNameServiceNamePrivate = "__service_name__"
LabelNameDelta = "__delta__"
LabelNameProfileName = pmodel.MetricNameLabel
LabelNamePeriodType = "__period_type__"
LabelNamePeriodUnit = "__period_unit__"
LabelNameSessionID = "__session_id__"
LabelNameType = "__type__"
LabelNameUnit = "__unit__"

LabelNameServiceGitRef = "service_git_ref"
LabelNameServiceName = "service_name"
LabelNameServiceRepository = "service_repository"
LabelNameServiceGitRef = "service_git_ref"

LabelNamePyroscopeSpy = "pyroscope_spy"
LabelNameServiceNameK8s = "__meta_kubernetes_pod_annotation_pyroscope_io_service_name"
LabelNameOrder = "__order__"
LabelOrderEnforced = "enforced"

LabelNamePyroscopeSpy = "pyroscope_spy"

labelSep = '\xfe'
)
Expand All @@ -49,6 +52,30 @@ func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }

// LabelsEnforcedOrder is a sort order of labels, where profile type and
// service name labels always go first. This is crucial for query performance
// as labels determine the physical order of the profiling data.
type LabelsEnforcedOrder []*typesv1.LabelPair

func (ls LabelsEnforcedOrder) Len() int { return len(ls) }
func (ls LabelsEnforcedOrder) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }

func (ls LabelsEnforcedOrder) Less(i, j int) bool {
if ls[i].Name[0] == '_' || ls[j].Name[0] == '_' {
leftType := ls[i].Name == LabelNameProfileType
rightType := ls[j].Name == LabelNameProfileType
if leftType || rightType {
return leftType || !rightType
}
leftService := ls[i].Name == LabelNameServiceNamePrivate
rightService := ls[j].Name == LabelNameServiceNamePrivate
if leftService || rightService {
return leftService || !rightService
}
}
return ls[i].Name < ls[j].Name
}

// Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 {
// Use xxhash.Sum64(b) for fast path as it's faster.
Expand Down Expand Up @@ -192,14 +219,41 @@ func (ls Labels) GetLabel(name string) (*typesv1.LabelPair, bool) {
return nil, false
}

// Delete removes the first label encountered with the name given.
// A copy of the label set without the label is returned.
// Delete removes the first label encountered with the name given in place.
func (ls Labels) Delete(name string) Labels {
return slices.RemoveInPlace(ls, func(pair *typesv1.LabelPair, i int) bool {
return pair.Name == name
})
}

// Insert adds the given label to the set of labels.
// It assumes the labels are ordered
func (ls *Labels) Insert(name, value string) {
// Find the index where the new label should be inserted
index := -1
for i, label := range *ls {
if label.Name > name {
index = i
break
}
if label.Name == name {
label.Value = value
return
}
}
// Insert the new label at the found index
l := &typesv1.LabelPair{
Name: name,
Value: value,
}
*ls = append(*ls, l)
if index == -1 {
return
}
copy((*ls)[index+1:], (*ls)[index:])
(*ls)[index] = l
}

func (ls Labels) Clone() Labels {
result := make(Labels, len(ls))
for i, l := range ls {
Expand Down Expand Up @@ -277,19 +331,7 @@ func LabelsFromStrings(ss ...string) Labels {
return res
}

// CloneLabelPairs clones the label pairs.
func CloneLabelPairs(lbs []*typesv1.LabelPair) []*typesv1.LabelPair {
result := make([]*typesv1.LabelPair, len(lbs))
for i, l := range lbs {
result[i] = &typesv1.LabelPair{
Name: l.Name,
Value: l.Value,
}
}
return result
}

// Compare compares the two label sets.
// CompareLabelPairs compares the two label sets.
// The result will be 0 if a==b, <0 if a < b, and >0 if a > b.
func CompareLabelPairs(a []*typesv1.LabelPair, b []*typesv1.LabelPair) int {
l := len(a)
Expand Down Expand Up @@ -377,6 +419,16 @@ func (b *LabelsBuilder) Set(n, v string) *LabelsBuilder {
// Labels returns the labels from the builder. If no modifications
// were made, the original labels are returned.
func (b *LabelsBuilder) Labels() Labels {
res := b.LabelsUnsorted()
sort.Sort(res)
return res
}

// LabelsUnsorted returns the labels from the builder. If no modifications
// were made, the original labels are returned.
//
// The order is not deterministic.
func (b *LabelsBuilder) LabelsUnsorted() Labels {
if len(b.del) == 0 && len(b.add) == 0 {
return b.base
}
Expand All @@ -398,10 +450,8 @@ Outer:
}
res = append(res, l)
}
res = append(res, b.add...)
sort.Sort(res)

return res
return append(res, b.add...)
}

// StableHash is a labels hashing implementation which is guaranteed to not change over time.
Expand Down
Loading

0 comments on commit 50cf911

Please sign in to comment.