diff --git a/config/watcher/file_test.go b/config/watcher/file_test.go index 6909abb5..b24a2805 100644 --- a/config/watcher/file_test.go +++ b/config/watcher/file_test.go @@ -15,7 +15,7 @@ import ( func TestWatch(t *testing.T) { t.Run("edit", func(t *testing.T) { t.Parallel() - ch := make(chan struct{}) + ch := make(chan struct{}, 2) f, _ := ioutil.TempFile(".", "*") defer os.Remove(f.Name()) @@ -26,7 +26,7 @@ func TestWatch(t *testing.T) { defer cancel() go w.Watch(ctx, func() error { - close(ch) + ch <- struct{}{} return nil }) time.Sleep(time.Second) diff --git a/go.mod b/go.mod index 8b017aa4..0ca0372b 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/go-kit/kit v0.10.0 github.com/go-redis/redis/v8 v8.6.0 github.com/gogo/protobuf v1.3.2 + github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.4.3 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 diff --git a/go.sum b/go.sum index d353fde6..718d9cd2 100644 --- a/go.sum +++ b/go.sum @@ -183,6 +183,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/observability/jaeger.go b/observability/jaeger.go index 78d1903f..0256b678 100644 --- a/observability/jaeger.go +++ b/observability/jaeger.go @@ -13,17 +13,17 @@ type JaegerLogAdapter struct { Logging log.Logger } -// Infof implements jaeger.Logger +// Infof implements jaeger's logger func (l JaegerLogAdapter) Infof(msg string, args ...interface{}) { level.Info(l.Logging).Log("msg", fmt.Sprintf(msg, args...)) } -// Error implements jaeger.Logger +// Error implements jaeger's logger func (l JaegerLogAdapter) Error(msg string) { level.Error(l.Logging).Log("msg", msg) } // ProvideJaegerLogAdapter returns a valid jaeger.Logger. func ProvideJaegerLogAdapter(l log.Logger) jaeger.Logger { - return &JaegerLogAdapter{Logging: l} + return &JaegerLogAdapter{Logging: log.With(l, "tag", "observability")} } diff --git a/observability/metrics.go b/observability/metrics.go index e0bdcaec..1968f3ce 100644 --- a/observability/metrics.go +++ b/observability/metrics.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/DoNewsCode/core/contract" + "github.com/DoNewsCode/core/otgorm" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -30,3 +31,28 @@ func ProvideHistogramMetrics(appName contract.AppName, env contract.Env) metrics }) return &his } + +// ProvideGORMMetrics returns a *otgorm.Gauges that measures the connection info in databases. +// It is meant to be consumed by the otgorm.Providers. +func ProvideGORMMetrics(appName contract.AppName, env contract.Env) *otgorm.Gauges { + return &otgorm.Gauges{ + Idle: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: appName.String(), + Subsystem: env.String(), + Name: "gorm_idle_connections", + Help: "number of idle connections", + }, []string{"dbname", "driver"}), + Open: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: appName.String(), + Subsystem: env.String(), + Name: "gorm_open_connections", + Help: "number of open connections", + }, []string{"dbname", "driver"}), + InUse: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: appName.String(), + Subsystem: env.String(), + Name: "gorm_in_use_connections", + Help: "number of in use connections", + }, []string{"dbname", "driver"}), + } +} diff --git a/observability/observability.go b/observability/observability.go index e7e6eba5..6b08c669 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -2,12 +2,7 @@ package observability import ( "github.com/DoNewsCode/core/config" - "github.com/DoNewsCode/core/contract" "github.com/DoNewsCode/core/di" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" - "github.com/opentracing/opentracing-go" - "go.uber.org/dig" "gopkg.in/yaml.v3" ) @@ -24,38 +19,13 @@ Providers returns a set of providers available in package observability metrics.Histogram */ func Providers() di.Deps { - return di.Deps{provide, provideConfig} -} - -// in is the injection argument of provide. -type in struct { - dig.In - - Logger log.Logger - Conf contract.ConfigAccessor - AppName contract.AppName - Env contract.Env -} - -// out is the result of provide -type out struct { - dig.Out - - Tracer opentracing.Tracer - Hist metrics.Histogram -} - -// provide provides the observability suite for the system. It contains a tracer and -// a histogram to measure all incoming request. -func provide(in in) (out, func(), error) { - in.Logger = log.With(in.Logger, "tag", "observability") - jlogger := ProvideJaegerLogAdapter(in.Logger) - tracer, cleanup, err := ProvideOpentracing(in.AppName, in.Env, jlogger, in.Conf) - hist := ProvideHistogramMetrics(in.AppName, in.Env) - return out{ - Tracer: tracer, - Hist: hist, - }, cleanup, err + return di.Deps{ + ProvideJaegerLogAdapter, + ProvideOpentracing, + ProvideHistogramMetrics, + ProvideGORMMetrics, + provideConfig, + } } const sample = ` @@ -66,7 +36,7 @@ jaeger: reporter: log: enable: false - addr: '' + addr: ` type configOut struct { diff --git a/observability/observability_test.go b/observability/observability_test.go index 30f69eed..d848d29f 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -1,29 +1,64 @@ package observability import ( - "testing" - + "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/otgorm" "github.com/go-kit/kit/log" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/rawbytes" "github.com/stretchr/testify/assert" + "gorm.io/gorm" + "testing" ) -func TestProvide(t *testing.T) { +func TestProvideOpentracing(t *testing.T) { conf, _ := config.NewConfig(config.WithProviderLayer(rawbytes.Provider([]byte(sample)), yaml.Parser())) - Out, cleanup, err := provide(in{ - Conf: conf, - Logger: log.NewNopLogger(), - AppName: config.AppName("foo"), - Env: config.EnvTesting, - }) + Out, cleanup, err := ProvideOpentracing( + config.AppName("foo"), + config.EnvTesting, + ProvideJaegerLogAdapter(log.NewNopLogger()), + conf, + ) assert.NoError(t, err) - assert.NotNil(t, Out.Tracer) - assert.NotNil(t, Out.Hist) + assert.NotNil(t, Out) cleanup() } +func TestProvideHistogramMetrics(t *testing.T) { + Out := ProvideHistogramMetrics( + config.AppName("foo"), + config.EnvTesting, + ) + assert.NotNil(t, Out) +} + +func TestProvideGORMMetrics(t *testing.T) { + c := core.New() + c.ProvideEssentials() + c.Provide(Providers()) + c.Provide(otgorm.Providers()) + c.Invoke(func(db *gorm.DB, g *otgorm.Gauges) { + d, err := db.DB() + if err != nil { + t.Error(err) + } + stats := d.Stats() + withValues := []string{"dbname", "default", "driver", db.Name()} + g.Idle. + With(withValues...). + Set(float64(stats.Idle)) + + g.InUse. + With(withValues...). + Set(float64(stats.InUse)) + + g.Open. + With(withValues...). + Set(float64(stats.OpenConnections)) + }) +} + func Test_provideConfig(t *testing.T) { Conf := provideConfig() assert.NotEmpty(t, Conf.Config) diff --git a/otgorm/dependency.go b/otgorm/dependency.go index 2eb33df4..63bb3912 100644 --- a/otgorm/dependency.go +++ b/otgorm/dependency.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net" + "time" "github.com/DoNewsCode/core/config" "github.com/DoNewsCode/core/contract" @@ -25,6 +26,7 @@ the Maker, database configs and the default *gorm.DB instance. log.Logger GormConfigInterceptor `optional:"true"` opentracing.Tracer `optional:"true"` + Gauges `optional:"true"` Provide: Maker Factory @@ -54,7 +56,7 @@ type Maker interface { Make(name string) (*gorm.DB, error) } -// GormConfigInterceptor is a function that allows user to make last minute +// GormConfigInterceptor is a function that allows user to Make last minute // change to *gorm.Config when constructing *gorm.DB. type GormConfigInterceptor func(name string, conf *gorm.Config) @@ -86,6 +88,10 @@ type databaseConf struct { } `json:"namingStrategy" yaml:"namingStrategy"` } +type metricsConf struct { + Interval config.Duration `json:"interval" yaml:"interval"` +} + // provideMemoryDatabase provides a sqlite database in memory mode. This is // useful for testing. func provideMemoryDatabase() *SQLite { @@ -111,6 +117,7 @@ type databaseIn struct { Logger log.Logger GormConfigInterceptor GormConfigInterceptor `optional:"true"` Tracer opentracing.Tracer `optional:"true"` + Gauges *Gauges `optional:"true"` } // databaseOut is the result of provideDatabaseFactory. *gorm.DB is not a interface @@ -118,8 +125,9 @@ type databaseIn struct { type databaseOut struct { di.Out - Factory Factory - Maker Maker + Factory Factory + Maker Maker + Collector *collector } // provideDialector provides a gorm.Dialector. Mean to be used as an intermediate @@ -182,10 +190,19 @@ func provideGormDB(dialector gorm.Dialector, config *gorm.Config, tracer opentra // provideDatabaseFactory creates the Factory. It is a valid dependency for // package core. func provideDatabaseFactory(p databaseIn) (databaseOut, func(), error) { + var collector *collector + factory, cleanup := provideDBFactory(p) + if p.Gauges != nil { + var interval time.Duration + p.Conf.Unmarshal("gormMetrics.interval", &interval) + collector = newCollector(factory, p.Gauges, interval) + } + return databaseOut{ - Factory: factory, - Maker: factory, + Factory: factory, + Maker: factory, + Collector: collector, }, cleanup, nil } @@ -265,6 +282,9 @@ func provideConfig() configOut { }{}, }, }, + "gormMetrics": metricsConf{ + Interval: config.Duration{Duration: 15 * time.Second}, + }, }, Comment: "The database configuration", }, diff --git a/otgorm/gorm_metrics.go b/otgorm/gorm_metrics.go new file mode 100644 index 00000000..581c9461 --- /dev/null +++ b/otgorm/gorm_metrics.go @@ -0,0 +1,54 @@ +//go:generate mockgen -destination=./mocks/metrics.go github.com/go-kit/kit/metrics Gauge + +package otgorm + +import ( + "time" + + "github.com/go-kit/kit/metrics" + "gorm.io/gorm" +) + +type collector struct { + factory Factory + gauges *Gauges + interval time.Duration +} + +// Gauges is a collection of metrics for database connection info. +type Gauges struct { + Idle metrics.Gauge + InUse metrics.Gauge + Open metrics.Gauge +} + +// newCollector creates a new database wrapper containing the name of the database, +// it's driver and the (sql) database itself. +func newCollector(factory Factory, gauges *Gauges, interval time.Duration) *collector { + return &collector{ + factory: factory, + gauges: gauges, + interval: interval, + } +} + +// collectConnectionStats collects database connections for Prometheus to scrape. +func (d *collector) collectConnectionStats() { + for k, v := range d.factory.List() { + conn := v.Conn.(*gorm.DB) + db, _ := conn.DB() + stats := db.Stats() + withValues := []string{"dbname", k, "driver", conn.Name()} + d.gauges.Idle. + With(withValues...). + Set(float64(stats.Idle)) + + d.gauges.InUse. + With(withValues...). + Set(float64(stats.InUse)) + + d.gauges.Open. + With(withValues...). + Set(float64(stats.OpenConnections)) + } +} diff --git a/otgorm/gorm_metrics_test.go b/otgorm/gorm_metrics_test.go new file mode 100644 index 00000000..f067abfe --- /dev/null +++ b/otgorm/gorm_metrics_test.go @@ -0,0 +1,37 @@ +//go:generate mockery --name=Gauge +package otgorm + +import ( + "testing" + "time" + + "github.com/DoNewsCode/core" + "github.com/DoNewsCode/core/di" + "github.com/DoNewsCode/core/otgorm/mocks" + "github.com/golang/mock/gomock" +) + +func TestCollector(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := mock_metrics.NewMockGauge(ctrl) + var g = Gauges{ + InUse: m, + Idle: m, + Open: m, + } + m.EXPECT().With(gomock.Any()).Times(3).Return(m) + m.EXPECT().Set(gomock.Any()).Times(3) + + c := core.New() + c.ProvideEssentials() + c.Provide(Providers()) + c.Provide(di.Deps{func() *Gauges { return &g }}) + + c.Invoke(func(factory Factory, g *Gauges) { + factory.Make("default") + c := newCollector(factory, g, time.Nanosecond) + c.collectConnectionStats() + }) +} diff --git a/otgorm/log.go b/otgorm/log.go index e624c7ad..c3c6874b 100644 --- a/otgorm/log.go +++ b/otgorm/log.go @@ -10,7 +10,7 @@ import ( "gorm.io/gorm/logger" ) -// GormLogAdapter is an adapter between kitlog and gorm logger interface +// GormLogAdapter is an adapter between kitlog and gorm Logger interface type GormLogAdapter struct { Logging log.Logger } diff --git a/otgorm/mocks/metrics.go b/otgorm/mocks/metrics.go new file mode 100644 index 00000000..04cc680b --- /dev/null +++ b/otgorm/mocks/metrics.go @@ -0,0 +1,77 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/go-kit/kit/metrics (interfaces: Gauge) + +// Package mock_metrics is a generated GoMock package. +package mock_metrics + +import ( + reflect "reflect" + + metrics "github.com/go-kit/kit/metrics" + gomock "github.com/golang/mock/gomock" +) + +// MockGauge is a mock of Gauge interface. +type MockGauge struct { + ctrl *gomock.Controller + recorder *MockGaugeMockRecorder +} + +// MockGaugeMockRecorder is the mock recorder for MockGauge. +type MockGaugeMockRecorder struct { + mock *MockGauge +} + +// NewMockGauge creates a new mock instance. +func NewMockGauge(ctrl *gomock.Controller) *MockGauge { + mock := &MockGauge{ctrl: ctrl} + mock.recorder = &MockGaugeMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockGauge) EXPECT() *MockGaugeMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockGauge) Add(arg0 float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Add", arg0) +} + +// Add indicates an expected call of Add. +func (mr *MockGaugeMockRecorder) Add(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockGauge)(nil).Add), arg0) +} + +// Set mocks base method. +func (m *MockGauge) Set(arg0 float64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Set", arg0) +} + +// Set indicates an expected call of Set. +func (mr *MockGaugeMockRecorder) Set(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockGauge)(nil).Set), arg0) +} + +// With mocks base method. +func (m *MockGauge) With(arg0 ...string) metrics.Gauge { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "With", varargs...) + ret0, _ := ret[0].(metrics.Gauge) + return ret0 +} + +// With indicates an expected call of With. +func (mr *MockGaugeMockRecorder) With(arg0 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "With", reflect.TypeOf((*MockGauge)(nil).With), arg0...) +} diff --git a/otgorm/module.go b/otgorm/module.go index 271aaf20..9852dabc 100644 --- a/otgorm/module.go +++ b/otgorm/module.go @@ -1,14 +1,20 @@ package otgorm import ( + "context" "fmt" + "time" "github.com/DoNewsCode/core/contract" + "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/logging" "github.com/go-kit/kit/log" + "github.com/oklog/run" "github.com/spf13/cobra" ) +const defaultInterval = 15 * time.Second + // MigrationProvider is an interface for database migrations. modules // implementing this interface are migration providers. migrations will be // collected in migrate command. @@ -29,18 +35,58 @@ type Module struct { env contract.Env logger log.Logger container contract.Container + collector *collector + interval time.Duration +} + +type moduleIn struct { + di.In + + Maker Maker + Env contract.Env + Logger log.Logger + Container contract.Container + Collector *collector + Conf contract.ConfigAccessor } -// New creates Module -func New(make Maker, env contract.Env, logger log.Logger, container contract.Container) Module { +// New creates a Module. +func New(in moduleIn) Module { + var duration time.Duration = defaultInterval + in.Conf.Unmarshal("gormMetrics.interval", &duration) return Module{ - maker: make, - env: env, - logger: logger, - container: container, + maker: in.Maker, + env: in.Env, + logger: in.Logger, + container: in.Container, + collector: in.Collector, + interval: duration, } } +// ProvideRunGroup add a goroutine to periodically scan database connections and +// report them to metrics collector such as prometheus. +func (m Module) ProvideRunGroup(group *run.Group) { + if m.collector == nil { + return + } + ctx, cancel := context.WithCancel(context.Background()) + ticker := time.NewTicker(m.interval) + group.Add(func() error { + for { + select { + case <-ticker.C: + m.collector.collectConnectionStats() + case <-ctx.Done(): + ticker.Stop() + return nil + } + } + }, func(err error) { + cancel() + }) +} + // ProvideCommand provides migration and seed command. func (m Module) ProvideCommand(command *cobra.Command) { var ( diff --git a/otgorm/module_test.go b/otgorm/module_test.go index 395a5f63..163016ea 100644 --- a/otgorm/module_test.go +++ b/otgorm/module_test.go @@ -1,8 +1,14 @@ package otgorm import ( + "context" + "database/sql" + "time" + "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/di" + mock_metrics "github.com/DoNewsCode/core/otgorm/mocks" + "github.com/golang/mock/gomock" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" "gorm.io/gorm" @@ -86,3 +92,62 @@ func TestModule_ProvideCommand(t *testing.T) { }) } } + +func TestModule_ProvideRunGroup(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + withValues := []interface{}{ + gomock.Eq("dbname"), + gomock.Eq("default"), + gomock.Eq("driver"), + gomock.Eq("sqlite"), + } + + idle := mock_metrics.NewMockGauge(ctrl) + idle.EXPECT().With(withValues...).Return(idle).MinTimes(1) + idle.EXPECT().Set(gomock.Eq(0.0)).MinTimes(1) + + open := mock_metrics.NewMockGauge(ctrl) + open.EXPECT().With(withValues...).Return(open).MinTimes(1) + open.EXPECT().Set(gomock.Eq(2.0)).MinTimes(1) + + inUse := mock_metrics.NewMockGauge(ctrl) + inUse.EXPECT().With(withValues...).Return(inUse).MinTimes(1) + inUse.EXPECT().Set(gomock.Eq(2.0)).MinTimes(1) + + c := core.New( + core.WithInline("gorm.default.database", "sqlite"), + core.WithInline("gorm.default.dsn", "file::memory:?cache=shared"), + core.WithInline("gormMetrics.interval", "1ms"), + core.WithInline("log.level", "none"), + ) + c.ProvideEssentials() + c.Provide(di.Deps{func() *Gauges { + return &Gauges{ + Idle: idle, + InUse: inUse, + Open: open, + } + }}) + c.Provide(Providers()) + c.AddModuleFunc(New) + ctx, cancel := context.WithCancel(context.Background()) + var ( + c1 *sql.Conn + c2 *sql.Conn + ) + c.Invoke(func(db *gorm.DB) { + rawSQL, _ := db.DB() + c1, _ = rawSQL.Conn(ctx) + c2, _ = rawSQL.Conn(ctx) + }) + go c.Serve(ctx) + <-time.After(100 * time.Millisecond) + cancel() + <-time.After(1000 * time.Millisecond) + c1.Close() + c2.Close() +}