Skip to content

Commit

Permalink
Add new Engine API to accept multiple SyncActivity request
Browse files Browse the repository at this point in the history
  • Loading branch information
xwduan committed Nov 16, 2023
1 parent ebc3116 commit d7fbf91
Show file tree
Hide file tree
Showing 11 changed files with 2,464 additions and 609 deletions.
2,358 changes: 1,808 additions & 550 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,27 @@ message SyncActivityRequest {
string last_worker_identity = 13;
temporal.server.api.history.v1.VersionHistory version_history = 14;
temporal.server.api.workflow.v1.BaseExecutionInfo base_execution_info = 15;
}

// SyncActivitiesRequest carries sync info for one or more activities that all
// belong to a single workflow execution, identified by namespace_id,
// workflow_id and run_id.
message SyncActivitiesRequest {
string namespace_id = 1;
string workflow_id = 2;
string run_id = 3;
// One entry per activity to sync; entries are applied in order against the
// same workflow execution.
repeated ActivitySyncInfo activities_info = 4;
}
// ActivitySyncInfo holds the per-activity portion of an activity sync.
// Its fields correspond to the like-named per-activity fields of
// SyncActivityRequest; the workflow identifiers (namespace, workflow ID,
// run ID) are not repeated here and live on the enclosing request.
message ActivitySyncInfo {
int64 version = 1;
// Scheduled event ID of the activity this entry refers to.
int64 scheduled_event_id = 2;
google.protobuf.Timestamp scheduled_time = 3 [(gogoproto.stdtime) = true];
int64 started_event_id = 4;
google.protobuf.Timestamp started_time = 5 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp last_heartbeat_time = 6 [(gogoproto.stdtime) = true];
temporal.api.common.v1.Payloads details = 7;
int32 attempt = 8;
temporal.api.failure.v1.Failure last_failure = 9;
string last_worker_identity = 10;
temporal.server.api.history.v1.VersionHistory version_history = 11;
temporal.server.api.workflow.v1.BaseExecutionInfo base_execution_info = 12;
}

message SyncActivityResponse {
Expand Down
7 changes: 7 additions & 0 deletions service/history/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,13 @@ func (e *historyEngineImpl) SyncActivity(
return e.nDCActivityStateReplicator.SyncActivityState(ctx, request)
}

// SyncActivities applies a batch of activity sync info for a single workflow
// execution. The work is delegated entirely to the NDC activity state
// replicator; whatever error it reports is returned unchanged.
func (e *historyEngineImpl) SyncActivities(
	ctx context.Context,
	request *historyservice.SyncActivitiesRequest,
) (retError error) {
	replicator := e.nDCActivityStateReplicator
	retError = replicator.SyncActivitiesState(ctx, request)
	return retError
}

// ReplicateWorkflowState is an experimental method to replicate workflow state. This should not be exposed outside of the history service role.
func (e *historyEngineImpl) ReplicateWorkflowState(
ctx context.Context,
Expand Down
200 changes: 157 additions & 43 deletions service/history/ndc/activity_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type (
ctx context.Context,
request *historyservice.SyncActivityRequest,
) error
SyncActivitiesState(
ctx context.Context,
request *historyservice.SyncActivitiesRequest,
) error
}

ActivityStateReplicatorImpl struct {
Expand Down Expand Up @@ -126,64 +130,188 @@ func (r *ActivityStateReplicatorImpl) SyncActivityState(
}
return err
}
applied, err := r.syncSingleActivityState(
&definition.WorkflowKey{
NamespaceID: request.NamespaceId,
WorkflowID: request.WorkflowId,
RunID: request.RunId,
},
mutableState,
&historyservice.ActivitySyncInfo{
Version: request.Version,
ScheduledEventId: request.ScheduledEventId,
ScheduledTime: request.ScheduledTime,
StartedEventId: request.StartedEventId,
StartedTime: request.StartedTime,
LastHeartbeatTime: request.LastHeartbeatTime,
Details: request.Details,
Attempt: request.Attempt,
LastFailure: request.LastFailure,
LastWorkerIdentity: request.LastWorkerIdentity,
VersionHistory: request.VersionHistory,
BaseExecutionInfo: request.BaseExecutionInfo,
},
)
if err != nil {
return err
}
if !applied {
return nil
}

scheduledEventID := request.GetScheduledEventId()
shouldApply, err := r.testVersionHistory(
updateMode := persistence.UpdateWorkflowModeUpdateCurrent
if state, _ := mutableState.GetWorkflowStateStatus(); state == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
updateMode = persistence.UpdateWorkflowModeBypassCurrent
}

return executionContext.UpdateWorkflowExecutionWithNew(
ctx,
r.shardContext,
updateMode,
nil, // no new workflow
nil, // no new workflow
workflow.TransactionPolicyPassive,
nil,
)
}

