Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support setting header for state API request #356

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion integ/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ func doTestBasicWorkflow(t *testing.T, backendType service.BackendType, config *
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(backendType)
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
SetTestHeader: true,
})
defer closeFunc2()

// start a workflow
Expand Down
17 changes: 14 additions & 3 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,26 @@ import (
const testWorkflowServerPort = "9714"
const testIwfServerPort = "9715"

func createTestConfig(failAtMemoCompatibility bool) config.Config {
return config.Config{
const TestHeaderKey = "x-envoy-upstream-rq-timeout-ms"
const TestHeaderValue = "86400"

func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
failAtMemoIncompatibility := !testCfg.DisableFailAtMemoIncompatibility
cfg := config.Config{
Api: config.ApiConfig{
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
},
Interpreter: config.Interpreter{
VerboseDebug: false,
FailAtMemoIncompatibility: failAtMemoCompatibility,
FailAtMemoIncompatibility: failAtMemoIncompatibility,
InterpreterActivityConfig: config.InterpreterActivityConfig{},
},
}
if testCfg.SetTestHeader {
cfg.Interpreter.InterpreterActivityConfig.DefaultHeader = map[string]string{
TestHeaderKey: TestHeaderValue,
}
}
return cfg
}
10 changes: 5 additions & 5 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type IwfServiceTestConfig struct {
BackendType service.BackendType
MemoEncryption bool
DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test
SetTestHeader bool
}

func startIwfService(backendType service.BackendType) (closeFunc func()) {
Expand Down Expand Up @@ -97,7 +98,6 @@ func startIwfServiceWithClient(backendType service.BackendType) (uclient uclient
//var integTemporalUclientCached api.UnifiedClient

func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.UnifiedClient, closeFunc func()) {
failAtMemoIncompatibility := !config.DisableFailAtMemoIncompatibility
if config.BackendType == service.BackendTypeTemporal {
dataConverter := converter.GetDefaultDataConverter()
if config.MemoEncryption {
Expand All @@ -110,7 +110,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger)
iwfService := api.NewService(createTestConfig(config), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -122,7 +122,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(createTestConfig(config), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand All @@ -141,7 +141,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger)
iwfService := api.NewService(createTestConfig(config), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -153,7 +153,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(createTestConfig(config), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand Down
10 changes: 10 additions & 0 deletions integ/workflow/basic/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package basic
import (
"github.com/gin-gonic/gin"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ"
"github.com/indeedeng/iwf/service"
"log"
"net/http"
Expand All @@ -26,6 +27,10 @@ func NewHandler() *handler {

// ApiV1WorkflowStartPost - for a workflow
func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
val := c.GetHeader(integ.TestHeaderKey)
if val != integ.TestHeaderValue {
panic("test header not found")
}
var req iwfidl.WorkflowStateStartRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
Expand Down Expand Up @@ -55,6 +60,11 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
}

func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
val := c.GetHeader(integ.TestHeaderKey)
if val != integ.TestHeaderValue {
panic("test header not found")
}

var req iwfidl.WorkflowStateDecideRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
Expand Down
1 change: 1 addition & 0 deletions service/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
// default is http://localhost:ApiConfig.Port
ApiServiceAddress string `json:"serviceAddress"`
DumpWorkflowInternalActivityConfig *DumpWorkflowInternalActivityConfig `json:"dumpWorkflowInternalActivityConfig"`
DefaultHeader map[string]string `json:"defaultHeader"`
}

DumpWorkflowInternalActivityConfig struct {
Expand Down
15 changes: 12 additions & 3 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
)

// StateStart is Deprecated, will be removed in next release
func StateStart(ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput) (*iwfidl.WorkflowStateStartResponse, error) {
func StateStart(
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput,
) (*iwfidl.WorkflowStateStartResponse, error) {
return StateApiWaitUntil(ctx, backendType, input)
}

Expand All @@ -30,7 +32,9 @@ func StateApiWaitUntil(
logger.Info("StateStartActivity", "input", input)
iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl)

svcCfg := env.GetSharedConfig()
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeader,
Servers: []iwfidl.ServerConfiguration{
{
URL: iwfWorkerBaseUrl,
Expand Down Expand Up @@ -80,7 +84,9 @@ func StateApiExecute(
logger.Info("StateDecideActivity", "input", input)

iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl)
svcCfg := env.GetSharedConfig()
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
DefaultHeader: svcCfg.Interpreter.InterpreterActivityConfig.DefaultHeader,
Servers: []iwfidl.ServerConfiguration{
{
URL: iwfWorkerBaseUrl,
Expand Down Expand Up @@ -193,7 +199,9 @@ func checkCommandRequestFromWaitUntilResponse(resp *iwfidl.WorkflowStateStartRes
return nil
}

func DumpWorkflowInternal(ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, error) {
func DumpWorkflowInternal(
ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest,
) (*iwfidl.WorkflowDumpResponse, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("DumpWorkflowInternal", "input", req)
Expand All @@ -217,7 +225,8 @@ func DumpWorkflowInternal(ctx context.Context, backendType service.BackendType,
}

func InvokeWorkerRpc(
ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse, req iwfidl.WorkflowRpcRequest,
ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse,
req iwfidl.WorkflowRpcRequest,
) (*InvokeRpcActivityOutput, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
Expand Down
Loading