Skip to content

Commit

Permalink
Update Mutable State to reduce unnecessary update to DB (#4304)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Jul 16, 2021
1 parent 6f989a3 commit deb0caf
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 59 deletions.
104 changes: 55 additions & 49 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,28 @@ var (

type (
mutableStateBuilder struct {
pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info.
pendingActivityIDToEventID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
updateActivityInfos map[*persistence.ActivityInfo]struct{} // Modified activities from last update.
deleteActivityInfos map[int64]struct{} // Deleted activities from last update.
syncActivityTasks map[int64]struct{} // Activity to be sync to remote
pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info.
pendingActivityIDToEventID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
updateActivityInfos map[int64]*persistence.ActivityInfo // Modified activities from last update.
deleteActivityInfos map[int64]struct{} // Deleted activities from last update.
syncActivityTasks map[int64]struct{} // Activity to be sync to remote

pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info.
pendingTimerEventIDToID map[int64]string // User Timer Start Event ID -> User Timer ID.
updateTimerInfos map[*persistence.TimerInfo]struct{} // Modified timers from last update.
deleteTimerInfos map[string]struct{} // Deleted timers from last update.
pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info.
pendingTimerEventIDToID map[int64]string // User Timer Start Event ID -> User Timer ID.
updateTimerInfos map[string]*persistence.TimerInfo // Modified timers from last update.
deleteTimerInfos map[string]struct{} // Deleted timers from last update.

pendingChildExecutionInfoIDs map[int64]*persistence.ChildExecutionInfo // Initiated Event ID -> Child Execution Info
updateChildExecutionInfos map[*persistence.ChildExecutionInfo]struct{} // Modified ChildExecution Infos since last update
deleteChildExecutionInfos map[int64]struct{} // Deleted ChildExecution Infos since last update
pendingChildExecutionInfoIDs map[int64]*persistence.ChildExecutionInfo // Initiated Event ID -> Child Execution Info
updateChildExecutionInfos map[int64]*persistence.ChildExecutionInfo // Modified ChildExecution Infos since last update
deleteChildExecutionInfos map[int64]struct{} // Deleted ChildExecution Infos since last update

pendingRequestCancelInfoIDs map[int64]*persistence.RequestCancelInfo // Initiated Event ID -> RequestCancelInfo
updateRequestCancelInfos map[*persistence.RequestCancelInfo]struct{} // Modified RequestCancel Infos since last update, for persistence update
deleteRequestCancelInfos map[int64]struct{} // Deleted RequestCancel Infos since last update, for persistence update
pendingRequestCancelInfoIDs map[int64]*persistence.RequestCancelInfo // Initiated Event ID -> RequestCancelInfo
updateRequestCancelInfos map[int64]*persistence.RequestCancelInfo // Modified RequestCancel Infos since last update, for persistence update
deleteRequestCancelInfos map[int64]struct{} // Deleted RequestCancel Infos since last update, for persistence update

pendingSignalInfoIDs map[int64]*persistence.SignalInfo // Initiated Event ID -> SignalInfo
updateSignalInfos map[*persistence.SignalInfo]struct{} // Modified SignalInfo since last update
deleteSignalInfos map[int64]struct{} // Deleted SignalInfos since last update
pendingSignalInfoIDs map[int64]*persistence.SignalInfo // Initiated Event ID -> SignalInfo
updateSignalInfos map[int64]*persistence.SignalInfo // Modified SignalInfo since last update
deleteSignalInfos map[int64]struct{} // Deleted SignalInfos since last update

pendingSignalRequestedIDs map[string]struct{} // Set of signaled requestIds
updateSignalRequestedIDs map[string]struct{} // Set of signaled requestIds since last update
Expand Down Expand Up @@ -179,26 +179,26 @@ func newMutableStateBuilder(
domainEntry *cache.DomainCacheEntry,
) *mutableStateBuilder {
s := &mutableStateBuilder{
updateActivityInfos: make(map[*persistence.ActivityInfo]struct{}),
updateActivityInfos: make(map[int64]*persistence.ActivityInfo),
pendingActivityInfoIDs: make(map[int64]*persistence.ActivityInfo),
pendingActivityIDToEventID: make(map[string]int64),
deleteActivityInfos: make(map[int64]struct{}),
syncActivityTasks: make(map[int64]struct{}),

pendingTimerInfoIDs: make(map[string]*persistence.TimerInfo),
pendingTimerEventIDToID: make(map[int64]string),
updateTimerInfos: make(map[*persistence.TimerInfo]struct{}),
updateTimerInfos: make(map[string]*persistence.TimerInfo),
deleteTimerInfos: make(map[string]struct{}),

updateChildExecutionInfos: make(map[*persistence.ChildExecutionInfo]struct{}),
updateChildExecutionInfos: make(map[int64]*persistence.ChildExecutionInfo),
pendingChildExecutionInfoIDs: make(map[int64]*persistence.ChildExecutionInfo),
deleteChildExecutionInfos: make(map[int64]struct{}),

updateRequestCancelInfos: make(map[*persistence.RequestCancelInfo]struct{}),
updateRequestCancelInfos: make(map[int64]*persistence.RequestCancelInfo),
pendingRequestCancelInfoIDs: make(map[int64]*persistence.RequestCancelInfo),
deleteRequestCancelInfos: make(map[int64]struct{}),

updateSignalInfos: make(map[*persistence.SignalInfo]struct{}),
updateSignalInfos: make(map[int64]*persistence.SignalInfo),
pendingSignalInfoIDs: make(map[int64]*persistence.SignalInfo),
deleteSignalInfos: make(map[int64]struct{}),

Expand Down Expand Up @@ -671,15 +671,15 @@ func (e *mutableStateBuilder) assignEventIDToBufferedEvents() {
scheduledIDToStartedID[scheduledID] = eventID
if ai, ok := e.GetActivityInfo(scheduledID); ok {
ai.StartedID = eventID
e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai
}
case types.EventTypeChildWorkflowExecutionStarted:
attributes := event.ChildWorkflowExecutionStartedEventAttributes
initiatedID := attributes.GetInitiatedEventID()
scheduledIDToStartedID[initiatedID] = eventID
if ci, ok := e.GetChildExecutionInfo(initiatedID); ok {
ci.StartedID = eventID
e.updateChildExecutionInfos[ci] = struct{}{}
e.updateChildExecutionInfos[ci.InitiatedID] = ci
}
case types.EventTypeActivityTaskCompleted:
attributes := event.ActivityTaskCompletedEventAttributes
Expand Down Expand Up @@ -1202,6 +1202,7 @@ func (e *mutableStateBuilder) DeletePendingChildExecution(
e.logDataInconsistency()
}

delete(e.updateChildExecutionInfos, initiatedEventID)
e.deleteChildExecutionInfos[initiatedEventID] = struct{}{}
return nil
}
Expand All @@ -1222,6 +1223,7 @@ func (e *mutableStateBuilder) DeletePendingRequestCancel(
e.logDataInconsistency()
}

delete(e.updateRequestCancelInfos, initiatedEventID)
e.deleteRequestCancelInfos[initiatedEventID] = struct{}{}
return nil
}
Expand All @@ -1242,6 +1244,7 @@ func (e *mutableStateBuilder) DeletePendingSignal(
e.logDataInconsistency()
}

delete(e.updateSignalInfos, initiatedEventID)
e.deleteSignalInfos[initiatedEventID] = struct{}{}
return nil
}
Expand Down Expand Up @@ -1274,7 +1277,7 @@ func (e *mutableStateBuilder) UpdateActivityProgress(
ai.Version = e.GetCurrentVersion()
ai.Details = request.Details
ai.LastHeartBeatUpdatedTime = e.timeSource.Now()
e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai
e.syncActivityTasks[ai.ScheduleID] = struct{}{}
}

Expand Down Expand Up @@ -1311,7 +1314,7 @@ func (e *mutableStateBuilder) ReplicateActivityInfo(
ai.TimerTaskStatus = TimerTaskStatusNone
}

e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai
return nil
}

Expand All @@ -1329,7 +1332,7 @@ func (e *mutableStateBuilder) UpdateActivity(
}

e.pendingActivityInfoIDs[ai.ScheduleID] = ai
e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai
return nil
}

Expand Down Expand Up @@ -1360,6 +1363,7 @@ func (e *mutableStateBuilder) DeleteActivity(
e.logDataInconsistency()
}

delete(e.updateActivityInfos, scheduleEventID)
e.deleteActivityInfos[scheduleEventID] = struct{}{}
return nil
}
Expand Down Expand Up @@ -1407,8 +1411,8 @@ func (e *mutableStateBuilder) UpdateUserTimer(
return ErrMissingTimerInfo
}

e.pendingTimerInfoIDs[timerID] = ti
e.updateTimerInfos[ti] = struct{}{}
e.pendingTimerInfoIDs[ti.TimerID] = ti
e.updateTimerInfos[ti.TimerID] = ti
return nil
}

