Skip to content

Commit

Permalink
Add more test for namespace migration (#2450)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Feb 3, 2022
1 parent 9454db2 commit d2d1e3d
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 6 deletions.
62 changes: 61 additions & 1 deletion host/ndc/ndc_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *nDCIntegrationTestSuite) TestSingleBranch() {
// active has initial version 0
historyClient := s.active.GetHistoryClient()

versions := []int64{102, 2, 202, 302, 402, 602, 502, 802, 1002, 902, 702, 1102}
versions := []int64{0, 102, 2, 202, 302, 402, 602, 502, 802, 1002, 902, 702, 1102}
for _, version := range versions {
runID := uuid.New()
var historyBatch []*historypb.History
Expand Down Expand Up @@ -373,6 +373,66 @@ func (s *nDCIntegrationTestSuite) TestMultipleBranches() {
}
}

func (s *nDCIntegrationTestSuite) TestEmptyVersionAndNonEmptyVersion() {
workflowID := "ndc-migration-test" + uuid.New()

workflowType := "event-generator-workflow-type"
taskqueue := "event-generator-taskQueue"

// active has initial version 0
historyClient := s.active.GetHistoryClient()

runID := uuid.New()

version := common.EmptyVersion
var baseBranch []*historypb.History
baseGenerator := test.InitializeHistoryEventGenerator(s.namespace, version)
baseGenerator.SetVersion(version)

for i := 0; i < 10 && baseGenerator.HasNextVertex(); i++ {
events := baseGenerator.GetNextVertices()
historyEvents := &historypb.History{}
for _, event := range events {
historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent))
}
baseBranch = append(baseBranch, historyEvents)
}
baseVersionHistory := s.eventBatchesToVersionHistory(nil, baseBranch)

var branch1 []*historypb.History
branchVersionHistory1 := versionhistory.CopyVersionHistory(baseVersionHistory)
branchGenerator1 := baseGenerator.DeepCopy()
branchGenerator1.SetVersion(2)
for i := 0; i < 10 && branchGenerator1.HasNextVertex(); i++ {
events := branchGenerator1.GetNextVertices()
historyEvents := &historypb.History{}
for _, event := range events {
historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent))
}
branch1 = append(branch1, historyEvents)
}
branchVersionHistory1 = s.eventBatchesToVersionHistory(branchVersionHistory1, branch1)

s.applyEvents(
workflowID,
runID,
workflowType,
taskqueue,
baseVersionHistory,
baseBranch,
historyClient,
)
s.applyEvents(
workflowID,
runID,
workflowType,
taskqueue,
branchVersionHistory1,
branch1,
historyClient,
)
}

func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranches() {

s.setupRemoteFrontendClients()
Expand Down
93 changes: 88 additions & 5 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"testing"
"time"

"go.temporal.io/sdk/temporal"
"go.temporal.io/server/api/adminservice/v1"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -1993,6 +1994,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
err := workflow.Sleep(ctx, sleepInterval)
return err
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()

Expand Down Expand Up @@ -2022,6 +2024,78 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
s.NotEmpty(run2.GetRunID())
s.logger.Info("start wf2", tag.WorkflowRunID(run2.GetRunID()))

// Start wf6 (start in local ns, with buffered event when ns is promoted, close in global ns)
workflowID6 := "local-ns-wf-buffered-events"
sigReadyToSendChan := make(chan struct{}, 1)
sigSendDoneChan := make(chan struct{})
localActivityFn := func(ctx context.Context) error {
// to unblock signal sending, so signal is send after first workflow task started.
select {
case sigReadyToSendChan <- struct{}{}:
default:
}

// this will block workflow task and cause the signal to become buffered event
select {
case <-sigSendDoneChan:
case <-ctx.Done():
}

return nil
}

var receivedSig string
wfWithBufferedEvents := func(ctx workflow.Context) error {
ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
})
f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn)
err1 := f1.Get(ctx1, nil)
if err1 != nil {
return err1
}

sigCh := workflow.GetSignalChannel(ctx, "signal-name")

for {
var sigVal string
ok := sigCh.ReceiveAsync(&sigVal)
if !ok {
break
}
receivedSig = sigVal
}

return nil
}
worker1.RegisterWorkflow(wfWithBufferedEvents)

workflowOptions := sdkclient.StartWorkflowOptions{
ID: workflowID6,
TaskQueue: taskqueue,
// Intentionally use same timeout for WorkflowTaskTimeout and WorkflowRunTimeout so if workflow task is not
// correctly dispatched, it would time out which would fail the workflow and cause test to fail.
WorkflowTaskTimeout: 30 * time.Second,
WorkflowRunTimeout: 30 * time.Second,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
run6, err := client1.ExecuteWorkflow(ctx, workflowOptions, wfWithBufferedEvents)
s.NoError(err)
s.NotNil(run6)
s.True(run6.GetRunID() != "")

// block until first workflow task started
select {
case <-sigReadyToSendChan:
case <-ctx.Done():
}

// this signal will become buffered event
err = client1.SignalWorkflow(ctx, workflowID6, run6.GetRunID(), "signal-name", "signal-value")
s.NoError(err)

// promote ns
frontendClient1 := s.cluster1.GetFrontendClient()
_, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Expand All @@ -2035,11 +2109,6 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(1, len(nsResp.ReplicationConfig.Clusters))

// wait until wf2 complete
err = run2.Get(context.Background(), nil)
s.NoError(err)

// update ns to have 2 clusters
_, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
Expand All @@ -2048,6 +2117,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
},
})
s.NoError(err)

// wait for ns cache to pick up the change
time.Sleep(cacheRefreshInterval)

Expand All @@ -2058,6 +2128,18 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
s.True(nsResp.IsGlobalNamespace)
s.Equal(2, len(nsResp.ReplicationConfig.Clusters))

// namespace update completed, now resume wf6 (bufferedEvent workflow)
close(sigSendDoneChan)

// wait until wf2 complete
err = run2.Get(context.Background(), nil)
s.NoError(err)

// wait until wf6 complete
err = run6.Get(context.Background(), nil)
s.NoError(err) // if new workflow task is not correctly dispatched, it would cause timeout error here
s.Equal("signal-value", receivedSig)

// start wf3 (start in global ns)
workflowID3 := "local-ns-wf-3"
run3, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
Expand Down Expand Up @@ -2132,6 +2214,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
verify(workflowID, run1.GetRunID())
verify(workflowID2, run2.GetRunID())
verify(workflowID3, run3.GetRunID())
verify(workflowID6, run6.GetRunID())
}

func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
Expand Down

0 comments on commit d2d1e3d

Please sign in to comment.