// SyncActivitiesState applies every entry of request.ActivitiesInfo to the
// mutable state of the workflow execution identified by the request, then
// persists the mutable state once if at least one entry was applied.
//
// It returns nil without persisting when the workflow's mutable state is not
// found or when no entry was applied. The first error reported while applying
// an entry aborts the batch and is returned; the workflow context is released
// with that same error via the deferred release call.
func (r *ActivityStateReplicatorImpl) SyncActivitiesState(
	ctx context.Context,
	request *historyservice.SyncActivitiesRequest,
) (retError error) {
	// sync activity info will only be sent from active side, when
	// 1. activity retry
	// 2. activity start
	// 3. activity heart beat
	// no sync activity task will be sent when active side fail / timeout activity
	namespaceID := namespace.ID(request.GetNamespaceId())
	execution := commonpb.WorkflowExecution{
		WorkflowId: request.WorkflowId,
		RunId:      request.RunId,
	}

	executionContext, release, err := r.workflowCache.GetOrCreateWorkflowExecution(
		ctx,
		r.shardContext,
		namespaceID,
		execution.GetWorkflowId(),
		execution.GetRunId(),
		execution,
		workflow.LockPriorityHigh,
	)
	if err != nil {
		// for get workflow execution context, with valid run id
		// err will not be of type EntityNotExistsError
		return err
	}
	defer func() { release(retError) }()

	mutableState, err := executionContext.LoadMutableState(ctx, r.shardContext)
	if err != nil {
		if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
			// this can happen if the workflow start event and this sync activity task are out of order
			// or the target workflow is long gone
			// the safe solution to this is to throw away the sync activity task
			// or otherwise, worker attempt will exceed limit and put this message to DLQ
			return nil
		}
		return err
	}

	// The workflow key is identical for every entry in the batch, so build it
	// once instead of once per iteration.
	workflowKey := &definition.WorkflowKey{
		NamespaceID: request.NamespaceId,
		WorkflowID:  request.WorkflowId,
		RunID:       request.RunId,
	}
	anyEventApplied := false
	for _, syncActivityInfo := range request.ActivitiesInfo {
		applied, err := r.syncSingleActivityState(workflowKey, mutableState, syncActivityInfo)
		if err != nil {
			return err
		}
		anyEventApplied = anyEventApplied || applied
	}
	if !anyEventApplied {
		// nothing changed in mutable state, so there is nothing to persist
		return nil
	}

	// a zombie workflow is not the current run, so the update must bypass
	// the current-run record
	updateMode := persistence.UpdateWorkflowModeUpdateCurrent
	if state, _ := mutableState.GetWorkflowStateStatus(); state == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
		updateMode = persistence.UpdateWorkflowModeBypassCurrent
	}

	return executionContext.UpdateWorkflowExecutionWithNew(
		ctx,
		r.shardContext,
		updateMode,
		nil, // no new workflow
		nil, // no new workflow
		workflow.TransactionPolicyPassive,
		nil,
	)
}

