From 5a4f67be0387fc882ce05c17cf699650c2328035 Mon Sep 17 00:00:00 2001 From: Reasno Date: Mon, 30 Aug 2021 16:40:09 +0800 Subject: [PATCH 1/6] feat: Use type safe metrics struct --- observability/doc.go | 2 +- observability/metrics.go | 363 ++++++++++++++++------------ observability/observability.go | 3 +- observability/observability_test.go | 23 +- otgorm/gorm_metrics.go | 46 +++- otkafka/reader_metrics.go | 86 ++++--- otkafka/writer_metrics.go | 98 ++++---- otredis/metrics.go | 61 +++-- srvgrpc/metrics.go | 48 ++++ srvgrpc/metrics_test.go | 28 +++ srvhttp/metrics.go | 60 +++++ srvhttp/metrics_test.go | 28 +++ 12 files changed, 573 insertions(+), 273 deletions(-) create mode 100644 srvgrpc/metrics_test.go create mode 100644 srvhttp/metrics_test.go diff --git a/observability/doc.go b/observability/doc.go index a779d01d..adf90d5d 100644 --- a/observability/doc.go +++ b/observability/doc.go @@ -1,5 +1,5 @@ /* -Package observability provides a tracer and a histogram to measure all incoming +Package observability provides a tracer and a Histogram to measure all incoming requests to the system. Introduction diff --git a/observability/metrics.go b/observability/metrics.go index 7653edff..c211a12c 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -1,353 +1,406 @@ package observability import ( - "sync" - + "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/otgorm" "github.com/DoNewsCode/core/otkafka" "github.com/DoNewsCode/core/otredis" + "github.com/DoNewsCode/core/srvgrpc" + "github.com/DoNewsCode/core/srvhttp" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" ) -type histogram struct { - once sync.Once - *prometheus.Histogram +// MetricsIn is the injection parameter of most metrics constructors in the observability package. +type MetricsIn struct { + di.In + + Registerer stdprometheus.Registerer `optional:"true"` } -var his histogram +// ProvideHTTPRequestDurationSeconds returns a metrics.Histogram that is designed to measure incoming HTTP requests +// to the system. Note it has three labels: "module", "service", "route". If any label is missing, +// the system will panic. +func ProvideHTTPRequestDurationSeconds(in MetricsIn) *srvhttp.RequestDurationSeconds { + http := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "Total time spent serving requests.", + }, []string{"module", "service", "route"}) + + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + in.Registerer.MustRegister(http) + + return &srvhttp.RequestDurationSeconds{ + Histogram: prometheus.NewHistogram(http), + } +} -// ProvideHistogramMetrics returns a metrics.Histogram that is designed to measure incoming requests -// to the system. Note it has three labels: "module", "service", "method". If any label is missing, +// ProvideGRPCRequestDurationSeconds returns a metrics.Histogram that is designed to measure incoming GRPC requests +// to the system. Note it has three labels: "module", "service", "route". If any label is missing, // the system will panic. -func ProvideHistogramMetrics() metrics.Histogram { - his.once.Do(func() { - his.Histogram = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Name: "http_request_duration_seconds", - Help: "Total time spent serving requests.", - }, []string{"module", "service", "method"}) - }) - return &his +func ProvideGRPCRequestDurationSeconds(in MetricsIn) *srvgrpc.RequestDurationSeconds { + grpc := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ + Name: "grpc_request_duration_seconds", + Help: "Total time spent serving requests.", + }, []string{"module", "service", "route"}) + + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + in.Registerer.MustRegister(grpc) + + return &srvgrpc.RequestDurationSeconds{ + Histogram: prometheus.NewHistogram(grpc), + } } // ProvideGORMMetrics returns a *otgorm.Gauges that measures the connection info in databases. // It is meant to be consumed by the otgorm.Providers. -func ProvideGORMMetrics() *otgorm.Gauges { +func ProvideGORMMetrics(in MetricsIn) *otgorm.Gauges { + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } return &otgorm.Gauges{ - Idle: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Idle: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "gorm_idle_connections", Help: "number of idle connections", - }, []string{"dbname", "driver"}), - Open: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname", "driver"}, in.Registerer), + Open: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "gorm_open_connections", Help: "number of open connections", - }, []string{"dbname", "driver"}), - InUse: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname", "driver"}, in.Registerer), + InUse: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "gorm_in_use_connections", Help: "number of in use connections", - }, []string{"dbname", "driver"}), + }, []string{"dbname", "driver"}, in.Registerer), } } -// ProvideRedisMetrics returns a *otredis.Gauges that measures the connection info in redis. +// ProvideRedisMetrics returns a RedisMetrics that measures the connection info in redis. // It is meant to be consumed by the otredis.Providers. -func ProvideRedisMetrics() *otredis.Gauges { +func ProvideRedisMetrics(in MetricsIn) *otredis.Gauges { + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } return &otredis.Gauges{ - Hits: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Hits: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_hit_connections", Help: "number of times free connection was found in the pool", - }, []string{"dbname"}), - Misses: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + Misses: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_miss_connections", Help: "number of times free connection was NOT found in the pool", - }, []string{"dbname"}), - Timeouts: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + Timeouts: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_timeout_connections", Help: "number of times a wait timeout occurred", - }, []string{"dbname"}), - TotalConns: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + TotalConns: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_total_connections", Help: "number of total connections in the pool", - }, []string{"dbname"}), - IdleConns: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + IdleConns: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_idle_connections", Help: "number of idle connections in the pool", - }, []string{"dbname"}), - StaleConns: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + StaleConns: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_stale_connections", Help: "number of stale connections removed from the pool", - }, []string{"dbname"}), + }, []string{"dbname"}, in.Registerer), } } // ProvideKafkaReaderMetrics returns a *otkafka.ReaderStats that measures the reader info in kafka. // It is meant to be consumed by the otkafka.Providers. -func ProvideKafkaReaderMetrics() *otkafka.ReaderStats { +func ProvideKafkaReaderMetrics(in MetricsIn) *otkafka.ReaderStats { labels := []string{"reader", "client_id", "topic", "partition"} + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + return &otkafka.ReaderStats{ - Dials: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Dials: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_dial_count", Help: "", - }, labels), - Fetches: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Fetches: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_fetch_count", Help: "", - }, labels), - Messages: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Messages: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_message_count", Help: "", - }, labels), - Bytes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Bytes: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_message_bytes", Help: "", - }, labels), - Rebalances: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Rebalances: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_rebalance_count", Help: "", - }, labels), - Timeouts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Timeouts: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_timeout_count", Help: "", - }, labels), - Errors: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Errors: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_error_count", Help: "", - }, labels), - Offset: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Offset: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_offset", Help: "", - }, labels), - Lag: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Lag: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_lag", Help: "", - }, labels), - MinBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MinBytes: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_bytes_min", Help: "", - }, labels), - MaxBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxBytes: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_bytes_max", Help: "", - }, labels), - MaxWait: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxWait: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_wait_max", Help: "", - }, labels), - QueueLength: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + QueueLength: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_queue_length", Help: "", - }, labels), - QueueCapacity: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + QueueCapacity: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_queue_capacity", Help: "", - }, labels), + }, labels, in.Registerer), DialTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_dial_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_dial_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_dial_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, ReadTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_read_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_read_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_read_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, WaitTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_wait_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_wait_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_wait_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, FetchSize: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_size_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_size_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_size_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, FetchBytes: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_bytes_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_bytes_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_bytes_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, } } // ProvideKafkaWriterMetrics returns a *otkafka.WriterStats that measures the writer info in kafka. // It is meant to be consumed by the otkafka.Providers. -func ProvideKafkaWriterMetrics() *otkafka.WriterStats { +func ProvideKafkaWriterMetrics(in MetricsIn) *otkafka.WriterStats { labels := []string{"writer", "topic"} + + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + return &otkafka.WriterStats{ - Writes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Writes: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_write_count", Help: "", - }, labels), - Messages: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Messages: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_message_count", Help: "", - }, labels), - Bytes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Bytes: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_message_bytes", Help: "", - }, labels), - Errors: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Errors: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_error_count", Help: "", - }, labels), - MaxAttempts: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxAttempts: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_attempts_max", Help: "", - }, labels), - MaxBatchSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxBatchSize: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_max", Help: "", - }, labels), - BatchTimeout: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + BatchTimeout: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_timeout", Help: "", - }, labels), - ReadTimeout: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + ReadTimeout: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_read_timeout", Help: "", - }, labels), - WriteTimeout: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + WriteTimeout: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_timeout", Help: "", - }, labels), - RequiredAcks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + RequiredAcks: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_acks_required", Help: "", - }, labels), - Async: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Async: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_async", Help: "", - }, labels), + }, labels, in.Registerer), BatchTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, WriteTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, WaitTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_wait_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_wait_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_wait_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, Retries: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_retries_count_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_retries_count_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_retries_count_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, BatchSize: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_size_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_size_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_size_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, BatchBytes: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_bytes_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_bytes_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_bytes_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, } } + +func newCounterFrom(opts stdprometheus.CounterOpts, labelNames []string, registerer stdprometheus.Registerer) metrics.Counter { + cv := stdprometheus.NewCounterVec(opts, labelNames) + registerer.MustRegister(cv) + return prometheus.NewCounter(cv) +} + +func newGaugeFrom(opts stdprometheus.GaugeOpts, labelNames []string, registerer stdprometheus.Registerer) metrics.Gauge { + cv := stdprometheus.NewGaugeVec(opts, labelNames) + registerer.MustRegister(cv) + return prometheus.NewGauge(cv) +} diff --git a/observability/observability.go b/observability/observability.go index 18451fde..d76345ec 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -22,7 +22,8 @@ func Providers() di.Deps { return di.Deps{ ProvideJaegerLogAdapter, ProvideOpentracing, - ProvideHistogramMetrics, + ProvideHTTPRequestDurationSeconds, + ProvideGRPCRequestDurationSeconds, ProvideGORMMetrics, ProvideRedisMetrics, ProvideKafkaReaderMetrics, diff --git a/observability/observability_test.go b/observability/observability_test.go index 24a7498d..89f3cd9d 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/rawbytes" + "github.com/prometheus/client_golang/prometheus" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" "gorm.io/gorm" @@ -33,8 +34,26 @@ func TestProvideOpentracing(t *testing.T) { } func TestProvideHistogramMetrics(t *testing.T) { - Out := ProvideHistogramMetrics() - assert.NotNil(t, Out) + for _, c := range []struct { + name string + registerer prometheus.Registerer + }{ + { + "default", + nil, + }, + { + "provided registerer", + prometheus.NewPedanticRegistry(), + }, + } { + t.Run(c.name, func(t *testing.T) { + http := ProvideHTTPRequestDurationSeconds(MetricsIn{Registerer: c.registerer}) + assert.NotNil(t, http) + grpc := ProvideGRPCRequestDurationSeconds(MetricsIn{Registerer: c.registerer}) + assert.NotNil(t, grpc) + }) + } } func TestProvideGORMMetrics(t *testing.T) { diff --git a/otgorm/gorm_metrics.go b/otgorm/gorm_metrics.go index 3c4e6e59..78e8a197 100644 --- a/otgorm/gorm_metrics.go +++ b/otgorm/gorm_metrics.go @@ -1,6 +1,7 @@ package otgorm import ( + "database/sql" "time" "github.com/go-kit/kit/metrics" @@ -18,6 +19,38 @@ type Gauges struct { Idle metrics.Gauge InUse metrics.Gauge Open metrics.Gauge + + dbName string + driver string +} + +// DBName sets the dbname label of metrics. +func (g Gauges) DBName(dbName string) Gauges { + g.dbName = dbName + return g +} + +// Driver sets the driver label of metrics. +func (g Gauges) Driver(driver string) Gauges { + g.driver = driver + return g +} + +// Observe records the DBStats collected. It should be called periodically. +func (g Gauges) Observe(stats sql.DBStats) Gauges { + withValues := []string{"dbname", g.dbName, "driver", g.driver} + g.Idle. + With(withValues...). + Set(float64(stats.Idle)) + + g.InUse. + With(withValues...). + Set(float64(stats.InUse)) + + g.Open. + With(withValues...). + Set(float64(stats.OpenConnections)) + return g } // newCollector creates a new database wrapper containing the name of the database, @@ -36,17 +69,6 @@ func (d *collector) collectConnectionStats() { conn := v.Conn.(*gorm.DB) db, _ := conn.DB() stats := db.Stats() - withValues := []string{"dbname", k, "driver", conn.Name()} - d.gauges.Idle. - With(withValues...). - Set(float64(stats.Idle)) - - d.gauges.InUse. - With(withValues...). - Set(float64(stats.InUse)) - - d.gauges.Open. - With(withValues...). - Set(float64(stats.OpenConnections)) + d.gauges.DBName(k).Driver(conn.Name()).Observe(stats) } } diff --git a/otkafka/reader_metrics.go b/otkafka/reader_metrics.go index 43e50612..355377ef 100644 --- a/otkafka/reader_metrics.go +++ b/otkafka/reader_metrics.go @@ -43,6 +43,54 @@ type ReaderStats struct { WaitTime AggStats FetchSize AggStats FetchBytes AggStats + + reader string +} + +// Reader sets the writer label in WriterStats. +func (r ReaderStats) Reader(reader string) ReaderStats { + r.reader = reader + return r +} + +// Observe records the reader stats. It should be called periodically. +func (r ReaderStats) Observe(stats kafka.ReaderStats) { + withValues := []string{"reader", r.reader, "client_id", stats.ClientID, "topic", stats.Topic, "partition", stats.Partition} + r.Dials.With(withValues...).Add(float64(stats.Dials)) + r.Fetches.With(withValues...).Add(float64(stats.Fetches)) + r.Messages.With(withValues...).Add(float64(stats.Messages)) + r.Bytes.With(withValues...).Add(float64(stats.Bytes)) + r.Rebalances.With(withValues...).Add(float64(stats.Rebalances)) + r.Timeouts.With(withValues...).Add(float64(stats.Timeouts)) + r.Errors.With(withValues...).Add(float64(stats.Errors)) + + r.Offset.With(withValues...).Set(float64(stats.Offset)) + r.Lag.With(withValues...).Set(float64(stats.Lag)) + r.MinBytes.With(withValues...).Set(float64(stats.MinBytes)) + r.MaxBytes.With(withValues...).Set(float64(stats.MaxBytes)) + r.MaxWait.With(withValues...).Set(stats.MaxWait.Seconds()) + r.QueueLength.With(withValues...).Set(float64(stats.QueueLength)) + r.QueueCapacity.With(withValues...).Set(float64(stats.QueueCapacity)) + + r.DialTime.Min.With(withValues...).Set(stats.DialTime.Min.Seconds()) + r.DialTime.Max.With(withValues...).Set(stats.DialTime.Max.Seconds()) + r.DialTime.Avg.With(withValues...).Set(stats.DialTime.Avg.Seconds()) + + r.ReadTime.Min.With(withValues...).Set(stats.ReadTime.Min.Seconds()) + r.ReadTime.Max.With(withValues...).Set(stats.ReadTime.Max.Seconds()) + r.ReadTime.Avg.With(withValues...).Set(stats.ReadTime.Avg.Seconds()) + + r.WaitTime.Min.With(withValues...).Set(stats.WaitTime.Min.Seconds()) + r.WaitTime.Max.With(withValues...).Set(stats.WaitTime.Max.Seconds()) + r.WaitTime.Avg.With(withValues...).Set(stats.WaitTime.Avg.Seconds()) + + r.FetchSize.Min.With(withValues...).Set(float64(stats.FetchSize.Min)) + r.FetchSize.Max.With(withValues...).Set(float64(stats.FetchSize.Max)) + r.FetchSize.Avg.With(withValues...).Set(float64(stats.FetchSize.Avg)) + + r.FetchBytes.Min.With(withValues...).Set(float64(stats.FetchBytes.Min)) + r.FetchBytes.Max.With(withValues...).Set(float64(stats.FetchBytes.Max)) + r.FetchBytes.Avg.With(withValues...).Set(float64(stats.FetchBytes.Avg)) } // newCollector creates a new kafka reader wrapper containing the name of the reader. @@ -59,42 +107,6 @@ func (d *readerCollector) collectConnectionStats() { for k, v := range d.factory.List() { reader := v.Conn.(*kafka.Reader) stats := reader.Stats() - withValues := []string{"reader", k, "client_id", stats.ClientID, "topic", stats.Topic, "partition", stats.Partition} - - d.stats.Dials.With(withValues...).Add(float64(stats.Dials)) - d.stats.Fetches.With(withValues...).Add(float64(stats.Fetches)) - d.stats.Messages.With(withValues...).Add(float64(stats.Messages)) - d.stats.Bytes.With(withValues...).Add(float64(stats.Bytes)) - d.stats.Rebalances.With(withValues...).Add(float64(stats.Rebalances)) - d.stats.Timeouts.With(withValues...).Add(float64(stats.Timeouts)) - d.stats.Errors.With(withValues...).Add(float64(stats.Errors)) - - d.stats.Offset.With(withValues...).Set(float64(stats.Offset)) - d.stats.Lag.With(withValues...).Set(float64(stats.Lag)) - d.stats.MinBytes.With(withValues...).Set(float64(stats.MinBytes)) - d.stats.MaxBytes.With(withValues...).Set(float64(stats.MaxBytes)) - d.stats.MaxWait.With(withValues...).Set(stats.MaxWait.Seconds()) - d.stats.QueueLength.With(withValues...).Set(float64(stats.QueueLength)) - d.stats.QueueCapacity.With(withValues...).Set(float64(stats.QueueCapacity)) - - d.stats.DialTime.Min.With(withValues...).Set(stats.DialTime.Min.Seconds()) - d.stats.DialTime.Max.With(withValues...).Set(stats.DialTime.Max.Seconds()) - d.stats.DialTime.Avg.With(withValues...).Set(stats.DialTime.Avg.Seconds()) - - d.stats.ReadTime.Min.With(withValues...).Set(stats.ReadTime.Min.Seconds()) - d.stats.ReadTime.Max.With(withValues...).Set(stats.ReadTime.Max.Seconds()) - d.stats.ReadTime.Avg.With(withValues...).Set(stats.ReadTime.Avg.Seconds()) - - d.stats.WaitTime.Min.With(withValues...).Set(stats.WaitTime.Min.Seconds()) - d.stats.WaitTime.Max.With(withValues...).Set(stats.WaitTime.Max.Seconds()) - d.stats.WaitTime.Avg.With(withValues...).Set(stats.WaitTime.Avg.Seconds()) - - d.stats.FetchSize.Min.With(withValues...).Set(float64(stats.FetchSize.Min)) - d.stats.FetchSize.Max.With(withValues...).Set(float64(stats.FetchSize.Max)) - d.stats.FetchSize.Avg.With(withValues...).Set(float64(stats.FetchSize.Avg)) - - d.stats.FetchBytes.Min.With(withValues...).Set(float64(stats.FetchBytes.Min)) - d.stats.FetchBytes.Max.With(withValues...).Set(float64(stats.FetchBytes.Max)) - d.stats.FetchBytes.Avg.With(withValues...).Set(float64(stats.FetchBytes.Avg)) + d.stats.Reader(k).Observe(stats) } } diff --git a/otkafka/writer_metrics.go b/otkafka/writer_metrics.go index c737f0cf..48b064a3 100644 --- a/otkafka/writer_metrics.go +++ b/otkafka/writer_metrics.go @@ -34,6 +34,61 @@ type WriterStats struct { Retries AggStats BatchSize AggStats BatchBytes AggStats + + writer string +} + +// Writer sets the writer label in WriterStats. +func (w WriterStats) Writer(writer string) WriterStats { + w.writer = writer + return w +} + +// Observe records the writer stats. It should called periodically. +func (w WriterStats) Observe(stats kafka.WriterStats) WriterStats { + withValues := []string{"writer", w.writer, "topic", stats.Topic} + + w.Writes.With(withValues...).Add(float64(stats.Writes)) + w.Messages.With(withValues...).Add(float64(stats.Messages)) + w.Bytes.With(withValues...).Add(float64(stats.Bytes)) + w.Errors.With(withValues...).Add(float64(stats.Errors)) + + w.BatchTime.Min.With(withValues...).Add(stats.BatchTime.Min.Seconds()) + w.BatchTime.Max.With(withValues...).Add(stats.BatchTime.Max.Seconds()) + w.BatchTime.Avg.With(withValues...).Add(stats.BatchTime.Avg.Seconds()) + + w.WriteTime.Min.With(withValues...).Add(stats.WriteTime.Min.Seconds()) + w.WriteTime.Max.With(withValues...).Add(stats.WriteTime.Max.Seconds()) + w.WriteTime.Avg.With(withValues...).Add(stats.WriteTime.Avg.Seconds()) + + w.WaitTime.Min.With(withValues...).Add(stats.WaitTime.Min.Seconds()) + w.WaitTime.Max.With(withValues...).Add(stats.WaitTime.Max.Seconds()) + w.WaitTime.Avg.With(withValues...).Add(stats.WaitTime.Avg.Seconds()) + + w.Retries.Min.With(withValues...).Add(float64(stats.Retries.Min)) + w.Retries.Max.With(withValues...).Add(float64(stats.Retries.Max)) + w.Retries.Avg.With(withValues...).Add(float64(stats.Retries.Avg)) + + w.BatchSize.Min.With(withValues...).Add(float64(stats.BatchSize.Min)) + w.BatchSize.Max.With(withValues...).Add(float64(stats.BatchSize.Max)) + w.BatchSize.Avg.With(withValues...).Add(float64(stats.BatchSize.Avg)) + + w.BatchBytes.Min.With(withValues...).Add(float64(stats.BatchBytes.Min)) + w.BatchBytes.Max.With(withValues...).Add(float64(stats.BatchBytes.Max)) + w.BatchBytes.Avg.With(withValues...).Add(float64(stats.BatchBytes.Avg)) + + w.MaxAttempts.With(withValues...).Set(float64(stats.MaxAttempts)) + w.MaxBatchSize.With(withValues...).Set(float64(stats.MaxBatchSize)) + w.BatchTimeout.With(withValues...).Set(stats.BatchTimeout.Seconds()) + w.ReadTimeout.With(withValues...).Set(stats.ReadTimeout.Seconds()) + w.WriteTimeout.With(withValues...).Set(stats.WriteTimeout.Seconds()) + w.RequiredAcks.With(withValues...).Set(float64(stats.RequiredAcks)) + var async float64 + if stats.Async { + async = 1.0 + } + w.Async.With(withValues...).Set(async) + return w } // newCollector creates a new kafka writer wrapper containing the name of the reader. @@ -50,47 +105,6 @@ func (d *writerCollector) collectConnectionStats() { for k, v := range d.factory.List() { writer := v.Conn.(*kafka.Writer) stats := writer.Stats() - withValues := []string{"writer", k, "topic", stats.Topic} - - d.stats.Writes.With(withValues...).Add(float64(stats.Writes)) - d.stats.Messages.With(withValues...).Add(float64(stats.Messages)) - d.stats.Bytes.With(withValues...).Add(float64(stats.Bytes)) - d.stats.Errors.With(withValues...).Add(float64(stats.Errors)) - - d.stats.BatchTime.Min.With(withValues...).Add(stats.BatchTime.Min.Seconds()) - d.stats.BatchTime.Max.With(withValues...).Add(stats.BatchTime.Max.Seconds()) - d.stats.BatchTime.Avg.With(withValues...).Add(stats.BatchTime.Avg.Seconds()) - - d.stats.WriteTime.Min.With(withValues...).Add(stats.WriteTime.Min.Seconds()) - d.stats.WriteTime.Max.With(withValues...).Add(stats.WriteTime.Max.Seconds()) - d.stats.WriteTime.Avg.With(withValues...).Add(stats.WriteTime.Avg.Seconds()) - - d.stats.WaitTime.Min.With(withValues...).Add(stats.WaitTime.Min.Seconds()) - d.stats.WaitTime.Max.With(withValues...).Add(stats.WaitTime.Max.Seconds()) - d.stats.WaitTime.Avg.With(withValues...).Add(stats.WaitTime.Avg.Seconds()) - - d.stats.Retries.Min.With(withValues...).Add(float64(stats.Retries.Min)) - d.stats.Retries.Max.With(withValues...).Add(float64(stats.Retries.Max)) - d.stats.Retries.Avg.With(withValues...).Add(float64(stats.Retries.Avg)) - - d.stats.BatchSize.Min.With(withValues...).Add(float64(stats.BatchSize.Min)) - d.stats.BatchSize.Max.With(withValues...).Add(float64(stats.BatchSize.Max)) - d.stats.BatchSize.Avg.With(withValues...).Add(float64(stats.BatchSize.Avg)) - - d.stats.BatchBytes.Min.With(withValues...).Add(float64(stats.BatchBytes.Min)) - d.stats.BatchBytes.Max.With(withValues...).Add(float64(stats.BatchBytes.Max)) - d.stats.BatchBytes.Avg.With(withValues...).Add(float64(stats.BatchBytes.Avg)) - - d.stats.MaxAttempts.With(withValues...).Set(float64(stats.MaxAttempts)) - d.stats.MaxBatchSize.With(withValues...).Set(float64(stats.MaxBatchSize)) - d.stats.BatchTimeout.With(withValues...).Set(stats.BatchTimeout.Seconds()) - d.stats.ReadTimeout.With(withValues...).Set(stats.ReadTimeout.Seconds()) - d.stats.WriteTimeout.With(withValues...).Set(stats.WriteTimeout.Seconds()) - d.stats.RequiredAcks.With(withValues...).Set(float64(stats.RequiredAcks)) - var async float64 - if stats.Async { - async = 1.0 - } - d.stats.Async.With(withValues...).Set(async) + d.stats.Writer(k).Observe(stats) } } diff --git a/otredis/metrics.go b/otredis/metrics.go index 5a222b04..cdd0436a 100644 --- a/otredis/metrics.go +++ b/otredis/metrics.go @@ -23,6 +23,43 @@ type Gauges struct { TotalConns metrics.Gauge IdleConns metrics.Gauge StaleConns metrics.Gauge + + dbName string +} + +// DBName sets the dbname label of redis metrics. +func (r Gauges) DBName(dbName string) Gauges { + r.dbName = dbName + return r +} + +// Observe records the redis pool stats. It should be called periodically. +func (r Gauges) Observe(stats *redis.PoolStats) { + withValues := []string{"dbname", r.dbName} + + r.Hits. + With(withValues...). + Set(float64(stats.Hits)) + + r.Misses. + With(withValues...). + Set(float64(stats.Misses)) + + r.Timeouts. + With(withValues...). + Set(float64(stats.Timeouts)) + + r.TotalConns. + With(withValues...). + Set(float64(stats.TotalConns)) + + r.IdleConns. + With(withValues...). + Set(float64(stats.IdleConns)) + + r.StaleConns. + With(withValues...). + Set(float64(stats.StaleConns)) } // newCollector creates a new redis wrapper containing the name of the redis. @@ -39,28 +76,6 @@ func (d *collector) collectConnectionStats() { for k, v := range d.factory.List() { conn := v.Conn.(redis.UniversalClient) stats := conn.PoolStats() - - withValues := []string{"dbname", k} - d.gauges.Hits. - With(withValues...). - Set(float64(stats.Hits)) - - d.gauges.Misses. - With(withValues...). - Set(float64(stats.Misses)) - - d.gauges.Timeouts. - With(withValues...). - Set(float64(stats.Timeouts)) - - d.gauges.TotalConns. - With(withValues...). - Set(float64(stats.TotalConns)) - d.gauges.IdleConns. - With(withValues...). - Set(float64(stats.IdleConns)) - d.gauges.StaleConns. - With(withValues...). - Set(float64(stats.StaleConns)) + d.gauges.DBName(k).Observe(stats) } } diff --git a/srvgrpc/metrics.go b/srvgrpc/metrics.go index 82f3f060..d4e83041 100644 --- a/srvgrpc/metrics.go +++ b/srvgrpc/metrics.go @@ -1,6 +1,10 @@ package srvgrpc import ( + "context" + "time" + + "github.com/go-kit/kit/metrics" "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" ) @@ -20,3 +24,47 @@ type MetricsModule struct{} func (m MetricsModule) ProvideGRPC(server *grpc.Server) { grpc_prometheus.Register(server) } + +func Metrics(metrics *RequestDurationSeconds) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + start := time.Now() + defer func() { + metrics.Route(info.FullMethod).Observe(time.Since(start).Seconds()) + }() + return handler(ctx, req) + } +} + +// RequestDurationSeconds is a Histogram that measures the request latency. +type RequestDurationSeconds struct { + // Histogram is the underlying histogram of RequestDurationSeconds. + Histogram metrics.Histogram + + // labels + module string + service string + route string +} + +// Module specifies the module label for RequestDurationSeconds. +func (r RequestDurationSeconds) Module(module string) RequestDurationSeconds { + r.module = module + return r +} + +// Service specifies the service label for RequestDurationSeconds. +func (r RequestDurationSeconds) Service(service string) RequestDurationSeconds { + r.service = service + return r +} + +// Method specifies the method label for RequestDurationSeconds. +func (r RequestDurationSeconds) Route(route string) RequestDurationSeconds { + r.route = route + return r +} + +// Observe records the time taken to process the request. +func (r RequestDurationSeconds) Observe(seconds float64) { + r.Histogram.With("module", r.module, "service", r.service, "route", r.route).Observe(seconds) +} diff --git a/srvgrpc/metrics_test.go b/srvgrpc/metrics_test.go new file mode 100644 index 00000000..812395d8 --- /dev/null +++ b/srvgrpc/metrics_test.go @@ -0,0 +1,28 @@ +package srvgrpc + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/kit/metrics/generic" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +func TestRequestDurationSeconds(t *testing.T) { + rds := RequestDurationSeconds{ + Histogram: generic.NewHistogram("foo", 2), + } + rds = rds.Module("m").Service("s").Route("r") + rds.Observe(5) + + assert.Equal(t, 5.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) + + f := grpc.UnaryHandler(func(ctx context.Context, req interface{}) (interface{}, error) { + time.Sleep(time.Millisecond) + return nil, nil + }) + _, _ = Metrics(&rds)(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "/"}, f) + assert.GreaterOrEqual(t, 1.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) +} diff --git a/srvhttp/metrics.go b/srvhttp/metrics.go index 660e0f38..ff04daad 100644 --- a/srvhttp/metrics.go +++ b/srvhttp/metrics.go @@ -1,6 +1,10 @@ package srvhttp import ( + "net/http" + "time" + + "github.com/go-kit/kit/metrics" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -13,3 +17,59 @@ type MetricsModule struct{} func (m MetricsModule) ProvideHTTP(router *mux.Router) { router.PathPrefix("/metrics").Handler(promhttp.Handler()) } + +func Metrics(metrics *RequestDurationSeconds) func(handler http.Handler) http.Handler { + return func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + start := time.Now() + defer func() { + route := mux.CurrentRoute(request) + if route == nil { + metrics.Route("").Observe(time.Since(start).Seconds()) + return + } + path, err := route.GetPathTemplate() + if err != nil { + metrics.Route("").Observe(time.Since(start).Seconds()) + return + } + metrics.Route(path).Observe(time.Since(start).Seconds()) + }() + handler.ServeHTTP(writer, request) + }) + } +} + +// RequestDurationSeconds is a Histogram that measures the request latency. +type RequestDurationSeconds struct { + // Histogram is the underlying histogram of RequestDurationSeconds. + Histogram metrics.Histogram + + // labels + module string + service string + route string +} + +// Module specifies the module label for RequestDurationSeconds. +func (r RequestDurationSeconds) Module(module string) RequestDurationSeconds { + r.module = module + return r +} + +// Service specifies the service label for RequestDurationSeconds. +func (r RequestDurationSeconds) Service(service string) RequestDurationSeconds { + r.service = service + return r +} + +// Route specifies the method label for RequestDurationSeconds. +func (r RequestDurationSeconds) Route(route string) RequestDurationSeconds { + r.route = route + return r +} + +// Observe records the time taken to process the request. +func (r RequestDurationSeconds) Observe(seconds float64) { + r.Histogram.With("module", r.module, "service", r.service, "route", r.route).Observe(seconds) +} diff --git a/srvhttp/metrics_test.go b/srvhttp/metrics_test.go new file mode 100644 index 00000000..83467283 --- /dev/null +++ b/srvhttp/metrics_test.go @@ -0,0 +1,28 @@ +package srvhttp + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-kit/kit/metrics/generic" + "github.com/stretchr/testify/assert" +) + +func TestRequestDurationSeconds(t *testing.T) { + rds := RequestDurationSeconds{ + Histogram: generic.NewHistogram("foo", 2), + } + rds = rds.Module("m").Service("s").Route("r") + rds.Observe(5) + + assert.Equal(t, 5.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) + + f := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + time.Sleep(time.Millisecond) + }) + h := Metrics(&rds)(f) + h.ServeHTTP(nil, httptest.NewRequest(http.MethodGet, "/", nil)) + assert.GreaterOrEqual(t, 1.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) +} From 6f9e694868983ea05ee8870fbf6874c467a38ec4 Mon Sep 17 00:00:00 2001 From: Reasno Date: Mon, 30 Aug 2021 16:50:43 +0800 Subject: [PATCH 2/6] doc: add comment --- srvgrpc/metrics.go | 3 ++- srvhttp/metrics.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/srvgrpc/metrics.go b/srvgrpc/metrics.go index d4e83041..d6367918 100644 --- a/srvgrpc/metrics.go +++ b/srvgrpc/metrics.go @@ -25,6 +25,7 @@ func (m MetricsModule) ProvideGRPC(server *grpc.Server) { grpc_prometheus.Register(server) } +// Metrics is a unary interceptor for grpc package. It records the request duration in a histogram. func Metrics(metrics *RequestDurationSeconds) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { start := time.Now() @@ -58,7 +59,7 @@ func (r RequestDurationSeconds) Service(service string) RequestDurationSeconds { return r } -// Method specifies the method label for RequestDurationSeconds. +// Route specifies the method label for RequestDurationSeconds. func (r RequestDurationSeconds) Route(route string) RequestDurationSeconds { r.route = route return r diff --git a/srvhttp/metrics.go b/srvhttp/metrics.go index ff04daad..646328b6 100644 --- a/srvhttp/metrics.go +++ b/srvhttp/metrics.go @@ -18,6 +18,7 @@ func (m MetricsModule) ProvideHTTP(router *mux.Router) { router.PathPrefix("/metrics").Handler(promhttp.Handler()) } +// Metrics is a unary interceptor for standard library http package. It records the request duration in a histogram. func Metrics(metrics *RequestDurationSeconds) func(handler http.Handler) http.Handler { return func(handler http.Handler) http.Handler { return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { From 4a807d3eca2a31985f808ae6b3d5231fd48ac820 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B0=B7=E6=BA=AA?= Date: Mon, 30 Aug 2021 17:30:33 +0800 Subject: [PATCH 3/6] Update doc.go --- observability/doc.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/observability/doc.go b/observability/doc.go index adf90d5d..a02b59f2 100644 --- a/observability/doc.go +++ b/observability/doc.go @@ -1,6 +1,5 @@ /* -Package observability provides a tracer and a Histogram to measure all incoming -requests to the system. +Package observability provides a tracer and a set of perdefined metrics to measure critical system stats. Introduction From 4baa8bc233af1178a03e2a424a307ee60cde4b15 Mon Sep 17 00:00:00 2001 From: Reasno Date: Tue, 31 Aug 2021 14:38:26 +0800 Subject: [PATCH 4/6] refactor: use point receivers --- otgorm/gorm_metrics.go | 42 +++++++++++++++++--------------- otkafka/reader_metrics.go | 51 +++++++++++++++++++++++++++++++++++---- otkafka/writer_metrics.go | 43 +++++++++++++++++++++++++++++---- otredis/metrics.go | 41 ++++++++++++++----------------- srvgrpc/metrics.go | 32 ++++++++++++++++-------- srvgrpc/metrics_test.go | 5 ++-- srvhttp/metrics.go | 34 +++++++++++++++++--------- srvhttp/metrics_test.go | 5 ++-- 8 files changed, 175 insertions(+), 78 deletions(-) diff --git a/otgorm/gorm_metrics.go b/otgorm/gorm_metrics.go index 78e8a197..3aa5bbc1 100644 --- a/otgorm/gorm_metrics.go +++ b/otgorm/gorm_metrics.go @@ -25,32 +25,34 @@ type Gauges struct { } // DBName sets the dbname label of metrics. -func (g Gauges) DBName(dbName string) Gauges { - g.dbName = dbName - return g +func (g *Gauges) DBName(dbName string) *Gauges { + withValues := []string{"dbname", g.dbName} + return &Gauges{ + Idle: g.Idle.With(withValues...), + InUse: g.InUse.With(withValues...), + Open: g.Open.With(withValues...), + dbName: dbName, + driver: g.driver, + } } // Driver sets the driver label of metrics. -func (g Gauges) Driver(driver string) Gauges { - g.driver = driver - return g +func (g *Gauges) Driver(driver string) *Gauges { + withValues := []string{"driver", driver} + return &Gauges{ + Idle: g.Idle.With(withValues...), + InUse: g.InUse.With(withValues...), + Open: g.Open.With(withValues...), + dbName: g.dbName, + driver: driver, + } } // Observe records the DBStats collected. It should be called periodically. -func (g Gauges) Observe(stats sql.DBStats) Gauges { - withValues := []string{"dbname", g.dbName, "driver", g.driver} - g.Idle. - With(withValues...). - Set(float64(stats.Idle)) - - g.InUse. - With(withValues...). - Set(float64(stats.InUse)) - - g.Open. - With(withValues...). - Set(float64(stats.OpenConnections)) - return g +func (g *Gauges) Observe(stats sql.DBStats) { + g.Idle.Set(float64(stats.Idle)) + g.InUse.Set(float64(stats.InUse)) + g.Open.Set(float64(stats.OpenConnections)) } // newCollector creates a new database wrapper containing the name of the database, diff --git a/otkafka/reader_metrics.go b/otkafka/reader_metrics.go index 355377ef..865862fa 100644 --- a/otkafka/reader_metrics.go +++ b/otkafka/reader_metrics.go @@ -48,14 +48,55 @@ type ReaderStats struct { } // Reader sets the writer label in WriterStats. -func (r ReaderStats) Reader(reader string) ReaderStats { - r.reader = reader - return r +func (r *ReaderStats) Reader(reader string) *ReaderStats { + withValues := []string{"reader", reader} + return &ReaderStats{ + Dials: r.Dials.With(withValues...), + Fetches: r.Fetches.With(withValues...), + Messages: r.Messages.With(withValues...), + Bytes: r.Bytes.With(withValues...), + Rebalances: r.Rebalances.With(withValues...), + Timeouts: r.Timeouts.With(withValues...), + Errors: r.Errors.With(withValues...), + Offset: r.Offset.With(withValues...), + Lag: r.Lag.With(withValues...), + MinBytes: r.MinBytes.With(withValues...), + MaxBytes: r.MaxBytes.With(withValues...), + MaxWait: r.MaxWait.With(withValues...), + QueueLength: r.QueueLength.With(withValues...), + QueueCapacity: r.QueueCapacity.With(withValues...), + DialTime: AggStats{ + Min: r.DialTime.Min.With(withValues...), + Max: r.DialTime.Max.With(withValues...), + Avg: r.DialTime.Avg.With(withValues...), + }, + ReadTime: AggStats{ + Min: r.ReadTime.Min.With(withValues...), + Max: r.ReadTime.Max.With(withValues...), + Avg: r.ReadTime.Avg.With(withValues...), + }, + WaitTime: AggStats{ + Min: r.WaitTime.Min.With(withValues...), + Max: r.WaitTime.Max.With(withValues...), + Avg: r.WaitTime.Avg.With(withValues...), + }, + FetchSize: AggStats{ + Min: r.FetchSize.Min.With(withValues...), + Max: r.FetchSize.Max.With(withValues...), + Avg: r.FetchSize.Avg.With(withValues...), + }, + FetchBytes: AggStats{ + Min: r.FetchBytes.Min.With(withValues...), + Max: r.FetchBytes.Max.With(withValues...), + Avg: r.FetchBytes.Avg.With(withValues...), + }, + reader: reader, + } } // Observe records the reader stats. It should be called periodically. -func (r ReaderStats) Observe(stats kafka.ReaderStats) { - withValues := []string{"reader", r.reader, "client_id", stats.ClientID, "topic", stats.Topic, "partition", stats.Partition} +func (r *ReaderStats) Observe(stats kafka.ReaderStats) { + withValues := []string{"client_id", stats.ClientID, "topic", stats.Topic, "partition", stats.Partition} r.Dials.With(withValues...).Add(float64(stats.Dials)) r.Fetches.With(withValues...).Add(float64(stats.Fetches)) r.Messages.With(withValues...).Add(float64(stats.Messages)) diff --git a/otkafka/writer_metrics.go b/otkafka/writer_metrics.go index 48b064a3..ef435a7c 100644 --- a/otkafka/writer_metrics.go +++ b/otkafka/writer_metrics.go @@ -39,14 +39,47 @@ type WriterStats struct { } // Writer sets the writer label in WriterStats. -func (w WriterStats) Writer(writer string) WriterStats { - w.writer = writer - return w +func (w *WriterStats) Writer(writer string) *WriterStats { + withValues := []string{"writer", writer} + return &WriterStats{ + Writes: w.Writes.With(withValues...), + Messages: w.Messages.With(withValues...), + Bytes: w.Bytes.With(withValues...), + Errors: w.Errors.With(withValues...), + MaxAttempts: w.MaxAttempts.With(withValues...), + MaxBatchSize: w.MaxBatchSize.With(withValues...), + BatchTimeout: w.BatchTimeout.With(withValues...), + ReadTimeout: w.ReadTimeout.With(withValues...), + WriteTimeout: w.WriteTimeout.With(withValues...), + RequiredAcks: w.RequiredAcks.With(withValues...), + BatchTime: AggStats{ + Min: w.BatchTime.Min.With(withValues...), + Max: w.BatchTime.Max.With(withValues...), + Avg: w.BatchTime.Avg.With(withValues...), + }, + WriteTime: AggStats{ + Min: w.WriteTime.Min.With(withValues...), + Max: w.WriteTime.Max.With(withValues...), + Avg: w.WriteTime.Avg.With(withValues...), + }, + WaitTime: AggStats{ + Min: w.WaitTime.Min.With(withValues...), + Max: w.WaitTime.Max.With(withValues...), + Avg: w.WaitTime.Avg.With(withValues...), + }, + BatchBytes: AggStats{ + Min: w.BatchBytes.Min.With(withValues...), + Max: w.BatchBytes.Max.With(withValues...), + Avg: w.BatchBytes.Avg.With(withValues...), + }, + Async: w.Async.With(withValues...), + writer: writer, + } } // Observe records the writer stats. It should called periodically. -func (w WriterStats) Observe(stats kafka.WriterStats) WriterStats { - withValues := []string{"writer", w.writer, "topic", stats.Topic} +func (w *WriterStats) Observe(stats kafka.WriterStats) *WriterStats { + withValues := []string{"topic", stats.Topic} w.Writes.With(withValues...).Add(float64(stats.Writes)) w.Messages.With(withValues...).Add(float64(stats.Messages)) diff --git a/otredis/metrics.go b/otredis/metrics.go index cdd0436a..52754e02 100644 --- a/otredis/metrics.go +++ b/otredis/metrics.go @@ -28,38 +28,33 @@ type Gauges struct { } // DBName sets the dbname label of redis metrics. -func (r Gauges) DBName(dbName string) Gauges { - r.dbName = dbName - return r +func (g *Gauges) DBName(dbName string) *Gauges { + withValues := []string{"dbname", dbName} + return &Gauges{ + Hits: g.Hits.With(withValues...), + Misses: g.Misses.With(withValues...), + Timeouts: g.Timeouts.With(withValues...), + TotalConns: g.TotalConns.With(withValues...), + IdleConns: g.IdleConns.With(withValues...), + StaleConns: g.StaleConns.With(withValues...), + dbName: dbName, + } } // Observe records the redis pool stats. It should be called periodically. -func (r Gauges) Observe(stats *redis.PoolStats) { - withValues := []string{"dbname", r.dbName} +func (g *Gauges) Observe(stats *redis.PoolStats) { - r.Hits. - With(withValues...). - Set(float64(stats.Hits)) + g.Hits.Set(float64(stats.Hits)) - r.Misses. - With(withValues...). - Set(float64(stats.Misses)) + g.Misses.Set(float64(stats.Misses)) - r.Timeouts. - With(withValues...). - Set(float64(stats.Timeouts)) + g.Timeouts.Set(float64(stats.Timeouts)) - r.TotalConns. - With(withValues...). - Set(float64(stats.TotalConns)) + g.TotalConns.Set(float64(stats.TotalConns)) - r.IdleConns. - With(withValues...). - Set(float64(stats.IdleConns)) + g.IdleConns.Set(float64(stats.IdleConns)) - r.StaleConns. - With(withValues...). - Set(float64(stats.StaleConns)) + g.StaleConns.Set(float64(stats.StaleConns)) } // newCollector creates a new redis wrapper containing the name of the redis. diff --git a/srvgrpc/metrics.go b/srvgrpc/metrics.go index d6367918..865202dc 100644 --- a/srvgrpc/metrics.go +++ b/srvgrpc/metrics.go @@ -48,24 +48,36 @@ type RequestDurationSeconds struct { } // Module specifies the module label for RequestDurationSeconds. -func (r RequestDurationSeconds) Module(module string) RequestDurationSeconds { - r.module = module - return r +func (r *RequestDurationSeconds) Module(module string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("module", module), + module: module, + service: r.service, + route: r.route, + } } // Service specifies the service label for RequestDurationSeconds. -func (r RequestDurationSeconds) Service(service string) RequestDurationSeconds { - r.service = service - return r +func (r *RequestDurationSeconds) Service(service string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("service", service), + module: r.module, + service: service, + route: r.route, + } } // Route specifies the method label for RequestDurationSeconds. -func (r RequestDurationSeconds) Route(route string) RequestDurationSeconds { - r.route = route - return r +func (r *RequestDurationSeconds) Route(route string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("route", route), + module: r.module, + service: r.service, + route: route, + } } // Observe records the time taken to process the request. func (r RequestDurationSeconds) Observe(seconds float64) { - r.Histogram.With("module", r.module, "service", r.service, "route", r.route).Observe(seconds) + r.Histogram.Observe(seconds) } diff --git a/srvgrpc/metrics_test.go b/srvgrpc/metrics_test.go index 812395d8..6901e20a 100644 --- a/srvgrpc/metrics_test.go +++ b/srvgrpc/metrics_test.go @@ -11,18 +11,19 @@ import ( ) func TestRequestDurationSeconds(t *testing.T) { - rds := RequestDurationSeconds{ + rds := &RequestDurationSeconds{ Histogram: generic.NewHistogram("foo", 2), } rds = rds.Module("m").Service("s").Route("r") rds.Observe(5) assert.Equal(t, 5.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) + assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r"}, rds.Histogram.(*generic.Histogram).LabelValues()) f := grpc.UnaryHandler(func(ctx context.Context, req interface{}) (interface{}, error) { time.Sleep(time.Millisecond) return nil, nil }) - _, _ = Metrics(&rds)(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "/"}, f) + _, _ = Metrics(rds)(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "/"}, f) assert.GreaterOrEqual(t, 1.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) } diff --git a/srvhttp/metrics.go b/srvhttp/metrics.go index 646328b6..4e77050f 100644 --- a/srvhttp/metrics.go +++ b/srvhttp/metrics.go @@ -53,24 +53,36 @@ type RequestDurationSeconds struct { } // Module specifies the module label for RequestDurationSeconds. -func (r RequestDurationSeconds) Module(module string) RequestDurationSeconds { - r.module = module - return r +func (r *RequestDurationSeconds) Module(module string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("module", module), + module: module, + service: r.service, + route: r.route, + } } // Service specifies the service label for RequestDurationSeconds. -func (r RequestDurationSeconds) Service(service string) RequestDurationSeconds { - r.service = service - return r +func (r *RequestDurationSeconds) Service(service string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("service", service), + module: r.module, + service: service, + route: r.route, + } } // Route specifies the method label for RequestDurationSeconds. -func (r RequestDurationSeconds) Route(route string) RequestDurationSeconds { - r.route = route - return r +func (r *RequestDurationSeconds) Route(route string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("route", route), + module: r.module, + service: r.service, + route: route, + } } // Observe records the time taken to process the request. -func (r RequestDurationSeconds) Observe(seconds float64) { - r.Histogram.With("module", r.module, "service", r.service, "route", r.route).Observe(seconds) +func (r *RequestDurationSeconds) Observe(seconds float64) { + r.Histogram.Observe(seconds) } diff --git a/srvhttp/metrics_test.go b/srvhttp/metrics_test.go index 83467283..79a6634e 100644 --- a/srvhttp/metrics_test.go +++ b/srvhttp/metrics_test.go @@ -11,18 +11,19 @@ import ( ) func TestRequestDurationSeconds(t *testing.T) { - rds := RequestDurationSeconds{ + rds := &RequestDurationSeconds{ Histogram: generic.NewHistogram("foo", 2), } rds = rds.Module("m").Service("s").Route("r") rds.Observe(5) assert.Equal(t, 5.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) + assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r"}, rds.Histogram.(*generic.Histogram).LabelValues()) f := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { time.Sleep(time.Millisecond) }) - h := Metrics(&rds)(f) + h := Metrics(rds)(f) h.ServeHTTP(nil, httptest.NewRequest(http.MethodGet, "/", nil)) assert.GreaterOrEqual(t, 1.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) } From 4c084909454533e86b3e73e528a995184458676f Mon Sep 17 00:00:00 2001 From: Reasno Date: Tue, 31 Aug 2021 15:05:15 +0800 Subject: [PATCH 5/6] test: fix tests --- otgorm/gorm_metrics.go | 2 +- otgorm/gorm_metrics_test.go | 2 +- otgorm/module_test.go | 9 ++++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/otgorm/gorm_metrics.go b/otgorm/gorm_metrics.go index 3aa5bbc1..47b26495 100644 --- a/otgorm/gorm_metrics.go +++ b/otgorm/gorm_metrics.go @@ -26,7 +26,7 @@ type Gauges struct { // DBName sets the dbname label of metrics. func (g *Gauges) DBName(dbName string) *Gauges { - withValues := []string{"dbname", g.dbName} + withValues := []string{"dbname", dbName} return &Gauges{ Idle: g.Idle.With(withValues...), InUse: g.InUse.With(withValues...), diff --git a/otgorm/gorm_metrics_test.go b/otgorm/gorm_metrics_test.go index c46c25bc..6ecd7e55 100644 --- a/otgorm/gorm_metrics_test.go +++ b/otgorm/gorm_metrics_test.go @@ -26,7 +26,7 @@ func TestCollector(t *testing.T) { Idle: m, Open: m, } - m.EXPECT().With(gomock.Any()).Times(3).Return(m) + m.EXPECT().With(gomock.Any()).MinTimes(3).Return(m) m.EXPECT().Set(gomock.Any()).Times(3) c := core.New() diff --git a/otgorm/module_test.go b/otgorm/module_test.go index 5b2e00e1..2f09e7e9 100644 --- a/otgorm/module_test.go +++ b/otgorm/module_test.go @@ -106,15 +106,18 @@ func TestModule_ProvideRunGroup(t *testing.T) { } idle := mock_metrics.NewMockGauge(ctrl) - idle.EXPECT().With(withValues...).Return(idle).MinTimes(1) + idle.EXPECT().With(withValues[0:2]...).Return(idle).MinTimes(1) + idle.EXPECT().With(withValues[2:4]...).Return(idle).MinTimes(1) idle.EXPECT().Set(gomock.Eq(0.0)).MinTimes(1) open := mock_metrics.NewMockGauge(ctrl) - open.EXPECT().With(withValues...).Return(open).MinTimes(1) + open.EXPECT().With(withValues[0:2]...).Return(open).MinTimes(1) + open.EXPECT().With(withValues[2:4]...).Return(open).MinTimes(1) open.EXPECT().Set(gomock.Eq(2.0)).MinTimes(1) inUse := mock_metrics.NewMockGauge(ctrl) - inUse.EXPECT().With(withValues...).Return(inUse).MinTimes(1) + inUse.EXPECT().With(withValues[0:2]...).Return(inUse).MinTimes(1) + inUse.EXPECT().With(withValues[2:4]...).Return(inUse).MinTimes(1) inUse.EXPECT().Set(gomock.Eq(2.0)).MinTimes(1) c := core.New( From 953c579a51cfb069c3958a7793c6bdee4b50d05f Mon Sep 17 00:00:00 2001 From: Reasno Date: Tue, 31 Aug 2021 15:29:43 +0800 Subject: [PATCH 6/6] test: fix tests --- otkafka/writer_metrics.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/otkafka/writer_metrics.go b/otkafka/writer_metrics.go index ef435a7c..799a4be7 100644 --- a/otkafka/writer_metrics.go +++ b/otkafka/writer_metrics.go @@ -62,11 +62,21 @@ func (w *WriterStats) Writer(writer string) *WriterStats { Max: w.WriteTime.Max.With(withValues...), Avg: w.WriteTime.Avg.With(withValues...), }, + Retries: AggStats{ + Min: w.Retries.Min.With(withValues...), + Max: w.Retries.Max.With(withValues...), + Avg: w.Retries.Avg.With(withValues...), + }, WaitTime: AggStats{ Min: w.WaitTime.Min.With(withValues...), Max: w.WaitTime.Max.With(withValues...), Avg: w.WaitTime.Avg.With(withValues...), }, + BatchSize: AggStats{ + Min: w.BatchSize.Min.With(withValues...), + Max: w.BatchSize.Max.With(withValues...), + Avg: w.BatchSize.Avg.With(withValues...), + }, BatchBytes: AggStats{ Min: w.BatchBytes.Min.With(withValues...), Max: w.BatchBytes.Max.With(withValues...),