diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 7f88b025950..99fd66659a2 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -128,5 +128,7 @@ type ( GetSearchAttributesProvider() searchattribute.Provider GetSearchAttributesMapper() searchattribute.Mapper GetArchivalMetadata() archiver.ArchivalMetadata + + Unload() } ) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 7b98496d68c..4fe6ceadbba 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -1088,7 +1088,8 @@ func (s *ContextImpl) allocateTaskIDsLocked( return s.allocateTimerIDsLocked( namespaceEntry, workflowID, - timerTasks) + timerTasks, + transferMaxReadLevel) } func (s *ContextImpl) allocateTransferIDsLocked( @@ -1115,6 +1116,7 @@ func (s *ContextImpl) allocateTimerIDsLocked( namespaceEntry *namespace.Namespace, workflowID string, timerTasks []tasks.Task, + transferMaxReadLevel *int64, ) error { // assign IDs for the timer tasks. They need to be assigned under shard lock. @@ -1145,6 +1147,7 @@ func (s *ContextImpl) allocateTimerIDsLocked( return err } task.SetTaskID(seqNum) + *transferMaxReadLevel = seqNum visibilityTs := task.GetVisibilityTime() s.contextTaggedLogger.Debug("Assigning new timer", tag.Timestamp(visibilityTs), tag.TaskID(task.GetTaskID()), tag.AckLevel(s.shardInfo.TimerAckLevelTime)) @@ -1269,6 +1272,10 @@ func (s *ContextImpl) start() { s.transitionLocked(contextRequestAcquire{}) } +func (s *ContextImpl) Unload() { + s.stop() +} + // stop should only be called by the controller. func (s *ContextImpl) stop() { s.wLock() diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 468ca7b8102..24f55dfc7d5 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -695,6 +695,18 @@ func (mr *MockContextMockRecorder) SetCurrentTime(cluster, currentTime interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCurrentTime", reflect.TypeOf((*MockContext)(nil).SetCurrentTime), cluster, currentTime) } +// Unload mocks base method. +func (m *MockContext) Unload() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Unload") +} + +// Unload indicates an expected call of Unload. +func (mr *MockContextMockRecorder) Unload() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unload", reflect.TypeOf((*MockContext)(nil).Unload)) +} + // UpdateClusterReplicationLevel mocks base method. func (m *MockContext) UpdateClusterReplicationLevel(cluster string, ackTaskID int64, ackTimestamp time.Time) error { m.ctrl.T.Helper() diff --git a/service/history/taskProcessor.go b/service/history/taskProcessor.go index 9a1d3830371..5deeacb3f36 100644 --- a/service/history/taskProcessor.go +++ b/service/history/taskProcessor.go @@ -168,6 +168,12 @@ func (t *taskProcessor) taskWorker( if !ok { return } + if task.GetTaskID() > t.shard.GetTransferMaxReadLevel() { + // this could happen if we lost ownership and was not aware of it. + // unload shard + t.shard.Unload() + return + } t.processTaskAndAck(notificationChan, task) } }