Skip to content

Commit

Permalink
storage: aggregate iterator stats
Browse files Browse the repository at this point in the history
Aggregate the iterator stats across all of an engine's iterators. Expose seven
new timeseries metrics for visibility into the behavior of storage engine iterators:

  - storage.iterator.block-load.bytes
  - storage.iterator.block-load.cached-bytes
  - storage.iterator.block-load.read-duration
  - storage.iterator.external.seeks
  - storage.iterator.external.steps
  - storage.iterator.internal.seeks
  - storage.iterator.internal.steps

Close #95790.
Epic: None
Release note (ops change): Introduces seven new timeseries metrics for better
visibility into the behavior of storage engine iterators and their internals.
  • Loading branch information
jbowens committed Mar 28, 2023
1 parent 7657c00 commit d901bc0
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 47 deletions.
75 changes: 75 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,60 @@ Raft applied index at which this checkpoint was taken.`,
Unit: metric.Unit_COUNT,
}

metaBlockBytes = metric.Metadata{
Name: "storage.iterator.block-load.bytes",
Help: "Bytes loaded by storage engine iterators (possibly cached). See storage.AggregatedIteratorStats for details.",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaBlockBytesInCache = metric.Metadata{
Name: "storage.iterator.block-load.cached-bytes",
Help: "Bytes loaded by storage engine iterators from the block cache. See storage.AggregatedIteratorStats for details.",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaBlockReadDuration = metric.Metadata{
Name: "storage.iterator.block-load.read-duration",
Help: "Cumulative time storage engine iterators spent loading blocks from durable storage. See storage.AggregatedIteratorStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaIterExternalSeeks = metric.Metadata{
Name: "storage.iterator.external.seeks",
Help: "Cumulative count of seeks performed on storage engine iterators. See storage.AggregatedIteratorStats for details.",
Measurement: "Iterator Ops",
Unit: metric.Unit_COUNT,
}
metaIterExternalSteps = metric.Metadata{
Name: "storage.iterator.external.steps",
Help: "Cumulative count of steps performed on storage engine iterators. See storage.AggregatedIteratorStats for details.",
Measurement: "Iterator Ops",
Unit: metric.Unit_COUNT,
}
metaIterInternalSeeks = metric.Metadata{
Name: "storage.iterator.internal.seeks",
Help: `Cumulative count of seeks performed internally within storage engine iterators.
A value high relative to 'storage.iterator.external.seeks'
is a good indication that there's an accumulation of garbage
internally within the storage engine.
See storage.AggregatedIteratorStats for details.`,
Measurement: "Iterator Ops",
Unit: metric.Unit_COUNT,
}
metaIterInternalSteps = metric.Metadata{
Name: "storage.iterator.internal.steps",
Help: `Cumulative count of steps performed internally within storage engine iterators.
A value high relative to 'storage.iterator.external.steps'
is a good indication that there's an accumulation of garbage
internally within the storage engine.
See storage.AggregatedIteratorStats for more details.`,
Measurement: "Iterator Ops",
Unit: metric.Unit_COUNT,
}
metaSharedStorageBytesWritten = metric.Metadata{
Name: "storage.shared-storage.write",
Help: "Bytes written to external storage",
Expand Down Expand Up @@ -1876,6 +1930,13 @@ type StoreMetrics struct {
RdbWriteStallNanos *metric.Gauge
SharedStorageBytesRead *metric.Gauge
SharedStorageBytesWritten *metric.Gauge
IterBlockBytes *metric.Gauge
IterBlockBytesInCache *metric.Gauge
IterBlockReadDuration *metric.Gauge
IterExternalSeeks *metric.Gauge
IterExternalSteps *metric.Gauge
IterInternalSeeks *metric.Gauge
IterInternalSteps *metric.Gauge
FlushableIngestCount *metric.Gauge
FlushableIngestTableCount *metric.Gauge
FlushableIngestTableSize *metric.Gauge
Expand Down Expand Up @@ -2421,6 +2482,13 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RdbLevelScore: rdbLevelScore,
RdbWriteStalls: metric.NewGauge(metaRdbWriteStalls),
RdbWriteStallNanos: metric.NewGauge(metaRdbWriteStallNanos),
IterBlockBytes: metric.NewGauge(metaBlockBytes),
IterBlockBytesInCache: metric.NewGauge(metaBlockBytesInCache),
IterBlockReadDuration: metric.NewGauge(metaBlockReadDuration),
IterExternalSeeks: metric.NewGauge(metaIterExternalSeeks),
IterExternalSteps: metric.NewGauge(metaIterExternalSteps),
IterInternalSeeks: metric.NewGauge(metaIterInternalSeeks),
IterInternalSteps: metric.NewGauge(metaIterInternalSteps),
SharedStorageBytesRead: metric.NewGauge(metaSharedStorageBytesRead),
SharedStorageBytesWritten: metric.NewGauge(metaSharedStorageBytesWritten),
FlushableIngestCount: metric.NewGauge(metaFlushableIngestCount),
Expand Down Expand Up @@ -2753,6 +2821,13 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.RdbWriteStallNanos.Update(m.WriteStallDuration.Nanoseconds())
sm.DiskSlow.Update(m.DiskSlowCount)
sm.DiskStalled.Update(m.DiskStallCount)
sm.IterBlockBytes.Update(int64(m.Iterator.BlockBytes))
sm.IterBlockBytesInCache.Update(int64(m.Iterator.BlockBytesInCache))
sm.IterBlockReadDuration.Update(int64(m.Iterator.BlockReadDuration))
sm.IterExternalSeeks.Update(int64(m.Iterator.ExternalSeeks))
sm.IterExternalSteps.Update(int64(m.Iterator.ExternalSteps))
sm.IterInternalSeeks.Update(int64(m.Iterator.InternalSeeks))
sm.IterInternalSteps.Update(int64(m.Iterator.InternalSteps))
sm.SharedStorageBytesRead.Update(m.SharedStorageReadBytes)
sm.SharedStorageBytesWritten.Update(m.SharedStorageWriteBytes)
sm.RdbL0Sublevels.Update(int64(m.Levels[0].Sublevels))
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/spanset/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ go_library(
"//pkg/keys",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/storage/pebbleiter",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
Expand Down
64 changes: 54 additions & 10 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ type EngineIterator interface {
// CloneContext is an opaque type encapsulating sufficient context to construct
// a clone of an existing iterator.
type CloneContext struct {
rawIter pebbleiter.Iterator
rawIter pebbleiter.Iterator
reportStats func(IteratorStats)
}

// IterOptions contains options used to create an {MVCC,Engine}Iterator.
Expand Down Expand Up @@ -468,6 +469,7 @@ type IterOptions struct {
// Range keys themselves are not affected by the masking, and will be
// emitted as normal.
RangeKeyMaskingBelow hlc.Timestamp

// useL6Filters allows the caller to opt into reading filter blocks for
// L6 sstables. Only for use with Prefix = true. Helpful if a lot of prefix
// Seeks are expected in quick succession, that are also likely to not
Expand Down Expand Up @@ -1019,15 +1021,7 @@ type WriteBatch interface {
// *pebble.Metrics struct, which has its own documentation.
type Metrics struct {
*pebble.Metrics
// WriteStallCount counts the number of times Pebble intentionally delayed
// incoming writes. Currently, the only two reasons for this to happen are:
// - "memtable count limit reached"
// - "L0 file count limit exceeded"
//
// We do not split this metric across these two reasons, but they can be
// distinguished in the pebble logs.
WriteStallCount int64
WriteStallDuration time.Duration
Iterator AggregatedIteratorStats
// DiskSlowCount counts the number of times Pebble records disk slowness.
DiskSlowCount int64
// DiskStallCount counts the number of times Pebble observes slow writes
Expand All @@ -1037,6 +1031,56 @@ type Metrics struct {
SharedStorageWriteBytes int64
// SharedStorageReadBytes counts the number of bytes read from shared storage.
SharedStorageReadBytes int64
// WriteStallCount counts the number of times Pebble intentionally delayed
// incoming writes. Currently, the only two reasons for this to happen are:
// - "memtable count limit reached"
// - "L0 file count limit exceeded"
//
// We do not split this metric across these two reasons, but they can be
// distinguished in the pebble logs.
WriteStallCount int64
WriteStallDuration time.Duration
}

// AggregatedIteratorStats holds cumulative stats, collected and summed over all
// of an engine's iterators.
type AggregatedIteratorStats struct {
// BlockBytes holds the sum of sizes of all loaded blocks. If the block was
// compressed, this is the compressed bytes. This value includes blocks that
// were loaded from the cache, and bytes that needed to be read from
// persistent storage.
//
// Currently, there may be some gaps in coverage. (At the time of writing,
// 2nd-level index blocks are excluded.)
BlockBytes uint64
// BlockBytesInCache holds the subset of BlockBytes that were already in the
// block cache, requiring no I/O.
BlockBytesInCache uint64
// BlockReadDuration accumulates the duration spent fetching blocks due to
// block cache misses.
//
// Currently, there may be some gaps in coverage. (At the time of writing,
// range deletion and range key blocks, meta index blocks and properties
// blocks are all excluded.)
BlockReadDuration time.Duration
// ExternalSeeks is the total count of seeks in forward and backward
// directions performed on pebble.Iterators.
ExternalSeeks int
// ExternalSteps is the total count of relative positioning operations (eg,
// Nexts, Prevs, NextPrefix, NextWithLimit, etc) in forward and backward
// directions performed on pebble.Iterators.
ExternalSteps int
// InternalSeeks is the total count of steps in forward and backward
// directions performed on Pebble's internal iterator. If this is high
// relative to ExternalSeeks, it's a good indication that there's an
// accumulation of garbage within the LSM (NOT MVCC garbage).
InternalSeeks int
// InternalSteps is the total count of relative positioning operations (eg,
// Nexts, Prevs, NextPrefix, etc) in forward and backward directions
// performed on pebble's internal iterator. If this is high relative to
// ExternalSteps, it's a good indication that there's an accumulation of
// garbage within the LSM (NOT MVCC garbage).
InternalSteps int
}

// MetricsForInterval is a set of pebble.Metrics that need to be saved in order to
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator
if reader.ConsistentIterators() {
iter = maybeUnwrapUnsafeIter(reader.NewMVCCIterator(MVCCKeyIterKind, opts)).(*pebbleIterator)
} else {
cloneCtx := intentIter.CloneContext()
iter = newPebbleIteratorByCloning(cloneCtx.rawIter, opts, StandardDurability)
iter = newPebbleIteratorByCloning(intentIter.CloneContext(), opts, StandardDurability)
}

*iiIter = intentInterleavingIter{
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5052,8 +5052,7 @@ func MVCCResolveWriteIntentRange(
mvccIter = rw.NewMVCCIterator(MVCCKeyIterKind, iterOpts)
} else {
// For correctness, we need mvccIter to be consistent with engineIter.
cloneCtx := engineIter.CloneContext()
mvccIter = newPebbleIteratorByCloning(cloneCtx.rawIter, iterOpts, StandardDurability)
mvccIter = newPebbleIteratorByCloning(engineIter.CloneContext(), iterOpts, StandardDurability)
}
iterAndBuf := GetBufUsingIter(mvccIter)
defer func() {
Expand Down
56 changes: 42 additions & 14 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,10 @@ type Pebble struct {
diskStallCount int64
sharedBytesRead int64
sharedBytesWritten int64
iterStats struct {
syncutil.Mutex
AggregatedIteratorStats
}

// Relevant options copied over from pebble.Options.
unencryptedFS vfs.FS
Expand Down Expand Up @@ -1306,6 +1310,21 @@ func (p *Pebble) Close() {
}
}

// aggregateIterStats is propagated to all of an engine's iterators, aggregating
// iterator stats when an iterator is closed or its stats are reset. These
// aggregated stats are exposed through GetMetrics.
func (p *Pebble) aggregateIterStats(stats IteratorStats) {
p.iterStats.Lock()
defer p.iterStats.Unlock()
p.iterStats.BlockBytes += stats.Stats.InternalStats.BlockBytes
p.iterStats.BlockBytesInCache += stats.Stats.InternalStats.BlockBytesInCache
p.iterStats.BlockReadDuration += stats.Stats.InternalStats.BlockReadDuration
p.iterStats.ExternalSeeks += stats.Stats.ForwardSeekCount[pebble.InterfaceCall] + stats.Stats.ReverseSeekCount[pebble.InterfaceCall]
p.iterStats.ExternalSteps += stats.Stats.ForwardStepCount[pebble.InterfaceCall] + stats.Stats.ReverseStepCount[pebble.InterfaceCall]
p.iterStats.InternalSeeks += stats.Stats.ForwardSeekCount[pebble.InternalIterCall] + stats.Stats.ReverseSeekCount[pebble.InternalIterCall]
p.iterStats.InternalSteps += stats.Stats.ForwardStepCount[pebble.InternalIterCall] + stats.Stats.ReverseStepCount[pebble.InternalIterCall]
}

// Closed implements the Engine interface.
func (p *Pebble) Closed() bool {
return p.closed
Expand Down Expand Up @@ -1338,13 +1357,13 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt
return maybeWrapInUnsafeIter(iter)
}

iter := newPebbleIterator(p.db, opts, StandardDurability)
iter := newPebbleIterator(p.db, opts, StandardDurability, p.aggregateIterStats)
return maybeWrapInUnsafeIter(iter)
}

// NewEngineIterator implements the Engine interface.
func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator {
return newPebbleIterator(p.db, opts, StandardDurability)
return newPebbleIterator(p.db, opts, StandardDurability, p.aggregateIterStats)
}

// ConsistentIterators implements the Engine interface.
Expand Down Expand Up @@ -1729,16 +1748,19 @@ func (p *Pebble) Flush() error {

// GetMetrics implements the Engine interface.
func (p *Pebble) GetMetrics() Metrics {
m := p.db.Metrics()
return Metrics{
Metrics: m,
m := Metrics{
Metrics: p.db.Metrics(),
WriteStallCount: atomic.LoadInt64(&p.writeStallCount),
WriteStallDuration: time.Duration(atomic.LoadInt64((*int64)(&p.writeStallDuration))),
DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount),
DiskStallCount: atomic.LoadInt64(&p.diskStallCount),
SharedStorageReadBytes: atomic.LoadInt64(&p.sharedBytesRead),
SharedStorageWriteBytes: atomic.LoadInt64(&p.sharedBytesWritten),
}
p.iterStats.Lock()
m.Iterator = p.iterStats.AggregatedIteratorStats
p.iterStats.Unlock()
return m
}

// GetEncryptionRegistries implements the Engine interface.
Expand Down Expand Up @@ -1834,7 +1856,7 @@ func (p *Pebble) GetAuxiliaryDir() string {

// NewBatch implements the Engine interface.
func (p *Pebble) NewBatch() Batch {
return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings)
return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings, p.aggregateIterStats)
}

// NewReadOnly implements the Engine interface.
Expand All @@ -1844,12 +1866,12 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter {

// NewUnindexedBatch implements the Engine interface.
func (p *Pebble) NewUnindexedBatch() Batch {
return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings)
return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings, p.aggregateIterStats)
}

// NewWriteBatch implements the Engine interface.
func (p *Pebble) NewWriteBatch() WriteBatch {
return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings)
return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings, p.aggregateIterStats)
}

// NewSnapshot implements the Engine interface.
Expand Down Expand Up @@ -2158,13 +2180,16 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
iter = &p.prefixIter
}
if iter.inuse {
return newPebbleIteratorByCloning(p.iter, opts, p.durability)
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
reportStats: p.parent.aggregateIterStats,
}, opts, p.durability)
}

if iter.iter != nil {
iter.setOptions(opts, p.durability)
} else {
iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability)
iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability, p.parent.aggregateIterStats)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand All @@ -2188,13 +2213,16 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
iter = &p.prefixEngineIter
}
if iter.inuse {
return newPebbleIteratorByCloning(p.iter, opts, p.durability)
return newPebbleIteratorByCloning(CloneContext{
rawIter: p.iter,
reportStats: p.parent.aggregateIterStats,
}, opts, p.durability)
}

if iter.iter != nil {
iter.setOptions(opts, p.durability)
} else {
iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability)
iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability, p.parent.aggregateIterStats)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand Down Expand Up @@ -2383,13 +2411,13 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
return maybeWrapInUnsafeIter(iter)
}

iter := MVCCIterator(newPebbleIterator(p.snapshot, opts, StandardDurability))
iter := MVCCIterator(newPebbleIterator(p.snapshot, opts, StandardDurability, p.parent.aggregateIterStats))
return maybeWrapInUnsafeIter(iter)
}

// NewEngineIterator implements the Reader interface.
func (p pebbleSnapshot) NewEngineIterator(opts IterOptions) EngineIterator {
return newPebbleIterator(p.snapshot, opts, StandardDurability)
return newPebbleIterator(p.snapshot, opts, StandardDurability, p.parent.aggregateIterStats)
}

// ConsistentIterators implements the Reader interface.
Expand Down
Loading

0 comments on commit d901bc0

Please sign in to comment.