fix: errors reported by the race detector #13174

Merged · 9 commits · Jun 10, 2024

pkg/analytics/reporter.go (25 additions, 22 deletions)

@@ -95,28 +95,31 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
 		MaxRetries: 0,
 	})
 	for backoff.Ongoing() {
-		// create a new cluster seed
-		seed := ClusterSeed{
-			UID:               uuid.NewString(),
-			PrometheusVersion: build.GetVersion(),
-			CreatedAt:         time.Now(),
-		}
-		if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
-			// The key is already set, so we don't need to do anything
-			if in != nil {
-				if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
-					seed = *kvSeed
-					return nil, false, nil
-				}
-			}
-			return &seed, true, nil
-		}); err != nil {
-			level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
-			continue
+		{
+			// create a new cluster seed
+			seed := ClusterSeed{
+				UID:               uuid.NewString(),
+				PrometheusVersion: build.GetVersion(),
+				CreatedAt:         time.Now(),
+			}
+			if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
+				// The key is already set, so we don't need to do anything
+				if in != nil {
+					if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
+						seed = *kvSeed
+						return nil, false, nil
+					}
+				}
+				return &seed, true, nil
+			}); err != nil {
+				level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
+				continue
+			}
 		}
 		// ensure stability of the cluster seed
 		stableSeed := ensureStableKey(ctx, kvClient, rep.logger)
-		seed = *stableSeed
+		// This is a new local variable so that Go knows it's not racing with the previous usage.
+		seed := *stableSeed
 		// Fetch the remote cluster seed.
 		remoteSeed, err := rep.fetchSeed(ctx,
 			func(err error) bool {
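
Note on the change above: the CAS callback captures `seed` by reference, so a later write to the same variable races with the callback if it can still run on another goroutine. Wrapping the first `seed` in a block and re-declaring a fresh one with `:=` removes the shared memory location. A minimal, self-contained sketch of the pattern (names are illustrative, not from this PR):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	{
		// First variable: handed to another goroutine via a closure.
		seed := "candidate"
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(seed) // reads the captured variable
		}()
	}

	// A fresh variable at outer scope: writing it cannot race with the
	// closure above, because it is a different memory location.
	seed := "stable"
	fmt.Println(seed)

	wg.Wait()
}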
func(err error) bool {
Expand Down Expand Up @@ -262,7 +265,7 @@ func (rep *Reporter) running(ctx context.Context) error {
}
return nil
}
rep.startCPUPercentCollection(ctx)
rep.startCPUPercentCollection(ctx, time.Minute)
// check every minute if we should report.
ticker := time.NewTicker(reportCheckInterval)
defer ticker.Stop()

@@ -317,13 +320,13 @@ func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error
 	return errs.Err()
 }

+const cpuUsageKey = "cpu_usage"
+
 var (
-	cpuUsageKey           = "cpu_usage"
-	cpuUsage              = NewFloat(cpuUsageKey)
-	cpuCollectionInterval = time.Minute
+	cpuUsage = NewFloat(cpuUsageKey)
 )

-func (rep *Reporter) startCPUPercentCollection(ctx context.Context) {
+func (rep *Reporter) startCPUPercentCollection(ctx context.Context, cpuCollectionInterval time.Duration) {
 	proc, err := process.NewProcess(int32(os.Getpid()))
 	if err != nil {
 		level.Debug(rep.logger).Log("msg", "failed to get process", "err", err)
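
The `cpuCollectionInterval` change follows a common race-detector fix: a package-level variable that tests overwrite while reporter goroutines read it is a data race, so the interval becomes a parameter instead. A rough sketch of the idea, assuming a simplified collector loop:

package main

import (
	"context"
	"fmt"
	"time"
)

// The interval is injected by the caller rather than read from a
// mutable package-level var, so tests can shorten it without writing
// to memory that a running goroutine reads.
func startCPUPercentCollection(ctx context.Context, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fmt.Println("sample CPU usage")
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	startCPUPercentCollection(ctx, time.Second) // production passes time.Minute
	<-ctx.Done()
}

The test below picks up the same signature, passing 1*time.Second directly instead of mutating the global.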

pkg/analytics/reporter_test.go (1 addition, 2 deletions)

@@ -159,14 +159,13 @@ func TestWrongKV(t *testing.T) {
 }

 func TestStartCPUCollection(t *testing.T) {
-	cpuCollectionInterval = 1 * time.Second
 	r, err := NewReporter(Config{Leader: true, Enabled: true}, kv.Config{
 		Store: "inmemory",
 	}, nil, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
 	require.NoError(t, err)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	r.startCPUPercentCollection(ctx)
+	r.startCPUPercentCollection(ctx, 1*time.Second)
 	require.Eventually(t, func() bool {
 		return cpuUsage.Value() > 0
 	}, 5*time.Second, 1*time.Second)

pkg/bloombuild/builder/builder_test.go (4 additions, 3 deletions)

@@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"

@@ -87,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
 	require.NoError(t, err)

 	require.Eventually(t, func() bool {
-		return server.completedTasks == len(tasks)
+		return int(server.completedTasks.Load()) == len(tasks)
 	}, 5*time.Second, 100*time.Millisecond)

 	err = services.StopAndAwaitTerminated(context.Background(), builder)

@@ -98,7 +99,7 @@ func Test_BuilderLoop(t *testing.T) {

 type fakePlannerServer struct {
 	tasks          []*protos.ProtoTask
-	completedTasks int
+	completedTasks atomic.Int64
 	shutdownCalled bool

 	addr string

@@ -148,7 +149,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer
 		if _, err := srv.Recv(); err != nil {
 			return fmt.Errorf("failed to receive task response: %w", err)
 		}
-		f.completedTasks++
+		f.completedTasks.Add(1)
 	}

 	// No more tasks. Wait until shutdown.
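
The counter fix above is the standard recipe for test servers polled with require.Eventually: the handler goroutine increments while the test goroutine reads, so a plain int is a race and an atomic counter is not. A small sketch using the same go.uber.org/atomic package the diff imports:

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	var completedTasks atomic.Int64 // zero value is ready to use

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			completedTasks.Add(1) // completedTasks++ on a plain int would race
		}()
	}
	wg.Wait()

	fmt.Println(int(completedTasks.Load()) == 10) // true
}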

pkg/bloombuild/planner/planner.go (5 additions, 5 deletions)

@@ -701,7 +701,7 @@ func (p *Planner) totalPendingTasks() (total int) {
 func (p *Planner) enqueueTask(task *QueueTask) error {
 	p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
 	return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
-		task.timesEnqueued++
+		task.timesEnqueued.Add(1)
 		p.addPendingTask(task)
 	})
 }

@@ -761,12 +761,12 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 		result, err := p.forwardTaskToBuilder(builder, builderID, task)
 		if err != nil {
 			maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
-			if maxRetries > 0 && task.timesEnqueued >= maxRetries {
+			if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
 				p.metrics.tasksFailed.Inc()
 				p.removePendingTask(task)
 				level.Error(logger).Log(
 					"msg", "task failed after max retries",
-					"retries", task.timesEnqueued,
+					"retries", task.timesEnqueued.Load(),
 					"maxRetries", maxRetries,
 					"err", err,
 				)

@@ -792,7 +792,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 				p.metrics.tasksRequeued.Inc()
 				level.Error(logger).Log(
 					"msg", "error forwarding task to builder, Task requeued",
-					"retries", task.timesEnqueued,
+					"retries", task.timesEnqueued.Load(),
 					"err", err,
 				)
 				continue

@@ -801,7 +801,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 			level.Debug(logger).Log(
 				"msg", "task completed",
 				"duration", time.Since(task.queueTime).Seconds(),
-				"retries", task.timesEnqueued,
+				"retries", task.timesEnqueued.Load(),
 			)
 			p.removePendingTask(task)

pkg/bloombuild/planner/planner_test.go (23 additions, 15 deletions)

@@ -16,6 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"

@@ -517,7 +518,7 @@ func Test_BuilderLoop(t *testing.T) {
 			resultsCh := make(chan *protos.TaskResult, nTasks)
 			tasks := createTasks(nTasks, resultsCh)
 			for _, task := range tasks {
-				err = planner.enqueueTask(task)
+				err := planner.enqueueTask(task)
 				require.NoError(t, err)
 			}

@@ -527,10 +528,10 @@ func Test_BuilderLoop(t *testing.T) {
builder := newMockBuilder(fmt.Sprintf("builder-%d", i))
builders = append(builders, builder)

go func() {
err = planner.BuilderLoop(builder)
require.ErrorIs(t, err, tc.expectedBuilderLoopError)
}()
go func(expectedBuilderLoopError error) {
err := planner.BuilderLoop(builder)
require.ErrorIs(t, err, expectedBuilderLoopError)
}(tc.expectedBuilderLoopError)
}

// Eventually, all tasks should be sent to builders

@@ -558,7 +559,7 @@ func Test_BuilderLoop(t *testing.T) {

 			// Enqueue tasks again
 			for _, task := range tasks {
-				err = planner.enqueueTask(task)
+				err := planner.enqueueTask(task)
 				require.NoError(t, err)
 			}

@@ -809,14 +810,15 @@ func Test_processTenantTaskResults(t *testing.T) {
 }

 type fakeBuilder struct {
+	mx          sync.Mutex // Protects tasks and currTaskIdx.
 	id          string
 	tasks       []*protos.Task
 	currTaskIdx int
 	grpc.ServerStream

-	returnError    bool
-	returnErrorMsg bool
-	wait           bool
+	returnError    atomic.Bool
+	returnErrorMsg atomic.Bool
+	wait           atomic.Bool
 	ctx            context.Context
 	ctxCancel      context.CancelFunc
 }

@@ -833,19 +835,21 @@ func newMockBuilder(id string) *fakeBuilder {
 }

 func (f *fakeBuilder) ReceivedTasks() []*protos.Task {
+	f.mx.Lock()
+	defer f.mx.Unlock()
 	return f.tasks
 }

 func (f *fakeBuilder) SetReturnError(b bool) {
-	f.returnError = b
+	f.returnError.Store(b)
 }

 func (f *fakeBuilder) SetReturnErrorMsg(b bool) {
-	f.returnErrorMsg = b
+	f.returnErrorMsg.Store(b)
 }

 func (f *fakeBuilder) SetWait(b bool) {
-	f.wait = b
+	f.wait.Store(b)
 }

 func (f *fakeBuilder) CancelContext(b bool) {

@@ -873,6 +877,8 @@ func (f *fakeBuilder) Send(req *protos.PlannerToBuilder) error {
 		return err
 	}

+	f.mx.Lock()
+	defer f.mx.Unlock()
 	f.tasks = append(f.tasks, task)
 	f.currTaskIdx++
 	return nil

@@ -886,12 +892,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
 		}, nil
 	}

-	if f.returnError {
+	if f.returnError.Load() {
 		return nil, fmt.Errorf("fake error from %s", f.id)
 	}

 	// Wait until `wait` is false
-	for f.wait {
+	for f.wait.Load() {
 		time.Sleep(time.Second)
 	}

@@ -901,10 +907,12 @@
 	}

 	var errMsg string
-	if f.returnErrorMsg {
+	if f.returnErrorMsg.Load() {
 		errMsg = fmt.Sprintf("fake error from %s", f.id)
 	}

+	f.mx.Lock()
+	defer f.mx.Unlock()
 	return &protos.BuilderToPlanner{
 		BuilderID: f.id,
 		Result: protos.ProtoTaskResult{
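
Two recurring patterns in this test file's changes are worth spelling out. First, a goroutine launched in a loop should declare its own err (err :=) and take loop-dependent values as arguments rather than capturing a shared variable, which is exactly the go func(expectedBuilderLoopError error){...}(tc.expectedBuilderLoopError) rewrite above. Second, fields touched by both the test goroutine and the fake server (tasks, currTaskIdx) get a mutex, while simple flags become atomics. A condensed sketch of the mutex half (fakeServer is a stand-in, not the PR's type):

package main

import (
	"fmt"
	"sync"
)

// fakeServer mimics the fakeBuilder pattern: mx guards state that is
// written by handler goroutines and read by the test.
type fakeServer struct {
	mx    sync.Mutex
	tasks []string
}

func (f *fakeServer) addTask(t string) {
	f.mx.Lock()
	defer f.mx.Unlock()
	f.tasks = append(f.tasks, t)
}

func (f *fakeServer) receivedTasks() []string {
	f.mx.Lock()
	defer f.mx.Unlock()
	return f.tasks
}

func main() {
	srv := &fakeServer{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		// Loop-dependent values are passed as arguments, not captured.
		go func(id int) {
			defer wg.Done()
			srv.addTask(fmt.Sprintf("task-%d", id))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(srv.receivedTasks())) // 3
}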

pkg/bloombuild/planner/task.go (3 additions, 1 deletion)

@@ -4,6 +4,8 @@ import (
"context"
"time"

"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
)

@@ -13,7 +15,7 @@ type QueueTask struct {
 	resultsChannel chan *protos.TaskResult

 	// Tracking
-	timesEnqueued int
+	timesEnqueued atomic.Int64
 	queueTime     time.Time
 	ctx           context.Context
 }
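
One reason this swap is low-friction (an observation, not something the PR states): go.uber.org/atomic types are usable at their zero value, so changing the field type from int to atomic.Int64 requires no constructor changes at the QueueTask call sites:

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

type QueueTask struct {
	timesEnqueued atomic.Int64 // zero value is 0, no initialization needed
}

func main() {
	task := &QueueTask{} // no special initialization required
	task.timesEnqueued.Add(1)
	fmt.Println(task.timesEnqueued.Load()) // 1
}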

pkg/bloomgateway/bloomgateway_test.go (7 additions, 7 deletions)

@@ -140,8 +140,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("request fails when providing invalid block", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(refs, queriers, metas)

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)

@@ -176,7 +176,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
mockStore.err = errors.New("request failed")

reg := prometheus.NewRegistry()

@@ -220,7 +220,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

 		// replace store implementation and re-initialize workers and sub-services
 		refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
-		mockStore := newMockBloomStore(queriers, metas)
+		mockStore := newMockBloomStore(refs, queriers, metas)
 		mockStore.delay = 2000 * time.Millisecond

 		reg := prometheus.NewRegistry()

@@ -263,7 +263,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)

@@ -309,7 +309,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)

@@ -363,7 +363,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 		refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

 		reg := prometheus.NewRegistry()
-		store := newMockBloomStore(queriers, metas)
+		store := newMockBloomStore(refs, queriers, metas)

 		gw, err := New(cfg, store, logger, reg)
 		require.NoError(t, err)