Skip to content

Commit

Permalink
Change registry Apis signature to return info interface (#1355)
Browse files Browse the repository at this point in the history
* Change registry Apis signature to return info interface

* fix typo in registry_test.go

* style: explicitly return nil
  • Loading branch information
ketsiambaku authored Jul 8, 2024
1 parent 6db0afa commit c032df0
Showing 17 changed files with 154 additions and 146 deletions.
3 changes: 3 additions & 0 deletions activity/activity.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,9 @@ type (

// RegisterOptions consists of options for registering an activity
RegisterOptions = internal.RegisterActivityOptions

// RegistryInfo
RegistryInfo = internal.RegistryActivityInfo
)

// ErrResultPending is returned from activity's implementation to indicate the activity is not completed when
7 changes: 7 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
@@ -35,9 +35,16 @@ import (
)

type (
// RegistryActivityInfo
RegistryActivityInfo interface {
ActivityType() ActivityType
GetFunction() interface{}
}

// ActivityType identifies a activity type.
ActivityType struct {
Name string
Path string
}

// ActivityInfo contains information about currently executing activity.
2 changes: 1 addition & 1 deletion internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
@@ -249,7 +249,7 @@ func createWorkflowTaskWithQueries(
copy(eventsCopy, events)
return &s.PollForDecisionTaskResponse{
PreviousStartedEventId: common.Int64Ptr(previousStartEventID),
WorkflowType: workflowTypePtr(WorkflowType{workflowName}),
WorkflowType: workflowTypePtr(WorkflowType{Name: workflowName}),
History: &s.History{Events: eventsCopy},
WorkflowExecution: &s.WorkflowExecution{
WorkflowId: common.StringPtr("fake-workflow-id"),
41 changes: 28 additions & 13 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
@@ -653,6 +653,7 @@ func decodeAndAssignValue(dc DataConverter, from interface{}, toValuePtr interfa
type workflowExecutor struct {
workflowType string
fn interface{}
path string
}

func (we *workflowExecutor) Execute(ctx Context, input []byte) ([]byte, error) {
@@ -677,15 +678,27 @@ func (we *workflowExecutor) Execute(ctx Context, input []byte) ([]byte, error) {
return serializeResults(we.fn, results, dataConverter)
}

func (we *workflowExecutor) WorkflowType() WorkflowType {
return WorkflowType{
Name: we.workflowType,
Path: we.path,
}
}

func (we *workflowExecutor) GetFunction() interface{} {
return we.fn
}

// Wrapper to execute activity functions.
type activityExecutor struct {
name string
fn interface{}
options RegisterActivityOptions
path string
}

func (ae *activityExecutor) ActivityType() ActivityType {
return ActivityType{Name: ae.name}
return ActivityType{Name: ae.name, Path: ae.path}
}

func (ae *activityExecutor) GetFunction() interface{} {
@@ -776,20 +789,22 @@ type aggregatedWorker struct {
registry *registry
}

func (aw *aggregatedWorker) GetRegisteredWorkflows() []string {
return aw.registry.GetRegisteredWorkflows()
}

func (aw *aggregatedWorker) GetWorkflowFunc(registerName string) (interface{}, bool) {
return aw.registry.GetWorkflowFunc(registerName)
}

func (aw *aggregatedWorker) GetRegisteredActivities() []string {
return aw.registry.GetRegisteredActivities()
func (aw *aggregatedWorker) GetRegisteredWorkflows() []RegistryWorkflowInfo {
workflows := aw.registry.GetRegisteredWorkflows()
var result []RegistryWorkflowInfo
for _, wf := range workflows {
result = append(result, wf)
}
return result
}

func (aw *aggregatedWorker) GetActivityFunc(registerName string) (interface{}, bool) {
return aw.registry.GetActivityFunc(registerName)
func (aw *aggregatedWorker) GetRegisteredActivities() []RegistryActivityInfo {
activities := aw.registry.getRegisteredActivities()
var result []RegistryActivityInfo
for _, a := range activities {
result = append(result, a)
}
return result
}

func (aw *aggregatedWorker) RegisterWorkflow(w interface{}) {
41 changes: 24 additions & 17 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
@@ -843,35 +843,42 @@ func TestRegisterVariousWorkflowTypes(t *testing.T) {
w.RegisterWorkflowWithOptions(testWorkflowReturnStructPtrPtr, RegisterWorkflowOptions{EnableShortName: true})

wfs := w.GetRegisteredWorkflows()
var wfNames []string
for _, wf := range wfs {
wfNames = append(wfNames, wf.WorkflowType().Name)
}
assert.Equal(t, 8, len(wfs))
assert.Contains(t, wfs, "testWorkflowSample")
assert.Contains(t, wfs, "testWorkflowMultipleArgs")
assert.Contains(t, wfs, "testWorkflowNoArgs")
assert.Contains(t, wfs, "testWorkflowReturnInt")
assert.Contains(t, wfs, "testWorkflowReturnString")
assert.Contains(t, wfs, "testWorkflowReturnString")
assert.Contains(t, wfs, "testWorkflowReturnStructPtr")
assert.Contains(t, wfs, "testWorkflowReturnStructPtrPtr")
assert.Contains(t, wfNames, "testWorkflowSample")
assert.Contains(t, wfNames, "testWorkflowMultipleArgs")
assert.Contains(t, wfNames, "testWorkflowNoArgs")
assert.Contains(t, wfNames, "testWorkflowReturnInt")
assert.Contains(t, wfNames, "testWorkflowReturnString")
assert.Contains(t, wfNames, "testWorkflowReturnString")
assert.Contains(t, wfNames, "testWorkflowReturnStructPtr")
assert.Contains(t, wfNames, "testWorkflowReturnStructPtrPtr")

// sample assertion on workflow func
fn, ok := w.GetWorkflowFunc("testWorkflowSample")
assert.True(t, ok)
assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind())
assert.Equal(t, getFunctionName(testWorkflowSample), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
var sampleFunc interface{}
for _, wf := range wfs {
if wf.WorkflowType().Name == "testWorkflowSample" {
sampleFunc = wf.GetFunction()
break
}
}
assert.Equal(t, getFunctionName(testWorkflowSample), runtime.FuncForPC(reflect.ValueOf(sampleFunc).Pointer()).Name())
}

func TestRegisterActivityWithOptions(t *testing.T) {
r := newRegistry()
w := &aggregatedWorker{registry: r}
w.RegisterActivityWithOptions(testActivityMultipleArgs, RegisterActivityOptions{EnableShortName: true})

wfs := w.GetRegisteredActivities()
assert.Equal(t, 1, len(wfs))
assert.Contains(t, wfs, "testActivityMultipleArgs")
a := w.GetRegisteredActivities()
assert.Equal(t, 1, len(a))
assert.Contains(t, a[0].ActivityType().Name, "testActivityMultipleArgs")

// assert activity function
fn, ok := w.GetActivityFunc("testActivityMultipleArgs")
assert.True(t, ok)
fn := a[0].GetFunction()
assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind())
assert.Equal(t, getFunctionName(testActivityMultipleArgs), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
}
2 changes: 2 additions & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
@@ -98,6 +98,8 @@ type (
// Note that workflow.Context is used instead of context.Context to avoid use of raw channels.
workflow interface {
Execute(ctx Context, input []byte) (result []byte, err error)
WorkflowType() WorkflowType
GetFunction() interface{}
}

sendCallback struct {
26 changes: 14 additions & 12 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
@@ -1884,20 +1884,22 @@ func (env *testWorkflowEnvironmentImpl) ExecuteChildWorkflow(params executeWorkf
return env.executeChildWorkflowWithDelay(0, params, callback, startedHandler)
}

func (env *testWorkflowEnvironmentImpl) GetRegisteredWorkflows() []string {
return env.registry.GetRegisteredWorkflows()
}

func (env *testWorkflowEnvironmentImpl) GetWorkflowFunc(registerName string) (interface{}, bool) {
return env.registry.GetWorkflowFunc(registerName)
}

func (env *testWorkflowEnvironmentImpl) GetRegisteredActivities() []string {
return env.registry.GetRegisteredActivities()
func (env *testWorkflowEnvironmentImpl) GetRegisteredWorkflows() []RegistryWorkflowInfo {
workflows := env.registry.GetRegisteredWorkflows()
var result []RegistryWorkflowInfo
for _, wf := range workflows {
result = append(result, wf)
}
return result
}

func (env *testWorkflowEnvironmentImpl) GetActivityFunc(registerName string) (interface{}, bool) {
return env.registry.GetActivityFunc(registerName)
func (env *testWorkflowEnvironmentImpl) GetRegisteredActivities() []RegistryActivityInfo {
activities := env.registry.getRegisteredActivities()
var result []RegistryActivityInfo
for _, a := range activities {
result = append(result, a)
}
return result
}

func (env *testWorkflowEnvironmentImpl) executeChildWorkflowWithDelay(delayStart time.Duration, params executeWorkflowParams, callback resultHandler, startedHandler func(r WorkflowExecution, e error)) error {
57 changes: 26 additions & 31 deletions internal/registry.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ var globalRegistry *registry

func newRegistry() *registry {
return &registry{
workflowFuncMap: make(map[string]interface{}),
workflowFuncMap: make(map[string]workflow),
workflowAliasMap: make(map[string]string),
activityFuncMap: make(map[string]activity),
activityAliasMap: make(map[string]string),
@@ -50,7 +50,7 @@ func newRegistry() *registry {
func getGlobalRegistry() *registry {
once.Do(func() {
globalRegistry = &registry{
workflowFuncMap: make(map[string]interface{}),
workflowFuncMap: make(map[string]workflow),
workflowAliasMap: make(map[string]string),
activityFuncMap: make(map[string]activity),
activityAliasMap: make(map[string]string),
@@ -61,7 +61,7 @@ func getGlobalRegistry() *registry {

type registry struct {
sync.Mutex
workflowFuncMap map[string]interface{}
workflowFuncMap map[string]workflow
workflowAliasMap map[string]string
activityFuncMap map[string]activity
activityAliasMap map[string]string
@@ -100,7 +100,7 @@ func (r *registry) RegisterWorkflowWithOptions(
panic(fmt.Sprintf("workflow name \"%v\" is already registered", registerName))
}
}
r.workflowFuncMap[registerName] = wf
r.workflowFuncMap[registerName] = &workflowExecutor{registerName, wf, fnName}
if len(alias) > 0 || options.EnableShortName {
r.workflowAliasMap[fnName] = registerName
}
@@ -124,29 +124,18 @@ func (r *registry) RegisterActivityWithOptions(af interface{}, options RegisterA
}
}

func (r *registry) GetRegisteredWorkflows() []string {
return r.GetRegisteredWorkflowTypes()
}

func (r *registry) GetWorkflowFunc(registerName string) (interface{}, bool) {
return r.getWorkflowFn(registerName)
}

func (r *registry) GetRegisteredActivities() []string {
activities := r.getRegisteredActivities()
activityNames := make([]string, 0, len(activities))
for _, a := range activities {
activityNames = append(activityNames, a.ActivityType().Name)
func (r *registry) GetRegisteredWorkflows() []workflow {
r.Lock()
var result []workflow
for _, wf := range r.workflowFuncMap {
result = append(result, wf)
}
return activityNames
}

func (r *registry) GetActivityFunc(registerName string) (interface{}, bool) {
a, ok := r.GetActivity(registerName)
if !ok {
return nil, false
r.Unlock()
if r.next != nil {
nextWFs := r.next.GetRegisteredWorkflows()
result = append(result, nextWFs...)
}
return a.GetFunction(), ok
return result
}

func (r *registry) registerActivityFunction(af interface{}, options RegisterActivityOptions) error {
@@ -174,7 +163,7 @@ func (r *registry) registerActivityFunction(af interface{}, options RegisterActi
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, af, options}
r.activityFuncMap[registerName] = &activityExecutor{registerName, af, options, fnName}
if len(alias) > 0 || options.EnableShortName {
r.activityAliasMap[fnName] = registerName
}
@@ -216,7 +205,7 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface(), options}
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface(), options, methodName}
if len(structPrefix) > 0 || options.EnableShortName {
r.activityAliasMap[methodName] = registerName
}
@@ -248,24 +237,30 @@ func (r *registry) getWorkflowAlias(fnName string) (string, bool) {

func (r *registry) getWorkflowFn(fnName string) (interface{}, bool) {
r.Lock() // do not defer for Unlock to call next.getWorkflowFn without lock
fn, ok := r.workflowFuncMap[fnName]
wf, ok := r.workflowFuncMap[fnName]
if !ok { // if exact match is not found, check for backwards compatible name without -fm suffix
fn, ok = r.workflowFuncMap[strings.TrimSuffix(fnName, "-fm")]
wf, ok = r.workflowFuncMap[strings.TrimSuffix(fnName, "-fm")]
}
if !ok && r.next != nil {
r.Unlock()
return r.next.getWorkflowFn(fnName)
}
r.Unlock()
return fn, ok
if ok {
return wf.GetFunction(), ok
}
return nil, ok
}

func (r *registry) getWorkflowNoLock(registerName string) (interface{}, bool) {
a, ok := r.workflowFuncMap[registerName]
if !ok && r.next != nil {
return r.next.getWorkflowNoLock(registerName)
}
return a, ok
if ok {
return a.GetFunction(), ok
}
return nil, ok
}

func (r *registry) GetRegisteredWorkflowTypes() []string {
9 changes: 3 additions & 6 deletions internal/registry_test.go
Original file line number Diff line number Diff line change
@@ -105,16 +105,16 @@ func TestWorkflowRegistration(t *testing.T) {
tt.register(r)

// Verify registered workflow type
workflowType := r.GetRegisteredWorkflows()[0]
workflowType := r.GetRegisteredWorkflows()[0].WorkflowType().Name
require.Equal(t, tt.workflowType, workflowType)

// Verify workflow is resolved from workflow type
_, ok := r.GetWorkflowFunc(tt.workflowType)
_, ok := r.getWorkflowFn(tt.workflowType)
require.True(t, ok)

// Verify workflow is resolved from alternative (backwards compatible) workflow type
if len(tt.altWorkflowType) > 0 {
_, ok = r.GetWorkflowFunc(tt.altWorkflowType)
_, ok := r.getWorkflowFn(tt.altWorkflowType)
require.True(t, ok)
}

@@ -228,13 +228,10 @@ func TestActivityRegistration(t *testing.T) {
// Verify registered activity type
activityType := r.getRegisteredActivities()[0].ActivityType().Name
require.Equal(t, tt.activityType, activityType, "activity type")
require.Equal(t, tt.activityType, r.GetRegisteredActivities()[0])

// Verify activity is resolved from activity type
_, ok := r.GetActivity(tt.activityType)
require.True(t, ok)
_, ok = r.GetActivityFunc(tt.activityType)
require.True(t, ok)

// Verify activity is resolved from alternative (backwards compatible) activity type
if len(tt.altActivityType) > 0 {
7 changes: 7 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
@@ -389,9 +389,16 @@ type (
SignalChildWorkflow(ctx Context, signalName string, data interface{}) Future
}

// RegistryWorkflowInfo
RegistryWorkflowInfo interface {
WorkflowType() WorkflowType
GetFunction() interface{}
}

// WorkflowType identifies a workflow type.
WorkflowType struct {
Name string
Path string
}

// WorkflowExecution Details.
Loading

0 comments on commit c032df0

Please sign in to comment.