Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-70: allow headers to be set in Interpreter Config #424

Merged
merged 8 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import (
const testWorkflowServerPort = "9714"
const testIwfServerPort = "9715"

func createTestConfig(failAtMemoCompatibility bool, optimizedVersioning *bool) config.Config {
func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
return config.Config{
Api: config.ApiConfig{
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
OptimizedVersioning: optimizedVersioning,
OptimizedVersioning: testCfg.OptimizedVersioning,
},
Interpreter: config.Interpreter{
VerboseDebug: false,
FailAtMemoIncompatibility: failAtMemoCompatibility,
FailAtMemoIncompatibility: !testCfg.DisableFailAtMemoIncompatibility,
InterpreterActivityConfig: config.InterpreterActivityConfig{
DefaultHeader: testCfg.DefaultHeaders,
},
},
}
}
167 changes: 167 additions & 0 deletions integ/headers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package integ

import (
"context"
"github.com/indeedeng/iwf/integ/workflow/basic"
"github.com/indeedeng/iwf/integ/workflow/headers"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/interpreter/env"
"log"
"strconv"
"testing"
"time"

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/stretchr/testify/assert"
)

func TestHeadersWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()

doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()

doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, &iwfidl.WorkflowConfig{
DisableSystemSearchAttribute: iwfidl.PtrBool(true),
})
smallWaitForFastTest()
}
}

func TestHeadersWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestWorkflowWithHeaders(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestWorkflowWithHeaders(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}

func doTestWorkflowWithHeaders(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := headers.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
DefaultHeaders: map[string]string{
headers.TestHeaderKey: headers.TestHeaderValue,
},
})
defer closeFunc2()

// start a workflow
sharedConfig := env.GetSharedConfig()
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
DefaultHeader: sharedConfig.Interpreter.InterpreterActivityConfig.DefaultHeader,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this?

Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
wfId := basic.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
wfInput := &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("test data"),
}
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: basic.WorkflowType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should start the header workflow here

WorkflowTimeoutSeconds: 100,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(basic.State1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the starting state should be the header.State1

StateInput: wfInput,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
WorkflowIDReusePolicy: ptr.Any(iwfidl.REJECT_DUPLICATE),
// TODO: need more work to write integ test for cron
// manual testing for now by uncomment the following line
// CronSchedule: iwfidl.PtrString("* * * * *"),
RetryPolicy: &iwfidl.WorkflowRetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(11),
BackoffCoefficient: iwfidl.PtrFloat32(11),
MaximumAttempts: iwfidl.PtrInt32(11),
MaximumIntervalSeconds: iwfidl.PtrInt32(11),
},
},
StateOptions: &iwfidl.WorkflowStateOptions{
StartApiTimeoutSeconds: iwfidl.PtrInt32(12),
DecideApiTimeoutSeconds: iwfidl.PtrInt32(13),
StartApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(12),
BackoffCoefficient: iwfidl.PtrFloat32(12),
MaximumAttempts: iwfidl.PtrInt32(12),
MaximumIntervalSeconds: iwfidl.PtrInt32(12),
},
DecideApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(13),
BackoffCoefficient: iwfidl.PtrFloat32(13),
MaximumAttempts: iwfidl.PtrInt32(13),
MaximumIntervalSeconds: iwfidl.PtrInt32(13),
},
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can skip all these

}
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

// start it again should return already started error
_, _, err = req.WorkflowStartRequest(startReq).Execute()
apiErr, ok := err.(*iwfidl.GenericOpenAPIError)
if !ok {
log.Fatalf("Should fail to invoke start api %v", err)
}
errResp, ok := apiErr.Model().(iwfidl.ErrorResponse)
if !ok {
log.Fatalf("should be error response")
}
assertions := assert.New(t)
assertions.Equal(errResp.GetSubStatus(), iwfidl.WORKFLOW_ALREADY_STARTED_SUB_STATUS)

req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp2, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

