Skip to content

Commit

Permalink
More search attribute work
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Jan 29, 2024
1 parent 599e655 commit 8fb7264
Show file tree
Hide file tree
Showing 13 changed files with 411 additions and 217 deletions.
78 changes: 2 additions & 76 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,80 +391,6 @@ func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo {
return wc.workflowInfo
}

func getIndexValue(payload *commonpb.Payload) enumspb.IndexedValueType {
return enumspb.IndexedValueType(enumspb.IndexedValueType_value[string(payload.GetMetadata()["type"][:])])
}

func convertToTypeSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes {
updates := make([]SearchAttributeUpdate, 0, len(attributes))
for key, payload := range attributes {
if payload.Data == nil {
continue
}
switch index := getIndexValue(payload); index {
case enumspb.INDEXED_VALUE_TYPE_BOOL:
attr := NewSearchAttributeKeyBool(key)
var value bool
err := converter.GetDefaultDataConverter().FromPayload(payload, &value)
if err != nil {
panic(err)
}
updates = append(updates, attr.ValueSet(value))
case enumspb.INDEXED_VALUE_TYPE_KEYWORD:
attr := NewSearchAttributeKeyword(key)
var value string
err := converter.GetDefaultDataConverter().FromPayload(payload, &value)
if err != nil {
panic(err)
}
updates = append(updates, attr.ValueSet(value))
case enumspb.INDEXED_VALUE_TYPE_TEXT:
attr := NewSearchAttributeKeyString(key)
var value string
err := converter.GetDefaultDataConverter().FromPayload(payload, &value)
if err != nil {
panic(err)
}
updates = append(updates, attr.ValueSet(value))
case enumspb.INDEXED_VALUE_TYPE_INT:
attr := NewSearchAttributeKeyInt64(key)
var value int64
err := converter.GetDefaultDataConverter().FromPayload(payload, &value)
if err != nil {
panic(err)
}
updates = append(updates, attr.ValueSet(value))
case enumspb.INDEXED_VALUE_TYPE_DOUBLE:
attr := NewSearchAttributeKeyFloat64(key)
var value float64
err := converter.GetDefaultDataConverter().FromPayload(payload, &value)
if err != nil {
panic(err)
}
updates = append(updates, attr.ValueSet(value))
case enumspb.INDEXED_VALUE_TYPE_DATETIME:
attr := NewSearchAttributeKeyTime(key)
var value time.Time
err := converter.GetDefaultDataConverter().FromPayload(payload, &value)
if err != nil {
panic(err)
}
updates = append(updates, attr.ValueSet(value))
case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST:
attr := NewSearchAttributeKeywordList(key)
var value []string
err := converter.GetDefaultDataConverter().FromPayload(payload, &value)
if err != nil {
panic(err)
}
updates = append(updates, attr.ValueSet(value))
default:
logger.Warn("Unrecognized indexed value type on search attribute key", "key", key, "index", index)
}
}
return NewSearchAttributes(updates...)
}

func (wc *workflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes {
return convertToTypeSearchAttributes(wc.logger, wc.workflowInfo.SearchAttributes.GetIndexedFields())
}
Expand Down Expand Up @@ -547,7 +473,7 @@ func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*c
if len(attributes) == 0 {
return nil, errSearchAttributesNotSet
}
attr, err := serializeSearchAttributes(attributes)
attr, err := serializeUntypedSearchAttributes(attributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -620,7 +546,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
callback(nil, err)
return
}
searchAttr, err := serializeSearchAttributes(params.SearchAttributes)
searchAttr, err := serializeUntypedSearchAttributes(params.SearchAttributes)
if err != nil {
if wc.sdkFlags.tryUse(SDKFlagChildWorkflowErrorExecution, !wc.isReplay) {
startedHandler(WorkflowExecution{}, &ChildWorkflowExecutionAlreadyStartedError{})
Expand Down
54 changes: 41 additions & 13 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/log"
)

type (
Expand Down Expand Up @@ -96,7 +97,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche
return nil, err
}

searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes)
searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,7 +276,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
if err != nil {
return err
}
scheduleDescription, err := scheduleDescriptionFromPB(describeResponse)
scheduleDescription, err := scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse)
if err != nil {
return err
}
Expand Down Expand Up @@ -314,7 +315,7 @@ func (scheduleHandle *scheduleHandleImpl) Describe(ctx context.Context) (*Schedu
if err != nil {
return nil, err
}
return scheduleDescriptionFromPB(describeResponse)
return scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse)
}

