Skip to content

Commit

Permalink
Fix continueAsNew with large snapshot(>4MB) (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Dec 10, 2024
1 parent b9ce8b9 commit 1bed2dc
Show file tree
Hide file tree
Showing 22 changed files with 297 additions and 148 deletions.
26 changes: 0 additions & 26 deletions .github/workflows/ci-cadence-integ-test-disable-sticky.yml

This file was deleted.

9 changes: 5 additions & 4 deletions docker-compose/init-ci-cadence.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ yes | cadence adm cl asa --search_attr_key IwfExecutingStateIds --search_attr_ty
yes | cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 1


# see https://github.com/indeedeng/iwf/blob/main/CONTRIBUTING.md#option-3-run-with-your-own-cadence-service
echo "now sleep for 60s so that all the search attributes can take effect"

sleep 70
echo "After registering, it may take up 60s because of this issue. for Cadence to load the new search attributes."
echo "If run the test too early, you may see error: \"IwfWorkflowType is not a valid search attribute key\""
echo "and the test would fail with: unknown decision DecisionType: Activity, ID: 0, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition"
sleep 65

echo "now register the domain to tell the tests that Cadence is ready"
cadence --do default domain register

tail -f /dev/null
18 changes: 17 additions & 1 deletion integ/any_command_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ func TestAnyCommandCloseWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
}
}

func TestAnyCommandCloseWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
Expand All @@ -31,7 +39,15 @@ func TestAnyCommandCloseWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
}
}

func TestAnyCommandCloseWorkflowCadenceContinueAsNew(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand Down
16 changes: 8 additions & 8 deletions integ/any_command_combination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ func TestAnyCommandCombinationWorkflowTemporal(t *testing.T) {
}
}

func TestAnyCommandCombinationWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
func TestAnyCommandCombinationWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}

func TestAnyCommandCombinationWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
func TestAnyCommandCombinationWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
}
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: anycommandconbination.WorkflowType,
WorkflowTimeoutSeconds: 20,
WorkflowTimeoutSeconds: 40,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(anycommandconbination.State1),
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
Expand Down
2 changes: 1 addition & 1 deletion integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
},
QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 1,
MaximumAttempts: 5,
MaximumAttempts: 10,
},
},
Interpreter: config.Interpreter{
Expand Down
6 changes: 3 additions & 3 deletions integ/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ func doTestCreateWithoutStartingState(t *testing.T, backendType service.BackendT
panicAtHttpError(err, httpResp)

// workflow shouldn't executed any state
var dump service.ContinueAsNewDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType)
var dump service.DebugDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
}
assertions.Equal(service.StateExecutionCounterInfo{
StateIdStartedCount: make(map[string]int),
StateIdCurrentlyExecutingCount: make(map[string]int),
TotalCurrentlyExecutingCount: 0,
}, dump.StateExecutionCounterInfo)
}, dump.Snapshot.StateExecutionCounterInfo)

// invoke an RPC to trigger the state execution
reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
Expand Down
119 changes: 119 additions & 0 deletions integ/large_data_attributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package integ

import (
"context"
"fmt"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/signal"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/stretchr/testify/assert"
"net/http"
"strconv"
"strings"
"testing"
"time"
)

func TestLargeDataAttributesTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestLargeQueryAttributes(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0())
smallWaitForFastTest()
}
}

func doTestLargeQueryAttributes(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
if !*temporalIntegTest {
t.Skip()
}
assertions := assert.New(t)

// start test workflow server
wfHandler := signal.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

_, closeFunc2 := startIwfServiceWithClient(backendType)
defer closeFunc2()

wfId := signal.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: signal.WorkflowType,
WorkflowTimeoutSeconds: 86400,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(signal.State1),
// this is necessary for large DAs
// otherwise the workflow task will fail when trying to execute a stateAPI with data attributes >4MB
StateOptions: &signal.StateOptionsForLargeDataAttributes,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
}).Execute()
panicAtHttpError(err, httpResp)

assertions.Equal(httpResp.StatusCode, http.StatusOK)

// Define the size of the string in bytes (1 MB = 1024 * 1024 bytes)
const size = 1024 * 1024

OneMbDataObject := iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(strings.Repeat("a", size)),
}

// setting a large data object to test, especially continueAsNew
// because there is a 4MB limit for GRPC in temporal
setReq := apiClient.DefaultApi.ApiV1WorkflowDataobjectsSetPost(context.Background())
for i := 0; i < 5; i++ {

httpResp2, err := setReq.WorkflowSetDataObjectsRequest(iwfidl.WorkflowSetDataObjectsRequest{
WorkflowId: wfId,
Objects: []iwfidl.KeyValue{
{
Key: iwfidl.PtrString("large-data-object-" + strconv.Itoa(i)),
Value: &OneMbDataObject,
},
},
}).Execute()

panicAtHttpError(err, httpResp2)
}

