Skip to content

Commit

Permalink
Apply parentClosePolicy to child workflow only (#2390)
Browse files Browse the repository at this point in the history
- Add externalExecution and childWorkflowExecutionOnly field to history terminateWorkflowExecution request
- Use those fields when applying parent close policy
  • Loading branch information
yycptt authored Jan 20, 2022
1 parent 28f78d5 commit 07cda1b
Show file tree
Hide file tree
Showing 7 changed files with 579 additions and 311 deletions.
715 changes: 414 additions & 301 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ message RemoveSignalMutableStateResponse {
message TerminateWorkflowExecutionRequest {
string namespace_id = 1;
temporal.api.workflowservice.v1.TerminateWorkflowExecutionRequest terminate_request = 2;
temporal.api.common.v1.WorkflowExecution external_workflow_execution = 3;
bool child_workflow_only = 4;
}

message TerminateWorkflowExecutionResponse {
Expand Down
9 changes: 9 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2200,6 +2200,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
namespaceID := namespaceEntry.ID()

request := terminateRequest.TerminateRequest
parentExecution := terminateRequest.ExternalWorkflowExecution
childWorkflowOnly := terminateRequest.ChildWorkflowOnly
execution := commonpb.WorkflowExecution{
WorkflowId: request.WorkflowExecution.WorkflowId,
}
Expand Down Expand Up @@ -2228,6 +2230,13 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
return nil, consts.ErrWorkflowExecutionNotFound
}

if childWorkflowOnly {
if parentExecution.GetWorkflowId() != executionInfo.ParentWorkflowId ||
parentExecution.GetRunId() != executionInfo.ParentRunId {
return nil, consts.ErrWorkflowParent
}
}

eventBatchFirstEventID := mutableState.GetNextEventID()

return updateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
Expand Down
102 changes: 101 additions & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
serviceerrors "go.temporal.io/server/common/serviceerror"
Expand Down Expand Up @@ -784,14 +785,113 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecution_NotFound() {
s.IsType(&serviceerror.NotFound{}, err)
}

func (s *engine2Suite) TestRequestCancelWorkflowExecution_ParentMismatch() {
namespaceID := tests.NamespaceID
workflowExecution := commonpb.WorkflowExecution{
WorkflowId: "wId",
RunId: tests.RunID,
}
parentInfo := &workflowspb.ParentExecutionInfo{
NamespaceId: tests.ParentNamespaceID.String(),
Namespace: tests.ParentNamespace.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "parent wId",
RunId: "parent rId",
},
InitiatedId: 123,
}

identity := "testIdentity"
tl := "testTaskQueue"

msBuilder := s.createExecutionStartedStateWithParent(workflowExecution, tl, parentInfo, identity, false)
ms1 := workflow.TestCloneToProto(msBuilder)
gwmsResponse1 := &persistence.GetWorkflowExecutionResponse{State: ms1}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse1, nil)

err := s.historyEngine.RequestCancelWorkflowExecution(metrics.AddMetricsContext(context.Background()), &historyservice.RequestCancelWorkflowExecutionRequest{
NamespaceId: namespaceID.String(),
CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowExecution.WorkflowId,
RunId: workflowExecution.RunId,
},
Identity: "identity",
},
ExternalWorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "unknown wId",
RunId: "unknown rId",
},
ChildWorkflowOnly: true,
})
s.Equal(consts.ErrWorkflowParent, err)
}

func (s *engine2Suite) TestTerminateWorkflowExecution_ParentMismatch() {
namespaceID := tests.NamespaceID
workflowExecution := commonpb.WorkflowExecution{
WorkflowId: "wId",
RunId: tests.RunID,
}
parentInfo := &workflowspb.ParentExecutionInfo{
NamespaceId: tests.ParentNamespaceID.String(),
Namespace: tests.ParentNamespace.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "parent wId",
RunId: "parent rId",
},
InitiatedId: 123,
}

identity := "testIdentity"
tl := "testTaskQueue"

