From c23ca69e0f1c90b88acd0c23c99c603d6ada31a3 Mon Sep 17 00:00:00 2001 From: Yimin Chen Date: Tue, 1 Feb 2022 17:31:15 -0800 Subject: [PATCH] Add more test for namespace migration --- host/ndc/ndc_integration_test.go | 62 +++++++++++++++++- host/xdc/integration_failover_test.go | 93 +++++++++++++++++++++++++-- 2 files changed, 149 insertions(+), 6 deletions(-) diff --git a/host/ndc/ndc_integration_test.go b/host/ndc/ndc_integration_test.go index b4ff59f3132..02dc9f5a2bf 100644 --- a/host/ndc/ndc_integration_test.go +++ b/host/ndc/ndc_integration_test.go @@ -209,7 +209,7 @@ func (s *nDCIntegrationTestSuite) TestSingleBranch() { // active has initial version 0 historyClient := s.active.GetHistoryClient() - versions := []int64{102, 2, 202, 302, 402, 602, 502, 802, 1002, 902, 702, 1102} + versions := []int64{0, 102, 2, 202, 302, 402, 602, 502, 802, 1002, 902, 702, 1102} for _, version := range versions { runID := uuid.New() var historyBatch []*historypb.History @@ -373,6 +373,66 @@ func (s *nDCIntegrationTestSuite) TestMultipleBranches() { } } +func (s *nDCIntegrationTestSuite) TestEmptyVersionAndNonEmptyVersion() { + workflowID := "ndc-migration-test" + uuid.New() + + workflowType := "event-generator-workflow-type" + taskqueue := "event-generator-taskQueue" + + // active has initial version 0 + historyClient := s.active.GetHistoryClient() + + runID := uuid.New() + + version := common.EmptyVersion + var baseBranch []*historypb.History + baseGenerator := test.InitializeHistoryEventGenerator(s.namespace, version) + baseGenerator.SetVersion(version) + + for i := 0; i < 10 && baseGenerator.HasNextVertex(); i++ { + events := baseGenerator.GetNextVertices() + historyEvents := &historypb.History{} + for _, event := range events { + historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent)) + } + baseBranch = append(baseBranch, historyEvents) + } + baseVersionHistory := s.eventBatchesToVersionHistory(nil, baseBranch) + + var branch1 
[]*historypb.History + branchVersionHistory1 := versionhistory.CopyVersionHistory(baseVersionHistory) + branchGenerator1 := baseGenerator.DeepCopy() + branchGenerator1.SetVersion(2) + for i := 0; i < 10 && branchGenerator1.HasNextVertex(); i++ { + events := branchGenerator1.GetNextVertices() + historyEvents := &historypb.History{} + for _, event := range events { + historyEvents.Events = append(historyEvents.Events, event.GetData().(*historypb.HistoryEvent)) + } + branch1 = append(branch1, historyEvents) + } + branchVersionHistory1 = s.eventBatchesToVersionHistory(branchVersionHistory1, branch1) + + s.applyEvents( + workflowID, + runID, + workflowType, + taskqueue, + baseVersionHistory, + baseBranch, + historyClient, + ) + s.applyEvents( + workflowID, + runID, + workflowType, + taskqueue, + branchVersionHistory1, + branch1, + historyClient, + ) +} + func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranches() { s.setupRemoteFrontendClients() diff --git a/host/xdc/integration_failover_test.go b/host/xdc/integration_failover_test.go index 4401d6516d5..df8e492aad7 100644 --- a/host/xdc/integration_failover_test.go +++ b/host/xdc/integration_failover_test.go @@ -40,6 +40,7 @@ import ( "testing" "time" + "go.temporal.io/sdk/temporal" "go.temporal.io/server/api/adminservice/v1" "github.com/pborman/uuid" @@ -1993,6 +1994,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { err := workflow.Sleep(ctx, sleepInterval) return err } + worker1.RegisterWorkflow(testWorkflowFn) worker1.Start() @@ -2022,6 +2024,78 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { s.NotEmpty(run2.GetRunID()) s.logger.Info("start wf2", tag.WorkflowRunID(run2.GetRunID())) + // Start wf6 (start in local ns, with buffered event when ns is promoted, close in global ns) + workflowID6 := "local-ns-wf-buffered-events" + sigReadyToSendChan := make(chan struct{}, 1) + sigSendDoneChan := make(chan struct{}) + localActivityFn := func(ctx context.Context) 
error { + // to unblock signal sending, so the signal is sent after the first workflow task has started. + select { + case sigReadyToSendChan <- struct{}{}: + default: + } + + // this will block workflow task and cause the signal to become a buffered event + select { + case <-sigSendDoneChan: + case <-ctx.Done(): + } + + return nil + } + + var receivedSig string + wfWithBufferedEvents := func(ctx workflow.Context) error { + ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + StartToCloseTimeout: 30 * time.Second, + RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, + }) + f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn) + err1 := f1.Get(ctx1, nil) + if err1 != nil { + return err1 + } + + sigCh := workflow.GetSignalChannel(ctx, "signal-name") + + for { + var sigVal string + ok := sigCh.ReceiveAsync(&sigVal) + if !ok { + break + } + receivedSig = sigVal + } + + return nil + } + worker1.RegisterWorkflow(wfWithBufferedEvents) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: workflowID6, + TaskQueue: taskqueue, + // Intentionally use same timeout for WorkflowTaskTimeout and WorkflowRunTimeout so if workflow task is not + // correctly dispatched, it would time out which would fail the workflow and cause test to fail.
+ WorkflowTaskTimeout: 30 * time.Second, + WorkflowRunTimeout: 30 * time.Second, + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + run6, err := client1.ExecuteWorkflow(ctx, workflowOptions, wfWithBufferedEvents) + s.NoError(err) + s.NotNil(run6) + s.True(run6.GetRunID() != "") + + // block until first workflow task started + select { + case <-sigReadyToSendChan: + case <-ctx.Done(): + } + + // this signal will become buffered event + err = client1.SignalWorkflow(ctx, workflowID6, run6.GetRunID(), "signal-name", "signal-value") + s.NoError(err) + // promote ns frontendClient1 := s.cluster1.GetFrontendClient() _, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{ @@ -2035,11 +2109,6 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { s.NoError(err) s.True(nsResp.IsGlobalNamespace) s.Equal(1, len(nsResp.ReplicationConfig.Clusters)) - - // wait until wf2 complete - err = run2.Get(context.Background(), nil) - s.NoError(err) - // update ns to have 2 clusters _, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{ Namespace: namespace, @@ -2048,6 +2117,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { }, }) s.NoError(err) + // wait for ns cache to pick up the change time.Sleep(cacheRefreshInterval) @@ -2058,6 +2128,18 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { s.True(nsResp.IsGlobalNamespace) s.Equal(2, len(nsResp.ReplicationConfig.Clusters)) + // namespace update completed, now resume wf6 (bufferedEvent workflow) + close(sigSendDoneChan) + + // wait until wf2 complete + err = run2.Get(context.Background(), nil) + s.NoError(err) + + // wait until wf6 complete + err = run6.Get(context.Background(), nil) + s.NoError(err) // if new workflow task is not correctly dispatched, it would cause timeout error here + s.Equal("signal-value", receivedSig) + // 
start wf3 (start in global ns) workflowID3 := "local-ns-wf-3" run3, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{ @@ -2132,6 +2214,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() { verify(workflowID, run1.GetRunID()) verify(workflowID2, run2.GetRunID()) verify(workflowID3, run3.GetRunID()) + verify(workflowID6, run6.GetRunID()) } func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {