diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index cb201ec1df8..c1cc629794d 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -110,10 +110,12 @@ func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() { other := tv.WithBuildId("other") go s.idlePollWorkflow(other, true, ver3MinPollTime, "other deployment should not receive pinned task") - s.waitForDeploymentDataPropagation(other, tqTypeWf) s.startWorkflow(tv, makePinnedOverride(tv.Deployment())) s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task") + + // Sleeping to let the pollers arrive to server before ending the test. + time.Sleep(200 * time.Millisecond) //nolint:forbidigo }) } @@ -122,9 +124,11 @@ func (s *Versioning3Suite) TestUnpinnedTask_NonCurrentDeployment() { func() { tv := testvars.New(s) go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task") - s.waitForDeploymentDataPropagation(tv, tqTypeWf) s.startWorkflow(tv, nil) + + // Sleeping to let the pollers arrive to server before ending the test. + time.Sleep(200 * time.Millisecond) //nolint:forbidigo }) } @@ -145,6 +149,8 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { ver3MinPollTime, "old deployment should not receive unpinned task", ) + // Sleeping to let the pollers arrive to server before ending the test. + time.Sleep(200 * time.Millisecond) //nolint:forbidigo }, ) } @@ -178,6 +184,8 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { s.NotNil(task) return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil }) + // TODO (shahab): remove the waits once the following error is handled properly. + // "MultiOperation could not be executed: Start failed: Workflow was not started: StartReused" s.waitForDeploymentDataPropagation(tv, tqTypeWf) actCompleted := make(chan interface{}) @@ -239,6 +247,8 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil }) + // TODO (shahab): remove the waits once the following error is handled properly. + // "MultiOperation could not be executed: Start failed: Workflow was not started: StartReused" s.waitForDeploymentDataPropagation(tv, tqTypeWf) actCompleted := make(chan interface{}) @@ -249,7 +259,8 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { }) s.waitForDeploymentDataPropagation(tv, tqTypeAct) - s.updateTaskQueueDeploymentData(tv, 0, tqTypeWf, tqTypeAct) + s.setCurrentDeployment(d) + s.waitForDeploymentDataPropagation(tv, tqTypeWf, tqTypeAct) we := s.startWorkflow(tv, nil) @@ -516,6 +527,19 @@ func transitionTo(d *deploymentpb.Deployment) *workflow.DeploymentTransition { } } +func (s *Versioning3Suite) setCurrentDeployment( + deployment *deploymentpb.Deployment, +) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + _, err := s.FrontendClient().SetCurrentDeployment(ctx, + &workflowservice.SetCurrentDeploymentRequest{ + Namespace: s.Namespace(), + Deployment: deployment, + }) + s.NoError(err) +} + func (s *Versioning3Suite) updateTaskQueueDeploymentData( tv *testvars.TestVars, timeSinceCurrent time.Duration,