Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expanded Postings Cache can cache results without the newly created series under high load. #6417

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/test-build-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ jobs:
ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex
- name: Run Tests
run: make BUILD_IN_CONTAINER=false test
test-no-race:
runs-on: ubuntu-20.04
container:
image: quay.io/cortexproject/build-image:master-0ddced051
steps:
- name: Checkout Repo
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Setup Git safe.directory
run: |
echo "this step is needed because when running in container, actions/checkout does not set safe.directory effectively."
echo "See https://github.com/actions/runner/issues/2033. We should use --system instead of --global"
git config --system --add safe.directory $GITHUB_WORKSPACE
- name: Sym Link Expected Path to Workspace
run: |
mkdir -p /go/src/github.com/cortexproject/cortex
ln -s $GITHUB_WORKSPACE/* /go/src/github.com/cortexproject/cortex
- name: Run Tests
run: make BUILD_IN_CONTAINER=false test-no-race

security:
name: CodeQL
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ lint:
test:
go test -tags netgo -timeout 30m -race -count 1 ./...

test-no-race:
go test -tags netgo -timeout 30m -count 1 ./...

cover:
$(eval COVERDIR := $(shell mktemp -d coverage.XXXXXXXXXX))
$(eval COVERFILE := $(shell mktemp $(COVERDIR)/unit.XXXXXXXXXX))
Expand Down
21 changes: 21 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)
var newSeries []labels.Labels

for _, ts := range req.Timeseries {
// The labels must be sorted (in our case, it's guaranteed a write request
// has sorted labels once hit the ingester).
Expand Down Expand Up @@ -1233,6 +1235,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
// Retain the reference in case there are multiple samples for the series.
if ref, err = app.Append(0, copiedLabels, s.TimestampMs, s.Value); err == nil {
// Keep track of what series needs to be expired on the postings cache
if db.postingCache != nil {
newSeries = append(newSeries, copiedLabels)
}
succeededSamplesCount++
continue
}
Expand Down Expand Up @@ -1274,6 +1280,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Copy the label set because both TSDB and the active series tracker may retain it.
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
if ref, err = app.AppendHistogram(0, copiedLabels, hp.TimestampMs, h, fh); err == nil {
// Keep track of what series needs to be expired on the postings cache
if db.postingCache != nil {
newSeries = append(newSeries, copiedLabels)
}
succeededHistogramsCount++
continue
}
Expand Down Expand Up @@ -1342,6 +1352,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if err := app.Commit(); err != nil {
return nil, wrapWithUser(err, userID)
}

// This is a workaround of https://github.com/prometheus/prometheus/pull/15579
// Calling expire here may result in the series names being expired multiple times,
// as there may be multiple Push operations concurrently for the same new timeseries.
// TODO: alanprot remove this when/if the PR is merged
if db.postingCache != nil {
for _, s := range newSeries {
db.postingCache.ExpireSeries(s)
}
}

i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds())

// If only invalid samples or histograms are pushed, don't change "last update", as TSDB was not modified.
Expand Down
90 changes: 90 additions & 0 deletions pkg/ingester/ingester_no_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//go:build !race

package ingester

import (
"context"
"fmt"
"math"
"strconv"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
)

// Running this test without race check as there is a known prometheus race condition.
// See https://github.com/prometheus/prometheus/pull/15141 and https://github.com/prometheus/prometheus/pull/15316
func TestExpandedCachePostings_Race(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.LifecyclerConfig.JoinAfter = 0
cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true

r := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until the ingester is ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

ctx := user.InjectOrgID(context.Background(), "test")

wg := sync.WaitGroup{}
labelNames := 100
seriesPerLabelName := 200

for j := 0; j < labelNames; j++ {
metricName := fmt.Sprintf("test_metric_%d", j)
wg.Add(seriesPerLabelName * 2)
for k := 0; k < seriesPerLabelName; k++ {
go func() {
defer wg.Done()
_, err := i.Push(ctx, cortexpb.ToWriteRequest(
[]labels.Labels{labels.FromStrings(labels.MetricName, metricName, "k", strconv.Itoa(k))},
[]cortexpb.Sample{{Value: 1, TimestampMs: 9}}, nil, nil, cortexpb.API))
require.NoError(t, err)
}()

go func() {
defer wg.Done()
err := i.QueryStream(&client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: math.MaxInt64,
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}},
}, &mockQueryStreamServer{ctx: ctx})
require.NoError(t, err)
}()
}

wg.Wait()

s := &mockQueryStreamServer{ctx: ctx}
err = i.QueryStream(&client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: math.MaxInt64,
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricName}},
}, s)
require.NoError(t, err)

set, err := seriesSetFromResponseStream(s)
require.NoError(t, err)
res, err := client.MatrixFromSeriesSet(set)
require.NoError(t, err)
require.Equal(t, seriesPerLabelName, res.Len())
}
}
Loading