Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply parentClosePolicy to child workflow only #2390

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
715 changes: 414 additions & 301 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ message RemoveSignalMutableStateResponse {
message TerminateWorkflowExecutionRequest {
string namespace_id = 1;
temporal.api.workflowservice.v1.TerminateWorkflowExecutionRequest terminate_request = 2;
temporal.api.common.v1.WorkflowExecution external_workflow_execution = 3;
bool child_workflow_only = 4;
}

message TerminateWorkflowExecutionResponse {
Expand Down
9 changes: 9 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2200,6 +2200,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
namespaceID := namespaceEntry.ID()

request := terminateRequest.TerminateRequest
parentExecution := terminateRequest.ExternalWorkflowExecution
childWorkflowOnly := terminateRequest.ChildWorkflowOnly
execution := commonpb.WorkflowExecution{
WorkflowId: request.WorkflowExecution.WorkflowId,
}
Expand Down Expand Up @@ -2228,6 +2230,13 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
return nil, consts.ErrWorkflowExecutionNotFound
}

if childWorkflowOnly {
if parentExecution.GetWorkflowId() != executionInfo.ParentWorkflowId ||
parentExecution.GetRunId() != executionInfo.ParentRunId {
return nil, consts.ErrWorkflowParent
}
}

eventBatchFirstEventID := mutableState.GetNextEventID()

return updateWorkflowWithoutWorkflowTask, workflow.TerminateWorkflow(
Expand Down
102 changes: 101 additions & 1 deletion service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
serviceerrors "go.temporal.io/server/common/serviceerror"
Expand Down Expand Up @@ -784,14 +785,113 @@ func (s *engine2Suite) TestRequestCancelWorkflowExecution_NotFound() {
s.IsType(&serviceerror.NotFound{}, err)
}

func (s *engine2Suite) TestRequestCancelWorkflowExecution_ParentMismatch() {
namespaceID := tests.NamespaceID
workflowExecution := commonpb.WorkflowExecution{
WorkflowId: "wId",
RunId: tests.RunID,
}
parentInfo := &workflowspb.ParentExecutionInfo{
NamespaceId: tests.ParentNamespaceID.String(),
Namespace: tests.ParentNamespace.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "parent wId",
RunId: "parent rId",
},
InitiatedId: 123,
}

identity := "testIdentity"
tl := "testTaskQueue"

msBuilder := s.createExecutionStartedStateWithParent(workflowExecution, tl, parentInfo, identity, false)
ms1 := workflow.TestCloneToProto(msBuilder)
gwmsResponse1 := &persistence.GetWorkflowExecutionResponse{State: ms1}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse1, nil)

err := s.historyEngine.RequestCancelWorkflowExecution(metrics.AddMetricsContext(context.Background()), &historyservice.RequestCancelWorkflowExecutionRequest{
NamespaceId: namespaceID.String(),
CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowExecution.WorkflowId,
RunId: workflowExecution.RunId,
},
Identity: "identity",
},
ExternalWorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "unknown wId",
RunId: "unknown rId",
},
ChildWorkflowOnly: true,
})
s.Equal(consts.ErrWorkflowParent, err)
}

func (s *engine2Suite) TestTerminateWorkflowExecution_ParentMismatch() {
namespaceID := tests.NamespaceID
workflowExecution := commonpb.WorkflowExecution{
WorkflowId: "wId",
RunId: tests.RunID,
}
parentInfo := &workflowspb.ParentExecutionInfo{
NamespaceId: tests.ParentNamespaceID.String(),
Namespace: tests.ParentNamespace.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "parent wId",
RunId: "parent rId",
},
InitiatedId: 123,
}

identity := "testIdentity"
tl := "testTaskQueue"

