From 73f8e083891c3a8c57dbed1be979223be8d4d082 Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Fri, 22 Nov 2024 17:36:25 -0300 Subject: [PATCH] Added new Idempotency Strategy --- sdk/model/idempotency_strategy.go | 1 + .../workflow_idempotency_test.go | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/sdk/model/idempotency_strategy.go b/sdk/model/idempotency_strategy.go index f1eecbc7..7992676c 100644 --- a/sdk/model/idempotency_strategy.go +++ b/sdk/model/idempotency_strategy.go @@ -14,4 +14,5 @@ type IdempotencyStrategy string const ( FailOnConflict IdempotencyStrategy = "FAIL" ReturnExisting IdempotencyStrategy = "RETURN_EXISTING" + FailOnRunning IdempotencyStrategy = "FAIL_ON_RUNNING" ) diff --git a/test/integration_tests/workflow_idempotency_test.go b/test/integration_tests/workflow_idempotency_test.go index b65ea355..bd25ee43 100644 --- a/test/integration_tests/workflow_idempotency_test.go +++ b/test/integration_tests/workflow_idempotency_test.go @@ -8,6 +8,7 @@ import ( "github.com/conductor-sdk/conductor-go/sdk/model" "github.com/conductor-sdk/conductor-go/sdk/workflow" + "github.com/conductor-sdk/conductor-go/sdk/workflow/executor" "github.com/conductor-sdk/conductor-go/test/testdata" "github.com/stretchr/testify/assert" ) @@ -49,3 +50,48 @@ func TestIdempotencyCombinations(t *testing.T) { ) assert.NoError(t, err, "Failed to delete workflow definition ", err) } + +func TestIdempotencyFailOnRunning(t *testing.T) { + executor := testdata.WorkflowExecutor + wf := workflow.NewConductorWorkflow(executor) + wf.Name("temp_wf_" + strconv.Itoa(time.Now().Nanosecond())).Version(1) + wf = wf.Add(workflow.NewSimpleTask("simple_task_1", "simple_task_1")) + err := wf.Register(true) + assert.NoError(t, err, "Failed to register workflow") + + // (1) workflow should start + id, err := executor.StartWorkflow(&model.StartWorkflowRequest{Name: wf.GetName(), IdempotencyKey: "test", IdempotencyStrategy: model.FailOnRunning}) + assert.NoError(t, err, "Failed to start workflow") + + // (2) workflow start should fail because (1) is running + _, err = executor.StartWorkflow(&model.StartWorkflowRequest{Name: wf.GetName(), IdempotencyKey: "test", IdempotencyStrategy: model.FailOnRunning}) + assert.Error(t, err, "Workflow should have failed but there was no error") + + // complete task so that workflow is completed + err = executor.UpdateTaskByRefName("simple_task_1", id, model.CompletedTask, map[string]interface{}{}) + assert.NoError(t, err, "Failed to update task") + + checkWorkflowIsCompleted(t, executor, id) + + // workflow should start + id2, err := executor.StartWorkflow(&model.StartWorkflowRequest{Name: wf.GetName(), IdempotencyKey: "test", IdempotencyStrategy: model.FailOnRunning}) + assert.NoError(t, err, "Failed to start workflow") + assert.NotEqual(t, id, id2) +} + +func checkWorkflowIsCompleted(t *testing.T, executor *executor.WorkflowExecutor, id string) { + timeout := time.After(5 * time.Second) + tick := time.Tick(1 * time.Second) + + for { + select { + case <-timeout: + t.Fatalf("Timed out and workflow %s didn't complete", id) + case <-tick: + wf, err := executor.GetWorkflow(id, false) + assert.NoError(t, err) + assert.Equal(t, model.CompletedWorkflow, wf.Status) + return + } + } +}