Skip to content

Commit

Permalink
Fix fake cluster for empty version
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc committed Jan 21, 2022
1 parent 0ff1b16 commit 6c4376b
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 48 deletions.
16 changes: 14 additions & 2 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ const (
FakeClusterForEmptyVersion = "fake-cluster-for-empty-version"
)

var fakeClusterInfo = ClusterInformation{
Enabled: true,
InitialFailoverVersion: 0,
RPCAddress: "",
}

type (
Metadata interface {
common.Daemon
Expand Down Expand Up @@ -166,6 +172,9 @@ func NewMetadata(
for k, v := range clusterInfo {
copyClusterInfo[k] = v
}

copyClusterInfo[FakeClusterForEmptyVersion] = fakeClusterInfo

return &metadataImpl{
status: common.DaemonStatusInitialized,
enableGlobalNamespace: enableGlobalNamespace,
Expand Down Expand Up @@ -450,7 +459,8 @@ func (m *metadataImpl) updateFailoverVersionToClusterName() {
func updateVersionToClusterName(clusterInfo map[string]ClusterInformation, failoverVersionIncrement int64) map[int64]string {
versionToClusterName := make(map[int64]string)
for clusterName, info := range clusterInfo {
if failoverVersionIncrement <= info.InitialFailoverVersion || info.InitialFailoverVersion <= 0 {
if clusterName != FakeClusterForEmptyVersion &&
(failoverVersionIncrement <= info.InitialFailoverVersion || info.InitialFailoverVersion <= 0) {
panic(fmt.Sprintf(
"Version increment %v is smaller than initial version: %v.",
failoverVersionIncrement,
Expand All @@ -462,7 +472,7 @@ func updateVersionToClusterName(clusterInfo map[string]ClusterInformation, failo
}
versionToClusterName[info.InitialFailoverVersion] = clusterName

if info.Enabled && info.RPCAddress == "" {
if clusterName != FakeClusterForEmptyVersion && info.Enabled && info.RPCAddress == "" {
panic(fmt.Sprintf("Cluster %v: RPCAddress is empty", clusterName))
}
}
Expand Down Expand Up @@ -506,5 +516,7 @@ func (m *metadataImpl) listAllClusterMetadataFromDB() (map[string]*ClusterInform
version: getClusterResp.Version,
}
}
result[FakeClusterForEmptyVersion] = &fakeClusterInfo

return result, nil
}
4 changes: 2 additions & 2 deletions common/cluster/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (s *metadataSuite) Test_RegisterMetadataChangeCallback() {
s.metadata.RegisterMetadataChangeCallback(
id,
func(oldClusterMetadata map[string]*ClusterInformation, newClusterMetadata map[string]*ClusterInformation) {
s.Equal(2, len(newClusterMetadata))
s.Equal(3, len(newClusterMetadata))
})

s.metadata.UnRegisterMetadataChangeCallback(id)
Expand Down Expand Up @@ -224,5 +224,5 @@ func (s *metadataSuite) Test_ListAllClusterMetadataFromDB_Success() {

resp, err := s.metadata.listAllClusterMetadataFromDB()
s.NoError(err)
s.Equal(2, len(resp))
s.Equal(3, len(resp))
}
177 changes: 172 additions & 5 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
sdkclient "go.temporal.io/sdk/client"
sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
sw "go.temporal.io/server/service/worker"
"gopkg.in/yaml.v3"

"go.temporal.io/server/api/historyservice/v1"
Expand All @@ -69,6 +70,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/environment"
"go.temporal.io/server/host"
"go.temporal.io/server/service/worker/migration"
)

type (
Expand Down Expand Up @@ -1844,7 +1846,7 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() {

func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
namespace := "test-activity-heartbeat-workflow-failover-" + common.GenerateRandomString(5)
s.registerNamespace(namespace)
s.registerNamespace(namespace, true)

taskqueue := "integration-activity-heartbeat-workflow-failover-test-taskqueue"
client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1")
Expand Down Expand Up @@ -1949,6 +1951,167 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
s.Equal(2, lastAttemptCount)
}

func (s *integrationClustersTestSuite) printHistory(frontendClient workflowservice.WorkflowServiceClient, namespace, workflowID, runID string) {
events := s.getHistory(frontendClient, namespace, &commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
})
history := &historypb.History{Events: events}
common.PrettyPrintHistory(history, s.logger)
}

func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
namespace := "local-ns-to-be-promote-" + common.GenerateRandomString(5)
s.registerNamespace(namespace, false)

taskqueue := "integration-local-ns-to-be-promote-taskqueue"
client1, worker1 := s.newClientAndWorker(s.cluster1.GetHost().FrontendGRPCAddress(), namespace, taskqueue, "worker1")

testWorkflowFn := func(ctx workflow.Context, sleepInterval time.Duration) error {
err := workflow.Sleep(ctx, sleepInterval)
return err
}
worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()

// Start wf1 (in local ns)
workflowID := "local-ns-wf-1"
run1, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn, time.Millisecond*10)

s.NoError(err)
s.NotEmpty(run1.GetRunID())
s.logger.Info("start wf1", tag.WorkflowRunID(run1.GetRunID()))
// wait until wf1 complete
err = run1.Get(context.Background(), nil)
s.NoError(err)

// Start wf2 (start in local ns, and then promote to global ns, wf2 close in global ns)
workflowID2 := "local-ns-wf-2"
run2, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID2,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn, time.Second*15 /* longer than ns refresh */)
s.NoError(err)
s.NotEmpty(run2.GetRunID())
s.logger.Info("start wf2", tag.WorkflowRunID(run2.GetRunID()))

// promote ns
frontendClient1 := s.cluster1.GetFrontendClient()
_, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
PromoteNamespace: true,
})
s.NoError(err)
nsResp, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(1, len(nsResp.ReplicationConfig.Clusters))

// wait until wf2 complete
err = run2.Get(context.Background(), nil)
s.NoError(err)

// update ns to have 2 clusters
_, err = frontendClient1.UpdateNamespace(context.Background(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
ReplicationConfig: &replicationpb.NamespaceReplicationConfig{
Clusters: clusterReplicationConfig,
},
})
s.NoError(err)
// wait for ns cache to pick up the change
time.Sleep(cacheRefreshInterval)

nsResp, err = frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp.IsGlobalNamespace)
s.Equal(2, len(nsResp.ReplicationConfig.Clusters))

// start wf3 (start in global ns)
workflowID3 := "local-ns-wf-3"
run3, err := client1.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID3,
TaskQueue: taskqueue,
WorkflowRunTimeout: time.Second * 30,
}, testWorkflowFn, time.Millisecond*10)
s.NoError(err)
s.NotEmpty(run3.GetRunID())
s.logger.Info("start wf3", tag.WorkflowRunID(run3.GetRunID()))
// wait until wf3 complete
err = run3.Get(context.Background(), nil)
s.NoError(err)

// start force-replicate wf
sysClient, err := sdkclient.NewClient(sdkclient.Options{
HostPort: s.cluster1.GetHost().FrontendGRPCAddress(),
Namespace: "temporal-system",
})
workflowID4 := "force-replication-wf-4"
run4, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID4,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
}, "force-replication", migration.ForceReplicationParams{
Namespace: namespace,
RpsPerActivity: 10,
})

