diff --git a/resourcemanager/pooltask/BUILD.bazel b/resourcemanager/pooltask/BUILD.bazel
index 151a0ddfdec02..c4113b69dd141 100644
--- a/resourcemanager/pooltask/BUILD.bazel
+++ b/resourcemanager/pooltask/BUILD.bazel
@@ -5,6 +5,8 @@ go_library(
     srcs = [
         "task.go",
         "task_manager.go",
+        "task_manager_iterator.go",
+        "task_manager_scheduler.go",
     ],
     importpath = "github.com/pingcap/tidb/resourcemanager/pooltask",
     visibility = ["//visibility:public"],
diff --git a/resourcemanager/pooltask/task_manager.go b/resourcemanager/pooltask/task_manager.go
index 25ce9e8ad1b4b..66d6451b163ba 100644
--- a/resourcemanager/pooltask/task_manager.go
+++ b/resourcemanager/pooltask/task_manager.go
@@ -32,29 +32,29 @@ type tContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
     task *TaskBox[T, U, C, CT, TF]
 }
 
-type meta struct {
-    stats    *list.List
-    createTS time.Time
-    origin   int32
-    running  int32
+type meta[T any, U any, C any, CT any, TF Context[CT]] struct {
+    stats              *list.List
+    createTS           time.Time
+    initialConcurrency int32
+    running            atomic.Int32
 }
 
-func newStats(concurrency int32) *meta {
-    s := &meta{
-        createTS: time.Now(),
-        stats:    list.New(),
-        origin:   concurrency,
+func newStats[T any, U any, C any, CT any, TF Context[CT]](concurrency int32) *meta[T, U, C, CT, TF] {
+    s := &meta[T, U, C, CT, TF]{
+        createTS:           time.Now(),
+        stats:              list.New(),
+        initialConcurrency: concurrency,
     }
     return s
 }
 
-func (m *meta) getOriginConcurrency() int32 {
-    return m.origin
+func (m *meta[T, U, C, CT, TF]) getOriginConcurrency() int32 {
+    return m.initialConcurrency
 }
 
 // TaskStatusContainer is a container that can control or watch the pool.
 type TaskStatusContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
-    stats map[uint64]*meta
+    stats map[uint64]*meta[T, U, C, CT, TF]
     rw    sync.RWMutex
 }
@@ -70,7 +70,7 @@ func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskMa
     task := make([]TaskStatusContainer[T, U, C, CT, TF], shard)
     for i := 0; i < shard; i++ {
         task[i] = TaskStatusContainer[T, U, C, CT, TF]{
-            stats: make(map[uint64]*meta),
+            stats: make(map[uint64]*meta[T, U, C, CT, TF]),
         }
     }
     return TaskManager[T, U, C, CT, TF]{
@@ -83,7 +83,7 @@ func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskMa
 func (t *TaskManager[T, U, C, CT, TF]) RegisterTask(taskID uint64, concurrency int32) {
     id := getShardID(taskID)
     t.task[id].rw.Lock()
-    t.task[id].stats[taskID] = newStats(concurrency)
+    t.task[id].stats[taskID] = newStats[T, U, C, CT, TF](concurrency)
     t.task[id].rw.Unlock()
 }
@@ -113,7 +113,7 @@ func (t *TaskManager[T, U, C, CT, TF]) AddSubTask(taskID uint64, task *TaskBox[T
     t.running.Inc()
     t.task[shardID].rw.Lock()
     t.task[shardID].stats[taskID].stats.PushBack(tc)
-    t.task[shardID].stats[taskID].running++ // running job in this task
+    t.task[shardID].stats[taskID].running.Inc() // running job in this task
     t.task[shardID].rw.Unlock()
 }
@@ -122,7 +122,7 @@ func (t *TaskManager[T, U, C, CT, TF]) ExitSubTask(taskID uint64) {
     shardID := getShardID(taskID)
     t.running.Dec() // total running tasks
     t.task[shardID].rw.Lock()
-    t.task[shardID].stats[taskID].running-- // running job in this task
+    t.task[shardID].stats[taskID].running.Dec() // running job in this task
     t.task[shardID].rw.Unlock()
 }
@@ -131,7 +131,7 @@ func (t *TaskManager[T, U, C, CT, TF]) Running(taskID uint64) int32 {
     shardID := getShardID(taskID)
     t.task[shardID].rw.Lock()
     defer t.task[shardID].rw.Unlock()
-    return t.task[shardID].stats[taskID].running
+    return t.task[shardID].stats[taskID].running.Load()
 }
 
 // StopTask is to stop a task by TaskID.
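The refactor above makes `meta` generic and moves the per-task `running` counter from a plain `int32` mutated under the shard lock to an atomic. Below is a minimal, self-contained sketch of the same sharding-plus-atomic pattern; the types are toys, not the TiDB API, and it uses stdlib `sync/atomic` rather than the `atomic.Int32` wrapper in the diff, so `Inc()`/`Dec()` become `Add(1)`/`Add(-1)`:

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

const shard = 8

func getShardID(taskID uint64) uint64 { return taskID % shard }

type meta struct {
    initialConcurrency int32
    running            atomic.Int32 // running subtasks of one task
}

type manager struct {
    rw    [shard]sync.RWMutex
    stats [shard]map[uint64]*meta
}

func newManager() *manager {
    m := &manager{}
    for i := range m.stats {
        m.stats[i] = make(map[uint64]*meta)
    }
    return m
}

func (m *manager) registerTask(taskID uint64, concurrency int32) {
    id := getShardID(taskID)
    m.rw[id].Lock() // map mutation needs the write lock
    m.stats[id][taskID] = &meta{initialConcurrency: concurrency}
    m.rw[id].Unlock()
}

func (m *manager) addSubTask(taskID uint64) {
    id := getShardID(taskID)
    m.rw[id].RLock() // read lock is enough: the map is only read, the counter is atomic
    m.stats[id][taskID].running.Add(1)
    m.rw[id].RUnlock()
}

func (m *manager) exitSubTask(taskID uint64) {
    id := getShardID(taskID)
    m.rw[id].RLock()
    m.stats[id][taskID].running.Add(-1)
    m.rw[id].RUnlock()
}

func (m *manager) running(taskID uint64) int32 {
    id := getShardID(taskID)
    m.rw[id].RLock()
    defer m.rw[id].RUnlock()
    return m.stats[id][taskID].running.Load()
}

func main() {
    m := newManager()
    m.registerTask(42, 4)
    m.addSubTask(42)
    m.addSubTask(42)
    m.exitSubTask(42)
    fmt.Println(m.running(42)) // 1
}
```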
diff --git a/resourcemanager/pooltask/task_manager_iterator.go b/resourcemanager/pooltask/task_manager_iterator.go
new file mode 100644
index 0000000000000..ada5994599ff5
--- /dev/null
+++ b/resourcemanager/pooltask/task_manager_iterator.go
@@ -0,0 +1,131 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pooltask
+
+import (
+    "container/list"
+    "time"
+)
+
+func (t *TaskManager[T, U, C, CT, TF]) getBoostTask() (tid uint64, result *TaskBox[T, U, C, CT, TF]) {
+    // A task is a boost candidate when:
+    // 1. its count of running subtasks is below its initial concurrency;
+    // 2. the shorter its run time, the more likely it is to be boosted.
+    tid, element := t.iter(canBoost[T, U, C, CT, TF])
+    if element != nil {
+        return tid, element.Value.(tContainer[T, U, C, CT, TF]).task
+    }
+    return 0, nil
+}
+
+func (t *TaskManager[T, U, C, CT, TF]) pauseTask() {
+    // A task is a pause candidate when:
+    // 1. the longer its run time, the more likely it is to be paused;
+    // 2. if it has been boosted, it is paused first.
+    tid, result := t.iter(canPause[T, U, C, CT, TF])
+    if result != nil {
+        result.Value.(tContainer[T, U, C, CT, TF]).task.status.CompareAndSwap(RunningTask, StopTask)
+        // delete it from the list
+        shardID := getShardID(tid)
+        t.task[shardID].rw.Lock()
+        defer t.task[shardID].rw.Unlock()
+        t.task[shardID].stats[tid].stats.Remove(result)
+    }
+}
+
+func (t *TaskManager[T, U, C, CT, TF]) iter(fn func(m *meta[T, U, C, CT, TF], max time.Time) (*list.Element, bool)) (tid uint64, result *list.Element) {
+    var compareTS time.Time
+    for i := 0; i < shard; i++ {
+        breakFind := func() (breakFind bool) {
+            t.task[i].rw.RLock()
+            defer t.task[i].rw.RUnlock()
+            for id, stats := range t.task[i].stats {
+                if result == nil {
+                    result = findTask[T, U, C, CT, TF](stats, RunningTask)
+                    tid = id
+                    compareTS = stats.createTS
+                    continue
+                }
+                newResult, pauseFind := fn(stats, compareTS)
+                if pauseFind {
+                    result = newResult
+                    tid = id
+                    compareTS = stats.createTS
+                    return true
+                }
+                if newResult != nil {
+                    result = newResult
+                    tid = id
+                    compareTS = stats.createTS
+                }
+            }
+            return false
+        }()
+        if breakFind {
+            break
+        }
+    }
+    return tid, result
+}
+
+func canPause[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], max time.Time) (result *list.Element, isBreak bool) {
+    if m.initialConcurrency < m.running.Load() {
+        box := findTask[T, U, C, CT, TF](m, RunningTask)
+        if box != nil {
+            return box, true
+        }
+    }
+    if m.createTS.Before(max) {
+        box := findTask[T, U, C, CT, TF](m, RunningTask)
+        if box != nil {
+            return box, false
+        }
+    }
+    return nil, false
+}
+
+func canBoost[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], min time.Time) (result *list.Element, isBreak bool) {
+    if m.running.Load() < m.initialConcurrency {
+        box := getTask[T, U, C, CT, TF](m)
+        if box != nil {
+            return box, true
+        }
+    }
+    if m.createTS.After(min) {
+        box := getTask[T, U, C, CT, TF](m)
+        if box != nil {
+            return box, false
+        }
+    }
+    return nil, false
+}
+
+func findTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF], status int32) *list.Element {
+    for e := m.stats.Front(); e != nil; e = e.Next() {
+        box := e.Value.(tContainer[T, U, C, CT, TF])
+        if box.task.status.Load() == status {
+            return e
+        }
+    }
+    return nil
+}
+
+func getTask[T any, U any, C any, CT any, TF Context[CT]](m *meta[T, U, C, CT, TF]) *list.Element {
+    // Any element in the list will do; take the front one if it exists.
+    return m.stats.Front()
+}
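The selection rules in `canPause`/`canBoost` are easier to see without the `list.Element` plumbing. Here is the same policy restated over a plain slice; this is purely illustrative and not the pool's actual data model:

```go
package main

import (
    "fmt"
    "time"
)

type taskInfo struct {
    initialConcurrency int32
    running            int32
    createTS           time.Time
}

// pickToPause: a task running above its initial concurrency (i.e. one that
// was boosted) wins outright; otherwise the oldest task is chosen.
func pickToPause(tasks []taskInfo) int {
    best := 0
    for i, t := range tasks {
        if t.running > t.initialConcurrency {
            return i // reclaim boosted tasks first
        }
        if t.createTS.Before(tasks[best].createTS) {
            best = i
        }
    }
    return best
}

// pickToBoost mirrors pickToPause: a task still below its initial
// concurrency wins outright; otherwise the newest task is chosen.
func pickToBoost(tasks []taskInfo) int {
    best := 0
    for i, t := range tasks {
        if t.running < t.initialConcurrency {
            return i
        }
        if t.createTS.After(tasks[best].createTS) {
            best = i
        }
    }
    return best
}

func main() {
    now := time.Now()
    tasks := []taskInfo{
        {initialConcurrency: 4, running: 4, createTS: now.Add(-3 * time.Minute)},
        {initialConcurrency: 4, running: 5, createTS: now.Add(-1 * time.Minute)}, // boosted
    }
    fmt.Println(pickToPause(tasks)) // 1: the boosted task is paused first
    fmt.Println(pickToBoost(tasks)) // 1: no task is under-subscribed, so the newest wins
}
```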
diff --git a/resourcemanager/pooltask/task_manager_scheduler.go b/resourcemanager/pooltask/task_manager_scheduler.go
new file mode 100644
index 0000000000000..dcc158df06d82
--- /dev/null
+++ b/resourcemanager/pooltask/task_manager_scheduler.go
@@ -0,0 +1,28 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pooltask
+
+// Overclock is to increase the concurrency of the pool.
+func (t *TaskManager[T, U, C, CT, TF]) Overclock() (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
+    if t.concurrency > t.running.Load() {
+        return t.getBoostTask()
+    }
+    return 0, nil
+}
+
+// Downclock is to decrease the concurrency of the pool.
+func (t *TaskManager[T, U, C, CT, TF]) Downclock() {
+    t.pauseTask()
+}
diff --git a/resourcemanager/rm.go b/resourcemanager/rm.go
index e6e48de2059cd..025eb0fcbc129 100644
--- a/resourcemanager/rm.go
+++ b/resourcemanager/rm.go
@@ -78,7 +78,7 @@ func (r *ResourceManager) Stop() {
 }
 
 // Register is to register pool into resource manager
-func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error {
+func (r *ResourceManager) Register(pool util.GoroutinePool, name string, component util.Component) error {
     p := util.PoolContainer{Pool: pool, Component: component}
     return r.registerPool(name, &p)
 }
diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go
index f6ac691e09b15..50a5f54697800 100644
--- a/resourcemanager/schedule.go
+++ b/resourcemanager/schedule.go
@@ -52,14 +52,14 @@ func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) {
     switch cmd {
     case scheduler.Downclock:
         concurrency := con - 1
-        log.Info("downclock goroutine pool",
+        log.Info("[resource manager] downclock goroutine pool",
             zap.Int("origin concurrency", con),
             zap.Int("concurrency", concurrency),
             zap.String("name", pool.Pool.Name()))
         pool.Pool.Tune(concurrency)
     case scheduler.Overclock:
         concurrency := con + 1
-        log.Info("overclock goroutine pool",
+        log.Info("[resource manager] overclock goroutine pool",
             zap.Int("origin concurrency", con),
             zap.Int("concurrency", concurrency),
             zap.String("name", pool.Pool.Name()))
diff --git a/resourcemanager/scheduler/cpu_scheduler.go b/resourcemanager/scheduler/cpu_scheduler.go
index c84fcf36fb697..14338d80683d4 100644
--- a/resourcemanager/scheduler/cpu_scheduler.go
+++ b/resourcemanager/scheduler/cpu_scheduler.go
@@ -30,7 +30,7 @@ func NewCPUScheduler() *CPUScheduler {
 }
 
 // Tune is to tune the goroutine pool
-func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command {
+func (*CPUScheduler) Tune(_ util.Component, pool util.GoroutinePool) Command {
     if time.Since(pool.LastTunerTs()) < util.MinSchedulerInterval.Load() {
         return Hold
     }
diff --git a/resourcemanager/scheduler/scheduler.go b/resourcemanager/scheduler/scheduler.go
index 3af8e6aff5b0b..521536a741dee 100644
--- a/resourcemanager/scheduler/scheduler.go
+++ b/resourcemanager/scheduler/scheduler.go
@@ -32,5 +32,5 @@ const (
 
 // Scheduler is a scheduler interface
 type Scheduler interface {
-    Tune(component util.Component, p util.GorotinuePool) Command
+    Tune(component util.Component, p util.GoroutinePool) Command
 }
diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go
index 4d433975fabb7..6d1959bd08904 100644
--- a/resourcemanager/util/util.go
+++ b/resourcemanager/util/util.go
@@ -25,8 +25,8 @@ var (
     MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)
 )
 
-// GorotinuePool is a pool interface
-type GorotinuePool interface {
+// GoroutinePool is a pool interface
+type GoroutinePool interface {
     ReleaseAndWait()
     Tune(size int)
     LastTunerTs() time.Time
@@ -37,7 +37,7 @@ type GorotinuePool interface {
 
 // PoolContainer is a pool container
 type PoolContainer struct {
-    Pool      GorotinuePool
+    Pool      GoroutinePool
     Component Component
 }
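With the `GorotinuePool` typo fixed to `GoroutinePool`, a custom scheduler only needs to implement `Tune`. A sketch of one follows, assuming only what this diff shows (`LastTunerTs`, `MinSchedulerInterval`, and the `Hold`/`Overclock`/`Downclock` commands); the `LoadFunc` signal and the thresholds are invented for illustration and are not part of the TiDB API:

```go
package myscheduler

import (
    "time"

    "github.com/pingcap/tidb/resourcemanager/scheduler"
    "github.com/pingcap/tidb/resourcemanager/util"
)

// LoadFunc is a hypothetical utilization signal in [0, 1]; plug in whatever
// metric you actually have.
type LoadFunc func() float64

// SimpleScheduler is a toy scheduler.Scheduler implementation.
type SimpleScheduler struct {
    load LoadFunc
}

func NewSimpleScheduler(f LoadFunc) *SimpleScheduler {
    return &SimpleScheduler{load: f}
}

// Tune implements scheduler.Scheduler.
func (s *SimpleScheduler) Tune(_ util.Component, pool util.GoroutinePool) scheduler.Command {
    // Respect the same cool-down the CPUScheduler uses, so pools are not
    // re-tuned more often than MinSchedulerInterval.
    if time.Since(pool.LastTunerTs()) < util.MinSchedulerInterval.Load() {
        return scheduler.Hold
    }
    switch load := s.load(); {
    case load > 0.8:
        return scheduler.Downclock // system is busy: shrink the pool
    case load < 0.2:
        return scheduler.Overclock // system is idle: grow the pool
    default:
        return scheduler.Hold
    }
}
```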
diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go
index 6644a0e895650..5f58bba12d5b4 100644
--- a/util/gpool/spmc/spmcpool.go
+++ b/util/gpool/spmc/spmcpool.go
@@ -140,12 +140,22 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
     p.SetLastTuneTs(time.Now())
     p.capacity.Store(int32(size))
     if size > capacity {
-        // boost
+        for i := 0; i < size-capacity; i++ {
+            if tid, boostTask := p.taskManager.Overclock(); boostTask != nil {
+                p.addWaitingTask()
+                p.taskManager.AddSubTask(tid, boostTask.Clone())
+                p.taskCh <- boostTask
+            }
+        }
         if size-capacity == 1 {
             p.cond.Signal()
             return
         }
         p.cond.Broadcast()
+        return
+    }
+    if size < capacity {
+        p.taskManager.Downclock()
     }
 }
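Note the wake-up choice in `Tune`: growing capacity by exactly one slot needs at most one woken waiter, while a larger jump broadcasts and lets every waiter re-check capacity. The same condition-variable pattern in isolation, with toy types rather than the pool's:

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// gate is a toy stand-in for the pool's capacity bookkeeping.
type gate struct {
    mu   sync.Mutex
    cond *sync.Cond
    cap  int
    used int
}

func newGate(capacity int) *gate {
    g := &gate{cap: capacity}
    g.cond = sync.NewCond(&g.mu)
    return g
}

// acquire blocks until a slot is free.
func (g *gate) acquire() {
    g.mu.Lock()
    for g.used >= g.cap {
        g.cond.Wait()
    }
    g.used++
    g.mu.Unlock()
}

// tune mirrors the diff's logic: one extra slot, one Signal; a bigger jump,
// Broadcast so all waiters re-check the new capacity.
func (g *gate) tune(size int) {
    g.mu.Lock()
    grown := size - g.cap
    g.cap = size
    g.mu.Unlock()
    if grown == 1 {
        g.cond.Signal()
    } else if grown > 1 {
        g.cond.Broadcast()
    }
}

func main() {
    g := newGate(1)
    g.acquire() // take the only slot
    done := make(chan struct{})
    go func() {
        g.acquire() // blocks until tune() adds a slot
        close(done)
    }()
    time.Sleep(50 * time.Millisecond)
    g.tune(2) // grown == 1: a single Signal admits the waiter
    <-done
    fmt.Println("waiter admitted after tune")
}
```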
diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go
index 3036ad7412a3c..5bc5da4fdf3bc 100644
--- a/util/gpool/spmc/spmcpool_test.go
+++ b/util/gpool/spmc/spmcpool_test.go
@@ -15,9 +15,11 @@ package spmc
 
 import (
+    "fmt"
     "sync"
     "sync/atomic"
     "testing"
+    "time"
 
     "github.com/pingcap/tidb/resourcemanager/pooltask"
     rmutil "github.com/pingcap/tidb/resourcemanager/util"
@@ -121,6 +123,78 @@ func TestStopPool(t *testing.T) {
     pool.ReleaseAndWait()
 }
 
+func TestTuneSimplePool(t *testing.T) {
+    testTunePool(t, "TestTuneSimplePool")
+}
+
+func TestTuneMultiPool(t *testing.T) {
+    var concurrency = 5
+    var wg sync.WaitGroup
+    wg.Add(concurrency)
+    for i := 0; i < concurrency; i++ {
+        go func(id int) {
+            testTunePool(t, fmt.Sprintf("TestTuneMultiPool%d", id))
+            wg.Done()
+        }(i)
+    }
+    wg.Wait()
+}
+
+func testTunePool(t *testing.T, name string) {
+    type ConstArgs struct {
+        a int
+    }
+    myArgs := ConstArgs{a: 10}
+    // init the pool
+    // input type, output type, constArgs type
+    pool, err := NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext](name, 10, rmutil.UNKNOWN)
+    require.NoError(t, err)
+    pool.SetConsumerFunc(func(task int, constArgs ConstArgs, ctx any) int {
+        return task + constArgs.a
+    })
+
+    exit := make(chan struct{})
+
+    pfunc := func() (int, error) {
+        select {
+        case <-exit:
+            return 0, gpool.ErrProducerClosed
+        default:
+            return 1, nil
+        }
+    }
+    // add new task
+    resultCh, control := pool.AddProducer(pfunc, myArgs, pooltask.NilContext{}, WithConcurrency(10))
+    tid := control.TaskID()
+    var wg sync.WaitGroup
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for result := range resultCh {
+            require.Greater(t, result, 10)
+        }
+    }()
+    time.Sleep(1 * time.Second)
+    newSize := pool.Cap() - 1
+    pool.Tune(newSize)
+    time.Sleep(1 * time.Second)
+    require.Equal(t, newSize, pool.Cap())
+    require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+
+    newSize = pool.Cap() + 1
+    pool.Tune(newSize)
+    time.Sleep(1 * time.Second)
+    require.Equal(t, newSize, pool.Cap())
+    require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+
+    // exit test
+    close(exit)
+    control.Wait()
+    wg.Wait()
+    // close pool
+    pool.ReleaseAndWait()
+}
+
 func TestPoolWithEnoughCapacity(t *testing.T) {
     const (
         RunTimes = 1000
diff --git a/util/gpool/spmc/worker.go b/util/gpool/spmc/worker.go
index b8e22376bb79a..158c677775987 100644
--- a/util/gpool/spmc/worker.go
+++ b/util/gpool/spmc/worker.go
@@ -67,7 +67,7 @@ func (w *goWorker[T, U, C, CT, TF]) run() {
     for t := range f.GetTaskCh() {
         if f.GetStatus() == pooltask.StopTask {
             f.Done()
-            continue
+            break
         }
         f.GetResultCh() <- w.pool.consumerFunc(t.Task, f.ConstArgs(), ctx)
         f.Done()
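The `continue` → `break` change in `worker.go` means a worker abandons a stopped task's channel instead of spinning through its remaining items one by one. In miniature, with toy types rather than the pool's `TaskBox`:

```go
package main

import "fmt"

// runWorker is a toy version of goWorker.run's inner loop: it consumes a
// task's channel until the channel closes or the task is stopped.
func runWorker(tasks <-chan int, stopped func() bool) (consumed int) {
    for t := range tasks {
        if stopped() {
            break // leave the stopped task's channel instead of draining it
        }
        consumed += t
    }
    return consumed
}

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)
    fmt.Println(runWorker(ch, func() bool { return true })) // 0: bails on the first receive

    ch2 := make(chan int, 3)
    ch2 <- 1
    ch2 <- 2
    ch2 <- 3
    close(ch2)
    fmt.Println(runWorker(ch2, func() bool { return false })) // 6: consumes everything
}
```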