-
Notifications
You must be signed in to change notification settings - Fork 55
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
302 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
package integ | ||
|
||
import ( | ||
"context" | ||
"github.com/indeedeng/iwf/integ/workflow/basic" | ||
"github.com/indeedeng/iwf/integ/workflow/headers" | ||
"github.com/indeedeng/iwf/service/common/ptr" | ||
"github.com/indeedeng/iwf/service/interpreter/env" | ||
"log" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/indeedeng/iwf/gen/iwfidl" | ||
"github.com/indeedeng/iwf/service" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestHeadersWorkflowTemporal(t *testing.T) { | ||
if !*temporalIntegTest { | ||
t.Skip() | ||
} | ||
for i := 0; i < *repeatIntegTest; i++ { | ||
doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, nil) | ||
smallWaitForFastTest() | ||
|
||
doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true)) | ||
smallWaitForFastTest() | ||
|
||
doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, &iwfidl.WorkflowConfig{ | ||
DisableSystemSearchAttribute: iwfidl.PtrBool(true), | ||
}) | ||
smallWaitForFastTest() | ||
} | ||
} | ||
|
||
func TestHeadersWorkflowCadence(t *testing.T) { | ||
if !*cadenceIntegTest { | ||
t.Skip() | ||
} | ||
for i := 0; i < *repeatIntegTest; i++ { | ||
doTestWorkflowWithHeaders(t, service.BackendTypeCadence, nil) | ||
smallWaitForFastTest() | ||
doTestWorkflowWithHeaders(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false)) | ||
smallWaitForFastTest() | ||
} | ||
} | ||
|
||
func doTestWorkflowWithHeaders(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { | ||
// start test workflow server | ||
wfHandler := headers.NewHandler() | ||
closeFunc1 := startWorkflowWorker(wfHandler) | ||
defer closeFunc1() | ||
|
||
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{ | ||
BackendType: backendType, | ||
DefaultHeaders: map[string]string{ | ||
headers.TestHeaderKey: headers.TestHeaderValue, | ||
}, | ||
}) | ||
defer closeFunc2() | ||
|
||
// start a workflow | ||
sharedConfig := env.GetSharedConfig() | ||
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{ | ||
DefaultHeader: sharedConfig.Interpreter.InterpreterActivityConfig.DefaultHeader, | ||
Servers: []iwfidl.ServerConfiguration{ | ||
{ | ||
URL: "http://localhost:" + testIwfServerPort, | ||
}, | ||
}, | ||
}) | ||
wfId := basic.WorkflowType + strconv.Itoa(int(time.Now().UnixNano())) | ||
wfInput := &iwfidl.EncodedObject{ | ||
Encoding: iwfidl.PtrString("json"), | ||
Data: iwfidl.PtrString("test data"), | ||
} | ||
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background()) | ||
startReq := iwfidl.WorkflowStartRequest{ | ||
WorkflowId: wfId, | ||
IwfWorkflowType: basic.WorkflowType, | ||
WorkflowTimeoutSeconds: 100, | ||
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort, | ||
StartStateId: ptr.Any(basic.State1), | ||
StateInput: wfInput, | ||
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{ | ||
WorkflowConfigOverride: config, | ||
WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE), | ||
// TODO: need more work to write integ test for cron | ||
// manual testing for now by uncomment the following line | ||
// CronSchedule: iwfidl.PtrString("* * * * *"), | ||
RetryPolicy: &iwfidl.WorkflowRetryPolicy{ | ||
InitialIntervalSeconds: iwfidl.PtrInt32(11), | ||
BackoffCoefficient: iwfidl.PtrFloat32(11), | ||
MaximumAttempts: iwfidl.PtrInt32(11), | ||
MaximumIntervalSeconds: iwfidl.PtrInt32(11), | ||
}, | ||
}, | ||
StateOptions: &iwfidl.WorkflowStateOptions{ | ||
StartApiTimeoutSeconds: iwfidl.PtrInt32(12), | ||
DecideApiTimeoutSeconds: iwfidl.PtrInt32(13), | ||
StartApiRetryPolicy: &iwfidl.RetryPolicy{ | ||
InitialIntervalSeconds: iwfidl.PtrInt32(12), | ||
BackoffCoefficient: iwfidl.PtrFloat32(12), | ||
MaximumAttempts: iwfidl.PtrInt32(12), | ||
MaximumIntervalSeconds: iwfidl.PtrInt32(12), | ||
}, | ||
DecideApiRetryPolicy: &iwfidl.RetryPolicy{ | ||
InitialIntervalSeconds: iwfidl.PtrInt32(13), | ||
BackoffCoefficient: iwfidl.PtrFloat32(13), | ||
MaximumAttempts: iwfidl.PtrInt32(13), | ||
MaximumIntervalSeconds: iwfidl.PtrInt32(13), | ||
}, | ||
}, | ||
} | ||
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute() | ||
panicAtHttpError(err, httpResp) | ||
|
||
// start it again should return already started error | ||
_, _, err = req.WorkflowStartRequest(startReq).Execute() | ||
apiErr, ok := err.(*iwfidl.GenericOpenAPIError) | ||
if !ok { | ||
log.Fatalf("Should fail to invoke start api %v", err) | ||
} | ||
errResp, ok := apiErr.Model().(iwfidl.ErrorResponse) | ||
if !ok { | ||
log.Fatalf("should be error response") | ||
} | ||
assertions := assert.New(t) | ||
assertions.Equal(errResp.GetSubStatus(), iwfidl.WORKFLOW_ALREADY_STARTED_SUB_STATUS) | ||
|
||
req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background()) | ||
resp2, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ | ||
WorkflowId: wfId, | ||
}).Execute() | ||
panicAtHttpError(err, httpResp) | ||
|
||
// use a wrong workflowId to test the error case | ||
_, _, err = req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{ | ||
WorkflowId: "a wrong workflowId", | ||
}).Execute() | ||
apiErr, ok = err.(*iwfidl.GenericOpenAPIError) | ||
if !ok { | ||
log.Fatalf("Should fail to invoke get api %v", err) | ||
} | ||
errResp, ok = apiErr.Model().(iwfidl.ErrorResponse) | ||
if !ok { | ||
log.Fatalf("should be error response") | ||
} | ||
assertions.Equal(errResp.GetSubStatus(), iwfidl.WORKFLOW_NOT_EXISTS_SUB_STATUS) | ||
|
||
history, _ := wfHandler.GetTestResult() | ||
assertions.Equalf(map[string]int64{ | ||
"S1_start": 1, | ||
"S1_decide": 1, | ||
"S2_start": 1, | ||
"S2_decide": 1, | ||
}, history, "basic test fail, %v", history) | ||
|
||
assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus()) | ||
assertions.Equal(1, len(resp2.GetResults())) | ||
assertions.Equal(iwfidl.StateCompletionOutput{ | ||
CompletedStateId: "S2", | ||
CompletedStateExecutionId: "S2-1", | ||
CompletedStateOutput: wfInput, | ||
}, resp2.GetResults()[0]) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
package headers | ||
|
||
const TestHeaderKey = "integration-test-header" | ||
const TestHeaderValue = "integration-test-value" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package headers | ||
|
||
import ( | ||
"github.com/gin-gonic/gin" | ||
"github.com/indeedeng/iwf/gen/iwfidl" | ||
"github.com/indeedeng/iwf/service" | ||
"log" | ||
"net/http" | ||
) | ||
|
||
const ( | ||
WorkflowType = "basic" | ||
State1 = "S1" | ||
State2 = "S2" | ||
) | ||
|
||
type handler struct { | ||
invokeHistory map[string]int64 | ||
} | ||
|
||
func NewHandler() *handler { | ||
return &handler{ | ||
invokeHistory: make(map[string]int64), | ||
} | ||
} | ||
|
||
// ApiV1WorkflowStartPost - for a workflow | ||
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) { | ||
headerValue := c.GetHeader(TestHeaderKey) | ||
if headerValue != TestHeaderValue { | ||
panic("test header not found") | ||
} | ||
|
||
var req iwfidl.WorkflowStateStartRequest | ||
if err := c.ShouldBindJSON(&req); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) | ||
return | ||
} | ||
log.Println("received state start request, ", req) | ||
|
||
context := req.GetContext() | ||
if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 { | ||
panic("attempt and firstAttemptTimestamp should be greater than zero") | ||
} | ||
|
||
if req.GetWorkflowType() == WorkflowType { | ||
// basic workflow go straight to decide methods without any commands | ||
if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 { | ||
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++ | ||
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{ | ||
CommandRequest: &iwfidl.CommandRequest{ | ||
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(), | ||
}, | ||
}) | ||
return | ||
} | ||
} | ||
|
||
c.JSON(http.StatusBadRequest, struct{}{}) | ||
} | ||
|
||
func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) { | ||
headerValue := c.GetHeader(TestHeaderKey) | ||
if headerValue != TestHeaderValue { | ||
panic("test header not found") | ||
} | ||
|
||
var req iwfidl.WorkflowStateDecideRequest | ||
if err := c.ShouldBindJSON(&req); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) | ||
return | ||
} | ||
log.Println("received state decide request, ", req) | ||
context := req.GetContext() | ||
if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 { | ||
panic("attempt and firstAttemptTimestamp should be greater than zero") | ||
} | ||
|
||
if req.GetWorkflowType() == WorkflowType { | ||
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++ | ||
if req.GetWorkflowStateId() == State1 { | ||
// go to S2 | ||
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ | ||
StateDecision: &iwfidl.StateDecision{ | ||
NextStates: []iwfidl.StateMovement{ | ||
{ | ||
StateId: State2, | ||
StateInput: req.StateInput, | ||
StateOptions: &iwfidl.WorkflowStateOptions{ | ||
StartApiTimeoutSeconds: iwfidl.PtrInt32(14), | ||
ExecuteApiTimeoutSeconds: iwfidl.PtrInt32(15), | ||
StartApiRetryPolicy: &iwfidl.RetryPolicy{ | ||
InitialIntervalSeconds: iwfidl.PtrInt32(14), | ||
BackoffCoefficient: iwfidl.PtrFloat32(14), | ||
MaximumAttempts: iwfidl.PtrInt32(14), | ||
MaximumIntervalSeconds: iwfidl.PtrInt32(14), | ||
}, | ||
ExecuteApiRetryPolicy: &iwfidl.RetryPolicy{ | ||
InitialIntervalSeconds: iwfidl.PtrInt32(15), | ||
BackoffCoefficient: iwfidl.PtrFloat32(15), | ||
MaximumAttempts: iwfidl.PtrInt32(15), | ||
MaximumIntervalSeconds: iwfidl.PtrInt32(15), | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}) | ||
return | ||
} else if req.GetWorkflowStateId() == State2 { | ||
// go to complete | ||
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{ | ||
StateDecision: &iwfidl.StateDecision{ | ||
NextStates: []iwfidl.StateMovement{ | ||
{ | ||
StateId: service.GracefulCompletingWorkflowStateId, | ||
StateInput: req.StateInput, | ||
}, | ||
}, | ||
}, | ||
}) | ||
return | ||
} | ||
} | ||
|
||
c.JSON(http.StatusBadRequest, struct{}{}) | ||
} | ||
|
||
func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) { | ||
return h.invokeHistory, nil | ||
} |