s.NoError(err)
err = run4.Get(context.Background(), nil)
s.NoError(err)

// start namespace-handover wf
workflowID5 := "namespace-handover-wf-5"
run5, err := sysClient.ExecuteWorkflow(host.NewContext(), sdkclient.StartWorkflowOptions{
ID: workflowID5,
TaskQueue: sw.DefaultWorkerTaskQueue,
WorkflowRunTimeout: time.Second * 30,
}, "namespace-handover", migration.NamespaceHandoverParams{
Namespace: namespace,
RemoteCluster: clusterName[1],
AllowedLaggingSeconds: 10,
HandoverTimeoutSeconds: 30,
})
s.NoError(err)
err = run5.Get(context.Background(), nil)
s.NoError(err)

// at this point ns migration is done.
// verify namespace is now active in cluster2
nsResp2, err := frontendClient1.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
})
s.NoError(err)
s.True(nsResp2.IsGlobalNamespace)
s.Equal(2, len(nsResp2.ReplicationConfig.Clusters))
s.Equal(clusterName[1], nsResp2.ReplicationConfig.ActiveClusterName)

// verify all wf in ns is now available in cluster2
client2, err := sdkclient.NewClient(sdkclient.Options{
HostPort: s.cluster2.GetHost().FrontendGRPCAddress(),
Namespace: namespace,
})
s.NoError(err)
verify := func(wfID string, expectedRunID string) {
desc1, err := client2.DescribeWorkflowExecution(host.NewContext(), wfID, "")
s.NoError(err)
s.Equal(expectedRunID, desc1.WorkflowExecutionInfo.Execution.RunId)
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, desc1.WorkflowExecutionInfo.Status)
}
verify(workflowID, run1.GetRunID())
verify(workflowID2, run2.GetRunID())
verify(workflowID3, run3.GetRunID())
}