msBuilder := s.createExecutionStartedStateWithParent(workflowExecution, tl, parentInfo, identity, false)
ms1 := workflow.TestCloneToProto(msBuilder)
currentExecutionResp := &persistence.GetCurrentExecutionResponse{
RunID: tests.RunID,
}
gwmsResponse1 := &persistence.GetWorkflowExecutionResponse{State: ms1}

s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any()).Return(currentExecutionResp, nil)
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse1, nil)

err := s.historyEngine.TerminateWorkflowExecution(metrics.AddMetricsContext(context.Background()), &historyservice.TerminateWorkflowExecutionRequest{
NamespaceId: namespaceID.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowExecution.WorkflowId,
},
Identity: "identity",
FirstExecutionRunId: workflowExecution.RunId,
},
ExternalWorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "unknown wId",
RunId: "unknown rId",
},
ChildWorkflowOnly: true,
})
s.Equal(consts.ErrWorkflowParent, err)
}

func (s *engine2Suite) createExecutionStartedState(
we commonpb.WorkflowExecution, tl string,
identity string,
startWorkflowTask bool,
) workflow.MutableState {
return s.createExecutionStartedStateWithParent(we, tl, nil, identity, startWorkflowTask)
}

func (s *engine2Suite) createExecutionStartedStateWithParent(
we commonpb.WorkflowExecution, tl string,
parentInfo *workflowspb.ParentExecutionInfo,
identity string,
startWorkflowTask bool,
) workflow.MutableState {
msBuilder := workflow.TestLocalMutableState(s.historyEngine.shard, s.mockEventsCache, tests.LocalNamespaceEntry,
s.logger, we.GetRunId())
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity)
addWorkflowExecutionStartedEventWithParent(msBuilder, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, parentInfo, identity)
di := addWorkflowTaskScheduledEvent(msBuilder)
if startWorkflowTask {
addWorkflowTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)
Expand Down
21 changes: 17 additions & 4 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,12 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
}
}

return t.processParentClosePolicy(namespaceID.String(), namespaceName.String(), children)
return t.processParentClosePolicy(
namespaceID.String(),
namespaceName.String(),
&execution,
children,
)
}

func (t *transferQueueActiveTaskExecutor) processCancelExecution(
Expand Down Expand Up @@ -1282,6 +1287,7 @@ func (t *transferQueueActiveTaskExecutor) resetWorkflow(
func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
namespaceID string,
namespace string,
parentExecution *commonpb.WorkflowExecution,
childInfos map[int64]*persistencespb.ChildExecutionInfo,
) error {

Expand Down Expand Up @@ -1319,15 +1325,17 @@ func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
}

request := parentclosepolicy.Request{
Namespace: namespace,
NamespaceID: namespaceID,
Executions: executions,
Namespace: namespace,
NamespaceID: namespaceID,
ParentExecution: *parentExecution,
Executions: executions,
}
return t.parentClosePolicyClient.SendParentClosePolicyRequest(request)
}

for _, childInfo := range childInfos {
if err := t.applyParentClosePolicy(
parentExecution,
childInfo,
); err != nil {
if _, ok := err.(*serviceerror.NotFound); !ok {
Expand All @@ -1341,6 +1349,7 @@ func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
}

func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
parentExecution *commonpb.WorkflowExecution,
childInfo *persistencespb.ChildExecutionInfo,
) error {

Expand Down Expand Up @@ -1370,6 +1379,8 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
Reason: "by parent close policy",
Identity: consts.IdentityHistoryService,
},
ExternalWorkflowExecution: parentExecution,
ChildWorkflowOnly: true,
})
return err

Expand All @@ -1391,6 +1402,8 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
FirstExecutionRunId: childInfo.GetStartedRunId(),
Identity: consts.IdentityHistoryService,
},
ExternalWorkflowExecution: parentExecution,
ChildWorkflowOnly: true,
})
return err

