Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
idx metrics fixes
Browse files Browse the repository at this point in the history
* split up updates vs adds. correctly categorize them
* clarify and assure that all ES/cass operation (add, update, delete) metrics also include the memory part
  (which was already the case for most, but not all operations. and was not documented well)
* operations on ES index (add/delete/update) turn into backend operations that
  get mingled into larger bulk operations, clarify the ambiguity.
  don't pretend it was an add, make it clear we can't tell for sure what
  kind of operation it was.
* similar for Cassandra: the backend executes inserts (on behalf of adds
  and updates) and deletes (on behalf of updates and deletes)
  Even if we tied back query results to the original command, e.g. we
  could increment an update success counter when an insert succeeds, if
  we track that insert was due to an update, the update might still not
  be successful if the delete fails. So the ok/fail counters should
  really be for queries.
* also time prune operations
* specify whether durations concern one metric, or several (e.g. a pattern delete)

fix #304
  • Loading branch information
Dieterbe committed Jan 31, 2017
1 parent 89dd730 commit 5b47b0c
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 37 deletions.
41 changes: 32 additions & 9 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,22 @@ const TableSchema = `CREATE TABLE IF NOT EXISTS %s.metric_idx (
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}`

var (
// metric idx.cassadra.add.ok is how many metrics are successfully being indexed
idxCasOk = stats.NewCounter32("idx.cassandra.add.ok")
// metric idx.cassandra.add.fail is how many failures were encountered while trying to index metrics
idxCasFail = stats.NewCounter32("idx.cassandra.add.fail")
// metric idx.cassandra.add is the duration of addititons to the cassandra idx
// metric idx.cassandra.query-insert.ok is how many insert queries for a metric completed successfully (triggered by an add or an update)
idxCasQueryInsertOk = stats.NewCounter32("idx.cassandra.query-insert.ok")
// metric idx.cassandra.query-insert.fail is how many insert queries for a metric failed (triggered by an add or an update)
idxCasQueryInsertFail = stats.NewCounter32("idx.cassandra.query-insert.fail")
// metric idx.cassandra.query-delete.ok is how many delete queries for a metric completed successfully (triggered by an update or a delete)
idxCasQueryDeleteOk = stats.NewCounter32("idx.cassandra.query-delete.ok")
// metric idx.cassandra.query-delete.fail is how many delete queries for a metric failed (triggered by an update or a delete)
idxCasQueryDeleteFail = stats.NewCounter32("idx.cassandra.query-delete.fail")

// metric idx.cassandra.add is the duration of an add of one metric to the cassandra idx, including the add to the in-memory index, excluding the insert query
idxCasAddDuration = stats.NewLatencyHistogram15s32("idx.cassandra.add")
// metric idx.cassandra.delete is the duration of deletions from the cassandra idx
// metric idx.cassandra.update is the duration of an update of one metric to the cassandra idx, including the update to the in-memory index, excluding any insert/delete queries
idxCasUpdateDuration = stats.NewLatencyHistogram15s32("idx.cassandra.update")
// metric idx.cassandra.prune is the duration of a prune of the cassandra idx, including the prune of the in-memory index and all needed delete queries
idxCasPruneDuration = stats.NewLatencyHistogram15s32("idx.cassandra.prune")
// metric idx.cassandra.delete is the duration of a delete of one or more metrics from the cassandra idx, including the delete from the in-memory index and the delete query
idxCasDeleteDuration = stats.NewLatencyHistogram15s32("idx.cassandra.delete")
errmetrics = cassandra.NewErrMetrics("idx.cassandra")

Expand Down Expand Up @@ -206,6 +215,7 @@ func (c *CasIdx) Stop() {
}

func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) error {
pre := time.Now()
existing, inMemory := c.MemoryIdx.Get(data.Id)

if inMemory {
Expand All @@ -219,6 +229,7 @@ func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) error {
if err != nil {
c.writeQueue <- writeReq{recvTime: time.Now(), def: def}
}
idxCasUpdateDuration.Value(time.Since(pre))
return err
}
return nil
Expand All @@ -232,13 +243,22 @@ func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) error {
}
}()
}
def := schema.MetricDefinitionFromMetricData(data)
def.Partition = partition
err := c.MemoryIdx.AddOrUpdateDef(def)
if err == nil {
c.writeQueue <- writeReq{recvTime: time.Now(), def: def}
}
idxCasUpdateDuration.Value(time.Since(pre))
return err
}
def := schema.MetricDefinitionFromMetricData(data)
def.Partition = partition
err := c.MemoryIdx.AddOrUpdateDef(def)
if err == nil {
c.writeQueue <- writeReq{recvTime: time.Now(), def: def}
}
idxCasAddDuration.Value(time.Since(pre))
return err
}