msBuilder := s.createExecutionStartedStateWithParent(workflowExecution, tl, parentInfo, identity, false)
ms1 := workflow.TestCloneToProto(msBuilder)
currentExecutionResp := &persistence.GetCurrentExecutionResponse{
RunID: tests.RunID,
}
gwmsResponse1 := &persistence.GetWorkflowExecutionResponse{State: ms1}

s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any()).Return(currentExecutionResp, nil)
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse1, nil)

err := s.historyEngine.TerminateWorkflowExecution(metrics.AddMetricsContext(context.Background()), &historyservice.TerminateWorkflowExecutionRequest{
NamespaceId: namespaceID.String(),
TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: workflowExecution.WorkflowId,
},
Identity: "identity",
FirstExecutionRunId: workflowExecution.RunId,
},
ExternalWorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "unknown wId",
RunId: "unknown rId",
},
ChildWorkflowOnly: true,
})
s.Equal(consts.ErrWorkflowParent, err)
}

func (s *engine2Suite) createExecutionStartedState(
we commonpb.WorkflowExecution, tl string,
identity string,
startWorkflowTask bool,
) workflow.MutableState {
return s.createExecutionStartedStateWithParent(we, tl, nil, identity, startWorkflowTask)
}

func (s *engine2Suite) createExecutionStartedStateWithParent(
we commonpb.WorkflowExecution, tl string,
parentInfo *workflowspb.ParentExecutionInfo,
identity string,
startWorkflowTask bool,
) workflow.MutableState {
msBuilder := workflow.TestLocalMutableState(s.historyEngine.shard, s.mockEventsCache, tests.LocalNamespaceEntry,
s.logger, we.GetRunId())
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity)
addWorkflowExecutionStartedEventWithParent(msBuilder, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, parentInfo, identity)
di := addWorkflowTaskScheduledEvent(msBuilder)
if startWorkflowTask {
addWorkflowTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)
Expand Down
21 changes: 17 additions & 4 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,12 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution(
}
}

return t.processParentClosePolicy(namespaceID.String(), namespaceName.String(), children)
return t.processParentClosePolicy(
namespaceID.String(),
namespaceName.String(),
&execution,
children,
)
}

func (t *transferQueueActiveTaskExecutor) processCancelExecution(
Expand Down Expand Up @@ -1282,6 +1287,7 @@ func (t *transferQueueActiveTaskExecutor) resetWorkflow(
func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
namespaceID string,
namespace string,
parentExecution *commonpb.WorkflowExecution,
childInfos map[int64]*persistencespb.ChildExecutionInfo,
) error {

Expand Down Expand Up @@ -1319,15 +1325,17 @@ func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
}

request := parentclosepolicy.Request{
Namespace: namespace,
NamespaceID: namespaceID,
Executions: executions,
Namespace: namespace,
NamespaceID: namespaceID,
ParentExecution: *parentExecution,
Executions: executions,
}
return t.parentClosePolicyClient.SendParentClosePolicyRequest(request)
}

for _, childInfo := range childInfos {
if err := t.applyParentClosePolicy(
parentExecution,
childInfo,
); err != nil {
if _, ok := err.(*serviceerror.NotFound); !ok {
Expand All @@ -1341,6 +1349,7 @@ func (t *transferQueueActiveTaskExecutor) processParentClosePolicy(
}

func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
parentExecution *commonpb.WorkflowExecution,
childInfo *persistencespb.ChildExecutionInfo,
) error {

Expand Down Expand Up @@ -1370,6 +1379,8 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
Reason: "by parent close policy",
Identity: consts.IdentityHistoryService,
},
ExternalWorkflowExecution: parentExecution,
ChildWorkflowOnly: true,
})
return err

Expand All @@ -1391,6 +1402,8 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy(
FirstExecutionRunId: childInfo.GetStartedRunId(),
Identity: consts.IdentityHistoryService,
},
ExternalWorkflowExecution: parentExecution,
ChildWorkflowOnly: true,
})
return err

Expand Down
26 changes: 23 additions & 3 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"

