Skip to content

Commit

Permalink
fix(semaphore): ensure holderKeys carry all information needed. Fixes
Browse files Browse the repository at this point in the history
#8684 (#13553)

Signed-off-by: isubasinghe <isitha@pipekit.io>
  • Loading branch information
isubasinghe authored Sep 18, 2024
1 parent 0604fda commit 440f1cc
Show file tree
Hide file tree
Showing 5 changed files with 499 additions and 60 deletions.
40 changes: 20 additions & 20 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ const (
VolumeClaimGCOnSuccess VolumeClaimGCStrategy = "OnWorkflowSuccess"
)

type HoldingNameVersion int

const (
HoldingNameV1 HoldingNameVersion = 1
HoldingNameV2 HoldingNameVersion = 2
)

// Workflow is the definition of a workflow resource
// +genclient
// +genclient:noStatus
Expand Down Expand Up @@ -3782,11 +3789,7 @@ func (ss *SemaphoreStatus) LockWaiting(holderKey, lockKey string, currentHolders

func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
i, semaphoreHolding := ss.GetHolding(lockKey)
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
}
holdingName := items[len(items)-1]
holdingName := holderKey
if i < 0 {
ss.Holding = append(ss.Holding, SemaphoreHolding{Semaphore: lockKey, Holders: []string{holdingName}})
return true
Expand All @@ -3800,11 +3803,8 @@ func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolder

func (ss *SemaphoreStatus) LockReleased(holderKey, lockKey string) bool {
i, semaphoreHolding := ss.GetHolding(lockKey)
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
}
holdingName := items[len(items)-1]
holdingName := holderKey

if i >= 0 {
semaphoreHolding.Holders = slice.RemoveString(semaphoreHolding.Holders, holdingName)
ss.Holding[i] = semaphoreHolding
Expand Down Expand Up @@ -3875,13 +3875,17 @@ func (ms *MutexStatus) LockWaiting(holderKey, lockKey string, currentHolders []s
return false
}

func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
i, mutexHolding := ms.GetHolding(lockKey)
func CheckHolderKeyVersion(holderKey string) HoldingNameVersion {
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
if len(items) == 2 || len(items) == 3 {
return HoldingNameV2
}
holdingName := items[len(items)-1]
return HoldingNameV1
}

func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool {
i, mutexHolding := ms.GetHolding(lockKey)
holdingName := holderKey
if i < 0 {
ms.Holding = append(ms.Holding, MutexHolding{Mutex: lockKey, Holder: holdingName})
return true
Expand All @@ -3895,11 +3899,7 @@ func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []

func (ms *MutexStatus) LockReleased(holderKey, lockKey string) bool {
i, holder := ms.GetHolding(lockKey)
items := strings.Split(holderKey, "/")
if len(items) == 0 {
return false
}
holdingName := items[len(items)-1]
holdingName := holderKey
if i >= 0 && holder.Holder == holdingName {
ms.Holding = append(ms.Holding[:i], ms.Holding[i+1:]...)
return true
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/operator_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,17 +1092,17 @@ spec:
configMap:
name: cache-example-steps-simple
`)
wf.Name = "example-steps-simple-gas12"
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

holdingJobs := make(map[string]string)
for _, node := range woc.wf.Status.Nodes {
holdingJobs[node.ID] = node.DisplayName
holdingJobs[fmt.Sprintf("%s/%s/%s", wf.Namespace, wf.Name, node.ID)] = node.DisplayName
}

// Check initial status: job-1 acquired the lock
Expand Down
21 changes: 13 additions & 8 deletions workflow/sync/mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ status:
mutex:
holding:
- holder: synchronization-wf-level-xxs94
mutex: default/mutex/test
mutex: default/Mutex/test
`

func TestMutexLock(t *testing.T) {
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestMutexLock(t *testing.T) {
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
assert.NotNil(t, wf.Status.Synchronization.Mutex.Holding)
assert.Equal(t, wf.Name, wf.Status.Synchronization.Mutex.Holding[0].Holder)
assert.Equal(t, getHolderKey(wf, ""), wf.Status.Synchronization.Mutex.Holding[0].Holder)

// Try to acquire again
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestMutexLock(t *testing.T) {
assert.True(t, wfUpdate)
assert.NotNil(t, wf2.Status.Synchronization)
assert.NotNil(t, wf2.Status.Synchronization.Mutex)
assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Mutex.Holding[0].Holder)
assert.Equal(t, getHolderKey(wf2, ""), wf2.Status.Synchronization.Mutex.Holding[0].Holder)
concurrenyMgr.ReleaseAll(wf2)
assert.Nil(t, wf2.Status.Synchronization)
})
Expand All @@ -216,7 +216,8 @@ func TestMutexLock(t *testing.T) {
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
assert.NotNil(t, wf.Status.Synchronization.Mutex.Holding)
assert.Equal(t, wf.Name, wf.Status.Synchronization.Mutex.Holding[0].Holder)
expected := getHolderKey(wf, "")
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)

// Try to acquire again
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization)
Expand Down Expand Up @@ -271,7 +272,8 @@ func TestMutexLock(t *testing.T) {
assert.True(t, wfUpdate)
assert.NotNil(t, wf2.Status.Synchronization)
assert.NotNil(t, wf2.Status.Synchronization.Mutex)
assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Mutex.Holding[0].Holder)
expected = getHolderKey(wf2, "")
assert.Equal(t, expected, wf2.Status.Synchronization.Mutex.Holding[0].Holder)
concurrenyMgr.ReleaseAll(wf2)
assert.Nil(t, wf2.Status.Synchronization)
})
Expand Down Expand Up @@ -395,7 +397,8 @@ func TestMutexTmplLevel(t *testing.T) {
assert.True(t, wfUpdate)
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
expected := getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474")
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)

// Try to acquire again
status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482", tmpl.Synchronization)
Expand All @@ -410,7 +413,8 @@ func TestMutexTmplLevel(t *testing.T) {
assert.False(t, wfUpdate)
assert.False(t, status)

assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
expected = getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474")
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)
concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization)
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
Expand All @@ -423,7 +427,8 @@ func TestMutexTmplLevel(t *testing.T) {
assert.True(t, wfUpdate)
assert.NotNil(t, wf.Status.Synchronization)
assert.NotNil(t, wf.Status.Synchronization.Mutex)
assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-2216915482", wf.Status.Synchronization.Mutex.Holding[0].Holder)
expected = getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482")
assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder)

assert.NotEqual(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder)
concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization)
Expand Down
118 changes: 95 additions & 23 deletions workflow/sync/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,77 @@ func (cm *Manager) CheckWorkflowExistence() {
}
}

