Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deliver messages even when events are skipped #1015

Merged
merged 2 commits into from
Jan 30, 2023
Merged

Deliver messages even when events are skipped #1015

merged 2 commits into from
Jan 30, 2023

Conversation

mmcshane
Copy link
Contributor

What was changed

Here we switch to delivering all messages dependent on events IDs less than or equal to the current event ID.

Why?

We don't run WFTScheduledEvent instances through standard event handling so we were missing messages dependent on events of that type

Checklist

  1. Closes

  2. How was this tested:

Unit & features

  1. Any docs updates needed?

@mmcshane mmcshane requested a review from a team as a code owner January 28, 2023 03:16
We don't run WFTScheduledEvent instances through standard event handling
so we were missing messages dependent on events of that type. Here we
switch to delivering all messages dependent on events IDs less than or
equal to the current event ID.
@@ -841,7 +828,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
var replayCommands []*commandpb.Command
var respondEvents []*historypb.HistoryEvent

msgs := indexMessages(workflowTask)
msgs := indexMessagesByEventID(workflowTask.task.GetMessages())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it have value to ask the server to always deliver these in event ID order?

Also, is the number of these so great that you really need to sort because of fear of the cost of takeLTE? I think you can just make takeLTE iterate over all messages every time and https://github.com/golang/go/wiki/SliceTricks#filter-in-place to keep the ones greater than while extracting the ones less than or equal.

So something like this (untested, just typed here in comments):

func (emi *eventMsgIndex) takeLTE(eventID int64) (out []*protocolpb.Message) {
	n := 0
	for _, msg := range *emi {
		if msg.GetEventID() > eventID {
			(*emi)[n] = msg
			n++
		} else {
			out = append(out, msg)
		}
	}
	*emi = *emi[:n]
	return;
}

Copy link
Contributor Author

@mmcshane mmcshane Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with doing a little more work in takeLTE. I would expect the number of messages per WFT to be low (which to be fair, also makes sorting cheap) but there may be a substantial number when we rebundle into a bigger WFT for replay. I'd like to preserve the transport's ability to reorder messages until we learn more about what is desirable at that layer.

Do a little more work in the takeLTE function to avoid the up front
work of sorting.
@mmcshane mmcshane merged commit 1b39ac9 into temporalio:master Jan 30, 2023
@mmcshane mmcshane deleted the mpm/msg-indexing branch January 30, 2023 16:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants