Skip to content

Commit

Permalink
Introduces a quota subsystem
Browse files Browse the repository at this point in the history
The quota subsystem is made to improve control, reliability and visibility of go-carbon.
It is not a standard graphite component, but it is backward compatible and
could be turned on optionally.

Caveat: the current implementation only supports concurrent/realtime trie index.

The quota subsystem allows user to control how many resources can be consumed
on a patter-matching based basis.

Implemented controls include: data points (based retention policy), disk size
(logical and physical), throughput, metric count, and namespaces (i.e. immediate
sub-directory count).

More details could be found in doc/quotas.md in the PR.

An example configuration:

```ini
# This control all the namespaces under root
[*]
metrics       =       1,000,000
logical_size  = 250,000,000,000
physical_size =  50,000,000,000
# max means practically no limit
data_points   =             max
throughput    =             max

[sys.app.*]
metrics       =         3,000,000
logical_size  = 1,500,000,000,000
physical_size =   100,000,000,000
data_points   =   130,000,000,000

# This controls the root/global limits
[/]
namespaces    =                20
metrics       =        10,000,000
logical_size  = 2,500,000,000,000
physical_size = 2,500,000,000,000
data_points   =   200,000,000,000
dropping_policy = new
```

Throttling control is implemented in `carbonserver`, while quota config
is implemented in persister (mainly for convenience).
  • Loading branch information
