diff --git a/observability/doc.go b/observability/doc.go index a779d01d..a02b59f2 100644 --- a/observability/doc.go +++ b/observability/doc.go @@ -1,6 +1,5 @@ /* -Package observability provides a tracer and a histogram to measure all incoming -requests to the system. +Package observability provides a tracer and a set of predefined metrics to measure critical system stats. Introduction diff --git a/observability/metrics.go b/observability/metrics.go index 7653edff..c211a12c 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -1,353 +1,406 @@ package observability import ( - "sync" - + "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/otgorm" "github.com/DoNewsCode/core/otkafka" "github.com/DoNewsCode/core/otredis" + "github.com/DoNewsCode/core/srvgrpc" + "github.com/DoNewsCode/core/srvhttp" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" ) -type histogram struct { - once sync.Once - *prometheus.Histogram +// MetricsIn is the injection parameter of most metrics constructors in the observability package. +type MetricsIn struct { + di.In + + Registerer stdprometheus.Registerer `optional:"true"` } -var his histogram +// ProvideHTTPRequestDurationSeconds returns a metrics.Histogram that is designed to measure incoming HTTP requests +// to the system. Note it has three labels: "module", "service", "route". If any label is missing, +// the system will panic. 
+func ProvideHTTPRequestDurationSeconds(in MetricsIn) *srvhttp.RequestDurationSeconds { + http := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "Total time spent serving requests.", + }, []string{"module", "service", "route"}) + + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + in.Registerer.MustRegister(http) + + return &srvhttp.RequestDurationSeconds{ + Histogram: prometheus.NewHistogram(http), + } +} -// ProvideHistogramMetrics returns a metrics.Histogram that is designed to measure incoming requests -// to the system. Note it has three labels: "module", "service", "method". If any label is missing, +// ProvideGRPCRequestDurationSeconds returns a metrics.Histogram that is designed to measure incoming GRPC requests +// to the system. Note it has three labels: "module", "service", "route". If any label is missing, // the system will panic. -func ProvideHistogramMetrics() metrics.Histogram { - his.once.Do(func() { - his.Histogram = prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Name: "http_request_duration_seconds", - Help: "Total time spent serving requests.", - }, []string{"module", "service", "method"}) - }) - return &his +func ProvideGRPCRequestDurationSeconds(in MetricsIn) *srvgrpc.RequestDurationSeconds { + grpc := stdprometheus.NewHistogramVec(stdprometheus.HistogramOpts{ + Name: "grpc_request_duration_seconds", + Help: "Total time spent serving requests.", + }, []string{"module", "service", "route"}) + + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + in.Registerer.MustRegister(grpc) + + return &srvgrpc.RequestDurationSeconds{ + Histogram: prometheus.NewHistogram(grpc), + } } // ProvideGORMMetrics returns a *otgorm.Gauges that measures the connection info in databases. // It is meant to be consumed by the otgorm.Providers. 
-func ProvideGORMMetrics() *otgorm.Gauges { +func ProvideGORMMetrics(in MetricsIn) *otgorm.Gauges { + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } return &otgorm.Gauges{ - Idle: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Idle: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "gorm_idle_connections", Help: "number of idle connections", - }, []string{"dbname", "driver"}), - Open: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname", "driver"}, in.Registerer), + Open: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "gorm_open_connections", Help: "number of open connections", - }, []string{"dbname", "driver"}), - InUse: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname", "driver"}, in.Registerer), + InUse: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "gorm_in_use_connections", Help: "number of in use connections", - }, []string{"dbname", "driver"}), + }, []string{"dbname", "driver"}, in.Registerer), } } -// ProvideRedisMetrics returns a *otredis.Gauges that measures the connection info in redis. +// ProvideRedisMetrics returns a *otredis.Gauges that measures the connection info in redis. // It is meant to be consumed by the otredis.Providers. 
-func ProvideRedisMetrics() *otredis.Gauges { +func ProvideRedisMetrics(in MetricsIn) *otredis.Gauges { + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } return &otredis.Gauges{ - Hits: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Hits: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_hit_connections", Help: "number of times free connection was found in the pool", - }, []string{"dbname"}), - Misses: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + Misses: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_miss_connections", Help: "number of times free connection was NOT found in the pool", - }, []string{"dbname"}), - Timeouts: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + Timeouts: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_timeout_connections", Help: "number of times a wait timeout occurred", - }, []string{"dbname"}), - TotalConns: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + TotalConns: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_total_connections", Help: "number of total connections in the pool", - }, []string{"dbname"}), - IdleConns: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + IdleConns: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_idle_connections", Help: "number of idle connections in the pool", - }, []string{"dbname"}), - StaleConns: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, []string{"dbname"}, in.Registerer), + StaleConns: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "redis_stale_connections", Help: "number of stale connections removed from the pool", - }, []string{"dbname"}), + }, []string{"dbname"}, in.Registerer), } } // ProvideKafkaReaderMetrics returns a *otkafka.ReaderStats that measures the reader info in kafka. // It is meant to be consumed by the otkafka.Providers. 
-func ProvideKafkaReaderMetrics() *otkafka.ReaderStats { +func ProvideKafkaReaderMetrics(in MetricsIn) *otkafka.ReaderStats { labels := []string{"reader", "client_id", "topic", "partition"} + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + return &otkafka.ReaderStats{ - Dials: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Dials: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_dial_count", Help: "", - }, labels), - Fetches: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Fetches: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_fetch_count", Help: "", - }, labels), - Messages: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Messages: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_message_count", Help: "", - }, labels), - Bytes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Bytes: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_message_bytes", Help: "", - }, labels), - Rebalances: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Rebalances: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_rebalance_count", Help: "", - }, labels), - Timeouts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Timeouts: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_timeout_count", Help: "", - }, labels), - Errors: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Errors: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_reader_error_count", Help: "", - }, labels), - Offset: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Offset: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_offset", Help: "", - }, labels), - Lag: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Lag: 
newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_lag", Help: "", - }, labels), - MinBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MinBytes: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_bytes_min", Help: "", - }, labels), - MaxBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxBytes: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_bytes_max", Help: "", - }, labels), - MaxWait: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxWait: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_wait_max", Help: "", - }, labels), - QueueLength: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + QueueLength: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_queue_length", Help: "", - }, labels), - QueueCapacity: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + QueueCapacity: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_queue_capacity", Help: "", - }, labels), + }, labels, in.Registerer), DialTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_dial_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_dial_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_dial_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, ReadTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_read_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, 
in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_read_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_read_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, WaitTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_wait_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_wait_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_wait_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, FetchSize: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_size_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_size_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_size_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, FetchBytes: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_bytes_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_bytes_max", Help: "", - }, labels), - Avg: 
prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_reader_fetch_bytes_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, } } // ProvideKafkaWriterMetrics returns a *otkafka.WriterStats that measures the writer info in kafka. // It is meant to be consumed by the otkafka.Providers. -func ProvideKafkaWriterMetrics() *otkafka.WriterStats { +func ProvideKafkaWriterMetrics(in MetricsIn) *otkafka.WriterStats { labels := []string{"writer", "topic"} + + if in.Registerer == nil { + in.Registerer = stdprometheus.DefaultRegisterer + } + return &otkafka.WriterStats{ - Writes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Writes: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_write_count", Help: "", - }, labels), - Messages: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Messages: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_message_count", Help: "", - }, labels), - Bytes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Bytes: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_message_bytes", Help: "", - }, labels), - Errors: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + }, labels, in.Registerer), + Errors: newCounterFrom(stdprometheus.CounterOpts{ Name: "kafka_writer_error_count", Help: "", - }, labels), - MaxAttempts: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxAttempts: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_attempts_max", Help: "", - }, labels), - MaxBatchSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + MaxBatchSize: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_max", Help: "", - }, labels), - BatchTimeout: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + BatchTimeout: newGaugeFrom(stdprometheus.GaugeOpts{ 
Name: "kafka_writer_batch_timeout", Help: "", - }, labels), - ReadTimeout: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + ReadTimeout: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_read_timeout", Help: "", - }, labels), - WriteTimeout: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + WriteTimeout: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_timeout", Help: "", - }, labels), - RequiredAcks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + RequiredAcks: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_acks_required", Help: "", - }, labels), - Async: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Async: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_async", Help: "", - }, labels), + }, labels, in.Registerer), BatchTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, WriteTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: 
newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_write_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, WaitTime: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_wait_seconds_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_wait_seconds_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_wait_seconds_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, Retries: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_retries_count_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_retries_count_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_retries_count_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, BatchSize: otkafka.AggStats{ - Min: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_size_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_size_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_size_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, BatchBytes: otkafka.AggStats{ - Min: 
prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Min: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_bytes_min", Help: "", - }, labels), - Max: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Max: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_bytes_max", Help: "", - }, labels), - Avg: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + }, labels, in.Registerer), + Avg: newGaugeFrom(stdprometheus.GaugeOpts{ Name: "kafka_writer_batch_bytes_avg", Help: "", - }, labels), + }, labels, in.Registerer), }, } } + +func newCounterFrom(opts stdprometheus.CounterOpts, labelNames []string, registerer stdprometheus.Registerer) metrics.Counter { + cv := stdprometheus.NewCounterVec(opts, labelNames) + registerer.MustRegister(cv) + return prometheus.NewCounter(cv) +} + +func newGaugeFrom(opts stdprometheus.GaugeOpts, labelNames []string, registerer stdprometheus.Registerer) metrics.Gauge { + cv := stdprometheus.NewGaugeVec(opts, labelNames) + registerer.MustRegister(cv) + return prometheus.NewGauge(cv) +} diff --git a/observability/observability.go b/observability/observability.go index 18451fde..d76345ec 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -22,7 +22,8 @@ func Providers() di.Deps { return di.Deps{ ProvideJaegerLogAdapter, ProvideOpentracing, - ProvideHistogramMetrics, + ProvideHTTPRequestDurationSeconds, + ProvideGRPCRequestDurationSeconds, ProvideGORMMetrics, ProvideRedisMetrics, ProvideKafkaReaderMetrics, diff --git a/observability/observability_test.go b/observability/observability_test.go index 24a7498d..89f3cd9d 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/rawbytes" + "github.com/prometheus/client_golang/prometheus" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" 
"gorm.io/gorm" @@ -33,8 +34,26 @@ func TestProvideOpentracing(t *testing.T) { } func TestProvideHistogramMetrics(t *testing.T) { - Out := ProvideHistogramMetrics() - assert.NotNil(t, Out) + for _, c := range []struct { + name string + registerer prometheus.Registerer + }{ + { + "default", + nil, + }, + { + "provided registerer", + prometheus.NewPedanticRegistry(), + }, + } { + t.Run(c.name, func(t *testing.T) { + http := ProvideHTTPRequestDurationSeconds(MetricsIn{Registerer: c.registerer}) + assert.NotNil(t, http) + grpc := ProvideGRPCRequestDurationSeconds(MetricsIn{Registerer: c.registerer}) + assert.NotNil(t, grpc) + }) + } } func TestProvideGORMMetrics(t *testing.T) { diff --git a/otgorm/gorm_metrics.go b/otgorm/gorm_metrics.go index 3c4e6e59..47b26495 100644 --- a/otgorm/gorm_metrics.go +++ b/otgorm/gorm_metrics.go @@ -1,6 +1,7 @@ package otgorm import ( + "database/sql" "time" "github.com/go-kit/kit/metrics" @@ -18,6 +19,40 @@ type Gauges struct { Idle metrics.Gauge InUse metrics.Gauge Open metrics.Gauge + + dbName string + driver string +} + +// DBName sets the dbname label of metrics. +func (g *Gauges) DBName(dbName string) *Gauges { + withValues := []string{"dbname", dbName} + return &Gauges{ + Idle: g.Idle.With(withValues...), + InUse: g.InUse.With(withValues...), + Open: g.Open.With(withValues...), + dbName: dbName, + driver: g.driver, + } +} + +// Driver sets the driver label of metrics. +func (g *Gauges) Driver(driver string) *Gauges { + withValues := []string{"driver", driver} + return &Gauges{ + Idle: g.Idle.With(withValues...), + InUse: g.InUse.With(withValues...), + Open: g.Open.With(withValues...), + dbName: g.dbName, + driver: driver, + } +} + +// Observe records the DBStats collected. It should be called periodically. 
+func (g *Gauges) Observe(stats sql.DBStats) { + g.Idle.Set(float64(stats.Idle)) + g.InUse.Set(float64(stats.InUse)) + g.Open.Set(float64(stats.OpenConnections)) } // newCollector creates a new database wrapper containing the name of the database, @@ -36,17 +71,6 @@ func (d *collector) collectConnectionStats() { conn := v.Conn.(*gorm.DB) db, _ := conn.DB() stats := db.Stats() - withValues := []string{"dbname", k, "driver", conn.Name()} - d.gauges.Idle. - With(withValues...). - Set(float64(stats.Idle)) - - d.gauges.InUse. - With(withValues...). - Set(float64(stats.InUse)) - - d.gauges.Open. - With(withValues...). - Set(float64(stats.OpenConnections)) + d.gauges.DBName(k).Driver(conn.Name()).Observe(stats) } } diff --git a/otgorm/gorm_metrics_test.go b/otgorm/gorm_metrics_test.go index c46c25bc..6ecd7e55 100644 --- a/otgorm/gorm_metrics_test.go +++ b/otgorm/gorm_metrics_test.go @@ -26,7 +26,7 @@ func TestCollector(t *testing.T) { Idle: m, Open: m, } - m.EXPECT().With(gomock.Any()).Times(3).Return(m) + m.EXPECT().With(gomock.Any()).MinTimes(3).Return(m) m.EXPECT().Set(gomock.Any()).Times(3) c := core.New() diff --git a/otgorm/module_test.go b/otgorm/module_test.go index 5b2e00e1..2f09e7e9 100644 --- a/otgorm/module_test.go +++ b/otgorm/module_test.go @@ -106,15 +106,18 @@ func TestModule_ProvideRunGroup(t *testing.T) { } idle := mock_metrics.NewMockGauge(ctrl) - idle.EXPECT().With(withValues...).Return(idle).MinTimes(1) + idle.EXPECT().With(withValues[0:2]...).Return(idle).MinTimes(1) + idle.EXPECT().With(withValues[2:4]...).Return(idle).MinTimes(1) idle.EXPECT().Set(gomock.Eq(0.0)).MinTimes(1) open := mock_metrics.NewMockGauge(ctrl) - open.EXPECT().With(withValues...).Return(open).MinTimes(1) + open.EXPECT().With(withValues[0:2]...).Return(open).MinTimes(1) + open.EXPECT().With(withValues[2:4]...).Return(open).MinTimes(1) open.EXPECT().Set(gomock.Eq(2.0)).MinTimes(1) inUse := mock_metrics.NewMockGauge(ctrl) - 
inUse.EXPECT().With(withValues...).Return(inUse).MinTimes(1) + inUse.EXPECT().With(withValues[0:2]...).Return(inUse).MinTimes(1) + inUse.EXPECT().With(withValues[2:4]...).Return(inUse).MinTimes(1) inUse.EXPECT().Set(gomock.Eq(2.0)).MinTimes(1) c := core.New( diff --git a/otkafka/reader_metrics.go b/otkafka/reader_metrics.go index 43e50612..865862fa 100644 --- a/otkafka/reader_metrics.go +++ b/otkafka/reader_metrics.go @@ -43,6 +43,95 @@ type ReaderStats struct { WaitTime AggStats FetchSize AggStats FetchBytes AggStats + + reader string +} + +// Reader sets the reader label in ReaderStats. +func (r *ReaderStats) Reader(reader string) *ReaderStats { + withValues := []string{"reader", reader} + return &ReaderStats{ + Dials: r.Dials.With(withValues...), + Fetches: r.Fetches.With(withValues...), + Messages: r.Messages.With(withValues...), + Bytes: r.Bytes.With(withValues...), + Rebalances: r.Rebalances.With(withValues...), + Timeouts: r.Timeouts.With(withValues...), + Errors: r.Errors.With(withValues...), + Offset: r.Offset.With(withValues...), + Lag: r.Lag.With(withValues...), + MinBytes: r.MinBytes.With(withValues...), + MaxBytes: r.MaxBytes.With(withValues...), + MaxWait: r.MaxWait.With(withValues...), + QueueLength: r.QueueLength.With(withValues...), + QueueCapacity: r.QueueCapacity.With(withValues...), + DialTime: AggStats{ + Min: r.DialTime.Min.With(withValues...), + Max: r.DialTime.Max.With(withValues...), + Avg: r.DialTime.Avg.With(withValues...), + }, + ReadTime: AggStats{ + Min: r.ReadTime.Min.With(withValues...), + Max: r.ReadTime.Max.With(withValues...), + Avg: r.ReadTime.Avg.With(withValues...), + }, + WaitTime: AggStats{ + Min: r.WaitTime.Min.With(withValues...), + Max: r.WaitTime.Max.With(withValues...), + Avg: r.WaitTime.Avg.With(withValues...), + }, + FetchSize: AggStats{ + Min: r.FetchSize.Min.With(withValues...), + Max: r.FetchSize.Max.With(withValues...), + Avg: r.FetchSize.Avg.With(withValues...), + }, + FetchBytes: AggStats{ + Min: 
r.FetchBytes.Min.With(withValues...), + Max: r.FetchBytes.Max.With(withValues...), + Avg: r.FetchBytes.Avg.With(withValues...), + }, + reader: reader, + } +} + +// Observe records the reader stats. It should be called periodically. +func (r *ReaderStats) Observe(stats kafka.ReaderStats) { + withValues := []string{"client_id", stats.ClientID, "topic", stats.Topic, "partition", stats.Partition} + r.Dials.With(withValues...).Add(float64(stats.Dials)) + r.Fetches.With(withValues...).Add(float64(stats.Fetches)) + r.Messages.With(withValues...).Add(float64(stats.Messages)) + r.Bytes.With(withValues...).Add(float64(stats.Bytes)) + r.Rebalances.With(withValues...).Add(float64(stats.Rebalances)) + r.Timeouts.With(withValues...).Add(float64(stats.Timeouts)) + r.Errors.With(withValues...).Add(float64(stats.Errors)) + + r.Offset.With(withValues...).Set(float64(stats.Offset)) + r.Lag.With(withValues...).Set(float64(stats.Lag)) + r.MinBytes.With(withValues...).Set(float64(stats.MinBytes)) + r.MaxBytes.With(withValues...).Set(float64(stats.MaxBytes)) + r.MaxWait.With(withValues...).Set(stats.MaxWait.Seconds()) + r.QueueLength.With(withValues...).Set(float64(stats.QueueLength)) + r.QueueCapacity.With(withValues...).Set(float64(stats.QueueCapacity)) + + r.DialTime.Min.With(withValues...).Set(stats.DialTime.Min.Seconds()) + r.DialTime.Max.With(withValues...).Set(stats.DialTime.Max.Seconds()) + r.DialTime.Avg.With(withValues...).Set(stats.DialTime.Avg.Seconds()) + + r.ReadTime.Min.With(withValues...).Set(stats.ReadTime.Min.Seconds()) + r.ReadTime.Max.With(withValues...).Set(stats.ReadTime.Max.Seconds()) + r.ReadTime.Avg.With(withValues...).Set(stats.ReadTime.Avg.Seconds()) + + r.WaitTime.Min.With(withValues...).Set(stats.WaitTime.Min.Seconds()) + r.WaitTime.Max.With(withValues...).Set(stats.WaitTime.Max.Seconds()) + r.WaitTime.Avg.With(withValues...).Set(stats.WaitTime.Avg.Seconds()) + + r.FetchSize.Min.With(withValues...).Set(float64(stats.FetchSize.Min)) + 
r.FetchSize.Max.With(withValues...).Set(float64(stats.FetchSize.Max)) + r.FetchSize.Avg.With(withValues...).Set(float64(stats.FetchSize.Avg)) + + r.FetchBytes.Min.With(withValues...).Set(float64(stats.FetchBytes.Min)) + r.FetchBytes.Max.With(withValues...).Set(float64(stats.FetchBytes.Max)) + r.FetchBytes.Avg.With(withValues...).Set(float64(stats.FetchBytes.Avg)) } // newCollector creates a new kafka reader wrapper containing the name of the reader. @@ -59,42 +148,6 @@ func (d *readerCollector) collectConnectionStats() { for k, v := range d.factory.List() { reader := v.Conn.(*kafka.Reader) stats := reader.Stats() - withValues := []string{"reader", k, "client_id", stats.ClientID, "topic", stats.Topic, "partition", stats.Partition} - - d.stats.Dials.With(withValues...).Add(float64(stats.Dials)) - d.stats.Fetches.With(withValues...).Add(float64(stats.Fetches)) - d.stats.Messages.With(withValues...).Add(float64(stats.Messages)) - d.stats.Bytes.With(withValues...).Add(float64(stats.Bytes)) - d.stats.Rebalances.With(withValues...).Add(float64(stats.Rebalances)) - d.stats.Timeouts.With(withValues...).Add(float64(stats.Timeouts)) - d.stats.Errors.With(withValues...).Add(float64(stats.Errors)) - - d.stats.Offset.With(withValues...).Set(float64(stats.Offset)) - d.stats.Lag.With(withValues...).Set(float64(stats.Lag)) - d.stats.MinBytes.With(withValues...).Set(float64(stats.MinBytes)) - d.stats.MaxBytes.With(withValues...).Set(float64(stats.MaxBytes)) - d.stats.MaxWait.With(withValues...).Set(stats.MaxWait.Seconds()) - d.stats.QueueLength.With(withValues...).Set(float64(stats.QueueLength)) - d.stats.QueueCapacity.With(withValues...).Set(float64(stats.QueueCapacity)) - - d.stats.DialTime.Min.With(withValues...).Set(stats.DialTime.Min.Seconds()) - d.stats.DialTime.Max.With(withValues...).Set(stats.DialTime.Max.Seconds()) - d.stats.DialTime.Avg.With(withValues...).Set(stats.DialTime.Avg.Seconds()) - - d.stats.ReadTime.Min.With(withValues...).Set(stats.ReadTime.Min.Seconds()) - 
d.stats.ReadTime.Max.With(withValues...).Set(stats.ReadTime.Max.Seconds()) - d.stats.ReadTime.Avg.With(withValues...).Set(stats.ReadTime.Avg.Seconds()) - - d.stats.WaitTime.Min.With(withValues...).Set(stats.WaitTime.Min.Seconds()) - d.stats.WaitTime.Max.With(withValues...).Set(stats.WaitTime.Max.Seconds()) - d.stats.WaitTime.Avg.With(withValues...).Set(stats.WaitTime.Avg.Seconds()) - - d.stats.FetchSize.Min.With(withValues...).Set(float64(stats.FetchSize.Min)) - d.stats.FetchSize.Max.With(withValues...).Set(float64(stats.FetchSize.Max)) - d.stats.FetchSize.Avg.With(withValues...).Set(float64(stats.FetchSize.Avg)) - - d.stats.FetchBytes.Min.With(withValues...).Set(float64(stats.FetchBytes.Min)) - d.stats.FetchBytes.Max.With(withValues...).Set(float64(stats.FetchBytes.Max)) - d.stats.FetchBytes.Avg.With(withValues...).Set(float64(stats.FetchBytes.Avg)) + d.stats.Reader(k).Observe(stats) } } diff --git a/otkafka/writer_metrics.go b/otkafka/writer_metrics.go index c737f0cf..799a4be7 100644 --- a/otkafka/writer_metrics.go +++ b/otkafka/writer_metrics.go @@ -34,6 +34,104 @@ type WriterStats struct { Retries AggStats BatchSize AggStats BatchBytes AggStats + + writer string +} + +// Writer sets the writer label in WriterStats. 
+func (w *WriterStats) Writer(writer string) *WriterStats { + withValues := []string{"writer", writer} + return &WriterStats{ + Writes: w.Writes.With(withValues...), + Messages: w.Messages.With(withValues...), + Bytes: w.Bytes.With(withValues...), + Errors: w.Errors.With(withValues...), + MaxAttempts: w.MaxAttempts.With(withValues...), + MaxBatchSize: w.MaxBatchSize.With(withValues...), + BatchTimeout: w.BatchTimeout.With(withValues...), + ReadTimeout: w.ReadTimeout.With(withValues...), + WriteTimeout: w.WriteTimeout.With(withValues...), + RequiredAcks: w.RequiredAcks.With(withValues...), + BatchTime: AggStats{ + Min: w.BatchTime.Min.With(withValues...), + Max: w.BatchTime.Max.With(withValues...), + Avg: w.BatchTime.Avg.With(withValues...), + }, + WriteTime: AggStats{ + Min: w.WriteTime.Min.With(withValues...), + Max: w.WriteTime.Max.With(withValues...), + Avg: w.WriteTime.Avg.With(withValues...), + }, + Retries: AggStats{ + Min: w.Retries.Min.With(withValues...), + Max: w.Retries.Max.With(withValues...), + Avg: w.Retries.Avg.With(withValues...), + }, + WaitTime: AggStats{ + Min: w.WaitTime.Min.With(withValues...), + Max: w.WaitTime.Max.With(withValues...), + Avg: w.WaitTime.Avg.With(withValues...), + }, + BatchSize: AggStats{ + Min: w.BatchSize.Min.With(withValues...), + Max: w.BatchSize.Max.With(withValues...), + Avg: w.BatchSize.Avg.With(withValues...), + }, + BatchBytes: AggStats{ + Min: w.BatchBytes.Min.With(withValues...), + Max: w.BatchBytes.Max.With(withValues...), + Avg: w.BatchBytes.Avg.With(withValues...), + }, + Async: w.Async.With(withValues...), + writer: writer, + } +} + +// Observe records the writer stats. It should be called periodically. 
+func (w *WriterStats) Observe(stats kafka.WriterStats) *WriterStats { + withValues := []string{"topic", stats.Topic} + + w.Writes.With(withValues...).Add(float64(stats.Writes)) + w.Messages.With(withValues...).Add(float64(stats.Messages)) + w.Bytes.With(withValues...).Add(float64(stats.Bytes)) + w.Errors.With(withValues...).Add(float64(stats.Errors)) + + w.BatchTime.Min.With(withValues...).Add(stats.BatchTime.Min.Seconds()) + w.BatchTime.Max.With(withValues...).Add(stats.BatchTime.Max.Seconds()) + w.BatchTime.Avg.With(withValues...).Add(stats.BatchTime.Avg.Seconds()) + + w.WriteTime.Min.With(withValues...).Add(stats.WriteTime.Min.Seconds()) + w.WriteTime.Max.With(withValues...).Add(stats.WriteTime.Max.Seconds()) + w.WriteTime.Avg.With(withValues...).Add(stats.WriteTime.Avg.Seconds()) + + w.WaitTime.Min.With(withValues...).Add(stats.WaitTime.Min.Seconds()) + w.WaitTime.Max.With(withValues...).Add(stats.WaitTime.Max.Seconds()) + w.WaitTime.Avg.With(withValues...).Add(stats.WaitTime.Avg.Seconds()) + + w.Retries.Min.With(withValues...).Add(float64(stats.Retries.Min)) + w.Retries.Max.With(withValues...).Add(float64(stats.Retries.Max)) + w.Retries.Avg.With(withValues...).Add(float64(stats.Retries.Avg)) + + w.BatchSize.Min.With(withValues...).Add(float64(stats.BatchSize.Min)) + w.BatchSize.Max.With(withValues...).Add(float64(stats.BatchSize.Max)) + w.BatchSize.Avg.With(withValues...).Add(float64(stats.BatchSize.Avg)) + + w.BatchBytes.Min.With(withValues...).Add(float64(stats.BatchBytes.Min)) + w.BatchBytes.Max.With(withValues...).Add(float64(stats.BatchBytes.Max)) + w.BatchBytes.Avg.With(withValues...).Add(float64(stats.BatchBytes.Avg)) + + w.MaxAttempts.With(withValues...).Set(float64(stats.MaxAttempts)) + w.MaxBatchSize.With(withValues...).Set(float64(stats.MaxBatchSize)) + w.BatchTimeout.With(withValues...).Set(stats.BatchTimeout.Seconds()) + w.ReadTimeout.With(withValues...).Set(stats.ReadTimeout.Seconds()) + 
w.WriteTimeout.With(withValues...).Set(stats.WriteTimeout.Seconds()) + w.RequiredAcks.With(withValues...).Set(float64(stats.RequiredAcks)) + var async float64 + if stats.Async { + async = 1.0 + } + w.Async.With(withValues...).Set(async) + return w } // newCollector creates a new kafka writer wrapper containing the name of the reader. @@ -50,47 +148,6 @@ func (d *writerCollector) collectConnectionStats() { for k, v := range d.factory.List() { writer := v.Conn.(*kafka.Writer) stats := writer.Stats() - withValues := []string{"writer", k, "topic", stats.Topic} - - d.stats.Writes.With(withValues...).Add(float64(stats.Writes)) - d.stats.Messages.With(withValues...).Add(float64(stats.Messages)) - d.stats.Bytes.With(withValues...).Add(float64(stats.Bytes)) - d.stats.Errors.With(withValues...).Add(float64(stats.Errors)) - - d.stats.BatchTime.Min.With(withValues...).Add(stats.BatchTime.Min.Seconds()) - d.stats.BatchTime.Max.With(withValues...).Add(stats.BatchTime.Max.Seconds()) - d.stats.BatchTime.Avg.With(withValues...).Add(stats.BatchTime.Avg.Seconds()) - - d.stats.WriteTime.Min.With(withValues...).Add(stats.WriteTime.Min.Seconds()) - d.stats.WriteTime.Max.With(withValues...).Add(stats.WriteTime.Max.Seconds()) - d.stats.WriteTime.Avg.With(withValues...).Add(stats.WriteTime.Avg.Seconds()) - - d.stats.WaitTime.Min.With(withValues...).Add(stats.WaitTime.Min.Seconds()) - d.stats.WaitTime.Max.With(withValues...).Add(stats.WaitTime.Max.Seconds()) - d.stats.WaitTime.Avg.With(withValues...).Add(stats.WaitTime.Avg.Seconds()) - - d.stats.Retries.Min.With(withValues...).Add(float64(stats.Retries.Min)) - d.stats.Retries.Max.With(withValues...).Add(float64(stats.Retries.Max)) - d.stats.Retries.Avg.With(withValues...).Add(float64(stats.Retries.Avg)) - - d.stats.BatchSize.Min.With(withValues...).Add(float64(stats.BatchSize.Min)) - d.stats.BatchSize.Max.With(withValues...).Add(float64(stats.BatchSize.Max)) - d.stats.BatchSize.Avg.With(withValues...).Add(float64(stats.BatchSize.Avg)) - - 
d.stats.BatchBytes.Min.With(withValues...).Add(float64(stats.BatchBytes.Min)) - d.stats.BatchBytes.Max.With(withValues...).Add(float64(stats.BatchBytes.Max)) - d.stats.BatchBytes.Avg.With(withValues...).Add(float64(stats.BatchBytes.Avg)) - - d.stats.MaxAttempts.With(withValues...).Set(float64(stats.MaxAttempts)) - d.stats.MaxBatchSize.With(withValues...).Set(float64(stats.MaxBatchSize)) - d.stats.BatchTimeout.With(withValues...).Set(stats.BatchTimeout.Seconds()) - d.stats.ReadTimeout.With(withValues...).Set(stats.ReadTimeout.Seconds()) - d.stats.WriteTimeout.With(withValues...).Set(stats.WriteTimeout.Seconds()) - d.stats.RequiredAcks.With(withValues...).Set(float64(stats.RequiredAcks)) - var async float64 - if stats.Async { - async = 1.0 - } - d.stats.Async.With(withValues...).Set(async) + d.stats.Writer(k).Observe(stats) } } diff --git a/otredis/metrics.go b/otredis/metrics.go index 5a222b04..52754e02 100644 --- a/otredis/metrics.go +++ b/otredis/metrics.go @@ -23,6 +23,38 @@ type Gauges struct { TotalConns metrics.Gauge IdleConns metrics.Gauge StaleConns metrics.Gauge + + dbName string +} + +// DBName sets the dbname label of redis metrics. +func (g *Gauges) DBName(dbName string) *Gauges { + withValues := []string{"dbname", dbName} + return &Gauges{ + Hits: g.Hits.With(withValues...), + Misses: g.Misses.With(withValues...), + Timeouts: g.Timeouts.With(withValues...), + TotalConns: g.TotalConns.With(withValues...), + IdleConns: g.IdleConns.With(withValues...), + StaleConns: g.StaleConns.With(withValues...), + dbName: dbName, + } +} + +// Observe records the redis pool stats. It should be called periodically. 
+func (g *Gauges) Observe(stats *redis.PoolStats) { + + g.Hits.Set(float64(stats.Hits)) + + g.Misses.Set(float64(stats.Misses)) + + g.Timeouts.Set(float64(stats.Timeouts)) + + g.TotalConns.Set(float64(stats.TotalConns)) + + g.IdleConns.Set(float64(stats.IdleConns)) + + g.StaleConns.Set(float64(stats.StaleConns)) } // newCollector creates a new redis wrapper containing the name of the redis. @@ -39,28 +71,6 @@ func (d *collector) collectConnectionStats() { for k, v := range d.factory.List() { conn := v.Conn.(redis.UniversalClient) stats := conn.PoolStats() - - withValues := []string{"dbname", k} - d.gauges.Hits. - With(withValues...). - Set(float64(stats.Hits)) - - d.gauges.Misses. - With(withValues...). - Set(float64(stats.Misses)) - - d.gauges.Timeouts. - With(withValues...). - Set(float64(stats.Timeouts)) - - d.gauges.TotalConns. - With(withValues...). - Set(float64(stats.TotalConns)) - d.gauges.IdleConns. - With(withValues...). - Set(float64(stats.IdleConns)) - d.gauges.StaleConns. - With(withValues...). - Set(float64(stats.StaleConns)) + d.gauges.DBName(k).Observe(stats) } } diff --git a/srvgrpc/metrics.go b/srvgrpc/metrics.go index 82f3f060..865202dc 100644 --- a/srvgrpc/metrics.go +++ b/srvgrpc/metrics.go @@ -1,6 +1,10 @@ package srvgrpc import ( + "context" + "time" + + "github.com/go-kit/kit/metrics" "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" ) @@ -20,3 +24,60 @@ type MetricsModule struct{} func (m MetricsModule) ProvideGRPC(server *grpc.Server) { grpc_prometheus.Register(server) } + +// Metrics is a unary interceptor for grpc package. It records the request duration in a histogram. 
+func Metrics(metrics *RequestDurationSeconds) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + start := time.Now() + defer func() { + metrics.Route(info.FullMethod).Observe(time.Since(start).Seconds()) + }() + return handler(ctx, req) + } +} + +// RequestDurationSeconds is a Histogram that measures the request latency. +type RequestDurationSeconds struct { + // Histogram is the underlying histogram of RequestDurationSeconds. + Histogram metrics.Histogram + + // labels + module string + service string + route string +} + +// Module specifies the module label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Module(module string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("module", module), + module: module, + service: r.service, + route: r.route, + } +} + +// Service specifies the service label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Service(service string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("service", service), + module: r.module, + service: service, + route: r.route, + } +} + +// Route specifies the route label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Route(route string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("route", route), + module: r.module, + service: r.service, + route: route, + } +} + +// Observe records the time taken to process the request. 
+func (r RequestDurationSeconds) Observe(seconds float64) { + r.Histogram.Observe(seconds) +} diff --git a/srvgrpc/metrics_test.go b/srvgrpc/metrics_test.go new file mode 100644 index 00000000..6901e20a --- /dev/null +++ b/srvgrpc/metrics_test.go @@ -0,0 +1,29 @@ +package srvgrpc + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/kit/metrics/generic" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +func TestRequestDurationSeconds(t *testing.T) { + rds := &RequestDurationSeconds{ + Histogram: generic.NewHistogram("foo", 2), + } + rds = rds.Module("m").Service("s").Route("r") + rds.Observe(5) + + assert.Equal(t, 5.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) + assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r"}, rds.Histogram.(*generic.Histogram).LabelValues()) + + f := grpc.UnaryHandler(func(ctx context.Context, req interface{}) (interface{}, error) { + time.Sleep(time.Millisecond) + return nil, nil + }) + _, _ = Metrics(rds)(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "/"}, f) + assert.GreaterOrEqual(t, 1.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) +} diff --git a/srvhttp/metrics.go b/srvhttp/metrics.go index 660e0f38..4e77050f 100644 --- a/srvhttp/metrics.go +++ b/srvhttp/metrics.go @@ -1,6 +1,10 @@ package srvhttp import ( + "net/http" + "time" + + "github.com/go-kit/kit/metrics" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -13,3 +17,72 @@ type MetricsModule struct{} func (m MetricsModule) ProvideHTTP(router *mux.Router) { router.PathPrefix("/metrics").Handler(promhttp.Handler()) } + +// Metrics is a middleware for the standard library http package. It records the request duration in a histogram. 
+func Metrics(metrics *RequestDurationSeconds) func(handler http.Handler) http.Handler { + return func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + start := time.Now() + defer func() { + route := mux.CurrentRoute(request) + if route == nil { + metrics.Route("").Observe(time.Since(start).Seconds()) + return + } + path, err := route.GetPathTemplate() + if err != nil { + metrics.Route("").Observe(time.Since(start).Seconds()) + return + } + metrics.Route(path).Observe(time.Since(start).Seconds()) + }() + handler.ServeHTTP(writer, request) + }) + } +} + +// RequestDurationSeconds is a Histogram that measures the request latency. +type RequestDurationSeconds struct { + // Histogram is the underlying histogram of RequestDurationSeconds. + Histogram metrics.Histogram + + // labels + module string + service string + route string +} + +// Module specifies the module label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Module(module string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("module", module), + module: module, + service: r.service, + route: r.route, + } +} + +// Service specifies the service label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Service(service string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("service", service), + module: r.module, + service: service, + route: r.route, + } +} + +// Route specifies the route label for RequestDurationSeconds. +func (r *RequestDurationSeconds) Route(route string) *RequestDurationSeconds { + return &RequestDurationSeconds{ + Histogram: r.Histogram.With("route", route), + module: r.module, + service: r.service, + route: route, + } +} + +// Observe records the time taken to process the request. 
+func (r *RequestDurationSeconds) Observe(seconds float64) { + r.Histogram.Observe(seconds) +} diff --git a/srvhttp/metrics_test.go b/srvhttp/metrics_test.go new file mode 100644 index 00000000..79a6634e --- /dev/null +++ b/srvhttp/metrics_test.go @@ -0,0 +1,29 @@ +package srvhttp + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-kit/kit/metrics/generic" + "github.com/stretchr/testify/assert" +) + +func TestRequestDurationSeconds(t *testing.T) { + rds := &RequestDurationSeconds{ + Histogram: generic.NewHistogram("foo", 2), + } + rds = rds.Module("m").Service("s").Route("r") + rds.Observe(5) + + assert.Equal(t, 5.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) + assert.ElementsMatch(t, []string{"module", "m", "service", "s", "route", "r"}, rds.Histogram.(*generic.Histogram).LabelValues()) + + f := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + time.Sleep(time.Millisecond) + }) + h := Metrics(rds)(f) + h.ServeHTTP(nil, httptest.NewRequest(http.MethodGet, "/", nil)) + assert.GreaterOrEqual(t, 1.0, rds.Histogram.(*generic.Histogram).Quantile(0.5)) +}