Skip to content

Commit

Permalink
feat(metrics): publish task state by resolver group
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Bétrancourt <thomas@betrancourt.net>
  • Loading branch information
rclsilver committed May 23, 2023
1 parent 9b61952 commit 9acb484
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 8 deletions.
36 changes: 28 additions & 8 deletions api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,32 @@ import (
)

var (
metrics = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "utask_task_state"}, []string{"status"})
metrics = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "utask_task_state"}, []string{"status"})
metricsResolverGroup = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "utask_task_state_per_resolver_group"}, []string{"status", "group"})
)

func updateMetrics(dbp zesty.DBProvider) {
// utask_task_state
stats, err := task.LoadStateCount(dbp, nil)
if err != nil {
logrus.Warn(err)
}
for state, count := range stats {
metrics.WithLabelValues(state).Set(count)
}

// utask_task_state_per_resolver_group
statsResolverGroup, err := task.LoadStateCountResolverGroup(dbp)
if err != nil {
logrus.Warn(err)
}
for group, groupStats := range statsResolverGroup {
for state, count := range groupStats {
metricsResolverGroup.WithLabelValues(state, group).Set(count)
}
}
}

func collectMetrics(ctx context.Context) {
dbp, err := zesty.NewDBProvider(utask.DBName)
if err != nil {
Expand All @@ -28,17 +51,14 @@ func collectMetrics(ctx context.Context) {
}

tick := time.NewTicker(5 * time.Second)

updateMetrics(dbp)

go func() {
for {
select {
case <-tick.C:
stats, err := task.LoadStateCount(dbp, nil)
if err != nil {
logrus.Warn(err)
}
for state, count := range stats {
metrics.WithLabelValues(state).Set(count)
}
updateMetrics(dbp)
case <-ctx.Done():
tick.Stop()
return
Expand Down
48 changes: 48 additions & 0 deletions models/task/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type stateCount struct {
Count float64 `db:"state_count"`
}

type stateCountResolverGroup struct {
stateCount
Group string `db:"group_name"`
}

// RegisterValidationTime computes the duration between the task creation and
// the associated resolution's creation. This metric is then pushed to Prometheus.
func RegisterValidationTime(templateName string, taskCreation time.Time) {
Expand Down Expand Up @@ -86,3 +91,46 @@ func LoadStateCount(dbp zesty.DBProvider, tags map[string]string) (sc map[string

return sc, nil
}

// LoadStateCountResolverGroup returns a map containing the count of tasks grouped by state and by resolver_group
func LoadStateCountResolverGroup(dbp zesty.DBProvider) (sc map[string]map[string]float64, err error) {
defer errors.DeferredAnnotatef(&err, "Failed to load task stats")

subQuery := sqlgenerator.PGsql.Select(`t."id"`, `t."state"`, `coalesce(nullif(t."resolver_groups", 'null'::jsonb), nullif(tt."allowed_resolver_groups", 'null'::jsonb)) as "groups"`).
From(`"task" t`).
LeftJoin(`"task_template" tt ON t."id_template" = tt."id"`)

sel := sqlgenerator.PGsql.Select(`"group_name"`, `"state"`, `count("sq"."state") as "state_count"`).
FromSelect(subQuery, "sq").
Join(`jsonb_array_elements_text("sq"."groups") "group_name" ON true`).
Where(`"sq"."groups" IS NOT NULL`).
GroupBy(`"group_name"`, `"sq"."state"`)

query, params, err := sel.ToSql()
if err != nil {
return nil, err
}

s := []stateCountResolverGroup{}
if _, err := dbp.DB().Select(&s, query, params...); err != nil {
return nil, pgjuju.Interpret(err)
}

sc = make(map[string]map[string]float64)

for _, gsc := range s {
if _, exists := sc[gsc.Group]; !exists {
sc[gsc.Group] = map[string]float64{
StateTODO: 0,
StateBlocked: 0,
StateRunning: 0,
StateWontfix: 0,
StateDone: 0,
StateCancelled: 0,
}
}
sc[gsc.Group][gsc.State] = gsc.Count
}

return sc, nil
}
238 changes: 238 additions & 0 deletions models/task/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package task_test

import (
"context"
"fmt"
"os"
"reflect"
"sync"
"testing"
"time"

"github.com/loopfz/gadgeto/zesty"
"github.com/stretchr/testify/assert"

"github.com/ovh/configstore"
"github.com/ovh/utask"
"github.com/ovh/utask/api"
"github.com/ovh/utask/db"
"github.com/ovh/utask/engine"
functionrunner "github.com/ovh/utask/engine/functions/runner"
"github.com/ovh/utask/engine/step"
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
compress "github.com/ovh/utask/pkg/compress/init"
"github.com/ovh/utask/pkg/now"
"github.com/ovh/utask/pkg/plugins"
plugincallback "github.com/ovh/utask/pkg/plugins/builtin/callback"
"github.com/ovh/utask/pkg/plugins/builtin/echo"
"github.com/ovh/utask/pkg/plugins/builtin/script"
pluginsubtask "github.com/ovh/utask/pkg/plugins/builtin/subtask"
)

func createTemplates(dbp zesty.DBProvider, prefix string, templates map[string][]string) (map[string]*tasktemplate.TaskTemplate, error) {
result := make(map[string]*tasktemplate.TaskTemplate)

for name, groups := range templates {
tt, err := tasktemplate.Create(dbp, prefix+name, name+" description", nil, nil, nil, nil, groups, nil, false, false, nil, nil, nil, nil, name+" title", nil, false, nil)
if err != nil {
return nil, err
}
result[name] = tt
}

return result, nil
}

func createTasks(dbp zesty.DBProvider, templates map[string]*tasktemplate.TaskTemplate, tasks map[string][]string) (map[string]*task.Task, error) {
result := make(map[string]*task.Task)

for name, groups := range tasks {
template, ok := templates[name]
if !ok {
return nil, fmt.Errorf("template %q not found", name)
}

task, err := task.Create(dbp, template, "foo", nil, nil, nil, nil, groups, nil, nil, nil, false)
if err != nil {
return nil, err
}

result[name] = task
}

return result, nil
}

func TestMain(m *testing.M) {
store := configstore.DefaultStore
store.InitFromEnvironment()

server := api.NewServer()
service := &plugins.Service{Store: store, Server: server}

if err := plugincallback.Init.Init(service); err != nil {
panic(err)
}

if err := db.Init(store); err != nil {
panic(err)
}

if err := now.Init(); err != nil {
panic(err)
}

if err := compress.Register(); err != nil {
panic(err)
}

var wg sync.WaitGroup

if err := engine.Init(context.Background(), &wg, store); err != nil {
panic(err)
}

if err := functionrunner.Init(); err != nil {
panic(err)
}

step.RegisterRunner(echo.Plugin.PluginName(), echo.Plugin)
step.RegisterRunner(script.Plugin.PluginName(), script.Plugin)
step.RegisterRunner(pluginsubtask.Plugin.PluginName(), pluginsubtask.Plugin)
step.RegisterRunner(plugincallback.Plugin.PluginName(), plugincallback.Plugin)

os.Exit(m.Run())
}

func TestLoadStateCountResolverGroup(t *testing.T) {
dbp, err := zesty.NewDBProvider(utask.DBName)
assert.NoError(t, err)

tests := []struct {
name string
tasks map[string][]string
templates map[string][]string
wantSc map[string]map[string]float64
}{
{
"no-group",
map[string][]string{"task": nil},
map[string][]string{"task": nil},
map[string]map[string]float64{},
},
{
"no-override",
map[string][]string{"task": nil},
map[string][]string{"task": {"foo"}},
map[string]map[string]float64{
"foo": {
task.StateTODO: 1,
task.StateBlocked: 0,
task.StateRunning: 0,
task.StateWontfix: 0,
task.StateDone: 0,
task.StateCancelled: 0,
},
},
},
{
"with-override",
map[string][]string{"task": {"bar"}},
map[string][]string{"task": {"foo"}},
map[string]map[string]float64{
"bar": {
task.StateTODO: 1,
task.StateBlocked: 0,
task.StateRunning: 0,
task.StateWontfix: 0,
task.StateDone: 0,
task.StateCancelled: 0,
},
},
},
{
"no-override-multiple",
map[string][]string{"task": nil},
map[string][]string{"task": {"foo", "bar"}},
map[string]map[string]float64{
"foo": {
task.StateTODO: 1,
task.StateBlocked: 0,
task.StateRunning: 0,
task.StateWontfix: 0,
task.StateDone: 0,
task.StateCancelled: 0,
},
"bar": {
task.StateTODO: 1,
task.StateBlocked: 0,
task.StateRunning: 0,
task.StateWontfix: 0,
task.StateDone: 0,
task.StateCancelled: 0,
},
},
},
{
"with-override-multiple",
map[string][]string{"task": {"foo", "bar"}},
map[string][]string{"task": {"dummy"}},
map[string]map[string]float64{
"foo": {
task.StateTODO: 1,
task.StateBlocked: 0,
task.StateRunning: 0,
task.StateWontfix: 0,
task.StateDone: 0,
task.StateCancelled: 0,
},
"bar": {
task.StateTODO: 1,
task.StateBlocked: 0,
task.StateRunning: 0,
task.StateWontfix: 0,
task.StateDone: 0,
task.StateCancelled: 0,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := dbp.Tx(); err != nil {
t.Errorf("Tx() error = %v", err)
return
}

prefix := fmt.Sprintf("task-%d-", time.Now().UnixNano())

templates, err := createTemplates(dbp, prefix, tt.templates)
if err != nil {
t.Errorf("createTemplates() error = %v", err)
return
}

_, err = createTasks(dbp, templates, tt.tasks)
if err != nil {
t.Errorf("createTasks() error = %v", err)
return
}

gotSc, err := task.LoadStateCountResolverGroup(dbp)
if err != nil {
t.Errorf("LoadStateCountResolverGroup() error = %v, wantErr false", err)
return
}

if !reflect.DeepEqual(gotSc, tt.wantSc) {
t.Errorf("LoadStateCountResolverGroup() = %v, want %v", gotSc, tt.wantSc)
}

if err := dbp.Rollback(); err != nil {
t.Errorf("Rollback() error = %v", err)
return
}
})
}
}

0 comments on commit 9acb484

Please sign in to comment.