Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: add metrics for task manager #40819

Merged
merged 2 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -18049,6 +18049,108 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The TTL task statuses in each worker",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 100
},
"hiddenSeries": false,
"id": 294,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.10",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "running",
"color": "#5794F2"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(tidb_server_ttl_task_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type, instance)",
"interval": "",
"legendFormat": "{{ instance }} {{ type }}",
"queryType": "randomWalk",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "TTL Task Count By Status",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "TTL",
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLQueryDuration)
prometheus.MustRegister(TTLProcessedExpiredRowsCounter)
prometheus.MustRegister(TTLJobStatus)
prometheus.MustRegister(TTLTaskStatus)
prometheus.MustRegister(TTLPhaseTime)

prometheus.MustRegister(EMACPUUsageGauge)
Expand Down
8 changes: 8 additions & 0 deletions metrics/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ var (
Help: "The jobs count in the specified status",
}, []string{LblType})

TTLTaskStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "ttl_task_status",
Help: "The tasks count in the specified status",
}, []string{LblType})

TTLPhaseTime = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Expand Down
2 changes: 2 additions & 0 deletions ttl/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var (

RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"})
CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"})

RunningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "running"})
)

func initWorkerPhases(workerType string) map[string]prometheus.Counter {
Expand Down
3 changes: 3 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
],
embed = [":ttlworker"],
flaky = True,
race = "on",
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
deps = [
"//domain",
"//infoschema",
Expand All @@ -69,13 +70,15 @@ go_test(
"//testkit",
"//ttl/cache",
"//ttl/client",
"//ttl/metrics",
"//ttl/session",
"//types",
"//util/chunk",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
Expand Down
5 changes: 3 additions & 2 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = %?,
current_job_owner_id = %?,
current_job_start_time = %?,
current_job_status = 'waiting',
current_job_status = 'running',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a bug fix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Without this modification, the job status will be always 'waiting'.

current_job_status_update_time = %?,
current_job_ttl_expire = %?,
current_job_owner_hb_time = %?
Expand Down Expand Up @@ -161,6 +161,7 @@ func (m *JobManager) jobLoop() error {
m.taskManager.resizeWorkersWithSysVar()
for {
m.reportMetrics()
m.taskManager.reportMetrics()
now := se.Now()

select {
Expand Down Expand Up @@ -651,7 +652,7 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca
// information from schema cache directly
tbl: table,

status: cache.JobStatusWaiting,
status: cache.JobStatusRunning,
}
}

Expand Down
40 changes: 40 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/client"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util/logutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -574,3 +576,41 @@ func TestGCTTLHistory(t *testing.T) {
ttlworker.DoGC(context.TODO(), se)
tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5"))
}

func TestJobMetrics(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)

waitAndStopTTLManager(t, dom)

now := time.Now()
tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)

se := sessionFactory()
m := ttlworker.NewJobManager("manager-1", nil, store, nil)
m.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, m.InfoSchemaCache().Update(se))
// schedule jobs
m.RescheduleJobs(se, now)
// set the worker to be empty, so none of the tasks will be scheduled
m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{})

sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
rows, err := se.ExecuteSQL(ctx, sql, args...)
require.NoError(t, err)
tableStatus, err := cache.RowToTableStatus(se, rows[0])
require.NoError(t, err)

require.NotEmpty(t, tableStatus.CurrentJobID)
require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID)
require.Equal(t, cache.JobStatusRunning, tableStatus.CurrentJobStatus)

m.ReportMetrics()
out := &dto.Metric{}
require.NoError(t, metrics.RunningJobsCnt.Write(out))
require.Equal(t, float64(1), out.GetGauge().GetValue())
}
5 changes: 5 additions & 0 deletions ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no
return m.updateHeartBeat(ctx, se, now)
}

// ReportMetrics is an exported version of reportMetrics
func (m *JobManager) ReportMetrics() {
m.reportMetrics()
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
j.finish(se, now, summary)
}
Expand Down
48 changes: 38 additions & 10 deletions ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/multierr"
Expand Down Expand Up @@ -327,23 +328,17 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
}

err := se.RunInTxn(ctx, func() error {
sql, args := cache.SelectFromTTLTaskWithID(task.JobID, task.ScanID)
rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return errors.Errorf("didn't find task with jobID: %s, scanID: %d", task.JobID, task.ScanID)
}
task, err = cache.RowToTTLTask(se, rows[0])
var err error

task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, true)
if err != nil {
return err
}
if task.OwnerID != "" && !task.OwnerHBTime.Add(2*jobManagerLoopTickerInterval).Before(now) {
return errors.New("task is already scheduled")
}

sql, args = setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now)
sql, args := setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
Expand All @@ -355,6 +350,12 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
return nil, err
}

// update the task after setting status and owner
task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, false)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(m.ctx)
scanTask := &ttlScanTask{
ctx: ctx,
Expand All @@ -371,6 +372,28 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now
}, nil
}

func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, detectLock bool) (*cache.TTLTask, error) {
ctx := m.ctx

sql, args := cache.SelectFromTTLTaskWithID(jobID, scanID)
if detectLock {
sql += " FOR UPDATE NOWAIT"
}
rows, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return nil, errors.Errorf("didn't find task with jobID: %s, scanID: %d", jobID, scanID)
}
task, err := cache.RowToTTLTask(se, rows[0])
if err != nil {
return nil, err
}

return task, nil
}

// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
for _, task := range m.runningTasks {
Expand Down Expand Up @@ -427,6 +450,7 @@ func (m *taskManager) reportTaskFinished(se session.Session, now time.Time, task
if err != nil {
return err
}
task.Status = cache.TaskStatusFinished

timeoutCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
_, err = se.ExecuteSQL(timeoutCtx, sql, args...)
Expand Down Expand Up @@ -474,6 +498,10 @@ func (m *taskManager) checkInvalidTask(se session.Session) {
m.runningTasks = ownRunningTask
}

func (m *taskManager) reportMetrics() {
metrics.RunningTaskCnt.Set(float64(len(m.runningTasks)))
}

type runningScanTask struct {
*ttlScanTask
cancel func()
Expand Down
Loading