-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a55adb6
commit 45c8a17
Showing
2 changed files
with
251 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
package integ | ||
|
||
import ( | ||
"context" | ||
"github.com/indeedeng/iwf/service/common/ptr" | ||
"log" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/indeedeng/iwf/gen/iwfidl" | ||
"github.com/indeedeng/iwf/integ/workflow/wf_ignore_already_started" | ||
"github.com/indeedeng/iwf/service" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestIgnoreAlreadyStartedWorkflowTemporal(t *testing.T) { | ||
if !*temporalIntegTest { | ||
t.Skip() | ||
} | ||
for i := 0; i < *repeatIntegTest; i++ { | ||
// Case 1: secondReq does not ignore AlreadyStartedError; second start request should return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeTemporal, nil, nil, true) | ||
smallWaitForFastTest() | ||
|
||
// Case 2: secondReq does ignore AlreadyStartedError; second start request should not return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeTemporal, nil, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: true, | ||
}, false) | ||
smallWaitForFastTest() | ||
|
||
// Case 3: secondReq does ignore AlreadyStartedError only if requestId match; they do, so second start request should not return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeTemporal, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: false, | ||
RequestId: iwfidl.PtrString("test"), | ||
}, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: true, | ||
RequestId: iwfidl.PtrString("test"), | ||
}, false) | ||
smallWaitForFastTest() | ||
|
||
// Case 4: secondReq does ignore AlreadyStartedError only if requestId match; they do not, so second start request should return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeTemporal, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: false, | ||
RequestId: iwfidl.PtrString("test1"), | ||
}, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: true, | ||
RequestId: iwfidl.PtrString("test2"), | ||
}, true) | ||
smallWaitForFastTest() | ||
} | ||
} | ||
|
||
func TestIgnoreAlreadyStartedWorkflowCadence(t *testing.T) { | ||
if !*cadenceIntegTest { | ||
t.Skip() | ||
} | ||
for i := 0; i < *repeatIntegTest; i++ { | ||
// Case 1: secondReq does not ignore AlreadyStartedError; second start request should return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeCadence, nil, nil, true) | ||
smallWaitForFastTest() | ||
|
||
// Case 2: secondReq does ignore AlreadyStartedError; second start request should not return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeCadence, nil, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: true, | ||
}, false) | ||
smallWaitForFastTest() | ||
|
||
// Case 3: secondReq does ignore AlreadyStartedError only if requestId match; they do, so second start request should not return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeCadence, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: false, | ||
RequestId: iwfidl.PtrString("test"), | ||
}, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: true, | ||
RequestId: iwfidl.PtrString("test"), | ||
}, false) | ||
smallWaitForFastTest() | ||
|
||
// Case 4: secondReq does ignore AlreadyStartedError only if requestId match; they do not, so second start request should return error | ||
doIgnoreAlreadyStartedWorkflow(t, service.BackendTypeCadence, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: false, | ||
RequestId: iwfidl.PtrString("test1"), | ||
}, &iwfidl.WorkflowAlreadyStartedOptions{ | ||
IgnoreAlreadyStartedError: true, | ||
RequestId: iwfidl.PtrString("test2"), | ||
}, true) | ||
smallWaitForFastTest() | ||
} | ||
} | ||
|
||
func doIgnoreAlreadyStartedWorkflow(t *testing.T, backendType service.BackendType, firstReqConfig *iwfidl.WorkflowAlreadyStartedOptions, secondReqConfig *iwfidl.WorkflowAlreadyStartedOptions, errorExpected bool) { | ||
// start test workflow server | ||
wfHandler := wf_ignore_already_started.NewHandler() | ||
closeFunc1 := startWorkflowWorker(wfHandler) | ||
defer closeFunc1() | ||
|
||
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ | ||
BackendType: backendType, | ||
}) | ||
defer closeFunc2() | ||
|
||
// start a workflow | ||
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ | ||
Servers: []iwfidl.ServerConfiguration{ | ||
{ | ||
URL: "http://localhost:" + testIwfServerPort, | ||
}, | ||
}, | ||
}) | ||
wfId := wf_ignore_already_started.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) | ||
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) | ||
|
||
firstReq := createReq(wfId, firstReqConfig) | ||
|
||
firstRes, firstHttpResp, err := req.WorkflowStartRequest(firstReq).Execute() | ||
panicAtHttpError(err, firstHttpResp) | ||
|
||
secondReq := createReq(wfId, secondReqConfig) | ||
secondRes, secondHttpResp, err := req.WorkflowStartRequest(secondReq).Execute() | ||
|
||
assertions := assert.New(t) | ||
if errorExpected { | ||
apiErr, ok := err.(*iwfidl.GenericOpenAPIError) | ||
if !ok { | ||
log.Fatalf("Should fail to invoke start api %v", err) | ||
} | ||
errResp, ok := apiErr.Model().(iwfidl.ErrorResponse) | ||
if !ok { | ||
log.Fatalf("should be error response") | ||
} | ||
assertions.Equal(iwfidl.WORKFLOW_ALREADY_STARTED_SUB_STATUS, errResp.GetSubStatus()) | ||
} else { | ||
assertions.Equal(nil, err) | ||
assertions.Equal(firstRes.GetWorkflowRunId(), secondRes.GetWorkflowRunId()) | ||
assertions.Equal(200, secondHttpResp.StatusCode) | ||
} | ||
} | ||
|
||
func createReq(wfId string, options *iwfidl.WorkflowAlreadyStartedOptions) iwfidl.WorkflowStartRequest { | ||
return iwfidl.WorkflowStartRequest{ | ||
WorkflowId: wfId, | ||
IwfWorkflowType: wf_ignore_already_started.WorkflowType, | ||
WorkflowTimeoutSeconds: 10, | ||
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, | ||
StartStateId: ptr.Any(wf_ignore_already_started.State1), | ||
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ | ||
WorkflowAlreadyStartedOptions: options, | ||
}, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package wf_ignore_already_started | ||
|
||
import ( | ||
"github.com/indeedeng/iwf/service/common/ptr" | ||
"log" | ||
"net/http" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/gin-gonic/gin" | ||
"github.com/indeedeng/iwf/gen/iwfidl" | ||
"github.com/indeedeng/iwf/service" | ||
) | ||
|
||
const ( | ||
WorkflowType = "wf_ignore_already_started" | ||
State1 = "S1" | ||
) | ||
|
||
type handler struct { | ||
invokeHistory map[string]int64 | ||
invokeData map[string]interface{} | ||
} | ||
|
||
func NewHandler() *handler { | ||
return &handler{ | ||
invokeHistory: make(map[string]int64), | ||
} | ||
} | ||
|
||
// ApiV1WorkflowStartPost - for a workflow | ||
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { | ||
var req iwfidl.WorkflowStateStartRequest | ||
if err := c.ShouldBindJSON(&req); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) | ||
return | ||
} | ||
log.Println("received state start request, ", req) | ||
|
||
if req.GetWorkflowType() == WorkflowType { | ||
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ | ||
if req.GetWorkflowStateId() == State1 { | ||
nowInt, err := strconv.Atoi(req.StateInput.GetData()) | ||
if err != nil { | ||
panic(err) | ||
} | ||
now := int64(nowInt) | ||
h.invokeData["scheduled_at"] = now | ||
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ | ||
CommandRequest: &iwfidl.CommandRequest{ | ||
TimerCommands: []iwfidl.TimerCommand{ | ||
{ | ||
CommandId: ptr.Any("timer-cmd-id"), | ||
DurationSeconds: iwfidl.PtrInt64(10), // fire after 10s | ||
}, | ||
}, | ||
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), | ||
}, | ||
}) | ||
return | ||
} | ||
} | ||
|
||
c.JSON(http.StatusBadRequest, struct{}{}) | ||
} | ||
|
||
func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) { | ||
var req iwfidl.WorkflowStateDecideRequest | ||
if err := c.ShouldBindJSON(&req); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) | ||
return | ||
} | ||
log.Println("received state decide request, ", req) | ||
|
||
if req.GetWorkflowType() == WorkflowType { | ||
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ | ||
if req.GetWorkflowStateId() == State1 { | ||
now := time.Now().Unix() | ||
h.invokeData["fired_at"] = now | ||
timerResults := req.GetCommandResults() | ||
timerId := timerResults.GetTimerResults()[0].GetCommandId() | ||
h.invokeData["timer_id"] = timerId | ||
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ | ||
StateDecision: &iwfidl.StateDecision{ | ||
NextStates: []iwfidl.StateMovement{ | ||
{ | ||
StateId: service.GracefulCompletingWorkflowStateId, | ||
}, | ||
}, | ||
}, | ||
}) | ||
return | ||
} | ||
} | ||
|
||
c.JSON(http.StatusBadRequest, struct{}{}) | ||
} | ||
|
||
func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { | ||
return h.invokeHistory, h.invokeData | ||
} |