Skip to content

Commit

Permalink
Make sure queries see the right id
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 4, 2024
1 parent 2885d7a commit a33c6d3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
1 change: 0 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,6 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
// No Operation
case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
weh.workflowInfo.lastCompletedBuildID = event.GetWorkflowTaskCompletedEventAttributes().GetWorkerVersion().GetBuildId()
println("Set to ", weh.workflowInfo.lastCompletedBuildID)
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
weh.commandsHelper.handleActivityTaskScheduled(
event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId())
Expand Down
59 changes: 48 additions & 11 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ type (
next []*historypb.HistoryEvent
nextFlags []sdkFlag
binaryChecksum string
lastBuildID string
sdkVersion string
sdkName string
}
Expand All @@ -200,6 +199,9 @@ type (
binaryChecksum string
sdkVersion string
sdkName string
buildID string
// true if the workflow execution finishes in this task
hasTerminalEvent bool
}

finishedTask struct {
Expand Down Expand Up @@ -329,6 +331,20 @@ func isCommandEvent(eventType enumspb.EventType) bool {
}
}

func isTerminalWorkflowEvent(eventType enumspb.EventType) bool {
switch eventType {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED,
enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
return true
default:
return false
}
}

// nextTask returns the next task to be processed.
func (eh *history) nextTask() (*preparedTask, error) {
if eh.next == nil {
Expand All @@ -350,6 +366,8 @@ func (eh *history) nextTask() (*preparedTask, error) {

var markers []*historypb.HistoryEvent
var msgs []*protocolpb.Message
var buildID string
var hasTerminalEvent bool
if len(result) > 0 {
nextTaskEvents, err := eh.prepareTask()
if err != nil {
Expand All @@ -361,15 +379,19 @@ func (eh *history) nextTask() (*preparedTask, error) {
eh.sdkVersion = nextTaskEvents.sdkVersion
markers = nextTaskEvents.markers
msgs = nextTaskEvents.msgs
buildID = nextTaskEvents.buildID
hasTerminalEvent = nextTaskEvents.hasTerminalEvent
}
return &preparedTask{
events: result,
markers: markers,
flags: sdkFlags,
msgs: msgs,
binaryChecksum: checksum,
sdkName: sdkName,
sdkVersion: sdkVersion,
events: result,
markers: markers,
flags: sdkFlags,
msgs: msgs,
binaryChecksum: checksum,
sdkName: sdkName,
sdkVersion: sdkVersion,
buildID: buildID,
hasTerminalEvent: hasTerminalEvent,
}, nil
}

Expand Down Expand Up @@ -459,7 +481,12 @@ OrderEvents:
enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
// Skip
default:
if isPreloadMarkerEvent(event) {
if isTerminalWorkflowEvent(event.GetEventType()) {
taskEvents.hasTerminalEvent = true
} else if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
taskEvents.buildID = event.GetWorkflowTaskCompletedEventAttributes().
GetWorkerVersion().GetBuildId()
} else if isPreloadMarkerEvent(event) {
taskEvents.markers = append(taskEvents.markers, event)
} else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil {
taskEvents.msgs = append(taskEvents.msgs, inferMessage(attrs))
Expand Down Expand Up @@ -958,6 +985,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
var replayOutbox []outboxEntry
var replayCommands []*commandpb.Command
var respondEvents []*historypb.HistoryEvent
finalBuildID := ""

taskMessages := workflowTask.task.GetMessages()
skipReplayCheck := w.skipReplayCheck()
Expand Down Expand Up @@ -1018,6 +1046,9 @@ ProcessEvents:
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
if nextTask.hasTerminalEvent && nextTask.buildID != "" {
finalBuildID = nextTask.buildID
}
// Reset the mutable side effect markers recorded
eventHandler.mutableSideEffectsRecorded = nil
// Markers are from the events that are produced from the current workflow task.
Expand Down Expand Up @@ -1135,6 +1166,11 @@ ProcessEvents:
}
}

// If we saw that this is the last task of the execution, we need to use the build id from that
// before query handling.
if finalBuildID != "" {
w.workflowInfo.lastCompletedBuildID = finalBuildID
}
return w.applyWorkflowPanicPolicy(workflowTask, workflowError)
}

Expand Down Expand Up @@ -1301,9 +1337,10 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl

completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities)
w.clearCurrentTask()
// We need to set the last completed BuildID to our own ID when completing a task, because we
// might service a query before we ever process any events again, and we won't have had a chance
// to update the last completed BuildID - even though it has changed to be our build id.
if w.wth.workerBuildID != "" {
println("EXISTING", w.workflowInfo.lastCompletedBuildID)
println("SETTING ON COMPLETE TO ", w.wth.workerBuildID)
w.workflowInfo.lastCompletedBuildID = w.wth.workerBuildID
}

Expand Down

0 comments on commit a33c6d3

Please sign in to comment.