Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Typed Search Attributes #1368

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,21 @@ type (
// Use GetSearchAttributes API to get valid key and corresponding value type.
// For supported operations on different server versions see [Visibility].
//
// Deprecated: use TypedSearchAttributes instead.
//
// [Visibility]: https://docs.temporal.io/visibility
SearchAttributes map[string]interface{}

// TypedSearchAttributes - Specifies Search Attributes that will be attached to the Workflow. Search Attributes are
// additional indexed information attributed to workflow and used for search and visibility. The search attributes
// can be used in query of List/Scan/Count workflow APIs. The key and its value type must be registered on Temporal
// server side. For supported operations on different server versions see [Visibility].
//
// Optional: default to none.
//
// [Visibility]: https://docs.temporal.io/visibility
TypedSearchAttributes SearchAttributes

// EnableEagerStart - request eager execution for this workflow, if a local worker is available.
//
// WARNING: Eager start does not respect worker versioning. An eagerly started workflow may run on
Expand Down
6 changes: 6 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ type WorkflowOutboundInterceptor interface {
// GetInfo intercepts workflow.GetInfo.
GetInfo(ctx Context) *WorkflowInfo

// GetTypedSearchAttributes intercepts workflow.GetTypedSearchAttributes.
GetTypedSearchAttributes(ctx Context) SearchAttributes

// GetUpdateInfo intercepts workflow.GetUpdateInfo.
//
// NOTE: Experimental
Expand Down Expand Up @@ -233,6 +236,9 @@ type WorkflowOutboundInterceptor interface {
// UpsertSearchAttributes intercepts workflow.UpsertSearchAttributes.
UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error

// UpsertTypedSearchAttributes intercepts workflow.UpsertTypedSearchAttributes.
UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error

// UpsertMemo intercepts workflow.UpsertMemo.
UpsertMemo(ctx Context, memo map[string]interface{}) error

Expand Down
11 changes: 11 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ func (w *WorkflowOutboundInterceptorBase) GetInfo(ctx Context) *WorkflowInfo {
return w.Next.GetInfo(ctx)
}

// GetTypedSearchAttributes implements WorkflowOutboundInterceptor.GetTypedSearchAttributes.
func (w *WorkflowOutboundInterceptorBase) GetTypedSearchAttributes(ctx Context) SearchAttributes {
return w.Next.GetTypedSearchAttributes(ctx)
}

// GetUpdateInfo implements WorkflowOutboundInterceptor.GetUpdateInfo.
func (w *WorkflowOutboundInterceptorBase) GetUpdateInfo(ctx Context) *UpdateInfo {
return w.Next.GetUpdateInfo(ctx)
Expand Down Expand Up @@ -297,6 +302,12 @@ func (w *WorkflowOutboundInterceptorBase) UpsertSearchAttributes(ctx Context, at
return w.Next.UpsertSearchAttributes(ctx, attributes)
}

// UpsertTypedSearchAttributes implements
// WorkflowOutboundInterceptor.UpsertTypedSearchAttributes.
func (w *WorkflowOutboundInterceptorBase) UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error {
return w.Next.UpsertTypedSearchAttributes(ctx, attributes...)
}

// UpsertMemo implements
// WorkflowOutboundInterceptor.UpsertMemo.
func (w *WorkflowOutboundInterceptorBase) UpsertMemo(ctx Context, memo map[string]interface{}) error {
Expand Down
8 changes: 6 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo {
return wc.workflowInfo
}

func (wc *workflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes {
return convertToTypedSearchAttributes(wc.logger, wc.workflowInfo.SearchAttributes.GetIndexedFields())
}

func (wc *workflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) {
wc.completeHandler(result, err)
}
Expand Down Expand Up @@ -469,7 +473,7 @@ func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*c
if len(attributes) == 0 {
return nil, errSearchAttributesNotSet
}
attr, err := serializeSearchAttributes(attributes)
attr, err := serializeUntypedSearchAttributes(attributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -542,7 +546,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
callback(nil, err)
return
}
searchAttr, err := serializeSearchAttributes(params.SearchAttributes)
searchAttr, err := serializeUntypedSearchAttributes(params.SearchAttributes)
if err != nil {
if wc.sdkFlags.tryUse(SDKFlagChildWorkflowErrorExecution, !wc.isReplay) {
startedHandler(WorkflowExecution{}, &ChildWorkflowExecutionAlreadyStartedError{})
Expand Down
54 changes: 41 additions & 13 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/log"
)

type (
Expand Down Expand Up @@ -96,7 +97,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche
return nil, err
}

searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes)
searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,7 +276,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
if err != nil {
return err
}
scheduleDescription, err := scheduleDescriptionFromPB(describeResponse)
scheduleDescription, err := scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse)
if err != nil {
return err
}
Expand Down Expand Up @@ -314,7 +315,7 @@ func (scheduleHandle *scheduleHandleImpl) Describe(ctx context.Context) (*Schedu
if err != nil {
return nil, err
}
return scheduleDescriptionFromPB(describeResponse)
return scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse)
}

func (scheduleHandle *scheduleHandleImpl) Trigger(ctx context.Context, options ScheduleTriggerOptions) error {
Expand Down Expand Up @@ -454,7 +455,10 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS
}
}

func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeScheduleResponse) (*ScheduleDescription, error) {
func scheduleDescriptionFromPB(
logger log.Logger,
describeResponse *workflowservice.DescribeScheduleResponse,
) (*ScheduleDescription, error) {
if describeResponse == nil {
return nil, nil
}
Expand All @@ -474,7 +478,7 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul
nextActionTimes[i] = t.AsTime()
}

actionDescription, err := convertFromPBScheduleAction(describeResponse.Schedule.Action)
actionDescription, err := convertFromPBScheduleAction(logger, describeResponse.Schedule.Action)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -560,7 +564,11 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch
}
}

func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, scheduleAction ScheduleAction) (*schedulepb.ScheduleAction, error) {
func convertToPBScheduleAction(
ctx context.Context,
client *WorkflowClient,
scheduleAction ScheduleAction,
) (*schedulepb.ScheduleAction, error) {
switch action := scheduleAction.(type) {
case *ScheduleWorkflowAction:
// Set header before interceptor run
Expand Down Expand Up @@ -590,10 +598,19 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
return nil, err
}

searchAttr, err := serializeSearchAttributes(action.SearchAttributes)
searchAttrs, err := serializeSearchAttributes(nil, action.TypedSearchAttributes)
if err != nil {
return nil, err
}
// Add any untyped search attributes that aren't already there
for k, v := range action.UntypedSearchAttributes {
if searchAttrs.GetIndexedFields()[k] == nil {
if searchAttrs == nil || searchAttrs.IndexedFields == nil {
searchAttrs = &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{}}
}
searchAttrs.IndexedFields[k] = v
}
}

// get workflow headers from the context
header, err := headerPropagated(ctx, client.contextPropagators)
Expand All @@ -613,7 +630,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
WorkflowTaskTimeout: durationpb.New(action.WorkflowTaskTimeout),
RetryPolicy: convertToPBRetryPolicy(action.RetryPolicy),
Memo: memo,
SearchAttributes: searchAttr,
SearchAttributes: searchAttrs,
Header: header,
},
},
Expand All @@ -624,7 +641,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
}
}