func getUpgradedKey(wf *wfv1.Workflow, key string, level SyncLevelType) string {
if wfv1.CheckHolderKeyVersion(key) == wfv1.HoldingNameV1 {
if level == WorkflowLevel {
return getHolderKey(wf, "")
}
return getHolderKey(wf, key)
}
return key
}

type SyncLevelType int

const (
WorkflowLevel SyncLevelType = 1
TemplateLevel SyncLevelType = 2
ErrorLevel SyncLevelType = 3
)

// HoldingNameV1 keys can be of the form
// x where x is a workflow name
// unfortunately this doesn't differentiate between workflow level keys
// and template level keys. So upgrading is a bit tricky here.

// given a legacy holding name x, namespace y and workflow name z.
// in the case of a workflow level
// if x != z
// upgradedKey := y/z
// elseif x == z
// upgradedKey := y/z
// in the case of a template level
// if x != z
// upgradedKey := y/z/x
// elif x == z
// upgradedKey := y/z/x

// there is a possibility that
// a synchronization exists both at the template level
// and at the workflow level -> impossible to upgrade correctly
// due to ambiguity. Currently we just assume workflow level.
func getWorkflowSyncLevelByName(wf *wfv1.Workflow, lockName string) (SyncLevelType, error) {
if wf.Spec.Synchronization != nil {
syncLockName, err := GetLockName(wf.Spec.Synchronization, wf.Namespace)
if err != nil {
return ErrorLevel, err
}
checkName := syncLockName.EncodeName()
if lockName == checkName {
return WorkflowLevel, nil
}
}

var lastErr error
for _, template := range wf.Spec.Templates {
if template.Synchronization != nil {
syncLockName, err := GetLockName(template.Synchronization, wf.Namespace)
if err != nil {
lastErr = err
continue
}
checkName := syncLockName.EncodeName()
if lockName == checkName {
return TemplateLevel, nil
}
}
}
if lastErr == nil {
lastErr = fmt.Errorf("was unable to determine level for %s", lockName)
}
return ErrorLevel, lastErr
}

