Skip to content

Commit

Permalink
Trigger history node validation & trimming logic within write path (#…
Browse files Browse the repository at this point in the history
…2379)

* Trigger history node validation & trimming logic for the following APIs (write path)
  * CreateWorkflowExecution: no auto fix since this API is creating a brand new workflow
  * ConflictResolveWorkflowExecution: auto fix for reset workflow & current workflow
  * UpdateWorkflowExecution: auto fix for update workflow
  • Loading branch information
wxing1292 authored Jan 14, 2022
1 parent 773e18a commit 71093d7
Showing 1 changed file with 159 additions and 60 deletions.
219 changes: 159 additions & 60 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"

historyspb "go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/versionhistory"
)
Expand Down Expand Up @@ -124,42 +126,43 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
request *UpdateWorkflowExecutionRequest,
) (*UpdateWorkflowExecutionResponse, error) {

updateMutation := request.UpdateWorkflowMutation
newSnapshot := request.NewWorkflowSnapshot

updateWorkflowNewEvents, updateWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.UpdateWorkflowEvents)
if err != nil {
return nil, err
}
request.UpdateWorkflowMutation.ExecutionInfo.ExecutionStats.HistorySize += int64(updateWorkflowHistoryDiff.SizeDiff)
updateMutation.ExecutionInfo.ExecutionStats.HistorySize += int64(updateWorkflowHistoryDiff.SizeDiff)

newWorkflowNewEvents, newWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.NewWorkflowEvents)
if err != nil {
return nil, err
}
if request.NewWorkflowSnapshot != nil {
request.NewWorkflowSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newWorkflowHistoryDiff.SizeDiff)
if newSnapshot != nil {
newSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newWorkflowHistoryDiff.SizeDiff)
}

mutation := request.UpdateWorkflowMutation
newSnapshot := request.NewWorkflowSnapshot
if err := ValidateUpdateWorkflowModeState(
request.Mode,
mutation,
updateMutation,
newSnapshot,
); err != nil {
return nil, err
}
if err := ValidateUpdateWorkflowStateStatus(
mutation.ExecutionState.State,
mutation.ExecutionState.Status,
updateMutation.ExecutionState.State,
updateMutation.ExecutionState.Status,
); err != nil {
return nil, err
}

serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&mutation)
serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&updateMutation)
if err != nil {
return nil, err
}
var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
if request.NewWorkflowSnapshot != nil {
if newSnapshot != nil {
serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(newSnapshot)
if err != nil {
return nil, err
Expand All @@ -178,70 +181,87 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
NewWorkflowNewEvents: newWorkflowNewEvents,
}

if err := m.persistence.UpdateWorkflowExecution(newRequest); err != nil {
err = m.persistence.UpdateWorkflowExecution(newRequest)
switch err.(type) {
case nil:
return &UpdateWorkflowExecutionResponse{
UpdateMutableStateStats: *statusOfInternalWorkflowMutation(
&newRequest.UpdateWorkflowMutation,
updateWorkflowHistoryDiff,
),
NewMutableStateStats: statusOfInternalWorkflowSnapshot(
newRequest.NewWorkflowSnapshot,
newWorkflowHistoryDiff,
),
}, nil
case *CurrentWorkflowConditionFailedError,
*WorkflowConditionFailedError,
*ConditionFailedError:
m.trimHistoryNode(
request.ShardID,
updateMutation.ExecutionInfo.NamespaceId,
updateMutation.ExecutionInfo.WorkflowId,
updateMutation.ExecutionState.RunId,
)
return nil, err
default:
return nil, err
}
return &UpdateWorkflowExecutionResponse{
UpdateMutableStateStats: *statusOfInternalWorkflowMutation(
&newRequest.UpdateWorkflowMutation,
updateWorkflowHistoryDiff,
),
NewMutableStateStats: statusOfInternalWorkflowSnapshot(
newRequest.NewWorkflowSnapshot,
newWorkflowHistoryDiff,
),
}, nil
}

func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
request *ConflictResolveWorkflowExecutionRequest,
) (*ConflictResolveWorkflowExecutionResponse, error) {

resetSnapshot := request.ResetWorkflowSnapshot
newSnapshot := request.NewWorkflowSnapshot
currentMutation := request.CurrentWorkflowMutation

resetWorkflowEventsNewEvents, resetWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.ResetWorkflowEvents)
if err != nil {
return nil, err
}
request.ResetWorkflowSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(resetWorkflowHistoryDiff.SizeDiff)
resetSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(resetWorkflowHistoryDiff.SizeDiff)

newWorkflowEventsNewEvents, newWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.NewWorkflowEvents)
if err != nil {
return nil, err
}
if request.NewWorkflowSnapshot != nil {
request.NewWorkflowSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newWorkflowHistoryDiff.SizeDiff)
if newSnapshot != nil {
newSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newWorkflowHistoryDiff.SizeDiff)
}