Expand Down
26 changes: 23 additions & 3 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"

"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand Down Expand Up @@ -910,8 +911,22 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig())
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil)
s.mockHistoryClient.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil)
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest, _ ...grpc.CallOption) (*historyservice.RequestCancelWorkflowExecutionResponse, error) {
s.True(request.GetChildWorkflowOnly())
s.Equal(execution.GetWorkflowId(), request.GetExternalWorkflowExecution().GetWorkflowId())
s.Equal(execution.GetRunId(), request.GetExternalWorkflowExecution().GetRunId())
return nil, nil
},
)
s.mockHistoryClient.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.TerminateWorkflowExecutionRequest, _ ...grpc.CallOption) (*historyservice.TerminateWorkflowExecutionResponse, error) {
s.True(request.GetChildWorkflowOnly())
s.Equal(execution.GetWorkflowId(), request.GetExternalWorkflowExecution().GetWorkflowId())
s.Equal(execution.GetRunId(), request.GetExternalWorkflowExecution().GetRunId())
return nil, nil
},
)

s.mockNamespaceCache.EXPECT().GetNamespaceID(namespace.Name("child namespace2")).Return(namespace.ID("child namespace2 id"), nil)
s.mockNamespaceCache.EXPECT().GetNamespaceID(namespace.Name("child namespace3")).Return(namespace.ID("child namespace3 id"), nil)
Expand Down Expand Up @@ -1005,7 +1020,12 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig())
s.mockParentClosePolicyClient.EXPECT().SendParentClosePolicyRequest(gomock.Any()).Return(nil)
s.mockParentClosePolicyClient.EXPECT().SendParentClosePolicyRequest(gomock.Any()).DoAndReturn(
func(request parentclosepolicy.Request) error {
s.Equal(execution, request.ParentExecution)
return nil
},
)

s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(tests.NamespaceID, nil).Times(10)

Expand Down
15 changes: 13 additions & 2 deletions service/worker/parentclosepolicy/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ type (
// Deprecated: use Namespace in RequestDetail instead. Should be removed in 1.17
Namespace string
// Deprecated: use NamespaceID in RequestDetail instead. Should be removed in 1.17
NamespaceID string
Executions []RequestDetail
NamespaceID string
ParentExecution commonpb.WorkflowExecution
Executions []RequestDetail
}
)

Expand Down Expand Up @@ -106,6 +107,12 @@ func ProcessorWorkflow(ctx workflow.Context) error {
func ProcessorActivity(ctx context.Context, request Request) error {
processor := ctx.Value(processorContextKey).(*Processor)
client := processor.clientBean.GetHistoryClient()
// this is for backward compatibility
// ideally we should always have childWorkflowOnly = true
// however if ParentExecution is not specified, setting it to false
// will cause terminate or cancel request to return mismatch error
childWorkflowOnly := request.ParentExecution.GetWorkflowId() != "" &&
request.ParentExecution.GetRunId() != ""
for _, execution := range request.Executions {
namespaceId := execution.NamespaceID
if len(execution.NamespaceID) == 0 {
Expand Down Expand Up @@ -134,6 +141,8 @@ func ProcessorActivity(ctx context.Context, request Request) error {
Identity: processorWFTypeName,
FirstExecutionRunId: execution.RunID,
},
ExternalWorkflowExecution: &request.ParentExecution,
ChildWorkflowOnly: childWorkflowOnly,
})
case enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL:
_, err = client.RequestCancelWorkflowExecution(ctx, &historyservice.RequestCancelWorkflowExecutionRequest{
Expand All @@ -146,6 +155,8 @@ func ProcessorActivity(ctx context.Context, request Request) error {
Identity: processorWFTypeName,
FirstExecutionRunId: execution.RunID,
},
ExternalWorkflowExecution: &request.ParentExecution,
ChildWorkflowOnly: childWorkflowOnly,
})
}

Expand Down

0 comments on commit 07cda1b

Please sign in to comment.