bom-d-van committed Jul 23, 2021
1 parent 3a20e05 commit 30cd0cd
Show file tree
Hide file tree
Showing 13 changed files with 1,438 additions and 46 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Usage of go-carbon:
-version=false: Print version
```

```toml
```ini
[common]
# Run as user. Works only in daemon mode
user = "carbon"
Expand All @@ -101,6 +101,9 @@ data-dir = "/var/lib/graphite/whisper"
schemas-file = "/etc/go-carbon/storage-schemas.conf"
# http://graphite.readthedocs.org/en/latest/config-carbon.html#storage-aggregation-conf. Optional
aggregation-file = "/etc/go-carbon/storage-aggregation.conf"
# It's currently go-carbon only feature, not a standard graphite feature. Optional
# More details in doc/quotas.md
# quotas-file = "/etc/go-carbon/storage-quotas.conf"
# Worker threads count. Metrics sharded by "crc32(metricName) % workers"
workers = 8
# Limits the number of whisper update_many() calls per second. 0 - no limit
Expand Down Expand Up @@ -327,6 +330,9 @@ scan-frequency = "5m0s"
# could be speeded up by enabling adding trigrams to trie, at the some costs of
# memory usage (by setting both trie-index and trigram-index to true).
trie-index = false
# Control how frequent it is to generate quota and usage metrics, and reset
# throughput counters (More details in doc/quotas.md).
# quota-usage-report-frequency = "1m"

# Cache file list scan data in the specified path. This option speeds
# up index building after reboot by reading the last scan result in file
Expand Down
8 changes: 8 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Cache struct {
}

newMetricsChan chan string

throttle func(*points.Points) bool
}

// A "thread" safe string to anything map.
Expand Down Expand Up @@ -266,6 +268,10 @@ func (c *Cache) Add(p *points.Points) {
return
}

if c.throttle != nil && c.throttle(p) {
return
}

if s.tagsEnabled {
var err error
p.Metric, err = tags.Normalize(p.Metric)
Expand Down Expand Up @@ -367,3 +373,5 @@ func (c *Cache) GetRecentNewMetrics() []map[string]struct{} {
}
return metricNames
}

func (c *Cache) SetThrottle(throttle func(*points.Points) bool) { c.throttle = throttle }
33 changes: 33 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package carbon

import (
"errors"
"fmt"
"net"
"net/url"
Expand Down Expand Up @@ -99,6 +100,13 @@ func (app *App) configure() error {
return err
}

if cfg.Whisper.QuotasFilename != "" {
cfg.Whisper.Quotas, err = persister.ReadWhisperQuotas(cfg.Whisper.QuotasFilename)
if err != nil {
return err
}
}

if cfg.Whisper.AggregationFilename != "" {
cfg.Whisper.Aggregation, err = persister.ReadWhisperAggregation(cfg.Whisper.AggregationFilename)
if err != nil {
Expand Down Expand Up @@ -437,6 +445,7 @@ func (app *App) Start(version string) (err error) {
return
}

// TODO: refactor: do not use var name the same as pkg name
carbonserver := carbonserver.NewCarbonserverListener(core.Get)
carbonserver.SetWhisperData(conf.Whisper.DataDir)
carbonserver.SetMaxGlobs(conf.Carbonserver.MaxGlobs)
Expand All @@ -449,6 +458,7 @@ func (app *App) Start(version string) (err error) {
carbonserver.SetBuckets(conf.Carbonserver.Buckets)
carbonserver.SetMetricsAsCounters(conf.Carbonserver.MetricsAsCounters)
carbonserver.SetScanFrequency(conf.Carbonserver.ScanFrequency.Value())
carbonserver.SetQuotaUsageReportFrequency(conf.Carbonserver.QuotaUsageReportFrequency.Value())
carbonserver.SetReadTimeout(conf.Carbonserver.ReadTimeout.Value())
carbonserver.SetIdleTimeout(conf.Carbonserver.IdleTimeout.Value())
carbonserver.SetWriteTimeout(conf.Carbonserver.WriteTimeout.Value())
Expand All @@ -463,6 +473,29 @@ func (app *App) Start(version string) (err error) {
carbonserver.SetPercentiles(conf.Carbonserver.Percentiles)
// carbonserver.SetQueryTimeout(conf.Carbonserver.QueryTimeout.Value())

if app.Config.Whisper.Quotas != nil {
if !conf.Carbonserver.ConcurrentIndex || conf.Carbonserver.RealtimeIndex <= 0 {
return errors.New("concurrent-index and realtime-index needs to be enabled for quota control.")
}

carbonserver.SetEstimateSize(func(metric string) (size, dataPoints int64) {
schema, ok := app.Config.Whisper.Schemas.Match(metric)
if !ok {
// Why not configurable: go-carbon users
// should always make sure that there is a default retention policy.
return 4096 + 172800*12, 172800 // 2 days of secondly data
}

for _, r := range schema.Retentions {
dataPoints += int64(r.NumberOfPoints())
}
return 4096 + dataPoints*12, dataPoints
})

carbonserver.SetQuotas(app.Config.getCarbonserverQuotas())
core.SetThrottle(carbonserver.ShouldThrottleMetric)
}

var setConfigRetriever bool
if conf.Carbonserver.CacheScan {
core.InitCacheScanAdds()
Expand Down
1 change: 0 additions & 1 deletion carbon/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func NewCollector(app *App) *Collector {
}
})
})

}

sendCallback := func(moduleName string) func(metric string, value float64) {
Expand Down
26 changes: 26 additions & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/go-graphite/go-carbon/carbonserver"
"github.com/go-graphite/go-carbon/persister"
"github.com/go-graphite/go-carbon/receiver/tcp"
"github.com/go-graphite/go-carbon/receiver/udp"
Expand Down Expand Up @@ -56,6 +57,7 @@ type whisperConfig struct {
DataDir string `toml:"data-dir"`
SchemasFilename string `toml:"schemas-file"`
AggregationFilename string `toml:"aggregation-file"`
QuotasFilename string `toml:"quotas-file"`
Workers int `toml:"workers"`
MaxUpdatesPerSecond int `toml:"max-updates-per-second"`
MaxCreatesPerSecond int `toml:"max-creates-per-second"`
Expand All @@ -67,6 +69,7 @@ type whisperConfig struct {
HashFilenames bool `toml:"hash-filenames"`
Schemas persister.WhisperSchemas
Aggregation *persister.WhisperAggregation
Quotas persister.WhisperQuotas
RemoveEmptyFile bool `toml:"remove-empty-file"`
}

Expand Down Expand Up @@ -121,6 +124,8 @@ type carbonserverConfig struct {
ConcurrentIndex bool `toml:"concurrent-index"`
RealtimeIndex int `toml:"realtime-index"`
FileListCache string `toml:"file-list-cache"`

QuotaUsageReportFrequency *Duration `toml:"quota-usage-report-frequency"`
}

type pprofConfig struct {
Expand Down Expand Up @@ -213,6 +218,9 @@ func NewConfig() *Config {
ScanFrequency: &Duration{
Duration: 300 * time.Second,
},
QuotaUsageReportFrequency: &Duration{
Duration: 60 * time.Second,
},
ReadTimeout: &Duration{
Duration: 60 * time.Second,
},
Expand Down Expand Up @@ -376,3 +384,21 @@ retentions = 60:43200,3600:43800`), 0644)

return configFile
}

func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
for _, q := range c.Whisper.Quotas {
quotas = append(quotas, &carbonserver.Quota{
Pattern: q.Pattern,
Namespaces: q.Namespaces,
Metrics: q.Metrics,
DataPoints: q.DataPoints,
LogicalSize: q.LogicalSize,
PhysicalSize: q.PhysicalSize,
Throughput: q.Throughput,
DroppingPolicy: carbonserver.ParseQuotaDroppingPolicy(q.DroppingPolicy),
StatMetricPrefix: q.StatMetricPrefix,
})
}

return quotas
}
Loading

0 comments on commit 30cd0cd

Please sign in to comment.