Expand Down Expand Up @@ -1439,6 +1443,7 @@ func (e *mutableStateBuilder) DeleteUserTimer(
e.logDataInconsistency()
}

delete(e.updateTimerInfos, timerID)
e.deleteTimerInfos[timerID] = struct{}{}
return nil
}
Expand Down Expand Up @@ -1621,6 +1626,7 @@ func (e *mutableStateBuilder) DeleteSignalRequested(
) {

delete(e.pendingSignalRequestedIDs, requestID)
delete(e.updateSignalRequestedIDs, requestID)
e.deleteSignalRequestedIDs[requestID] = struct{}{}
}

Expand Down Expand Up @@ -2226,7 +2232,7 @@ func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent(

e.pendingActivityInfoIDs[scheduleEventID] = ai
e.pendingActivityIDToEventID[ai.ActivityID] = scheduleEventID
e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai

return ai, nil
}
Expand Down Expand Up @@ -2306,7 +2312,7 @@ func (e *mutableStateBuilder) ReplicateActivityTaskStartedEvent(
ai.RequestID = attributes.GetRequestID()
ai.StartedTime = time.Unix(0, event.GetTimestamp())
ai.LastHeartBeatUpdatedTime = ai.StartedTime
e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai
return nil
}

Expand Down Expand Up @@ -2497,7 +2503,7 @@ func (e *mutableStateBuilder) ReplicateActivityTaskCancelRequestedEvent(
ai.CancelRequested = true

ai.CancelRequestID = event.GetEventID()
e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai
return nil
}

