From fb849aec259f2c86ea10ea4c30fd7be64f08790f Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Wed, 8 Jan 2025 23:20:28 +0800 Subject: [PATCH] fix: enable task archivers (#21943) Co-authored-by: Qiu Jian --- pkg/cloudevent/service/handlers.go | 3 +++ pkg/cloudevent/service/service.go | 8 +++++++ pkg/cloudid/service/handlers.go | 4 ++++ pkg/cloudid/service/service.go | 8 +++++++ pkg/cloudir/service/service.go | 2 -- pkg/compute/service/handlers.go | 4 ++++ pkg/compute/service/service.go | 7 ++++++ pkg/devtool/models/devtoolcronjob.go | 10 ++++++++ pkg/devtool/service/handler.go | 4 ++++ pkg/devtool/service/service.go | 4 +++- pkg/image/service/handlers.go | 3 +++ pkg/image/service/service.go | 8 +++++++ pkg/keystone/service/handlers.go | 4 ++++ pkg/keystone/service/service.go | 8 +++++++ pkg/monitor/service/handlers.go | 4 ++++ pkg/monitor/service/service.go | 34 ++++++++++++++++++---------- pkg/notify/service/service.go | 26 ++++++++++++++------- 17 files changed, 118 insertions(+), 23 deletions(-) diff --git a/pkg/cloudevent/service/handlers.go b/pkg/cloudevent/service/handlers.go index 2046bf13b2d..03491842c55 100644 --- a/pkg/cloudevent/service/handlers.go +++ b/pkg/cloudevent/service/handlers.go @@ -42,12 +42,15 @@ func InitHandlers(app *appsrv.Application) { models.InitCloudevent() + taskman.InitArchivedTaskManager() taskman.AddTaskHandler("v1", app) for _, manager := range []db.IModelManager{ taskman.TaskManager, taskman.SubTaskManager, taskman.TaskObjectManager, + taskman.ArchivedTaskManager, + db.UserCacheManager, db.TenantCacheManager, db.DistinctFieldManager, diff --git a/pkg/cloudevent/service/service.go b/pkg/cloudevent/service/service.go index 952f2c31344..2fe2b1ea44f 100644 --- a/pkg/cloudevent/service/service.go +++ b/pkg/cloudevent/service/service.go @@ -27,6 +27,7 @@ import ( common_app "yunion.io/x/onecloud/pkg/cloudcommon/app" "yunion.io/x/onecloud/pkg/cloudcommon/cronman" "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" common_options "yunion.io/x/onecloud/pkg/cloudcommon/options" "yunion.io/x/onecloud/pkg/cloudevent/models" "yunion.io/x/onecloud/pkg/cloudevent/options" @@ -55,12 +56,19 @@ func StartService() { defer cloudcommon.CloseDB() if !opts.IsSlaveNode { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount) cron.AddJobAtIntervalsWithStartRun("SyncCloudprovider", time.Duration(opts.CloudproviderSyncIntervalMinutes)*time.Minute, models.CloudproviderManager.SyncCloudproviders, true) cron.AddJobAtIntervalsWithStartRun("CloudeventSyncTask", time.Duration(opts.CloudeventSyncIntervalHours)*time.Hour, models.CloudproviderManager.SyncCloudeventTask, true) cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) + cron.Start() defer cron.Stop() } diff --git a/pkg/cloudid/service/handlers.go b/pkg/cloudid/service/handlers.go index dd6f305ca8f..b38cd5d6b84 100644 --- a/pkg/cloudid/service/handlers.go +++ b/pkg/cloudid/service/handlers.go @@ -40,13 +40,17 @@ import ( func InitHandlers(app *appsrv.Application) { db.InitAllManagers() + taskman.InitArchivedTaskManager() taskman.AddTaskHandler("v1", app) + db.AddScopeResourceCountHandler("", app) for _, manager := range []db.IModelManager{ taskman.TaskManager, taskman.SubTaskManager, taskman.TaskObjectManager, + taskman.ArchivedTaskManager, + db.UserCacheManager, db.TenantCacheManager, db.SharedResourceManager, diff --git a/pkg/cloudid/service/service.go b/pkg/cloudid/service/service.go index c83f214615a..2f91a520df8 100644 --- a/pkg/cloudid/service/service.go +++ b/pkg/cloudid/service/service.go @@ -30,6 +30,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/consts" "yunion.io/x/onecloud/pkg/cloudcommon/cronman" "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient" common_options "yunion.io/x/onecloud/pkg/cloudcommon/options" _ "yunion.io/x/onecloud/pkg/cloudid/drivers" @@ -85,12 +86,19 @@ func StartService() { } if !opts.IsSlaveNode { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount) cron.AddJobAtIntervalsWithStartRun("SyncCloudaccountResources", time.Duration(opts.CloudIdResourceSyncIntervalHours)*time.Hour, models.CloudaccountManager.SyncCloudaccountResources, true) cron.AddJobAtIntervalsWithStartRun("SyncCloudproviderResources", time.Duration(opts.CloudIdResourceSyncIntervalHours)*time.Hour, models.CloudproviderManager.SyncCloudproviderResources, true) cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) + cron.Start() defer cron.Stop() } diff --git a/pkg/cloudir/service/service.go b/pkg/cloudir/service/service.go index 91abd58ab26..ffb20c73c1d 100644 --- a/pkg/cloudir/service/service.go +++ b/pkg/cloudir/service/service.go @@ -20,7 +20,6 @@ import ( "yunion.io/x/log" app_common "yunion.io/x/onecloud/pkg/cloudcommon/app" - "yunion.io/x/onecloud/pkg/cloudcommon/db" "yunion.io/x/onecloud/pkg/cloudcommon/etcd" common_options "yunion.io/x/onecloud/pkg/cloudcommon/options" "yunion.io/x/onecloud/pkg/cloudir/options" @@ -43,7 +42,6 @@ func StartService() { } app := app_common.InitApp(baseOpts, false) - db.AppDBInit(app) initHandlers(app) app_common.ServeForeverWithCleanup(app, baseOpts, func() { diff --git a/pkg/compute/service/handlers.go b/pkg/compute/service/handlers.go index f2e3ee52bd6..575edb5a57d 100644 --- a/pkg/compute/service/handlers.go +++ b/pkg/compute/service/handlers.go @@ -52,7 +52,10 @@ func InitHandlers(app *appsrv.Application) { capabilities.AddCapabilityHandler("", app) specs.AddSpecHandler("", app) sshkeys.AddSshKeysHandler("", app) + + taskman.InitArchivedTaskManager() taskman.AddTaskHandler("", app) + misc.AddMiscHandler("", app) app_common.ExportOptionsHandler(app, &options.Options) @@ -61,6 +64,7 @@ func InitHandlers(app *appsrv.Application) { taskman.TaskManager, taskman.SubTaskManager, taskman.TaskObjectManager, + taskman.ArchivedTaskManager, db.UserCacheManager, db.TenantCacheManager, db.SharedResourceManager, diff --git a/pkg/compute/service/service.go b/pkg/compute/service/service.go index 58cc50ff553..9052b6ba45e 100644 --- a/pkg/compute/service/service.go +++ b/pkg/compute/service/service.go @@ -156,6 +156,11 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll } cronFunc := func() { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + cachesync.StartTenantCacheSync(opts.TenantCacheExpireSeconds) cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount) @@ -210,6 +215,8 @@ func StartServiceWithJobsAndApp(jobs func(cron *cronman.SCronJobManager), appCll cron.AddJobEveryFewDays( "CleanRecycleDiskFiles", 1, 3, 0, 0, models.StoragesCleanRecycleDiskfiles, false) + cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) + if jobs != nil { jobs(cron) } diff --git a/pkg/devtool/models/devtoolcronjob.go b/pkg/devtool/models/devtoolcronjob.go index 54771e2789c..50618c643f2 100644 --- a/pkg/devtool/models/devtoolcronjob.go +++ b/pkg/devtool/models/devtoolcronjob.go @@ -23,6 +23,8 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/cronman" "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" + "yunion.io/x/onecloud/pkg/devtool/options" "yunion.io/x/onecloud/pkg/mcclient" "yunion.io/x/onecloud/pkg/mcclient/auth" "yunion.io/x/onecloud/pkg/mcclient/modules/ansible" @@ -111,7 +113,15 @@ func AddOneCronjob(item *SCronjob, s *mcclient.ClientSession) error { } func InitializeCronjobs(ctx context.Context) error { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + DevToolCronManager = cronman.InitCronJobManager(true, 8) + + DevToolCronManager.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) + DevToolCronManager.Start() Session := auth.GetAdminSession(ctx, "") diff --git a/pkg/devtool/service/handler.go b/pkg/devtool/service/handler.go index df7bda3fa89..4238d15f3e0 100644 --- a/pkg/devtool/service/handler.go +++ b/pkg/devtool/service/handler.go @@ -24,12 +24,16 @@ import ( func InitHandlers(app *appsrv.Application) { db.InitAllManagers() + + taskman.InitArchivedTaskManager() taskman.AddTaskHandler("", app) for _, manager := range []db.IModelManager{ taskman.TaskManager, taskman.SubTaskManager, taskman.TaskObjectManager, + taskman.ArchivedTaskManager, + db.SharedResourceManager, db.UserCacheManager, db.TenantCacheManager, diff --git a/pkg/devtool/service/service.go b/pkg/devtool/service/service.go index 902bd744fa6..8264f1c2ac6 100644 --- a/pkg/devtool/service/service.go +++ b/pkg/devtool/service/service.go @@ -50,7 +50,9 @@ func StartService() { db.EnsureAppSyncDB(app, dbOpts, models.InitDB) defer cloudcommon.CloseDB() - models.InitializeCronjobs(app.GetContext()) + if !opts.IsSlaveNode { + models.InitializeCronjobs(app.GetContext()) + } app_common.ServeForeverWithCleanup(app, baseOpts, func() { cloudcommon.CloseDB() diff --git a/pkg/image/service/handlers.go b/pkg/image/service/handlers.go index 6f3c0c86e50..594590f24ec 100644 --- a/pkg/image/service/handlers.go +++ b/pkg/image/service/handlers.go @@ -42,6 +42,8 @@ func InitHandlers(app *appsrv.Application) { quotas.AddQuotaHandler(&models.QuotaManager.SQuotaBaseManager, API_VERSION, app) usages.AddUsageHandler(API_VERSION, app) + + taskman.InitArchivedTaskManager() taskman.AddTaskHandler(API_VERSION, app) app_common.ExportOptionsHandler(app, &options.Options) @@ -50,6 +52,7 @@ func InitHandlers(app *appsrv.Application) { taskman.TaskManager, taskman.SubTaskManager, taskman.TaskObjectManager, + taskman.ArchivedTaskManager, db.UserCacheManager, db.TenantCacheManager, diff --git a/pkg/image/service/service.go b/pkg/image/service/service.go index be7f91f285c..20880e63f09 100644 --- a/pkg/image/service/service.go +++ b/pkg/image/service/service.go @@ -35,6 +35,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/cronman" "yunion.io/x/onecloud/pkg/cloudcommon/db" "yunion.io/x/onecloud/pkg/cloudcommon/db/cachesync" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" common_options "yunion.io/x/onecloud/pkg/cloudcommon/options" "yunion.io/x/onecloud/pkg/hostman/hostdeployer/deployclient" "yunion.io/x/onecloud/pkg/image/drivers/s3" @@ -147,6 +148,11 @@ func StartService() { } if !opts.IsSlaveNode { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + cachesync.StartTenantCacheSync(opts.TenantCacheExpireSeconds) cron := cronman.InitCronJobManager(true, options.Options.CronJobWorkerCount) @@ -157,6 +163,8 @@ func StartService() { cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) + cron.Start() } diff --git a/pkg/keystone/service/handlers.go b/pkg/keystone/service/handlers.go index 6e1c8e12538..9e6798ae806 100644 --- a/pkg/keystone/service/handlers.go +++ b/pkg/keystone/service/handlers.go @@ -42,6 +42,8 @@ func InitHandlers(app *appsrv.Application) { quotas.AddQuotaHandler(&models.IdentityQuotaManager.SQuotaBaseManager, API_VERSION, app) usages.AddUsageHandler(API_VERSION, app) + + taskman.InitArchivedTaskManager() taskman.AddTaskHandler(API_VERSION, app) app_common.ExportOptionsHandlerWithPrefix(app, API_VERSION, &options.Options) @@ -52,6 +54,8 @@ func InitHandlers(app *appsrv.Application) { taskman.TaskManager, taskman.SubTaskManager, taskman.TaskObjectManager, + taskman.ArchivedTaskManager, + models.SensitiveConfigManager, models.WhitelistedConfigManager, models.IdmappingManager, diff --git a/pkg/keystone/service/service.go b/pkg/keystone/service/service.go index fd3628b5d0b..56e66f5e27c 100644 --- a/pkg/keystone/service/service.go +++ b/pkg/keystone/service/service.go @@ -29,6 +29,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/consts" "yunion.io/x/onecloud/pkg/cloudcommon/cronman" "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient" common_options "yunion.io/x/onecloud/pkg/cloudcommon/options" "yunion.io/x/onecloud/pkg/cloudcommon/policy" @@ -103,6 +104,11 @@ func StartService() { cache.Init(opts.TokenExpirationSeconds) if !opts.IsSlaveNode { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount) cron.AddJobAtIntervalsWithStartRun("AutoSyncIdentityProviderTask", time.Duration(opts.AutoSyncIntervalSeconds)*time.Second, models.AutoSyncIdentityProviderTask, true) @@ -114,6 +120,8 @@ func StartService() { cron.AddJobEveryFewHour("RemoveObsoleteInvalidTokens", 6, 0, 0, models.RemoveObsoleteInvalidTokens, true) + cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) + cron.Start() defer cron.Stop() } diff --git a/pkg/monitor/service/handlers.go b/pkg/monitor/service/handlers.go index 28bb5519a1e..e9e432ab93a 100644 --- a/pkg/monitor/service/handlers.go +++ b/pkg/monitor/service/handlers.go @@ -41,10 +41,14 @@ func InitHandlers(app *appsrv.Application) { db.RegisterModelManager(db.RoleCacheManager) db.RegistUserCredCacheUpdater() + taskman.InitArchivedTaskManager() + taskman.AddTaskHandler("", app) + for _, manager := range []db.IModelManager{ taskman.TaskManager, taskman.SubTaskManager, taskman.TaskObjectManager, + taskman.ArchivedTaskManager, } { db.RegisterModelManager(manager) } diff --git a/pkg/monitor/service/service.go b/pkg/monitor/service/service.go index cbbdee0cfcc..ee719797f35 100644 --- a/pkg/monitor/service/service.go +++ b/pkg/monitor/service/service.go @@ -30,6 +30,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/consts" "yunion.io/x/onecloud/pkg/cloudcommon/cronman" "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" "yunion.io/x/onecloud/pkg/cloudcommon/notifyclient" common_options "yunion.io/x/onecloud/pkg/cloudcommon/options" "yunion.io/x/onecloud/pkg/mcclient/auth" @@ -76,21 +77,30 @@ func StartService() { go startServices() - cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount) - cron.AddJobAtIntervalsWithStartRun("InitAlertResourceAdminRoleUsers", time.Duration(opts.InitAlertResourceAdminRoleUsersIntervalSeconds)*time.Second, models.GetAlertResourceManager().GetAdminRoleUsers, true) - cron.AddJobEveryFewDays("DeleteRecordsOfThirtyDaysAgoRecords", 1, 0, 0, 0, - models.AlertRecordManager.DeleteRecordsOfThirtyDaysAgo, false) - //cron.AddJobAtIntervalsWithStartRun("MonitorResourceSync", time.Duration(opts.MonitorResourceSyncIntervalSeconds)*time.Minute*60, models.MonitorResourceManager.SyncResources, true) - cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + if !opts.IsSlaveNode { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + + cron := cronman.InitCronJobManager(true, opts.CronJobWorkerCount) + cron.AddJobAtIntervalsWithStartRun("InitAlertResourceAdminRoleUsers", time.Duration(opts.InitAlertResourceAdminRoleUsersIntervalSeconds)*time.Second, models.GetAlertResourceManager().GetAdminRoleUsers, true) + cron.AddJobEveryFewDays("DeleteRecordsOfThirtyDaysAgoRecords", 1, 0, 0, 0, + models.AlertRecordManager.DeleteRecordsOfThirtyDaysAgo, false) + //cron.AddJobAtIntervalsWithStartRun("MonitorResourceSync", time.Duration(opts.MonitorResourceSyncIntervalSeconds)*time.Minute*60, models.MonitorResourceManager.SyncResources, true) + cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) - cron.Start() - defer cron.Stop() + cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) - worker, err := worker.NewWorker(opts) - if err != nil { - log.Fatalf("new worker failed: %v", err) + cron.Start() + defer cron.Stop() + + worker, err := worker.NewWorker(opts) + if err != nil { + log.Fatalf("new worker failed: %v", err) + } + go worker.Start(app.GetContext(), app, "") } - go worker.Start(app.GetContext(), app, "") InitInfluxDBSubscriptionHandlers(app, baseOpts) diff --git a/pkg/notify/service/service.go b/pkg/notify/service/service.go index 1f47f0ad677..b9b23dc2d24 100644 --- a/pkg/notify/service/service.go +++ b/pkg/notify/service/service.go @@ -27,6 +27,7 @@ import ( "yunion.io/x/onecloud/pkg/cloudcommon/app" "yunion.io/x/onecloud/pkg/cloudcommon/cronman" "yunion.io/x/onecloud/pkg/cloudcommon/db" + "yunion.io/x/onecloud/pkg/cloudcommon/db/taskman" common_options "yunion.io/x/onecloud/pkg/cloudcommon/options" "yunion.io/x/onecloud/pkg/notify/models" "yunion.io/x/onecloud/pkg/notify/options" @@ -69,16 +70,25 @@ func StartService() { } } - cron := cronman.InitCronJobManager(true, 2) - // update service - cron.AddJobAtIntervalsWithStartRun("syncReciverFromKeystone", time.Duration(opts.SyncReceiverIntervalMinutes)*time.Minute, models.ReceiverManager.SyncUserFromKeystone, true) + if !opts.IsSlaveNode { + err := taskman.TaskManager.InitializeData() + if err != nil { + log.Fatalf("TaskManager.InitializeData fail %s", err) + } + + cron := cronman.InitCronJobManager(true, 2) + // update service + cron.AddJobAtIntervalsWithStartRun("syncReciverFromKeystone", time.Duration(opts.SyncReceiverIntervalMinutes)*time.Minute, models.ReceiverManager.SyncUserFromKeystone, true) - // wrapped func to resend notifications - cron.AddJobAtIntervals("ReSendNotifications", time.Duration(opts.ReSendScope)*time.Second, models.NotificationManager.ReSend) - cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) - cron.AddJobEveryFewDays("InitReceiverProject", 7, 0, 0, 0, models.InitReceiverProject, true) + // wrapped func to resend notifications + cron.AddJobAtIntervals("ReSendNotifications", time.Duration(opts.ReSendScope)*time.Second, models.NotificationManager.ReSend) + cron.AddJobEveryFewHour("AutoPurgeSplitable", 4, 30, 0, db.AutoPurgeSplitable, false) + cron.AddJobEveryFewDays("InitReceiverProject", 7, 0, 0, 0, models.InitReceiverProject, true) - cron.Start() + cron.AddJobAtIntervals("TaskCleanupJob", time.Duration(options.Options.TaskArchiveIntervalHours)*time.Hour, taskman.TaskManager.TaskCleanupJob) + + cron.Start() + } app.ServeForever(applicaion, baseOpts) }