From f6450c1326763a229674b95637255ae1f9f9ec4b Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Sat, 6 Feb 2021 16:48:46 -0500 Subject: [PATCH] [aggregator] Move shardID calculation out of critical section (#3179) --- src/aggregator/aggregator/aggregator.go | 40 +++++++++++++++++++------ 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 5e0df44e9c..e510a57a3d 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -26,7 +26,6 @@ import ( "math" "strconv" "sync" - "sync/atomic" "time" "github.com/m3db/m3/src/aggregator/aggregator/handler" @@ -46,6 +45,7 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/uber-go/tally" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -118,11 +118,12 @@ type aggregator struct { shards []*aggregatorShard currStagedPlacement placement.ActiveStagedPlacement currPlacement placement.Placement + currNumShards atomic.Int32 state aggregatorState doneCh chan struct{} wg sync.WaitGroup sleepFn sleepFn - shardsPendingClose int32 + shardsPendingClose atomic.Int32 metrics aggregatorMetrics logger *zap.Logger } @@ -352,8 +353,17 @@ func (agg *aggregator) passWriter() (writer.Writer, error) { } func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) { + var ( + numShards = agg.currNumShards.Load() + shardID uint32 + ) + + if numShards > 0 { + shardID = agg.shardFn(id, uint32(numShards)) + } + agg.RLock() - shard, err := agg.shardForWithLock(id, noUpdateShards) + shard, err := agg.shardForWithLock(id, shardID, noUpdateShards) if err == nil || err != errActivePlacementChanged { agg.RUnlock() return shard, err @@ -361,20 +371,26 @@ func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) { agg.RUnlock() agg.Lock() - shard, err = agg.shardForWithLock(id, updateShards) + shard, err = agg.shardForWithLock(id, shardID, updateShards) agg.Unlock() return shard, err } -func (agg *aggregator) shardForWithLock(id id.RawID, updateShardsType updateShardsType) (*aggregatorShard, error) { +func (agg *aggregator) shardForWithLock( + id id.RawID, + shardID uint32, + updateShardsType updateShardsType, +) (*aggregatorShard, error) { if agg.state != aggregatorOpen { return nil, errAggregatorNotOpenOrClosed } + stagedPlacement, placement, err := agg.placementManager.Placement() if err != nil { return nil, err } + if agg.shouldProcessPlacementWithLock(stagedPlacement, placement) { if updateShardsType == noUpdateShards { return nil, errActivePlacementChanged @@ -382,11 +398,16 @@ func (agg *aggregator) shardForWithLock(id id.RawID, updateShardsType updateShar if err := agg.processPlacementWithLock(stagedPlacement, placement); err != nil { return nil, err } + // check if number of shards in placement changed, and recalculate shardID if needed + if int32(placement.NumShards()) != agg.currNumShards.Load() { + shardID = agg.shardFn(id, uint32(placement.NumShards())) + } } - shardID := agg.shardFn([]byte(id), uint32(placement.NumShards())) + if int(shardID) >= len(agg.shards) || agg.shards[shardID] == nil { return nil, errShardNotOwned } + return agg.shards[shardID], nil } @@ -582,6 +603,7 @@ func (agg *aggregator) updateShardsWithLock( agg.shards = incoming agg.currStagedPlacement = newStagedPlacement agg.currPlacement = newPlacement + agg.currNumShards.Store(int32(newPlacement.NumShards())) agg.closeShardsAsync(closing) } @@ -647,14 +669,14 @@ func (agg *aggregator) ownedShards() (owned, toClose []*aggregatorShard) { // Because each shard write happens while holding the shard read lock, the shard // may only close itself after all its pending writes are finished. func (agg *aggregator) closeShardsAsync(shards []*aggregatorShard) { - pendingClose := atomic.AddInt32(&agg.shardsPendingClose, int32(len(shards))) + pendingClose := agg.shardsPendingClose.Add(int32(len(shards))) agg.metrics.shards.pendingClose.Update(float64(pendingClose)) for _, shard := range shards { shard := shard go func() { shard.Close() - pendingClose := atomic.AddInt32(&agg.shardsPendingClose, -1) + pendingClose := agg.shardsPendingClose.Add(-1) agg.metrics.shards.pendingClose.Update(float64(pendingClose)) agg.metrics.shards.close.Inc(1) }() @@ -680,7 +702,7 @@ func (agg *aggregator) tickInternal() { numShards := len(ownedShards) agg.metrics.shards.owned.Update(float64(numShards)) - agg.metrics.shards.pendingClose.Update(float64(atomic.LoadInt32(&agg.shardsPendingClose))) + agg.metrics.shards.pendingClose.Update(float64(agg.shardsPendingClose.Load())) if numShards == 0 { agg.sleepFn(agg.checkInterval) return