"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand Down Expand Up @@ -910,8 +911,22 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig())
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil)
s.mockHistoryClient.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil)
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest, _ ...grpc.CallOption) (*historyservice.RequestCancelWorkflowExecutionResponse, error) {
s.True(request.GetChildWorkflowOnly())
s.Equal(execution.GetWorkflowId(), request.GetExternalWorkflowExecution().GetWorkflowId())
s.Equal(execution.GetRunId(), request.GetExternalWorkflowExecution().GetRunId())
return nil, nil
},
)
s.mockHistoryClient.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *historyservice.TerminateWorkflowExecutionRequest, _ ...grpc.CallOption) (*historyservice.TerminateWorkflowExecutionResponse, error) {
s.True(request.GetChildWorkflowOnly())
s.Equal(execution.GetWorkflowId(), request.GetExternalWorkflowExecution().GetWorkflowId())
s.Equal(execution.GetRunId(), request.GetExternalWorkflowExecution().GetRunId())
return nil, nil
},
)

s.mockNamespaceCache.EXPECT().GetNamespaceID(namespace.Name("child namespace2")).Return(namespace.ID("child namespace2 id"), nil)
s.mockNamespaceCache.EXPECT().GetNamespaceID(namespace.Name("child namespace3")).Return(namespace.ID("child namespace3 id"), nil)
Expand Down Expand Up @@ -1005,7 +1020,12 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen
persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion())
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewDisabledArchvialConfig())
s.mockParentClosePolicyClient.EXPECT().SendParentClosePolicyRequest(gomock.Any()).Return(nil)
s.mockParentClosePolicyClient.EXPECT().SendParentClosePolicyRequest(gomock.Any()).DoAndReturn(
func(request parentclosepolicy.Request) error {
s.Equal(execution, request.ParentExecution)
return nil
},
)

s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(tests.NamespaceID, nil).Times(10)

Expand Down
15 changes: 13 additions & 2 deletions service/worker/parentclosepolicy/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ type (
// Deprecated: use Namespace in RequestDetail instead. Should be removed in 1.17
Namespace string
// Deprecated: use NamespaceID in RequestDetail instead. Should be removed in 1.17
NamespaceID string
Executions []RequestDetail
NamespaceID string
ParentExecution commonpb.WorkflowExecution
Executions []RequestDetail
}
)

Expand Down Expand Up @@ -106,6 +107,12 @@ func ProcessorWorkflow(ctx workflow.Context) error {
func ProcessorActivity(ctx context.Context, request Request) error {
processor := ctx.Value(processorContextKey).(*Processor)
client := processor.clientBean.GetHistoryClient()
// this is for backward compatibility
// ideally we should always have childWorkflowOnly = true
// however if ParentExecution is not specified, setting it to false
// will cause terminate or cancel request to return mismatch error
childWorkflowOnly := request.ParentExecution.GetWorkflowId() != "" &&
request.ParentExecution.GetRunId() != ""
for _, execution := range request.Executions {
namespaceId := execution.NamespaceID
if len(execution.NamespaceID) == 0 {
Expand Down Expand Up @@ -134,6 +141,8 @@ func ProcessorActivity(ctx context.Context, request Request) error {
Identity: processorWFTypeName,
FirstExecutionRunId: execution.RunID,
},
ExternalWorkflowExecution: &request.ParentExecution,
ChildWorkflowOnly: childWorkflowOnly,
})
case enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL:
_, err = client.RequestCancelWorkflowExecution(ctx, &historyservice.RequestCancelWorkflowExecutionRequest{
Expand All @@ -146,6 +155,8 @@ func ProcessorActivity(ctx context.Context, request Request) error {
Identity: processorWFTypeName,
FirstExecutionRunId: execution.RunID,
},
ExternalWorkflowExecution: &request.ParentExecution,
ChildWorkflowOnly: childWorkflowOnly,
})
}

Expand Down