Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trigger project sync by informer #19836

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/appsrv/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,15 @@ func (r *Ring) Size() int {
return len(r.buffer) - r.tail + r.header
}
}

func (r *Ring) Range(proc func(obj interface{}) bool) {
r.lock.Lock()
defer r.lock.Unlock()

for i := r.header; i != r.tail && i != r.header; i = nextPointer(i, len(r.buffer)) {
cont := proc(r.buffer[i])
if !cont {
break
}
}
}
26 changes: 26 additions & 0 deletions pkg/appsrv/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ func (worker *SWorker) run() {
req := worker.manager.queue.Pop()
if req != nil {
task := req.(*sWorkerTask)

if worker.manager.cancelPrevIdent {
// cancel previous identical tasks
findIdent := false
worker.manager.queue.Range(func(iReq interface{}) bool {
iTask := iReq.(*sWorkerTask)
if iTask.task.Dump() == task.task.Dump() {
// found idential task
findIdent = true
return false
}
return true
})
if findIdent {
continue
}
}

if task.worker != nil {
task.worker <- worker
}
Expand Down Expand Up @@ -161,6 +179,8 @@ type SWorkerManager struct {
dbWorker bool

ignoreOverflow bool

cancelPrevIdent bool
}

func NewWorkerManager(name string, workerCount int, backlog int, dbWorker bool) *SWorkerManager {
Expand All @@ -179,6 +199,8 @@ func NewWorkerManagerIgnoreOverflow(name string, workerCount int, backlog int, d
dbWorker: dbWorker,

ignoreOverflow: ignoreOverflow,

cancelPrevIdent: false,
}

workerManagerLock.Lock()
Expand All @@ -200,6 +222,10 @@ type sWorkerTask struct {
start time.Time
}

func (wm *SWorkerManager) EnableCancelPreviousIdenticalTask() {
wm.cancelPrevIdent = true
}

func (wm *SWorkerManager) UpdateWorkerCount(workerCount int) error {
wm.workerLock.Lock()
defer wm.workerLock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudcommon/app/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newEndpointChangeManager() *SEndpointChangeManager {
return man
}

func (man *SEndpointChangeManager) DoSync(first bool) (time.Duration, error) {
func (man *SEndpointChangeManager) DoSync(first bool, timeout bool) (time.Duration, error) {
// reauth to refresh endpoint list
auth.ReAuth()
return time.Hour * 2, nil
Expand Down
33 changes: 33 additions & 0 deletions pkg/cloudcommon/db/cachesync/cachesync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync

import (
"yunion.io/x/onecloud/pkg/appsrv"
identity_modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
)

var tenantCacheSyncWorkerMan *appsrv.SWorkerManager

func init() {
tenantCacheSyncWorkerMan = appsrv.NewWorkerManagerIgnoreOverflow("tenant_cache_sync_worker", 1, 1, true, true)
// tenantCacheSyncWorkerMan.EnableCancelPreviousIdenticalTask()
}

func StartTenantCacheSync(intvalSeconds int) {
newResourceChangeManager(identity_modules.Projects, intvalSeconds)
newResourceChangeManager(identity_modules.Domains, intvalSeconds)
newResourceChangeManager(identity_modules.UsersV3, intvalSeconds)
}
109 changes: 109 additions & 0 deletions pkg/cloudcommon/db/cachesync/changeman.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync

import (
"fmt"
"strings"
"sync"
"time"

"yunion.io/x/jsonutils"
"yunion.io/x/log"

"yunion.io/x/onecloud/pkg/cloudcommon/syncman/watcher"
"yunion.io/x/onecloud/pkg/mcclient/informer"
identity_modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
)

type SResourceChangeManager struct {
watcher.SInformerSyncManager

resMan informer.IResourceManager
intervalSeconds int

ids []string
idsLock *sync.Mutex
}

func newResourceChangeManager(resMan informer.IResourceManager, intvalSecs int) *SResourceChangeManager {
man := &SResourceChangeManager{
resMan: resMan,
intervalSeconds: intvalSecs,

ids: make([]string, 0),
idsLock: &sync.Mutex{},
}
man.InitSync(man)
man.FirstSync()
man.StartWatching(resMan)
return man
}

func (man *SResourceChangeManager) DoSync(first bool, timeout bool) (time.Duration, error) {
if first || timeout {
// reset id list
man.resetId()
} else {
log.Debugf("to do incremental sync ids %s", jsonutils.Marshal(man.ids))
}

switch man.resMan.KeyString() {
case identity_modules.Projects.KeywordPlural:
tenantCacheSyncWorkerMan.Run(&tenantCacheSyncWorker{
ids: man.ids,
}, nil, nil)
case identity_modules.Domains.KeywordPlural:
tenantCacheSyncWorkerMan.Run(&domainCacheSyncWorker{
ids: man.ids,
}, nil, nil)
case identity_modules.UsersV3.KeywordPlural:
tenantCacheSyncWorkerMan.Run(&userCacheSyncWorker{
ids: man.ids,
}, nil, nil)
}
man.resetId()
log.Debugf("sync DONE, next sync %d seconds later...", man.intervalSeconds*8)
return time.Second * time.Duration(man.intervalSeconds) * 8, nil
}

func (man *SResourceChangeManager) NeedSync(dat *jsonutils.JSONDict) bool {
if dat != nil && dat.Contains("id") {
idstr, _ := dat.GetString("id")
idstr = strings.TrimSpace(idstr)
if len(idstr) > 0 {
man.addId(idstr)
}
}
return true
}

func (man *SResourceChangeManager) addId(idstr string) {
man.idsLock.Lock()
defer man.idsLock.Unlock()

man.ids = append(man.ids, idstr)
}

func (man *SResourceChangeManager) resetId() {
man.idsLock.Lock()
defer man.idsLock.Unlock()

man.ids = man.ids[0:0]
}

func (man *SResourceChangeManager) Name() string {
return fmt.Sprintf("ResourceChangeManager:%s", man.resMan.GetKeyword())
}
15 changes: 15 additions & 0 deletions pkg/cloudcommon/db/cachesync/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync // import "yunion.io/x/onecloud/pkg/cloudcommon/db/cachesync"
87 changes: 87 additions & 0 deletions pkg/cloudcommon/db/cachesync/domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cachesync

import (
"context"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/rbacscope"

identityapi "yunion.io/x/onecloud/pkg/apis/identity"
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/mcclient/auth"
modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
)

type domainCacheSyncWorker struct {
ids []string
}

func (w *domainCacheSyncWorker) Run() {
log.Debugf("[domainCacheSyncWorker] Run domain cache sync worker ...")
err := syncDomains(context.Background(), w.ids)
if err != nil {
log.Errorf("fail to syncDomains %s", err)
}
}

func (w *domainCacheSyncWorker) Dump() string {
return "domainCacheSyncWorker"
}

func syncDomains(ctx context.Context, ids []string) error {
s := auth.GetAdminSession(ctx, consts.GetRegion())
query := jsonutils.NewDict()
query.Add(jsonutils.NewInt(1024), "limit")
query.Add(jsonutils.NewString(string(rbacscope.ScopeSystem)), "scope")
query.Add(jsonutils.JSONTrue, "details")
query.Add(jsonutils.NewString("all"), "pending_delete")
query.Add(jsonutils.NewString("all"), "delete")
if len(ids) > 0 {
query.Add(jsonutils.NewStringArray(ids), "id")
}
total := -1
offset := 0
for total < 0 || offset < total {
query.Set("offset", jsonutils.NewInt(int64(offset)))
results, err := modules.Domains.List(s, query)
if err != nil {
return errors.Wrap(err, "Domains.List")
}
total = results.Total
for i := range results.Data {
// update domain cache
item := db.SCachedTenant{}
deleted := jsonutils.QueryBoolean(results.Data[i], "deleted", false)
err := results.Data[i].Unmarshal(&item)
if err == nil && !deleted {
item.ProjectDomain = identityapi.KeystoneDomainRoot
item.DomainId = identityapi.KeystoneDomainRoot
db.TenantCacheManager.Save(ctx, item, true)
} else if deleted {
tenantObj, _ := db.TenantCacheManager.FetchById(item.Id)
if tenantObj != nil {
tenantObj.Delete(ctx, nil)
}
}
offset++
}
}
return nil
}
Loading
Loading