Skip to content

Commit

Permalink
fix: enable task archivers (#21943)
Browse files Browse the repository at this point in the history
Co-authored-by: Qiu Jian <qiujian@yunionyun.com>
  • Loading branch information
swordqiu and Qiu Jian authored Jan 8, 2025
1 parent a82939f commit fb849ae
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 23 deletions.
3 changes: 3 additions & 0 deletions pkg/cloudevent/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ func InitHandlers(app *appsrv.Application) {

models.InitCloudevent()

taskman.InitArchivedTaskManager()
taskman.AddTaskHandler("v1", app)

for _, manager := range []db.IModelManager{
taskman.TaskManager,
taskman.SubTaskManager,
taskman.TaskObjectManager,
taskman.ArchivedTaskManager,

db.UserCacheManager,
db.TenantCacheManager,
db.DistinctFieldManager,
Expand Down
8 changes: 8 additions & 0 deletions pkg/cloudevent/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
common_app "yunion.io/x/onecloud/pkg/cloudcommon/app"
"yunion.io/x/onecloud/pkg/cloudcommon/cronman"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
"yunion.io/x/onecloud/pkg/cloudevent/models"
"yunion.io/x/onecloud/pkg/cloudevent/options"
Expand Down Expand Up @@ -55,12 +56,19 @@ func StartService() {
defer cloudcommon.CloseDB()

if !opts.IsSlaveNode {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount)
cron.AddJobAtIntervalsWithStartRun("SyncCloudprovider", time.Duration(opts.CloudproviderSyncIntervalMinutes)*time.Minute, models.CloudproviderManager.SyncCloudproviders, true)
cron.AddJobAtIntervalsWithStartRun("CloudeventSyncTask", time.Duration(opts.CloudeventSyncIntervalHours)*time.Hour, models.CloudproviderManager.SyncCloudeventTask, true)

cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)

cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

cron.Start()
defer cron.Stop()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cloudid/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ import (
func InitHandlers(app *appsrv.Application) {
db.InitAllManagers()

taskman.InitArchivedTaskManager()
taskman.AddTaskHandler("v1", app)

db.AddScopeResourceCountHandler("", app)

for _, manager := range []db.IModelManager{
taskman.TaskManager,
taskman.SubTaskManager,
taskman.TaskObjectManager,
taskman.ArchivedTaskManager,

db.UserCacheManager,
db.TenantCacheManager,
db.SharedResourceManager,
Expand Down
8 changes: 8 additions & 0 deletions pkg/cloudid/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/cronman"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
_ "yunion.io/x/onecloud/pkg/cloudid/drivers"
Expand Down Expand Up @@ -85,12 +86,19 @@ func StartService() {
}

if !opts.IsSlaveNode {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount)
cron.AddJobAtIntervalsWithStartRun("SyncCloudaccountResources", time.Duration(opts.CloudIdResourceSyncIntervalHours)*time.Hour, models.CloudaccountManager.SyncCloudaccountResources, true)
cron.AddJobAtIntervalsWithStartRun("SyncCloudproviderResources", time.Duration(opts.CloudIdResourceSyncIntervalHours)*time.Hour, models.CloudproviderManager.SyncCloudproviderResources, true)

cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)

cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

cron.Start()
defer cron.Stop()
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/cloudir/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"yunion.io/x/log"

app_common "yunion.io/x/onecloud/pkg/cloudcommon/app"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/etcd"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
"yunion.io/x/onecloud/pkg/cloudir/options"
Expand All @@ -43,7 +42,6 @@ func StartService() {
}

app := app_common.InitApp(baseOpts, false)
db.AppDBInit(app)
initHandlers(app)

app_common.ServeForeverWithCleanup(app, baseOpts, func() {
Expand Down
4 changes: 4 additions & 0 deletions pkg/compute/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func InitHandlers(app *appsrv.Application) {
capabilities.AddCapabilityHandler("", app)
specs.AddSpecHandler("", app)
sshkeys.AddSshKeysHandler("", app)

taskman.InitArchivedTaskManager()
taskman.AddTaskHandler("", app)

misc.AddMiscHandler("", app)

app_common.ExportOptionsHandler(app, &options.Options)
Expand All @@ -61,6 +64,7 @@ func InitHandlers(app *appsrv.Application) {
taskman.TaskManager,
taskman.SubTaskManager,
taskman.TaskObjectManager,
taskman.ArchivedTaskManager,
db.UserCacheManager,
db.TenantCacheManager,
db.SharedResourceManager,
Expand Down
7 changes: 7 additions & 0 deletions pkg/compute/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll
}

cronFunc := func() {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

cachesync.StartTenantCacheSync(opts.TenantCacheExpireSeconds)

cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount)
Expand Down Expand Up @@ -210,6 +215,8 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll
cron.AddJobEveryFewDays(
"CleanRecycleDiskFiles", 1, 3, 0, 0, models.StoragesCleanRecycleDiskfiles, false)

cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

if jobs != nil {
jobs(cron)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/devtool/models/devtoolcronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"yunion.io/x/onecloud/pkg/cloudcommon/cronman"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/devtool/options"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/mcclient/auth"
"yunion.io/x/onecloud/pkg/mcclient/modules/ansible"
Expand Down Expand Up @@ -111,7 +113,15 @@ func AddOneCronjob(item *SCronjob, s *mcclient.ClientSession) error {
}

func InitializeCronjobs(ctx context.Context) error {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

DevToolCronManager = cronman.InitCronJobManager(true, 8)

DevToolCronManager.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

DevToolCronManager.Start()
Session := auth.GetAdminSession(ctx, "")

Expand Down
4 changes: 4 additions & 0 deletions pkg/devtool/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ import (

func InitHandlers(app *appsrv.Application) {
db.InitAllManagers()

taskman.InitArchivedTaskManager()
taskman.AddTaskHandler("", app)

for _, manager := range []db.IModelManager{
taskman.TaskManager,
taskman.SubTaskManager,
taskman.TaskObjectManager,
taskman.ArchivedTaskManager,

db.SharedResourceManager,
db.UserCacheManager,
db.TenantCacheManager,
Expand Down
4 changes: 3 additions & 1 deletion pkg/devtool/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func StartService() {
db.EnsureAppSyncDB(app, dbOpts, models.InitDB)
defer cloudcommon.CloseDB()

models.InitializeCronjobs(app.GetContext())
if !opts.IsSlaveNode {
models.InitializeCronjobs(app.GetContext())
}

app_common.ServeForeverWithCleanup(app, baseOpts, func() {
cloudcommon.CloseDB()
Expand Down
3 changes: 3 additions & 0 deletions pkg/image/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func InitHandlers(app *appsrv.Application) {

quotas.AddQuotaHandler(&models.QuotaManager.SQuotaBaseManager, API_VERSION, app)
usages.AddUsageHandler(API_VERSION, app)

taskman.InitArchivedTaskManager()
taskman.AddTaskHandler(API_VERSION, app)

app_common.ExportOptionsHandler(app, &options.Options)
Expand All @@ -50,6 +52,7 @@ func InitHandlers(app *appsrv.Application) {
taskman.TaskManager,
taskman.SubTaskManager,
taskman.TaskObjectManager,
taskman.ArchivedTaskManager,

db.UserCacheManager,
db.TenantCacheManager,
Expand Down
8 changes: 8 additions & 0 deletions pkg/image/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/cronman"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/cachesync"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
"yunion.io/x/onecloud/pkg/hostman/hostdeployer/deployclient"
"yunion.io/x/onecloud/pkg/image/drivers/s3"
Expand Down Expand Up @@ -147,6 +148,11 @@ func StartService() {
}

if !opts.IsSlaveNode {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

cachesync.StartTenantCacheSync(opts.TenantCacheExpireSeconds)

cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount)
Expand All @@ -157,6 +163,8 @@ func StartService() {

cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)

cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

cron.Start()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/keystone/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func InitHandlers(app *appsrv.Application) {
quotas.AddQuotaHandler(&models.IdentityQuotaManager.SQuotaBaseManager, API_VERSION, app)

usages.AddUsageHandler(API_VERSION, app)

taskman.InitArchivedTaskManager()
taskman.AddTaskHandler(API_VERSION, app)

app_common.ExportOptionsHandlerWithPrefix(app, API_VERSION, &options.Options)
Expand All @@ -52,6 +54,8 @@ func InitHandlers(app *appsrv.Application) {
taskman.TaskManager,
taskman.SubTaskManager,
taskman.TaskObjectManager,
taskman.ArchivedTaskManager,

models.SensitiveConfigManager,
models.WhitelistedConfigManager,
models.IdmappingManager,
Expand Down
8 changes: 8 additions & 0 deletions pkg/keystone/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/cronman"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
"yunion.io/x/onecloud/pkg/cloudcommon/policy"
Expand Down Expand Up @@ -103,6 +104,11 @@ func StartService() {
cache.Init(opts.TokenExpirationSeconds)

if !opts.IsSlaveNode {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount)

cron.AddJobAtIntervalsWithStartRun("AutoSyncIdentityProviderTask", time.Duration(opts.AutoSyncIntervalSeconds)*time.Second, models.AutoSyncIdentityProviderTask, true)
Expand All @@ -114,6 +120,8 @@ func StartService() {

cron.AddJobEveryFewHour("RemoveObsoleteInvalidTokens", 6, 0, 0, models.RemoveObsoleteInvalidTokens, true)

cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

cron.Start()
defer cron.Stop()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/monitor/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ func InitHandlers(app *appsrv.Application) {
db.RegisterModelManager(db.RoleCacheManager)
db.RegistUserCredCacheUpdater()

taskman.InitArchivedTaskManager()
taskman.AddTaskHandler("", app)

for _, manager := range []db.IModelManager{
taskman.TaskManager,
taskman.SubTaskManager,
taskman.TaskObjectManager,
taskman.ArchivedTaskManager,
} {
db.RegisterModelManager(manager)
}
Expand Down
34 changes: 22 additions & 12 deletions pkg/monitor/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/cronman"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
"yunion.io/x/onecloud/pkg/mcclient/auth"
Expand Down Expand Up @@ -76,21 +77,30 @@ func StartService() {

go startServices()

cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount)
cron.AddJobAtIntervalsWithStartRun("InitAlertResourceAdminRoleUsers", time.Duration(opts.InitAlertResourceAdminRoleUsersIntervalSeconds)*time.Second, models.GetAlertResourceManager().GetAdminRoleUsers, true)
cron.AddJobEveryFewDays("DeleteRecordsOfThirtyDaysAgoRecords", 1, 0, 0, 0,
models.AlertRecordManager.DeleteRecordsOfThirtyDaysAgo, false)
//cron.AddJobAtIntervalsWithStartRun("MonitorResourceSync", time.Duration(opts.MonitorResourceSyncIntervalSeconds)*time.Minute*60, models.MonitorResourceManager.SyncResources, true)
cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)
if !opts.IsSlaveNode {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount)
cron.AddJobAtIntervalsWithStartRun("InitAlertResourceAdminRoleUsers", time.Duration(opts.InitAlertResourceAdminRoleUsersIntervalSeconds)*time.Second, models.GetAlertResourceManager().GetAdminRoleUsers, true)
cron.AddJobEveryFewDays("DeleteRecordsOfThirtyDaysAgoRecords", 1, 0, 0, 0,
models.AlertRecordManager.DeleteRecordsOfThirtyDaysAgo, false)
//cron.AddJobAtIntervalsWithStartRun("MonitorResourceSync", time.Duration(opts.MonitorResourceSyncIntervalSeconds)*time.Minute*60, models.MonitorResourceManager.SyncResources, true)
cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)

cron.Start()
defer cron.Stop()
cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

worker, err := worker.NewWorker(opts)
if err != nil {
log.Fatalf("new worker failed: %v", err)
cron.Start()
defer cron.Stop()

worker, err := worker.NewWorker(opts)
if err != nil {
log.Fatalf("new worker failed: %v", err)
}
go worker.Start(app.GetContext(), app, "")
}
go worker.Start(app.GetContext(), app, "")

InitInfluxDBSubscriptionHandlers(app, baseOpts)

Expand Down
26 changes: 18 additions & 8 deletions pkg/notify/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/app"
"yunion.io/x/onecloud/pkg/cloudcommon/cronman"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
"yunion.io/x/onecloud/pkg/notify/models"
"yunion.io/x/onecloud/pkg/notify/options"
Expand Down Expand Up @@ -69,16 +70,25 @@ func StartService() {
}
}

cron := cronman.InitCronJobManager(true, 2)
// update service
cron.AddJobAtIntervalsWithStartRun("syncReciverFromKeystone", time.Duration(opts.SyncReceiverIntervalMinutes)*time.Minute, models.ReceiverManager.SyncUserFromKeystone, true)
if !opts.IsSlaveNode {
err := taskman.TaskManager.InitializeData()
if err != nil {
log.Fatalf("TaskManager.InitializeData fail %s", err)
}

cron := cronman.InitCronJobManager(true, 2)
// update service
cron.AddJobAtIntervalsWithStartRun("syncReciverFromKeystone", time.Duration(opts.SyncReceiverIntervalMinutes)*time.Minute, models.ReceiverManager.SyncUserFromKeystone, true)

// wrapped func to resend notifications
cron.AddJobAtIntervals("ReSendNotifications", time.Duration(opts.ReSendScope)*time.Second, models.NotificationManager.ReSend)
cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)
cron.AddJobEveryFewDays("InitReceiverProject", 7, 0, 0, 0, models.InitReceiverProject, true)
// wrapped func to resend notifications
cron.AddJobAtIntervals("ReSendNotifications", time.Duration(opts.ReSendScope)*time.Second, models.NotificationManager.ReSend)
cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false)
cron.AddJobEveryFewDays("InitReceiverProject", 7, 0, 0, 0, models.InitReceiverProject, true)

cron.Start()
cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob)

cron.Start()
}

app.ServeForever(applicaion, baseOpts)
}

0 comments on commit fb849ae

Please sign in to comment.