diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 133747fa6f8..6e5dbd177cc 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -891,6 +891,8 @@ type ( ReadHistoryBranchByBatchResponse struct { // History events by batch History []*historypb.History + // TransactionID for relevant History batch + TransactionIDs []int64 // Token to read next page if there are more events beyond page size. // Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page. // Empty means we have reached the last page, not need to continue diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index 5af3ff42593..febebc44a77 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -446,7 +446,7 @@ func (m *executionManagerImpl) ReadHistoryBranchByBatch( resp := &ReadHistoryBranchByBatchResponse{} var err error - _, resp.History, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(true, request) + _, resp.History, resp.TransactionIDs, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(true, request) return resp, err } @@ -458,7 +458,7 @@ func (m *executionManagerImpl) ReadHistoryBranch( resp := &ReadHistoryBranchResponse{} var err error - resp.HistoryEvents, _, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(false, request) + resp.HistoryEvents, _, _, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(false, request) return resp, err } @@ -469,7 +469,7 @@ func (m *executionManagerImpl) ReadRawHistoryBranch( request *ReadHistoryBranchRequest, ) (*ReadRawHistoryBranchResponse, error) { - dataBlobs, token, dataSize, err := m.readRawHistoryBranchAndFilter(request) + dataBlobs, _, token, dataSize, err := m.readRawHistoryBranchAndFilter(request) if err != nil { return nil, err } @@ -575,7 +575,7 @@ func (m *executionManagerImpl) readRawHistoryBranch( func (m *executionManagerImpl) readRawHistoryBranchAndFilter( request *ReadHistoryBranchRequest, -) ([]*commonpb.DataBlob, *historyPagingToken, int, error) { +) ([]*commonpb.DataBlob, []int64, *historyPagingToken, int, error) { shardID := request.ShardID branchToken := request.BranchToken @@ -584,7 +584,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter( branch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) if err != nil { - return nil, nil, 0, err + return nil, nil, nil, 0, err } treeID := branch.TreeId branchID := branch.BranchId @@ -606,7 +606,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter( request.MinEventID-1, ) if err != nil { - return nil, nil, 0, err + return nil, nil, nil, 0, err } nodes, token, err := m.readRawHistoryBranch( @@ -620,10 +620,10 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter( false, ) if err != nil { - return nil, nil, 0, err + return nil, nil, nil, 0, err } if len(nodes) == 0 && len(request.NextPageToken) == 0 { - return nil, nil, 0, serviceerror.NewNotFound("Workflow execution history not found.") + return nil, nil, nil, 0, serviceerror.NewNotFound("Workflow execution history not found.") } nodes, err = m.filterHistoryNodes( @@ -632,33 +632,35 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter( nodes, ) if err != nil { - return nil, nil, 0, err + return nil, nil, nil, 0, err } var dataBlobs []*commonpb.DataBlob + transactionIDs := make([]int64, 0, len(nodes)) dataSize := 0 if len(nodes) > 0 { dataBlobs = make([]*commonpb.DataBlob, len(nodes)) for index, node := range nodes { dataBlobs[index] = node.Events dataSize += len(node.Events.Data) + transactionIDs = append(transactionIDs, node.TransactionID) } lastNode := nodes[len(nodes)-1] token.LastNodeID = lastNode.NodeID token.LastTransactionID = lastNode.TransactionID } - return dataBlobs, token, dataSize, nil + return dataBlobs, transactionIDs, token, dataSize, nil } func (m *executionManagerImpl) readHistoryBranch( byBatch bool, request *ReadHistoryBranchRequest, -) ([]*historypb.HistoryEvent, []*historypb.History, []byte, int, error) { +) ([]*historypb.HistoryEvent, []*historypb.History, []int64, []byte, int, error) { - dataBlobs, token, dataSize, err := m.readRawHistoryBranchAndFilter(request) + dataBlobs, transactionIDs, token, dataSize, err := m.readRawHistoryBranchAndFilter(request) if err != nil { - return nil, nil, nil, 0, err + return nil, nil, nil, nil, 0, err } historyEvents := make([]*historypb.HistoryEvent, 0, request.PageSize) @@ -667,11 +669,11 @@ func (m *executionManagerImpl) readHistoryBranch( for _, batch := range dataBlobs { events, err := m.serializer.DeserializeEvents(batch) if err != nil { - return historyEvents, historyEventBatches, nil, dataSize, err + return nil, nil, nil, nil, dataSize, err } if len(events) == 0 { m.logger.Error("Empty events in a batch") - return historyEvents, historyEventBatches, nil, dataSize, serviceerror.NewDataLoss(fmt.Sprintf("corrupted history event batch, empty events")) + return nil, nil, nil, nil, dataSize, serviceerror.NewDataLoss(fmt.Sprintf("corrupted history event batch, empty events")) } firstEvent := events[0] // first @@ -684,7 +686,7 @@ func (m *executionManagerImpl) readHistoryBranch( tag.FirstEventVersion(firstEvent.GetVersion()), tag.WorkflowFirstEventID(firstEvent.GetEventId()), tag.LastEventVersion(lastEvent.GetVersion()), tag.WorkflowNextEventID(lastEvent.GetEventId()), tag.Counter(eventCount)) - return historyEvents, historyEventBatches, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, wrong version and IDs") + return historyEvents, historyEventBatches, transactionIDs, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, wrong version and IDs") } if firstEvent.GetEventId() != token.LastEventID+1 { m.logger.Error("Corrupted non-contiguous event batch", @@ -692,7 +694,7 @@ func (m *executionManagerImpl) readHistoryBranch( tag.WorkflowNextEventID(lastEvent.GetEventId()), tag.TokenLastEventID(token.LastEventID), tag.Counter(eventCount)) - return historyEvents, historyEventBatches, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, eventID is not contiguous") + return historyEvents, historyEventBatches, transactionIDs, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, eventID is not contiguous") } if byBatch { @@ -705,9 +707,9 @@ func (m *executionManagerImpl) readHistoryBranch( nextPageToken, err := m.serializeToken(token) if err != nil { - return nil, nil, nil, 0, err + return nil, nil, nil, nil, 0, err } - return historyEvents, historyEventBatches, nextPageToken, dataSize, nil + return historyEvents, historyEventBatches, transactionIDs, nextPageToken, dataSize, nil } func (m *executionManagerImpl) filterHistoryNodes( diff --git a/service/history/nDCStateRebuilder.go b/service/history/nDCStateRebuilder.go index 1ffd701ea02..f5e3638b79b 100644 --- a/service/history/nDCStateRebuilder.go +++ b/service/history/nDCStateRebuilder.go @@ -75,6 +75,11 @@ type ( rebuiltHistorySize int64 logger log.Logger } + + HistoryBlobsPaginationItem struct { + History *historypb.History + TransactionID int64 + } ) var _ nDCStateRebuilder = (*nDCStateRebuilderImpl)(nil) @@ -112,7 +117,6 @@ func (r *nDCStateRebuilderImpl) rebuild( targetBranchToken []byte, requestID string, ) (workflow.MutableState, int64, error) { - iter := collection.NewPagingIterator(r.getPaginationFn( common.FirstEventID, baseLastEventID+1, @@ -129,27 +133,30 @@ func (r *nDCStateRebuilderImpl) rebuild( now, ) + var lastTxnId int64 for iter.HasNext() { batch, err := iter.Next() switch err.(type) { case nil: // noop case *serviceerror.DataLoss: - // log event r.logger.Error("encountered data loss event", tag.WorkflowNamespaceID(baseWorkflowIdentifier.NamespaceID), tag.WorkflowID(baseWorkflowIdentifier.WorkflowID), tag.WorkflowRunID(baseWorkflowIdentifier.RunID)) return nil, 0, err default: return nil, 0, err } + history := batch.(*HistoryBlobsPaginationItem) if err := r.applyEvents( targetWorkflowIdentifier, stateBuilder, - batch.(*historypb.History).Events, + history.History.Events, requestID, ); err != nil { return nil, 0, err } + + lastTxnId = history.TransactionID } if err := rebuiltMutableState.SetCurrentBranchToken(targetBranchToken); err != nil { @@ -180,6 +187,8 @@ func (r *nDCStateRebuilderImpl) rebuild( return nil, 0, err } + rebuiltMutableState.GetExecutionInfo().LastFirstEventTxnId = lastTxnId + // refresh tasks to be generated if err := r.taskRefresher.RefreshTasks(now, rebuiltMutableState); err != nil { return nil, 0, err @@ -239,9 +248,7 @@ func (r *nDCStateRebuilderImpl) getPaginationFn( nextEventID int64, branchToken []byte, ) collection.PaginationFn { - return func(paginationToken []byte) ([]interface{}, []byte, error) { - resp, err := r.executionMgr.ReadHistoryBranchByBatch(&persistence.ReadHistoryBranchRequest{ BranchToken: branchToken, MinEventID: firstEventID, @@ -256,8 +263,12 @@ func (r *nDCStateRebuilderImpl) getPaginationFn( r.rebuiltHistorySize += int64(resp.Size) var paginateItems []interface{} - for _, history := range resp.History { - paginateItems = append(paginateItems, history) + for i, history := range resp.History { + nextBatch := &HistoryBlobsPaginationItem{ + History: history, + TransactionID: resp.TransactionIDs[i], + } + paginateItems = append(paginateItems, nextBatch) } return paginateItems, resp.NextPageToken, nil } diff --git a/service/history/nDCStateRebuilder_test.go b/service/history/nDCStateRebuilder_test.go index 6a39451f174..a1787d688d0 100644 --- a/service/history/nDCStateRebuilder_test.go +++ b/service/history/nDCStateRebuilder_test.go @@ -197,8 +197,11 @@ func (s *nDCStateRebuilderSuite) TestPagination() { Attributes: &historypb.HistoryEvent_ActivityTaskScheduledEventAttributes{ActivityTaskScheduledEventAttributes: &historypb.ActivityTaskScheduledEventAttributes{}}, } history1 := []*historypb.History{{Events: []*historypb.HistoryEvent{event1, event2, event3}}} + transactionID1 := int64(10) history2 := []*historypb.History{{Events: []*historypb.HistoryEvent{event4, event5}}} - history := append(history1, history2...) + transactionID2 := int64(20) + expectedHistory := append(history1, history2...) + expectedTransactionIDs := []int64{transactionID1, transactionID2} pageToken := []byte("some random token") shardID := s.mockShard.GetShardID() @@ -210,9 +213,10 @@ func (s *nDCStateRebuilderSuite) TestPagination() { NextPageToken: nil, ShardID: shardID, }).Return(&persistence.ReadHistoryBranchByBatchResponse{ - History: history1, - NextPageToken: pageToken, - Size: 12345, + History: history1, + TransactionIDs: []int64{transactionID1}, + NextPageToken: pageToken, + Size: 12345, }, nil) s.mockExecutionManager.EXPECT().ReadHistoryBranchByBatch(&persistence.ReadHistoryBranchRequest{ BranchToken: branchToken, @@ -222,22 +226,30 @@ func (s *nDCStateRebuilderSuite) TestPagination() { NextPageToken: pageToken, ShardID: shardID, }).Return(&persistence.ReadHistoryBranchByBatchResponse{ - History: history2, - NextPageToken: nil, - Size: 67890, + History: history2, + TransactionIDs: []int64{transactionID2}, + NextPageToken: nil, + Size: 67890, }, nil) paginationFn := s.nDCStateRebuilder.getPaginationFn(firstEventID, nextEventID, branchToken) iter := collection.NewPagingIterator(paginationFn) - var result []*historypb.History + var result []*HistoryBlobsPaginationItem for iter.HasNext() { item, err := iter.Next() s.NoError(err) - result = append(result, item.(*historypb.History)) + result = append(result, item.(*HistoryBlobsPaginationItem)) + } + var historyResult []*historypb.History + var transactionIDsResult []int64 + for _, item := range result { + historyResult = append(historyResult, item.History) + transactionIDsResult = append(transactionIDsResult, item.TransactionID) } - s.Equal(history, result) + s.Equal(expectedHistory, historyResult) + s.Equal(expectedTransactionIDs, transactionIDsResult) } func (s *nDCStateRebuilderSuite) TestRebuild() { @@ -293,10 +305,12 @@ func (s *nDCStateRebuilderSuite) TestRebuild() { NextPageToken: nil, ShardID: shardID, }).Return(&persistence.ReadHistoryBranchByBatchResponse{ - History: history1, - NextPageToken: pageToken, - Size: historySize1, + History: history1, + TransactionIDs: []int64{10}, + NextPageToken: pageToken, + Size: historySize1, }, nil) + expectedLastFirstTransactionID := int64(20) s.mockExecutionManager.EXPECT().ReadHistoryBranchByBatch(&persistence.ReadHistoryBranchRequest{ BranchToken: branchToken, MinEventID: firstEventID, @@ -305,9 +319,10 @@ func (s *nDCStateRebuilderSuite) TestRebuild() { NextPageToken: pageToken, ShardID: shardID, }).Return(&persistence.ReadHistoryBranchByBatchResponse{ - History: history2, - NextPageToken: nil, - Size: historySize2, + History: history2, + TransactionIDs: []int64{expectedLastFirstTransactionID}, + NextPageToken: nil, + Size: historySize2, }, nil) s.mockNamespaceCache.EXPECT().GetNamespaceByID(targetNamespaceID).Return(namespace.NewGlobalNamespaceForTest( @@ -349,4 +364,5 @@ func (s *nDCStateRebuilderSuite) TestRebuild() { ), ), rebuildMutableState.GetExecutionInfo().GetVersionHistories()) s.Equal(timestamp.TimeValue(rebuildMutableState.GetExecutionInfo().StartTime), s.now) + s.Equal(expectedLastFirstTransactionID, rebuildExecutionInfo.LastFirstEventTxnId) }