Skip to content

Commit

Permalink
feat: allow crossnamespace locking via sempahores
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Wöhrl <lukas.woehrl@plentymarkets.com>
  • Loading branch information
woehrl01 committed May 17, 2023
1 parent 78f655e commit b17f284
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 97 deletions.
1 change: 0 additions & 1 deletion pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,6 @@ func (s *Synchronization) GetType() SynchronizationType {
type SemaphoreRef struct {
// ConfigMapKeyRef is configmap selector for Semaphore configuration
ConfigMapKeyRef *apiv1.ConfigMapKeySelector `json:"configMapKeyRef,omitempty" protobuf:"bytes,1,opt,name=configMapKeyRef"`
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
}

// Mutex holds Mutex configuration
Expand Down
90 changes: 0 additions & 90 deletions workflow/controller/operator_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,33 +79,6 @@ spec:
sys.exit(exit_code)
`

const ScriptWfWithSemaphoreDifferentNamespace = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: script-wf
namespace: default
spec:
entrypoint: scriptTmpl
templates:
- name: scriptTmpl
synchronization:
semaphore:
namespace: other
configMapKeyRef:
key: template
name: my-config
script:
image: python:alpine3.6
command: ["python"]
# fail with a 66% probability
source: |
import random;
import sys;
exit_code = random.choice([0, 1, 1]);
sys.exit(exit_code)
`

const ResourceWfWithSemaphore = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down Expand Up @@ -277,69 +250,6 @@ func TestSemaphoreScriptTmplLevel(t *testing.T) {
})
}


func TestSemaphoreScriptConfigMapInDifferentNamespace(t *testing.T) {
cancel, controller := newController()
defer cancel()
ctx := context.Background()
controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
}, workflowExistenceFunc)
var cm v1.ConfigMap
wfv1.MustUnmarshal([]byte(configMap), &cm)
_, err := controller.kubeclientset.CoreV1().ConfigMaps("other").Create(ctx, &cm, metav1.CreateOptions{})
assert.NoError(t, err)

t.Run("ScriptTmplLevelAcquireAndRelease", func(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(ScriptWfWithSemaphoreDifferentNamespace)
wf.Name = "one"
wf.Namespace = "namespace-one"
wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
woc := newWorkflowOperationCtx(wf, controller)

// acquired the lock
woc.operate(ctx)
assert.NotNil(t, woc.wf.Status.Synchronization)
assert.NotNil(t, woc.wf.Status.Synchronization.Semaphore)
assert.Equal(t, 1, len(woc.wf.Status.Synchronization.Semaphore.Holding))

for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
}

// Try to Acquire the lock, But lock is not available
wf_Two := wf.DeepCopy()
wf_Two.Name = "two"
wf_Two.Namespace = "namespace-two"
wf_Two, err = controller.wfclientset.ArgoprojV1alpha1().Workflows(wf_Two.Namespace).Create(ctx, wf_Two, metav1.CreateOptions{})
assert.NoError(t, err)
woc_two := newWorkflowOperationCtx(wf_Two, controller)
// Try Acquire the lock
woc_two.operate(ctx)

// Check Node status
err = woc_two.podReconciliation(ctx)
assert.NoError(t, err)
for _, node := range woc_two.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
}
// Updating Pod state
makePodsPhase(ctx, woc, v1.PodFailed)

// Release the lock
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Nil(t, woc.wf.Status.Synchronization)

// Try to acquired the lock
woc_two = newWorkflowOperationCtx(woc_two.wf, controller)
woc_two.operate(ctx)
assert.NotNil(t, woc_two.wf.Status.Synchronization)
assert.NotNil(t, woc_two.wf.Status.Synchronization.Semaphore)
assert.Equal(t, 1, len(woc_two.wf.Status.Synchronization.Semaphore.Holding))
})
}

func TestSemaphoreResourceTmplLevel(t *testing.T) {
cancel, controller := newController()
defer cancel()
Expand Down
8 changes: 2 additions & 6 deletions workflow/sync/lock_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,15 @@ func NewLockName(namespace, resourceName, lockKey string, kind LockKind) *LockNa
}
}

func GetLockName(sync *v1alpha1.Synchronization, wfNamespace string) (*LockName, error) {
func GetLockName(sync *v1alpha1.Synchronization, namespace string) (*LockName, error) {
switch sync.GetType() {
case v1alpha1.SynchronizationTypeSemaphore:
if sync.Semaphore.ConfigMapKeyRef != nil {
namespace := sync.Semaphore.Namespace
if namespace == "" {
namespace = wfNamespace
}
return NewLockName(namespace, sync.Semaphore.ConfigMapKeyRef.Name, sync.Semaphore.ConfigMapKeyRef.Key, LockKindConfigMap), nil
}
return nil, fmt.Errorf("cannot get LockName for a Semaphore without a ConfigMapRef")
case v1alpha1.SynchronizationTypeMutex:
return NewLockName(wfNamespace, sync.Mutex.Name, "", LockKindMutex), nil
return NewLockName(namespace, sync.Mutex.Name, "", LockKindMutex), nil
default:
return nil, fmt.Errorf("cannot get LockName for a Sync of Unknown type")
}
Expand Down

0 comments on commit b17f284

Please sign in to comment.