From 8fb72640f407ae7fd4a3a6e6f96f8b757bfdba49 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 29 Jan 2024 12:55:47 -0600 Subject: [PATCH] More search attribute work --- internal/internal_event_handlers.go | 78 +------ internal/internal_schedule_client.go | 54 +++-- internal/internal_search_attributes.go | 212 ++++++++++++++++++-- internal/internal_search_attributes_test.go | 14 +- internal/internal_workflow_client.go | 65 +----- internal/internal_workflow_client_test.go | 10 +- internal/schedule_client.go | 27 ++- internal/workflow_testsuite.go | 2 +- temporal/search_attributes.go | 45 +++-- test/integration_test.go | 88 +++++++- test/test_utils_test.go | 9 +- test/workflow_test.go | 4 +- workflow/workflow.go | 20 ++ 13 files changed, 411 insertions(+), 217 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 3c5e9e30b..05d0176e5 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -391,80 +391,6 @@ func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo { return wc.workflowInfo } -func getIndexValue(payload *commonpb.Payload) enumspb.IndexedValueType { - return enumspb.IndexedValueType(enumspb.IndexedValueType_value[string(payload.GetMetadata()["type"][:])]) -} - -func convertToTypeSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes { - updates := make([]SearchAttributeUpdate, 0, len(attributes)) - for key, payload := range attributes { - if payload.Data == nil { - continue - } - switch index := getIndexValue(payload); index { - case enumspb.INDEXED_VALUE_TYPE_BOOL: - attr := NewSearchAttributeKeyBool(key) - var value bool - err := converter.GetDefaultDataConverter().FromPayload(payload, &value) - if err != nil { - panic(err) - } - updates = append(updates, attr.ValueSet(value)) - case enumspb.INDEXED_VALUE_TYPE_KEYWORD: - attr := NewSearchAttributeKeyword(key) - var value string - err := converter.GetDefaultDataConverter().FromPayload(payload, &value) - if err != nil { - panic(err) - } - updates = append(updates, attr.ValueSet(value)) - case enumspb.INDEXED_VALUE_TYPE_TEXT: - attr := NewSearchAttributeKeyString(key) - var value string - err := converter.GetDefaultDataConverter().FromPayload(payload, &value) - if err != nil { - panic(err) - } - updates = append(updates, attr.ValueSet(value)) - case enumspb.INDEXED_VALUE_TYPE_INT: - attr := NewSearchAttributeKeyInt64(key) - var value int64 - err := converter.GetDefaultDataConverter().FromPayload(payload, &value) - if err != nil { - panic(err) - } - updates = append(updates, attr.ValueSet(value)) - case enumspb.INDEXED_VALUE_TYPE_DOUBLE: - attr := NewSearchAttributeKeyFloat64(key) - var value float64 - err := converter.GetDefaultDataConverter().FromPayload(payload, &value) - if err != nil { - panic(err) - } - updates = append(updates, attr.ValueSet(value)) - case enumspb.INDEXED_VALUE_TYPE_DATETIME: - attr := NewSearchAttributeKeyTime(key) - var value time.Time - err := converter.GetDefaultDataConverter().FromPayload(payload, &value) - if err != nil { - panic(err) - } - updates = append(updates, attr.ValueSet(value)) - case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST: - attr := NewSearchAttributeKeywordList(key) - var value []string - err := converter.GetDefaultDataConverter().FromPayload(payload, &value) - if err != nil { - panic(err) - } - updates = append(updates, attr.ValueSet(value)) - default: - logger.Warn("Unrecognized indexed value type on search attribute key", "key", key, "index", index) - } - } - return NewSearchAttributes(updates...) -} - func (wc *workflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes { return convertToTypeSearchAttributes(wc.logger, wc.workflowInfo.SearchAttributes.GetIndexedFields()) } @@ -547,7 +473,7 @@ func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*c if len(attributes) == 0 { return nil, errSearchAttributesNotSet } - attr, err := serializeSearchAttributes(attributes) + attr, err := serializeUntypedSearchAttributes(attributes) if err != nil { return nil, err } @@ -620,7 +546,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( callback(nil, err) return } - searchAttr, err := serializeSearchAttributes(params.SearchAttributes) + searchAttr, err := serializeUntypedSearchAttributes(params.SearchAttributes) if err != nil { if wc.sdkFlags.tryUse(SDKFlagChildWorkflowErrorExecution, !wc.isReplay) { startedHandler(WorkflowExecution{}, &ChildWorkflowExecutionAlreadyStartedError{}) diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index 7a91d464d..776c98e15 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -40,6 +40,7 @@ import ( workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" ) type ( @@ -96,7 +97,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche return nil, err } - searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes) + searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes) if err != nil { return nil, err } @@ -275,7 +276,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc if err != nil { return err } - scheduleDescription, err := scheduleDescriptionFromPB(describeResponse) + scheduleDescription, err := scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse) if err != nil { return err } @@ -314,7 +315,7 @@ func (scheduleHandle *scheduleHandleImpl) Describe(ctx context.Context) (*Schedu if err != nil { return nil, err } - return scheduleDescriptionFromPB(describeResponse) + return scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse) } func (scheduleHandle *scheduleHandleImpl) Trigger(ctx context.Context, options ScheduleTriggerOptions) error { @@ -454,7 +455,10 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS } } -func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeScheduleResponse) (*ScheduleDescription, error) { +func scheduleDescriptionFromPB( + logger log.Logger, + describeResponse *workflowservice.DescribeScheduleResponse, +) (*ScheduleDescription, error) { if describeResponse == nil { return nil, nil } @@ -474,7 +478,7 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul nextActionTimes[i] = t.AsTime() } - actionDescription, err := convertFromPBScheduleAction(describeResponse.Schedule.Action) + actionDescription, err := convertFromPBScheduleAction(logger, describeResponse.Schedule.Action) if err != nil { return nil, err } @@ -560,7 +564,11 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch } } -func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, scheduleAction ScheduleAction) (*schedulepb.ScheduleAction, error) { +func convertToPBScheduleAction( + ctx context.Context, + client *WorkflowClient, + scheduleAction ScheduleAction, +) (*schedulepb.ScheduleAction, error) { switch action := scheduleAction.(type) { case *ScheduleWorkflowAction: // Set header before interceptor run @@ -590,10 +598,19 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche return nil, err } - searchAttr, err := serializeSearchAttributes(action.SearchAttributes) + searchAttrs, err := serializeSearchAttributes(nil, action.TypedSearchAttributes) if err != nil { return nil, err } + // Add any untyped search attributes that aren't already there + for k, v := range action.UntypedSearchAttributes { + if searchAttrs.GetIndexedFields()[k] == nil { + if searchAttrs == nil || searchAttrs.IndexedFields == nil { + searchAttrs = &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{}} + } + searchAttrs.IndexedFields[k] = v + } + } // get workflow headers from the context header, err := headerPropagated(ctx, client.contextPropagators) @@ -613,7 +630,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche WorkflowTaskTimeout: durationpb.New(action.WorkflowTaskTimeout), RetryPolicy: convertToPBRetryPolicy(action.RetryPolicy), Memo: memo, - SearchAttributes: searchAttr, + SearchAttributes: searchAttrs, Header: header, }, }, @@ -624,7 +641,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche } } -func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAction, error) { +func convertFromPBScheduleAction(logger log.Logger, action *schedulepb.ScheduleAction) (ScheduleAction, error) { switch action := action.Action.(type) { case *schedulepb.ScheduleAction_StartWorkflow: workflow := action.StartWorkflow @@ -639,9 +656,19 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct memos[key] = element } - searchAttributes := make(map[string]interface{}) - for key, element := range workflow.GetSearchAttributes().GetIndexedFields() { - searchAttributes[key] = element + searchAttrs := convertToTypeSearchAttributes(logger, workflow.GetSearchAttributes().GetIndexedFields()) + // Create untyped list for any attribute not in the existing list + untypedSearchAttrs := map[string]*commonpb.Payload{} + for k, v := range workflow.GetSearchAttributes().GetIndexedFields() { + var inTyped bool + for typedKey := range searchAttrs.untypedValue { + if inTyped = typedKey.GetName() == k; inTyped { + break + } + } + if !inTyped { + untypedSearchAttrs[k] = v + } } return &ScheduleWorkflowAction{ @@ -654,7 +681,8 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(), RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy), Memo: memos, - SearchAttributes: searchAttributes, + TypedSearchAttributes: searchAttrs, + UntypedSearchAttributes: untypedSearchAttrs, }, nil default: // TODO maybe just panic instead? diff --git a/internal/internal_search_attributes.go b/internal/internal_search_attributes.go index 155bbffd7..b78723eaf 100644 --- a/internal/internal_search_attributes.go +++ b/internal/internal_search_attributes.go @@ -25,10 +25,14 @@ package internal import ( + "fmt" "reflect" "time" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" ) type ( @@ -56,50 +60,53 @@ type ( reflectType reflect.Type } - // SearchAttributeKeyString represents a search attribute key for a text attribute type + // SearchAttributeKeyString represents a search attribute key for a text attribute type. SearchAttributeKeyString struct { baseSearchAttributeKey } - // SearchAttributeKeyString represents a search attribute key for a keyword attribute type - SearchAttributeKeyword struct { + // SearchAttributeKeyKeyword represents a search attribute key for a keyword attribute type. + SearchAttributeKeyKeyword struct { baseSearchAttributeKey } - // SearchAttributeKeyBool represents a search attribute key for a boolean attribute type + // SearchAttributeKeyBool represents a search attribute key for a boolean attribute type. SearchAttributeKeyBool struct { baseSearchAttributeKey } - // SearchAttributeKeyInt64 represents a search attribute key for a integer attribute type + // SearchAttributeKeyInt64 represents a search attribute key for a integer attribute type. SearchAttributeKeyInt64 struct { baseSearchAttributeKey } - // SearchAttributeKeyFloat64 represents a search attribute key for a float attribute type + // SearchAttributeKeyFloat64 represents a search attribute key for a float attribute type. SearchAttributeKeyFloat64 struct { baseSearchAttributeKey } - // SearchAttributeKeyTime represents a search attribute key for a date time attribute type + // SearchAttributeKeyTime represents a search attribute key for a date time attribute type. SearchAttributeKeyTime struct { baseSearchAttributeKey } - // SearchAttributeKeywordList represents a search attribute key for a list of keyword attribute type - SearchAttributeKeywordList struct { + // SearchAttributeKeyKeywordList represents a search attribute key for a list of keyword attribute type. + SearchAttributeKeyKeywordList struct { baseSearchAttributeKey } ) +// GetName of the search attribute. func (bk baseSearchAttributeKey) GetName() string { return bk.name } +// GetValueType of the search attribute. func (bk baseSearchAttributeKey) GetValueType() enumspb.IndexedValueType { return bk.valueType } +// GetReflectType of the search attribute. func (bk baseSearchAttributeKey) GetReflectType() reflect.Type { return bk.reflectType } @@ -114,20 +121,22 @@ func NewSearchAttributeKeyString(name string) SearchAttributeKeyString { } } +// ValueSet creates an update to set the value of the attribute. func (k SearchAttributeKeyString) ValueSet(value string) SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = value } } +// ValueUnset creates an update to remove the attribute. func (k SearchAttributeKeyString) ValueUnset() SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = nil } } -func NewSearchAttributeKeyword(name string) SearchAttributeKeyword { - return SearchAttributeKeyword{ +func NewSearchAttributeKeyKeyword(name string) SearchAttributeKeyKeyword { + return SearchAttributeKeyKeyword{ baseSearchAttributeKey: baseSearchAttributeKey{ name: name, valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD, @@ -136,13 +145,15 @@ func NewSearchAttributeKeyword(name string) SearchAttributeKeyword { } } -func (k SearchAttributeKeyword) ValueSet(value string) SearchAttributeUpdate { +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyKeyword) ValueSet(value string) SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = value } } -func (k SearchAttributeKeyword) ValueUnset() SearchAttributeUpdate { +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyKeyword) ValueUnset() SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = nil } @@ -158,12 +169,14 @@ func NewSearchAttributeKeyBool(name string) SearchAttributeKeyBool { } } +// ValueSet creates an update to set the value of the attribute. func (k SearchAttributeKeyBool) ValueSet(value bool) SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = value } } +// ValueUnset creates an update to remove the attribute. func (k SearchAttributeKeyBool) ValueUnset() SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = nil @@ -180,12 +193,14 @@ func NewSearchAttributeKeyInt64(name string) SearchAttributeKeyInt64 { } } +// ValueSet creates an update to set the value of the attribute. func (k SearchAttributeKeyInt64) ValueSet(value int64) SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = value } } +// ValueUnset creates an update to remove the attribute. func (k SearchAttributeKeyInt64) ValueUnset() SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = nil @@ -202,12 +217,14 @@ func NewSearchAttributeKeyFloat64(name string) SearchAttributeKeyFloat64 { } } +// ValueSet creates an update to set the value of the attribute. func (k SearchAttributeKeyFloat64) ValueSet(value float64) SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = value } } +// ValueUnset creates an update to remove the attribute. func (k SearchAttributeKeyFloat64) ValueUnset() SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = nil @@ -224,20 +241,22 @@ func NewSearchAttributeKeyTime(name string) SearchAttributeKeyTime { } } +// ValueSet creates an update to set the value of the attribute. func (k SearchAttributeKeyTime) ValueSet(value time.Time) SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = value } } +// ValueUnset creates an update to remove the attribute. func (k SearchAttributeKeyTime) ValueUnset() SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = nil } } -func NewSearchAttributeKeywordList(name string) SearchAttributeKeywordList { - return SearchAttributeKeywordList{ +func NewSearchAttributeKeyKeywordList(name string) SearchAttributeKeyKeywordList { + return SearchAttributeKeyKeywordList{ baseSearchAttributeKey: baseSearchAttributeKey{ name: name, valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, @@ -246,14 +265,16 @@ func NewSearchAttributeKeywordList(name string) SearchAttributeKeywordList { } } -func (k SearchAttributeKeywordList) ValueSet(values []string) SearchAttributeUpdate { +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyKeywordList) ValueSet(values []string) SearchAttributeUpdate { listCopy := append([]string(nil), values...) return func(sa *SearchAttributes) { sa.untypedValue[k] = listCopy } } -func (k SearchAttributeKeywordList) ValueUnset() SearchAttributeUpdate { +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyKeywordList) ValueUnset() SearchAttributeUpdate { return func(sa *SearchAttributes) { sa.untypedValue[k] = nil } @@ -275,6 +296,7 @@ func NewSearchAttributes(attributes ...SearchAttributeUpdate) SearchAttributes { return sa } +// GetString gets a value for the given key and whether it was present. func (sa *SearchAttributes) GetString(key SearchAttributeKeyString) (string, bool) { value, ok := sa.untypedValue[key] if !ok { @@ -283,7 +305,8 @@ func (sa *SearchAttributes) GetString(key SearchAttributeKeyString) (string, boo return value.(string), true } -func (sa *SearchAttributes) GetKeyword(key SearchAttributeKeyword) (string, bool) { +// GetKeyword gets a value for the given key and whether it was present. +func (sa *SearchAttributes) GetKeyword(key SearchAttributeKeyKeyword) (string, bool) { value, ok := sa.untypedValue[key] if !ok { return "", false @@ -291,6 +314,7 @@ func (sa *SearchAttributes) GetKeyword(key SearchAttributeKeyword) (string, bool return value.(string), true } +// GetBool gets a value for the given key and whether it was present. func (sa *SearchAttributes) GetBool(key SearchAttributeKeyBool) (bool, bool) { value, ok := sa.untypedValue[key] if !ok { @@ -299,7 +323,8 @@ func (sa *SearchAttributes) GetBool(key SearchAttributeKeyBool) (bool, bool) { return value.(bool), true } -func (sa *SearchAttributes) GetInt(key SearchAttributeKeyInt64) (int64, bool) { +// GetInt64 gets a value for the given key and whether it was present. +func (sa *SearchAttributes) GetInt64(key SearchAttributeKeyInt64) (int64, bool) { value, ok := sa.untypedValue[key] if !ok { return 0, false @@ -307,7 +332,8 @@ func (sa *SearchAttributes) GetInt(key SearchAttributeKeyInt64) (int64, bool) { return value.(int64), true } -func (sa *SearchAttributes) GetFloat(key SearchAttributeKeyFloat64) (float64, bool) { +// GetFloat64 gets a value for the given key and whether it was present. +func (sa *SearchAttributes) GetFloat64(key SearchAttributeKeyFloat64) (float64, bool) { value, ok := sa.untypedValue[key] if !ok { return 0.0, false @@ -315,6 +341,7 @@ func (sa *SearchAttributes) GetFloat(key SearchAttributeKeyFloat64) (float64, bo return value.(float64), true } +// GetTime gets a value for the given key and whether it was present. func (sa *SearchAttributes) GetTime(key SearchAttributeKeyTime) (time.Time, bool) { value, ok := sa.untypedValue[key] if !ok { @@ -323,7 +350,8 @@ func (sa *SearchAttributes) GetTime(key SearchAttributeKeyTime) (time.Time, bool return value.(time.Time), true } -func (sa *SearchAttributes) GetKeywordList(key SearchAttributeKeywordList) ([]string, bool) { +// GetKeywordList gets a value for the given key and whether it was present. +func (sa *SearchAttributes) GetKeywordList(key SearchAttributeKeyKeywordList) ([]string, bool) { value, ok := sa.untypedValue[key] if !ok { return nil, false @@ -333,15 +361,18 @@ func (sa *SearchAttributes) GetKeywordList(key SearchAttributeKeywordList) ([]st return append([]string(nil), result...), true } +// ContainsKey gets whether a key is present. func (sa *SearchAttributes) ContainsKey(key SearchAttributeKey) bool { _, ok := sa.untypedValue[key] return ok } +// Size gets the size of the attribute collection. func (sa *SearchAttributes) Size() int { return len(sa.untypedValue) } +// GetUntypedValues gets a copy of the collection with raw types. func (sa *SearchAttributes) GetUntypedValues() map[SearchAttributeKey]interface{} { untypedValueCopy := make(map[SearchAttributeKey]interface{}, len(sa.untypedValue)) for key, value := range sa.untypedValue { @@ -355,6 +386,7 @@ func (sa *SearchAttributes) GetUntypedValues() map[SearchAttributeKey]interface{ return untypedValueCopy } +// Copy creates an update that copies existing values. func (sa *SearchAttributes) Copy() SearchAttributeUpdate { return func(s *SearchAttributes) { untypedValues := sa.GetUntypedValues() @@ -363,3 +395,141 @@ func (sa *SearchAttributes) Copy() SearchAttributeUpdate { } } } + +func serializeUntypedSearchAttributes(input map[string]interface{}) (*commonpb.SearchAttributes, error) { + if input == nil { + return nil, nil + } + + attr := make(map[string]*commonpb.Payload) + for k, v := range input { + // If search attribute value is already of Payload type, then use it directly. + // This allows to copy search attributes from workflow info to child workflow options. + if vp, ok := v.(*commonpb.Payload); ok { + attr[k] = vp + continue + } + var err error + attr[k], err = converter.GetDefaultDataConverter().ToPayload(v) + if err != nil { + return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) + } + } + return &commonpb.SearchAttributes{IndexedFields: attr}, nil +} + +func serializeTypedSearchAttributes(searchAttributes map[SearchAttributeKey]interface{}) (*commonpb.SearchAttributes, error) { + if searchAttributes == nil { + return nil, nil + } + + serializedAttr := make(map[string]*commonpb.Payload) + for k, v := range searchAttributes { + payload, err := converter.GetDefaultDataConverter().ToPayload(v) + if err != nil { + return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) + } + // Server does not remove search attributes if they set a type + if payload.GetData() != nil { + payload.Metadata["type"] = []byte(enumspb.IndexedValueType_name[int32(k.GetValueType())]) + } + serializedAttr[k.GetName()] = payload + } + return &commonpb.SearchAttributes{IndexedFields: serializedAttr}, nil +} + +func serializeSearchAttributes( + untypedAttributes map[string]interface{}, + typedAttributes SearchAttributes, +) (*commonpb.SearchAttributes, error) { + var searchAttr *commonpb.SearchAttributes + var err error + if untypedAttributes != nil && typedAttributes.Size() != 0 { + return nil, fmt.Errorf("cannot specify both SearchAttributes and TypedSearchAttributes") + } else if untypedAttributes != nil { + searchAttr, err = serializeUntypedSearchAttributes(untypedAttributes) + if err != nil { + return nil, err + } + } else if typedAttributes.Size() != 0 { + searchAttr, err = serializeTypedSearchAttributes(typedAttributes.GetUntypedValues()) + if err != nil { + return nil, err + } + } + return searchAttr, nil +} + +func getIndexValue(payload *commonpb.Payload) enumspb.IndexedValueType { + return enumspb.IndexedValueType(enumspb.IndexedValueType_value[string(payload.GetMetadata()["type"][:])]) +} + +func convertToTypeSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes { + updates := make([]SearchAttributeUpdate, 0, len(attributes)) + for key, payload := range attributes { + if payload.Data == nil { + continue + } + switch index := getIndexValue(payload); index { + case enumspb.INDEXED_VALUE_TYPE_BOOL: + attr := NewSearchAttributeKeyBool(key) + var value bool + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_KEYWORD: + attr := NewSearchAttributeKeyKeyword(key) + var value string + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_TEXT: + attr := NewSearchAttributeKeyString(key) + var value string + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_INT: + attr := NewSearchAttributeKeyInt64(key) + var value int64 + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_DOUBLE: + attr := NewSearchAttributeKeyFloat64(key) + var value float64 + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_DATETIME: + attr := NewSearchAttributeKeyTime(key) + var value time.Time + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST: + attr := NewSearchAttributeKeyKeywordList(key) + var value []string + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + default: + logger.Warn("Unrecognized indexed value type on search attribute key", "key", key, "index", index) + } + } + return NewSearchAttributes(updates...) +} diff --git a/internal/internal_search_attributes_test.go b/internal/internal_search_attributes_test.go index cb6be58d1..eaf5ec37a 100644 --- a/internal/internal_search_attributes_test.go +++ b/internal/internal_search_attributes_test.go @@ -46,11 +46,11 @@ func TestSearchAttributes(t *testing.T) { require.Equal(t, 0, sa.Size()) stringKey := NewSearchAttributeKeyString("stringKey") - keywordKey := NewSearchAttributeKeyword("keywordKey") + keywordKey := NewSearchAttributeKeyKeyword("keywordKey") intKey := NewSearchAttributeKeyInt64("intKey") floatKey := NewSearchAttributeKeyFloat64("floatKey") timeKey := NewSearchAttributeKeyTime("timeKey") - keywordListKey := NewSearchAttributeKeywordList("keywordListKey") + keywordListKey := NewSearchAttributeKeyKeywordList("keywordListKey") now := time.Now() sa = NewSearchAttributes( @@ -78,12 +78,12 @@ func TestSearchAttributes(t *testing.T) { require.True(t, ok) require.True(t, sa.ContainsKey(intKey)) - intValue, ok := sa.GetInt(intKey) + intValue, ok := sa.GetInt64(intKey) require.Equal(t, int64(10), intValue) require.True(t, ok) require.True(t, sa.ContainsKey(floatKey)) - floatValue, ok := sa.GetFloat(floatKey) + floatValue, ok := sa.GetFloat64(floatKey) require.Equal(t, float64(5.4), floatValue) require.True(t, ok) @@ -134,9 +134,9 @@ func TestSearchAttributes(t *testing.T) { require.Equal(t, 7, sa.Size()) } -func TestSearchAttributesKeyWordList(t *testing.T) { +func TestSearchAttributesKeywordList(t *testing.T) { t.Parallel() - kw := NewSearchAttributeKeywordList("keywordList") + kw := NewSearchAttributeKeyKeywordList("keywordList") kv := []string{"keyword1", "keyword2", "keyword3"} sa := NewSearchAttributes(kw.ValueSet(kv)) // Modify the list and verify it doesn't change the search attribute @@ -156,7 +156,7 @@ func TestSearchAttributesKeyWordList(t *testing.T) { func TestSearchAttributesDeepCopy(t *testing.T) { t.Parallel() key1 := NewSearchAttributeKeyString("stringKey1") - key2 := NewSearchAttributeKeywordList("keywordList") + key2 := NewSearchAttributeKeyKeywordList("keywordList") keywordListValue := []string{"keyword1", "keyword2", "keyword3"} sa := NewSearchAttributes( key1.ValueSet("value"), diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 1d0cc29c5..fcbb1919f 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -1450,67 +1450,6 @@ func getWorkflowMemo(input map[string]interface{}, dc converter.DataConverter) ( return &commonpb.Memo{Fields: memo}, nil } -func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAttributes, error) { - if input == nil { - return nil, nil - } - - attr := make(map[string]*commonpb.Payload) - for k, v := range input { - // If search attribute value is already of Payload type, then use it directly. - // This allows to copy search attributes from workflow info to child workflow options. - if vp, ok := v.(*commonpb.Payload); ok { - attr[k] = vp - continue - } - var err error - attr[k], err = converter.GetDefaultDataConverter().ToPayload(v) - if err != nil { - return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) - } - } - return &commonpb.SearchAttributes{IndexedFields: attr}, nil -} - -func serializeTypedSearchAttributes(searchAttributes map[SearchAttributeKey]interface{}) (*commonpb.SearchAttributes, error) { - if searchAttributes == nil { - return nil, nil - } - - serializedAttr := make(map[string]*commonpb.Payload) - for k, v := range searchAttributes { - payload, err := converter.GetDefaultDataConverter().ToPayload(v) - if err != nil { - return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) - } - // Server does not remove search attributes if they set a type - if payload.GetData() != nil { - payload.Metadata["type"] = []byte(enumspb.IndexedValueType_name[int32(k.GetValueType())]) - } - serializedAttr[k.GetName()] = payload - } - return &commonpb.SearchAttributes{IndexedFields: serializedAttr}, nil -} - -func GetSearchAttributes(untypedAttributes map[string]interface{}, typedAttributes SearchAttributes) (*commonpb.SearchAttributes, error) { - var searchAttr *commonpb.SearchAttributes - var err error - if untypedAttributes != nil && typedAttributes.Size() != 0 { - return nil, fmt.Errorf("cannot specify both SearchAttributes and TypedSearchAttributes") - } else if untypedAttributes != nil { - searchAttr, err = serializeSearchAttributes(untypedAttributes) - if err != nil { - return nil, err - } - } else if typedAttributes.Size() != 0 { - searchAttr, err = serializeTypedSearchAttributes(typedAttributes.GetUntypedValues()) - if err != nil { - return nil, err - } - } - return searchAttr, nil -} - type workflowClientInterceptor struct { client *WorkflowClient } @@ -1545,7 +1484,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( return nil, err } - searchAttr, err := GetSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes) + searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes) if err != nil { return nil, err } @@ -1686,7 +1625,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow( return nil, err } - searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes) + searchAttr, err := serializeUntypedSearchAttributes(in.Options.SearchAttributes) if err != nil { return nil, err } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 9b3c059c8..ef3e5f322 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1475,12 +1475,12 @@ func (s *workflowClientTestSuite) TestGetWorkflowMemo() { func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { var input1 map[string]interface{} - result1, err := serializeSearchAttributes(input1) + result1, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.Nil(result1) input1 = make(map[string]interface{}) - result2, err := serializeSearchAttributes(input1) + result2, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.NotNil(result2) s.Equal(0, len(result2.IndexedFields)) @@ -1488,7 +1488,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { input1 = map[string]interface{}{ "t1": "v1", } - result3, err := serializeSearchAttributes(input1) + result3, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.NotNil(result3) s.Equal(1, len(result3.IndexedFields)) @@ -1502,7 +1502,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { input1 = map[string]interface{}{ "payload": p, } - result4, err := serializeSearchAttributes(input1) + result4, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.NotNil(result3) s.Equal(1, len(result3.IndexedFields)) @@ -1512,7 +1512,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { input1 = map[string]interface{}{ "non-serializable": make(chan int), } - _, err = serializeSearchAttributes(input1) + _, err = serializeUntypedSearchAttributes(input1) s.Error(err) } diff --git a/internal/schedule_client.go b/internal/schedule_client.go index 7a262f2e5..aaaf5c500 100644 --- a/internal/schedule_client.go +++ b/internal/schedule_client.go @@ -269,13 +269,18 @@ type ( // On ScheduleHandle.Describe() or ScheduleHandle.Update() Memo will be returned as *commonpb.Payload. Memo map[string]interface{} - // SearchAttributes - Optional indexed info that can be used in query of List/Scan/Count workflow APIs. The key and value type must be registered on Temporal server side. - // Use GetSearchAttributes API to get valid key and corresponding value type. - // On ScheduleHandle.Describe() or ScheduleHandle.Update() SearchAttributes will be returned as *commonpb.Payload. - // For supported operations on different server versions see [Visibility]. + // TypedSearchAttributes - Optional indexed info that can be used in query of List/Scan/Count workflow APIs. The key + // and value type must be registered on Temporal server side. For supported operations on different server versions + // see [Visibility]. // // [Visibility]: https://docs.temporal.io/visibility - SearchAttributes map[string]interface{} + TypedSearchAttributes SearchAttributes + + // UntypedSearchAttributes - These are set upon update for older schedules that did not have typed attributes. This + // should never be used for create. + // + // Deprecated - This is only for update of older search attributes. This may be removed in a future version. + UntypedSearchAttributes map[string]*commonpb.Payload } // ScheduleOptions configure the parameters for creating a schedule. @@ -344,8 +349,20 @@ type ( // Use GetSearchAttributes API to get valid key and corresponding value type. // For supported operations on different server versions see [Visibility]. // + // Deprecated: use TypedSearchAttributes instead. + // // [Visibility]: https://docs.temporal.io/visibility SearchAttributes map[string]interface{} + + // TypedSearchAttributes - Specifies Search Attributes that will be attached to the Workflow. Search Attributes are + // additional indexed information attributed to workflow and used for search and visibility. The search attributes + // can be used in query of List/Scan/Count workflow APIs. The key and its value type must be registered on Temporal + // server side. For supported operations on different server versions see [Visibility]. + // + // Optional: default to none. + // + // [Visibility]: https://docs.temporal.io/visibility + TypedSearchAttributes SearchAttributes } // ScheduleWorkflowExecution contains details on a workflows execution stared by a schedule. diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 7d0f6f125..c8be840ce 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -918,7 +918,7 @@ func (e *TestWorkflowEnvironment) SetMemoOnStart(memo map[string]interface{}) er // SetSearchAttributesOnStart sets the search attributes when start workflow. func (e *TestWorkflowEnvironment) SetSearchAttributesOnStart(searchAttributes map[string]interface{}) error { - attr, err := serializeSearchAttributes(searchAttributes) + attr, err := serializeUntypedSearchAttributes(searchAttributes) if err != nil { return err } diff --git a/temporal/search_attributes.go b/temporal/search_attributes.go index 3b250fb36..25c32d3c6 100644 --- a/temporal/search_attributes.go +++ b/temporal/search_attributes.go @@ -27,65 +27,80 @@ package temporal import "go.temporal.io/sdk/internal" type ( - // SearchAttributes represents a collection of typed search attributes + // SearchAttributes represents a collection of typed search attributes. Create with [NewSearchAttributes]. SearchAttributes = internal.SearchAttributes - // SearchAttributesUpdate represents a change to SearchAttributes + // SearchAttributesUpdate represents a change to SearchAttributes. SearchAttributeUpdate = internal.SearchAttributeUpdate // SearchAttributeKey represents a typed search attribute key. SearchAttributeKey = internal.SearchAttributeKey - // SearchAttributeKeyString represents a search attribute key for a text attribute type + // SearchAttributeKeyString represents a search attribute key for a text attribute type. Create with + // [NewSearchAttributeKeyString]. SearchAttributeKeyString = internal.SearchAttributeKeyString - // SearchAttributeKeyString represents a search attribute key for a keyword attribute type - SearchAttributeKeyword = internal.SearchAttributeKeyword + // SearchAttributeKeyKeyword represents a search attribute key for a keyword attribute type. Create with + // [NewSearchAttributeKeyKeyword]. + SearchAttributeKeyKeyword = internal.SearchAttributeKeyKeyword - // SearchAttributeKeyBool represents a search attribute key for a boolean attribute type + // SearchAttributeKeyBool represents a search attribute key for a boolean attribute type. Create with + // [NewSearchAttributeKeyBool]. SearchAttributeKeyBool = internal.SearchAttributeKeyBool - // SearchAttributeKeyInt64 represents a search attribute key for a integer attribute type + // SearchAttributeKeyInt64 represents a search attribute key for a integer attribute type. Create with + // [NewSearchAttributeKeyInt64]. SearchAttributeKeyInt64 = internal.SearchAttributeKeyInt64 - // SearchAttributeKeyFloat64 represents a search attribute key for a double attribute type + // SearchAttributeKeyFloat64 represents a search attribute key for a double attribute type. Create with + // [NewSearchAttributeKeyFloat64]. SearchAttributeKeyFloat64 = internal.SearchAttributeKeyFloat64 - // SearchAttributeKeyTime represents a search attribute key for a time attribute type + // SearchAttributeKeyTime represents a search attribute key for a time attribute type. Create with + // [NewSearchAttributeKeyTime]. SearchAttributeKeyTime = internal.SearchAttributeKeyTime - // SearchAttributeKeywordList represents a search attribute key for a keyword list attribute type - SearchAttributeKeywordList = internal.SearchAttributeKeywordList + // SearchAttributeKeyKeywordList represents a search attribute key for a keyword list attribute type. Create with + // [NewSearchAttributeKeyKeywordList]. + SearchAttributeKeyKeywordList = internal.SearchAttributeKeyKeywordList ) +// NewSearchAttributeKeyString creates a new string-based key. func NewSearchAttributeKeyString(name string) SearchAttributeKeyString { return internal.NewSearchAttributeKeyString(name) } -func NewSearchAttributeKeyword(name string) SearchAttributeKeyword { - return internal.NewSearchAttributeKeyword(name) +// NewSearchAttributeKeyKeyword creates a new keyword-based key. +func NewSearchAttributeKeyKeyword(name string) SearchAttributeKeyKeyword { + return internal.NewSearchAttributeKeyKeyword(name) } +// NewSearchAttributeKeyBool creates a new bool-based key. func NewSearchAttributeKeyBool(name string) SearchAttributeKeyBool { return internal.NewSearchAttributeKeyBool(name) } +// NewSearchAttributeKeyInt64 creates a new int64-based key. func NewSearchAttributeKeyInt64(name string) SearchAttributeKeyInt64 { return internal.NewSearchAttributeKeyInt64(name) } +// NewSearchAttributeKeyFloat64 creates a new float64-based key. func NewSearchAttributeKeyFloat64(name string) SearchAttributeKeyFloat64 { return internal.NewSearchAttributeKeyFloat64(name) } +// NewSearchAttributeKeyTime creates a new time-based key. func NewSearchAttributeKeyTime(name string) SearchAttributeKeyTime { return internal.NewSearchAttributeKeyTime(name) } -func NewSearchAttributeKeywordList(name string) SearchAttributeKeywordList { - return internal.NewSearchAttributeKeywordList(name) +// NewSearchAttributeKeyKeywordList creates a new keyword-list-based key. +func NewSearchAttributeKeyKeywordList(name string) SearchAttributeKeyKeywordList { + return internal.NewSearchAttributeKeyKeywordList(name) } +// NewSearchAttributes creates a new search attribute collection for the given updates. func NewSearchAttributes(attributes ...SearchAttributeUpdate) SearchAttributes { return internal.NewSearchAttributes(attributes...) } diff --git a/test/integration_test.go b/test/integration_test.go index 0b3457624..10eaec42a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -3381,7 +3381,7 @@ func (ts *IntegrationTestSuite) TestUpsertMemoWithExistingMemo() { ts.Equal(expectedMemo, memo) } -func (ts *IntegrationTestSuite) createBasicScheduleWorkflowAction(ID string, workflow interface{}) client.ScheduleAction { +func (ts *IntegrationTestSuite) createBasicScheduleWorkflowAction(ID string, workflow interface{}) *client.ScheduleWorkflowAction { return &client.ScheduleWorkflowAction{ Workflow: workflow, ID: ID, @@ -3422,17 +3422,21 @@ func (ts *IntegrationTestSuite) TestScheduleTypedSearchAttributes() { "* * * * * * *", }, }, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-typed-search-attributes", ts.workflows.ScheduleTypedSearchAttributesWorkflow), + Action: ts.createBasicScheduleWorkflowAction( + "test-schedule-typed-search-attributes", ts.workflows.ScheduleTypedSearchAttributesWorkflow), }) ts.NoError(err) defer func() { ts.NoError(handle.Delete(ctx)) }() + // Wait for the schedule to run - time.Sleep(2 * time.Second) - desc, err := handle.Describe(ctx) - ts.NoError(err) - ts.Len(desc.Info.RecentActions, 1) + var desc *client.ScheduleDescription + ts.Eventually(func() bool { + desc, err = handle.Describe(ctx) + ts.NoError(err) + return len(desc.Info.RecentActions) > 0 + }, 2*time.Second, 200*time.Millisecond) startWorkflowResult := desc.Info.RecentActions[0].StartWorkflowResult run := ts.client.GetWorkflow(ctx, startWorkflowResult.WorkflowID, startWorkflowResult.FirstExecutionRunID) var result string @@ -3441,6 +3445,78 @@ func (ts *IntegrationTestSuite) TestScheduleTypedSearchAttributes() { ts.Equal(scheduleID, result) } +func (ts *IntegrationTestSuite) TestScheduleWorkflowActionTypedSearchAttributes() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + scheduleID := "test-schedule-typed-search-attributes" + action := ts.createBasicScheduleWorkflowAction( + "test-schedule-typed-search-attributes", ts.workflows.ScheduleTypedSearchAttributesWorkflow) + stringKey := temporal.NewSearchAttributeKeyString("CustomStringField") + action.TypedSearchAttributes = temporal.NewSearchAttributes(stringKey.ValueSet("SomeValue1")) + handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: scheduleID, + RemainingActions: 1, + Spec: client.ScheduleSpec{ + CronExpressions: []string{ + "* * * * * * *", + }, + }, + Action: action, + }) + ts.NoError(err) + defer func() { + ts.NoError(handle.Delete(ctx)) + }() + + // Confirm typed search attrs on action + desc, err := handle.Describe(ctx) + ts.NoError(err) + actualAttrVal, _ := desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.Equal("SomeValue1", actualAttrVal) + + // Update but don't change + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + }, + }) + ts.NoError(err) + desc, err = handle.Describe(ctx) + ts.NoError(err) + actualAttrVal, _ = desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.Equal("SomeValue1", actualAttrVal) + + // Update with change + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + action := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction) + action.TypedSearchAttributes = temporal.NewSearchAttributes( + action.TypedSearchAttributes.Copy(), stringKey.ValueSet("SomeValue2")) + return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + }, + }) + ts.NoError(err) + desc, err = handle.Describe(ctx) + ts.NoError(err) + actualAttrVal, _ = desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.Equal("SomeValue2", actualAttrVal) + + // Now remove it + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + action := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction) + action.TypedSearchAttributes = temporal.NewSearchAttributes( + action.TypedSearchAttributes.Copy(), stringKey.ValueUnset()) + return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + }, + }) + ts.NoError(err) + desc, err = handle.Describe(ctx) + ts.NoError(err) + _, hasAttr := desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.False(hasAttr) +} + func (ts *IntegrationTestSuite) TestScheduleCalendarDefault() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/test/test_utils_test.go b/test/test_utils_test.go index ad5c7eef9..c12ed82e1 100644 --- a/test/test_utils_test.go +++ b/test/test_utils_test.go @@ -314,7 +314,7 @@ func (ts *ConfigAndClientSuiteBase) ensureSearchAttributes() error { } defer client.Close() - // Add CustomKeywordField attribute if not already present + // Add CustomKeywordField and CustomStringField attribute if not already present saResp, err := client.OperatorService().ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{ Namespace: ts.config.Namespace, }) @@ -324,8 +324,11 @@ func (ts *ConfigAndClientSuiteBase) ensureSearchAttributes() error { return nil } _, err = client.OperatorService().AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ - Namespace: ts.config.Namespace, - SearchAttributes: map[string]enumspb.IndexedValueType{"CustomKeywordField": enumspb.INDEXED_VALUE_TYPE_KEYWORD}, + Namespace: ts.config.Namespace, + SearchAttributes: map[string]enumspb.IndexedValueType{ + "CustomKeywordField": enumspb.INDEXED_VALUE_TYPE_KEYWORD, + "CustomStringField": enumspb.INDEXED_VALUE_TYPE_TEXT, + }, }) if err != nil { return fmt.Errorf("failed adding search attribute: %w", err) diff --git a/test/workflow_test.go b/test/workflow_test.go index a26927f8b..7cca93ac9 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -2290,7 +2290,7 @@ func (w *Workflows) ScheduleTypedSearchAttributesWorkflow(ctx workflow.Context) attributes := workflow.GetTypedSearchAttributes(ctx) scheduleStartTimeKey := temporal.NewSearchAttributeKeyTime("TemporalScheduledStartTime") - scheduleByIDKey := temporal.NewSearchAttributeKeyword("TemporalScheduledById") + scheduleByIDKey := temporal.NewSearchAttributeKeyKeyword("TemporalScheduledById") _, ok := attributes.GetTime(scheduleStartTimeKey) if !ok { @@ -2314,7 +2314,7 @@ func (w *Workflows) UpsertTypedSearchAttributesWorkflow(ctx workflow.Context, sl } // Add a new search attribute - key := temporal.NewSearchAttributeKeyword("CustomKeywordField") + key := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField") err := workflow.UpsertTypedSearchAttributes(ctx, key.ValueSet("CustomKeywordFieldValue")) if err != nil { return err diff --git a/workflow/workflow.go b/workflow/workflow.go index dc704a492..81e8e686e 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -584,6 +584,26 @@ func UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) erro return internal.UpsertSearchAttributes(ctx, attributes) } +// UpsertTypedSearchAttributes is used to add, update, or remove workflow search attributes. The search attributes can +// be used in query of List/Scan/Count workflow APIs. The key and value type must be registered on temporal server side; +// The value has to deterministic when replay; The value has to be Json serializable. +// UpsertTypedSearchAttributes will merge attributes to existing map in workflow, for example workflow code: +// +// var intKey = temporal.NewSearchAttributeKeyInt64("CustomIntField") +// var boolKey = temporal.NewSearchAttributeKeyBool("CustomBoolField") +// var keywordKey = temporal.NewSearchAttributeKeyBool("CustomKeywordField") +// +// func MyWorkflow(ctx workflow.Context, input string) error { +// err = workflow.UpsertTypedSearchAttributes(ctx, intAttrKey.ValueSet(1), boolAttrKey.ValueSet(true)) +// // ... +// +// err = workflow.UpsertSearchAttributes(ctx, intKey.ValueSet(2), keywordKey.ValueUnset()) +// // ... +// } +// +// For supported operations on different server versions see [Visibility]. +// +// [Visibility]: https://docs.temporal.io/visibility func UpsertTypedSearchAttributes(ctx Context, searchAttributeUpdate ...temporal.SearchAttributeUpdate) error { return internal.UpsertTypedSearchAttributes(ctx, searchAttributeUpdate...) }