Skip to content

Commit

Permalink
iwf-401: update activity type name to find reset history eventID (#515)
Browse files Browse the repository at this point in the history
Co-authored-by: Katie Atrops <katiea@indeed.com>
  • Loading branch information
ktrops and Katie Atrops authored Dec 18, 2024
1 parent 8fffb80 commit 36e5a27
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 2 deletions.
167 changes: 167 additions & 0 deletions integ/reset_by_state_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package integ

import (
"context"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/reset"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
"time"
)

func TestResetByStateIdWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()

//TODO: uncomment below when IWF-403 implementation is done.
//TODO cont.: Reset with state id & state execution id is broken for local activities.
//doTestResetByStatIdWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
//smallWaitForFastTest()
}
}

func TestResetByStateIdWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()

//TODO: uncomment below when IWF-403 implementation is done.
//TODO cont.: Reset with state id & state execution id is broken for local activities.
//doTestResetByStatIdWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
//smallWaitForFastTest()
}
}

func doTestResetByStatIdWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := reset.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
})
defer closeFunc2()

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
wfId := reset.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
wfInput := &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("1"),
}
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: reset.WorkflowType,
WorkflowTimeoutSeconds: 100,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(reset.State1),
StateInput: wfInput,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE),
},
StateOptions: &iwfidl.WorkflowStateOptions{
//Skipping wait until for state1
SkipWaitUntil: iwfidl.PtrBool(true),
},
}
startResp, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

assertions := assert.New(t)

req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp2, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

history, _ := wfHandler.GetTestResult()
//expect no starts in history as WaitUntil api is skipped.
assertions.Equalf(map[string]int64{
"S1_decide": 1,
"S2_decide": 5,
}, history, "reset test fail, %v", history)

assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus())
assertions.Equal(1, len(resp2.GetResults()))
assertions.Equal("S2", resp2.GetResults()[0].CompletedStateId)
assertions.Equal("S2-5", resp2.GetResults()[0].CompletedStateExecutionId)
assertions.Equal("5", resp2.GetResults()[0].CompletedStateOutput.GetData())

//reset workflow by state id
resetReq := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background())
_, httpResp, err = resetReq.WorkflowResetRequest(iwfidl.WorkflowResetRequest{
WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()),
WorkflowId: wfId,
ResetType: iwfidl.STATE_ID,
StateId: iwfidl.PtrString(reset.State2),
}).Execute()
panicAtHttpError(err, httpResp)

req3 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp3, httpResp, err := req3.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

resetHistory, _ := wfHandler.GetTestResult()
//expect no starts in history as WaitUntil api is skipped.
assertions.Equalf(map[string]int64{
"S1_decide": 1,
"S2_decide": 10,
}, resetHistory, "reset test fail, %v", resetHistory)

assertions.Equal(iwfidl.COMPLETED, resp3.GetWorkflowStatus())
assertions.Equal(1, len(resp3.GetResults()))
assertions.Equal("S2", resp3.GetResults()[0].CompletedStateId)
assertions.Equal("S2-5", resp3.GetResults()[0].CompletedStateExecutionId)
assertions.Equal("5", resp3.GetResults()[0].CompletedStateOutput.GetData())

//reset workflow by state execution id
reset2Req := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background())
_, httpResp, err = reset2Req.WorkflowResetRequest(iwfidl.WorkflowResetRequest{
WorkflowRunId: iwfidl.PtrString(startResp.GetWorkflowRunId()),
WorkflowId: wfId,
ResetType: iwfidl.STATE_EXECUTION_ID,
StateExecutionId: iwfidl.PtrString(reset.State2 + "-4"),
}).Execute()
panicAtHttpError(err, httpResp)

req4 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp4, httpResp, err := req4.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

reset2History, _ := wfHandler.GetTestResult()
//expect no starts in history as WaitUntil api is skipped.
assertions.Equalf(map[string]int64{
"S1_decide": 1,
"S2_decide": 12,
}, reset2History, "reset test fail, %v", reset2History)

assertions.Equal(iwfidl.COMPLETED, resp4.GetWorkflowStatus())
assertions.Equal(1, len(resp4.GetResults()))
assertions.Equal("S2", resp4.GetResults()[0].CompletedStateId)
assertions.Equal("S2-5", resp4.GetResults()[0].CompletedStateExecutionId)
assertions.Equal("5", resp4.GetResults()[0].CompletedStateOutput.GetData())
}
121 changes: 121 additions & 0 deletions integ/workflow/reset/routers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package reset

import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
"strconv"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
* State1:
* - No WaitUntil
* - Execute moves to State2
* State2:
* - No WaitUntil
* - Execute loops through state2 5 times, then gracefully completes the workflow.
* This test is used for testing reset by state id and state execution id without WaitUntil
*/
const (
WorkflowType = "reset"
State1 = "S1"
State2 = "S2"
)

type handler struct {
invokeHistory map[string]int64
}

func NewHandler() *handler {
return &handler{
invokeHistory: make(map[string]int64),
}
}

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
panic("No start call is expected.")
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
log.Println("start of decide")
var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state decide request, ", req)
context := req.GetContext()
if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 {
panic("attempt and firstAttemptTimestamp should be greater than zero")
}

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {
// go to S2
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
StateInput: req.StateInput,
StateOptions: &iwfidl.WorkflowStateOptions{
//Skipping wait until for 1st execution of state2
SkipWaitUntil: iwfidl.PtrBool(true),
},
},
},
},
})
return
} else if req.GetWorkflowStateId() == State2 {
input := req.GetStateInput()
i, _ := strconv.Atoi(input.GetData())
if i < 5 {
updatedInput := &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(fmt.Sprintf("%v", i+1)),
}
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
StateInput: updatedInput,
StateOptions: &iwfidl.WorkflowStateOptions{
//Skipping wait until for all executions of state2 after the 1st execution.
SkipWaitUntil: iwfidl.PtrBool(true),
},
},
},
},
})
return
} else {
// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: service.GracefulCompletingWorkflowStateId,
StateInput: req.StateInput,
},
},
},
})
return
}
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, nil
}
3 changes: 2 additions & 1 deletion service/client/cadence/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ func getDecisionEventIDByStateOrStateExecutionId(
if e.GetEventType() == shared.EventTypeDecisionTaskCompleted {
decisionFinishID = e.GetEventId()
}
//TODO: Add check for local activity. (IWF-403)
if e.GetEventType() == shared.EventTypeActivityTaskScheduled {
typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName()
if strings.Contains(typeName, "StateStart") || strings.Contains(typeName, "StateApiWaitUntil") {
if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") {
var backendType service.BackendType
var input service.StateStartActivityInput
err = converter.FromData(e.GetActivityTaskScheduledEventAttributes().Input, &backendType, &input)
Expand Down
3 changes: 2 additions & 1 deletion service/client/temporal/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ func getDecisionEventIDByStateOrStateExecutionId(
if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
decisionFinishID = e.GetEventId()
}
//TODO: Add check for local activity. (IWF-403)
if e.GetEventType() == enums.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED {
typeName := e.GetActivityTaskScheduledEventAttributes().GetActivityType().GetName()
if strings.Contains(typeName, "StateStart") || strings.Contains(typeName, "StateApiWaitUntil") {
if strings.Contains(typeName, "StateApiExecute") || strings.Contains(typeName, "StateApiWaitUntil") {
var backendType service.BackendType
var input service.StateStartActivityInput
err = converter.FromPayloads(e.GetActivityTaskScheduledEventAttributes().Input, &backendType, &input)
Expand Down

0 comments on commit 36e5a27

Please sign in to comment.