Skip to content

Commit

Permalink
Notify new tasks when workflow is potentially updated (#2334)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Dec 30, 2021
1 parent 633e456 commit 05f584c
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 30 deletions.
87 changes: 57 additions & 30 deletions service/history/workflow/transaction_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,29 @@ func (t *TransactionImpl) CreateWorkflowExecution(
newWorkflowEventsSeq []*persistence.WorkflowEvents,
) (int64, error) {

engine, err := t.shard.GetEngine()
if err != nil {
return 0, err
}
nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(newWorkflowSnapshot.ExecutionInfo.NamespaceId))
if err != nil {
return 0, err
}

resp, err := createWorkflowExecutionWithRetry(t.shard, &persistence.CreateWorkflowExecutionRequest{
ShardID: t.shard.GetShardID(),
// RangeID , this is set by shard context
Mode: createMode,
NewWorkflowSnapshot: *newWorkflowSnapshot,
NewWorkflowEvents: newWorkflowEventsSeq,
})
if err != nil {
return 0, err
if operationPossiblySucceeded(err) {
NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace())
}

engine, err := t.shard.GetEngine()
if err != nil {
return 0, err
}
nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(newWorkflowSnapshot.ExecutionInfo.NamespaceId))
if err != nil {
return 0, err
}
NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace())

if err := NotifyNewHistorySnapshotEvent(engine, newWorkflowSnapshot); err != nil {
t.logger.Error("unable to notify workflow creation", tag.Error(err))
}
Expand All @@ -109,6 +112,15 @@ func (t *TransactionImpl) ConflictResolveWorkflowExecution(
currentWorkflowEventsSeq []*persistence.WorkflowEvents,
) (int64, int64, int64, error) {

engine, err := t.shard.GetEngine()
if err != nil {
return 0, 0, 0, err
}
nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(resetWorkflowSnapshot.ExecutionInfo.NamespaceId))
if err != nil {
return 0, 0, 0, err
}

resp, err := conflictResolveWorkflowExecutionWithRetry(t.shard, &persistence.ConflictResolveWorkflowExecutionRequest{
ShardID: t.shard.GetShardID(),
// RangeID , this is set by shard context
Expand All @@ -120,21 +132,15 @@ func (t *TransactionImpl) ConflictResolveWorkflowExecution(
CurrentWorkflowMutation: currentWorkflowMutation,
CurrentWorkflowEvents: currentWorkflowEventsSeq,
})
if err != nil {
return 0, 0, 0, err
}

engine, err := t.shard.GetEngine()
if err != nil {
return 0, 0, 0, err
if operationPossiblySucceeded(err) {
NotifyWorkflowSnapshotTasks(engine, resetWorkflowSnapshot, nsEntry.IsGlobalNamespace())
NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace())
NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, nsEntry.IsGlobalNamespace())
}
nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(resetWorkflowSnapshot.ExecutionInfo.NamespaceId))
if err != nil {
return 0, 0, 0, err
}
NotifyWorkflowSnapshotTasks(engine, resetWorkflowSnapshot, nsEntry.IsGlobalNamespace())
NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace())
NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, nsEntry.IsGlobalNamespace())

if err := NotifyNewHistorySnapshotEvent(engine, resetWorkflowSnapshot); err != nil {
t.logger.Error("unable to notify workflow reset", tag.Error(err))
}
Expand Down Expand Up @@ -164,6 +170,15 @@ func (t *TransactionImpl) UpdateWorkflowExecution(
newWorkflowEventsSeq []*persistence.WorkflowEvents,
) (int64, int64, error) {

engine, err := t.shard.GetEngine()
if err != nil {
return 0, 0, err
}
nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(currentWorkflowMutation.ExecutionInfo.NamespaceId))
if err != nil {
return 0, 0, err
}