// use a wrong workflowId to test the error case
_, _, err = req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: "a wrong workflowId",
}).Execute()
apiErr, ok = err.(*iwfidl.GenericOpenAPIError)
if !ok {
log.Fatalf("Should fail to invoke get api %v", err)
}
errResp, ok = apiErr.Model().(iwfidl.ErrorResponse)
if !ok {
log.Fatalf("should be error response")
}
assertions.Equal(errResp.GetSubStatus(), iwfidl.WORKFLOW_NOT_EXISTS_SUB_STATUS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can skip these. So that the test is focusing on the header stuff only


history, _ := wfHandler.GetTestResult()
assertions.Equalf(map[string]int64{
"S1_start": 1,
"S1_decide": 1,
"S2_start": 1,
"S2_decide": 1,
}, history, "basic test fail, %v", history)

assertions.Equal(iwfidl.COMPLETED, resp2.GetWorkflowStatus())
assertions.Equal(1, len(resp2.GetResults()))
assertions.Equal(iwfidl.StateCompletionOutput{
CompletedStateId: "S2",
CompletedStateExecutionId: "S2-1",
CompletedStateOutput: wfInput,
}, resp2.GetResults()[0])
}
10 changes: 5 additions & 5 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type IwfServiceTestConfig struct {
MemoEncryption bool
DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test
OptimizedVersioning *bool
DefaultHeaders map[string]string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that the previous implementation had integration tests that would check for the presence of the headers in every call, am I seeing that right? https://github.com/indeedeng/iwf/pull/356/files#diff-c3752f9aebfa9a670e62a5c0cc34c36bbfbfcbe849288ece0973806a34eac156R30-R33

I wanted to do something more flexible, where we would have a specific integration test for checking the headers passed in the configuration are being used, rather than having the route check it every time a call was made.

I do need some of your guidance here, I can't figure out how to do the integration tests with this approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The headers will be used when making calls for stateAPI. So for a specific integ tests, you will still need to build out a router. But it will be cleaner to have that specific router, is that what you meant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I'm thinking yes. Do you think that would be overkill for testing such a small feature?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it’s a good idea. I have been lazy to do the right thing and the tests have become messy here

Copy link
Contributor Author

@samuel27m samuel27m Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created the tests now, I struggled a bit navigating through the existing ones trying to use one as a template. I'm sure the tests I created can be better. We can have a chat tomorrow about them and you can point me where I've gone wrong! Thanks in advance! 🙌

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I went through it and left you some comments already. and we can go through them later

}

func startIwfService(backendType service.BackendType) (closeFunc func()) {
Expand Down Expand Up @@ -98,7 +99,6 @@ func startIwfServiceWithClient(backendType service.BackendType) (uclient uclient
//var integTemporalUclientCached api.UnifiedClient

func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.UnifiedClient, closeFunc func()) {
failAtMemoIncompatibility := !config.DisableFailAtMemoIncompatibility
if config.BackendType == service.BackendTypeTemporal {
dataConverter := converter.GetDefaultDataConverter()
if config.MemoEncryption {
Expand All @@ -111,7 +111,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), uclient, logger)
iwfService := api.NewService(createTestConfig(config), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -123,7 +123,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(createTestConfig(config), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand All @@ -142,7 +142,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), uclient, logger)
iwfService := api.NewService(createTestConfig(config), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -154,7 +154,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(createTestConfig(config), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand Down
4 changes: 4 additions & 0 deletions integ/workflow/headers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package headers

const TestHeaderKey = "integration-test-header"
const TestHeaderValue = "integration-test-value"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe constant.go? or just put it into routers.go

131 changes: 131 additions & 0 deletions integ/workflow/headers/routers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package headers

import (
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
)

const (
WorkflowType = "basic"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe "header"?

State1 = "S1"
State2 = "S2"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We can only have one state for this test

)

type handler struct {
invokeHistory map[string]int64
}

func NewHandler() *handler {
return &handler{
invokeHistory: make(map[string]int64),
}
}

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
headerValue := c.GetHeader(TestHeaderKey)
if headerValue != TestHeaderValue {
panic("test header not found")
}

var req iwfidl.WorkflowStateStartRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state start request, ", req)

context := req.GetContext()
if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 {
panic("attempt and firstAttemptTimestamp should be greater than zero")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't have to check this


if req.GetWorkflowType() == WorkflowType {
// basic workflow go straight to decide methods without any commands
if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(),
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
headerValue := c.GetHeader(TestHeaderKey)
if headerValue != TestHeaderValue {
panic("test header not found")
}

var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state decide request, ", req)
context := req.GetContext()
if context.GetAttempt() <= 0 || context.GetFirstAttemptTimestamp() <= 0 {
panic("attempt and firstAttemptTimestamp should be greater than zero")
}

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {
// go to S2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: wrong comment

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
StateInput: req.StateInput,
StateOptions: &iwfidl.WorkflowStateOptions{
StartApiTimeoutSeconds: iwfidl.PtrInt32(14),
ExecuteApiTimeoutSeconds: iwfidl.PtrInt32(15),
StartApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(14),
BackoffCoefficient: iwfidl.PtrFloat32(14),
MaximumAttempts: iwfidl.PtrInt32(14),
MaximumIntervalSeconds: iwfidl.PtrInt32(14),
},
ExecuteApiRetryPolicy: &iwfidl.RetryPolicy{
InitialIntervalSeconds: iwfidl.PtrInt32(15),
BackoffCoefficient: iwfidl.PtrFloat32(15),
MaximumAttempts: iwfidl.PtrInt32(15),
MaximumIntervalSeconds: iwfidl.PtrInt32(15),
},
},
},
},
},
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: service.GracefulCompletingWorkflowStateId,
StateInput: req.StateInput,
},
},
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, nil
}
1 change: 1 addition & 0 deletions service/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type (
// default is http://localhost:ApiConfig.Port
ApiServiceAddress string `json:"serviceAddress"`
DumpWorkflowInternalActivityConfig *DumpWorkflowInternalActivityConfig `json:"dumpWorkflowInternalActivityConfig"`
DefaultHeader map[string]string `json:"defaultHeader"`
}

DumpWorkflowInternalActivityConfig struct {
Expand Down
Loading
Loading