Skip to content

Commit

Permalink
feat: metrics for cache subsystem (#22915)
Browse files Browse the repository at this point in the history
* fix: drop complicated cache metrics and document remaining

* feat: metrics for cache
  • Loading branch information
lesam authored Nov 23, 2021
1 parent a74e051 commit feb459c
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 167 deletions.
2 changes: 1 addition & 1 deletion cmd/influxd/inspect/build_tsi/build_tsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i
}
} else {
log.Debug("Building cache from wal files")
cache := tsm1.NewCache(maxCacheSize)
cache := tsm1.NewCache(maxCacheSize, tsm1.EngineTags{}) // tags are for metrics only
loader := tsm1.NewCacheLoader(walPaths)
loader.WithLogger(log)
if err := loader.Load(cache); err != nil {
Expand Down
206 changes: 102 additions & 104 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxql"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -143,25 +144,6 @@ func (e *entry) InfluxQLType() (influxql.DataType, error) {
return e.values.InfluxQLType()
}

// Statistics gathered by the Cache. These keys name the values emitted by
// (*Cache).Statistics under the "tsm1_cache" measurement.
const (
	// levels - point in time measures

	statCacheMemoryBytes = "memBytes"      // level: Size of in-memory cache in bytes
	statCacheDiskBytes   = "diskBytes"     // level: Size of on-disk snapshots in bytes
	statSnapshots        = "snapshotCount" // level: Number of active snapshots.
	statCacheAgeMs       = "cacheAgeMs"    // level: Number of milliseconds since cache was last snapshoted at sample time

	// counters - accumulative measures

	statCachedBytes         = "cachedBytes"         // counter: Total number of bytes written into snapshots.
	statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots

	statCacheWriteOK      = "writeOk"      // counter: WriteMulti calls that fully succeeded
	statCacheWriteErr     = "writeErr"     // counter: WriteMulti calls that returned an error
	statCacheWriteDropped = "writeDropped" // counter: WriteMulti calls that dropped some points
)

// storer is the interface that describes a cache's store.
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
Expand Down Expand Up @@ -196,8 +178,7 @@ type Cache struct {
// This number is the number of pending or failed WriteSnapshot attempts since the last successful one.
snapshotAttempts int

stats *CacheStatistics
lastSnapshot time.Time
stats *cacheMetrics
lastWriteTime time.Time

// A one time synchronization used to initialize the cache with a store. Since the store can allocate a
Expand All @@ -208,52 +189,103 @@ type Cache struct {

// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots.
func NewCache(maxSize uint64) *Cache {
// Note tags are for metrics only, so if metrics are not desired tags do not have to be set.
func NewCache(maxSize uint64, tags EngineTags) *Cache {
c := &Cache{
maxSize: maxSize,
store: emptyStore{},
stats: &CacheStatistics{},
lastSnapshot: time.Now(),
maxSize: maxSize,
store: emptyStore{},
stats: newCacheMetrics(tags),
}
c.stats.LastSnapshot.SetToCurrentTime()
c.initialize.Store(&sync.Once{})
c.UpdateAge()
c.UpdateCompactTime(0)
c.updateCachedBytes(0)
c.updateMemSize(0)
c.updateSnapshots()
return c
}

// CacheStatistics hold statistics related to the cache.
type CacheStatistics struct {
MemSizeBytes int64
DiskSizeBytes int64
SnapshotCount int64
CacheAgeMs int64
CachedBytes int64
WALCompactionTimeMs int64
WriteOK int64
WriteErr int64
WriteDropped int64
}

// Statistics returns statistics for periodic monitoring.
func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "tsm1_cache",
Tags: tags,
Values: map[string]interface{}{
statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes),
statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes),
statSnapshots: atomic.LoadInt64(&c.stats.SnapshotCount),
statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs),
statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes),
statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs),
statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK),
statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr),
statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped),
},
}}
var globalCacheMetrics = newAllCacheMetrics()

const cacheSubsystem = "cache"

// allCacheMetrics is the set of label-partitioned (per-engine) Prometheus
// collectors for the cache subsystem. A single global instance
// (globalCacheMetrics) is exposed for registration via CacheCollectors;
// each Cache obtains label-bound children through newCacheMetrics.
type allCacheMetrics struct {
	MemBytes     *prometheus.GaugeVec   // inuse_bytes: current memory consumption of the cache
	DiskBytes    *prometheus.GaugeVec   // disk_bytes: size of the most recent snapshot
	LastSnapshot *prometheus.GaugeVec   // latest_snapshot: Unix time of the most recent snapshot
	Writes       *prometheus.CounterVec // writes_total: all writes to the cache
	WriteErr     *prometheus.CounterVec // writes_err: failed writes to the cache
	WriteDropped *prometheus.CounterVec // writes_dropped: writes with some dropped points
}