func (scheduleHandle *scheduleHandleImpl) Trigger(ctx context.Context, options ScheduleTriggerOptions) error {
Expand Down Expand Up @@ -454,7 +455,10 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS
}
}

func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeScheduleResponse) (*ScheduleDescription, error) {
func scheduleDescriptionFromPB(
logger log.Logger,
describeResponse *workflowservice.DescribeScheduleResponse,
) (*ScheduleDescription, error) {
if describeResponse == nil {
return nil, nil
}
Expand All @@ -474,7 +478,7 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul
nextActionTimes[i] = t.AsTime()
}

actionDescription, err := convertFromPBScheduleAction(describeResponse.Schedule.Action)
actionDescription, err := convertFromPBScheduleAction(logger, describeResponse.Schedule.Action)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -560,7 +564,11 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch
}
}

func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, scheduleAction ScheduleAction) (*schedulepb.ScheduleAction, error) {
func convertToPBScheduleAction(
ctx context.Context,
client *WorkflowClient,
scheduleAction ScheduleAction,
) (*schedulepb.ScheduleAction, error) {
switch action := scheduleAction.(type) {
case *ScheduleWorkflowAction:
// Set header before interceptor run
Expand Down Expand Up @@ -590,10 +598,19 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
return nil, err
}

searchAttr, err := serializeSearchAttributes(action.SearchAttributes)
searchAttrs, err := serializeSearchAttributes(nil, action.TypedSearchAttributes)
if err != nil {
return nil, err
}
// Add any untyped search attributes that aren't already there
for k, v := range action.UntypedSearchAttributes {
if searchAttrs.GetIndexedFields()[k] == nil {
if searchAttrs == nil || searchAttrs.IndexedFields == nil {
searchAttrs = &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{}}
}
searchAttrs.IndexedFields[k] = v
}
}

// get workflow headers from the context
header, err := headerPropagated(ctx, client.contextPropagators)
Expand All @@ -613,7 +630,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
WorkflowTaskTimeout: durationpb.New(action.WorkflowTaskTimeout),
RetryPolicy: convertToPBRetryPolicy(action.RetryPolicy),
Memo: memo,
SearchAttributes: searchAttr,
SearchAttributes: searchAttrs,
Header: header,
},
},
Expand All @@ -624,7 +641,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
}
}

func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAction, error) {
func convertFromPBScheduleAction(logger log.Logger, action *schedulepb.ScheduleAction) (ScheduleAction, error) {
switch action := action.Action.(type) {
case *schedulepb.ScheduleAction_StartWorkflow:
workflow := action.StartWorkflow
Expand All @@ -639,9 +656,19 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
memos[key] = element
}

searchAttributes := make(map[string]interface{})
for key, element := range workflow.GetSearchAttributes().GetIndexedFields() {
searchAttributes[key] = element
searchAttrs := convertToTypeSearchAttributes(logger, workflow.GetSearchAttributes().GetIndexedFields())
// Create untyped list for any attribute not in the existing list
untypedSearchAttrs := map[string]*commonpb.Payload{}
for k, v := range workflow.GetSearchAttributes().GetIndexedFields() {
var inTyped bool
for typedKey := range searchAttrs.untypedValue {
if inTyped = typedKey.GetName() == k; inTyped {
break
}
}
if !inTyped {
untypedSearchAttrs[k] = v
}
}

return &ScheduleWorkflowAction{
Expand All @@ -654,7 +681,8 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(),
RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy),
Memo: memos,
SearchAttributes: searchAttributes,
TypedSearchAttributes: searchAttrs,
UntypedSearchAttributes: untypedSearchAttrs,
}, nil
default:
// TODO maybe just panic instead?
Expand Down
Loading

0 comments on commit 8fb7264

Please sign in to comment.