resp, err := updateWorkflowExecutionWithRetry(t.shard, &persistence.UpdateWorkflowExecutionRequest{
ShardID: t.shard.GetShardID(),
// RangeID , this is set by shard context
Expand All @@ -173,20 +188,14 @@ func (t *TransactionImpl) UpdateWorkflowExecution(
NewWorkflowSnapshot: newWorkflowSnapshot,
NewWorkflowEvents: newWorkflowEventsSeq,
})
if err != nil {
return 0, 0, err
if operationPossiblySucceeded(err) {
NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, nsEntry.IsGlobalNamespace())
NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace())
}

engine, err := t.shard.GetEngine()
if err != nil {
return 0, 0, err
}
nsEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(currentWorkflowMutation.ExecutionInfo.NamespaceId))
if err != nil {
return 0, 0, err
}
NotifyWorkflowMutationTasks(engine, currentWorkflowMutation, nsEntry.IsGlobalNamespace())
NotifyWorkflowSnapshotTasks(engine, newWorkflowSnapshot, nsEntry.IsGlobalNamespace())

if err := NotifyNewHistoryMutationEvent(engine, currentWorkflowMutation); err != nil {
t.logger.Error("unable to notify workflow mutation", tag.Error(err))
}
Expand Down Expand Up @@ -726,3 +735,21 @@ func emitCompletionMetrics(
)
}
}

func operationPossiblySucceeded(err error) bool {
switch err.(type) {
case *persistence.CurrentWorkflowConditionFailedError,
*persistence.WorkflowConditionFailedError,
*persistence.ConditionFailedError,
*persistence.ShardOwnershipLostError,
*persistence.InvalidPersistenceRequestError,
*persistence.TransactionSizeLimitError,
*serviceerror.ResourceExhausted,
*serviceerror.NotFound:
// Persistence failure that means the write was definitely not committed:
return false
default:
return true
}

}
194 changes: 194 additions & 0 deletions service/history/workflow/transaction_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package workflow

import (
"errors"

"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
)

type (
transactionSuite struct {
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.MockContext
mockEngine *shard.MockEngine
mockNamespaceCache *namespace.MockRegistry

logger log.Logger

transaction *TransactionImpl
}
)

func TestTransactionSuite(t *testing.T) {
s := new(transactionSuite)
suite.Run(t, s)
}

func (s *transactionSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())
s.mockShard = shard.NewMockContext(s.controller)
s.mockEngine = shard.NewMockEngine(s.controller)
s.mockNamespaceCache = namespace.NewMockRegistry(s.controller)
s.logger = log.NewTestLogger()

s.mockShard.EXPECT().GetShardID().Return(int32(1)).AnyTimes()
s.mockShard.EXPECT().GetEngine().Return(s.mockEngine, nil).AnyTimes()
s.mockShard.EXPECT().GetNamespaceRegistry().Return(s.mockNamespaceCache).AnyTimes()
s.mockShard.EXPECT().GetLogger().Return(s.logger).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()

s.transaction = NewTransaction(s.mockShard)
}

func (s *transactionSuite) TearDownTest() {
s.controller.Finish()
}

func (s *transactionSuite) TestOperationMayApplied() {
testCases := []struct {
err error
mayApplied bool
}{
{err: &persistence.CurrentWorkflowConditionFailedError{}, mayApplied: false},
{err: &persistence.WorkflowConditionFailedError{}, mayApplied: false},
{err: &persistence.ConditionFailedError{}, mayApplied: false},
{err: &persistence.ShardOwnershipLostError{}, mayApplied: false},
{err: &persistence.InvalidPersistenceRequestError{}, mayApplied: false},
{err: &persistence.TransactionSizeLimitError{}, mayApplied: false},
{err: &serviceerror.ResourceExhausted{}, mayApplied: false},
{err: &serviceerror.NotFound{}, mayApplied: false},
{err: nil, mayApplied: true},
{err: &persistence.TimeoutError{}, mayApplied: true},
{err: &serviceerror.Unavailable{}, mayApplied: true},
{err: errors.New("some unknown error"), mayApplied: true},
}

for _, tc := range testCases {
s.Equal(tc.mayApplied, operationPossiblySucceeded(tc.err))
}
}