Expand Down Expand Up @@ -2811,8 +2817,8 @@ func (e *mutableStateBuilder) ReplicateRequestCancelExternalWorkflowExecutionIni
CancelRequestID: cancelRequestID,
}

e.pendingRequestCancelInfoIDs[initiatedEventID] = rci
e.updateRequestCancelInfos[rci] = struct{}{}
e.pendingRequestCancelInfoIDs[rci.InitiatedID] = rci
e.updateRequestCancelInfos[rci.InitiatedID] = rci

return rci, nil
}
Expand Down Expand Up @@ -2938,8 +2944,8 @@ func (e *mutableStateBuilder) ReplicateSignalExternalWorkflowExecutionInitiatedE
Control: attributes.Control,
}

e.pendingSignalInfoIDs[initiatedEventID] = si
e.updateSignalInfos[si] = struct{}{}
e.pendingSignalInfoIDs[si.InitiatedID] = si
e.updateSignalInfos[si.InitiatedID] = si
return si, nil
}

Expand Down Expand Up @@ -3112,9 +3118,9 @@ func (e *mutableStateBuilder) ReplicateTimerStartedEvent(
TaskStatus: TimerTaskStatusNone,
}

e.pendingTimerInfoIDs[timerID] = ti
e.pendingTimerEventIDToID[event.GetEventID()] = timerID
e.updateTimerInfos[ti] = struct{}{}
e.pendingTimerInfoIDs[ti.TimerID] = ti
e.pendingTimerEventIDToID[ti.StartedID] = ti.TimerID
e.updateTimerInfos[ti.TimerID] = ti

return ti, nil
}
Expand Down Expand Up @@ -3471,8 +3477,8 @@ func (e *mutableStateBuilder) ReplicateStartChildWorkflowExecutionInitiatedEvent
ParentClosePolicy: attributes.GetParentClosePolicy(),
}

e.pendingChildExecutionInfoIDs[initiatedEventID] = ci
e.updateChildExecutionInfos[ci] = struct{}{}
e.pendingChildExecutionInfoIDs[ci.InitiatedID] = ci
e.updateChildExecutionInfos[ci.InitiatedID] = ci

return ci, nil
}
Expand Down Expand Up @@ -3517,7 +3523,7 @@ func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionStartedEvent(
ci, _ := e.GetChildExecutionInfo(initiatedID)
ci.StartedID = event.GetEventID()
ci.StartedRunID = attributes.GetWorkflowExecution().GetRunID()
e.updateChildExecutionInfos[ci] = struct{}{}
e.updateChildExecutionInfos[ci.InitiatedID] = ci

return nil
}
Expand Down Expand Up @@ -3825,7 +3831,7 @@ func (e *mutableStateBuilder) RetryActivity(
return false, err
}

