From 36e5a27b10a0b3a5334b7c2374a4dba9ed2a01f0 Mon Sep 17 00:00:00 2001 From: Ktrops Date: Tue, 17 Dec 2024 18:27:33 -0800 Subject: [PATCH] iwf-401: update activity type name to find reset history eventID (#515) Co-authored-by: Katie Atrops --- integ/reset_by_state_id_test.go | 167 +++++++++++++++++++++++++++++++ integ/workflow/reset/routers.go | 121 ++++++++++++++++++++++ service/client/cadence/reset.go | 3 +- service/client/temporal/reset.go | 3 +- 4 files changed, 292 insertions(+), 2 deletions(-) create mode 100644 integ/reset_by_state_id_test.go create mode 100644 integ/workflow/reset/routers.go diff --git a/integ/reset_by_state_id_test.go b/integ/reset_by_state_id_test.go new file mode 100644 index 00000000..9f1d5323 --- /dev/null +++ b/integ/reset_by_state_id_test.go @@ -0,0 +1,167 @@ +package integ + +import ( + "context" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/integ/workflow/reset" + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/ptr" + "github.com/stretchr/testify/assert" + "strconv" + "testing" + "time" +) + +func TestResetByStateIdWorkflowTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, nil) + smallWaitForFastTest() + + //TODO: uncomment below when IWF-403 implementation is done. + //TODO cont.: Reset with state id & state execution id is broken for local activities. + //doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) + //smallWaitForFastTest() + } +} + +func TestResetByStateIdWorkflowCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, nil) + smallWaitForFastTest() + + //TODO: uncomment below when IWF-403 implementation is done. + //TODO cont.: Reset with state id & state execution id is broken for local activities. + //doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) + //smallWaitForFastTest() + } +} + +func doTestResetByStatIdWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { + // start test workflow server + wfHandler := reset.NewHandler() + closeFunc1 := startWorkflowWorker(wfHandler) + defer closeFunc1() + + _, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ + BackendType: backendType, + }) + defer closeFunc2() + + // start a workflow + apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ + Servers: []iwfidl.ServerConfiguration{ + { + URL: "http://localhost:" + testIwfServerPort, + }, + }, + }) + wfId := reset.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) + wfInput := &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString("1"), + } + req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) + startReq := iwfidl.WorkflowStartRequest{ + WorkflowId: wfId, + IwfWorkflowType: reset.WorkflowType, + WorkflowTimeoutSeconds: 100, + IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, + StartStateId: ptr.Any(reset.State1), + StateInput: wfInput, + WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ + WorkflowConfigOverride: config, + WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE), + }, + StateOptions: &iwfidl.WorkflowStateOptions{ + //Skipping wait until for state1 + SkipWaitUntil: iwfidl.PtrBool(true), + }, + } + startResp, httpResp, err := req.WorkflowStartRequest(startReq).Execute() + panicAtHttpError(err, httpResp) + + assertions := assert.New(t) + + req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp2, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + history, _ := wfHandler.GetTestResult() + //expect no starts in history as WaitUntil api is skipped. + assertions.Equalf(map[string]int64{ + "S1_decide": 1, + "S2_decide": 5, + }, history, "reset test fail, %v", history) + + assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus()) + assertions.Equal(1, len(resp2.GetResults())) + assertions.Equal("S2", resp2.GetResults()[0].CompletedStateId) + assertions.Equal("S2-5", resp2.GetResults()[0].CompletedStateExecutionId) + assertions.Equal("5", resp2.GetResults()[0].CompletedStateOutput.GetData()) + + //reset workflow by state id + resetReq := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background()) + _, httpResp, err = resetReq.WorkflowResetRequest(iwfidl.WorkflowResetRequest{ + WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()), + WorkflowId: wfId, + ResetType: iwfidl.STATE_ID, + StateId: iwfidl.PtrString(reset.State2), + }).Execute() + panicAtHttpError(err, httpResp) + + req3 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp3, httpResp, err := req3.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + resetHistory, _ := wfHandler.GetTestResult() + //expect no starts in history as WaitUntil api is skipped. + assertions.Equalf(map[string]int64{ + "S1_decide": 1, + "S2_decide": 10, + }, resetHistory, "reset test fail, %v", resetHistory) + + assertions.Equal(iwfidl.COMPLETED, resp3.GetWorkflowStatus()) + assertions.Equal(1, len(resp3.GetResults())) + assertions.Equal("S2", resp3.GetResults()[0].CompletedStateId) + assertions.Equal("S2-5", resp3.GetResults()[0].CompletedStateExecutionId) + assertions.Equal("5", resp3.GetResults()[0].CompletedStateOutput.GetData()) + + //reset workflow by state execution id + reset2Req := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background()) + _, httpResp, err = reset2Req.WorkflowResetRequest(iwfidl.WorkflowResetRequest{ + WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()), + WorkflowId: wfId, + ResetType: iwfidl.STATE_EXECUTION_ID, + StateExecutionId: iwfidl.PtrString(reset.State2 + "-4"), + }).Execute() + panicAtHttpError(err, httpResp) + + req4 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) + resp4, httpResp, err := req4.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ + WorkflowId: wfId, + }).Execute() + panicAtHttpError(err, httpResp) + + reset2History, _ := wfHandler.GetTestResult() + //expect no starts in history as WaitUntil api is skipped. + assertions.Equalf(map[string]int64{ + "S1_decide": 1, + "S2_decide": 12, + }, reset2History, "reset test fail, %v", reset2History) + + assertions.Equal(iwfidl.COMPLETED, resp4.GetWorkflowStatus()) + assertions.Equal(1, len(resp4.GetResults())) + assertions.Equal("S2", resp4.GetResults()[0].CompletedStateId) + assertions.Equal("S2-5", resp4.GetResults()[0].CompletedStateExecutionId) + assertions.Equal("5", resp4.GetResults()[0].CompletedStateOutput.GetData()) +} diff --git a/integ/workflow/reset/routers.go b/integ/workflow/reset/routers.go new file mode 100644 index 00000000..cdd4a642 --- /dev/null +++ b/integ/workflow/reset/routers.go @@ -0,0 +1,121 @@ +package reset + +import ( + "fmt" + "github.com/gin-gonic/gin" + "github.com/indeedeng/iwf/gen/iwfidl" + "github.com/indeedeng/iwf/service" + "log" + "net/http" + "strconv" +) + +/** +* This test workflow has 2 states, using REST controller to implement the workflow directly. +* State1: +* - No WaitUntil +* - Execute moves to State2 +* State2: +* - No WaitUntil +* - Execute loops through state2 5 times, then gracefully completes the workflow. +* This test is used for testing reset by state id and state execution id without WaitUntil + */ +const ( + WorkflowType = "reset" + State1 = "S1" + State2 = "S2" +) + +type handler struct { + invokeHistory map[string]int64 +} + +func NewHandler() *handler { + return &handler{ + invokeHistory: make(map[string]int64), + } +} + +// ApiV1WorkflowStartPost - for a workflow +func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { + panic("No start call is expected.") +} + +func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) { + log.Println("start of decide") + var req iwfidl.WorkflowStateDecideRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Println("received state decide request, ", req) + context := req.GetContext() + if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 { + panic("attempt and firstAttemptTimestamp should be greater than zero") + } + + if req.GetWorkflowType() == WorkflowType { + h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ + if req.GetWorkflowStateId() == State1 { + // go to S2 + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: State2, + StateInput: req.StateInput, + StateOptions: &iwfidl.WorkflowStateOptions{ + //Skipping wait until for 1st execution of state2 + SkipWaitUntil: iwfidl.PtrBool(true), + }, + }, + }, + }, + }) + return + } else if req.GetWorkflowStateId() == State2 { + input := req.GetStateInput() + i, _ := strconv.Atoi(input.GetData()) + if i < 5 { + updatedInput := &iwfidl.EncodedObject{ + Encoding: iwfidl.PtrString("json"), + Data: iwfidl.PtrString(fmt.Sprintf("%v", i+1)), + } + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: State2, + StateInput: updatedInput, + StateOptions: &iwfidl.WorkflowStateOptions{ + //Skipping wait until for all executions of state2 after the 1st execution. + SkipWaitUntil: iwfidl.PtrBool(true), + }, + }, + }, + }, + }) + return + } else { + // go to complete + c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ + StateDecision: &iwfidl.StateDecision{ + NextStates: []iwfidl.StateMovement{ + { + StateId: service.GracefulCompletingWorkflowStateId, + StateInput: req.StateInput, + }, + }, + }, + }) + return + } + } + } + + c.JSON(http.StatusBadRequest, struct{}{}) +} + +func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { + return h.invokeHistory, nil +} diff --git a/service/client/cadence/reset.go b/service/client/cadence/reset.go index 308cc790..080a646b 100644 --- a/service/client/cadence/reset.go +++ b/service/client/cadence/reset.go @@ -167,9 +167,10 @@ func getDecisionEventIDByStateOrStateExecutionId( if e.GetEventType() == shared.EventTypeDecisionTaskCompleted { decisionFinishID = e.GetEventId() } + //TODO: Add check for local activity. (IWF-403) if e.GetEventType() == shared.EventTypeActivityTaskScheduled { typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName() - if strings.Contains(typeName, "StateStart") || strings.Contains(typeName, "StateApiWaitUntil") { + if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") { var backendType service.BackendType var input service.StateStartActivityInput err = converter.FromData(e.GetActivityTaskScheduledEventAttributes().Input, &backendType, &input) diff --git a/service/client/temporal/reset.go b/service/client/temporal/reset.go index 527d9654..6e6de797 100644 --- a/service/client/temporal/reset.go +++ b/service/client/temporal/reset.go @@ -162,9 +162,10 @@ func getDecisionEventIDByStateOrStateExecutionId( if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { decisionFinishID = e.GetEventId() } + //TODO: Add check for local activity. (IWF-403) if e.GetEventType() == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED { typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName() - if strings.Contains(typeName, "StateStart") || strings.Contains(typeName, "StateApiWaitUntil") { + if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") { var backendType service.BackendType var input service.StateStartActivityInput err = converter.FromPayloads(e.GetActivityTaskScheduledEventAttributes().Input, &backendType, &input)