Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change registry Apis signature to return info interface #1355

Merged
merged 3 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type (

// RegisterOptions consists of options for registering an activity
RegisterOptions = internal.RegisterActivityOptions

// RegistryInfo
RegistryInfo = internal.RegistryActivityInfo
)

// ErrResultPending is returned from activity's implementation to indicate the activity is not completed when
Expand Down
7 changes: 7 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ import (
)

type (
// RegistryActivityInfo
RegistryActivityInfo interface {
ActivityType() ActivityType
GetFunction() interface{}
}

// ActivityType identifies a activity type.
ActivityType struct {
Name string
Path string
}

// ActivityInfo contains information about currently executing activity.
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func createWorkflowTaskWithQueries(
copy(eventsCopy, events)
return &s.PollForDecisionTaskResponse{
PreviousStartedEventId: common.Int64Ptr(previousStartEventID),
WorkflowType: workflowTypePtr(WorkflowType{workflowName}),
WorkflowType: workflowTypePtr(WorkflowType{Name: workflowName}),
History: &s.History{Events: eventsCopy},
WorkflowExecution: &s.WorkflowExecution{
WorkflowId: common.StringPtr("fake-workflow-id"),
Expand Down
41 changes: 28 additions & 13 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ func decodeAndAssignValue(dc DataConverter, from interface{}, toValuePtr interfa
type workflowExecutor struct {
workflowType string
fn interface{}
path string
}

func (we *workflowExecutor) Execute(ctx Context, input []byte) ([]byte, error) {
Expand All @@ -677,15 +678,27 @@ func (we *workflowExecutor) Execute(ctx Context, input []byte) ([]byte, error) {
return serializeResults(we.fn, results, dataConverter)
}

func (we *workflowExecutor) WorkflowType() WorkflowType {
return WorkflowType{
Name: we.workflowType,
Path: we.path,
}
}

func (we *workflowExecutor) GetFunction() interface{} {
return we.fn
}

// Wrapper to execute activity functions.
type activityExecutor struct {
name string
fn interface{}
options RegisterActivityOptions
path string
}

func (ae *activityExecutor) ActivityType() ActivityType {
return ActivityType{Name: ae.name}
return ActivityType{Name: ae.name, Path: ae.path}
}

func (ae *activityExecutor) GetFunction() interface{} {
Expand Down Expand Up @@ -776,20 +789,22 @@ type aggregatedWorker struct {
registry *registry
}

func (aw *aggregatedWorker) GetRegisteredWorkflows() []string {
return aw.registry.GetRegisteredWorkflows()
}

func (aw *aggregatedWorker) GetWorkflowFunc(registerName string) (interface{}, bool) {
return aw.registry.GetWorkflowFunc(registerName)
}

func (aw *aggregatedWorker) GetRegisteredActivities() []string {
return aw.registry.GetRegisteredActivities()
func (aw *aggregatedWorker) GetRegisteredWorkflows() []RegistryWorkflowInfo {
workflows := aw.registry.GetRegisteredWorkflows()
var result []RegistryWorkflowInfo
for _, wf := range workflows {
result = append(result, wf)
}
return result
}

func (aw *aggregatedWorker) GetActivityFunc(registerName string) (interface{}, bool) {
return aw.registry.GetActivityFunc(registerName)
func (aw *aggregatedWorker) GetRegisteredActivities() []RegistryActivityInfo {
activities := aw.registry.getRegisteredActivities()
var result []RegistryActivityInfo
for _, a := range activities {
result = append(result, a)
}
return result
}

func (aw *aggregatedWorker) RegisterWorkflow(w interface{}) {
Expand Down
41 changes: 24 additions & 17 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,35 +843,42 @@ func TestRegisterVariousWorkflowTypes(t *testing.T) {
w.RegisterWorkflowWithOptions(testWorkflowReturnStructPtrPtr, RegisterWorkflowOptions{EnableShortName: true})

wfs := w.GetRegisteredWorkflows()
var wfNames []string
for _, wf := range wfs {
wfNames = append(wfNames, wf.WorkflowType().Name)
}
assert.Equal(t, 8, len(wfs))
assert.Contains(t, wfs, "testWorkflowSample")
assert.Contains(t, wfs, "testWorkflowMultipleArgs")
assert.Contains(t, wfs, "testWorkflowNoArgs")
assert.Contains(t, wfs, "testWorkflowReturnInt")
assert.Contains(t, wfs, "testWorkflowReturnString")
assert.Contains(t, wfs, "testWorkflowReturnString")
assert.Contains(t, wfs, "testWorkflowReturnStructPtr")
assert.Contains(t, wfs, "testWorkflowReturnStructPtrPtr")
assert.Contains(t, wfNames, "testWorkflowSample")
assert.Contains(t, wfNames, "testWorkflowMultipleArgs")
assert.Contains(t, wfNames, "testWorkflowNoArgs")
assert.Contains(t, wfNames, "testWorkflowReturnInt")
assert.Contains(t, wfNames, "testWorkflowReturnString")
assert.Contains(t, wfNames, "testWorkflowReturnString")
assert.Contains(t, wfNames, "testWorkflowReturnStructPtr")
assert.Contains(t, wfNames, "testWorkflowReturnStructPtrPtr")

// sample assertion on workflow func
fn, ok := w.GetWorkflowFunc("testWorkflowSample")
assert.True(t, ok)
assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind())
assert.Equal(t, getFunctionName(testWorkflowSample), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
var sampleFunc interface{}
for _, wf := range wfs {
if wf.WorkflowType().Name == "testWorkflowSample" {
sampleFunc = wf.GetFunction()
break
}
}
assert.Equal(t, getFunctionName(testWorkflowSample), runtime.FuncForPC(reflect.ValueOf(sampleFunc).Pointer()).Name())
}

func TestRegisterActivityWithOptions(t *testing.T) {
r := newRegistry()
w := &aggregatedWorker{registry: r}
w.RegisterActivityWithOptions(testActivityMultipleArgs, RegisterActivityOptions{EnableShortName: true})

wfs := w.GetRegisteredActivities()
assert.Equal(t, 1, len(wfs))
assert.Contains(t, wfs, "testActivityMultipleArgs")
a := w.GetRegisteredActivities()
assert.Equal(t, 1, len(a))
assert.Contains(t, a[0].ActivityType().Name, "testActivityMultipleArgs")

// assert activity function
fn, ok := w.GetActivityFunc("testActivityMultipleArgs")
assert.True(t, ok)
fn := a[0].GetFunction()
assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind())
assert.Equal(t, getFunctionName(testActivityMultipleArgs), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
}
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type (
// Note that workflow.Context is used instead of context.Context to avoid use of raw channels.
workflow interface {
Execute(ctx Context, input []byte) (result []byte, err error)
WorkflowType() WorkflowType
GetFunction() interface{}
}

sendCallback struct {
Expand Down
26 changes: 14 additions & 12 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1884,20 +1884,22 @@ func (env *testWorkflowEnvironmentImpl) ExecuteChildWorkflow(params executeWorkf
return env.executeChildWorkflowWithDelay(0, params, callback, startedHandler)
}

func (env *testWorkflowEnvironmentImpl) GetRegisteredWorkflows() []string {
return env.registry.GetRegisteredWorkflows()
}

func (env *testWorkflowEnvironmentImpl) GetWorkflowFunc(registerName string) (interface{}, bool) {
return env.registry.GetWorkflowFunc(registerName)
}

func (env *testWorkflowEnvironmentImpl) GetRegisteredActivities() []string {
return env.registry.GetRegisteredActivities()
func (env *testWorkflowEnvironmentImpl) GetRegisteredWorkflows() []RegistryWorkflowInfo {
workflows := env.registry.GetRegisteredWorkflows()
var result []RegistryWorkflowInfo
for _, wf := range workflows {
result = append(result, wf)
}
return result
}

func (env *testWorkflowEnvironmentImpl) GetActivityFunc(registerName string) (interface{}, bool) {
return env.registry.GetActivityFunc(registerName)
func (env *testWorkflowEnvironmentImpl) GetRegisteredActivities() []RegistryActivityInfo {
activities := env.registry.getRegisteredActivities()
var result []RegistryActivityInfo
for _, a := range activities {
result = append(result, a)
}
return result
}

func (env *testWorkflowEnvironmentImpl) executeChildWorkflowWithDelay(delayStart time.Duration, params executeWorkflowParams, callback resultHandler, startedHandler func(r WorkflowExecution, e error)) error {
Expand Down
57 changes: 26 additions & 31 deletions internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var globalRegistry *registry

func newRegistry() *registry {
return &registry{
workflowFuncMap: make(map[string]interface{}),
workflowFuncMap: make(map[string]workflow),
workflowAliasMap: make(map[string]string),
activityFuncMap: make(map[string]activity),
activityAliasMap: make(map[string]string),
Expand All @@ -50,7 +50,7 @@ func newRegistry() *registry {
func getGlobalRegistry() *registry {
once.Do(func() {
globalRegistry = &registry{
workflowFuncMap: make(map[string]interface{}),
workflowFuncMap: make(map[string]workflow),
workflowAliasMap: make(map[string]string),
activityFuncMap: make(map[string]activity),
activityAliasMap: make(map[string]string),
Expand All @@ -61,7 +61,7 @@ func getGlobalRegistry() *registry {

type registry struct {
sync.Mutex
workflowFuncMap map[string]interface{}
workflowFuncMap map[string]workflow
workflowAliasMap map[string]string
activityFuncMap map[string]activity
activityAliasMap map[string]string
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *registry) RegisterWorkflowWithOptions(
panic(fmt.Sprintf("workflow name \"%v\" is already registered", registerName))
}
}
r.workflowFuncMap[registerName] = wf
r.workflowFuncMap[registerName] = &workflowExecutor{registerName, wf, fnName}
if len(alias) > 0 || options.EnableShortName {
r.workflowAliasMap[fnName] = registerName
}
Expand All @@ -124,29 +124,18 @@ func (r *registry) RegisterActivityWithOptions(af interface{}, options RegisterA
}
}

func (r *registry) GetRegisteredWorkflows() []string {
return r.GetRegisteredWorkflowTypes()
}

func (r *registry) GetWorkflowFunc(registerName string) (interface{}, bool) {
return r.getWorkflowFn(registerName)
}

func (r *registry) GetRegisteredActivities() []string {
activities := r.getRegisteredActivities()
activityNames := make([]string, 0, len(activities))
for _, a := range activities {
activityNames = append(activityNames, a.ActivityType().Name)
func (r *registry) GetRegisteredWorkflows() []workflow {
r.Lock()
var result []workflow
for _, wf := range r.workflowFuncMap {
result = append(result, wf)
}
return activityNames
}

func (r *registry) GetActivityFunc(registerName string) (interface{}, bool) {
a, ok := r.GetActivity(registerName)
if !ok {
return nil, false
r.Unlock()
if r.next != nil {
nextWFs := r.next.GetRegisteredWorkflows()
result = append(result, nextWFs...)
}
return a.GetFunction(), ok
return result
}

func (r *registry) registerActivityFunction(af interface{}, options RegisterActivityOptions) error {
Expand Down Expand Up @@ -174,7 +163,7 @@ func (r *registry) registerActivityFunction(af interface{}, options RegisterActi
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, af, options}
r.activityFuncMap[registerName] = &activityExecutor{registerName, af, options, fnName}
if len(alias) > 0 || options.EnableShortName {
r.activityAliasMap[fnName] = registerName
}
Expand Down Expand Up @@ -216,7 +205,7 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface(), options}
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface(), options, methodName}
if len(structPrefix) > 0 || options.EnableShortName {
r.activityAliasMap[methodName] = registerName
}
Expand Down Expand Up @@ -248,24 +237,30 @@ func (r *registry) getWorkflowAlias(fnName string) (string, bool) {

func (r *registry) getWorkflowFn(fnName string) (interface{}, bool) {
r.Lock() // do not defer for Unlock to call next.getWorkflowFn without lock
fn, ok := r.workflowFuncMap[fnName]
wf, ok := r.workflowFuncMap[fnName]
if !ok { // if exact match is not found, check for backwards compatible name without -fm suffix
fn, ok = r.workflowFuncMap[strings.TrimSuffix(fnName, "-fm")]
wf, ok = r.workflowFuncMap[strings.TrimSuffix(fnName, "-fm")]
}
if !ok && r.next != nil {
r.Unlock()
return r.next.getWorkflowFn(fnName)
}
r.Unlock()
return fn, ok
if ok {
return wf.GetFunction(), ok
}
return nil, ok
}

func (r *registry) getWorkflowNoLock(registerName string) (interface{}, bool) {
a, ok := r.workflowFuncMap[registerName]
if !ok && r.next != nil {
return r.next.getWorkflowNoLock(registerName)
}
return a, ok
if ok {
return a.GetFunction(), ok
}
return nil, ok
}

func (r *registry) GetRegisteredWorkflowTypes() []string {
Expand Down
9 changes: 3 additions & 6 deletions internal/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,16 @@ func TestWorkflowRegistration(t *testing.T) {
tt.register(r)

// Verify registered workflow type
workflowType := r.GetRegisteredWorkflows()[0]
workflowType := r.GetRegisteredWorkflows()[0].WorkflowType().Name
require.Equal(t, tt.workflowType, workflowType)

// Verify workflow is resolved from workflow type
_, ok := r.GetWorkflowFunc(tt.workflowType)
_, ok := r.getWorkflowFn(tt.workflowType)
require.True(t, ok)

// Verify workflow is resolved from alternative (backwards compatible) workflow type
if len(tt.altWorkflowType) > 0 {
_, ok = r.GetWorkflowFunc(tt.altWorkflowType)
_, ok := r.getWorkflowFn(tt.altWorkflowType)
require.True(t, ok)
}

Expand Down Expand Up @@ -228,13 +228,10 @@ func TestActivityRegistration(t *testing.T) {
// Verify registered activity type
activityType := r.getRegisteredActivities()[0].ActivityType().Name
require.Equal(t, tt.activityType, activityType, "activity type")
require.Equal(t, tt.activityType, r.GetRegisteredActivities()[0])

// Verify activity is resolved from activity type
_, ok := r.GetActivity(tt.activityType)
require.True(t, ok)
_, ok = r.GetActivityFunc(tt.activityType)
require.True(t, ok)

// Verify activity is resolved from alternative (backwards compatible) activity type
if len(tt.altActivityType) > 0 {
Expand Down
7 changes: 7 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,16 @@ type (
SignalChildWorkflow(ctx Context, signalName string, data interface{}) Future
}

// RegistryWorkflowInfo
RegistryWorkflowInfo interface {
WorkflowType() WorkflowType
GetFunction() interface{}
}

// WorkflowType identifies a workflow type.
WorkflowType struct {
Name string
Path string
}

// WorkflowExecution Details.
Expand Down
Loading
Loading