Skip to content

Commit

Permalink
Assign LastFirstTransactionID correctly (#2438)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan authored Feb 3, 2022
1 parent 4670e98 commit 0447a17
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 42 deletions.
2 changes: 2 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,8 @@ type (
ReadHistoryBranchByBatchResponse struct {
// History events by batch
History []*historypb.History
// TransactionIDs holds the transaction ID of each History batch, index-aligned with History
TransactionIDs []int64
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page.
// Empty means we have reached the last page; no need to continue
Expand Down
40 changes: 21 additions & 19 deletions common/persistence/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (m *executionManagerImpl) ReadHistoryBranchByBatch(

resp := &ReadHistoryBranchByBatchResponse{}
var err error
_, resp.History, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(true, request)
_, resp.History, resp.TransactionIDs, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(true, request)
return resp, err
}

Expand All @@ -458,7 +458,7 @@ func (m *executionManagerImpl) ReadHistoryBranch(

resp := &ReadHistoryBranchResponse{}
var err error
resp.HistoryEvents, _, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(false, request)
resp.HistoryEvents, _, _, resp.NextPageToken, resp.Size, err = m.readHistoryBranch(false, request)
return resp, err
}

Expand All @@ -469,7 +469,7 @@ func (m *executionManagerImpl) ReadRawHistoryBranch(
request *ReadHistoryBranchRequest,
) (*ReadRawHistoryBranchResponse, error) {

dataBlobs, token, dataSize, err := m.readRawHistoryBranchAndFilter(request)
dataBlobs, _, token, dataSize, err := m.readRawHistoryBranchAndFilter(request)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -575,7 +575,7 @@ func (m *executionManagerImpl) readRawHistoryBranch(

func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
request *ReadHistoryBranchRequest,
) ([]*commonpb.DataBlob, *historyPagingToken, int, error) {
) ([]*commonpb.DataBlob, []int64, *historyPagingToken, int, error) {

shardID := request.ShardID
branchToken := request.BranchToken
Expand All @@ -584,7 +584,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(

branch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
return nil, nil, 0, err
return nil, nil, nil, 0, err
}
treeID := branch.TreeId
branchID := branch.BranchId
Expand All @@ -606,7 +606,7 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
request.MinEventID-1,
)
if err != nil {
return nil, nil, 0, err
return nil, nil, nil, 0, err
}

nodes, token, err := m.readRawHistoryBranch(
Expand All @@ -620,10 +620,10 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
false,
)
if err != nil {
return nil, nil, 0, err
return nil, nil, nil, 0, err
}
if len(nodes) == 0 && len(request.NextPageToken) == 0 {
return nil, nil, 0, serviceerror.NewNotFound("Workflow execution history not found.")
return nil, nil, nil, 0, serviceerror.NewNotFound("Workflow execution history not found.")
}

nodes, err = m.filterHistoryNodes(
Expand All @@ -632,33 +632,35 @@ func (m *executionManagerImpl) readRawHistoryBranchAndFilter(
nodes,
)
if err != nil {
return nil, nil, 0, err
return nil, nil, nil, 0, err
}

var dataBlobs []*commonpb.DataBlob
transactionIDs := make([]int64, 0, len(nodes))
dataSize := 0
if len(nodes) > 0 {
dataBlobs = make([]*commonpb.DataBlob, len(nodes))
for index, node := range nodes {
dataBlobs[index] = node.Events
dataSize += len(node.Events.Data)
transactionIDs = append(transactionIDs, node.TransactionID)
}
lastNode := nodes[len(nodes)-1]
token.LastNodeID = lastNode.NodeID
token.LastTransactionID = lastNode.TransactionID
}

return dataBlobs, token, dataSize, nil
return dataBlobs, transactionIDs, token, dataSize, nil
}

func (m *executionManagerImpl) readHistoryBranch(
byBatch bool,
request *ReadHistoryBranchRequest,
) ([]*historypb.HistoryEvent, []*historypb.History, []byte, int, error) {
) ([]*historypb.HistoryEvent, []*historypb.History, []int64, []byte, int, error) {

dataBlobs, token, dataSize, err := m.readRawHistoryBranchAndFilter(request)
dataBlobs, transactionIDs, token, dataSize, err := m.readRawHistoryBranchAndFilter(request)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, nil, 0, err
}

historyEvents := make([]*historypb.HistoryEvent, 0, request.PageSize)
Expand All @@ -667,11 +669,11 @@ func (m *executionManagerImpl) readHistoryBranch(
for _, batch := range dataBlobs {
events, err := m.serializer.DeserializeEvents(batch)
if err != nil {
return historyEvents, historyEventBatches, nil, dataSize, err
return nil, nil, nil, nil, dataSize, err
}
if len(events) == 0 {
m.logger.Error("Empty events in a batch")
return historyEvents, historyEventBatches, nil, dataSize, serviceerror.NewDataLoss(fmt.Sprintf("corrupted history event batch, empty events"))
return nil, nil, nil, nil, dataSize, serviceerror.NewDataLoss(fmt.Sprintf("corrupted history event batch, empty events"))
}

firstEvent := events[0] // first
Expand All @@ -684,15 +686,15 @@ func (m *executionManagerImpl) readHistoryBranch(
tag.FirstEventVersion(firstEvent.GetVersion()), tag.WorkflowFirstEventID(firstEvent.GetEventId()),
tag.LastEventVersion(lastEvent.GetVersion()), tag.WorkflowNextEventID(lastEvent.GetEventId()),
tag.Counter(eventCount))
return historyEvents, historyEventBatches, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, wrong version and IDs")
return historyEvents, historyEventBatches, transactionIDs, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, wrong version and IDs")
}
if firstEvent.GetEventId() != token.LastEventID+1 {
m.logger.Error("Corrupted non-contiguous event batch",
tag.WorkflowFirstEventID(firstEvent.GetEventId()),
tag.WorkflowNextEventID(lastEvent.GetEventId()),
tag.TokenLastEventID(token.LastEventID),
tag.Counter(eventCount))
return historyEvents, historyEventBatches, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, eventID is not contiguous")
return historyEvents, historyEventBatches, transactionIDs, nil, dataSize, serviceerror.NewDataLoss("corrupted history event batch, eventID is not contiguous")
}

if byBatch {
Expand All @@ -705,9 +707,9 @@ func (m *executionManagerImpl) readHistoryBranch(

nextPageToken, err := m.serializeToken(token)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, nil, 0, err
}
return historyEvents, historyEventBatches, nextPageToken, dataSize, nil
return historyEvents, historyEventBatches, transactionIDs, nextPageToken, dataSize, nil
}

func (m *executionManagerImpl) filterHistoryNodes(
Expand Down
25 changes: 18 additions & 7 deletions service/history/nDCStateRebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type (
rebuiltHistorySize int64
logger log.Logger
}

// HistoryBlobsPaginationItem is the item type produced by the pagination
// function returned from getPaginationFn: one history batch together with
// the transaction ID that was returned alongside it.
HistoryBlobsPaginationItem struct {
// History is a single batch of history events.
History *historypb.History
// TransactionID is the entry of the response's TransactionIDs slice at the
// same index as History in the ReadHistoryBranchByBatch response.
TransactionID int64
}
)

var _ nDCStateRebuilder = (*nDCStateRebuilderImpl)(nil)
Expand Down Expand Up @@ -112,7 +117,6 @@ func (r *nDCStateRebuilderImpl) rebuild(
targetBranchToken []byte,
requestID string,
) (workflow.MutableState, int64, error) {

iter := collection.NewPagingIterator(r.getPaginationFn(
common.FirstEventID,
baseLastEventID+1,
Expand All @@ -129,27 +133,30 @@ func (r *nDCStateRebuilderImpl) rebuild(
now,
)

var lastTxnId int64
for iter.HasNext() {
batch, err := iter.Next()
switch err.(type) {
case nil:
// noop
case *serviceerror.DataLoss:
// log event
r.logger.Error("encountered data loss event", tag.WorkflowNamespaceID(baseWorkflowIdentifier.NamespaceID), tag.WorkflowID(baseWorkflowIdentifier.WorkflowID), tag.WorkflowRunID(baseWorkflowIdentifier.RunID))
return nil, 0, err
default:
return nil, 0, err
}

history := batch.(*HistoryBlobsPaginationItem)
if err := r.applyEvents(
targetWorkflowIdentifier,
stateBuilder,
batch.(*historypb.History).Events,
history.History.Events,
requestID,
); err != nil {
return nil, 0, err
}

lastTxnId = history.TransactionID
}

if err := rebuiltMutableState.SetCurrentBranchToken(targetBranchToken); err != nil {
Expand Down Expand Up @@ -180,6 +187,8 @@ func (r *nDCStateRebuilderImpl) rebuild(
return nil, 0, err
}

rebuiltMutableState.GetExecutionInfo().LastFirstEventTxnId = lastTxnId

// refresh tasks to be generated
if err := r.taskRefresher.RefreshTasks(now, rebuiltMutableState); err != nil {
return nil, 0, err
Expand Down Expand Up @@ -239,9 +248,7 @@ func (r *nDCStateRebuilderImpl) getPaginationFn(
nextEventID int64,
branchToken []byte,
) collection.PaginationFn {

return func(paginationToken []byte) ([]interface{}, []byte, error) {

resp, err := r.executionMgr.ReadHistoryBranchByBatch(&persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
Expand All @@ -256,8 +263,12 @@ func (r *nDCStateRebuilderImpl) getPaginationFn(

r.rebuiltHistorySize += int64(resp.Size)
var paginateItems []interface{}
for _, history := range resp.History {
paginateItems = append(paginateItems, history)
for i, history := range resp.History {
nextBatch := &HistoryBlobsPaginationItem{
History: history,
TransactionID: resp.TransactionIDs[i],
}
paginateItems = append(paginateItems, nextBatch)
}
return paginateItems, resp.NextPageToken, nil
}
Expand Down
48 changes: 32 additions & 16 deletions service/history/nDCStateRebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,11 @@ func (s *nDCStateRebuilderSuite) TestPagination() {
Attributes: &historypb.HistoryEvent_ActivityTaskScheduledEventAttributes{ActivityTaskScheduledEventAttributes: &historypb.ActivityTaskScheduledEventAttributes{}},
}
history1 := []*historypb.History{{Events: []*historypb.HistoryEvent{event1, event2, event3}}}
transactionID1 := int64(10)
history2 := []*historypb.History{{Events: []*historypb.HistoryEvent{event4, event5}}}
history := append(history1, history2...)
transactionID2 := int64(20)
expectedHistory := append(history1, history2...)
expectedTransactionIDs := []int64{transactionID1, transactionID2}
pageToken := []byte("some random token")

shardID := s.mockShard.GetShardID()
Expand All @@ -210,9 +213,10 @@ func (s *nDCStateRebuilderSuite) TestPagination() {
NextPageToken: nil,
ShardID: shardID,
}).Return(&persistence.ReadHistoryBranchByBatchResponse{
History: history1,
NextPageToken: pageToken,
Size: 12345,
History: history1,
TransactionIDs: []int64{transactionID1},
NextPageToken: pageToken,
Size: 12345,
}, nil)
s.mockExecutionManager.EXPECT().ReadHistoryBranchByBatch(&persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
Expand All @@ -222,22 +226,30 @@ func (s *nDCStateRebuilderSuite) TestPagination() {
NextPageToken: pageToken,
ShardID: shardID,
}).Return(&persistence.ReadHistoryBranchByBatchResponse{
History: history2,
NextPageToken: nil,
Size: 67890,
History: history2,
TransactionIDs: []int64{transactionID2},
NextPageToken: nil,
Size: 67890,
}, nil)

paginationFn := s.nDCStateRebuilder.getPaginationFn(firstEventID, nextEventID, branchToken)
iter := collection.NewPagingIterator(paginationFn)

var result []*historypb.History
var result []*HistoryBlobsPaginationItem
for iter.HasNext() {
item, err := iter.Next()
s.NoError(err)
result = append(result, item.(*historypb.History))
result = append(result, item.(*HistoryBlobsPaginationItem))
}
var historyResult []*historypb.History
var transactionIDsResult []int64
for _, item := range result {
historyResult = append(historyResult, item.History)
transactionIDsResult = append(transactionIDsResult, item.TransactionID)
}

s.Equal(history, result)
s.Equal(expectedHistory, historyResult)
s.Equal(expectedTransactionIDs, transactionIDsResult)
}

func (s *nDCStateRebuilderSuite) TestRebuild() {
Expand Down Expand Up @@ -293,10 +305,12 @@ func (s *nDCStateRebuilderSuite) TestRebuild() {
NextPageToken: nil,
ShardID: shardID,
}).Return(&persistence.ReadHistoryBranchByBatchResponse{
History: history1,
NextPageToken: pageToken,
Size: historySize1,
History: history1,
TransactionIDs: []int64{10},
NextPageToken: pageToken,
Size: historySize1,
}, nil)
expectedLastFirstTransactionID := int64(20)
s.mockExecutionManager.EXPECT().ReadHistoryBranchByBatch(&persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
Expand All @@ -305,9 +319,10 @@ func (s *nDCStateRebuilderSuite) TestRebuild() {
NextPageToken: pageToken,
ShardID: shardID,
}).Return(&persistence.ReadHistoryBranchByBatchResponse{
History: history2,
NextPageToken: nil,
Size: historySize2,
History: history2,
TransactionIDs: []int64{expectedLastFirstTransactionID},
NextPageToken: nil,
Size: historySize2,
}, nil)

s.mockNamespaceCache.EXPECT().GetNamespaceByID(targetNamespaceID).Return(namespace.NewGlobalNamespaceForTest(
Expand Down Expand Up @@ -349,4 +364,5 @@ func (s *nDCStateRebuilderSuite) TestRebuild() {
),
), rebuildMutableState.GetExecutionInfo().GetVersionHistories())
s.Equal(timestamp.TimeValue(rebuildMutableState.GetExecutionInfo().StartTime), s.now)
s.Equal(expectedLastFirstTransactionID, rebuildExecutionInfo.LastFirstEventTxnId)
}

0 comments on commit 0447a17

Please sign in to comment.