Skip to content

Commit

Permalink
fix: trigger project sync by informer (#19836)
Browse files Browse the repository at this point in the history
Co-authored-by: Qiu Jian <qiujian@yunionyun.com>
  • Loading branch information
swordqiu and Qiu Jian authored Apr 1, 2024
1 parent 4831e4a commit 6f5ec57
Show file tree
Hide file tree
Showing 23 changed files with 524 additions and 230 deletions.
12 changes: 12 additions & 0 deletions pkg/appsrv/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,15 @@ func (r *Ring) Size() int {
return len(r.buffer) - r.tail + r.header
}
}

func (r *Ring) Range(proc func(obj interface{}) bool) {
r.lock.Lock()
defer r.lock.Unlock()

for i := r.header; i != r.tail && i != r.header; i = nextPointer(i, len(r.buffer)) {
cont := proc(r.buffer[i])
if !cont {
break
}
}
}
26 changes: 26 additions & 0 deletions pkg/appsrv/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ func (worker *SWorker) run() {
req := worker.manager.queue.Pop()
if req != nil {
task := req.(*sWorkerTask)

if worker.manager.cancelPrevIdent {
// cancel previous identical tasks
findIdent := false
worker.manager.queue.Range(func(iReq interface{}) bool {
iTask := iReq.(*sWorkerTask)
if iTask.task.Dump() == task.task.Dump() {
// found idential task
findIdent = true
return false
}
return true
})
if findIdent {
continue
}
}

if task.worker != nil {
task.worker <- worker
}
Expand Down Expand Up @@ -161,6 +179,8 @@ type SWorkerManager struct {
dbWorker bool

ignoreOverflow bool

cancelPrevIdent bool
}

func NewWorkerManager(name string, workerCount int, backlog int, dbWorker bool) *SWorkerManager {
Expand All @@ -179,6 +199,8 @@ func NewWorkerManagerIgnoreOverflow(name string, workerCount int, backlog int, d
dbWorker: dbWorker,

ignoreOverflow: ignoreOverflow,

cancelPrevIdent: false,
}

workerManagerLock.Lock()
Expand All @@ -200,6 +222,10 @@ type sWorkerTask struct {
start time.Time
}

func (wm *SWorkerManager) EnableCancelPreviousIdenticalTask() {
wm.cancelPrevIdent = true
}

func (wm *SWorkerManager) UpdateWorkerCount(workerCount int) error {
wm.workerLock.Lock()
defer wm.workerLock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/app/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newEndpointChangeManager() *SEndpointChangeManager {
return man
}

func (man *SEndpointChangeManager) DoSync(first bool) (time.Duration, error) {
func (man *SEndpointChangeManager) DoSync(first bool, timeout bool) (time.Duration, error) {
// reauth to refresh endpoint list
auth.ReAuth()
return time.Hour * 2, nil
Expand Down
33 changes: 33 additions & 0 deletions pkg/cloudcommon/db/cachesync/cachesync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync

import (
"yunion.io/x/onecloud/pkg/appsrv"
identity_modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
)

var tenantCacheSyncWorkerMan *appsrv.SWorkerManager

func init() {
tenantCacheSyncWorkerMan = appsrv.NewWorkerManagerIgnoreOverflow("tenant_cache_sync_worker", 1, 1, true, true)
// tenantCacheSyncWorkerMan.EnableCancelPreviousIdenticalTask()
}

func StartTenantCacheSync(intvalSeconds int) {
newResourceChangeManager(identity_modules.Projects, intvalSeconds)
newResourceChangeManager(identity_modules.Domains, intvalSeconds)
newResourceChangeManager(identity_modules.UsersV3, intvalSeconds)
}
109 changes: 109 additions & 0 deletions pkg/cloudcommon/db/cachesync/changeman.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync

import (
"fmt"
"strings"
"sync"
"time"

"yunion.io/x/jsonutils"
"yunion.io/x/log"

"yunion.io/x/onecloud/pkg/cloudcommon/syncman/watcher"
"yunion.io/x/onecloud/pkg/mcclient/informer"
identity_modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
)

type SResourceChangeManager struct {
watcher.SInformerSyncManager

resMan informer.IResourceManager
intervalSeconds int

ids []string
idsLock *sync.Mutex
}

func newResourceChangeManager(resMan informer.IResourceManager, intvalSecs int) *SResourceChangeManager {
man := &SResourceChangeManager{
resMan: resMan,
intervalSeconds: intvalSecs,

ids: make([]string, 0),
idsLock: &sync.Mutex{},
}
man.InitSync(man)
man.FirstSync()
man.StartWatching(resMan)
return man
}

func (man *SResourceChangeManager) DoSync(first bool, timeout bool) (time.Duration, error) {
if first || timeout {
// reset id list
man.resetId()
} else {
log.Debugf("to do incremental sync ids %s", jsonutils.Marshal(man.ids))
}

switch man.resMan.KeyString() {
case identity_modules.Projects.KeywordPlural:
tenantCacheSyncWorkerMan.Run(&tenantCacheSyncWorker{
ids: man.ids,
}, nil, nil)
case identity_modules.Domains.KeywordPlural:
tenantCacheSyncWorkerMan.Run(&domainCacheSyncWorker{
ids: man.ids,
}, nil, nil)
case identity_modules.UsersV3.KeywordPlural:
tenantCacheSyncWorkerMan.Run(&userCacheSyncWorker{
ids: man.ids,
}, nil, nil)
}
man.resetId()
log.Debugf("sync DONE, next sync %d seconds later...", man.intervalSeconds*8)
return time.Second * time.Duration(man.intervalSeconds) * 8, nil
}

func (man *SResourceChangeManager) NeedSync(dat *jsonutils.JSONDict) bool {
if dat != nil && dat.Contains("id") {
idstr, _ := dat.GetString("id")
idstr = strings.TrimSpace(idstr)
if len(idstr) > 0 {
man.addId(idstr)
}
}
return true
}

func (man *SResourceChangeManager) addId(idstr string) {
man.idsLock.Lock()
defer man.idsLock.Unlock()

man.ids = append(man.ids, idstr)
}

func (man *SResourceChangeManager) resetId() {
man.idsLock.Lock()
defer man.idsLock.Unlock()

man.ids = man.ids[0:0]
}

func (man *SResourceChangeManager) Name() string {
return fmt.Sprintf("ResourceChangeManager:%s", man.resMan.GetKeyword())
}
15 changes: 15 additions & 0 deletions pkg/cloudcommon/db/cachesync/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync // import "yunion.io/x/onecloud/pkg/cloudcommon/db/cachesync"
87 changes: 87 additions & 0 deletions pkg/cloudcommon/db/cachesync/domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync

import (
"context"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/rbacscope"

identityapi "yunion.io/x/onecloud/pkg/apis/identity"
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/mcclient/auth"
modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
)

type domainCacheSyncWorker struct {
ids []string
}

func (w *domainCacheSyncWorker) Run() {
log.Debugf("[domainCacheSyncWorker] Run domain cache sync worker ...")
err := syncDomains(context.Background(), w.ids)
if err != nil {
log.Errorf("fail to syncDomains %s", err)
}
}

func (w *domainCacheSyncWorker) Dump() string {
return "domainCacheSyncWorker"
}

func syncDomains(ctx context.Context, ids []string) error {
s := auth.GetAdminSession(ctx, consts.GetRegion())
query := jsonutils.NewDict()
query.Add(jsonutils.NewInt(1024), "limit")
query.Add(jsonutils.NewString(string(rbacscope.ScopeSystem)), "scope")
query.Add(jsonutils.JSONTrue, "details")
query.Add(jsonutils.NewString("all"), "pending_delete")
query.Add(jsonutils.NewString("all"), "delete")
if len(ids) > 0 {
query.Add(jsonutils.NewStringArray(ids), "id")
}
total := -1
offset := 0
for total < 0 || offset < total {
query.Set("offset", jsonutils.NewInt(int64(offset)))
results, err := modules.Domains.List(s, query)
if err != nil {
return errors.Wrap(err, "Domains.List")
}
total = results.Total
for i := range results.Data {
// update domain cache
item := db.SCachedTenant{}
deleted := jsonutils.QueryBoolean(results.Data[i], "deleted", false)
err := results.Data[i].Unmarshal(&item)
if err == nil && !deleted {
item.ProjectDomain = identityapi.KeystoneDomainRoot
item.DomainId = identityapi.KeystoneDomainRoot
db.TenantCacheManager.Save(ctx, item, true)
} else if deleted {
tenantObj, _ := db.TenantCacheManager.FetchById(item.Id)
if tenantObj != nil {
tenantObj.Delete(ctx, nil)
}
}
offset++
}
}
return nil
}
Loading

0 comments on commit 6f5ec57

Please sign in to comment.