currentWorkflowEventsNewEvents, currentWorkflowHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.CurrentWorkflowEvents)
if err != nil {
return nil, err
}
if request.CurrentWorkflowMutation != nil {
request.CurrentWorkflowMutation.ExecutionInfo.ExecutionStats.HistorySize += int64(currentWorkflowHistoryDiff.SizeDiff)
if currentMutation != nil {
currentMutation.ExecutionInfo.ExecutionStats.HistorySize += int64(currentWorkflowHistoryDiff.SizeDiff)
}

if err := ValidateConflictResolveWorkflowModeState(
request.Mode,
request.ResetWorkflowSnapshot,
request.NewWorkflowSnapshot,
request.CurrentWorkflowMutation,
resetSnapshot,
newSnapshot,
currentMutation,
); err != nil {
return nil, err
}

serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot)
serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&resetSnapshot)
if err != nil {
return nil, err
}
var serializedCurrentWorkflowMutation *InternalWorkflowMutation
if request.CurrentWorkflowMutation != nil {
serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation)
if currentMutation != nil {
serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(currentMutation)
if err != nil {
return nil, err
}
}
var serializedNewWorkflowMutation *InternalWorkflowSnapshot
if request.NewWorkflowSnapshot != nil {
serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot)
if newSnapshot != nil {
serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(newSnapshot)
if err != nil {
return nil, err
}
Expand All @@ -264,50 +284,70 @@ func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
}

err = m.persistence.ConflictResolveWorkflowExecution(newRequest)
if err != nil {
switch err.(type) {
case nil:
return &ConflictResolveWorkflowExecutionResponse{
ResetMutableStateStats: *statusOfInternalWorkflowSnapshot(
&newRequest.ResetWorkflowSnapshot,
resetWorkflowHistoryDiff,
),
NewMutableStateStats: statusOfInternalWorkflowSnapshot(
newRequest.NewWorkflowSnapshot,
newWorkflowHistoryDiff,
),
CurrentMutableStateStats: statusOfInternalWorkflowMutation(
newRequest.CurrentWorkflowMutation,
currentWorkflowHistoryDiff,
),
}, nil
case *CurrentWorkflowConditionFailedError,
*WorkflowConditionFailedError,
*ConditionFailedError:
m.trimHistoryNode(
request.ShardID,
resetSnapshot.ExecutionInfo.NamespaceId,
resetSnapshot.ExecutionInfo.WorkflowId,
resetSnapshot.ExecutionState.RunId,
)
if currentMutation != nil {
m.trimHistoryNode(
request.ShardID,
currentMutation.ExecutionInfo.NamespaceId,
currentMutation.ExecutionInfo.WorkflowId,
currentMutation.ExecutionState.RunId,
)
}
return nil, err
default:
return nil, err
}
return &ConflictResolveWorkflowExecutionResponse{
ResetMutableStateStats: *statusOfInternalWorkflowSnapshot(
&newRequest.ResetWorkflowSnapshot,
resetWorkflowHistoryDiff,
),
NewMutableStateStats: statusOfInternalWorkflowSnapshot(
newRequest.NewWorkflowSnapshot,
newWorkflowHistoryDiff,
),
CurrentMutableStateStats: statusOfInternalWorkflowMutation(
newRequest.CurrentWorkflowMutation,
currentWorkflowHistoryDiff,
),
}, nil
}