func (s *integrationClustersTestSuite) getHistory(client host.FrontendClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
historyResponse, err := client.GetWorkflowExecutionHistory(host.NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Expand Down Expand Up @@ -1996,12 +2159,16 @@ func (s *integrationClustersTestSuite) failover(
time.Sleep(cacheRefreshInterval)
}

func (s *integrationClustersTestSuite) registerNamespace(namespace string) {
func (s *integrationClustersTestSuite) registerNamespace(namespace string, isGlobalNamespace bool) {
clusters := clusterReplicationConfig
if !isGlobalNamespace {
clusters = clusterReplicationConfig[0:1]
}
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
IsGlobalNamespace: true,
Clusters: clusterReplicationConfig,
IsGlobalNamespace: isGlobalNamespace,
Clusters: clusters,
ActiveClusterName: clusterName[0],
WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
}
Expand All @@ -2017,7 +2184,7 @@ func (s *integrationClustersTestSuite) registerNamespace(namespace string) {
s.NoError(err)
s.NotNil(resp)
s.Equal(namespace, resp.NamespaceInfo.Name)
s.Equal(true, resp.IsGlobalNamespace)
s.Equal(isGlobalNamespace, resp.IsGlobalNamespace)
}

func (s *integrationClustersTestSuite) newClientAndWorker(hostport, namespace, taskqueue, identity string) (sdkclient.Client, sdkworker.Worker) {
Expand Down
3 changes: 0 additions & 3 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers(
timerTasks []tasks.Task,
) {

if clusterName == cluster.FakeClusterForEmptyVersion {
return
}
if clusterName == t.currentClusterName {
t.activeTimerProcessor.notifyNewTimers(timerTasks)
return
Expand Down
3 changes: 0 additions & 3 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ func (t *transferQueueProcessorImpl) NotifyNewTask(
transferTasks []tasks.Task,
) {

if clusterName == cluster.FakeClusterForEmptyVersion {
return
}
if clusterName == t.currentClusterName {
// we will ignore the current time passed in, since the active processor process task immediately
if len(transferTasks) != 0 {
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ func (c *ContextImpl) ConflictResolveWorkflowExecution(
newWorkflowEventsSeq,
currentWorkflow,
currentWorkflowEventsSeq,
resetMutableState.GetNamespaceEntry().IsGlobalNamespace(),
); err != nil {
return err
} else {
Expand Down Expand Up @@ -685,6 +686,7 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
currentWorkflowEventsSeq,
newWorkflow,
newWorkflowEventsSeq,
c.MutableState.GetNamespaceEntry().IsGlobalNamespace(),
); err != nil {
return err
} else {
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4266,7 +4266,7 @@ func (e *MutableStateImpl) closeTransactionWithPolicyCheck(
activeCluster := e.clusterMetadata.ClusterNameForFailoverVersion(e.namespaceEntry.IsGlobalNamespace(), e.GetCurrentVersion())
currentCluster := e.clusterMetadata.GetCurrentClusterName()

if activeCluster != currentCluster {
if activeCluster != currentCluster && activeCluster != cluster.FakeClusterForEmptyVersion {
namespaceID := e.GetExecutionInfo().NamespaceId
return serviceerror.NewNamespaceNotActive(namespaceID, currentCluster, activeCluster)
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflow/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
createMode persistence.CreateWorkflowMode,
newWorkflowSnapshot *persistence.WorkflowSnapshot,
newWorkflowEventsSeq []*persistence.WorkflowEvents,
isGlobalNamespace bool,
) (int64, error)

ConflictResolveWorkflowExecution(
Expand All @@ -45,6 +46,7 @@ type (
newWorkflowEventsSeq []*persistence.WorkflowEvents,
currentWorkflowMutation *persistence.WorkflowMutation,
currentWorkflowEventsSeq []*persistence.WorkflowEvents,
isGlobalNamespace bool,
) (int64, int64, int64, error)

UpdateWorkflowExecution(
Expand All @@ -53,6 +55,7 @@ type (
currentWorkflowEventsSeq []*persistence.WorkflowEvents,
newWorkflowSnapshot *persistence.WorkflowSnapshot,
newWorkflowEventsSeq []*persistence.WorkflowEvents,
isGlobalNamespace bool,
) (int64, int64, error)
}
)
Loading

0 comments on commit 6c4376b

Please sign in to comment.