diff --git a/sdk/authentication/token_helper.go b/sdk/authentication/token_helper.go index 6b9cb332..e5895cc7 100644 --- a/sdk/authentication/token_helper.go +++ b/sdk/authentication/token_helper.go @@ -234,9 +234,7 @@ func contains(haystack []string, needle string) bool { } func setBody(body interface{}, contentType string) (bodyBuf *bytes.Buffer, err error) { - if bodyBuf == nil { - bodyBuf = &bytes.Buffer{} - } + bodyBuf = &bytes.Buffer{} if reader, ok := body.(io.Reader); ok { _, err = bodyBuf.ReadFrom(reader) } else if b, ok := body.([]byte); ok { @@ -248,7 +246,7 @@ func setBody(body interface{}, contentType string) (bodyBuf *bytes.Buffer, err e } else if jsonCheck.MatchString(contentType) { err = json.NewEncoder(bodyBuf).Encode(body) } else if xmlCheck.MatchString(contentType) { - xml.NewEncoder(bodyBuf).Encode(body) + err = xml.NewEncoder(bodyBuf).Encode(body) } if err != nil { diff --git a/sdk/client/api_client.go b/sdk/client/api_client.go index 49c49417..5d50f33e 100644 --- a/sdk/client/api_client.go +++ b/sdk/client/api_client.go @@ -247,9 +247,8 @@ func parameterToString(obj interface{}, collectionFormat string) string { } func setBody(body interface{}, contentType string) (bodyBuf *bytes.Buffer, err error) { - if bodyBuf == nil { - bodyBuf = &bytes.Buffer{} - } + bodyBuf = &bytes.Buffer{} + if reader, ok := body.(io.Reader); ok { _, err = bodyBuf.ReadFrom(reader) } else if b, ok := body.([]byte); ok { @@ -261,7 +260,7 @@ func setBody(body interface{}, contentType string) (bodyBuf *bytes.Buffer, err e } else if jsonCheck.MatchString(contentType) { err = json.NewEncoder(bodyBuf).Encode(body) } else if xmlCheck.MatchString(contentType) { - xml.NewEncoder(bodyBuf).Encode(body) + err = xml.NewEncoder(bodyBuf).Encode(body) } if err != nil { diff --git a/sdk/client/http_requester.go b/sdk/client/http_requester.go index 0ae4a52e..2e6c8950 100644 --- a/sdk/client/http_requester.go +++ b/sdk/client/http_requester.go @@ -139,12 +139,12 @@ func (h *HttpRequester) prepareRequest( // Encode the parameters. url.RawQuery = query.Encode() - // Generate a new request if body != nil { - localVarRequest, err = http.NewRequest(method, url.String(), body) + localVarRequest, err = http.NewRequestWithContext(ctx, method, url.String(), body) } else { - localVarRequest, err = http.NewRequest(method, url.String(), nil) + localVarRequest, err = http.NewRequestWithContext(ctx, method, url.String(), nil) } + if err != nil { return nil, err } diff --git a/sdk/model/rbac/create_or_update_application_request.go b/sdk/model/rbac/create_or_update_application_request.go index 82a9ab90..9df3a130 100644 --- a/sdk/model/rbac/create_or_update_application_request.go +++ b/sdk/model/rbac/create_or_update_application_request.go @@ -11,6 +11,5 @@ package rbac type CreateOrUpdateApplicationRequest struct { // Application's name e.g.: Payment Processors - Name string `json:"name"` - Description string `json"description"` + Name string `json:"name"` } diff --git a/sdk/workflow/executor/executor.go b/sdk/workflow/executor/executor.go index 63da6851..9bc18d24 100644 --- a/sdk/workflow/executor/executor.go +++ b/sdk/workflow/executor/executor.go @@ -11,16 +11,13 @@ package executor import ( "context" - "errors" "fmt" "net/http" "os" "strconv" - "strings" "sync" "time" - "github.com/antihax/optional" "github.com/conductor-sdk/conductor-go/sdk/client" "github.com/conductor-sdk/conductor-go/sdk/concurrency" "github.com/conductor-sdk/conductor-go/sdk/event/queue" @@ -83,42 +80,18 @@ func NewWorkflowExecutor(apiClient *client.APIClient) *WorkflowExecutor { // RegisterWorkflow Registers the workflow on the server. Overwrites if the flag is set. If the 'overwrite' flag is not set // and the workflow definition differs from the one on the server, the call will fail with response code 409 func (e *WorkflowExecutor) RegisterWorkflow(overwrite bool, workflow *model.WorkflowDef) error { - _, err := e.metadataClient.RegisterWorkflowDef( - context.Background(), - overwrite, - *workflow, - ) - if err != nil { - return err - } - - return nil + return e.RegisterWorkflowWithContext(context.Background(), overwrite, workflow) } // UnRegisterWorkflow Un-registers the workflow on the server. func (e *WorkflowExecutor) UnRegisterWorkflow(name string, version int32) error { - _, err := e.metadataClient.UnregisterWorkflowDef( - context.Background(), - name, - version, - ) - if err != nil { - return err - } - - return nil + return e.UnRegisterWorkflowWithContext(context.Background(), name, version) } // ExecuteWorkflow start a workflow and wait until the workflow completes or the waitUntilTask completes // Returns the output of the workflow func (e *WorkflowExecutor) ExecuteWorkflow(startWorkflowRequest *model.StartWorkflowRequest, waitUntilTask string) (run *model.WorkflowRun, err error) { - requestId := "" - version := startWorkflowRequest.Version - workflowRun, _, error := e.workflowClient.ExecuteWorkflow(context.Background(), *startWorkflowRequest, requestId, startWorkflowRequest.Name, version, waitUntilTask) - if error != nil { - return nil, error - } - return &workflowRun, nil + return e.ExecuteWorkflowWithContext(context.Background(), startWorkflowRequest, waitUntilTask) } // MonitorExecution monitors the workflow execution @@ -131,14 +104,7 @@ func (e *WorkflowExecutor) MonitorExecution(workflowId string) (workflowMonitor // StartWorkflow Start workflows // Returns the id of the newly created workflow func (e *WorkflowExecutor) StartWorkflow(startWorkflowRequest *model.StartWorkflowRequest) (workflowId string, err error) { - id, _, err := e.workflowClient.StartWorkflowWithRequest( - context.Background(), - *startWorkflowRequest, - ) - if err != nil { - return "", err - } - return id, nil + return e.StartWorkflowWithContext(context.Background(), startWorkflowRequest) } // StartWorkflows Start workflows in bulk @@ -178,7 +144,7 @@ func WaitForWorkflowCompletionUntilTimeout(executionChannel WorkflowExecutionCha } } -// WaitForRunningWorkflowUntilTimeout Helper method to wait for running workflows until the timeout for the workflow execution to complete +// WaitForRunningWorkflowsUntilTimeout Helper method to wait for running workflows until the timeout for the workflow execution to complete func (e *WorkflowExecutor) WaitForRunningWorkflowsUntilTimeout(timeout time.Duration, runningWorkflows ...*RunningWorkflow) { for idx := 0; idx < len(runningWorkflows); { var waitGroup sync.WaitGroup @@ -198,86 +164,27 @@ func waitForRunningWorkflowUntilTimeoutDaemon(timeout time.Duration, runningWork // GetWorkflow Get workflow execution by workflow Id. If includeTasks is set, also fetches all the task details. // Returns nil if no workflow is found by the id func (e *WorkflowExecutor) GetWorkflow(workflowId string, includeTasks bool) (*model.Workflow, error) { - return e.getWorkflow(4, workflowId, includeTasks) -} - -func (e *WorkflowExecutor) getWorkflow(retry int, workflowId string, includeTasks bool) (*model.Workflow, error) { - workflow, response, err := e.workflowClient.GetExecutionStatus( - context.Background(), - workflowId, - &client.WorkflowResourceApiGetExecutionStatusOpts{ - IncludeTasks: optional.NewBool(includeTasks)}, - ) - - // 404s in GetWorkflow are a bit inconsistent with other errors since - // it's using fmt.Errorf("..."). Keeping it this way to avoid breaking changes - if response.StatusCode == 404 { - return nil, fmt.Errorf("no such workflow by Id %s", workflowId) - } - - if response.StatusCode > 399 && response.StatusCode < 500 && response.StatusCode != 429 { - return nil, err - } - - if err != nil { - if retry < 0 { - return nil, err - } else { - time.Sleep(time.Duration(4-retry) * 10 * time.Second) - retry = retry - 1 - return e.getWorkflow(retry, workflowId, includeTasks) - } - } - - return &workflow, nil + return e.GetWorkflowWithContext(context.Background(), workflowId, includeTasks) } // GetWorkflowStatus Get the status of the workflow execution. // This is a lightweight method that returns only overall state of the workflow func (e *WorkflowExecutor) GetWorkflowStatus(workflowId string, includeOutput bool, includeVariables bool) (*model.WorkflowState, error) { - state, response, err := e.workflowClient.GetWorkflowState(context.Background(), workflowId, includeOutput, includeVariables) - if response.StatusCode == 404 { - return nil, nil - } - return &state, err + return e.GetWorkflowStatusWithContext(context.Background(), workflowId, includeOutput, includeVariables) } // GetByCorrelationIds Given the list of correlation ids, find and return workflows // Returns a map with key as correlationId and value as a list of Workflows // When IncludeClosed is set to true, the return value also includes workflows that are completed otherwise only running workflows are returned func (e *WorkflowExecutor) GetByCorrelationIds(workflowName string, includeClosed bool, includeTasks bool, correlationIds ...string) (map[string][]model.Workflow, error) { - workflows, _, err := e.workflowClient.GetWorkflows( - context.Background(), - correlationIds, - workflowName, - &client.WorkflowResourceApiGetWorkflowsOpts{ - IncludeClosed: optional.NewBool(includeClosed), - IncludeTasks: optional.NewBool(includeTasks), - }) - if err != nil { - return nil, err - } - return workflows, nil + return e.GetByCorrelationIdsWithContext(context.Background(), workflowName, includeClosed, includeTasks, correlationIds...) } // GetByCorrelationIdsAndNames Given the list of correlation ids and list of workflow names, find and return workflows // Returns a map with key as correlationId and value as a list of Workflows // When IncludeClosed is set to true, the return value also includes workflows that are completed otherwise only running workflows are returned func (e *WorkflowExecutor) GetByCorrelationIdsAndNames(includeClosed bool, includeTasks bool, correlationIds []string, workflowNames []string) (map[string][]model.Workflow, error) { - workflows, _, err := e.workflowClient.GetWorkflowsBatch( - context.Background(), - map[string][]string{ - "workflowNames": workflowNames, - "correlationIds": correlationIds, - }, - &client.WorkflowResourceApiGetWorkflowsOpts{ - IncludeClosed: optional.NewBool(includeClosed), - IncludeTasks: optional.NewBool(includeTasks), - }) - if err != nil { - return nil, err - } - return workflows, nil + return e.GetByCorrelationIdsAndNamesWithContext(context.Background(), includeClosed, includeTasks, correlationIds, workflowNames) } // Search searches for workflows @@ -293,181 +200,75 @@ func (e *WorkflowExecutor) GetByCorrelationIdsAndNames(includeClosed bool, inclu // - FreeText: Full text search. All the workflow input, output and task outputs upto certain limit (check with your admins to find the size limit) // are full text indexed and can be used to search func (e *WorkflowExecutor) Search(start int32, size int32, query string, freeText string) ([]model.WorkflowSummary, error) { - workflows, _, err := e.workflowClient.Search( - context.Background(), - &client.WorkflowResourceApiSearchOpts{ - Start: optional.NewInt32(start), - Size: optional.NewInt32(size), - FreeText: optional.NewString(freeText), - Query: optional.NewString(query), - }, - ) - if err != nil { - return nil, err - } - return workflows.Results, nil + return e.SearchWithContext(context.Background(), start, size, query, freeText) } // Pause the execution of a running workflow. // Any tasks that are currently running will finish but no new tasks are scheduled until the workflow is resumed func (e *WorkflowExecutor) Pause(workflowId string) error { - _, err := e.workflowClient.PauseWorkflow(context.Background(), workflowId) - if err != nil { - return err - } - return nil + return e.PauseWithContext(context.Background(), workflowId) } // Resume the execution of a workflow that is paused. If the workflow is not paused, this method has no effect func (e *WorkflowExecutor) Resume(workflowId string) error { - _, err := e.workflowClient.ResumeWorkflow(context.Background(), workflowId) - if err != nil { - return err - } - return nil + return e.ResumeWithContext(context.Background(), workflowId) } -// Terminate a running workflow. Reason must be provided that is captured as the termination resaon for the workflow +// Terminate Terminates a running workflow. Reason must be provided that is captured as the termination reason for the workflow. func (e *WorkflowExecutor) Terminate(workflowId string, reason string) error { - if strings.TrimSpace(workflowId) == "" { - err := errors.New("workflow id cannot be empty when calling terminate workflow API") - log.Error("Failed to terminate workflow: ", err.Error()) - return err - } - _, err := e.workflowClient.Terminate(context.Background(), workflowId, - &client.WorkflowResourceApiTerminateOpts{Reason: optional.NewString(reason), TriggerFailureWorkflow: optional.NewBool(false)}, - ) - if err != nil { - return err - } - return nil + return e.TerminateWithContext(context.Background(), workflowId, reason) } +// TerminateWithFailure Terminates a running workflow. func (e *WorkflowExecutor) TerminateWithFailure(workflowId string, reason string, triggerFailureWorkflow bool) error { - if strings.TrimSpace(workflowId) == "" { - err := errors.New("workflow id cannot be empty when calling terminate workflow API") - log.Error("Failed to terminate workflow: ", err.Error()) - return err - } - _, err := e.workflowClient.Terminate(context.Background(), workflowId, - &client.WorkflowResourceApiTerminateOpts{Reason: optional.NewString(reason), TriggerFailureWorkflow: optional.NewBool(triggerFailureWorkflow)}, - ) - if err != nil { - return err - } - return nil + return e.TerminateWithFailureWithContext(context.Background(), workflowId, reason, triggerFailureWorkflow) } // Restart a workflow execution from the beginning with the same input. // When called on a workflow that is not in a terminal status, this operation has no effect // If useLatestDefinition is set, the restarted workflow fetches the latest definition from the metadata store func (e *WorkflowExecutor) Restart(workflowId string, useLatestDefinition bool) error { - _, err := e.workflowClient.Restart( - context.Background(), - workflowId, - &client.WorkflowResourceApiRestartOpts{ - UseLatestDefinitions: optional.NewBool(useLatestDefinition), - }) - if err != nil { - return err - } - return nil + return e.RestartWithContext(context.Background(), workflowId, useLatestDefinition) } // Retry a failed workflow from the last task that failed. When called the task in the failed state is scheduled again // and workflow moves to RUNNING status. If resumeSubworkflowTasks is set and the last failed task was a sub-workflow // the server restarts the subworkflow from the failed task. If set to false, the sub-workflow is re-executed. func (e *WorkflowExecutor) Retry(workflowId string, resumeSubworkflowTasks bool) error { - _, err := e.workflowClient.Retry( - context.Background(), - workflowId, - &client.WorkflowResourceApiRetryOpts{ - ResumeSubworkflowTasks: optional.NewBool(resumeSubworkflowTasks), - }, - ) - if err != nil { - return err - } - return nil + return e.RetryWithContext(context.Background(), workflowId, resumeSubworkflowTasks) } // ReRun a completed workflow from a specific task (ReRunFromTaskId) and optionally change the input // Also update the completed tasks with new input (ReRunFromTaskId) if required func (e *WorkflowExecutor) ReRun(workflowId string, reRunRequest model.RerunWorkflowRequest) (id string, error error) { - id, _, err := e.workflowClient.Rerun( - context.Background(), - reRunRequest, - workflowId, - ) - if err != nil { - return "", err - } - return id, nil + return e.ReRunWithContext(context.Background(), workflowId, reRunRequest) } // SkipTasksFromWorkflow Skips a given task execution from a current running workflow. // When skipped the task's input and outputs are updated from skipTaskRequest parameter. func (e *WorkflowExecutor) SkipTasksFromWorkflow(workflowId string, taskReferenceName string, skipTaskRequest model.SkipTaskRequest) error { - _, err := e.workflowClient.SkipTaskFromWorkflow( - context.Background(), - workflowId, - taskReferenceName, - skipTaskRequest, - ) - if err != nil { - return err - } - return nil + return e.SkipTasksFromWorkflowWithContext(context.Background(), workflowId, taskReferenceName, skipTaskRequest) } // UpdateTask update the task with output and status. func (e *WorkflowExecutor) UpdateTask(taskId string, workflowInstanceId string, status model.TaskResultStatus, output interface{}) error { - taskResult, err := getTaskResultFromOutput(taskId, workflowInstanceId, output) - if err != nil { - return err - } - taskResult.Status = status - _, _, err = e.taskClient.UpdateTask(context.Background(), taskResult) - if err != nil { - return err - } - return nil + return e.UpdateTaskWithContext(context.Background(), taskId, workflowInstanceId, status, output) } // UpdateTaskByRefName Update the execution status and output of the task and status func (e *WorkflowExecutor) UpdateTaskByRefName(taskRefName string, workflowInstanceId string, status model.TaskResultStatus, output interface{}) error { - outputData, err := model.ConvertToMap(output) - if err != nil { - return err - } - - _, _, err = e.taskClient.UpdateTaskByRefName(context.Background(), outputData, workflowInstanceId, taskRefName, string(status)) - if err != nil { - return err - } - - return nil + return e.UpdateTaskByRefNameWithContext(context.Background(), taskRefName, workflowInstanceId, status, output) } // GetTask by task Id returns nil if no such task is found by the id func (e *WorkflowExecutor) GetTask(taskId string) (task *model.Task, err error) { - t, _, err := e.taskClient.GetTask(context.Background(), taskId) - if err != nil { - return nil, err - } - - return &t, nil + return e.GetTaskWithContext(context.Background(), taskId) } // RemoveWorkflow Remove workflow execution permanently from the system // Returns nil if no workflow is found by the id func (e *WorkflowExecutor) RemoveWorkflow(workflowId string) error { - _, err := e.workflowClient.Delete(context.Background(), workflowId, &client.WorkflowResourceApiDeleteOpts{ArchiveWorkflow: optional.NewBool(false)}) - if err != nil { - return err - } - - return nil + return e.RemoveWorkflowWithContext(context.Background(), workflowId) } // DeleteQueueConfiguration Delete queue configuration permanently from the system @@ -482,69 +283,16 @@ func (e *WorkflowExecutor) GetQueueConfiguration(queueConfiguration queue.QueueC return e.eventClient.GetQueueConfig(context.Background(), queueConfiguration.QueueType, queueConfiguration.QueueName) } -// GetQueueConfiguration Create or update a queue configuration +// PutQueueConfiguration Create or update a queue configuration // Returns nil if no error occurred func (e *WorkflowExecutor) PutQueueConfiguration(queueConfiguration queue.QueueConfiguration) (*http.Response, error) { - body, err := queueConfiguration.GetConfiguration() - if err != nil { - return nil, err - } - return e.eventClient.PutQueueConfig(context.Background(), body, queueConfiguration.QueueType, queueConfiguration.QueueName) -} - -func getTaskResultFromOutput(taskId string, workflowInstanceId string, taskExecutionOutput interface{}) (*model.TaskResult, error) { - taskResult, ok := taskExecutionOutput.(*model.TaskResult) - if !ok { - taskResult = model.NewTaskResult(taskId, workflowInstanceId) - outputData, err := model.ConvertToMap(taskExecutionOutput) - if err != nil { - return nil, err - } - taskResult.OutputData = outputData - taskResult.Status = model.CompletedTask - } - return taskResult, nil + return e.PutQueueConfigurationWithContext(context.Background(), queueConfiguration) } // ExecuteWorkflow Executes a workflow // Returns workflow Id for the newly started workflow func (e *WorkflowExecutor) executeWorkflow(workflow *model.WorkflowDef, request *model.StartWorkflowRequest) (workflowId string, err error) { - startWorkflowRequest := model.StartWorkflowRequest{ - Name: request.Name, - Version: request.Version, - CorrelationId: request.CorrelationId, - Input: request.Input, - TaskToDomain: request.TaskToDomain, - ExternalInputPayloadStoragePath: request.ExternalInputPayloadStoragePath, - Priority: request.Priority, - } - if workflow != nil { - startWorkflowRequest.WorkflowDef = workflow - } - workflowId, response, err := e.workflowClient.StartWorkflowWithRequest( - context.Background(), - startWorkflowRequest, - ) - if err != nil { - log.Debug( - "Failed to start workflow", - ", reason: ", err.Error(), - ", name: ", request.Name, - ", version: ", request.Version, - ", input: ", request.Input, - ", workflowId: ", workflowId, - ", response: ", response, - ) - return "", err - } - log.Debug( - "Started workflow", - ", workflowId: ", workflowId, - ", name: ", request.Name, - ", version: ", request.Version, - ", input: ", request.Input, - ) - return workflowId, err + return e.executeWorkflowWithContext(context.Background(), workflow, request) } func (e *WorkflowExecutor) startWorkflowDaemon(monitorExecution bool, request *model.StartWorkflowRequest, runningWorkflowChannel chan *RunningWorkflow, waitGroup *sync.WaitGroup) { diff --git a/sdk/workflow/executor/executor_with_context.go b/sdk/workflow/executor/executor_with_context.go new file mode 100644 index 00000000..061699f2 --- /dev/null +++ b/sdk/workflow/executor/executor_with_context.go @@ -0,0 +1,459 @@ +package executor + +import ( + "context" + "errors" + "fmt" + "github.com/antihax/optional" + "github.com/conductor-sdk/conductor-go/sdk/client" + "github.com/conductor-sdk/conductor-go/sdk/event/queue" + "github.com/conductor-sdk/conductor-go/sdk/model" + log "github.com/sirupsen/logrus" + "net/http" + "strings" + "time" +) + +func (e *WorkflowExecutor) RegisterWorkflowWithContext(ctx context.Context, overwrite bool, workflow *model.WorkflowDef) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.metadataClient.RegisterWorkflowDef(ctx, overwrite, *workflow) + return err +} + +func (e *WorkflowExecutor) UnRegisterWorkflowWithContext(ctx context.Context, name string, version int32) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.metadataClient.UnregisterWorkflowDef(ctx, name, version) + return err +} + +func (e *WorkflowExecutor) ExecuteWorkflowWithContext(ctx context.Context, startWorkflowRequest *model.StartWorkflowRequest, waitUntilTask string) (run *model.WorkflowRun, err error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + requestId := "" + version := startWorkflowRequest.Version + workflowRun, _, err := e.workflowClient.ExecuteWorkflow(ctx, *startWorkflowRequest, requestId, startWorkflowRequest.Name, version, waitUntilTask) + if err != nil { + return nil, err + } + return &workflowRun, nil +} + +func (e *WorkflowExecutor) StartWorkflowWithContext(ctx context.Context, startWorkflowRequest *model.StartWorkflowRequest) (workflowId string, err error) { + if err := ctx.Err(); err != nil { + return "", err + } + + id, _, err := e.workflowClient.StartWorkflowWithRequest( + ctx, + *startWorkflowRequest, + ) + if err != nil { + return "", err + } + return id, nil +} + +func (e *WorkflowExecutor) GetWorkflowWithContext(ctx context.Context, workflowId string, includeTasks bool) (*model.Workflow, error) { + return e.getWorkflowWithContext(ctx, 4, workflowId, includeTasks) +} + +func (e *WorkflowExecutor) getWorkflowWithContext(ctx context.Context, retry int, workflowId string, includeTasks bool) (*model.Workflow, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + workflow, response, err := e.workflowClient.GetExecutionStatus( + ctx, + workflowId, + &client.WorkflowResourceApiGetExecutionStatusOpts{ + IncludeTasks: optional.NewBool(includeTasks)}, + ) + + if response.StatusCode == 404 { + return nil, fmt.Errorf("no such workflow by Id %s", workflowId) + } + + if response.StatusCode > 399 && response.StatusCode < 500 && response.StatusCode != 429 { + return nil, err + } + + if err != nil { + if retry < 0 { + return nil, err + } else { + time.Sleep(time.Duration(4-retry) * 10 * time.Second) + retry = retry - 1 + return e.getWorkflowWithContext(ctx, retry, workflowId, includeTasks) + } + } + + return &workflow, nil +} + +func (e *WorkflowExecutor) GetWorkflowStatusWithContext(ctx context.Context, workflowId string, includeOutput bool, includeVariables bool) (*model.WorkflowState, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + state, response, err := e.workflowClient.GetWorkflowState(ctx, workflowId, includeOutput, includeVariables) + if response != nil && response.StatusCode == 404 { + return nil, nil + } + return &state, err +} + +func (e *WorkflowExecutor) GetByCorrelationIdsWithContext(ctx context.Context, workflowName string, includeClosed bool, includeTasks bool, correlationIds ...string) (map[string][]model.Workflow, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + workflows, _, err := e.workflowClient.GetWorkflows( + ctx, + correlationIds, + workflowName, + &client.WorkflowResourceApiGetWorkflowsOpts{ + IncludeClosed: optional.NewBool(includeClosed), + IncludeTasks: optional.NewBool(includeTasks), + }) + if err != nil { + return nil, err + } + return workflows, nil +} + +func (e *WorkflowExecutor) GetByCorrelationIdsAndNamesWithContext(ctx context.Context, includeClosed bool, includeTasks bool, correlationIds []string, workflowNames []string) (map[string][]model.Workflow, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + workflows, _, err := e.workflowClient.GetWorkflowsBatch( + ctx, + map[string][]string{ + "workflowNames": workflowNames, + "correlationIds": correlationIds, + }, + &client.WorkflowResourceApiGetWorkflowsOpts{ + IncludeClosed: optional.NewBool(includeClosed), + IncludeTasks: optional.NewBool(includeTasks), + }) + + if err != nil { + return nil, err + } + + return workflows, nil +} + +func (e *WorkflowExecutor) SearchWithContext(ctx context.Context, start int32, size int32, query string, freeText string) ([]model.WorkflowSummary, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + workflows, _, err := e.workflowClient.Search( + ctx, + &client.WorkflowResourceApiSearchOpts{ + Start: optional.NewInt32(start), + Size: optional.NewInt32(size), + FreeText: optional.NewString(freeText), + Query: optional.NewString(query), + }, + ) + if err != nil { + return nil, err + } + + return workflows.Results, nil +} + +func (e *WorkflowExecutor) PauseWithContext(ctx context.Context, workflowId string) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.workflowClient.PauseWorkflow(ctx, workflowId) + if err != nil { + return err + } + return nil +} + +func (e *WorkflowExecutor) ResumeWithContext(ctx context.Context, workflowId string) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.workflowClient.ResumeWorkflow(ctx, workflowId) + if err != nil { + return err + } + return nil +} + +func (e *WorkflowExecutor) TerminateWithContext(ctx context.Context, workflowId string, reason string) error { + if err := ctx.Err(); err != nil { + return err + } + + if strings.TrimSpace(workflowId) == "" { + err := errors.New("workflow id cannot be empty when calling terminate workflow API") + log.Error("Failed to terminate workflow: ", err.Error()) + return err + } + _, err := e.workflowClient.Terminate(ctx, workflowId, + &client.WorkflowResourceApiTerminateOpts{Reason: optional.NewString(reason), TriggerFailureWorkflow: optional.NewBool(false)}, + ) + + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) TerminateWithFailureWithContext(ctx context.Context, workflowId string, reason string, triggerFailureWorkflow bool) error { + if strings.TrimSpace(workflowId) == "" { + err := errors.New("workflow id cannot be empty when calling terminate workflow API") + log.Error("Failed to terminate workflow: ", err.Error()) + return err + } + _, err := e.workflowClient.Terminate(ctx, workflowId, + &client.WorkflowResourceApiTerminateOpts{Reason: optional.NewString(reason), TriggerFailureWorkflow: optional.NewBool(triggerFailureWorkflow)}, + ) + + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) RestartWithContext(ctx context.Context, workflowId string, useLatestDefinition bool) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.workflowClient.Restart( + ctx, + workflowId, + &client.WorkflowResourceApiRestartOpts{ + UseLatestDefinitions: optional.NewBool(useLatestDefinition), + }) + + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) RetryWithContext(ctx context.Context, workflowId string, resumeSubworkflowTasks bool) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.workflowClient.Retry( + ctx, + workflowId, + &client.WorkflowResourceApiRetryOpts{ + ResumeSubworkflowTasks: optional.NewBool(resumeSubworkflowTasks), + }, + ) + + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) ReRunWithContext(ctx context.Context, workflowId string, reRunRequest model.RerunWorkflowRequest) (id string, error error) { + if err := ctx.Err(); err != nil { + return "", err + } + + id, _, err := e.workflowClient.Rerun( + ctx, + reRunRequest, + workflowId, + ) + + if err != nil { + return "", err + } + + return id, nil +} + +func (e *WorkflowExecutor) SkipTasksFromWorkflowWithContext(ctx context.Context, workflowId string, taskReferenceName string, skipTaskRequest model.SkipTaskRequest) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.workflowClient.SkipTaskFromWorkflow( + ctx, + workflowId, + taskReferenceName, + skipTaskRequest, + ) + + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) UpdateTaskWithContext(ctx context.Context, taskId string, workflowInstanceId string, status model.TaskResultStatus, output interface{}) error { + if err := ctx.Err(); err != nil { + return err + } + + taskResult, err := getTaskResultFromOutput(taskId, workflowInstanceId, output) + if err != nil { + return err + } + + taskResult.Status = status + _, _, err = e.taskClient.UpdateTask(ctx, taskResult) + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) UpdateTaskByRefNameWithContext(ctx context.Context, taskRefName string, workflowInstanceId string, status model.TaskResultStatus, output interface{}) error { + if err := ctx.Err(); err != nil { + return err + } + + outputData, err := model.ConvertToMap(output) + if err != nil { + return err + } + + _, _, err = e.taskClient.UpdateTaskByRefName(ctx, outputData, workflowInstanceId, taskRefName, string(status)) + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) GetTaskWithContext(ctx context.Context, taskId string) (task *model.Task, err error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + t, _, err := e.taskClient.GetTask(ctx, taskId) + if err != nil { + return nil, err + } + + return &t, nil +} + +func (e *WorkflowExecutor) RemoveWorkflowWithContext(ctx context.Context, workflowId string) error { + if err := ctx.Err(); err != nil { + return err + } + + _, err := e.workflowClient.Delete(ctx, workflowId, &client.WorkflowResourceApiDeleteOpts{ArchiveWorkflow: optional.NewBool(false)}) + if err != nil { + return err + } + + return nil +} + +func (e *WorkflowExecutor) DeleteQueueConfigurationWithContext(ctx context.Context, queueConfiguration queue.QueueConfiguration) (*http.Response, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + return e.eventClient.DeleteQueueConfig(ctx, queueConfiguration.QueueType, queueConfiguration.QueueName) +} + +func (e *WorkflowExecutor) GetQueueConfigurationWithContext(ctx context.Context, queueConfiguration queue.QueueConfiguration) (map[string]interface{}, *http.Response, error) { + if err := ctx.Err(); err != nil { + return nil, nil, err + } + + return e.eventClient.GetQueueConfig(ctx, queueConfiguration.QueueType, queueConfiguration.QueueName) +} + +func (e *WorkflowExecutor) PutQueueConfigurationWithContext(ctx context.Context, queueConfiguration queue.QueueConfiguration) (*http.Response, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + body, err := queueConfiguration.GetConfiguration() + if err != nil { + return nil, err + } + + return e.eventClient.PutQueueConfig(ctx, body, queueConfiguration.QueueType, queueConfiguration.QueueName) +} + +func getTaskResultFromOutput(taskId string, workflowInstanceId string, taskExecutionOutput interface{}) (*model.TaskResult, error) { + taskResult, ok := taskExecutionOutput.(*model.TaskResult) + if !ok { + taskResult = model.NewTaskResult(taskId, workflowInstanceId) + outputData, err := model.ConvertToMap(taskExecutionOutput) + if err != nil { + return nil, err + } + taskResult.OutputData = outputData + taskResult.Status = model.CompletedTask + } + + return taskResult, nil +} + +func (e *WorkflowExecutor) executeWorkflowWithContext(ctx context.Context, workflow *model.WorkflowDef, request *model.StartWorkflowRequest) (workflowId string, err error) { + startWorkflowRequest := model.StartWorkflowRequest{ + Name: request.Name, + Version: request.Version, + CorrelationId: request.CorrelationId, + Input: request.Input, + TaskToDomain: request.TaskToDomain, + ExternalInputPayloadStoragePath: request.ExternalInputPayloadStoragePath, + Priority: request.Priority, + } + if workflow != nil { + startWorkflowRequest.WorkflowDef = workflow + } + workflowId, response, err := e.workflowClient.StartWorkflowWithRequest( + ctx, + startWorkflowRequest, + ) + if err != nil { + log.Debug( + "Failed to start workflow", + ", reason: ", err.Error(), + ", name: ", request.Name, + ", version: ", request.Version, + ", input: ", request.Input, + ", workflowId: ", workflowId, + ", response: ", response, + ) + return "", err + } + log.Debug( + "Started workflow", + ", workflowId: ", workflowId, + ", name: ", request.Name, + ", version: ", request.Version, + ", input: ", request.Input, + ) + + return workflowId, err +} diff --git a/test/integration_tests/application_client_test.go b/test/integration_tests/application_client_test.go index f225390c..09fc86c6 100644 --- a/test/integration_tests/application_client_test.go +++ b/test/integration_tests/application_client_test.go @@ -90,7 +90,7 @@ func TestGetTagsForApplication(t *testing.T) { // Create an application to use in the test ctx := context.Background() - createReq := rbac.CreateOrUpdateApplicationRequest{Name: "TestAppTags", Description: "Test Get Tags"} + createReq := rbac.CreateOrUpdateApplicationRequest{Name: "TestAppTags"} application, _, _ := appClient.CreateApplication(ctx, createReq) // Assuming tags are added here or exists; this could use PutTagForApplication if needed @@ -114,7 +114,7 @@ func TestApplicationClientIntegration(t *testing.T) { ctx := context.Background() // Create an application - createReq := rbac.CreateOrUpdateApplicationRequest{Name: "IntegrationTestApp", Description: "Integration test for all methods"} + createReq := rbac.CreateOrUpdateApplicationRequest{Name: "IntegrationTestApp"} application, _, err := appClient.CreateApplication(ctx, createReq) assert.Nil(t, err) @@ -151,7 +151,7 @@ func TestApplicationClientIntegration(t *testing.T) { assert.Nil(t, err) // Update the application - updateReq := rbac.CreateOrUpdateApplicationRequest{Name: "UpdatedIntegrationTestApp", Description: "Updated description"} + updateReq := rbac.CreateOrUpdateApplicationRequest{Name: "UpdatedIntegrationTestApp"} updatedApp, _, err := appClient.UpdateApplication(ctx, updateReq, application.Id) assert.Nil(t, err) assert.Equal(t, "UpdatedIntegrationTestApp", updatedApp.Name) @@ -181,7 +181,7 @@ func TestApplicationClientErrorHandling(t *testing.T) { assert.Equal(t, 404, resp.StatusCode) // Try to update a non-existent application - updateReq := rbac.CreateOrUpdateApplicationRequest{Name: "NonExistentApp", Description: "Non-existent update"} + updateReq := rbac.CreateOrUpdateApplicationRequest{Name: "NonExistentApp"} _, resp, err = appClient.UpdateApplication(ctx, updateReq, invalidAppId) assert.NotNil(t, err) assert.Equal(t, 404, resp.StatusCode) @@ -225,7 +225,7 @@ func TestGetAccessKeys(t *testing.T) { ctx := context.Background() // Create an application to use in the test - createReq := rbac.CreateOrUpdateApplicationRequest{Name: "TestAppAccessKeys", Description: "Test for GetAccessKeys method"} + createReq := rbac.CreateOrUpdateApplicationRequest{Name: "TestAppAccessKeys"} application, _, err := appClient.CreateApplication(ctx, createReq) assert.Nil(t, err) diff --git a/test/integration_tests/executor_test.go b/test/integration_tests/executor_test.go index 19610a97..91f89904 100644 --- a/test/integration_tests/executor_test.go +++ b/test/integration_tests/executor_test.go @@ -1,14 +1,14 @@ package integration_tests import ( + "context" "fmt" - "testing" - "github.com/conductor-sdk/conductor-go/sdk/client" "github.com/conductor-sdk/conductor-go/sdk/model" "github.com/conductor-sdk/conductor-go/sdk/workflow" "github.com/conductor-sdk/conductor-go/test/testdata" "github.com/stretchr/testify/assert" + "testing" ) const ( @@ -90,3 +90,15 @@ func TestUpdate(t *testing.T) { assert.Fail(t, "err is not of type GenericSwaggerError") } } + +func TestStartWorkflowWithContext(t *testing.T) { + executor := testdata.WorkflowExecutor + + ctx, cancel := context.WithCancel(context.Background()) + // cancel straightaway on purpose + cancel() + + _, err := executor.StartWorkflowWithContext(ctx, &model.StartWorkflowRequest{}) + assert.Error(t, err, "StartWorkflowWithContext is expected to return an error") + assert.Equal(t, context.Canceled, err, "Expected context canceled error") +}