-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Convert SDK to protocol/messages approach #1006
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only have pedantic stuff. This LGTM. I take comfort knowing this is experimental and therefore if we missed something internally here we can fix it. I think this should remain experimental until implemented in other languages.
@@ -292,6 +286,21 @@ func (s *scheduledSignal) handle(result *commonpb.Payloads, err error) { | |||
s.callback(result, err) | |||
} | |||
|
|||
func (wc *workflowEnvironmentImpl) drainOutbox(sink *[]*protocolpb.Message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meh, might as well just append outbox where you need and set outbox to nil where you need instead of slice pointers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm going to keep this one. Chasing private member variables from outside a type - even within a package - gives me The Fear.
func (weh *workflowExecutionEventHandlerImpl) protocolConstructorForMessage( | ||
msg *protocolpb.Message, | ||
) (func() protocol.Instance, error) { | ||
protoName, err := protocol.NameFromMessage(msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this may be a tad over-abstracted. You can just type switch on https://pkg.go.dev/github.com/gogo/protobuf/types#UnmarshalAny or if you're concerned w/ non-gogo types, a string switch on https://pkg.go.dev/github.com/gogo/protobuf/types#AnyMessageName is fine. Not sure you need a registry.
EDIT: After reading that, you basically do do that and registry is a different thing, so this can be mostly disregarded. However, I do want to caution over-abstracting the protocol/registry/etc concept with a single implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair point. The saving grace here is that it's all internal. The reason I put the registry in place (it's not strictly needed - the Update protocol instance could be moved into the Couroutine and execute to completion there) was to leave some aspect of a blueprint for the next protocol to come along if I'm not the implementer.
// one and registers it under the instance ID indicated. | ||
func (r *Registry) FindOrAdd(instID string, ctor func() Instance) Instance { | ||
r.mut.Lock() | ||
defer r.mut.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Harmless, but probably locking not needed if inside the same workflow.
(arguably this trivial "protocol" code doesn't deserve an entire package)
internal/update.go
Outdated
@@ -0,0 +1,350 @@ | |||
package internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably dumb, but to match others I wonder if we want to rename the file internal_update.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Argh. You're probably right. I don't care for it but yeah.
internal/update.go
Outdated
func newUpdateProtocol( | ||
protoInstanceID string, | ||
scheduleUpdate func(name string, args *commonpb.Payloads, header *commonpb.Header, callbacks UpdateCallbacks), | ||
env updateEnv) *updateProtocol { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
env updateEnv) *updateProtocol { | |
env updateEnv, | |
) *updateProtocol { |
A bit clearer IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. Not committing yours only because this whole thing is going to get wiped when the new upstream API comes in.
internal/update.go
Outdated
|
||
spawn(ctx, name, func(ctx Context) { | ||
envInterceptor := getWorkflowEnvironmentInterceptor(ctx) | ||
if !IsReplaying(ctx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reminder: We'll want a test in SDK features or somewhere to confirm that making a validation return failure on replay is ok because validator never called on replay. Not only am I unsure when this IsReplaying
is set, but it tells other SDKs they have to make sure to support no-validate-on-replay too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I tested this by hand and it does work in the way we want. Doing it in features will require some finesse but I agree that it's worth it from a pseudo-documentation perspective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doing it in features will require some finesse
I think the best way may be to have a (bad) validator that increments some global counter and returns error if > 1, then after workflow complete, force a replay (e.g. query or something).
internal/update.go
Outdated
} | ||
input := UpdateInput{Name: name, Args: args} | ||
|
||
spawn(ctx, name, func(ctx Context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reminder: We'll want SDK features tests for panic in validation handler and panic in update. Obviously with Go code they are the same (wft failure), but it forces other SDKs to acknowledge their unexpected-error paths too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. And FYI they're in the unit tests too.
Creates a protocol abstraction & registry and a first implementation of the same which is for update.
For review, NOT merge. I want to get eyes on this so that when we merge the API change upstream we're immediately ready with these changes. Ignore the go.mod/go.sum redirects and conflicts.
Some code moved around as I tried to consolidate update code into update.go and protocol tracking code into the internal/protocol package.