Skip to content

Commit

Permalink
Unload shard if loaded task ID is greater than max transfer ID (#2361)
Browse files Browse the repository at this point in the history
* Unload shard if loaded task ID is greater than max transfer ID

* Update max read level for timer task ID as well
  • Loading branch information
yiminc authored Jan 11, 2022
1 parent b33d241 commit aa62866
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 1 deletion.
2 changes: 2 additions & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,7 @@ type (
GetSearchAttributesProvider() searchattribute.Provider
GetSearchAttributesMapper() searchattribute.Mapper
GetArchivalMetadata() archiver.ArchivalMetadata

Unload()
}
)
9 changes: 8 additions & 1 deletion service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,8 @@ func (s *ContextImpl) allocateTaskIDsLocked(
return s.allocateTimerIDsLocked(
namespaceEntry,
workflowID,
timerTasks)
timerTasks,
transferMaxReadLevel)
}

func (s *ContextImpl) allocateTransferIDsLocked(
Expand All @@ -1115,6 +1116,7 @@ func (s *ContextImpl) allocateTimerIDsLocked(
namespaceEntry *namespace.Namespace,
workflowID string,
timerTasks []tasks.Task,
transferMaxReadLevel *int64,
) error {

// assign IDs for the timer tasks. They need to be assigned under shard lock.
Expand Down Expand Up @@ -1145,6 +1147,7 @@ func (s *ContextImpl) allocateTimerIDsLocked(
return err
}
task.SetTaskID(seqNum)
*transferMaxReadLevel = seqNum
visibilityTs := task.GetVisibilityTime()
s.contextTaggedLogger.Debug("Assigning new timer",
tag.Timestamp(visibilityTs), tag.TaskID(task.GetTaskID()), tag.AckLevel(s.shardInfo.TimerAckLevelTime))
Expand Down Expand Up @@ -1269,6 +1272,10 @@ func (s *ContextImpl) start() {
s.transitionLocked(contextRequestAcquire{})
}

func (s *ContextImpl) Unload() {
s.stop()
}

// stop should only be called by the controller.
func (s *ContextImpl) stop() {
s.wLock()
Expand Down
12 changes: 12 additions & 0 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions service/history/taskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (t *taskProcessor) taskWorker(
if !ok {
return
}
if task.GetTaskID() > t.shard.GetTransferMaxReadLevel() {
// this could happen if we lost ownership and was not aware of it.
// unload shard
t.shard.Unload()
return
}
t.processTaskAndAck(notificationChan, task)
}
}
Expand Down

0 comments on commit aa62866

Please sign in to comment.