Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-70: allow headers to be set in Interpreter Config #424

Merged
merged 8 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type (
// default is http://localhost:ApiConfig.Port
ApiServiceAddress string `json:"serviceAddress"`
DumpWorkflowInternalActivityConfig *DumpWorkflowInternalActivityConfig `json:"dumpWorkflowInternalActivityConfig"`
DefaultHeader map[string]string `json:"defaultHeader"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not: headers (plural)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good eye! Thanks, fixed now 🙌

}

DumpWorkflowInternalActivityConfig struct {
Expand Down
9 changes: 6 additions & 3 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ import (
const testWorkflowServerPort = "9714"
const testIwfServerPort = "9715"

func createTestConfig(failAtMemoCompatibility bool, optimizedVersioning *bool) config.Config {
func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
return config.Config{
Api: config.ApiConfig{
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
OptimizedVersioning: optimizedVersioning,
OptimizedVersioning: testCfg.OptimizedVersioning,
},
Interpreter: config.Interpreter{
VerboseDebug: false,
FailAtMemoIncompatibility: failAtMemoCompatibility,
FailAtMemoIncompatibility: !testCfg.DisableFailAtMemoIncompatibility,
InterpreterActivityConfig: config.InterpreterActivityConfig{
DefaultHeader: testCfg.DefaultHeaders,
},
},
}
}
102 changes: 102 additions & 0 deletions integ/headers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package integ

import (
"context"
"github.com/indeedeng/iwf/integ/workflow/headers"
"github.com/indeedeng/iwf/service/common/ptr"
"strconv"
"testing"
"time"

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/stretchr/testify/assert"
)

func TestHeadersWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}

for i := 0; i < *repeatIntegTest; i++ {
doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()

doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()

doTestWorkflowWithHeaders(t, service.BackendTypeTemporal, &iwfidl.WorkflowConfig{
DisableSystemSearchAttribute: iwfidl.PtrBool(true),
})
smallWaitForFastTest()
}
}

func TestHeadersWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestWorkflowWithHeaders(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestWorkflowWithHeaders(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}

func doTestWorkflowWithHeaders(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := headers.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
DefaultHeaders: map[string]string{
headers.TestHeaderKey: headers.TestHeaderValue,
},
})
defer closeFunc2()

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
wfId := headers.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))
wfInput := &iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString("test data"),
}
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: headers.WorkflowType,
WorkflowTimeoutSeconds: 100,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(headers.State1),
StateInput: wfInput,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
}
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
_, httpResp2, err2 := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err2, httpResp2)

assertions := assert.New(t)

history, _ := wfHandler.GetTestResult()
assertions.Equalf(map[string]int64{
"S1_start": 1,
"S1_decide": 1,
}, history, "headers test fail, %v", history)
}
10 changes: 5 additions & 5 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type IwfServiceTestConfig struct {
MemoEncryption bool
DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test
OptimizedVersioning *bool
DefaultHeaders map[string]string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that the previous implementation had integration tests that would check for the presence of the headers in every call, am I seeing that right? https://github.com/indeedeng/iwf/pull/356/files#diff-c3752f9aebfa9a670e62a5c0cc34c36bbfbfcbe849288ece0973806a34eac156R30-R33

I wanted to do something more flexible, where we would have a specific integration test for checking the headers passed in the configuration are being used, rather than having the route check it every time a call was made.

I do need some of your guidance here, I can't figure out how to do the integration tests with this approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The headers will be used when making calls for stateAPI. So for a specific integ tests, you will still need to build out a router. But it will be cleaner to have that specific router, is that what you meant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I'm thinking yes. Do you think that would be overkill for testing such a small feature?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it’s a good idea. I have been lazy to do the right thing and the tests have become messy here

Copy link
Contributor Author

@samuel27m samuel27m Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created the tests now, I struggled a bit navigating through the existing ones trying to use one as a template. I'm sure the tests I created can be better. We can have a chat tomorrow about them and you can point me where I've gone wrong! Thanks in advance! 🙌

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I went through it and left you some comments already. and we can go through them later

}

func startIwfService(backendType service.BackendType) (closeFunc func()) {
Expand Down Expand Up @@ -98,7 +99,6 @@ func startIwfServiceWithClient(backendType service.BackendType) (uclient uclient
//var integTemporalUclientCached api.UnifiedClient

func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.UnifiedClient, closeFunc func()) {
failAtMemoIncompatibility := !config.DisableFailAtMemoIncompatibility
if config.BackendType == service.BackendTypeTemporal {
dataConverter := converter.GetDefaultDataConverter()
if config.MemoEncryption {
Expand All @@ -111,7 +111,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), uclient, logger)
iwfService := api.NewService(createTestConfig(config), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -123,7 +123,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(createTestConfig(config), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand All @@ -142,7 +142,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), uclient, logger)
iwfService := api.NewService(createTestConfig(config), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -154,7 +154,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.OptimizedVersioning), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(createTestConfig(config), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand Down
4 changes: 4 additions & 0 deletions integ/workflow/headers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package headers

const TestHeaderKey = "integration-test-header"
const TestHeaderValue = "integration-test-value"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: maybe constant.go? or just put it into routers.go

92 changes: 92 additions & 0 deletions integ/workflow/headers/routers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package headers

import (
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
)

const (
WorkflowType = "headers"
State1 = "S1"
)

type handler struct {
invokeHistory map[string]int64
}

func NewHandler() *handler {
return &handler{
invokeHistory: make(map[string]int64),
}
}

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
headerValue := c.GetHeader(TestHeaderKey)
if headerValue != TestHeaderValue {
c.JSON(http.StatusBadRequest, gin.H{"error": "test header not found"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to return here after c.JSON. Otherwise the remaining code will override it.

}

var req iwfidl.WorkflowStateStartRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state start request, ", req)

if req.GetWorkflowType() == WorkflowType {
// basic workflow go straight to decide methods without any commands
if req.GetWorkflowStateId() == State1 {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(),
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
headerValue := c.GetHeader(TestHeaderKey)
if headerValue != TestHeaderValue {
c.JSON(http.StatusBadRequest, gin.H{"error": "test header not found"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

}

var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Println("received state decide request, ", req)

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++
if req.GetWorkflowStateId() == State1 {
// go to S2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: wrong comment

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: service.GracefulCompletingWorkflowStateId,
StateInput: req.StateInput,
},
},
},
})
return
}
}

c.JSON(http.StatusBadRequest, struct{}{})
}

func (h *handler) GetTestResult() (map[string]int64, map[string]interface{}) {
return h.invokeHistory, nil
}
7 changes: 6 additions & 1 deletion service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func StateApiWaitUntil(
logger.Info("StateStartActivity", "input", input)
iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl)

svcCfg := env.GetSharedConfig()
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeader,
Servers: []iwfidl.ServerConfiguration{
{
URL: iwfWorkerBaseUrl,
Expand Down Expand Up @@ -86,7 +88,9 @@ func StateApiExecute(
logger.Info("StateDecideActivity", "input", input)

iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl)
svcCfg := env.GetSharedConfig()
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeader,
Servers: []iwfidl.ServerConfiguration{
{
URL: iwfWorkerBaseUrl,
Expand Down Expand Up @@ -272,8 +276,9 @@ func DumpWorkflowInternal(
logger.Info("DumpWorkflowInternal", "input", req)

apiAddress := config.GetApiServiceAddressWithDefault(env.GetSharedConfig())

svcCfg := env.GetSharedConfig()
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeader,
Servers: []iwfidl.ServerConfiguration{
{
URL: apiAddress,
Expand Down
Loading