func (m *executionManagerImpl) CreateWorkflowExecution(
request *CreateWorkflowExecutionRequest,
) (*CreateWorkflowExecutionResponse, error) {

newSnapshot := request.NewWorkflowSnapshot
newWorkflowNewEvents, newHistoryDiff, err := m.serializeWorkflowEventBatches(request.ShardID, request.NewWorkflowEvents)
if err != nil {
return nil, err
}
request.NewWorkflowSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newHistoryDiff.SizeDiff)
newSnapshot.ExecutionInfo.ExecutionStats.HistorySize += int64(newHistoryDiff.SizeDiff)

snapshot := request.NewWorkflowSnapshot
if err := ValidateCreateWorkflowModeState(
request.Mode,
snapshot,
newSnapshot,
); err != nil {
return nil, err
}
if err := ValidateCreateWorkflowStateStatus(
snapshot.ExecutionState.State,
snapshot.ExecutionState.Status,
newSnapshot.ExecutionState.State,
newSnapshot.ExecutionState.Status,
); err != nil {
return nil, err
}

serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&snapshot)
serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&newSnapshot)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -487,7 +527,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
return nil, err
}
}
result.LastWriteVersion, err = getLastWriteVersion(input.ExecutionInfo.VersionHistories)
result.LastWriteVersion, err = getCurrentBranchLastWriteVersion(input.ExecutionInfo.VersionHistories)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -554,7 +594,7 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
if err != nil {
return nil, err
}
result.LastWriteVersion, err = getLastWriteVersion(input.ExecutionInfo.VersionHistories)
result.LastWriteVersion, err = getCurrentBranchLastWriteVersion(input.ExecutionInfo.VersionHistories)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -937,6 +977,52 @@ func (m *executionManagerImpl) Close() {
m.persistence.Close()
}

func (m *executionManagerImpl) trimHistoryNode(
shardID int32,
namespaceID string,
workflowID string,
runID string,
) {
response, err := m.GetWorkflowExecution(&GetWorkflowExecutionRequest{
ShardID: shardID,
NamespaceID: namespaceID,
WorkflowID: workflowID,
RunID: runID,
})
if err != nil {
m.logger.Error("ExecutionManager unable to get mutable state for trimming history branch",
tag.WorkflowNamespaceID(namespaceID),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.Error(err),
)
return // best effort trim
}

executionInfo := response.State.ExecutionInfo
branchToken, err := getCurrentBranchToken(executionInfo.VersionHistories)
if err != nil {
return
}
mutableStateLastNodeID := executionInfo.LastFirstEventId
mutableStateLastNodeTransactionID := executionInfo.LastFirstEventTxnId
if _, err := m.TrimHistoryBranch(&TrimHistoryBranchRequest{
ShardID: shardID,
BranchToken: branchToken,
NodeID: mutableStateLastNodeID,
TransactionID: mutableStateLastNodeTransactionID,
}); err != nil {
// best effort trim
m.logger.Error("ExecutionManager unable to trim history branch",
tag.WorkflowNamespaceID(namespaceID),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.Error(err),
)
return
}
}

func (m *executionManagerImpl) toWorkflowMutableState(internState *InternalWorkflowMutableState) (*persistencespb.WorkflowMutableState, error) {
state := &persistencespb.WorkflowMutableState{
ActivityInfos: make(map[int64]*persistencespb.ActivityInfo),
Expand Down Expand Up @@ -1010,14 +1096,27 @@ func (m *executionManagerImpl) toWorkflowMutableState(internState *InternalWorkf
return state, nil
}

func getLastWriteVersion(
func getCurrentBranchToken(
versionHistories *historyspb.VersionHistories,
) (int64, error) {
) ([]byte, error) {
// TODO remove this if check once legacy execution tests are removed
if versionHistories == nil {
return nil, serviceerror.NewInternal("version history is empty")
}
versionHistory, err := versionhistory.GetCurrentVersionHistory(versionHistories)
if err != nil {
return nil, err
}
return versionHistory.BranchToken, nil
}

func getCurrentBranchLastWriteVersion(
versionHistories *historyspb.VersionHistories,
) (int64, error) {
// TODO remove this if check once legacy execution tests are removed
if versionHistories == nil {
return common.EmptyVersion, nil
}

versionHistory, err := versionhistory.GetCurrentVersionHistory(versionHistories)
if err != nil {
return 0, err
Expand Down

0 comments on commit 71093d7

Please sign in to comment.