Expand Down Expand Up @@ -322,7 +342,7 @@ func (c *CasIdx) processWriteQueue() {
req.def.Tags,
req.def.LastUpdate).Exec(); err != nil {

idxCasFail.Inc()
idxCasQueryInsertFail.Inc()
errmetrics.Inc(err)
if (attempts % 20) == 0 {
log.Warn("cassandra-idx Failed to write def to cassandra. it will be retried. %s", err)
Expand All @@ -335,8 +355,7 @@ func (c *CasIdx) processWriteQueue() {
attempts++
} else {
success = true
idxCasAddDuration.Value(time.Since(req.recvTime))
idxCasOk.Inc()
idxCasQueryInsertOk.Inc()
log.Debug("cassandra-idx metricDef saved to cassandra. %s", req.def.Id)
}
}
Expand Down Expand Up @@ -367,17 +386,20 @@ func (c *CasIdx) deleteDef(def *schema.MetricDefinition) error {
attempts++
err := c.session.Query("DELETE FROM metric_idx where partition=? AND id=?", def.Partition, def.Id).Exec()
if err != nil {
idxCasQueryDeleteFail.Inc()
errmetrics.Inc(err)
log.Error(3, "cassandra-idx Failed to delete metricDef %s from cassandra. %s", def.Id, err)
time.Sleep(time.Second)
} else {
idxCasQueryDeleteOk.Inc()
return nil
}
}
return fmt.Errorf("unable to delete metricDef %s from index after %d attempts.", def.Id, attempts)
}

func (c *CasIdx) Prune(orgId int, oldest time.Time) ([]schema.MetricDefinition, error) {
pre := time.Now()
pruned, err := c.MemoryIdx.Prune(orgId, oldest)
// if an error was encountered then pruned is probably a partial list of metricDefs
// deleted, so lets still try and delete these from Cassandra.
Expand All @@ -388,6 +410,7 @@ func (c *CasIdx) Prune(orgId int, oldest time.Time) ([]schema.MetricDefinition,
log.Error(3, "cassandra-idx: %s", err.Error())
}
}
idxCasPruneDuration.Value(time.Since(pre))
return pruned, err
}

Expand Down
30 changes: 22 additions & 8 deletions idx/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ import (
)

var (
// metric idx.elasticsearch.add.ok is the number of successfull additions to the ES idx
idxEsOk = stats.NewCounter32("idx.elasticsearch.add.ok")
// metric idx.elasticsearch.add.fail is the number of failed additions to the ES idx
idxEsFail = stats.NewCounter32("idx.elasticsearch.add.fail")
// metric idx.elasticsearch.add is the duration of additions to the ES idx
// metric idx.elasticsearch.add.ok is the number of successful add/update/delete operations on the ES idx, via a bulk operation
idxEsOk = stats.NewCounter32("idx.elasticsearch.bulk.ok")
// metric idx.elasticsearch.add.fail is the number of failed add/update/delete operations on the ES idx, via a bulk operation
idxEsFail = stats.NewCounter32("idx.elasticsearch.bulk.fail")
// metric idx.elasticsearch.add is the duration of an add of one metric, including the add to the in-memory index (note: excludes time to flush to index. see bulk)
idxEsAddDuration = stats.NewLatencyHistogram15s32("idx.elasticsearch.add")
// metric idx.elasticsearch.delete is the duration of deletes from the ES idx
// metric idx.elasticsearch.update is the duration of an update of one metric, including the update to the in-memory index (note: excludes time to flush to index. see bulk)
idxEsUpdateDuration = stats.NewLatencyHistogram15s32("idx.elasticsearch.update")
// metric idx.elasticsearch.delete is the duration of a delete of one or more metrics, including the delete in the in-memory index (note: excludes time to flush to index. see bulk)
idxEsDeleteDuration = stats.NewLatencyHistogram15s32("idx.elasticsearch.delete")
// metric idx.elasticsearch.prune is the duration of an index prune operation, including the prune of the in-memory index (note: excludes time to flush to index. see bulk)
idxEsPruneDuration = stats.NewLatencyHistogram15s32("idx.elasticsearch.prune")
// metric idx.elasticsearch.bulk is the duration of a bulk operation to the ES index, which persists any pending adds, deletes and updates
idxEsBulkDuration = stats.NewLatencyHistogram15s32("idx.elasticsearch.bulk")
// metric idx.elasticsearch.retrybuf.items is the amount of items currently in the retry buffer
retryBufItems = stats.NewGauge32("idx.elasticsearch.retrybuf.items")

Expand Down Expand Up @@ -213,6 +219,7 @@ func (e *EsIdx) Init() error {
}

func (e *EsIdx) AddOrUpdate(data *schema.MetricData, partition int32) error {
pre := time.Now()
existing, inMemory := e.MemoryIdx.Get(data.Id)

def := schema.MetricDefinitionFromMetricData(data)
Expand All @@ -231,6 +238,11 @@ func (e *EsIdx) AddOrUpdate(data *schema.MetricData, partition int32) error {
e.retryBuf.Queue(def.Id)
}
}
if inMemory {
idxEsUpdateDuration.Value(time.Since(pre))
} else {
idxEsAddDuration.Value(time.Since(pre))
}
return err
}