e.updateActivityInfos[ai] = struct{}{}
e.updateActivityInfos[ai.ScheduleID] = ai
e.syncActivityTasks[ai.ScheduleID] = struct{}{}
return true, nil
}
Expand Down Expand Up @@ -4152,20 +4158,20 @@ func (e *mutableStateBuilder) cleanupTransaction(
// Clear all updates to prepare for the next session
e.hBuilder = NewHistoryBuilder(e, e.logger)

e.updateActivityInfos = make(map[*persistence.ActivityInfo]struct{})
e.updateActivityInfos = make(map[int64]*persistence.ActivityInfo)
e.deleteActivityInfos = make(map[int64]struct{})
e.syncActivityTasks = make(map[int64]struct{})

e.updateTimerInfos = make(map[*persistence.TimerInfo]struct{})
e.updateTimerInfos = make(map[string]*persistence.TimerInfo)
e.deleteTimerInfos = make(map[string]struct{})

e.updateChildExecutionInfos = make(map[*persistence.ChildExecutionInfo]struct{})
e.updateChildExecutionInfos = make(map[int64]*persistence.ChildExecutionInfo)
e.deleteChildExecutionInfos = make(map[int64]struct{})

e.updateRequestCancelInfos = make(map[*persistence.RequestCancelInfo]struct{})
e.updateRequestCancelInfos = make(map[int64]*persistence.RequestCancelInfo)
e.deleteRequestCancelInfos = make(map[int64]struct{})

e.updateSignalInfos = make(map[*persistence.SignalInfo]struct{})
e.updateSignalInfos = make(map[int64]*persistence.SignalInfo)
e.deleteSignalInfos = make(map[int64]struct{})

e.updateSignalRequestedIDs = make(map[string]struct{})
Expand Down
20 changes: 10 additions & 10 deletions service/history/execution/mutable_state_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func convertPendingActivityInfos(
}

func convertUpdateActivityInfos(
inputs map[*persistence.ActivityInfo]struct{},
inputs map[int64]*persistence.ActivityInfo,
) []*persistence.ActivityInfo {

outputs := make([]*persistence.ActivityInfo, 0, len(inputs))
for item := range inputs {
for _, item := range inputs {
outputs = append(outputs, item)
}
return outputs
Expand Down Expand Up @@ -113,11 +113,11 @@ func convertPendingTimerInfos(
}

func convertUpdateTimerInfos(
inputs map[*persistence.TimerInfo]struct{},
inputs map[string]*persistence.TimerInfo,
) []*persistence.TimerInfo {

outputs := make([]*persistence.TimerInfo, 0, len(inputs))
for item := range inputs {
for _, item := range inputs {
outputs = append(outputs, item)
}
return outputs
Expand Down Expand Up @@ -146,11 +146,11 @@ func convertPendingChildExecutionInfos(
}

func convertUpdateChildExecutionInfos(
inputs map[*persistence.ChildExecutionInfo]struct{},
inputs map[int64]*persistence.ChildExecutionInfo,
) []*persistence.ChildExecutionInfo {

outputs := make([]*persistence.ChildExecutionInfo, 0, len(inputs))
for item := range inputs {
for _, item := range inputs {
outputs = append(outputs, item)
}
return outputs
Expand All @@ -168,11 +168,11 @@ func convertPendingRequestCancelInfos(
}

func convertUpdateRequestCancelInfos(
inputs map[*persistence.RequestCancelInfo]struct{},
inputs map[int64]*persistence.RequestCancelInfo,
) []*persistence.RequestCancelInfo {

outputs := make([]*persistence.RequestCancelInfo, 0, len(inputs))
for item := range inputs {
for _, item := range inputs {
outputs = append(outputs, item)
}
return outputs
Expand All @@ -190,11 +190,11 @@ func convertPendingSignalInfos(
}

func convertUpdateSignalInfos(
inputs map[*persistence.SignalInfo]struct{},
inputs map[int64]*persistence.SignalInfo,
) []*persistence.SignalInfo {

outputs := make([]*persistence.SignalInfo, 0, len(inputs))
for item := range inputs {
for _, item := range inputs {
outputs = append(outputs, item)
}
return outputs
Expand Down

0 comments on commit deb0caf

Please sign in to comment.