Skip to content

Commit

Permalink
Fix reference count in DeleteHistoryBranch (#2323)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Dec 27, 2021
1 parent f61e90f commit d61a454
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 15 deletions.
11 changes: 7 additions & 4 deletions common/persistence/history_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch(
// skip the target branch
continue
}
usedBranches[br.BranchId] = common.LastEventID
for _, ancestor := range br.Ancestors {
if curr, ok := usedBranches[ancestor.GetBranchId()]; !ok || curr < ancestor.GetEndNodeId() {
usedBranches[ancestor.GetBranchId()] = ancestor.GetEndNodeId()
Expand All @@ -185,10 +186,12 @@ findDeleteRanges:
br := brsToDelete[i]
if maxEndNode, ok := usedBranches[br.GetBranchId()]; ok {
// branch is used by others, we can only delete from the maxEndNode
deleteRanges = append(deleteRanges, InternalDeleteHistoryBranchRange{
BranchId: br.BranchId,
BeginNodeId: maxEndNode,
})
if maxEndNode != common.LastEventID {
deleteRanges = append(deleteRanges, InternalDeleteHistoryBranchRange{
BranchId: br.BranchId,
BeginNodeId: maxEndNode,
})
}
// all ancestors are also used, no need to go up further,
break findDeleteRanges
} else {
Expand Down
82 changes: 71 additions & 11 deletions common/persistence/tests/history_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() {
s.Equal(events, s.listAllHistoryEvents(shardID, newBranchToken))
}

func (s *HistoryEventsSuite) TestForkDeleteBranch() {
func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() {
shardID := rand.Int31()
treeID := uuid.New()
branchID := uuid.New()
Expand Down Expand Up @@ -502,23 +502,15 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch() {
s.appendHistoryEvents(shardID, br2Token, eventsPacket1)

// delete branch1, should only delete branch1:[4,5], keep branch1:[1,2,3] as it is used as ancestor by branch2
err = s.store.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{
ShardID: shardID,
BranchToken: br1Token,
})
s.NoError(err)
s.deleteHistoryBranch(shardID, br1Token)
// verify branch1:[1,2,3] still remains
s.Equal(eventsPacket0.events, s.listAllHistoryEvents(shardID, br1Token))
// verify branch2 is not affected
s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br2Token))

// delete branch2, should delete branch2:[4,5], and also should delete ancestor branch1:[1,2,3] as it is no longer
// used by anyone
err = s.store.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{
ShardID: shardID,
BranchToken: br2Token,
})
s.NoError(err)
s.deleteHistoryBranch(shardID, br2Token)

// at this point, both branch1 and branch2 are deleted.
_, err = s.store.ReadHistoryBranch(&p.ReadHistoryBranchRequest{
Expand All @@ -540,6 +532,63 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch() {
s.Error(err, "Workflow execution history not found.")
}

func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() {
shardID := rand.Int31()
treeID := uuid.New()
branchID := uuid.New()
br1Token, err := p.NewHistoryBranchTokenByBranchID(treeID, branchID)
s.NoError(err)

transactionID := rand.Int63()
eventsPacket0 := s.newHistoryEvents(
[]int64{1, 2, 3},
transactionID,
0,
)
s.appendHistoryEvents(shardID, br1Token, eventsPacket0)
eventsPacket1 := s.newHistoryEvents(
[]int64{4, 5},
transactionID+1,
transactionID,
)
s.appendHistoryEvents(shardID, br1Token, eventsPacket1)

br2Token := s.forkHistoryBranch(shardID, br1Token, 4)
s.appendHistoryEvents(shardID, br2Token, s.newHistoryEvents(
[]int64{4, 5},
transactionID+2,
transactionID,
))

// delete branch2, should only delete branch2:[4,5], keep branch1:[1,2,3] [4,5] as it is by branch1
s.deleteHistoryBranch(shardID, br2Token)
// verify branch1 is not affected
s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br1Token))

// branch2:[4,5] should be deleted
_, err = s.store.ReadHistoryBranch(&p.ReadHistoryBranchRequest{
ShardID: shardID,
BranchToken: br2Token,
MinEventID: 4,
MaxEventID: common.LastEventID,
PageSize: 1,
})
s.Error(err, "Workflow execution history not found.")

// delete branch1, should delete branch1:[1,2,3] [4,5]
s.deleteHistoryBranch(shardID, br1Token)

// branch1 should be deleted
_, err = s.store.ReadHistoryBranch(&p.ReadHistoryBranchRequest{
ShardID: shardID,
BranchToken: br1Token,
MinEventID: common.FirstEventID,
MaxEventID: common.LastEventID,
PageSize: 1,
})
s.Error(err, "Workflow execution history not found.")
}

func (s *HistoryEventsSuite) appendHistoryEvents(
shardID int32,
branchToken []byte,
Expand Down Expand Up @@ -572,6 +621,17 @@ func (s *HistoryEventsSuite) forkHistoryBranch(
return resp.NewBranchToken
}

func (s *HistoryEventsSuite) deleteHistoryBranch(
shardID int32,
branchToken []byte,
) {
err := s.store.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{
ShardID: shardID,
BranchToken: branchToken,
})
s.NoError(err)
}

func (s *HistoryEventsSuite) trimHistoryBranch(
shardID int32,
branchToken []byte,
Expand Down

0 comments on commit d61a454

Please sign in to comment.