Skip to content
This repository has been archived by the owner on Oct 28, 2022. It is now read-only.

Commit

Permalink
Merge pull request #589 from karimra/k8s-locker-anno
Browse files Browse the repository at this point in the history
add original key as a k8s lease annotation
  • Loading branch information
karimra authored Apr 26, 2022
2 parents 3caa03e + f9c8ece commit b1881e5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 32 deletions.
32 changes: 9 additions & 23 deletions lockers/k8s_locker/k8s_locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
defaultLeaseDuration = 10 * time.Second
loggingPrefix = "[k8s_locker] "
defaultNamespace = "default"
origKeyName = "original-key"
)

func init() {
Expand All @@ -38,9 +39,6 @@ func init() {
acquiredlocks: make(map[string]*lock),
attemtinglocks: make(map[string]*lock),
logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags),
services: make(map[string]context.CancelFunc),
km: new(sync.RWMutex),
keyMapping: make(map[string]string),
}
})
}
Expand All @@ -52,11 +50,8 @@ type k8sLocker struct {
m *sync.RWMutex
acquiredlocks map[string]*lock
attemtinglocks map[string]*lock
services map[string]context.CancelFunc
//
identity string // hostname
km *sync.RWMutex
keyMapping map[string]string

identity string // hostname
}

type config struct {
Expand Down Expand Up @@ -98,12 +93,12 @@ func (k *k8sLocker) Init(ctx context.Context, cfg map[string]interface{}, opts .

func (k *k8sLocker) Lock(ctx context.Context, key string, val []byte) (bool, error) {
nkey := strings.ReplaceAll(key, "/", "-")
k.km.Lock()
k.keyMapping[nkey] = key
k.km.Unlock()
doneChan := make(chan struct{})
l := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
origKeyName: key,
},
Name: nkey,
Namespace: k.Cfg.Namespace,
Labels: map[string]string{
Expand Down Expand Up @@ -203,12 +198,8 @@ func (k *k8sLocker) Lock(ctx context.Context, key string, val []byte) (bool, err
func (k *k8sLocker) KeepLock(ctx context.Context, key string) (chan struct{}, chan error) {
doneChan := make(chan struct{})
errChan := make(chan error)
k.km.RLock()
nkey, ok := k.keyMapping[key]
k.km.RUnlock()
if !ok {
nkey = strings.ReplaceAll(key, "/", "-")
}
nkey := strings.ReplaceAll(key, "/", "-")

go func() {
defer close(doneChan)
ticker := time.NewTicker(k.Cfg.RenewPeriod)
Expand Down Expand Up @@ -260,12 +251,7 @@ func (k *k8sLocker) KeepLock(ctx context.Context, key string) (chan struct{}, ch
}

func (k *k8sLocker) Unlock(ctx context.Context, key string) error {
k.km.RLock()
nkey, ok := k.keyMapping[key]
k.km.RUnlock()
if !ok {
nkey = strings.ReplaceAll(key, "/", "-")
}
nkey := strings.ReplaceAll(key, "/", "-")
k.m.Lock()
defer k.m.Unlock()
k.unlock(ctx, nkey)
Expand Down
19 changes: 10 additions & 9 deletions lockers/k8s_locker/k8s_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,26 @@ func (k *k8sLocker) IsLocked(ctx context.Context, key string) (bool, error) {
}

func (k *k8sLocker) List(ctx context.Context, prefix string) (map[string]string, error) {
ll, err := k.clientset.CoordinationV1().Leases(k.Cfg.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: "app=gnmic",
})
ll, err := k.clientset.CoordinationV1().Leases(k.Cfg.Namespace).List(ctx,
metav1.ListOptions{
LabelSelector: "app=gnmic",
})
if err != nil {
return nil, err
}

prefix = strings.ReplaceAll(prefix, "/", "-")
rs := make(map[string]string, len(ll.Items))

k.km.Lock()
defer k.km.Unlock()
for _, l := range ll.Items {
for key, v := range l.Labels {
if key == "app" {
continue
}
if strings.HasPrefix(key, prefix) {
if okey, ok := k.keyMapping[key]; ok {
okey, ok := l.Annotations[origKeyName]
if ok {
rs[okey] = v
} else {
rs[strings.ReplaceAll(key, "-", "/")] = v
continue
}
}
}
Expand Down

0 comments on commit b1881e5

Please sign in to comment.