Skip to content

Commit

Permalink
Add Mutable State X-Cluster RecordChildCompletion
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender committed Jul 30, 2021
1 parent 37706b2 commit cc5f267
Show file tree
Hide file tree
Showing 18 changed files with 1,746 additions and 475 deletions.
488 changes: 472 additions & 16 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

163 changes: 84 additions & 79 deletions .gen/proto/admin/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

163 changes: 84 additions & 79 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

757 changes: 673 additions & 84 deletions .gen/proto/shared/v1/queue.pb.go

Large diffs are not rendered by default.

163 changes: 84 additions & 79 deletions .gen/proto/shared/v1/queue.pb.yarpc.go

Large diffs are not rendered by default.

21 changes: 10 additions & 11 deletions common/authorization/oauthAutorizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ import (
type (
oauthSuite struct {
suite.Suite
logger *log.MockLogger
cfg config.OAuthAuthorizer
att Attributes
token string
controller *gomock.Controller
domainCache *cache.MockDomainCache
ctx context.Context
domainEntry *cache.DomainCacheEntry
logger *log.MockLogger
cfg config.OAuthAuthorizer
att Attributes
token string
controller *gomock.Controller
domainCache *cache.MockDomainCache
ctx context.Context
domainEntry *cache.DomainCacheEntry
}
)

Expand Down Expand Up @@ -88,12 +88,11 @@ func (s *oauthSuite) SetupTest() {

s.domainEntry = cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{
ID: "test-domain-id",
ID: "test-domain-id",
Name: "test-domain",
Data: map[string]string{
common.DomainDataKeyForReadGroups: "c",
},

},
&persistence.DomainConfig{Retention: 1},
&persistence.DomainReplicationConfig{
Expand Down Expand Up @@ -227,4 +226,4 @@ func (s *oauthSuite) TestIncorrectPermission() {
}))
result, _ := authorizer.Authorize(s.ctx, &s.att)
s.Equal(result.Decision, DecisionDeny)
}
}
2 changes: 1 addition & 1 deletion common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,4 @@ const StickyTaskConditionFailedErrorMsg = "StickyTaskConditionFailedError"
const MemoKeyForOperator = "operator"

// ReservedTaskListPrefix is the required naming prefix for any task list partition other than partition 0
const ReservedTaskListPrefix = "/__cadence_sys/"
const ReservedTaskListPrefix = "/__cadence_sys/"
229 changes: 116 additions & 113 deletions common/metrics/defs.go

Large diffs are not rendered by default.

60 changes: 60 additions & 0 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ const (
TransferTaskTypeRecordWorkflowStarted
TransferTaskTypeResetWorkflow
TransferTaskTypeUpsertWorkflowSearchAttributes
TransferTaskTypeRecordChildWorkflowExecutionComplete
)

// Types of cross-cluster tasks
const (
CrossClusterTaskTypeStartChildExecution = iota + 1
CrossClusterTaskTypeCancelExecution
CrossClusterTaskTypeSignalExecution
CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
)

// Types of replication tasks
Expand Down Expand Up @@ -566,6 +568,17 @@ type (
Version int64
}

// RecordChildCompletionTask identifies a task completing a child execution
RecordWorkflowExecutionCompleteTask struct {
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetWorkflowID string
TargetRunID string
InitiatedID int64
Version int64
}

