From b17f284ae70fea34a4c86786b1fca7affa794110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Wed, 17 May 2023 12:36:50 +0000 Subject: [PATCH] feat: allow crossnamespace locking via sempahores MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Lukas Wöhrl --- pkg/apis/workflow/v1alpha1/workflow_types.go | 1 - .../controller/operator_concurrency_test.go | 90 ------------------- workflow/sync/lock_name.go | 8 +- 3 files changed, 2 insertions(+), 97 deletions(-) diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index ce77e6c6e9a4..3c02ef368247 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -1652,7 +1652,6 @@ func (s *Synchronization) GetType() SynchronizationType { type SemaphoreRef struct { // ConfigMapKeyRef is configmap selector for Semaphore configuration ConfigMapKeyRef *apiv1.ConfigMapKeySelector `json:"configMapKeyRef,omitempty" protobuf:"bytes,1,opt,name=configMapKeyRef"` - Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"` } // Mutex holds Mutex configuration diff --git a/workflow/controller/operator_concurrency_test.go b/workflow/controller/operator_concurrency_test.go index bc6832f8c292..66f4940b4727 100644 --- a/workflow/controller/operator_concurrency_test.go +++ b/workflow/controller/operator_concurrency_test.go @@ -79,33 +79,6 @@ spec: sys.exit(exit_code) ` -const ScriptWfWithSemaphoreDifferentNamespace = ` -apiVersion: argoproj.io/v1alpha1 -kind: Workflow -metadata: - name: script-wf - namespace: default -spec: - entrypoint: scriptTmpl - templates: - - name: scriptTmpl - synchronization: - semaphore: - namespace: other - configMapKeyRef: - key: template - name: my-config - script: - image: python:alpine3.6 - command: ["python"] - # fail with a 66% probability - source: | - import random; - import sys; - exit_code = random.choice([0, 1, 1]); - sys.exit(exit_code) -` - const ResourceWfWithSemaphore = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow @@ -277,69 +250,6 @@ func TestSemaphoreScriptTmplLevel(t *testing.T) { }) } - -func TestSemaphoreScriptConfigMapInDifferentNamespace(t *testing.T) { - cancel, controller := newController() - defer cancel() - ctx := context.Background() - controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) { - }, workflowExistenceFunc) - var cm v1.ConfigMap - wfv1.MustUnmarshal([]byte(configMap), &cm) - _, err := controller.kubeclientset.CoreV1().ConfigMaps("other").Create(ctx, &cm, metav1.CreateOptions{}) - assert.NoError(t, err) - - t.Run("ScriptTmplLevelAcquireAndRelease", func(t *testing.T) { - wf := wfv1.MustUnmarshalWorkflow(ScriptWfWithSemaphoreDifferentNamespace) - wf.Name = "one" - wf.Namespace = "namespace-one" - wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Create(ctx, wf, metav1.CreateOptions{}) - assert.NoError(t, err) - woc := newWorkflowOperationCtx(wf, controller) - - // acquired the lock - woc.operate(ctx) - assert.NotNil(t, woc.wf.Status.Synchronization) - assert.NotNil(t, woc.wf.Status.Synchronization.Semaphore) - assert.Equal(t, 1, len(woc.wf.Status.Synchronization.Semaphore.Holding)) - - for _, node := range woc.wf.Status.Nodes { - assert.Equal(t, wfv1.NodePending, node.Phase) - } - - // Try to Acquire the lock, But lock is not available - wf_Two := wf.DeepCopy() - wf_Two.Name = "two" - wf_Two.Namespace = "namespace-two" - wf_Two, err = controller.wfclientset.ArgoprojV1alpha1().Workflows(wf_Two.Namespace).Create(ctx, wf_Two, metav1.CreateOptions{}) - assert.NoError(t, err) - woc_two := newWorkflowOperationCtx(wf_Two, controller) - // Try Acquire the lock - woc_two.operate(ctx) - - // Check Node status - err = woc_two.podReconciliation(ctx) - assert.NoError(t, err) - for _, node := range woc_two.wf.Status.Nodes { - assert.Equal(t, wfv1.NodePending, node.Phase) - } - // Updating Pod state - makePodsPhase(ctx, woc, v1.PodFailed) - - // Release the lock - woc = newWorkflowOperationCtx(woc.wf, controller) - woc.operate(ctx) - assert.Nil(t, woc.wf.Status.Synchronization) - - // Try to acquired the lock - woc_two = newWorkflowOperationCtx(woc_two.wf, controller) - woc_two.operate(ctx) - assert.NotNil(t, woc_two.wf.Status.Synchronization) - assert.NotNil(t, woc_two.wf.Status.Synchronization.Semaphore) - assert.Equal(t, 1, len(woc_two.wf.Status.Synchronization.Semaphore.Holding)) - }) -} - func TestSemaphoreResourceTmplLevel(t *testing.T) { cancel, controller := newController() defer cancel() diff --git a/workflow/sync/lock_name.go b/workflow/sync/lock_name.go index d97cb1d32dd8..867dbce3112c 100644 --- a/workflow/sync/lock_name.go +++ b/workflow/sync/lock_name.go @@ -31,19 +31,15 @@ func NewLockName(namespace, resourceName, lockKey string, kind LockKind) *LockNa } } -func GetLockName(sync *v1alpha1.Synchronization, wfNamespace string) (*LockName, error) { +func GetLockName(sync *v1alpha1.Synchronization, namespace string) (*LockName, error) { switch sync.GetType() { case v1alpha1.SynchronizationTypeSemaphore: if sync.Semaphore.ConfigMapKeyRef != nil { - namespace := sync.Semaphore.Namespace - if namespace == "" { - namespace = wfNamespace - } return NewLockName(namespace, sync.Semaphore.ConfigMapKeyRef.Name, sync.Semaphore.ConfigMapKeyRef.Key, LockKindConfigMap), nil } return nil, fmt.Errorf("cannot get LockName for a Semaphore without a ConfigMapRef") case v1alpha1.SynchronizationTypeMutex: - return NewLockName(wfNamespace, sync.Mutex.Name, "", LockKindMutex), nil + return NewLockName(namespace, sync.Mutex.Name, "", LockKindMutex), nil default: return nil, fmt.Errorf("cannot get LockName for a Sync of Unknown type") }