Skip to content

Commit

Permalink
Merge branch 'master' into rhall-instrument-index
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanhall07 authored Feb 6, 2021
2 parents 07960d6 + f6450c1 commit 9c424e2
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"math"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/m3db/m3/src/aggregator/aggregator/handler"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -118,11 +118,12 @@ type aggregator struct {
shards []*aggregatorShard
currStagedPlacement placement.ActiveStagedPlacement
currPlacement placement.Placement
currNumShards atomic.Int32
state aggregatorState
doneCh chan struct{}
wg sync.WaitGroup
sleepFn sleepFn
shardsPendingClose int32
shardsPendingClose atomic.Int32
metrics aggregatorMetrics
logger *zap.Logger
}
Expand Down Expand Up @@ -352,41 +353,61 @@ func (agg *aggregator) passWriter() (writer.Writer, error) {
}

func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
var (
numShards = agg.currNumShards.Load()
shardID uint32
)

if numShards > 0 {
shardID = agg.shardFn(id, uint32(numShards))
}

agg.RLock()
shard, err := agg.shardForWithLock(id, noUpdateShards)
shard, err := agg.shardForWithLock(id, shardID, noUpdateShards)
if err == nil || err != errActivePlacementChanged {
agg.RUnlock()
return shard, err
}
agg.RUnlock()

agg.Lock()
shard, err = agg.shardForWithLock(id, updateShards)
shard, err = agg.shardForWithLock(id, shardID, updateShards)
agg.Unlock()

return shard, err
}

func (agg *aggregator) shardForWithLock(id id.RawID, updateShardsType updateShardsType) (*aggregatorShard, error) {
func (agg *aggregator) shardForWithLock(
id id.RawID,
shardID uint32,
updateShardsType updateShardsType,
) (*aggregatorShard, error) {
if agg.state != aggregatorOpen {
return nil, errAggregatorNotOpenOrClosed
}

stagedPlacement, placement, err := agg.placementManager.Placement()
if err != nil {
return nil, err
}

if agg.shouldProcessPlacementWithLock(stagedPlacement, placement) {
if updateShardsType == noUpdateShards {
return nil, errActivePlacementChanged
}
if err := agg.processPlacementWithLock(stagedPlacement, placement); err != nil {
return nil, err
}
// check if number of shards in placement changed, and recalculate shardID if needed
if int32(placement.NumShards()) != agg.currNumShards.Load() {
shardID = agg.shardFn(id, uint32(placement.NumShards()))
}
}
shardID := agg.shardFn([]byte(id), uint32(placement.NumShards()))

if int(shardID) >= len(agg.shards) || agg.shards[shardID] == nil {
return nil, errShardNotOwned
}

return agg.shards[shardID], nil
}

Expand Down Expand Up @@ -582,6 +603,7 @@ func (agg *aggregator) updateShardsWithLock(
agg.shards = incoming
agg.currStagedPlacement = newStagedPlacement
agg.currPlacement = newPlacement
agg.currNumShards.Store(int32(newPlacement.NumShards()))
agg.closeShardsAsync(closing)
}

Expand Down Expand Up @@ -647,14 +669,14 @@ func (agg *aggregator) ownedShards() (owned, toClose []*aggregatorShard) {
// Because each shard write happens while holding the shard read lock, the shard
// may only close itself after all its pending writes are finished.
func (agg *aggregator) closeShardsAsync(shards []*aggregatorShard) {
pendingClose := atomic.AddInt32(&agg.shardsPendingClose, int32(len(shards)))
pendingClose := agg.shardsPendingClose.Add(int32(len(shards)))
agg.metrics.shards.pendingClose.Update(float64(pendingClose))

for _, shard := range shards {
shard := shard
go func() {
shard.Close()
pendingClose := atomic.AddInt32(&agg.shardsPendingClose, -1)
pendingClose := agg.shardsPendingClose.Add(-1)
agg.metrics.shards.pendingClose.Update(float64(pendingClose))
agg.metrics.shards.close.Inc(1)
}()
Expand All @@ -680,7 +702,7 @@ func (agg *aggregator) tickInternal() {

numShards := len(ownedShards)
agg.metrics.shards.owned.Update(float64(numShards))
agg.metrics.shards.pendingClose.Update(float64(atomic.LoadInt32(&agg.shardsPendingClose)))
agg.metrics.shards.pendingClose.Update(float64(agg.shardsPendingClose.Load()))
if numShards == 0 {
agg.sleepFn(agg.checkInterval)
return
Expand Down

0 comments on commit 9c424e2

Please sign in to comment.