// signal the workflow to complete
for i := 0; i < 4; i++ {
signalVal := iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(fmt.Sprintf("test-data-%v", i)),
}

req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
httpResp2, err := req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: signal.SignalName,
SignalValue: &signalVal,
}).Execute()

panicAtHttpError(err, httpResp2)
}

// wait for the workflow
reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
_, httpResp, err = reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"time"
)

func TestSetQueryAttributes(t *testing.T) {
func TestSetDataAttributesTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
Expand Down Expand Up @@ -50,33 +50,34 @@ func TestSetQueryAttributes(t *testing.T) {

assertions.Equal(httpResp.StatusCode, http.StatusOK)

var signalVals []iwfidl.KeyValue
signalVals = append(signalVals, iwfidl.KeyValue{
Key: iwfidl.PtrString(persistence.TestDataObjectKey),
Value: &persistence.TestDataObjectVal1,
},
iwfidl.KeyValue{
smallDataObjects := []iwfidl.KeyValue{
{
Key: iwfidl.PtrString(persistence.TestDataObjectKey),
Value: &persistence.TestDataObjectVal1,
},
{
Key: iwfidl.PtrString(persistence.TestDataObjectKey2),
Value: &persistence.TestDataObjectVal2,
})
},
}

setReq := apiClient.DefaultApi.ApiV1WorkflowDataobjectsSetPost(context.Background())
httpResp2, err := setReq.WorkflowSetDataObjectsRequest(iwfidl.WorkflowSetDataObjectsRequest{
WorkflowId: wfId,
Objects: signalVals,
Objects: smallDataObjects,
}).Execute()

panicAtHttpError(err, httpResp2)

time.Sleep(time.Second)

getReq := apiClient.DefaultApi.ApiV1WorkflowDataobjectsGetPost(context.Background())
searchResult, httpRespGet, err := getReq.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
getResult, httpRespGet, err := getReq.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: wfId,
Keys: []string{
persistence.TestDataObjectKey, persistence.TestDataObjectKey2,
}}).Execute()
panicAtHttpError(err, httpRespGet)

assertions.ElementsMatch(signalVals, searchResult.Objects)
assertions.ElementsMatch(smallDataObjects, getResult.Objects)
}
25 changes: 11 additions & 14 deletions integ/signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
if config != nil {
expectedConfig = *config
}
assertions.Equal(service.DebugDumpResponse{
Config: expectedConfig,
}, debugDump)
assertions.Equal(expectedConfig, debugDump.Config)

// update the disable system SA
reqUpdateConfig := apiClient.DefaultApi.ApiV1WorkflowConfigUpdatePost(context.Background())
Expand All @@ -113,21 +111,22 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

if config != nil {
time.Sleep(2 * time.Second)
}
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
}
expectedConfig.DisableSystemSearchAttribute = iwfidl.PtrBool(true)
assertions.Equal(service.DebugDumpResponse{
Config: expectedConfig,
}, debugDump)
assertions.Equal(expectedConfig, debugDump.Config)

// update the pagination size
reqUpdateConfig = apiClient.DefaultApi.ApiV1WorkflowConfigUpdatePost(context.Background())
httpResp, err = reqUpdateConfig.WorkflowConfigUpdateRequest(iwfidl.WorkflowConfigUpdateRequest{
WorkflowId: wfId,
WorkflowConfig: iwfidl.WorkflowConfig{
ContinueAsNewPageSizeInBytes: iwfidl.PtrInt32(300),
ContinueAsNewPageSizeInBytes: iwfidl.PtrInt32(3000000),
},
}).Execute()
panicAtHttpError(err, httpResp)
Expand All @@ -136,10 +135,8 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
if err != nil {
panic(err)
}
expectedConfig.ContinueAsNewPageSizeInBytes = iwfidl.PtrInt32(300)
assertions.Equal(service.DebugDumpResponse{
Config: expectedConfig,
}, debugDump)
expectedConfig.ContinueAsNewPageSizeInBytes = iwfidl.PtrInt32(3000000)
assertions.Equal(expectedConfig, debugDump.Config)

// signal for testing unhandled signals
var unhandledSignalVals []*iwfidl.EncodedObject
Expand Down Expand Up @@ -225,12 +222,12 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
assertions.Equal(signalVals[i], data[fmt.Sprintf("signalValue%v", i)])
}

var dump service.ContinueAsNewDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType)
var dump service.DebugDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
}
assertions.Equal(unhandledSignalVals, dump.SignalsReceived[signal.UnhandledSignalName])
assertions.Equal(unhandledSignalVals, dump.Snapshot.SignalsReceived[signal.UnhandledSignalName])
assertions.True(len(unhandledSignalVals) > 0)

if config == nil {
Expand Down
Loading

0 comments on commit 1bed2dc

Please sign in to comment.