From d61a4547443fadfe6a115165f8c569819c4272e1 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 27 Dec 2021 15:42:00 -0800 Subject: [PATCH] Fix reference count in DeleteHistoryBranch (#2323) --- common/persistence/history_manager.go | 11 +-- common/persistence/tests/history_store.go | 82 ++++++++++++++++++++--- 2 files changed, 78 insertions(+), 15 deletions(-) diff --git a/common/persistence/history_manager.go b/common/persistence/history_manager.go index 4fd6a7fbd09..5af3ff42593 100644 --- a/common/persistence/history_manager.go +++ b/common/persistence/history_manager.go @@ -171,6 +171,7 @@ func (m *executionManagerImpl) DeleteHistoryBranch( // skip the target branch continue } + usedBranches[br.BranchId] = common.LastEventID for _, ancestor := range br.Ancestors { if curr, ok := usedBranches[ancestor.GetBranchId()]; !ok || curr < ancestor.GetEndNodeId() { usedBranches[ancestor.GetBranchId()] = ancestor.GetEndNodeId() @@ -185,10 +186,12 @@ findDeleteRanges: br := brsToDelete[i] if maxEndNode, ok := usedBranches[br.GetBranchId()]; ok { // branch is used by others, we can only delete from the maxEndNode - deleteRanges = append(deleteRanges, InternalDeleteHistoryBranchRange{ - BranchId: br.BranchId, - BeginNodeId: maxEndNode, - }) + if maxEndNode != common.LastEventID { + deleteRanges = append(deleteRanges, InternalDeleteHistoryBranchRange{ + BranchId: br.BranchId, + BeginNodeId: maxEndNode, + }) + } // all ancestors are also used, no need to go up further, break findDeleteRanges } else { diff --git a/common/persistence/tests/history_store.go b/common/persistence/tests/history_store.go index d897f71e6fd..9cf4767570d 100644 --- a/common/persistence/tests/history_store.go +++ b/common/persistence/tests/history_store.go @@ -473,7 +473,7 @@ func (s *HistoryEventsSuite) TestAppendForkSelectTrim_LastBranch() { s.Equal(events, s.listAllHistoryEvents(shardID, newBranchToken)) } -func (s *HistoryEventsSuite) TestForkDeleteBranch() { +func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteBaseBranchFirst() { shardID := rand.Int31() treeID := uuid.New() branchID := uuid.New() @@ -502,11 +502,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch() { s.appendHistoryEvents(shardID, br2Token, eventsPacket1) // delete branch1, should only delete branch1:[4,5], keep branch1:[1,2,3] as it is used as ancestor by branch2 - err = s.store.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{ - ShardID: shardID, - BranchToken: br1Token, - }) - s.NoError(err) + s.deleteHistoryBranch(shardID, br1Token) // verify branch1:[1,2,3] still remains s.Equal(eventsPacket0.events, s.listAllHistoryEvents(shardID, br1Token)) // verify branch2 is not affected @@ -514,11 +510,7 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch() { // delete branch2, should delete branch2:[4,5], and also should delete ancestor branch1:[1,2,3] as it is no longer // used by anyone - err = s.store.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{ - ShardID: shardID, - BranchToken: br2Token, - }) - s.NoError(err) + s.deleteHistoryBranch(shardID, br2Token) // at this point, both branch1 and branch2 are deleted. _, err = s.store.ReadHistoryBranch(&p.ReadHistoryBranchRequest{ @@ -540,6 +532,63 @@ func (s *HistoryEventsSuite) TestForkDeleteBranch() { s.Error(err, "Workflow execution history not found.") } +func (s *HistoryEventsSuite) TestForkDeleteBranch_DeleteForkedBranchFirst() { + shardID := rand.Int31() + treeID := uuid.New() + branchID := uuid.New() + br1Token, err := p.NewHistoryBranchTokenByBranchID(treeID, branchID) + s.NoError(err) + + transactionID := rand.Int63() + eventsPacket0 := s.newHistoryEvents( + []int64{1, 2, 3}, + transactionID, + 0, + ) + s.appendHistoryEvents(shardID, br1Token, eventsPacket0) + eventsPacket1 := s.newHistoryEvents( + []int64{4, 5}, + transactionID+1, + transactionID, + ) + s.appendHistoryEvents(shardID, br1Token, eventsPacket1) + + br2Token := s.forkHistoryBranch(shardID, br1Token, 4) + s.appendHistoryEvents(shardID, br2Token, s.newHistoryEvents( + []int64{4, 5}, + transactionID+2, + transactionID, + )) + + // delete branch2, should only delete branch2:[4,5], keep branch1:[1,2,3] [4,5] as it is by branch1 + s.deleteHistoryBranch(shardID, br2Token) + // verify branch1 is not affected + s.Equal(append(eventsPacket0.events, eventsPacket1.events...), s.listAllHistoryEvents(shardID, br1Token)) + + // branch2:[4,5] should be deleted + _, err = s.store.ReadHistoryBranch(&p.ReadHistoryBranchRequest{ + ShardID: shardID, + BranchToken: br2Token, + MinEventID: 4, + MaxEventID: common.LastEventID, + PageSize: 1, + }) + s.Error(err, "Workflow execution history not found.") + + // delete branch1, should delete branch1:[1,2,3] [4,5] + s.deleteHistoryBranch(shardID, br1Token) + + // branch1 should be deleted + _, err = s.store.ReadHistoryBranch(&p.ReadHistoryBranchRequest{ + ShardID: shardID, + BranchToken: br1Token, + MinEventID: common.FirstEventID, + MaxEventID: common.LastEventID, + PageSize: 1, + }) + s.Error(err, "Workflow execution history not found.") +} + func (s *HistoryEventsSuite) appendHistoryEvents( shardID int32, branchToken []byte, @@ -572,6 +621,17 @@ func (s *HistoryEventsSuite) forkHistoryBranch( return resp.NewBranchToken } +func (s *HistoryEventsSuite) deleteHistoryBranch( + shardID int32, + branchToken []byte, +) { + err := s.store.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{ + ShardID: shardID, + BranchToken: branchToken, + }) + s.NoError(err) +} + func (s *HistoryEventsSuite) trimHistoryBranch( shardID int32, branchToken []byte,