diff --git a/README.md b/README.md
index 141c93e10..b24cdce31 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ Usage of go-carbon:
   -version=false: Print version
 ```
 
-```toml
+```ini
 [common]
 # Run as user. Works only in daemon mode
 user = "carbon"
@@ -101,6 +101,9 @@
 data-dir = "/var/lib/graphite/whisper"
 schemas-file = "/etc/go-carbon/storage-schemas.conf"
 # http://graphite.readthedocs.org/en/latest/config-carbon.html#storage-aggregation-conf. Optional
 aggregation-file = "/etc/go-carbon/storage-aggregation.conf"
+# This is currently a go-carbon-only feature, not a standard Graphite feature. Optional
+# More details in doc/quotas.md
+# quotas-file = "/etc/go-carbon/storage-quotas.conf"
 # Worker threads count. Metrics sharded by "crc32(metricName) % workers"
 workers = 8
 # Limits the number of whisper update_many() calls per second. 0 - no limit
@@ -327,6 +330,9 @@ scan-frequency = "5m0s"
 # could be speeded up by enabling adding trigrams to trie, at the some costs of
 # memory usage (by setting both trie-index and trigram-index to true).
 trie-index = false
+# Controls how frequently quota and usage metrics are generated and throughput
+# counters are reset (more details in doc/quotas.md).
+# quota-usage-report-frequency = "1m"
 # Cache file list scan data in the specified path. This option speeds
 # up index building after reboot by reading the last scan result in file
diff --git a/cache/cache.go b/cache/cache.go
index 4ac5acc66..e7f6d101a 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -59,6 +59,8 @@ type Cache struct {
 	}
 
 	newMetricsChan chan string
+
+	throttle func(*points.Points) bool
 }
 
 // A "thread" safe string to anything map.
@@ -266,6 +268,10 @@ func (c *Cache) Add(p *points.Points) {
 		return
 	}
 
+	if c.throttle != nil && c.throttle(p) {
+		return
+	}
+
 	if s.tagsEnabled {
 		var err error
 		p.Metric, err = tags.Normalize(p.Metric)
@@ -367,3 +373,5 @@ func (c *Cache) GetRecentNewMetrics() []map[string]struct{} {
 	}
 	return metricNames
 }
+
+func (c *Cache) SetThrottle(throttle func(*points.Points) bool) { c.throttle = throttle }
diff --git a/carbon/app.go b/carbon/app.go
index c3c571620..5c0866b7c 100644
--- a/carbon/app.go
+++ b/carbon/app.go
@@ -1,6 +1,7 @@
 package carbon
 
 import (
+	"errors"
 	"fmt"
 	"net"
 	"net/url"
@@ -99,6 +100,13 @@ func (app *App) configure() error {
 		return err
 	}
 
+	if cfg.Whisper.QuotasFilename != "" {
+		cfg.Whisper.Quotas, err = persister.ReadWhisperQuotas(cfg.Whisper.QuotasFilename)
+		if err != nil {
+			return err
+		}
+	}
+
 	if cfg.Whisper.AggregationFilename != "" {
 		cfg.Whisper.Aggregation, err = persister.ReadWhisperAggregation(cfg.Whisper.AggregationFilename)
 		if err != nil {
@@ -437,6 +445,7 @@ func (app *App) Start(version string) (err error) {
 		return
 	}
 
+	// TODO: refactor: do not use a var name that shadows the pkg name
 	carbonserver := carbonserver.NewCarbonserverListener(core.Get)
 	carbonserver.SetWhisperData(conf.Whisper.DataDir)
 	carbonserver.SetMaxGlobs(conf.Carbonserver.MaxGlobs)
@@ -449,6 +458,7 @@ func (app *App) Start(version string) (err error) {
 	carbonserver.SetBuckets(conf.Carbonserver.Buckets)
 	carbonserver.SetMetricsAsCounters(conf.Carbonserver.MetricsAsCounters)
 	carbonserver.SetScanFrequency(conf.Carbonserver.ScanFrequency.Value())
+	carbonserver.SetQuotaUsageReportFrequency(conf.Carbonserver.QuotaUsageReportFrequency.Value())
 	carbonserver.SetReadTimeout(conf.Carbonserver.ReadTimeout.Value())
 	carbonserver.SetIdleTimeout(conf.Carbonserver.IdleTimeout.Value())
 	carbonserver.SetWriteTimeout(conf.Carbonserver.WriteTimeout.Value())
@@ -463,6 +473,29 @@ func (app *App) Start(version string) (err error) {
 	carbonserver.SetPercentiles(conf.Carbonserver.Percentiles)
 	// carbonserver.SetQueryTimeout(conf.Carbonserver.QueryTimeout.Value())
 
+	if app.Config.Whisper.Quotas != nil {
+		if !conf.Carbonserver.ConcurrentIndex || conf.Carbonserver.RealtimeIndex <= 0 {
+			return errors.New("concurrent-index and realtime-index need to be enabled for quota control")
+		}
+
+		carbonserver.SetEstimateSize(func(metric string) (size, dataPoints int64) {
+			schema, ok := app.Config.Whisper.Schemas.Match(metric)
+			if !ok {
+				// Why this is not configurable: go-carbon users should
+				// always make sure that there is a default retention policy.
+				return 4096 + 172800*12, 172800 // 2 days of secondly data
+			}
+
+			for _, r := range schema.Retentions {
+				dataPoints += int64(r.NumberOfPoints())
+			}
+			return 4096 + dataPoints*12, dataPoints
+		})
+
+		carbonserver.SetQuotas(app.Config.getCarbonserverQuotas())
+		core.SetThrottle(carbonserver.ShouldThrottleMetric)
+	}
+
 	var setConfigRetriever bool
 	if conf.Carbonserver.CacheScan {
 		core.InitCacheScanAdds()
diff --git a/carbon/collector.go b/carbon/collector.go
index e0b6638c8..cfee0dd9c 100644
--- a/carbon/collector.go
+++ b/carbon/collector.go
@@ -133,7 +133,6 @@ func NewCollector(app *App) *Collector {
 			}
 		})
 	})
-
 }
 
 	sendCallback := func(moduleName string) func(metric string, value float64) {
diff --git a/carbon/config.go b/carbon/config.go
index 9e7854fbb..c24b43bb6 100644
--- a/carbon/config.go
+++ b/carbon/config.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/BurntSushi/toml"
+	"github.com/go-graphite/go-carbon/carbonserver"
 	"github.com/go-graphite/go-carbon/persister"
 	"github.com/go-graphite/go-carbon/receiver/tcp"
 	"github.com/go-graphite/go-carbon/receiver/udp"
@@ -56,6 +57,7 @@ type whisperConfig struct {
 	DataDir             string `toml:"data-dir"`
 	SchemasFilename     string `toml:"schemas-file"`
 	AggregationFilename string `toml:"aggregation-file"`
+	QuotasFilename      string `toml:"quotas-file"`
 	Workers             int    `toml:"workers"`
 	MaxUpdatesPerSecond int    `toml:"max-updates-per-second"`
 	MaxCreatesPerSecond int    `toml:"max-creates-per-second"`
@@ -67,6 +69,7 @@ type whisperConfig struct {
 	HashFilenames       bool   `toml:"hash-filenames"`
 	Schemas             persister.WhisperSchemas
 	Aggregation         *persister.WhisperAggregation
+	Quotas              persister.WhisperQuotas
 	RemoveEmptyFile     bool `toml:"remove-empty-file"`
 }
@@ -121,6 +124,8 @@ type carbonserverConfig struct {
 	ConcurrentIndex bool   `toml:"concurrent-index"`
 	RealtimeIndex   int    `toml:"realtime-index"`
 	FileListCache   string `toml:"file-list-cache"`
+
+	QuotaUsageReportFrequency *Duration `toml:"quota-usage-report-frequency"`
 }
 
 type pprofConfig struct {
@@ -213,6 +218,9 @@ func NewConfig() *Config {
 			ScanFrequency: &Duration{
 				Duration: 300 * time.Second,
 			},
+			QuotaUsageReportFrequency: &Duration{
+				Duration: 60 * time.Second,
+			},
 			ReadTimeout: &Duration{
 				Duration: 60 * time.Second,
 			},
@@ -376,3 +384,21 @@ retentions = 60:43200,3600:43800`), 0644)
 
 	return configFile
 }
+
+func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
+	for _, q := range c.Whisper.Quotas {
+		quotas = append(quotas, &carbonserver.Quota{
+			Pattern:          q.Pattern,
+			Namespaces:       q.Namespaces,
+			Metrics:          q.Metrics,
+			DataPoints:       q.DataPoints,
+			LogicalSize:      q.LogicalSize,
+			PhysicalSize:     q.PhysicalSize,
+			Throughput:       q.Throughput,
+			DroppingPolicy:   carbonserver.ParseQuotaDroppingPolicy(q.DroppingPolicy),
+			StatMetricPrefix: q.StatMetricPrefix,
+		})
+	}
+
+	return quotas
+}
diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go
index 387925f69..0455aa436 100644
--- a/carbonserver/carbonserver.go
+++ b/carbonserver/carbonserver.go
@@ -91,6 +91,8 @@ type metricStruct struct {
 	TrieNodes uint64
 	TrieFiles uint64
 	TrieDirs  uint64
+	QuotaApplyTimeNs   uint64
+	UsageRefreshTimeNs uint64
 }
 
 type requestsTimes struct {
@@ -250,6 +252,11 @@ type CarbonserverListener struct {
 	prometheus prometheus
 
 	db *leveldb.DB
+
+	quotas       []*Quota
+	estimateSize func(metric string) (size, dataPoints int64)
+	quotaAndUsageMetrics      atomic.Value // []points.Points
+	quotaUsageReportFrequency time.Duration
 }
 
 type prometheus struct {
@@ -475,6 +482,9 @@ func (listener *CarbonserverListener) SetBuckets(buckets int) {
 func (listener *CarbonserverListener) SetScanFrequency(scanFrequency time.Duration) {
 	listener.scanFrequency = scanFrequency
 }
+func (listener *CarbonserverListener) SetQuotaUsageReportFrequency(quotaUsageReportFrequency time.Duration) {
+	listener.quotaUsageReportFrequency = quotaUsageReportFrequency
+}
 func (listener *CarbonserverListener) SetReadTimeout(readTimeout time.Duration) {
 	listener.readTimeout = readTimeout
 }
@@ -531,6 +541,25 @@ func (listener *CarbonserverListener) SetInternalStatsDir(dbPath string) {
 func (listener *CarbonserverListener) SetPercentiles(percentiles []int) {
 	listener.percentiles = percentiles
 }
+func (listener *CarbonserverListener) SetEstimateSize(f func(metric string) (size, dataPoints int64)) {
+	listener.estimateSize = f
+}
+func (listener *CarbonserverListener) SetQuotas(quotas []*Quota) {
+	listener.quotas = quotas
+}
+func (listener *CarbonserverListener) ShouldThrottleMetric(ps *points.Points) bool {
+	fidx := listener.CurrentFileIndex()
+	if fidx == nil || fidx.trieIdx == nil {
+		return false
+	}
+
+	return fidx.trieIdx.throttle(ps)
+}
 func (listener *CarbonserverListener) CurrentFileIndex() *fileIndex {
 	p := listener.fileIdx.Load()
 	if p == nil {
@@ -613,23 +642,44 @@ func splitAndInsert(cacheMetricNames map[string]struct{}, newCacheMetricNames []
 
 func (listener *CarbonserverListener) fileListUpdater(dir string, tick <-chan time.Time, force <-chan struct{}, exit <-chan struct{}) {
 	cacheMetricNames := make(map[string]struct{})
 
+	qaurtFreq := listener.quotaUsageReportFrequency
+	if qaurtFreq <= 0 {
+		qaurtFreq = time.Minute
+	}
+	qaurt := time.NewTicker(qaurtFreq) // quota and usage refresh ticker
+
 uloop:
 	for {
 		select {
 		case <-exit:
+			qaurt.Stop()
 			return
 		case <-tick:
 		case <-force:
+		case <-qaurt.C:
+			listener.refreshQuotaAndUsage()
+
+			// drain any remaining pending ticks
+		qaurtDrainLoop:
+			for {
+				select {
+				case <-qaurt.C:
+				default:
+					break qaurtDrainLoop
+				}
+			}
+
+			continue uloop
 		case m := <-listener.newMetricsChan:
 			fidx := listener.CurrentFileIndex()
 
 			if listener.trieIndex && listener.concurrentIndex && fidx != nil && fidx.trieIdx != nil {
 				metric := "/" + filepath.Clean(strings.ReplaceAll(m, ".", "/")+".wsp")
-				if err := fidx.trieIdx.insert(metric); err != nil {
+				if err := fidx.trieIdx.insert(metric, 0, 0, 0); err != nil {
 					listener.logger.Warn("failed to insert new metrics for realtime indexing",
 						zap.String("metric", metric),
 						zap.Error(err))
 				}
 			}
+
 			continue uloop
 		}
@@ -644,9 +694,38 @@ uloop:
 			listener.logger.Info("file list updated with cache, starting a new scan immediately")
 			listener.updateFileList(dir, cacheMetricNames)
 		}
+
+		listener.refreshQuotaAndUsage()
 	}
 }
 
+func (listener *CarbonserverListener) refreshQuotaAndUsage() {
+	fidx := listener.CurrentFileIndex()
+
+	if len(listener.quotas) == 0 || !listener.concurrentIndex || listener.realtimeIndex <= 0 || fidx == nil || fidx.trieIdx == nil {
+		return
+	}
+
+	quotaStart := time.Now()
+	fidx.trieIdx.applyQuotas(listener.quotas...)
+	quotaTime := uint64(time.Since(quotaStart))
+	atomic.StoreUint64(&listener.metrics.QuotaApplyTimeNs, quotaTime)
+
+	usageStart := time.Now()
+	fidx.trieIdx.refreshUsage()
+	usageTime := uint64(time.Since(usageStart))
+	atomic.StoreUint64(&listener.metrics.UsageRefreshTimeNs, usageTime)
+
+	listener.quotaAndUsageMetrics.Store(fidx.trieIdx.qauMetrics)
+	fidx.trieIdx.qauMetrics = nil
+
+	listener.logger.Debug(
+		"refreshQuotaAndUsage",
+		zap.Uint64("quota_apply_time", quotaTime),
+		zap.Uint64("usage_refresh_time", usageTime),
+	)
+}
+
 func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricNames map[string]struct{}) (readFromCache bool) {
 	logger := listener.logger.With(zap.String("handler", "fileListUpdated"))
 	defer func() {
@@ -668,7 +747,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
 	var infos []zap.Field
 	if listener.trieIndex {
 		if fidx == nil || !listener.concurrentIndex {
-			trieIdx = newTrie(".wsp")
+			trieIdx = newTrie(".wsp", listener.estimateSize)
 		} else {
 			trieIdx = fidx.trieIdx
 			trieIdx.root.gen++
@@ -681,7 +760,7 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
 	var cacheMetricLen = len(cacheMetricNames)
 	for fileName := range cacheMetricNames {
 		if listener.trieIndex {
-			if err := trieIdx.insert(fileName); err != nil {
+			if err := trieIdx.insert(fileName, 0, 0, 0); err != nil {
 				logger.Error("error populating index from cache indexMap",
 					zap.Error(err))
 			}
@@ -708,11 +787,11 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
 			if entry == "" {
 				continue
 			}
-			if err := trieIdx.insert(entry); err != nil {
+			if err := trieIdx.insert(entry, 0, 0, 0); err != nil {
 				logger.Error("failed to read from file list cache",
 					zap.Error(err))
 				readFromCache = false
-				trieIdx = newTrie(".wsp")
+				trieIdx = newTrie(".wsp", listener.estimateSize)
 				break
 			}
 			filesLen++
@@ -746,19 +825,41 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
 			}()
 		}
 	}
+
+	var usageRefreshTimeout = struct {
+		refreshedAt time.Time
+		count       int
+	}{time.Now(), 0}
 	err := filepath.Walk(dir, func(p string, info os.FileInfo, err error) error {
 		if err != nil {
 			logger.Info("error processing", zap.String("path", p), zap.Error(err))
 			return nil
 		}
 
-		// && len(listener.newMetricsChan) >= cap(listener.newMetricsChan)/2
+		// WHY: filepath.Walk could potentially take a long
+		// time to complete (>= 5 minutes or more), depending
+		// on how many files there are on disk. It's nice to
+		// have consistent quota and usage metrics produced as
+		// regularly as possible, according to the
+		// quotaUsageReportFrequency specified in the config.
+		if usageRefreshTimeout.count++; usageRefreshTimeout.count > 10_000 && time.Since(usageRefreshTimeout.refreshedAt) >= listener.quotaUsageReportFrequency {
+			listener.refreshQuotaAndUsage()
+			usageRefreshTimeout.refreshedAt = time.Now()
+		}
+
+		// WHY: filepath.Walk could potentially take a long
+		// time to complete (>= 5 minutes or more), depending
+		// on how many files there are on disk. It's nice to
+		// try to flush newMetricsChan if possible.
+		//
+		// TODO: only enter the loop when it's half full?
+		// len(listener.newMetricsChan) >= cap(listener.newMetricsChan)/2
 		if listener.trieIndex && listener.concurrentIndex && listener.newMetricsChan != nil {
 		newMetricsLoop:
 			for {
 				select {
 				case m := <-listener.newMetricsChan:
-					if err := trieIdx.insert(filepath.Clean(strings.ReplaceAll(m, ".", "/") + ".wsp")); err != nil {
+					fileName := "/" + filepath.Clean(strings.ReplaceAll(m, ".", "/")+".wsp")
+					if err := trieIdx.insert(fileName, 0, 0, 0); err != nil {
 						logger.Warn("failed to update realtime trie index", zap.Error(err))
 					}
 				default:
@@ -787,7 +888,19 @@ func (listener *CarbonserverListener) updateFileList(dir string, cacheMetricName
 			delete(cacheMetricNames, trimmedName)
 		} else {
 			if listener.trieIndex {
-				if err := trieIdx.insert(trimmedName); err != nil {
+				var dataPoints int64
+				if isFullMetric && listener.estimateSize != nil {
+					m := strings.ReplaceAll(trimmedName, "/", ".")
+					m = m[1 : len(m)-4]
+					_, dataPoints = listener.estimateSize(m)
+				}
+
+				var physicalSize = info.Size()
+				if stat, ok := info.Sys().(*syscall.Stat_t); ok {
+					physicalSize = stat.Blocks * 512
+				}
+
+				if err := trieIdx.insert(trimmedName, info.Size(), physicalSize, dataPoints); err != nil {
 					return fmt.Errorf("updateFileList.trie: %s", err)
 				}
 			} else {
@@ -1175,6 +1288,10 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {
 		senderRaw("trie_index_files", &listener.metrics.TrieFiles, send)
 		senderRaw("trie_index_dirs", &listener.metrics.TrieDirs, send)
 	}
+	if len(listener.quotas) > 0 {
+		senderRaw("quota_apply_time_ns", &listener.metrics.QuotaApplyTimeNs, send)
+		senderRaw("usage_refresh_time_ns", &listener.metrics.UsageRefreshTimeNs, send)
+	}
 
 	sender("alloc", &alloc, send)
 	sender("total_alloc", &totalAlloc, send)
@@ -1212,6 +1329,12 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {
 			}
 		}
 	}
+
+	// why the comma-ok form: avoid a panic when the atomic.Value is still empty
+	qauMetrics, _ := listener.quotaAndUsageMetrics.Load().([]points.Points)
+	for _, ps := range qauMetrics {
+		send(ps.Metric, float64(ps.Data[0].Value))
+	}
 }
 
 func (listener *CarbonserverListener) Stop() error {
@@ -1337,6 +1460,18 @@ func (listener *CarbonserverListener) Listen(listen string) error {
 		}
 	})
 
+	carbonserverMux.HandleFunc("/admin/quota", func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Add("Content-Type", "text/plain")
+
+		fidx := listener.CurrentFileIndex()
+		if fidx == nil || fidx.trieIdx == nil {
+			fmt.Fprint(w, "index doesn't exist.")
+			return
+		}
+
+		fidx.trieIdx.getQuotaTree(w)
+	})
+
 	carbonserverMux.HandleFunc("/robots.txt", func(w http.ResponseWriter, r *http.Request) {
 		fmt.Fprintln(w, "User-agent: *\nDisallow: /")
 	})
diff --git a/carbonserver/trie.go b/carbonserver/trie.go
index 7a5e1faea..e210cdd6a 100644
--- a/carbonserver/trie.go
+++ b/carbonserver/trie.go
@@ -1,6 +1,7 @@
 package carbonserver
 
 import (
+	"crypto/md5"
 	"errors"
 	"fmt"
 	"io"
@@ -11,6 +12,7 @@ import (
 	"unsafe"
 
 	trigram "github.com/dgryski/go-trigram"
+	"github.com/go-graphite/go-carbon/points"
 )
 
 // debug codes diff for reference: https://play.golang.org/p/FxuvRyosk3U
@@ -282,7 +284,7 @@ func newGlobState(expr string, expand func(globs []string) ([]string, error)) (*
 	m.dstates = append(m.dstates, &droot)
 
 	// TODO: consider dropping trigram integration
-	if m.lsComplex {
+	if m.lsComplex && expand != nil {
 		es, err := expand([]string{expr})
 		if err != nil {
 			return &m, nil
@@ -316,17 +318,98 @@ type trieIndex struct {
 	depth         uint64
 	longestMetric string
 
 	trigrams map[*trieNode][]uint32
+
+	// qau: Quota And Usage
+	qauMetrics []points.Points
+
+	estimateSize func(metric string) (size, dataPoints int64)
 }
 
 type trieNode struct {
 	c         []byte // TODO: look for a more compact/compressed formats
 	childrens *[]*trieNode
 	gen       uint8
+
+	meta trieMeta
 }
+
+type trieMeta interface{ trieMeta() }
+
+type fileMeta struct {
+	logicalSize  int64
+	physicalSize int64
+	dataPoints   int64
+}
+
+func (fm *fileMeta) trieMeta() {}
+
+type dirMeta struct {
+	// type: *Quota
+	// note: the underlying Quota value is shared with other dir nodes.
+	//
+	// TODO: save 8 bytes by using a pointer value?
+	quota atomic.Value
+
+	usage *QuotaUsage
+}
+
+func newDirMeta() *dirMeta { return &dirMeta{usage: &QuotaUsage{}} }
+
+func (dm *dirMeta) trieMeta() {}
+func (dm *dirMeta) update(quota *Quota) { dm.quota.Store(quota) }
+
+func (dm *dirMeta) withinQuota(metrics, namespaces, logical, physical, dataPoints, throughput int64) bool {
+	quota, ok := dm.quota.Load().(*Quota)
+	if !ok {
+		return true
+	}
+
+	var (
+		qmetrics      = quota.Metrics
+		qnamespaces   = quota.Namespaces
+		qlogicalSize  = quota.LogicalSize
+		qphysicalSize = quota.PhysicalSize
+		qdataPoints   = quota.DataPoints
+		qthroughput   = quota.Throughput
+
+		umetrics      = atomic.LoadInt64(&dm.usage.Metrics)
+		unamespaces   = atomic.LoadInt64(&dm.usage.Namespaces)
+		ulogicalsize  = atomic.LoadInt64(&dm.usage.LogicalSize)
+		uphysicalsize = atomic.LoadInt64(&dm.usage.PhysicalSize)
+		udataPoints   = atomic.LoadInt64(&dm.usage.DataPoints)
+		uthroughput   = atomic.LoadInt64(&dm.usage.Throughput)
+	)
+
+	if qmetrics > 0 && umetrics+metrics > qmetrics {
+		return false
+	}
+	if qnamespaces > 0 && unamespaces+namespaces > qnamespaces {
+		return false
+	}
+	if qlogicalSize > 0 && ulogicalsize+logical > qlogicalSize {
+		return false
+	}
+	if qphysicalSize > 0 && uphysicalsize+physical > qphysicalSize {
+		return false
+	}
+	if qdataPoints > 0 && udataPoints+dataPoints > qdataPoints {
+		return false
+	}
+	if qthroughput > 0 && uthroughput+throughput > qthroughput {
+		return false
+	}
+
+	return true
 }
 
 var emptyTrieNodes = &[]*trieNode{}
 
-func newFileNode(m uint8) *trieNode { return &trieNode{childrens: emptyTrieNodes, gen: m} }
+func newFileNode(m uint8, logicalSize, physicalSize, dataPoints int64) *trieNode {
+	return &trieNode{
+		childrens: emptyTrieNodes,
+		gen:       m,
+		meta:      &fileMeta{logicalSize: logicalSize, physicalSize: physicalSize, dataPoints: dataPoints},
+	}
+}
 
 func (tn *trieNode) dir() bool  { return len(tn.c) == 1 && tn.c[0] == '/' }
 func (tn *trieNode) file() bool { return tn.c == nil }
@@ -363,11 +446,13 @@ func (ti *trieIndex) trigramsContains(tn *trieNode, t uint32) bool {
 	return false
 }
 
-func newTrie(fileExt string) *trieIndex {
+func newTrie(fileExt string, estimateSize func(metric string) (size, dataPoints int64)) *trieIndex {
+	meta := newDirMeta()
 	return &trieIndex{
-		root:     &trieNode{childrens: &[]*trieNode{}},
-		fileExt:  fileExt,
-		trigrams: map[*trieNode][]uint32{},
+		root:         &trieNode{childrens: &[]*trieNode{}, meta: meta},
+		fileExt:      fileExt,
+		trigrams:     map[*trieNode][]uint32{},
+		estimateSize: estimateSize,
 	}
 }
 
@@ -378,13 +463,16 @@ type nilFilenameError string
 
 func (nfe nilFilenameError) Error() string { return string(nfe) }
 
-// TODO: add some defensive logics agains bad paths?
+// TODO: add some more defensive logic against bad paths?
 //
 //	abc.def.ghi
 //	abc.def2.ghi
 //	abc.daf2.ghi
 //	efg.cjk
-func (ti *trieIndex) insert(path string) error {
+//
+// insert expects a /-separated file path, like ns1/ns2/ns3/metric.wsp.
+// insert considers a path name ending with trieIndex.fileExt as a metric.
+func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints int64) error {
 	path = filepath.Clean(path)
 	if len(path) > 0 && path[0] == '/' {
 		path = path[1:]
 	}
@@ -526,7 +614,7 @@ outer:
 	}
 
 	if i < len(path) {
-		newn = &trieNode{c: []byte{'/'}, childrens: &[]*trieNode{}, gen: ti.root.gen}
+		newn = ti.newDir()
 		cur.addChild(newn)
 		cur = newn
 	}
@@ -537,31 +625,51 @@ outer:
 	if !isFile {
 		// TODO: should double check if / already exists
 		if cur.dir() {
-			cur.addChild(&trieNode{c: []byte{'/'}, childrens: &[]*trieNode{}, gen: ti.root.gen})
+			cur.addChild(ti.newDir())
 		}
+
 		return nil
 	}
 
 	var hasFileNode bool
 	for _, c := range *cur.childrens {
-		// if c == fileNode {
 		if c.file() {
 			hasFileNode = true
 			c.gen = ti.root.gen
+
+			if logicalSize > 0 || physicalSize > 0 || dataPoints > 0 {
+				atomic.StoreInt64(&c.meta.(*fileMeta).logicalSize, logicalSize)
+				atomic.StoreInt64(&c.meta.(*fileMeta).physicalSize, physicalSize)
+				atomic.StoreInt64(&c.meta.(*fileMeta).dataPoints, dataPoints)
+			}
 			break
 		}
 	}
 	if !hasFileNode {
+		if ti.estimateSize != nil && logicalSize == 0 && physicalSize == 0 && dataPoints == 0 {
+			logicalSize, dataPoints = ti.estimateSize(strings.ReplaceAll(path, "/", "."))
+			physicalSize = logicalSize
+		}
-		cur.addChild(newFileNode(ti.root.gen))
+		cur.addChild(newFileNode(ti.root.gen, logicalSize, physicalSize, dataPoints))
 		ti.fileCount++
 	}
 
 	return nil
 }
 
+func (ti *trieIndex) newDir() *trieNode {
+	n := &trieNode{
+		c:         []byte{'/'},
+		childrens: &[]*trieNode{},
+		gen:       ti.root.gen,
+	}
+
+	return n
+}
+
 // TODO: add some defensive logics agains bad queries?
 // depth first search
-func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) ([]string, error)) (files []string, isFiles []bool, err error) {
+func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) ([]string, error)) (files []string, isFiles []bool, nodes []*trieNode, err error) {
 	expr = strings.TrimSpace(expr)
 	if expr == "" {
 		expr = "*"
 	}
@@ -574,14 +682,14 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
 		}
 		gs, err := newGlobState(node, expand)
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
 		exact = exact && gs.exact
 		matchers = append(matchers, gs)
 	}
 
 	if len(matchers) == 0 {
-		return nil, nil, nil
+		return nil, nil, nil, nil
 	}
 
 	var cur = ti.root
@@ -595,6 +703,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
 	var curm = matchers[0]
 	var ndstate *gdstate
 	var isFile, isDir, hasMoreNodes bool
+	var dirNode *trieNode
 
 	for {
 		if nindex[ncindex] >= len(curChildrens) {
@@ -663,6 +772,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
 			isFile = true
 		case child.dir():
 			isDir = true
+			dirNode = child
 		default:
 			hasMoreNodes = true
 		}
@@ -688,6 +798,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
 		if isDir {
 			files = append(files, cur.fullPath('.', trieNodes[:ncindex]))
 			isFiles = append(isFiles, false)
+			nodes = append(nodes, dirNode)
 		}
 
 		if len(files) >= limit || exact {
@@ -722,7 +833,7 @@ func (ti *trieIndex) query(expr string, limit int, expand func(globs []string) (
 		continue
 	}
 
-	return files, isFiles, nil
+	return files, isFiles, nodes, nil
 }
 
 func (tn *trieNode) fullPath(sep byte, parents []*trieNode) string {
@@ -807,6 +918,10 @@ func (ti *trieIndex) dump(w io.Writer) {
 	var curChildrens = cur.getChildrens()
 	var trieNodes = make([]*trieNode, depth)
 	var ident []byte
+
+	fmt.Fprintf(w, "%s%s (%d/%d) (quota:%s usage:%s) %p\n", ident, "/", len(*cur.childrens), cur.gen, cur.meta.(*dirMeta).quota.Load(), cur.meta.(*dirMeta).usage, cur)
+	ident = append(ident, ' ', ' ')
+
 	for {
 		if nindex[ncindex] >= len(curChildrens) {
 			goto parent
 		}
@@ -820,11 +935,69 @@ func (ti *trieIndex) dump(w io.Writer) {
 			goto parent
 		}
 
+		switch {
+		case cur.file():
+			fmt.Fprintf(w, "%s$ (%d/%d) %p\n", ident, len(*cur.childrens), cur.gen, cur)
+		case cur.dir() && cur.meta != nil:
+			fmt.Fprintf(w, "%s%s (%d/%d) (quota:%s usage:%s) %p\n", ident, cur.c, len(*cur.childrens), cur.gen, cur.meta.(*dirMeta).quota.Load(), cur.meta.(*dirMeta).usage, cur)
+		default:
+			fmt.Fprintf(w, "%s%s (%d/%d) %p\n", ident, cur.c, len(*cur.childrens), cur.gen, cur)
+		}
+		ident = append(ident, ' ', ' ')
+
 		if cur.file() {
-			fmt.Fprintf(w, "%s$ (%d/%d)\n", ident, len(*cur.childrens), cur.gen)
-		} else {
-			fmt.Fprintf(w, "%s%s (%d/%d)\n", ident, cur.c, len(*cur.childrens), cur.gen)
+			goto parent
 		}
+
+		continue
+
+	parent:
+		nindex[ncindex] = 0
+		ncindex--
+		if ncindex < 0 {
+			break
+		}
+		nindex[ncindex]++
+		cur = trieNodes[ncindex]
+		curChildrens = cur.getChildrens()
+
+		ident = ident[:len(ident)-2]
+
+		continue
+	}
+}
+
+func (ti *trieIndex) getQuotaTree(w io.Writer) {
+	var depth = ti.getDepth() + trieDepthBuffer
+	var nindex = make([]int, depth)
+	var ncindex int
+	var cur = ti.root
+	var curChildrens = cur.getChildrens()
+	var trieNodes = make([]*trieNode, depth)
+	var ident []byte
+
+	fmt.Fprintf(w, "%s%s (%d/%d) (quota:%s usage:%s) %p\n", ident, "/", len(*cur.childrens), cur.gen, cur.meta.(*dirMeta).quota.Load(), cur.meta.(*dirMeta).usage, cur)
+	ident = append(ident, ' ', ' ')
+
+	for {
+		if nindex[ncindex] >= len(curChildrens) {
+			goto parent
+		}
+
+		trieNodes[ncindex] = cur
+		cur = cur.getChild(curChildrens, nindex[ncindex])
+		curChildrens = cur.getChildrens()
+		ncindex++
+		if ncindex >= len(trieNodes)-1 {
+			goto parent
+		}
+
+		if cur.dir() && cur.meta != nil {
+			name := ti.root.fullPath('.', trieNodes[:ncindex])
+
+			fmt.Fprintf(w, "%s%s %s (%d/%d) (quota:%s usage:%s) %p\n", ident, cur.c, name, len(*cur.childrens), cur.gen, cur.meta.(*dirMeta).quota.Load(), cur.meta.(*dirMeta).usage, cur)
+		}
+		ident = append(ident, ' ', ' ')
+
+		if cur.file() {
+			goto parent
+		}
+
+		continue
+
+	parent:
+		nindex[ncindex] = 0
+		ncindex--
+		if ncindex < 0 {
+			break
+		}
+		nindex[ncindex]++
+		cur = trieNodes[ncindex]
+		curChildrens = cur.getChildrens()
+
+		ident = ident[:len(ident)-2]
+
+		continue
+	}
+}
@@ -1203,7 +1376,7 @@ func (listener *CarbonserverListener) expandGlobsTrie(query string) ([]string, [
 	var leafs []bool
 
 	for _, g := range globs {
-		f, l, err := fidx.trieIdx.query(g, listener.maxMetricsGlobbed-len(files), listener.expandGlobBraces)
+		f, l, _, err := fidx.trieIdx.query(g, listener.maxMetricsGlobbed-len(files), listener.expandGlobBraces)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -1213,3 +1386,521 @@ func (listener *CarbonserverListener) expandGlobsTrie(query string) ([]string, [
 
 	return files, leafs, nil
 }
+
+type QuotaDroppingPolicy int8
+
+const (
+	QDPNew QuotaDroppingPolicy = iota
+	QDPNone
+)
+
+func ParseQuotaDroppingPolicy(policy string) QuotaDroppingPolicy {
+	switch policy {
+	case "none":
+		return QDPNone
+	case "new":
+		fallthrough
+	default:
+		return QDPNew
+	}
+}
+
+func (qdp QuotaDroppingPolicy) String() string {
+	switch qdp {
+	case QDPNone:
+		return "none"
+	case QDPNew:
+		fallthrough
+	default:
+		return "new"
+	}
+}
+
+type Quota struct {
+	Pattern      string
+	Namespaces   int64 // top level subdirectories
+	Metrics      int64 // files
+	LogicalSize  int64
+	PhysicalSize int64
+
+	DataPoints int64
+	Throughput int64
+
+	DroppingPolicy   QuotaDroppingPolicy
+	StatMetricPrefix string
+}
+
+func (q *Quota) String() string {
+	return fmt.Sprintf("pattern:%s,dirs:%d,files:%d,points:%d,logical:%d,physical:%d,throughput:%d,policy:%s", q.Pattern, q.Namespaces, q.Metrics, q.DataPoints, q.LogicalSize, q.PhysicalSize, q.Throughput, q.DroppingPolicy)
+}
+
+type QuotaUsage struct {
+	Namespaces   int64 // top level subdirectories
+	Metrics      int64 // files
+	LogicalSize  int64
+	PhysicalSize int64
+
+	DataPoints int64 // inferred from retention policy
+	Throughput int64
+
+	Throttled int64
+}
+
+func (q *QuotaUsage) String() string {
+	return fmt.Sprintf("dirs:%d,files:%d,points:%d,logical:%d,physical:%d,throughput:%d,throttled:%d", q.Namespaces, q.Metrics, q.DataPoints, q.LogicalSize, q.PhysicalSize, q.Throughput, q.Throttled)
+}
+
+// applyQuotas applies quotas to new and old dir nodes.
+// It must not be invoked concurrently with trieIndex.insert.
+//
+// TODO: * how to remove old quotas?
+func (ti *trieIndex) applyQuotas(quotas ...*Quota) error {
+	for _, quota := range quotas {
+		if quota.Pattern == "/" {
+			ti.root.meta.(*dirMeta).update(quota)
+
+			continue
+		}
+
+		_, _, nodes, err := ti.query(strings.ReplaceAll(quota.Pattern, ".", "/"), 1<<31-1, nil)
+		if err != nil {
+			return err
+		}
+
+		for _, node := range nodes {
+			if node.meta == nil {
+				node.meta = newDirMeta()
+			}
+
+			meta, ok := node.meta.(*dirMeta)
+			if !ok {
+				// TODO: log an error?
+				continue
+			}
+
+			meta.update(quota)
+		}
+	}
+
+	return nil
+}
+
+// refreshUsage updates usage data and generates stat metrics.
+// It must not be invoked concurrently with trieIndex.insert.
+func (ti *trieIndex) refreshUsage() {
+	type state struct {
+		next      int
+		node      *trieNode
+		childrens *[]*trieNode
+
+		files        int64
+		logicalSize  int64
+		physicalSize int64
+		dataPoints   int64
+	}
+
+	var idx int
+	var depth = ti.getDepth() + trieDepthBuffer
+	var states = make([]state, depth)
+
+	var pstate *state
+	var cur = &states[idx]
+	cur.node = ti.root
+	cur.childrens = cur.node.childrens
+
+	var dirs = make([]int64, depth)
+	var dirIndex int
+
+	// Drop unflushed metrics to avoid over-using memory; as timestamps are
+	// set by app.Collector, keeping old stats helps no one.
+	ti.qauMetrics = ti.qauMetrics[:0]
+
+	for {
+		if cur.next >= len(*cur.childrens) {
+			if (cur.node.dir() || cur.node == ti.root) && dirIndex >= 0 {
+				if cur.node.meta != nil && cur.node.meta.(*dirMeta) != nil && cur.node.meta.(*dirMeta).usage != nil {
+					usage := cur.node.meta.(*dirMeta).usage
+					atomic.StoreInt64(&usage.Namespaces, dirs[dirIndex])
+					atomic.StoreInt64(&usage.Metrics, cur.files)
+					atomic.StoreInt64(&usage.LogicalSize, cur.logicalSize)
+					atomic.StoreInt64(&usage.PhysicalSize, cur.physicalSize)
+					atomic.StoreInt64(&usage.DataPoints, cur.dataPoints)
+
+					var name string
+					if cur.node == ti.root {
+						name = "root"
+					} else {
+						var nodes []*trieNode
+						for i := 0; i < idx; i++ {
+							nodes = append(nodes, states[i].node)
+						}
+						name = ti.root.fullPath('-', nodes)
+					}
+					var prefix string
+					if quota, ok := cur.node.meta.(*dirMeta).quota.Load().(*Quota); ok {
+						prefix = quota.StatMetricPrefix
+					}
+					ti.generateQuotaAndUsageMetrics(prefix, name, cur.node)
+
+					throttled := atomic.LoadInt64(&usage.Throttled)
+					if throttled > 0 {
+						atomic.AddInt64(&usage.Throttled, -throttled)
+					}
+					throughput := atomic.LoadInt64(&usage.Throughput)
+					if throughput > 0 {
+						atomic.AddInt64(&usage.Throughput, -throughput)
+					}
+				}
+
+				dirIndex--
+			}
+
+			goto parent
+		}
+
+		idx++
+		if idx >= len(states)-1 {
+			goto parent
+		}
+
+		states[idx] = state{} // reset to zero value
+
+		pstate = &states[idx-1]
+		cur = &states[idx]
+		cur.node = (*pstate.node.childrens)[pstate.next]
+		cur.childrens = cur.node.childrens
+
+		if cur.node.file() {
+			cur.files++
+			cur.logicalSize += cur.node.meta.(*fileMeta).logicalSize
+			cur.physicalSize += cur.node.meta.(*fileMeta).physicalSize
+			cur.dataPoints += cur.node.meta.(*fileMeta).dataPoints
+
+			goto parent
+		} else if cur.node.dir() {
+			dirs[dirIndex]++
+			dirIndex++
+			dirs[dirIndex] = 0
+		}
+
+		continue
+
+	parent:
+		files := cur.files
+		logicalSize := cur.logicalSize
+		physicalSize := cur.physicalSize
+		dataPoints := cur.dataPoints
+
+		states[idx] = state{}
+		idx--
+		if idx < 0 {
+			break
+		}
+
+		cur = &states[idx]
+		cur.files += files
+		cur.logicalSize += logicalSize
+		cur.physicalSize += physicalSize
+		cur.dataPoints += dataPoints
+		cur.next++
+
+		continue
+	}
+}
+
+func (ti *trieIndex) generateQuotaAndUsageMetrics(prefix, name string, node *trieNode) {
+	// WHY: on Linux, the maximum filename length is 255; keep 5 bytes here
+	// for the file extension.
+	if len(name) >= 250 {
+		name = fmt.Sprintf("%s-%x", name[:(250-md5.Size*2-1)], md5.Sum([]byte(name))) //nolint:nosec
+	}
+
+	// Note: timestamps for each points.Points are set by the collector send logic
+	meta := node.meta.(*dirMeta)
+	quota, ok := meta.quota.Load().(*Quota)
+	if !ok {
+		// defensive: a dir node may carry usage but no quota value yet
+		// (e.g. the root node when no "/" pattern is configured).
+		quota = &Quota{}
+	}
+	if quota.Namespaces > 0 {
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("quota.namespaces.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(quota.Namespaces),
+			}},
+		})
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("usage.namespaces.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(atomic.LoadInt64(&meta.usage.Namespaces)),
+			}},
+		})
+	}
+	if quota.Metrics > 0 {
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("quota.metrics.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(quota.Metrics),
+			}},
+		})
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("usage.metrics.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(atomic.LoadInt64(&meta.usage.Metrics)),
+			}},
+		})
+	}
+	if quota.DataPoints > 0 {
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("quota.data_points.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(quota.DataPoints),
+			}},
+		})
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("usage.data_points.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(atomic.LoadInt64(&meta.usage.DataPoints)),
+			}},
+		})
+	}
+	if quota.LogicalSize > 0 {
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("quota.logical_size.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(quota.LogicalSize),
+			}},
+		})
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("usage.logical_size.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(atomic.LoadInt64(&meta.usage.LogicalSize)),
+			}},
+		})
+	}
+	if quota.PhysicalSize > 0 {
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("quota.physical_size.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(quota.PhysicalSize),
+			}},
+		})
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("usage.physical_size.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(atomic.LoadInt64(&meta.usage.PhysicalSize)),
+			}},
+		})
+	}
+	if quota.Throughput > 0 {
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("quota.throughput.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(quota.Throughput),
+			}},
+		})
+		ti.qauMetrics = append(ti.qauMetrics, points.Points{
+			Metric: fmt.Sprintf("usage.throughput.%s%s", prefix, name),
+			Data: []points.Point{{
+				Value: float64(atomic.LoadInt64(&meta.usage.Throughput)),
+			}},
+		})
+	}
+
+	ti.qauMetrics = append(ti.qauMetrics, points.Points{
+		Metric: fmt.Sprintf("throttle.%s%s", prefix, name),
+		Data: []points.Point{{
+			Value: float64(atomic.LoadInt64(&meta.usage.Throttled)),
+		}},
+	})
+}
+
+//nolint:golint,unused
+func (ti *trieIndex) getNodeFullPath(node *trieNode) string {
+	//nolint:golint,unused
+	type state struct {
+		next      int
+		node      *trieNode
+		childrens *[]*trieNode
+	}
+
+	var idx int
+	var depth = ti.getDepth() + trieDepthBuffer
+	var states = make([]state, depth)
+
+	var cur = &states[idx]
+	cur.node = ti.root
+	cur.childrens = cur.node.childrens
+
+	for {
+		if cur.next >= len(*cur.childrens) {
+
+			goto parent
+		}
+
+		idx++
+		if idx >= len(states)-1 {
+			goto parent
+		}
+
+		cur = &states[idx]
+		cur.next = 0
+		cur.node = (*states[idx-1].childrens)[states[idx-1].next]
+		cur.childrens = cur.node.childrens
+
+		if cur.node == node {
+			var parents []*trieNode
+			for i := 0; i < idx; i++ {
+				parents = append(parents, states[i].node)
+			}
+
+			return cur.node.fullPath('.', parents)
+		}
+
+		if cur.node.file() {
+			goto parent
+		}
+
+		continue
+
+	parent:
+		states[idx] = state{}
+		idx--
+		if idx < 0 {
+			break
+		}
+
+		cur = &states[idx]
+		cur.next++
+
+		continue
+	}
+
+	return ""
+}
+
+func (ti *trieIndex) throttle(ps *points.Points) bool {
+	var node = ti.root
+	var dirs = []*trieNode{ti.root}
+	var mindex int
+	var isNew bool
+	var metric = ps.Metric
+
+mloop:
+	for {
+		cindex := 0
+		for ; cindex < len(node.c) && mindex < len(metric); cindex++ {
+			if node.c[cindex] != metric[mindex] {
+				isNew = true
+				break mloop
+			}
+
+			mindex++
+		}
+
+		if cindex < len(node.c) {
+			isNew = true
+			break
+		}
+
+		childrens := node.getChildrens()
+
+		if mindex < len(metric) && metric[mindex] == '.' {
+			var dir *trieNode
+			for i := 0; i < len(childrens); i++ {
+				child := node.getChild(childrens, i)
+				if !child.dir() {
+					continue
+				}
+
+				dirs = append(dirs, child)
+				dir = child
+				break
+			}
+
+			if dir == nil {
+				isNew = true
+				break
+			}
+
+			node = dir
+			childrens = dir.getChildrens()
+			mindex++
+		}
+
+		for i := 0; i < len(childrens); i++ {
+			child := node.getChild(childrens, i)
+			if mindex >= len(metric) {
+				if child.file() {
+					break mloop
+				}
+
+				continue
+			}
+
+			if len(child.c) > 0 && child.c[0] == metric[mindex] {
+				node = child
+				continue mloop
+			}
+		}
+
+		isNew = true
+		break
+	}
+
+	if !isNew {
+		// Throughput quota applies to both new and old metrics
+		for _, n := range dirs {
+			meta, ok := n.meta.(*dirMeta)
+			if !ok || meta.usage == nil {
+				continue
+			}
+			quota, ok := meta.quota.Load().(*Quota)
+			if !ok || quota.Throughput <= 0 {
+				continue
+			}
+
+			uthroughput := atomic.LoadInt64(&meta.usage.Throughput)
+			if uthroughput+int64(len(ps.Data)) > quota.Throughput {
+				return true
+			}
+
+			atomic.AddInt64(&meta.usage.Throughput, int64(len(ps.Data)))
+		}
+
+		return false
+	}
+
+	size, dataPoints := ti.estimateSize(metric)
+
+	var toThrottle bool
+	for i, n := range dirs {
+		// TODO: may need more consideration
+		var namespaces int64
+		if i == len(dirs)-1 {
+			namespaces = 1
+		}
+
+		meta, ok := n.meta.(*dirMeta)
+		if !ok || meta.withinQuota(1, namespaces, size, size, dataPoints, int64(len(ps.Data))) {
+			continue
+		}
+
+		if meta.usage != nil {
+			atomic.AddInt64(&meta.usage.Throttled, 1)
+		}
+
+		if quota, ok := meta.quota.Load().(*Quota); ok && quota != nil && quota.DroppingPolicy == QDPNone {
+			continue
+		}
+
+		toThrottle = true
+		break
+	}
+
+	if !toThrottle {
+		for _, n := range dirs {
+			if meta, ok := n.meta.(*dirMeta); ok && meta != nil && meta.usage != nil {
+				atomic.AddInt64(&meta.usage.Throughput, int64(len(ps.Data)))
+			}
+		}
+	}
+
+	return toThrottle
+}
diff --git a/carbonserver/trie_real_test.go b/carbonserver/trie_real_test.go
index e4b36aa7a..e186c1aaf 100644
--- a/carbonserver/trie_real_test.go
+++ b/carbonserver/trie_real_test.go
@@ -235,9 +235,9 @@ func uniqFilesLeafs(files []string, leafs []bool) ([]string, []bool) {
 func TestTrieContinuousUpdate(t *testing.T) {
 	files := readFile(*testDataPath)
 
-	ptrie := newTrie(".wsp")
+	ptrie := newTrie(".wsp", nil)
 	for i := 0; i < 100; i++ {
-		ttrie := newTrie(".wsp")
+		ttrie := newTrie(".wsp", nil)
 		ptrie.root.gen++
 		var count int
 		fmt.Println("--- run", i, count)
@@ -246,8 +246,8 @@ func
 TestTrieContinuousUpdate(t *testing.T) {
 				continue
 			}
 			count++
-			ptrie.insert(f)
-			ttrie.insert(f)
+			ptrie.insert(f, 0, 0, 0)
+			ttrie.insert(f, 0, 0, 0)
 		}
 
 		count0, files0, dirs0, onec0, onefc0, onedc0, countByChildren0, nodesByMark0 := ptrie.countNodes()
diff --git a/carbonserver/trie_test.go b/carbonserver/trie_test.go
index d30405f3b..c934ea24e 100644
--- a/carbonserver/trie_test.go
+++ b/carbonserver/trie_test.go
@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/dgryski/go-trigram"
+	"github.com/go-graphite/go-carbon/points"
 	"go.uber.org/zap"
 )
@@ -37,9 +38,9 @@ func newTrieServer(files []string, withTrigram bool, l logf) *CarbonserverListen
 	listener.failOnMaxGlobs = true
 
 	start := time.Now()
-	trieIndex := newTrie(".wsp")
+	trieIndex := newTrie(".wsp", nil)
 	for _, file := range files {
-		trieIndex.insert(file)
+		trieIndex.insert(file, 0, 0, 0)
 	}
 	l.Logf("trie index took %s\n", time.Since(start))
@@ -706,16 +707,16 @@ func TestTrieIndex(t *testing.T) {
 }
 
 func TestTrieEdgeCases(t *testing.T) {
-	var trie = newTrie(".wsp")
+	var trie = newTrie(".wsp", nil)
 
-	_, _, err := trie.query("[\xff\xff-\xff", 1000, func([]string) ([]string, error) { return nil, nil })
+	_, _, _, err := trie.query("[\xff\xff-\xff", 1000, func([]string) ([]string, error) { return nil, nil })
 	if err == nil || err.Error() != "glob: range overflow" {
 		t.Errorf("trie should return an range overflow error")
 	}
 }
 
 func TestTrieConcurrentReadWrite(t *testing.T) {
-	trieIndex := newTrie(".wsp")
+	trieIndex := newTrie(".wsp", nil)
 
 	rand.Seed(time.Now().Unix())
@@ -729,7 +730,7 @@ func TestTrieConcurrentReadWrite(t *testing.T) {
 		for j := 0; j < factor; j++ {
 			for k := 0; k < factor; k++ {
 				filem.Store(fmt.Sprintf("level-0-%d.level-1-%d.level-2-%d", i, j, k), true)
-				trieIndex.insert(fmt.Sprintf("/level-0-%d/level-1-%d/level-2-%d.wsp", i, j, k))
+				trieIndex.insert(fmt.Sprintf("/level-0-%d/level-1-%d/level-2-%d.wsp", i, j, k), 0, 0, 0)
 				// if (i+j+k)%5 == 0 {
 				// 	time.Sleep(time.Millisecond * time.Duration(rand.Intn(10)))
 				// }
@@ -754,7 +755,7 @@ func TestTrieConcurrentReadWrite(t *testing.T) {
 				return
 			// case <-filec:
 			default:
-				files, _, err := trieIndex.query(fmt.Sprintf("level-0-%d/level-1-%d/level-2-%d*", rand.Intn(factor), rand.Intn(factor), rand.Intn(factor)), int(math.MaxInt64), nil)
+				files, _, _, err := trieIndex.query(fmt.Sprintf("level-0-%d/level-1-%d/level-2-%d*", rand.Intn(factor), rand.Intn(factor), rand.Intn(factor)), int(math.MaxInt64), nil)
 				if err != nil {
 					panic(err)
 				}
@@ -842,17 +843,17 @@ func TestTriePrune(t *testing.T) {
 
 	for i, c := range cases {
 		t.Run(strconv.Itoa(i), func(t *testing.T) {
-			ctrieIndex := newTrie(".wsp")
-			strieIndex := newTrie(".wsp")
+			ctrieIndex := newTrie(".wsp", nil)
+			strieIndex := newTrie(".wsp", nil)
 
 			for _, f := range c.files1 {
-				ctrieIndex.insert(f)
+				ctrieIndex.insert(f, 0, 0, 0)
 			}
 
 			ctrieIndex.root.gen++
 
 			for _, f := range c.files2 {
-				ctrieIndex.insert(f)
-				strieIndex.insert(f)
+				ctrieIndex.insert(f, 0, 0, 0)
+				strieIndex.insert(f, 0, 0, 0)
 			}
 
 			ctrieIndex.prune()
@@ -1019,3 +1020,269 @@ func TestDumpAllMetrics(t *testing.T) {
 		t.Errorf("trie.allMetrics:\nwant: %s\ngot: %s\n", files, metrics)
 	}
 }
+
+func TestTrieQuotaGeneral(t *testing.T) {
+	type metric struct {
+		name         string
+		logicalSize  int64
+		physicalSize int64
+		dataPoints   int64
+	}
+	type throttleTest struct {
+		metric   string
+		data     []points.Point
+		throttle bool
+		reason   string
+	}
+
+	var cases = []struct {
+		input1      []metric
+		input2      []metric
+		quotas      []*Quota
+		tests       []throttleTest
+		statMetrics []points.Points
+	}{
+		{
+			input1: (func() (r []metric) {
+				for i := 0; i < 5; i++ {
+					r = append(
+						r,
+						metric{fmt.Sprintf("/sys/app/server-%02d/cpu.wsp", i), 12 * 1024, 12 * 1024, 1024},
+						metric{fmt.Sprintf("/sys/app/server-%02d/memory.wsp", i), 12 * 1024, 12 * 1024, 1024},
+						metric{fmt.Sprintf("/sys/app/server-%02d/iostat.wsp", i), 12 * 1024, 12 * 1024, 1024},
+					)
+				}
+
+				r = append(
+					r,
+					metric{fmt.Sprintf("/sys/db/server-%02d/iostat.wsp", 0), 12 * 1024, 12 * 1024, 1024},
+					metric{fmt.Sprintf("/user/foo/server-%02d/cpu.wsp", 0), 12 * 1024, 12 * 1024, 1024},
+					metric{fmt.Sprintf("/play/foo/server-%02d/cpu.wsp", 0), 12 * 1024, 12 * 1024, 1024},
+				)
+				return r
+			})(),
+
+			input2: []metric{
+				{"/sys/kv/server-00/iostat.wsp", 12 * 1024, 12 * 1024, 1024},
+				{"/sys/kv/server-01/iostat.wsp", 12 * 1024, 12 * 1024, 1024},
+			},
+
+			quotas: []*Quota{
+				{
+					Pattern:    "/",
+					Namespaces: 3,
+					Metrics:    60,
+				},
+				{
+					Pattern:    "*",
+					Namespaces: 3,
+					Metrics:    500,
+				},
+				{
+					Pattern:    "sys",
+					Namespaces: 3,
+					Metrics:    500,
+				},
+				{
+					Pattern: "sys.*",
+					Metrics: 600,
+				},
+				{
+					Pattern:    "sys.kv",
+					Namespaces: 2,
+					Metrics:    600,
+				},
+				{
+					Pattern: "sys.app.*",
+					Metrics: 3,
+				},
+				{
+					Pattern: "sys.app.server-01",
+					Metrics: 4,
+				},
+				{
+					Pattern:    "sys.app.server-02",
+					Metrics:    4,
+					DataPoints: 1024 * 3,
+				},
+			},
+			tests: []throttleTest{
+				{"sys.app.server-31.cpu", nil, false, ""},
+				{"sys.app.server-00.cpu3", nil, true, "throttled by sys.app.* metrics limit"},
+
+				{"sys.app.server-01.cpu3", nil, false, ""},
+				{"sys.app.server-02.cpu3", nil, true, "throttled by sys.app.server-02 dataPoints limit"},
+
+				{"sys.kv.server-01.cpu2", nil, false, ""},
+				{"sys.kv.server-02.iostat", nil, true, "throttled by sys.kv namespaces limit"},
+
+				{"foo2.kv.server-02.iostat", nil, true, "throttled by / namespaces limit"},
+			},
+		},
+		{
+			input1: (func() (r []metric) {
+				for i := 0; i < 5; i++ {
+					r = append(
+						r,
+						metric{fmt.Sprintf("/sys/app/server-%02d/cpu.wsp", i), 12 * 1024, 12 * 1024, 1024},
+						metric{fmt.Sprintf("/sys/app/server-%02d/memory.wsp", i), 12 * 1024, 12 * 1024, 1024},
+						metric{fmt.Sprintf("/sys/app/server-%02d/iostat.wsp", i), 12 * 1024, 12 * 1024, 1024},
+					)
+				}
+
+				return r
+			})(),
+
+			quotas: []*Quota{
+				{
+					Pattern:      "/",
+					PhysicalSize: 1024 * 12 * 15,
+				},
+				{
+					Pattern: "*",
+					Metrics: 500,
+				},
+			},
+			tests: []throttleTest{
+				{"sys.app.server-00.cpu", nil, false, ""},
+				{"sys.app.server-00.cpu3", nil, true, "throttled by / physicalSize limit"},
+			},
+		},
+		{
+			input1: (func() (r []metric) {
+				r = append(r, metric{"/sys/app/srv1/nodes/host-01/cpu.wsp", 1, 1, 1})
+				r = append(r, metric{"/sys/app/srv1/nodes/host-01/mem.wsp", 1, 1, 1})
+				r = append(r, metric{"/sys/app/srv2/nodes/host-01/mem.wsp", 1, 1, 1})
+
+				r = append(r, metric{"/sys/app/srv1/nodes/foo-01/cpu-0.wsp", 1, 1, 1})
+				r = append(r, metric{"/sys/app/srv1/nodes/foo-01/cpu-1.wsp", 1, 1, 1})
+				r = append(r, metric{"/sys/app/srv1/nodes/foo-01/cpu-2.wsp", 1, 1, 1})
+
+				return r
+			})(),
+
+			quotas: []*Quota{
+				{
+					Pattern:      "/",
+					PhysicalSize: 1024 * 12 * 15,
+				},
+				{
+					Pattern: "sys.app.*.nodes.*",
+					Metrics: 2,
+				},
+				{
+					Pattern: "sys.app.*.nodes.foo-*",
+					Metrics: 4,
+				},
+			},
+			tests: []throttleTest{
+				{"sys.app.srv1.nodes.host-01.cpu2", nil, true, "throttled by sys.app.*.nodes.* metrics limit"},
+				{"sys.app.srv1.nodes.foo-01.cpu-3", nil, false, ""},
+			},
+
+			statMetrics: []points.Points{
+				{Metric: "quota.metrics.sys-app-srv1-nodes-host-01", Data: []points.Point{{Value: 2}}},
+				{Metric: "usage.metrics.sys-app-srv1-nodes-host-01", Data: []points.Point{{Value: 2}}},
+				{Metric: "throttle.sys-app-srv1-nodes-host-01", Data: []points.Point{{Value: 1}}},
+
+				{Metric: "quota.metrics.sys-app-srv1-nodes-foo-01", Data: []points.Point{{Value: 4}}},
+				{Metric: "usage.metrics.sys-app-srv1-nodes-foo-01", Data: []points.Point{{Value: 3}}},
+				{Metric: "throttle.sys-app-srv1-nodes-foo-01", Data: []points.Point{{Value: 0}}},
+
+				{Metric: "quota.metrics.sys-app-srv2-nodes-host-01", Data: []points.Point{{Value: 2}}},
+				{Metric: "usage.metrics.sys-app-srv2-nodes-host-01", Data: []points.Point{{Value: 1}}},
+				{Metric: "throttle.sys-app-srv2-nodes-host-01", Data: []points.Point{{Value: 0}}},
+
+				{Metric: "quota.physical_size.root", Data: []points.Point{{Value: 184320}}},
+				{Metric: "usage.physical_size.root", Data: []points.Point{{Value: 6}}},
+				{Metric: "throttle.root", Data: []points.Point{{Value: 0}}},
+			},
+		},
+	}
+
+	for i, c := range cases {
+		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
+			tindex := newTrie(
+				".wsp",
+				func(metric string) (size, dataPoints int64) {
+					return 12 * 1024, 1024
+				},
+			)
+
+			for _, m := range c.input1 {
+				tindex.insert(m.name, m.logicalSize, m.physicalSize, m.dataPoints)
+			}
+			tindex.applyQuotas(c.quotas...)
+			tindex.qauMetrics = nil
+			tindex.refreshUsage()
+
+			for _, m := range c.input2 {
+				tindex.insert(m.name, m.logicalSize, m.physicalSize, m.dataPoints)
+			}
+
+			tindex.applyQuotas(c.quotas...)
+			tindex.qauMetrics = nil
+			tindex.refreshUsage()
+
+			// tindex.dump(os.Stdout)
+			// tindex.getQuotaTree(os.Stdout)
+
+			for _, tt := range c.tests {
+				t.Logf("Testing throttle(%q)", tt.metric)
+				if throttle := tindex.throttle(&points.Points{Metric: tt.metric, Data: tt.data}); throttle != tt.throttle {
+					t.Errorf("throttle(%q) = %t, wants %t (explanation:%q)", tt.metric, throttle, tt.throttle, tt.reason)
+				}
+			}
+
+			tindex.qauMetrics = nil
+			tindex.refreshUsage()
+
+			if c.statMetrics != nil && !reflect.DeepEqual(tindex.qauMetrics, c.statMetrics) {
+				t.Errorf("qauMetrics:\n%swants:\n%s", stringifyQuotaPoints(tindex.qauMetrics), stringifyQuotaPoints(c.statMetrics))
+			}
+		})
+	}
+}
+
+func stringifyQuotaPoints(ps []points.Points) string {
+	var str string
+	for _, p := range ps {
+		str += fmt.Sprintf("%s %v\n", p.Metric, p.Data[0].Value)
+	}
+	return str
+}
+
+func TestTrieQuotaThroughput(t *testing.T) {
+	tindex := newTrie(
+		".wsp",
+		func(metric string) (size, dataPoints int64) {
+			return 12 * 1024, 1024
+		},
+	)
+
+	tindex.insert("/sys/app/server-001/cpu.wsp", 0, 0, 0)
+	tindex.insert("/sys/app/server-002/cpu.wsp", 0, 0, 0)
+
+	tindex.applyQuotas(
+		&Quota{
+			Pattern:      "/",
+			PhysicalSize: 1024 * 12 * 15,
+		},
+		&Quota{
+			Pattern:    "sys.app.*",
+			Throughput: 5,
+		},
+	)
+	tindex.refreshUsage()
+
+	if tindex.throttle(&points.Points{Metric: "sys.app.server-001.cpu", Data: []points.Point{{}, {}, {}, {}}}) {
+		t.Errorf("should not throttle old metric within throughput quota")
+	}
+	if !tindex.throttle(&points.Points{Metric: "sys.app.server-001.cpu", Data: []points.Point{{}, {}, {}, {}}}) {
+		t.Errorf("should throttle old metric exceeding throughput quota")
+	}
+
+	if !tindex.throttle(&points.Points{Metric: "sys.app.server-002.cpu2", Data: []points.Point{{}, {}, {}, {}, {}, {}}}) {
+		t.Errorf("should throttle new metric exceeding throughput quota")
+	}
+}
diff --git a/deploy/go-carbon.conf b/deploy/go-carbon.conf
index 0ac335df0..d7d2a6eb4 100644
--- a/deploy/go-carbon.conf
+++ b/deploy/go-carbon.conf
@@ -39,6 +39,9 @@ hash-filenames = true
 compressed = false
 # automatically delete empty whisper file caused by edge cases like server reboot
 remove-empty-file = false
+# This is currently a go-carbon-only feature, not a standard Graphite feature. Optional
+# More details in doc/quotas.md
+# quotas-file = "/etc/go-carbon/storage-quotas.conf"
 
 [cache]
 # Limit of in-memory stored points (not metrics)
@@ -240,6 +243,9 @@ scan-frequency = "5m0s"
 # could be speeded up by enabling adding trigrams to trie, at the some costs of
 # memory usage (by setting both trie-index and trigram-index to true).
 trie-index = false
+# Controls how frequently quota and usage metrics are generated and throughput
+# counters are reset (more details in doc/quotas.md).
+# quota-usage-report-frequency = "1m"
 # Cache file list scan data in the specified path. This option speeds
 # up index building after reboot by reading the last scan result in file
diff --git a/deploy/storage-quota.conf b/deploy/storage-quota.conf
new file mode 100644
index 000000000..fd79eddf5
--- /dev/null
+++ b/deploy/storage-quota.conf
@@ -0,0 +1,24 @@
+
+# This controls all the namespaces under root
+[*]
+metrics = 1,000,000
+logical-size = 250,000,000,000
+physical-size = 50,000,000,000
+# max means practically no limit
+data-points = max
+throughput = max
+
+[sys.app.*]
+metrics = 3,000,000
+logical-size = 1,500,000,000,000
+physical-size = 100,000,000,000
+data-points = 130,000,000,000
+
+# This controls the root/global limits
+[/]
+namespaces = 20
+metrics = 10,000,000
+logical-size = 2,500,000,000,000
+physical-size = 2,500,000,000,000
+data-points = 200,000,000,000
+dropping-policy = new
diff --git a/doc/quotas.md b/doc/quotas.md
new file mode 100644
index 000000000..45b47fe49
--- /dev/null
+++ b/doc/quotas.md
@@ -0,0 +1,110 @@
+# Quotas
+
+In large Graphite (go-carbon) storage installations, it's desirable to have control over how many resources a user can consume.
+
+Go-Carbon addresses this issue with a pattern-matching-based quota design.
+
+Limitations/Caveats:
+
+* The current implementation is based on the concurrent/realtime trie index, so to use quotas, those features also need to be enabled in go-carbon.
+* Pattern-matching rules aren't regexps; like the Graphite query syntax, they use globs.
+* Quota and usage metrics are only produced if the control has non-empty quota values.
+
+## Available controls
+
+* namespaces: how many sub-namespaces are allowed for the matched prefix/namespaces
+* metrics: how many metrics are allowed for the matched prefix/namespaces
+* logical-size: how much logical disk space is allowed for the matched prefix/namespaces
+* physical-size: how much physical disk space is allowed for the matched prefix/namespaces
+* data-points: how many data points (inferred using storage-schemas.conf) are allowed for the matched prefix/namespaces
+* throughput: how many data points are allowed within the interval specified by `carbonserver.quota-usage-report-frequency` for the matched prefix/namespaces
+* dropping-policy: available values include `none` and `new`. `none` means don't drop any values; `new` means drop new metrics. `none` can be used to produce only the quota, usage, and throttle metrics without actually dropping any data.
+
+### Why `logical-size` and `physical-size`
+
+If `whisper.sparse-create` or `whisper.compressed` is enabled, the logical size could be much larger than the physical size.
+
+Distinguishing them gives us control over that scenario.
+
+### `data-points` and `logical-size`
+
+Usually, `data-points` corresponds to `logical-size`, as both are determined by the matching retention policy; the worked example below illustrates this.
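+
+As a rough worked example (a sketch using the size estimate this change wires up in `carbon/app.go`, i.e. roughly 4096 bytes of file overhead plus 12 bytes per point; the retention below is assumed for illustration):
+
+```ini
+# storage-schemas.conf (hypothetical)
+[default]
+pattern = .*
+retentions = 60s:30d
+```
+
+A metric matching this policy holds 30d / 60s = 43,200 data points, so its estimated logical size is 4096 + 43,200 * 12 = 522,496 bytes (~522 KB): a `data-points` quota counts the 43,200, while a `logical-size` quota counts the bytes.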
+
+### `throughput`
+
+Throughput controls how many data points are allowed within the interval specified by `carbonserver.quota-usage-report-frequency`.
+
+This is useful for monitoring how many data points are sent to a specific pattern/namespace, and for cases where some namespaces generate too many data points and crowd other namespaces out of the in-memory cache.
+
+The value is reset every interval specified by `carbonserver.quota-usage-report-frequency`.
+
+If `carbonserver.quota-usage-report-frequency` is 1 minute and `throughput = 600,000`, it means that only 600k data points per minute are allowed to be pushed to the matched namespace/pattern.
+
+For `throughput` control, both new and old metrics might be dropped, depending on their arrival order.
+
+## Special values
+
+The control values can be set to `max`/`maximum`, which maps to max int64 (9,223,372,036,854,775,807) and practically means no limit.
+
+If a control is not set, or is set to empty (``), then no quota and usage metrics are produced for it.
+
+## Stat Metrics
+
+When using quotas, go-carbon will also generate usage, quota, and throttling metrics for each matched pattern.
+
+```
+carbon.agents.{host}.quota.metrics.sys
+carbon.agents.{host}.usage.metrics.app
+carbon.agents.{host}.throttle.app
+...
+```
+
+The `carbon.agents.{host}` prefix comes from the config `common.graph-prefix`.
+
+We can also define a prefix for the generated path, using `stat-metric-prefix` for each matched pattern.
+
+## How to enable Quotas
+
+For go-carbon.conf:
+
+```ini
+[whisper]
+quotas-file = "/etc/go-carbon/storage-quotas.conf"
+
+[carbonserver]
+scan-frequency = "2h"
+trie-index = true
+concurrent-index = true
+realtime-index = 65536
+quota-usage-report-frequency = "1m"
+```
+
+Quota config example:
+
+```ini
+# This controls all the namespaces under root
+[*]
+metrics = 3,000,000
+logical-size = 250,000,000,000
+physical-size = 25,000,000,000
+# max means practically no limit
+data-points = max
+throughput = max
+stat-metric-prefix = "level1."
+
+[sys.app.*]
+metrics = 3,000,000
+logical-size = 1,500,000,000,000
+physical-size = 250,000,000,000
+data-points = 130,000,000,000
+stat-metric-prefix = "level2."
+
+# This controls the root/global limits
+[/]
+namespaces = 20
+metrics = 10,000,000
+logical-size = 2,500,000,000,000
+physical-size = 2,500,000,000,000
+data-points = 200,000,000,000
+dropping-policy = new
+```
diff --git a/persister/whisper_quota.go b/persister/whisper_quota.go
new file mode 100644
index 000000000..be115de60
--- /dev/null
+++ b/persister/whisper_quota.go
@@ -0,0 +1,87 @@
+package persister
+
+import (
+	"fmt"
+	"math"
+	"strconv"
+	"strings"
+)
+
+// Quota represents one quota setting.
+type Quota struct {
+	Pattern          string
+	Namespaces       int64
+	Metrics          int64
+	LogicalSize      int64
+	PhysicalSize     int64
+	DataPoints       int64
+	Throughput       int64
+	DroppingPolicy   string
+	StatMetricPrefix string
+}
+
+type WhisperQuotas []Quota
+
+// ReadWhisperQuotas reads and parses a storage-quotas.conf file and returns the
+// defined quotas.
+func ReadWhisperQuotas(filename string) (WhisperQuotas, error) {
+	config, err := parseIniFile(filename)
+	if err != nil {
+		return nil, err
+	}
+
+	var quotas WhisperQuotas
+
+	parseInt := func(section map[string]string, name string) (int64, error) {
+		str := section[name]
+		switch str {
+		case "":
+			return 0, nil
+		case "maximum", "max":
+			return math.MaxInt64, nil
+		}
+
+		v, err := strconv.ParseInt(strings.ReplaceAll(str, ",", ""), 10, 64)
+		if err != nil {
+			err = fmt.Errorf("[persister] Failed to parse %s for [%s]: %s", name, section["name"], err)
+		}
+		return v, err
+	}
+
+	for _, section := range config {
+		var quota Quota
+		quota.Pattern = section["name"]
+
+		if quota.Namespaces, err = parseInt(section, "namespaces"); err != nil {
+			return nil, err
+		}
+		if quota.Metrics, err = parseInt(section, "metrics"); err != nil {
+			return nil, err
+		}
+		if quota.LogicalSize, err = parseInt(section, "logical-size"); err != nil {
+			return nil, err
+		}
+		if quota.PhysicalSize, err = parseInt(section, "physical-size"); err != nil {
+			return nil, err
+		}
+		if quota.DataPoints, err = parseInt(section, "data-points"); err != nil {
+			return nil, err
+		}
+		if quota.Throughput, err = parseInt(section, "throughput"); err != nil {
+			return nil, err
+		}
+
+		switch v := section["dropping-policy"]; v {
+		case "new", "none", "":
+			quota.DroppingPolicy = v
+		default:
+			return nil, fmt.Errorf("[persister] Unknown dropping-policy %q for [%s]", v, section["name"])
+		}
+
+		quota.StatMetricPrefix = strings.TrimPrefix(section["stat-metric-prefix"], ".")
+
+		quotas = append(quotas, quota)
+	}
+
+	return quotas, nil
+}
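
For reference, a minimal sketch of exercising the new parser on its own (the temp-file plumbing and the config contents below are illustrative assumptions, not part of this change):

```go
package main

import (
	"fmt"
	"os"

	"github.com/go-graphite/go-carbon/persister"
)

func main() {
	// Hypothetical quota file; the keys mirror deploy/storage-quota.conf.
	conf := []byte("[sys.app.*]\nmetrics = 3,000,000\ndata-points = max\ndropping-policy = new\n")

	f, err := os.CreateTemp("", "storage-quotas-*.conf")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	if _, err := f.Write(conf); err != nil {
		panic(err)
	}
	f.Close()

	quotas, err := persister.ReadWhisperQuotas(f.Name())
	if err != nil {
		panic(err)
	}
	for _, q := range quotas {
		// Commas in numbers are stripped and "max" maps to math.MaxInt64.
		fmt.Printf("%s: metrics=%d dataPoints=%d policy=%q\n",
			q.Pattern, q.Metrics, q.DataPoints, q.DroppingPolicy)
	}
}
```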