Merge branch 'master' into linasn/bootstrap-profiler
* master: (30 commits)
  [dbnode] Use go context to cancel index query workers after timeout (#3194)
  [aggregator] Fix change ActivePlacement semantics on close (#3201)
  [aggregator] Simplify (Active)StagedPlacement API (#3199)
  [aggregator] Checking if metadata is set to default should not cause copying (#3198)
  [dbnode] Remove readers and writer from aggregator API (#3122)
  [aggregator] Avoid large copies in entry rollup comparisons by making them more inline-friendly (#3195)
  [dbnode] Re-add aggregator doc limit update (#3137)
  [m3db] Do not close reader in filterFieldsIterator.Close() (#3196)
  Revert "Remove disk series read limit (#3174)" (#3193)
  [instrument] Improve sampled timer and stopwatch performance (#3191)
  Omit unset fields in metadata json (#3189)
  [dbnode] Remove left-over code in storage/bootstrap/bootstrapper (#3190)
  [dbnode][coordinator] Support match[] in label endpoints (#3180)
  Instrument the worker pool with the wait time (#3188)
  Instrument query path (#3182)
  [aggregator] Remove indirection, large copy from unaggregated protobuf decoder (#3186)
  [aggregator] Sample timers completely (#3184)
  [aggregator] Reduce error handling overhead in rawtcp server (#3183)
  [aggregator] Move shardID calculation out of critical section (#3179)
  Move instrumentation cleanup to FetchTaggedResultIterator Close() (#3173)
  ...
soundvibe committed Feb 10, 2021
2 parents f37237d + 2641d76 commit 885e8f1
Showing 154 changed files with 4,944 additions and 2,953 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -298,4 +298,4 @@ issues:
max-same-issues: 0

# Show only new issues created after git revision `REV`
-new-from-rev: master
+new-from-rev: origin/master
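Note: new-from-rev makes golangci-lint report only issues introduced after the named git revision, so pointing it at origin/master rather than a local master branch presumably keeps that baseline stable on checkouts where a local master is absent or stale.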
87 changes: 85 additions & 2 deletions scripts/docker-integration-tests/prometheus/test.sh
@@ -233,7 +233,7 @@ function test_query_limits_applied
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 3" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/query?query=database_write_tagged_success) = "400" ]]'

-# Test the default docs limit applied when directly querying
+# Test the docs limit applied when directly querying
# coordinator (docs limit set by header)
echo "Test query docs limit with coordinator limit header"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
@@ -391,6 +391,87 @@ function test_series
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]'
}

function test_label_query_limits_applied {
# Test that require exhaustive does nothing if limits are not hit
echo "Test label limits with require-exhaustive headers true (below limit therefore no error)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Max-Series: 10000" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "200" ]]'

# the header takes precedence over the configured default series limit
echo "Test label series limit with coordinator limit header (default requires exhaustive so error)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Series: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'

echo "Test label series limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'

echo "Test label series limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Series: 2" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'

echo "Test label docs limit with coordinator limit header (default requires exhaustive so error)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'

echo "Test label docs limit with require-exhaustive headers false"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Docs: 2" -H "M3-Limit-Require-Exhaustive: false" 0.0.0.0:7201/api/v1/label/__name__/values | jq -r ".data | length") -eq 1 ]]'

echo "Test label docs limit with require-exhaustive headers true (above limit therefore error)"
# Test that require exhaustive error is returned
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ -n $(curl -s -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values | jq ."error" | grep "query exceeded limit") ]]'
# Test that require exhaustive error is 4xx
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -o /dev/null -w "%{http_code}" -H "M3-Limit-Max-Docs: 1" -H "M3-Limit-Require-Exhaustive: true" 0.0.0.0:7201/api/v1/label/__name__/values) = "400" ]]'
}

function test_labels {
TAG_NAME_0="name_0" TAG_VALUE_0="value_0_1" \
TAG_NAME_1="name_1" TAG_VALUE_1="value_1_1" \
TAG_NAME_2="name_2" TAG_VALUE_2="value_2_1" \
prometheus_remote_write \
label_metric now 42.42 \
true "Expected request to succeed" \
200 "Expected request to return status code 200"

TAG_NAME_0="name_0" TAG_VALUE_0="value_0_2" \
TAG_NAME_1="name_1" TAG_VALUE_1="value_1_2" \
prometheus_remote_write \
label_metric_2 now 42.42 \
true "Expected request to succeed" \
200 "Expected request to return status code 200"

# Test label search with match
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/labels" | jq -r "[.data[] | select(index(\"name_0\", \"name_1\", \"name_2\"))] | length") -eq 3 ]]'

ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric" | jq -r ".data | length") -eq 4 ]]'

ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric_2" | jq -r ".data | length") -eq 3 ]]'

# Test label values search with match
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values" | jq -r ".data | length") -eq 2 ]]' # two values without a match

ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric" | jq -r ".data | length") -eq 1 ]]'
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric" | jq -r ".data[0]") = "value_1_1" ]]'

ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2" | jq -r ".data | length") -eq 1 ]]'
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2" | jq -r ".data[0]") = "value_1_2" ]]'
}

echo "Running readiness test"
test_readiness

@@ -408,7 +489,9 @@ test_query_restrict_metrics_type
test_prometheus_query_native_timeout
test_query_restrict_tags
test_prometheus_remote_write_map_tags
-test_series
+test_series
+test_label_query_limits_applied
+test_labels

echo "Running function correctness tests"
test_correctness
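For reference, a rough Go sketch of the kind of request the new label-limit assertions above make with curl, assuming a coordinator at 0.0.0.0:7201 as in this docker setup; it is illustrative only and not part of the change:

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	req, err := http.NewRequest(http.MethodGet,
		"http://0.0.0.0:7201/api/v1/label/__name__/values", nil)
	if err != nil {
		log.Fatal(err)
	}
	// Cap the query at one document and require the limit to be exhaustive,
	// so exceeding it should surface as a 400 rather than partial results.
	req.Header.Set("M3-Limit-Max-Docs", "1")
	req.Header.Set("M3-Limit-Require-Exhaustive", "true")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	fmt.Println("status:", resp.StatusCode) // the test above expects 400 here
}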
2 changes: 1 addition & 1 deletion site/go.mod
@@ -2,4 +2,4 @@ module m3-site

go 1.15

-require github.com/chronosphereio/victor v0.0.0-20201229142059-d2026c6102a7 // indirect
+require github.com/chronosphereio/victor v0.0.0-20210204163248-3548c3638c6d // indirect
2 changes: 2 additions & 0 deletions site/go.sum
@@ -4,3 +4,5 @@ github.com/chronosphereio/victor v0.0.0-20201222112852-eaf9ae24e2db h1:qEMGd5zaq
github.com/chronosphereio/victor v0.0.0-20201222112852-eaf9ae24e2db/go.mod h1:wz1ngMsk+1D1ug2ObnI3zXs+/ZdBPrWLb6R1WQW3XNM=
github.com/chronosphereio/victor v0.0.0-20201229142059-d2026c6102a7 h1:VQHAfLGF53imcPuvx0jxNPFHqS9h0vSyq4asWxEdYlU=
github.com/chronosphereio/victor v0.0.0-20201229142059-d2026c6102a7/go.mod h1:wz1ngMsk+1D1ug2ObnI3zXs+/ZdBPrWLb6R1WQW3XNM=
github.com/chronosphereio/victor v0.0.0-20210204163248-3548c3638c6d h1:NSc2M6OYD/kI3sMSEoWDPxUa5wqA7/RnSMvCBMOygNE=
github.com/chronosphereio/victor v0.0.0-20210204163248-3548c3638c6d/go.mod h1:wz1ngMsk+1D1ug2ObnI3zXs+/ZdBPrWLb6R1WQW3XNM=
72 changes: 51 additions & 21 deletions src/aggregator/aggregator/aggregator.go
@@ -26,7 +26,6 @@ import (
"math"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/m3db/m3/src/aggregator/aggregator/handler"
@@ -46,6 +45,7 @@ import (
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
)

@@ -118,11 +118,12 @@ type aggregator struct {
shards []*aggregatorShard
currStagedPlacement placement.ActiveStagedPlacement
currPlacement placement.Placement
currNumShards atomic.Int32
state aggregatorState
doneCh chan struct{}
wg sync.WaitGroup
sleepFn sleepFn
shardsPendingClose int32
shardsPendingClose atomic.Int32
metrics aggregatorMetrics
logger *zap.Logger
}
@@ -182,7 +183,7 @@ func (agg *aggregator) AddUntimed(
metric unaggregated.MetricUnion,
metadatas metadata.StagedMetadatas,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addUntimed.SuccessLatencyStopwatch()
if err := agg.checkMetricType(metric); err != nil {
agg.metrics.addUntimed.ReportError(err)
return err
@@ -196,15 +197,16 @@
agg.metrics.addUntimed.ReportError(err)
return err
}
agg.metrics.addUntimed.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addUntimed.ReportSuccess()
sw.Stop()
return nil
}

func (agg *aggregator) AddTimed(
metric aggregated.Metric,
metadata metadata.TimedMetadata,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addTimed.SuccessLatencyStopwatch()
agg.metrics.timed.Inc(1)
shard, err := agg.shardFor(metric.ID)
if err != nil {
@@ -215,15 +217,16 @@ func (agg *aggregator) AddTimed(
agg.metrics.addTimed.ReportError(err)
return err
}
agg.metrics.addTimed.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addTimed.ReportSuccess()
sw.Stop()
return nil
}

func (agg *aggregator) AddTimedWithStagedMetadatas(
metric aggregated.Metric,
metas metadata.StagedMetadatas,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addTimed.SuccessLatencyStopwatch()
agg.metrics.timed.Inc(1)
shard, err := agg.shardFor(metric.ID)
if err != nil {
@@ -234,15 +237,16 @@ func (agg *aggregator) AddTimedWithStagedMetadatas(
agg.metrics.addTimed.ReportError(err)
return err
}
agg.metrics.addTimed.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addTimed.ReportSuccess()
sw.Stop()
return nil
}

func (agg *aggregator) AddForwarded(
metric aggregated.ForwardedMetric,
metadata metadata.ForwardMetadata,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addForwarded.SuccessLatencyStopwatch()
agg.metrics.forwarded.Inc(1)
shard, err := agg.shardFor(metric.ID)
if err != nil {
@@ -254,7 +258,8 @@ func (agg *aggregator) AddForwarded(
return err
}
callEnd := agg.nowFn()
agg.metrics.addForwarded.ReportSuccess(callEnd.Sub(callStart))
agg.metrics.addForwarded.ReportSuccess()
sw.Stop()
forwardingDelay := time.Duration(callEnd.UnixNano() - metric.TimeNanos)
agg.metrics.addForwarded.ReportForwardingLatency(
metadata.StoragePolicy.Resolution().Window,
@@ -268,7 +273,7 @@ func (agg *aggregator) AddPassthrough(
metric aggregated.Metric,
storagePolicy policy.StoragePolicy,
) error {
callStart := agg.nowFn()
sw := agg.metrics.addPassthrough.SuccessLatencyStopwatch()
agg.metrics.passthrough.Inc(1)

if agg.electionManager.ElectionState() == FollowerState {
@@ -297,7 +302,8 @@ func (agg *aggregator) AddPassthrough(
agg.metrics.addPassthrough.ReportError(err)
return err
}
agg.metrics.addPassthrough.ReportSuccess(agg.nowFn().Sub(callStart))
agg.metrics.addPassthrough.ReportSuccess()
sw.Stop()
return nil
}

@@ -352,41 +358,61 @@ func (agg *aggregator) passWriter() (writer.Writer, error) {
}

func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
var (
numShards = agg.currNumShards.Load()
shardID uint32
)

if numShards > 0 {
shardID = agg.shardFn(id, uint32(numShards))
}

agg.RLock()
shard, err := agg.shardForWithLock(id, noUpdateShards)
shard, err := agg.shardForWithLock(id, shardID, noUpdateShards)
if err == nil || err != errActivePlacementChanged {
agg.RUnlock()
return shard, err
}
agg.RUnlock()

agg.Lock()
shard, err = agg.shardForWithLock(id, updateShards)
shard, err = agg.shardForWithLock(id, shardID, updateShards)
agg.Unlock()

return shard, err
}

func (agg *aggregator) shardForWithLock(id id.RawID, updateShardsType updateShardsType) (*aggregatorShard, error) {
func (agg *aggregator) shardForWithLock(
id id.RawID,
shardID uint32,
updateShardsType updateShardsType,
) (*aggregatorShard, error) {
if agg.state != aggregatorOpen {
return nil, errAggregatorNotOpenOrClosed
}

stagedPlacement, placement, err := agg.placementManager.Placement()
if err != nil {
return nil, err
}

if agg.shouldProcessPlacementWithLock(stagedPlacement, placement) {
if updateShardsType == noUpdateShards {
return nil, errActivePlacementChanged
}
if err := agg.processPlacementWithLock(stagedPlacement, placement); err != nil {
return nil, err
}
// check if number of shards in placement changed, and recalculate shardID if needed
if int32(placement.NumShards()) != agg.currNumShards.Load() {
shardID = agg.shardFn(id, uint32(placement.NumShards()))
}
}
shardID := agg.shardFn([]byte(id), uint32(placement.NumShards()))

if int(shardID) >= len(agg.shards) || agg.shards[shardID] == nil {
return nil, errShardNotOwned
}

return agg.shards[shardID], nil
}
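As the comment above notes, this keeps the shardID hashing out of the critical section (see "Move shardID calculation out of critical section (#3179)" in the commit list): the shard count is cached in an atomic, the ID is hashed before the lock is taken, and it is only re-hashed under the lock if the placement's shard count has changed. A self-contained sketch of that pattern, with router, hashID and the string shards standing in for the real m3 types:

package main

import (
	"fmt"
	"hash/fnv"
	"sync"

	"go.uber.org/atomic"
)

// router is a hypothetical stand-in for the aggregator's shard routing state.
type router struct {
	mu        sync.RWMutex
	numShards *atomic.Int32 // last shard count observed under the lock
	shards    []string
}

// hashID is a stand-in for the aggregator's shardFn.
func hashID(id []byte, numShards uint32) uint32 {
	h := fnv.New32a()
	h.Write(id)
	return h.Sum32() % numShards
}

func (r *router) shardFor(id []byte) string {
	// Hash outside the lock using the last observed shard count.
	var shardID uint32
	if n := r.numShards.Load(); n > 0 {
		shardID = hashID(id, uint32(n))
	}

	r.mu.RLock()
	defer r.mu.RUnlock()
	// Re-hash only if the shard count changed since the load above.
	if n := len(r.shards); n > 0 && int32(n) != r.numShards.Load() {
		shardID = hashID(id, uint32(n))
	}
	return r.shards[shardID]
}

func main() {
	r := &router{
		numShards: atomic.NewInt32(4),
		shards:    []string{"shard-0", "shard-1", "shard-2", "shard-3"},
	}
	fmt.Println(r.shardFor([]byte("some.metric.id")))
}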

@@ -582,6 +608,7 @@ func (agg *aggregator) updateShardsWithLock(
agg.shards = incoming
agg.currStagedPlacement = newStagedPlacement
agg.currPlacement = newPlacement
agg.currNumShards.Store(int32(newPlacement.NumShards()))
agg.closeShardsAsync(closing)
}

@@ -647,14 +674,14 @@ func (agg *aggregator) ownedShards() (owned, toClose []*aggregatorShard) {
// Because each shard write happens while holding the shard read lock, the shard
// may only close itself after all its pending writes are finished.
func (agg *aggregator) closeShardsAsync(shards []*aggregatorShard) {
pendingClose := atomic.AddInt32(&agg.shardsPendingClose, int32(len(shards)))
pendingClose := agg.shardsPendingClose.Add(int32(len(shards)))
agg.metrics.shards.pendingClose.Update(float64(pendingClose))

for _, shard := range shards {
shard := shard
go func() {
shard.Close()
pendingClose := atomic.AddInt32(&agg.shardsPendingClose, -1)
pendingClose := agg.shardsPendingClose.Add(-1)
agg.metrics.shards.pendingClose.Update(float64(pendingClose))
agg.metrics.shards.close.Inc(1)
}()
Expand All @@ -680,7 +707,7 @@ func (agg *aggregator) tickInternal() {

numShards := len(ownedShards)
agg.metrics.shards.owned.Update(float64(numShards))
agg.metrics.shards.pendingClose.Update(float64(atomic.LoadInt32(&agg.shardsPendingClose)))
agg.metrics.shards.pendingClose.Update(float64(agg.shardsPendingClose.Load()))
if numShards == 0 {
agg.sleepFn(agg.checkInterval)
return
@@ -736,9 +763,12 @@ func newAggregatorAddMetricMetrics(
}
}

func (m *aggregatorAddMetricMetrics) ReportSuccess(d time.Duration) {
func (m *aggregatorAddMetricMetrics) SuccessLatencyStopwatch() tally.Stopwatch {
return m.successLatency.Start()
}

func (m *aggregatorAddMetricMetrics) ReportSuccess() {
m.success.Inc(1)
m.successLatency.Record(d)
}

func (m *aggregatorAddMetricMetrics) ReportError(err error) {
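The recurring change in this file replaces the callStart := agg.nowFn() / ReportSuccess(duration) pattern with a tally stopwatch that is started before the work and stopped once the success counter has been bumped. A minimal sketch of that usage, assuming the uber-go/tally API (Timer.Start returns a Stopwatch whose Stop records the elapsed time); the types below are illustrative, not the m3 ones:

package main

import (
	"time"

	"github.com/uber-go/tally"
)

// addMetricMetrics is a simplified stand-in for aggregatorAddMetricMetrics.
type addMetricMetrics struct {
	success        tally.Counter
	successLatency tally.Timer
}

// SuccessLatencyStopwatch captures the start time once, instead of calling
// a now() function at both ends of the operation.
func (m *addMetricMetrics) SuccessLatencyStopwatch() tally.Stopwatch {
	return m.successLatency.Start()
}

func (m *addMetricMetrics) ReportSuccess() {
	m.success.Inc(1)
}

func main() {
	scope := tally.NewTestScope("aggregator", nil)
	m := &addMetricMetrics{
		success:        scope.Counter("success"),
		successLatency: scope.Timer("success-latency"),
	}

	sw := m.SuccessLatencyStopwatch()
	time.Sleep(10 * time.Millisecond) // stand-in for the actual Add* work
	m.ReportSuccess()
	sw.Stop() // records the elapsed time into the timer
}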