func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
for _, wf := range wfs {
if wf.Status.Synchronization == nil {
Expand All @@ -86,11 +157,17 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
}

for _, holders := range holding.Holders {
resourceKey := getResourceKey(wf.Namespace, wf.Name, holders)
if semaphore != nil && semaphore.acquire(resourceKey) {
log.Infof("Lock acquired by %s from %s", resourceKey, holding.Semaphore)
level, err := getWorkflowSyncLevelByName(&wf, holding.Semaphore)
if err != nil {
log.Warnf("cannot obtain lock level for '%s' : %v", holding.Semaphore, err)
continue
}
key := getUpgradedKey(&wf, holders, level)
if semaphore != nil && semaphore.acquire(key) {
log.Infof("Lock acquired by %s from %s", key, holding.Semaphore)
}
}

}
}

Expand All @@ -101,8 +178,13 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) {
if mutex == nil {
mutex := cm.initializeMutex(holding.Mutex)
if holding.Holder != "" {
resourceKey := getResourceKey(wf.Namespace, wf.Name, holding.Holder)
mutex.acquire(resourceKey)
level, err := getWorkflowSyncLevelByName(&wf, holding.Mutex)
if err != nil {
log.Warnf("cannot obtain lock level for '%s' : %v", holding.Mutex, err)
continue
}
key := getUpgradedKey(&wf, holding.Holder, level)
mutex.acquire(key)
}
cm.syncLockMap[holding.Mutex] = mutex
}
Expand Down Expand Up @@ -214,10 +296,9 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
}

for _, holderKey := range holding.Holders {
resourceKey := getResourceKey(wf.Namespace, wf.Name, holderKey)
syncLockHolder.release(resourceKey)
syncLockHolder.release(holderKey)
wf.Status.Synchronization.Semaphore.LockReleased(holderKey, holding.Semaphore)
log.Infof("%s released a lock from %s", resourceKey, holding.Semaphore)
log.Infof("%s released a lock from %s", holderKey, holding.Semaphore)
}
}

Expand All @@ -227,8 +308,8 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
if syncLockHolder == nil {
continue
}
resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name)
syncLockHolder.removeFromQueue(resourceKey)
key := getHolderKey(wf, "")
syncLockHolder.removeFromQueue(key)
}
wf.Status.Synchronization.Semaphore = nil
}
Expand All @@ -240,10 +321,9 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
continue
}

resourceKey := getResourceKey(wf.Namespace, wf.Name, holding.Holder)
syncLockHolder.release(resourceKey)
syncLockHolder.release(holding.Holder)
wf.Status.Synchronization.Mutex.LockReleased(holding.Holder, holding.Mutex)
log.Infof("%s released a lock from %s", resourceKey, holding.Mutex)
log.Infof("%s released a lock from %s", holding.Holder, holding.Mutex)
}

// Remove the pending Workflow level mutex keys
Expand All @@ -252,8 +332,8 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool {
if syncLockHolder == nil {
continue
}
resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name)
syncLockHolder.removeFromQueue(resourceKey)
key := getHolderKey(wf, "")
syncLockHolder.removeFromQueue(key)
}
wf.Status.Synchronization.Mutex = nil
}
Expand Down Expand Up @@ -296,14 +376,6 @@ func getHolderKey(wf *wfv1.Workflow, nodeName string) string {
return key
}

func getResourceKey(namespace, wfName, resourceName string) string {
resourceKey := fmt.Sprintf("%s/%s", namespace, wfName)
if resourceName != wfName {
resourceKey = fmt.Sprintf("%s/%s", resourceKey, resourceName)
}
return resourceKey
}

func (cm *Manager) getCurrentLockHolders(lockName string) []string {
if concurrency, ok := cm.syncLockMap[lockName]; ok {
return concurrency.getCurrentHolders()
Expand Down
Loading

0 comments on commit 440f1cc

Please sign in to comment.