func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAction, error) {
func convertFromPBScheduleAction(logger log.Logger, action *schedulepb.ScheduleAction) (ScheduleAction, error) {
switch action := action.Action.(type) {
case *schedulepb.ScheduleAction_StartWorkflow:
workflow := action.StartWorkflow
Expand All @@ -639,9 +656,19 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
memos[key] = element
}

searchAttributes := make(map[string]interface{})
for key, element := range workflow.GetSearchAttributes().GetIndexedFields() {
searchAttributes[key] = element
searchAttrs := convertToTypedSearchAttributes(logger, workflow.GetSearchAttributes().GetIndexedFields())
// Create untyped list for any attribute not in the existing list
untypedSearchAttrs := map[string]*commonpb.Payload{}
for k, v := range workflow.GetSearchAttributes().GetIndexedFields() {
var inTyped bool
for typedKey := range searchAttrs.untypedValue {
if inTyped = typedKey.GetName() == k; inTyped {
break
}
}
if !inTyped {
untypedSearchAttrs[k] = v
}
}

return &ScheduleWorkflowAction{
Expand All @@ -654,7 +681,8 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(),
RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy),
Memo: memos,
SearchAttributes: searchAttributes,
TypedSearchAttributes: searchAttrs,
UntypedSearchAttributes: untypedSearchAttrs,
}, nil
default:
// TODO maybe just panic instead?
Expand Down
Loading
Loading