Skip to content

Commit

Permalink
Cosmos.Prometheus: Replace app tag with custom tags (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Apr 28, 2021
1 parent ad83a6e commit 4bb8e8f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Cosmos.Discovery.Endpoint`: Extracts Uri for diagnostic purposes [#284](https://github.com/jet/equinox/issues/284)
- `Cosmos.Prometheus.LogSink`: Generalized `app` tag to arbitrary custom tags [#287](https://github.com/jet/equinox/issues/287)

### Changed

Expand Down
161 changes: 85 additions & 76 deletions src/Equinox.Cosmos.Prometheus/CosmosPrometheus.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,110 +7,119 @@ module private Impl =

module private Histograms =

let labelNames = [| "facet"; "op"; "app"; "db"; "con"; "cat" |]
let labelNames tagNames = Array.append tagNames [| "facet"; "op"; "db"; "con"; "cat" |]
let labelValues tagValues (facet, op, db, con, cat) = Array.append tagValues [| facet; op; db; con; cat |]
let private mkHistogram (cfg : Prometheus.HistogramConfiguration) name desc =
let h = Prometheus.Metrics.CreateHistogram(name, desc, cfg)
fun (facet : string, op : string) app (db, con, cat : string) s ->
h.WithLabels(facet, op, app, db, con, cat).Observe(s)
fun tagValues (facet : string, op : string) (db, con, cat : string) s ->
h.WithLabels(labelValues tagValues (facet, op, db, con, cat)).Observe(s)
// Given we also have summary metrics with equivalent labels, we focus the bucketing on LAN latencies
let private sHistogram =
let private sHistogram tagNames =
let sBuckets = [| 0.0005; 0.001; 0.002; 0.004; 0.008; 0.016; 0.5; 1.; 2.; 4.; 8. |]
let sCfg = Prometheus.HistogramConfiguration(Buckets = sBuckets, LabelNames = labelNames)
let sCfg = Prometheus.HistogramConfiguration(Buckets = sBuckets, LabelNames = labelNames tagNames)
mkHistogram sCfg
let private ruHistogram =
let private ruHistogram tagNames =
let ruBuckets = Prometheus.Histogram.ExponentialBuckets(1., 2., 11) // 1 .. 1024
let ruCfg = Prometheus.HistogramConfiguration(Buckets = ruBuckets, LabelNames = labelNames)
let ruCfg = Prometheus.HistogramConfiguration(Buckets = ruBuckets, LabelNames = labelNames tagNames)
mkHistogram ruCfg
let sAndRuPair stat desc =
let sAndRuPair (tagNames, tagValues) stat desc =
let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
let observeS = sHistogram (baseName + "_seconds") (baseDesc + " latency")
let observeRu = ruHistogram (baseName + "_ru") (baseDesc + " charge")
fun (facet, op) app (db, con, cat, s : System.TimeSpan, ru) ->
observeS (facet, op) app (db, con, cat) s.TotalSeconds
observeRu (facet, op) app (db, con, cat) ru
let observeS = sHistogram tagNames (baseName + "_seconds") (baseDesc + " latency")
let observeRu = ruHistogram tagNames (baseName + "_ru") (baseDesc + " charge")
fun (facet, op) (db, con, cat, s : System.TimeSpan, ru) ->
observeS tagValues (facet, op) (db, con, cat) s.TotalSeconds
observeRu tagValues (facet, op) (db, con, cat) ru

module private Summaries =

let labelNames = [| "facet"; "app"; "db"; "con" |]
let labelNames tagNames = Array.append tagNames [| "facet"; "db"; "con" |]
let labelValues tagValues (facet, db, con) = Array.append tagValues [| facet; db; con |]
let private mkSummary (cfg : Prometheus.SummaryConfiguration) name desc =
let s = Prometheus.Metrics.CreateSummary(name, desc, cfg)
fun (facet : string) app (db, con) o -> s.WithLabels(facet, app, db, con).Observe(o)
let config =
fun tagValues (facet : string) (db, con) o -> s.WithLabels(labelValues tagValues (facet, db, con)).Observe(o)
let config tagNames =
let inline qep q e = Prometheus.QuantileEpsilonPair(q, e)
let objectives = [| qep 0.50 0.05; qep 0.95 0.01; qep 0.99 0.01 |]
Prometheus.SummaryConfiguration(Objectives = objectives, LabelNames = labelNames, MaxAge = System.TimeSpan.FromMinutes 1.)
let sAndRuPair stat desc =
Prometheus.SummaryConfiguration(Objectives = objectives, LabelNames = labelNames tagNames, MaxAge = System.TimeSpan.FromMinutes 1.)
let sAndRuPair (tagNames, tagValues) stat desc =
let baseName, baseDesc = Impl.baseName stat, Impl.baseDesc desc
let observeS = mkSummary config (baseName + "_seconds") (baseDesc + " latency")
let observeRu = mkSummary config (baseName + "_ru") (baseDesc + " charge")
fun facet app (db, con, s : System.TimeSpan, ru) ->
observeS facet app (db, con) s.TotalSeconds
observeRu facet app (db, con) ru
let observeS = mkSummary (config tagNames) (baseName + "_seconds") (baseDesc + " latency") tagValues
let observeRu = mkSummary (config tagNames) (baseName + "_ru") (baseDesc + " charge") tagValues
fun facet (db, con, s : System.TimeSpan, ru) ->
observeS facet (db, con) s.TotalSeconds
observeRu facet (db, con) ru

module private Counters =

let labelNames = [| "facet"; "op"; "outcome"; "app"; "db"; "con"; "cat" |]
let labelNames tagNames = Array.append tagNames [| "facet"; "op"; "outcome"; "db"; "con"; "cat" |]
let private mkCounter (cfg : Prometheus.CounterConfiguration) name desc =
let h = Prometheus.Metrics.CreateCounter(name, desc, cfg)
fun (facet : string, op : string, outcome : string) app (db, con, cat) c ->
h.WithLabels(facet, op, outcome, app, db, con, cat).Inc(c)
let config = Prometheus.CounterConfiguration(LabelNames = labelNames)
let total stat desc =
fun tagValues (facet : string, op : string, outcome : string) (db, con, cat) c ->
h.WithLabels(facet, op, outcome, db, con, cat).Inc(c)
let config tagNames = Prometheus.CounterConfiguration(LabelNames = labelNames tagNames)
let total (tagNames, tagValues) stat desc =
let name = Impl.baseName (stat + "_total")
let desc = Impl.baseDesc desc
mkCounter config name desc
let eventsAndBytesPair stat desc =
let observeE = total (stat + "_events") (desc + "Events")
let observeB = total (stat + "_bytes") (desc + "Bytes")
fun ctx app (db, con, cat, e, b) ->
observeE ctx app (db, con, cat) e
match b with None -> () | Some b -> observeB ctx app (db, con, cat) b

module private Stats =

let opHistogram = Histograms.sAndRuPair "op" "Operation"
let roundtripHistogram = Histograms.sAndRuPair "roundtrip" "Fragment"
let opSummary = Summaries.sAndRuPair "op_summary" "Operation Summary"
let roundtripSummary = Summaries.sAndRuPair "roundtrip_summary" "Fragment Summary"
let payloadCounters = Counters.eventsAndBytesPair "payload" "Payload, "
let cacheCounter = Counters.total "cache" "Cache"

let observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru) =
opHistogram (facet, op) app (db, con, cat, s, ru)
opSummary facet app (db, con, s, ru)
let observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, count, bytes) =
observeLatencyAndCharge (facet, op) app (db, con, cat, s, ru)
payloadCounters (facet, op, outcome) app (db, con, cat, float count, if bytes = -1 then None else Some (float bytes))

let inline (|CatSRu|) ({ interval = i; ru = ru } : Equinox.Cosmos.Store.Log.Measurement as m) =
mkCounter (config tagNames) name desc tagValues
let eventsAndBytesPair tags stat desc =
let observeE = total tags (stat + "_events") (desc + "Events")
let observeB = total tags (stat + "_bytes") (desc + "Bytes")
fun ctx (db, con, cat, e, b) ->
observeE ctx (db, con, cat) e
match b with None -> () | Some b -> observeB ctx (db, con, cat) b

open Equinox.Cosmos.Store.Log

/// Extracts metrics embedded in Equinox log messages, and maps those to prometheus-net metrics.
/// Acts as a virtual Serilog Sink.
type LogSink(customKeys, customValues) =

do if Array.length customKeys <> Array.length customValues then invalidArg "customKeys" "Unmatched tag/value counts"

let tags = (customKeys, customValues)
let opHistogram = Histograms.sAndRuPair tags "op" "Operation"
let roundtripHistogram = Histograms.sAndRuPair tags "roundtrip" "Fragment"
let opSummary = Summaries.sAndRuPair tags "op_summary" "Operation Summary"
let roundtripSummary = Summaries.sAndRuPair tags "roundtrip_summary" "Fragment Summary"
let payloadCounters = Counters.eventsAndBytesPair tags "payload" "Payload, "
let cacheCounter = Counters.total tags "cache" "Cache"

let observeLatencyAndCharge (facet, op) (db, con, cat, s, ru) =
opHistogram (facet, op) (db, con, cat, s, ru)
opSummary facet (db, con, s, ru)
let observeLatencyAndChargeWithEventCounts (facet, op, outcome) (db, con, cat, s, ru, count, bytes) =
observeLatencyAndCharge (facet, op) (db, con, cat, s, ru)
payloadCounters (facet, op, outcome) (db, con, cat, float count, if bytes = -1 then None else Some (float bytes))

let (|CatSRu|) ({ interval = i; ru = ru } : Equinox.Cosmos.Store.Log.Measurement as m) =
let cat, _id = FsCodec.StreamName.splitCategoryAndId (FSharp.UMX.UMX.tag m.stream)
m.database, m.container, cat, i.Elapsed, ru
let observeRes (facet, _op as stat) app (CatSRu (db, con, cat, s, ru)) =
roundtripHistogram stat app (db, con, cat, s, ru)
roundtripSummary facet app (db, con, s, ru)
let observe_ stat app (CatSRu (db, con, cat, s, ru)) =
observeLatencyAndCharge stat app (db, con, cat, s, ru)
let observe (facet, op, outcome) app (CatSRu (db, con, cat, s, ru) as m) =
observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
let observeTip (facet, op, outcome, cacheOutcome) app (CatSRu (db, con, cat, s, ru) as m) =
observeLatencyAndChargeWithEventCounts (facet, op, outcome) app (db, con, cat, s, ru, m.count, m.bytes)
cacheCounter (facet, op, cacheOutcome) app (db, con, cat) 1.
let observeRes (facet, _op as stat) (CatSRu (db, con, cat, s, ru)) =
roundtripHistogram stat (db, con, cat, s, ru)
roundtripSummary facet (db, con, s, ru)
let observe_ stat (CatSRu (db, con, cat, s, ru)) =
observeLatencyAndCharge stat (db, con, cat, s, ru)
let observe (facet, op, outcome) (CatSRu (db, con, cat, s, ru) as m) =
observeLatencyAndChargeWithEventCounts (facet, op, outcome) (db, con, cat, s, ru, m.count, m.bytes)
let observeTip (facet, op, outcome, cacheOutcome) (CatSRu (db, con, cat, s, ru) as m) =
observeLatencyAndChargeWithEventCounts (facet, op, outcome) (db, con, cat, s, ru, m.count, m.bytes)
cacheCounter (facet, op, cacheOutcome) (db, con, cat) 1.

open Equinox.Cosmos.Store.Log
/// Emit metrics with a single custom tag
new appName = LogSink([| "app" |],[| appName |])

type LogSink(app) =
interface Serilog.Core.ILogEventSink with
member __.Emit logEvent = logEvent |> function
| MetricEvent cm -> cm |> function
| Op (Operation.Tip, m) -> Stats.observeTip ("query", "tip", "ok", "200") app m
| Op (Operation.Tip404, m) -> Stats.observeTip ("query", "tip", "ok", "404") app m
| Op (Operation.Tip302, m) -> Stats.observeTip ("query", "tip", "ok", "302") app m
| Op (Operation.Query, m) -> Stats.observe ("query", "query", "ok") app m
| QueryRes (_direction, m) -> Stats.observeRes ("query", "queryPage") app m
| Op (Operation.Write, m) -> Stats.observe ("transact", "sync", "ok") app m
| Op (Operation.Conflict, m) -> Stats.observe ("transact", "conflict", "conflict") app m
| Op (Operation.Resync, m) -> Stats.observe ("transact", "resync", "conflict") app m
| Op (Operation.Prune, m) -> Stats.observe_ ("prune", "pruneQuery") app m
| PruneRes ( m) -> Stats.observeRes ("prune", "pruneQueryPage") app m
| Op (Operation.Delete, m) -> Stats.observe ("prune", "delete", "ok") app m
| Op (Operation.Tip, m) -> observeTip ("query", "tip", "ok", "200") m
| Op (Operation.Tip404, m) -> observeTip ("query", "tip", "ok", "404") m
| Op (Operation.Tip302, m) -> observeTip ("query", "tip", "ok", "302") m
| Op (Operation.Query, m) -> observe ("query", "query", "ok") m
| QueryRes (_direction, m) -> observeRes ("query", "queryPage") m
| Op (Operation.Write, m) -> observe ("transact", "sync", "ok") m
| Op (Operation.Conflict, m) -> observe ("transact", "conflict", "conflict") m
| Op (Operation.Resync, m) -> observe ("transact", "resync", "conflict") m
| Op (Operation.Prune, m) -> observe_ ("prune", "pruneQuery") m
| PruneRes ( m) -> observeRes ("prune", "pruneQueryPage") m
| Op (Operation.Delete, m) -> observe ("prune", "delete", "ok") m
| _ -> ()

0 comments on commit 4bb8e8f

Please sign in to comment.