// cacheMetrics holds one Cache's label-bound children of the global cache
// collectors (see newCacheMetrics). Fields mirror allCacheMetrics, but each
// is a single Gauge/Counter already resolved to this cache's engine labels.
type cacheMetrics struct {
	MemBytes     prometheus.Gauge   // current in-memory size of this cache, in bytes
	DiskBytes    prometheus.Gauge   // size of this cache's most recent snapshot, in bytes
	LastSnapshot prometheus.Gauge   // Unix time of this cache's most recent snapshot
	Writes       prometheus.Counter // total writes to this cache
	WriteErr     prometheus.Counter // failed writes to this cache
	WriteDropped prometheus.Counter // writes to this cache with some dropped points
}

// newAllCacheMetrics builds the global, label-partitioned collectors for the
// cache subsystem. Every metric lives under the storage namespace and cache
// subsystem, and is partitioned by the standard engine label names.
func newAllCacheMetrics() *allCacheMetrics {
	labels := engineLabelNames()

	// Local builders avoid repeating the shared Namespace/Subsystem plumbing
	// for each of the six metrics below.
	gauge := func(name, help string) *prometheus.GaugeVec {
		return prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: storageNamespace,
			Subsystem: cacheSubsystem,
			Name:      name,
			Help:      help,
		}, labels)
	}
	counter := func(name, help string) *prometheus.CounterVec {
		return prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace: storageNamespace,
			Subsystem: cacheSubsystem,
			Name:      name,
			Help:      help,
		}, labels)
	}

	return &allCacheMetrics{
		MemBytes:     gauge("inuse_bytes", "Gauge of current memory consumption of cache"),
		DiskBytes:    gauge("disk_bytes", "Gauge of size of most recent snapshot"),
		LastSnapshot: gauge("latest_snapshot", "Unix time of most recent snapshot"),
		Writes:       counter("writes_total", "Counter of all writes to cache"),
		WriteErr:     counter("writes_err", "Counter of failed writes to cache"),
		WriteDropped: counter("writes_dropped", "Counter of writes to cache with some dropped points"),
	}
}

// CacheCollectors returns the cache subsystem's Prometheus collectors so the
// caller can register them with a metrics registry. It always returns the
// shared global vectors; per-cache children are bound via newCacheMetrics.
func CacheCollectors() []prometheus.Collector {
	g := globalCacheMetrics
	return []prometheus.Collector{
		g.MemBytes,
		g.DiskBytes,
		g.LastSnapshot,
		g.Writes,
		g.WriteErr,
		g.WriteDropped,
	}
}

// newCacheMetrics resolves the shared cache metric vectors against the label
// set derived from tags, yielding the per-cache collectors a Cache records to.
func newCacheMetrics(tags EngineTags) *cacheMetrics {
	labels := tags.getLabels()
	g := globalCacheMetrics
	return &cacheMetrics{
		MemBytes:     g.MemBytes.With(labels),
		DiskBytes:    g.DiskBytes.With(labels),
		LastSnapshot: g.LastSnapshot.With(labels),
		Writes:       g.Writes.With(labels),
		WriteErr:     g.WriteErr.With(labels),
		WriteDropped: g.WriteDropped.With(labels),
	}
}