// CrossClusterStartChildExecutionTask is the cross-cluster version of StartChildExecutionTask
CrossClusterStartChildExecutionTask struct {
StartChildExecutionTask
Expand All @@ -587,6 +600,13 @@ type (
TargetCluster string
}

// CrossClusterRecordChildWorkflowExecutionCompleteTask is the cross-cluster version of RecordChildCompletionTask
CrossClusterRecordChildWorkflowExecutionCompleteTask struct {
RecordWorkflowExecutionCompleteTask

TargetCluster string
}

// ActivityTimeoutTask identifies a timeout task.
ActivityTimeoutTask struct {
VisibilityTimestamp time.Time
Expand Down Expand Up @@ -2246,6 +2266,41 @@ func (u *SignalExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the signal transfer task
func (u *RecordWorkflowExecutionCompleteTask) GetType() int {
return TransferTaskTypeRecordChildWorkflowExecutionComplete
}

// GetVersion returns the version of the signal transfer task
func (u *RecordWorkflowExecutionCompleteTask) GetVersion() int64 {
return u.Version
}

// SetVersion returns the version of the signal transfer task
func (u *RecordWorkflowExecutionCompleteTask) SetVersion(version int64) {
u.Version = version
}

// GetTaskID returns the sequence ID of the signal transfer task.
func (u *RecordWorkflowExecutionCompleteTask) GetTaskID() int64 {
return u.TaskID
}

// SetTaskID sets the sequence ID of the signal transfer task.
func (u *RecordWorkflowExecutionCompleteTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *RecordWorkflowExecutionCompleteTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *RecordWorkflowExecutionCompleteTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the upsert search attributes transfer task
func (u *UpsertWorkflowSearchAttributesTask) GetType() int {
return TransferTaskTypeUpsertWorkflowSearchAttributes
Expand Down Expand Up @@ -2331,6 +2386,11 @@ func (c *CrossClusterSignalExecutionTask) GetType() int {
return CrossClusterTaskTypeSignalExecution
}

// GetType returns of type of the cross-cluster record child workflow completion task
func (c *CrossClusterRecordChildWorkflowExecutionCompleteTask) GetType() int {
return CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
}

// GetType returns the type of the history replication task
func (a *HistoryReplicationTask) GetType() int {
return ReplicationTaskTypeHistory
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn(
targetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
scheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID

case p.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
targetCluster = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetCluster
targetDomainID = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetDomainID
targetWorkflowID = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetWorkflowID
targetRunID = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetRunID
scheduleID = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).InitiatedID

default:
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/sql/sqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,13 @@ func createCrossClusterTasks(
info.TargetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
info.ScheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID

case p.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetCluster
info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetDomainID)
info.TargetWorkflowID = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetWorkflowID
info.TargetRunID = serialization.MustParseUUID(task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetRunID)
info.ScheduleID = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).InitiatedID

default:
return &types.InternalServiceError{
Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
Expand Down
41 changes: 41 additions & 0 deletions common/types/mapper/proto/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,47 @@ func ToCrossClusterSignalExecutionResponseAttributes(t *sharedv1.CrossClusterSig
return &types.CrossClusterSignalExecutionResponseAttributes{}
}

// FromCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes converts internal CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes type to proto
func FromCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes(t *types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes {
if t == nil {
return nil
}
return &sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes{
TargetDomainId: t.TargetDomainID,
TargetWorkflowExecution: FromWorkflowRunPair(t.TargetWorkflowID, t.TargetRunID),
InitiatedEventId: t.InitiatedEventID,
}
}

// ToCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes converts proto CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes type to internal
func ToCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes(t *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) *types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes {
if t == nil {
return nil
}
return &types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes{
TargetDomainID: t.TargetDomainId,
TargetWorkflowID: ToWorkflowID(t.TargetWorkflowExecution),
TargetRunID: ToRunID(t.TargetWorkflowExecution),
InitiatedEventID: t.InitiatedEventId,
}
}

// FromCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes converts internal CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes type to proto
func FromCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes(t *types.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes) *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
if t == nil {
return nil
}
return &sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes{}
}

// ToCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes converts proto CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes type to internal
func ToCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes(t *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes) *types.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
if t == nil {
return nil
}
return &types.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes{}
}

// FromCrossClusterTaskRequest converts internal CrossClusterTaskRequest type to proto
func FromCrossClusterTaskRequest(t *types.CrossClusterTaskRequest) *sharedv1.CrossClusterTaskRequest {
if t == nil {
Expand Down
81 changes: 71 additions & 10 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10652,12 +10652,56 @@ func (v *CrossClusterSignalExecutionRequestAttributes) GetControl() (o []byte) {
type CrossClusterSignalExecutionResponseAttributes struct {
}

type CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes struct {
TargetDomainID string `json:"targetDomainID,omitempty"`
TargetWorkflowID string `json:"targetWorkflowID,omitempty"`
TargetRunID string `json:"targetRunID,omitempty"`
InitiatedEventID int64 `json:"initiatedEventID,omitempty"`
}

// GetTargetDomainID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetTargetDomainID() (o string) {
if v != nil {
return v.TargetDomainID
}
return
}

// GetTargetWorkflowID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetTargetWorkflowID() (o string) {
if v != nil {
return v.TargetWorkflowID
}
return
}

// GetTargetRunID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetTargetRunID() (o string) {
if v != nil {
return v.TargetRunID
}
return
}

// GetInitiatedEventID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetInitiatedEventID() (o int64) {
if v != nil {
return v.InitiatedEventID
}
return
}

// CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes is an internal type (TBD...)
type CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes struct {
}

// CrossClusterTaskRequest is an internal type (TBD...)
type CrossClusterTaskRequest struct {
TaskInfo *CrossClusterTaskInfo `json:"taskInfo,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionRequestAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionRequestAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionRequestAttributes `json:"signalExecutionAttributes,omitempty"`
TaskInfo *CrossClusterTaskInfo `json:"taskInfo,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionRequestAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionRequestAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionRequestAttributes `json:"signalExecutionAttributes,omitempty"`
RecordChildWorkflowExecutionCompleteAttributes *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes `json:"RecordChildWorkflowExecutionCompleteAttributes,omitempty"`
}

// GetTaskInfo is an internal getter (TBD...)
Expand Down Expand Up @@ -10692,14 +10736,23 @@ func (v *CrossClusterTaskRequest) GetSignalExecutionAttributes() (o *CrossCluste
return
}

// GetSignalExecutionAttributes is an internal getter (TBD...)
func (v *CrossClusterTaskRequest) GetRecordChildWorkflowExecutionCompleteAttributes() (o *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) {
if v != nil && v.RecordChildWorkflowExecutionCompleteAttributes != nil {
return v.RecordChildWorkflowExecutionCompleteAttributes
}
return
}

// CrossClusterTaskResponse is an internal type (TBD...)
type CrossClusterTaskResponse struct {
TaskID int64 `json:"taskID,omitempty"`
TaskType *CrossClusterTaskType `json:"taskType,omitempty"`
FailedCause *CrossClusterTaskFailedCause `json:"failedCause,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionResponseAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionResponseAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionResponseAttributes `json:"signalExecutionAttributes,omitempty"`
TaskID int64 `json:"taskID,omitempty"`
TaskType *CrossClusterTaskType `json:"taskType,omitempty"`
FailedCause *CrossClusterTaskFailedCause `json:"failedCause,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionResponseAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionResponseAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionResponseAttributes `json:"signalExecutionAttributes,omitempty"`
RecordChildWorkflowExecutionCompleteAttributes *CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes `json:"RecordChildWorkflowExecutionCompleteAttributes,omitempty"`
}

// GetTaskID is an internal getter (TBD...)
Expand Down Expand Up @@ -10750,6 +10803,14 @@ func (v *CrossClusterTaskResponse) GetSignalExecutionAttributes() (o *CrossClust
return
}

// GetSignalExecutionAttributes is an internal getter (TBD...)
func (v *CrossClusterTaskResponse) GetRecordChildWorkflowExecutionCompleteAttributes() (o *CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes) {
if v != nil && v.RecordChildWorkflowExecutionCompleteAttributes != nil {
return v.RecordChildWorkflowExecutionCompleteAttributes
}
return
}

// GetCrossClusterTasksRequest is an internal type (TBD...)
type GetCrossClusterTasksRequest struct {
ShardIDs []int32 `json:"shardIDs,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from d4f57c to bc0bae
13 changes: 12 additions & 1 deletion proto/internal/uber/cadence/shared/v1/queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ message CrossClusterStartChildExecutionRequestAttributes {
string request_id = 2;
int64 initiated_event_id = 3;
api.v1.StartChildWorkflowExecutionInitiatedEventAttributes initiated_event_attributes = 4;
// targetRunID is for scheduling first decision task
// targetRunID is for scheduling first decision task
// targetWorkflowID is available in initiatedEventAttributes
string target_run_id = 5;
}
Expand Down Expand Up @@ -109,12 +109,22 @@ message CrossClusterSignalExecutionRequestAttributes {
message CrossClusterSignalExecutionResponseAttributes {
}

message CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes {
string target_domain_id = 1;
api.v1.WorkflowExecution target_workflow_execution = 2;
int64 initiated_event_id = 3;
}

message CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
}

message CrossClusterTaskRequest {
CrossClusterTaskInfo task_info = 1;
oneof attributes {
CrossClusterStartChildExecutionRequestAttributes startChildExecutionAttributes = 2;
CrossClusterCancelExecutionRequestAttributes cancelExecutionAttributes = 3;
CrossClusterSignalExecutionRequestAttributes signalExecutionAttributes = 4;
CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes recordChildWorkflowExecutionCompleteRequestAttributes = 5;
}
}

Expand All @@ -126,6 +136,7 @@ message CrossClusterTaskResponse {
CrossClusterStartChildExecutionResponseAttributes startChildExecutionAttributes = 4;
CrossClusterCancelExecutionResponseAttributes cancelExecutionAttributes = 5;
CrossClusterSignalExecutionResponseAttributes signalExecutionAttributes = 6;
CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes recordChildWorkflowExecutionCompleteRequestAttributes = 7;
}
}

Expand Down
13 changes: 12 additions & 1 deletion service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,18 @@ func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterTaskFromTransferTask
Version: task.Version,
},
}
// TODO: add the case for TransferTaskTypeCloseExecution
case persistence.TransferTaskTypeRecordChildWorkflowExecutionComplete:
crossClusterTask = &persistence.CrossClusterRecordChildWorkflowExecutionCompleteTask{
TargetCluster: targetCluster,
RecordWorkflowExecutionCompleteTask: persistence.RecordWorkflowExecutionCompleteTask{
// TaskID is set by shard context
TargetDomainID: task.TargetDomainID,
TargetWorkflowID: task.TargetWorkflowID,
TargetRunID: task.TargetRunID,
InitiatedID: task.ScheduleID,
Version: task.Version,
},
}
default:
return fmt.Errorf("unable to convert transfer task of type %v to cross-cluster task", task.TaskType)
}
Expand Down
Loading

0 comments on commit cc5f267

Please sign in to comment.