From 95052943c4c1fde919ac0b3825d951bde0d09e6f Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 7 Jan 2021 15:42:14 -0500 Subject: [PATCH] swaps mutex for atomic (#3141) --- pkg/ingester/instance.go | 29 ++++++++++++----------------- pkg/ingester/instance_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 2cbdd1182805..61d9a62fd45c 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" tsdb_record "github.com/prometheus/prometheus/tsdb/record" "github.com/weaveworks/common/httpgrpc" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ingester/index" @@ -600,32 +601,26 @@ func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool { return false } -// OnceSwitch is a write optimized switch that can only ever be switched "on". -// It uses a RWMutex underneath the hood to quickly and effectively (in a concurrent environment) -// check if the switch has already been triggered, only actually acquiring the mutex for writing if not. +// OnceSwitch is an optimized switch that can only ever be switched "on" in a concurrent environment. type OnceSwitch struct { - sync.RWMutex - toggle bool + triggered atomic.Bool } func (o *OnceSwitch) Get() bool { - o.RLock() - defer o.RUnlock() - return o.toggle + return o.triggered.Load() +} + +func (o *OnceSwitch) Trigger() { + o.TriggerAnd(nil) } // TriggerAnd will ensure the switch is on and run the provided function if // the switch was not already toggled on. func (o *OnceSwitch) TriggerAnd(fn func()) { - o.RLock() - if o.toggle { - o.RUnlock() - return + + triggeredPrior := o.triggered.Swap(true) + if !triggeredPrior && fn != nil { + fn() } - o.RUnlock() - o.Lock() - o.toggle = true - o.Unlock() - fn() } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index f456a6285777..1a5abb8efc05 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "runtime" "sort" "sync" "testing" @@ -334,3 +335,27 @@ func Benchmark_instance_addNewTailer(b *testing.B) { }) } + +func Benchmark_OnceSwitch(b *testing.B) { + threads := runtime.GOMAXPROCS(0) + + // limit threads + if threads > 4 { + threads = 4 + } + + for n := 0; n < b.N; n++ { + x := &OnceSwitch{} + var wg sync.WaitGroup + for i := 0; i < threads; i++ { + wg.Add(1) + go func() { + for i := 0; i < 1000; i++ { + x.Trigger() + } + wg.Done() + }() + } + wg.Wait() + } +}