func (r *ActivityStateReplicatorImpl) syncSingleActivityState(
workflowKey *definition.WorkflowKey,
mutableState workflow.MutableState,
activitySyncInfo *historyservice.ActivitySyncInfo,
) (applied bool, retError error) {
scheduledEventID := activitySyncInfo.GetScheduledEventId()
shouldApply, err := r.testVersionHistory(
namespace.ID(workflowKey.NamespaceID),
workflowKey.WorkflowID,
workflowKey.RunID,
scheduledEventID,
mutableState,
request.GetVersionHistory(),
activitySyncInfo.GetVersionHistory(),
)
if err != nil || !shouldApply {
return err
return false, err
}

activityInfo, ok := mutableState.GetActivityInfo(scheduledEventID)
if !ok {
// this should not retry, can be caused by out of order delivery
// since the activity is already finished
return nil
return false, nil
}
if shouldApply := r.testActivity(
request.GetVersion(),
request.GetAttempt(),
timestamp.TimeValue(request.GetLastHeartbeatTime()),
activitySyncInfo.GetVersion(),
activitySyncInfo.GetAttempt(),
timestamp.TimeValue(activitySyncInfo.GetLastHeartbeatTime()),
activityInfo,
); !shouldApply {
return nil
return false, nil
}

// sync activity with empty started ID means activity retry
eventTime := timestamp.TimeValue(request.GetScheduledTime())
if request.StartedEventId == common.EmptyEventID && request.Attempt > activityInfo.GetAttempt() {
eventTime := timestamp.TimeValue(activitySyncInfo.GetScheduledTime())
if activitySyncInfo.StartedEventId == common.EmptyEventID && activitySyncInfo.Attempt > activityInfo.GetAttempt() {
mutableState.AddTasks(&tasks.ActivityRetryTimerTask{
WorkflowKey: definition.WorkflowKey{
NamespaceID: request.GetNamespaceId(),
WorkflowID: request.GetWorkflowId(),
RunID: request.GetRunId(),
},
WorkflowKey: *workflowKey,
VisibilityTimestamp: eventTime,
EventID: request.GetScheduledEventId(),
Version: request.GetVersion(),
Attempt: request.GetAttempt(),
EventID: activitySyncInfo.GetScheduledEventId(),
Version: activitySyncInfo.GetVersion(),
Attempt: activitySyncInfo.GetAttempt(),
})
}

refreshTask := r.testRefreshActivityTimerTaskMask(
request.GetVersion(),
request.GetAttempt(),
activitySyncInfo.GetVersion(),
activitySyncInfo.GetAttempt(),
activityInfo,
)
err = mutableState.ReplicateActivityInfo(request, refreshTask)
err = mutableState.ReplicateActivityInfo(activitySyncInfo, refreshTask)
if err != nil {
return err
return false, err
}

// Todo: improve efficiency by calculating once all activities are replicated in mutableState
// see whether we need to refresh the activity timer
startedTime := timestamp.TimeValue(request.GetStartedTime())
lastHeartbeatTime := timestamp.TimeValue(request.GetLastHeartbeatTime())
startedTime := timestamp.TimeValue(activitySyncInfo.GetStartedTime())
lastHeartbeatTime := timestamp.TimeValue(activitySyncInfo.GetLastHeartbeatTime())
if eventTime.Before(startedTime) {
eventTime = startedTime
}
Expand All @@ -195,23 +323,9 @@ func (r *ActivityStateReplicatorImpl) SyncActivityState(
if _, err := workflow.NewTimerSequence(
mutableState,
).CreateNextActivityTimer(); err != nil {
return err
}

updateMode := persistence.UpdateWorkflowModeUpdateCurrent
if state, _ := mutableState.GetWorkflowStateStatus(); state == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
updateMode = persistence.UpdateWorkflowModeBypassCurrent
return false, err
}

return executionContext.UpdateWorkflowExecutionWithNew(
ctx,
r.shardContext,
updateMode,
nil, // no new workflow
nil, // no new workflow
workflow.TransactionPolicyPassive,
nil,
)
return true, nil
}

func (r *ActivityStateReplicatorImpl) testRefreshActivityTimerTaskMask(
Expand Down
14 changes: 14 additions & 0 deletions service/history/ndc/activity_state_replicator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d7fbf91

Please sign in to comment.