diff --git a/api/metrics.go b/api/metrics.go index 570c3016..1871c3f1 100644 --- a/api/metrics.go +++ b/api/metrics.go @@ -17,9 +17,32 @@ import ( ) var ( - metrics = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "utask_task_state"}, []string{"status"}) + metrics = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "utask_task_state"}, []string{"status"}) + metricsResolverGroup = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "utask_task_state_per_resolver_group"}, []string{"status", "group"}) ) +func updateMetrics(dbp zesty.DBProvider) { + // utask_task_state + stats, err := task.LoadStateCount(dbp, nil) + if err != nil { + logrus.Warn(err) + } + for state, count := range stats { + metrics.WithLabelValues(state).Set(count) + } + + // utask_task_state_per_resolver_group + statsResolverGroup, err := task.LoadStateCountResolverGroup(dbp) + if err != nil { + logrus.Warn(err) + } + for group, groupStats := range statsResolverGroup { + for state, count := range groupStats { + metricsResolverGroup.WithLabelValues(state, group).Set(count) + } + } +} + func collectMetrics(ctx context.Context) { dbp, err := zesty.NewDBProvider(utask.DBName) if err != nil { @@ -28,17 +51,14 @@ func collectMetrics(ctx context.Context) { } tick := time.NewTicker(5 * time.Second) + + updateMetrics(dbp) + go func() { for { select { case <-tick.C: - stats, err := task.LoadStateCount(dbp, nil) - if err != nil { - logrus.Warn(err) - } - for state, count := range stats { - metrics.WithLabelValues(state).Set(count) - } + updateMetrics(dbp) case <-ctx.Done(): tick.Stop() return diff --git a/models/task/stats.go b/models/task/stats.go index 5ab6f17f..a0fda91d 100644 --- a/models/task/stats.go +++ b/models/task/stats.go @@ -25,6 +25,11 @@ type stateCount struct { Count float64 `db:"state_count"` } +type stateCountResolverGroup struct { + stateCount + Group string `db:"group_name"` +} + // RegisterValidationTime computes the duration between the task creation and // the associated resolution's creation. This metric is then pushed to Prometheus. func RegisterValidationTime(templateName string, taskCreation time.Time) { @@ -86,3 +91,46 @@ func LoadStateCount(dbp zesty.DBProvider, tags map[string]string) (sc map[string return sc, nil } + +// LoadStateCountResolverGroup returns a map containing the count of tasks grouped by state and by resolver_group +func LoadStateCountResolverGroup(dbp zesty.DBProvider) (sc map[string]map[string]float64, err error) { + defer errors.DeferredAnnotatef(&err, "Failed to load task stats") + + subQuery := sqlgenerator.PGsql.Select(`t."id"`, `t."state"`, `coalesce(nullif(t."resolver_groups", 'null'::jsonb), nullif(tt."allowed_resolver_groups", 'null'::jsonb)) as "groups"`). + From(`"task" t`). + LeftJoin(`"task_template" tt ON t."id_template" = tt."id"`) + + sel := sqlgenerator.PGsql.Select(`"group_name"`, `"state"`, `count("sq"."state") as "state_count"`). + FromSelect(subQuery, "sq"). + Join(`jsonb_array_elements_text("sq"."groups") "group_name" ON true`). + Where(`"sq"."groups" IS NOT NULL`). + GroupBy(`"group_name"`, `"sq"."state"`) + + query, params, err := sel.ToSql() + if err != nil { + return nil, err + } + + s := []stateCountResolverGroup{} + if _, err := dbp.DB().Select(&s, query, params...); err != nil { + return nil, pgjuju.Interpret(err) + } + + sc = make(map[string]map[string]float64) + + for _, gsc := range s { + if _, exists := sc[gsc.Group]; !exists { + sc[gsc.Group] = map[string]float64{ + StateTODO: 0, + StateBlocked: 0, + StateRunning: 0, + StateWontfix: 0, + StateDone: 0, + StateCancelled: 0, + } + } + sc[gsc.Group][gsc.State] = gsc.Count + } + + return sc, nil +} diff --git a/models/task/stats_test.go b/models/task/stats_test.go new file mode 100644 index 00000000..23a3776f --- /dev/null +++ b/models/task/stats_test.go @@ -0,0 +1,238 @@ +package task_test + +import ( + "context" + "fmt" + "os" + "reflect" + "sync" + "testing" + "time" + + "github.com/loopfz/gadgeto/zesty" + "github.com/stretchr/testify/assert" + + "github.com/ovh/configstore" + "github.com/ovh/utask" + "github.com/ovh/utask/api" + "github.com/ovh/utask/db" + "github.com/ovh/utask/engine" + functionrunner "github.com/ovh/utask/engine/functions/runner" + "github.com/ovh/utask/engine/step" + "github.com/ovh/utask/models/task" + "github.com/ovh/utask/models/tasktemplate" + compress "github.com/ovh/utask/pkg/compress/init" + "github.com/ovh/utask/pkg/now" + "github.com/ovh/utask/pkg/plugins" + plugincallback "github.com/ovh/utask/pkg/plugins/builtin/callback" + "github.com/ovh/utask/pkg/plugins/builtin/echo" + "github.com/ovh/utask/pkg/plugins/builtin/script" + pluginsubtask "github.com/ovh/utask/pkg/plugins/builtin/subtask" +) + +func createTemplates(dbp zesty.DBProvider, prefix string, templates map[string][]string) (map[string]*tasktemplate.TaskTemplate, error) { + result := make(map[string]*tasktemplate.TaskTemplate) + + for name, groups := range templates { + tt, err := tasktemplate.Create(dbp, prefix+name, name+" description", nil, nil, nil, nil, groups, nil, false, false, nil, nil, nil, nil, name+" title", nil, false, nil) + if err != nil { + return nil, err + } + result[name] = tt + } + + return result, nil +} + +func createTasks(dbp zesty.DBProvider, templates map[string]*tasktemplate.TaskTemplate, tasks map[string][]string) (map[string]*task.Task, error) { + result := make(map[string]*task.Task) + + for name, groups := range tasks { + template, ok := templates[name] + if !ok { + return nil, fmt.Errorf("template %q not found", name) + } + + task, err := task.Create(dbp, template, "foo", nil, nil, nil, nil, groups, nil, nil, nil, false) + if err != nil { + return nil, err + } + + result[name] = task + } + + return result, nil +} + +func TestMain(m *testing.M) { + store := configstore.DefaultStore + store.InitFromEnvironment() + + server := api.NewServer() + service := &plugins.Service{Store: store, Server: server} + + if err := plugincallback.Init.Init(service); err != nil { + panic(err) + } + + if err := db.Init(store); err != nil { + panic(err) + } + + if err := now.Init(); err != nil { + panic(err) + } + + if err := compress.Register(); err != nil { + panic(err) + } + + var wg sync.WaitGroup + + if err := engine.Init(context.Background(), &wg, store); err != nil { + panic(err) + } + + if err := functionrunner.Init(); err != nil { + panic(err) + } + + step.RegisterRunner(echo.Plugin.PluginName(), echo.Plugin) + step.RegisterRunner(script.Plugin.PluginName(), script.Plugin) + step.RegisterRunner(pluginsubtask.Plugin.PluginName(), pluginsubtask.Plugin) + step.RegisterRunner(plugincallback.Plugin.PluginName(), plugincallback.Plugin) + + os.Exit(m.Run()) +} + +func TestLoadStateCountResolverGroup(t *testing.T) { + dbp, err := zesty.NewDBProvider(utask.DBName) + assert.NoError(t, err) + + tests := []struct { + name string + tasks map[string][]string + templates map[string][]string + wantSc map[string]map[string]float64 + }{ + { + "no-group", + map[string][]string{"task": nil}, + map[string][]string{"task": nil}, + map[string]map[string]float64{}, + }, + { + "no-override", + map[string][]string{"task": nil}, + map[string][]string{"task": {"foo"}}, + map[string]map[string]float64{ + "foo": { + task.StateTODO: 1, + task.StateBlocked: 0, + task.StateRunning: 0, + task.StateWontfix: 0, + task.StateDone: 0, + task.StateCancelled: 0, + }, + }, + }, + { + "with-override", + map[string][]string{"task": {"bar"}}, + map[string][]string{"task": {"foo"}}, + map[string]map[string]float64{ + "bar": { + task.StateTODO: 1, + task.StateBlocked: 0, + task.StateRunning: 0, + task.StateWontfix: 0, + task.StateDone: 0, + task.StateCancelled: 0, + }, + }, + }, + { + "no-override-multiple", + map[string][]string{"task": nil}, + map[string][]string{"task": {"foo", "bar"}}, + map[string]map[string]float64{ + "foo": { + task.StateTODO: 1, + task.StateBlocked: 0, + task.StateRunning: 0, + task.StateWontfix: 0, + task.StateDone: 0, + task.StateCancelled: 0, + }, + "bar": { + task.StateTODO: 1, + task.StateBlocked: 0, + task.StateRunning: 0, + task.StateWontfix: 0, + task.StateDone: 0, + task.StateCancelled: 0, + }, + }, + }, + { + "with-override-multiple", + map[string][]string{"task": {"foo", "bar"}}, + map[string][]string{"task": {"dummy"}}, + map[string]map[string]float64{ + "foo": { + task.StateTODO: 1, + task.StateBlocked: 0, + task.StateRunning: 0, + task.StateWontfix: 0, + task.StateDone: 0, + task.StateCancelled: 0, + }, + "bar": { + task.StateTODO: 1, + task.StateBlocked: 0, + task.StateRunning: 0, + task.StateWontfix: 0, + task.StateDone: 0, + task.StateCancelled: 0, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := dbp.Tx(); err != nil { + t.Errorf("Tx() error = %v", err) + return + } + + prefix := fmt.Sprintf("task-%d-", time.Now().UnixNano()) + + templates, err := createTemplates(dbp, prefix, tt.templates) + if err != nil { + t.Errorf("createTemplates() error = %v", err) + return + } + + _, err = createTasks(dbp, templates, tt.tasks) + if err != nil { + t.Errorf("createTasks() error = %v", err) + return + } + + gotSc, err := task.LoadStateCountResolverGroup(dbp) + if err != nil { + t.Errorf("LoadStateCountResolverGroup() error = %v, wantErr false", err) + return + } + + if !reflect.DeepEqual(gotSc, tt.wantSc) { + t.Errorf("LoadStateCountResolverGroup() = %v, want %v", gotSc, tt.wantSc) + } + + if err := dbp.Rollback(); err != nil { + t.Errorf("Rollback() error = %v", err) + return + } + }) + } +}