Expand All @@ -248,7 +260,7 @@ func (e *EsIdx) bulkSend(buf *bytes.Buffer) error {
if err := e.processEsResponse(body); err != nil {
return err
}
idxEsAddDuration.Value(time.Since(pre))
idxEsBulkDuration.Value(time.Since(pre))
return nil
}

Expand Down Expand Up @@ -369,11 +381,13 @@ func (e *EsIdx) Delete(orgId int, pattern string) ([]schema.MetricDefinition, er
}

func (e *EsIdx) Prune(orgId int, oldest time.Time) ([]schema.MetricDefinition, error) {
pre := time.Now()
pruned, err := e.MemoryIdx.Prune(orgId, oldest)
// if an error was encountered then pruned is probably a partial list of metricDefs
// deleted, so lets still try and delete these from Cassandra.
// deleted, so lets still try and delete these from Es
for _, def := range pruned {
e.BulkIndexer.Delete(esIndex, "metric_index", def.Id)
}
idxEsPruneDuration.Value(time.Since(pre))
return pruned, err
}
47 changes: 27 additions & 20 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@ import (
)

var (
// metric idx.memory.update.ok is the number of successful updates to the memory idx
idxUpdateOk = stats.NewCounter32("idx.memory.update.ok")
// metric idx.memory.add.ok is the number of successful additions to the memory idx
idxOk = stats.NewCounter32("idx.memory.add.ok")
idxAddOk = stats.NewCounter32("idx.memory.add.ok")
// metric idx.memory.add.fail is the number of failed additions to the memory idx
idxFail = stats.NewCounter32("idx.memory.add.fail")
// metric idx.memory.add is the duration of (successfull) memory idx additions
idxAddFail = stats.NewCounter32("idx.memory.add.fail")
// metric idx.memory.add is the duration of a (successful) add of a metric to the memory idx
idxAddDuration = stats.NewLatencyHistogram15s32("idx.memory.add")
// metric idx.memory.get is the duration of memory idx gets
// metric idx.memory.update is the duration of a (successful) update of a metric to the memory idx
idxUpdateDuration = stats.NewLatencyHistogram15s32("idx.memory.update")
// metric idx.memory.get is the duration of a get of one metric in the memory idx
idxGetDuration = stats.NewLatencyHistogram15s32("idx.memory.get")
// metric idx.memory.list is the duration of memory idx listings
idxListDuration = stats.NewLatencyHistogram15s32("idx.memory.list")
// metric idx.memory.find is the duration of memory idx find
idxFindDuration = stats.NewLatencyHistogram15s32("idx.memory.find")
// metric idx.memory.delete is the duration of memory idx deletes
// metric idx.memory.delete is the duration of a delete of one or more metrics from the memory idx
idxDeleteDuration = stats.NewLatencyHistogram15s32("idx.memory.delete")
// metric idx.memory.prune is the duration of successful memory idx prunes
idxPruneDuration = stats.NewLatencyHistogram15s32("idx.memory.prune")

// metric idx.metrics_active is the amount of currently known metrics in the index
metricsActive = stats.NewGauge32("idx.metrics_active")
Expand Down Expand Up @@ -63,14 +69,14 @@ func (n *Node) String() string {
// Implements the the "MetricIndex" interface
type MemoryIdx struct {
sync.RWMutex
FailedDefs map[string]error
FailedAdds map[string]error // by metric id
DefById map[string]*schema.MetricDefinition
Tree map[int]*Tree
}

func New() *MemoryIdx {
return &MemoryIdx{
FailedDefs: make(map[string]error),
FailedAdds: make(map[string]error),
DefById: make(map[string]*schema.MetricDefinition),
Tree: make(map[int]*Tree),
}
Expand All @@ -88,21 +94,21 @@ func (m *MemoryIdx) AddOrUpdate(data *schema.MetricData, partition int32) error
pre := time.Now()
m.Lock()
defer m.Unlock()
err, ok := m.FailedDefs[data.Id]
err, ok := m.FailedAdds[data.Id]
if ok {
// if it failed before, it would fail again.
// there's not much point in doing the work of trying over
// and over again, and flooding the logs with the same failure.
// so just trigger the stats metric as if we tried again
idxFail.Inc()
idxAddFail.Inc()
return err
}
existing, ok := m.DefById[data.Id]
if ok {
log.Debug("metricDef with id %s already in index.", data.Id)
existing.LastUpdate = data.Time
idxOk.Inc()
idxAddDuration.Value(time.Since(pre))
idxUpdateOk.Inc()
idxUpdateDuration.Value(time.Since(pre))
return nil
}

Expand Down Expand Up @@ -147,8 +153,8 @@ func (m *MemoryIdx) AddOrUpdateDef(def *schema.MetricDefinition) error {
if _, ok := m.DefById[def.Id]; ok {
log.Debug("memory-idx: metricDef with id %s already in index.", def.Id)
m.DefById[def.Id] = def
idxOk.Inc()
idxAddDuration.Value(time.Since(pre))
idxUpdateOk.Inc()
idxUpdateDuration.Value(time.Since(pre))
return nil
}
err := m.add(def)
Expand Down Expand Up @@ -182,14 +188,14 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) error {
if !node.Leaf {
//bad data. A path cant be both a leaf and a branch.
log.Info("memory-idx: Bad data, a path can not be both a leaf and a branch. %d - %s", def.OrgId, path)
m.FailedDefs[def.Id] = idx.BothBranchAndLeaf
idxFail.Inc()
m.FailedAdds[def.Id] = idx.BothBranchAndLeaf
idxAddFail.Inc()
return idx.BothBranchAndLeaf
}
log.Debug("memory-idx: existing index entry for %s. Adding %s as child", path, def.Id)
node.Children = append(node.Children, def.Id)
m.DefById[def.Id] = def
idxOk.Inc()
idxAddOk.Inc()
return nil
}
}
Expand All @@ -209,8 +215,8 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) error {
if n, ok := tree.Items[branch]; ok {
if n.Leaf {
log.Info("memory-idx: Branches cant be added to a leaf node. %d - %s", def.OrgId, path)
m.FailedDefs[def.Id] = idx.BranchUnderLeaf
idxFail.Inc()
m.FailedAdds[def.Id] = idx.BranchUnderLeaf
idxAddFail.Inc()
return idx.BranchUnderLeaf
}
log.Debug("memory-idx: Found branch %s which metricDef %s is a descendant of", branch, path)
Expand Down Expand Up @@ -249,7 +255,7 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) error {
Children: []string{def.Id},
}
m.DefById[def.Id] = def
idxOk.Inc()
idxAddOk.Inc()
return nil
}

Expand Down Expand Up @@ -482,7 +488,7 @@ func (m *MemoryIdx) Delete(orgId int, pattern string) ([]schema.MetricDefinition
// by deleting one or more nodes in the tree, any defs that previously failed may now
// be able to be added. An easy way to support this is just reset this map and give them
// all a chance again
m.FailedDefs = make(map[string]error)
m.FailedAdds = make(map[string]error)

for _, f := range found {
deleted, err := m.delete(orgId, f)
Expand Down Expand Up @@ -615,6 +621,7 @@ func (m *MemoryIdx) Prune(orgId int, oldest time.Time) ([]schema.MetricDefinitio
if orgId == -1 {
log.Info("memory-idx: pruning stale metricDefs from memory for all orgs took %s", time.Since(pre).String())
}
idxPruneDuration.Value(time.Since(pre))
return pruned, nil
}

Expand Down

0 comments on commit 5b47b0c

Please sign in to comment.