func (s *transactionSuite) TestCreateWorkflowExecution_NotifyTaskWhenFailed() {
timeoutErr := &persistence.TimeoutError{}
s.True(operationPossiblySucceeded(timeoutErr))

s.mockShard.EXPECT().CreateWorkflowExecution(gomock.Any()).Return(nil, timeoutErr)
s.setupMockForTaskNotification()

_, err := s.transaction.CreateWorkflowExecution(
persistence.CreateWorkflowModeBrandNew,
&persistence.WorkflowSnapshot{
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
NamespaceId: tests.NamespaceID.String(),
WorkflowId: tests.WorkflowID,
},
ExecutionState: &persistencespb.WorkflowExecutionState{
RunId: tests.RunID,
},
},
[]*persistence.WorkflowEvents{},
)
s.Equal(timeoutErr, err)
}

func (s *transactionSuite) TestUpdateWorkflowExecution_NotifyTaskWhenFailed() {
timeoutErr := &persistence.TimeoutError{}
s.True(operationPossiblySucceeded(timeoutErr))

s.mockShard.EXPECT().UpdateWorkflowExecution(gomock.Any()).Return(nil, timeoutErr)
s.setupMockForTaskNotification() // for current workflow mutation
s.setupMockForTaskNotification() // for new workflow snapshot

_, _, err := s.transaction.UpdateWorkflowExecution(
persistence.UpdateWorkflowModeUpdateCurrent,
&persistence.WorkflowMutation{
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
NamespaceId: tests.NamespaceID.String(),
WorkflowId: tests.WorkflowID,
},
ExecutionState: &persistencespb.WorkflowExecutionState{
RunId: tests.RunID,
},
},
[]*persistence.WorkflowEvents{},
&persistence.WorkflowSnapshot{},
[]*persistence.WorkflowEvents{},
)
s.Equal(timeoutErr, err)
}

func (s *transactionSuite) TestConflictResolveWorkflowExecution_NotifyTaskWhenFailed() {
timeoutErr := &persistence.TimeoutError{}
s.True(operationPossiblySucceeded(timeoutErr))

s.mockShard.EXPECT().ConflictResolveWorkflowExecution(gomock.Any()).Return(nil, timeoutErr)
s.setupMockForTaskNotification() // for reset workflow snapshot
s.setupMockForTaskNotification() // for new workflow snapshot
s.setupMockForTaskNotification() // for current workflow mutation

_, _, _, err := s.transaction.ConflictResolveWorkflowExecution(
persistence.ConflictResolveWorkflowModeUpdateCurrent,
&persistence.WorkflowSnapshot{
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
NamespaceId: tests.NamespaceID.String(),
WorkflowId: tests.WorkflowID,
},
ExecutionState: &persistencespb.WorkflowExecutionState{
RunId: tests.RunID,
},
},
[]*persistence.WorkflowEvents{},
&persistence.WorkflowSnapshot{},
[]*persistence.WorkflowEvents{},
&persistence.WorkflowMutation{},
[]*persistence.WorkflowEvents{},
)
s.Equal(timeoutErr, err)
}

func (s *transactionSuite) setupMockForTaskNotification() {
s.mockEngine.EXPECT().NotifyNewTransferTasks(gomock.Any(), gomock.Any()).Times(1)
s.mockEngine.EXPECT().NotifyNewTimerTasks(gomock.Any(), gomock.Any()).Times(1)
s.mockEngine.EXPECT().NotifyNewVisibilityTasks(gomock.Any()).Times(1)
s.mockEngine.EXPECT().NotifyNewReplicationTasks(gomock.Any()).Times(1)
}

0 comments on commit 05f584c

Please sign in to comment.