// init initializes the cache and allocates the underlying store. Once initialized,
Expand Down Expand Up @@ -286,6 +318,7 @@ func (c *Cache) Free() {
// error will be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
c.init()
c.stats.Writes.Inc()
var addedSize uint64
for _, v := range values {
addedSize += uint64(Values(v).Size())
Expand All @@ -295,7 +328,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
limit := c.maxSize // maxSize is safe for reading without a lock.
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
c.stats.WriteErr.Inc()
return ErrCacheMemorySizeLimitExceeded(n, limit)
}

Expand Down Expand Up @@ -323,13 +356,12 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// Some points in the batch were dropped. An error is returned so
// error stat is incremented as well.
if werr != nil {
atomic.AddInt64(&c.stats.WriteDropped, 1)
atomic.AddInt64(&c.stats.WriteErr, 1)
c.stats.WriteDropped.Inc()
c.stats.WriteErr.Inc()
}

// Update the memory size stat
c.updateMemSize(int64(addedSize))
atomic.AddInt64(&c.stats.WriteOK, 1)
c.stats.MemBytes.Set(float64(c.Size()))

c.mu.Lock()
c.lastWriteTime = time.Now()
Expand Down Expand Up @@ -382,10 +414,7 @@ func (c *Cache) Snapshot() (*Cache, error) {
// Reset the cache's store.
c.store.reset()
atomic.StoreUint64(&c.size, 0)
c.lastSnapshot = time.Now()

c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
c.updateSnapshots()
c.stats.LastSnapshot.SetToCurrentTime()

return c.snapshot, nil
}
Expand Down Expand Up @@ -423,16 +452,15 @@ func (c *Cache) ClearSnapshot(success bool) {

if success {
c.snapshotAttempts = 0
c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache

// Reset the snapshot to a fresh Cache.
c.snapshot = &Cache{
store: c.snapshot.store,
}

c.stats.DiskBytes.Set(float64(atomic.LoadUint64(&c.snapshotSize)))
atomic.StoreUint64(&c.snapshotSize, 0)
c.updateSnapshots()
}
c.stats.MemBytes.Set(float64(c.Size()))
}

// Size returns the number of point-calculated bytes the cache currently uses.
Expand Down Expand Up @@ -613,7 +641,7 @@ func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {

c.decreaseSize(origSize - uint64(e.size()))
}
atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size()))
c.stats.MemBytes.Set(float64(c.Size()))
}

// SetMaxSize updates the memory limit of the cache.
Expand Down Expand Up @@ -740,29 +768,6 @@ func (c *Cache) LastWriteTime() time.Time {
return c.lastWriteTime
}

// UpdateAge updates the age statistic based on the current time.
// The age is the number of milliseconds elapsed since the cache was last
// snapshotted (lastSnapshot), stored atomically into the CacheAgeMs stat.
func (c *Cache) UpdateAge() {
	// Read lock guards lastSnapshot, which Snapshot() reassigns.
	c.mu.RLock()
	defer c.mu.RUnlock()
	ageStat := int64(time.Since(c.lastSnapshot) / time.Millisecond)
	atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat)
}

// UpdateCompactTime updates WAL compaction time statistic based on d.
// The duration is accumulated (added, not overwritten) in milliseconds.
func (c *Cache) UpdateCompactTime(d time.Duration) {
	atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond))
}

// updateCachedBytes increases the cachedBytes counter by b.
// Called from Snapshot with the snapshot's size, so the counter tracks the
// total bytes ever written into snapshots.
func (c *Cache) updateCachedBytes(b uint64) {
	atomic.AddInt64(&c.stats.CachedBytes, int64(b))
}

// updateMemSize updates the memSize level by b.
// b may be negative (e.g. ClearSnapshot subtracts the snapshot size) to
// shrink the reported in-memory byte level.
func (c *Cache) updateMemSize(b int64) {
	atomic.AddInt64(&c.stats.MemSizeBytes, b)
}

const (
valueTypeUndefined = 0
valueTypeFloat64 = 1
Expand All @@ -789,13 +794,6 @@ func valueType(v Value) byte {
}
}

// updateSnapshots updates the snapshotsCount and the diskSize levels.
// snapshotSize is the byte size of the outstanding snapshot; snapshotAttempts
// counts pending/failed snapshot attempts since the last successful one.
func (c *Cache) updateSnapshots() {
	// Update disk stats
	atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize)))
	atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts))
}

// emptyStore is the placeholder store a Cache holds before its real store is
// allocated (NewCache sets store: emptyStore{}); every operation is a no-op.
type emptyStore struct{}

// entry always returns nil: an empty store contains no entries.
func (e emptyStore) entry(key []byte) *entry { return nil }
Expand Down
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/cache_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000)
c := tsm1.NewCache(1000000, tsm1.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestCacheRace(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000)
c := tsm1.NewCache(1000000, tsm1.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestCacheRace2Compacters(t *testing.T) {
}

wg := sync.WaitGroup{}
c := tsm1.NewCache(1000000)
c := tsm1.NewCache(1000000, tsm1.EngineTags{})

ch := make(chan struct{})
for _, s := range series {
Expand Down
Loading

0 comments on commit feb459c

Please sign in to comment.