From bcc623d58a9cc1e743ddcdafbebe7d0033779f39 Mon Sep 17 00:00:00 2001 From: justinp-tt <174377431+justinp-tt@users.noreply.github.com> Date: Thu, 25 Jul 2024 13:34:04 -0500 Subject: [PATCH] Add support for managing schedule search attributes when when updating a schedule (#1562) Add support for defining search attributes when updating schedules --- internal/internal_schedule_client.go | 34 ++++++-- internal/schedule_client.go | 20 ++++- test/integration_test.go | 125 ++++++++++++++++++++++++++- 3 files changed, 166 insertions(+), 13 deletions(-) diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index 352ab0fb4..7017e6503 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -294,13 +294,24 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc if err != nil { return err } + + var newSA *commonpb.SearchAttributes + attributes := newSchedule.TypedSearchAttributes + if attributes != nil { + newSA, err = serializeTypedSearchAttributes(attributes.GetUntypedValues()) + if err != nil { + return err + } + } + _, err = scheduleHandle.client.workflowService.UpdateSchedule(grpcCtx, &workflowservice.UpdateScheduleRequest{ - Namespace: scheduleHandle.client.namespace, - ScheduleId: scheduleHandle.ID, - Schedule: newSchedulePB, - ConflictToken: nil, - Identity: scheduleHandle.client.identity, - RequestId: uuid.New(), + Namespace: scheduleHandle.client.namespace, + ScheduleId: scheduleHandle.ID, + Schedule: newSchedulePB, + ConflictToken: nil, + Identity: scheduleHandle.client.identity, + RequestId: uuid.New(), + SearchAttributes: newSA, }) return err } @@ -484,6 +495,12 @@ func scheduleDescriptionFromPB( return nil, err } + var typedSearchAttributes SearchAttributes + searchAttributes := describeResponse.SearchAttributes + if searchAttributes != nil { + typedSearchAttributes = convertToTypedSearchAttributes(logger, searchAttributes.IndexedFields) + } + return &ScheduleDescription{ Schedule: Schedule{ Action: actionDescription, @@ -510,8 +527,9 @@ func scheduleDescriptionFromPB( CreatedAt: describeResponse.Info.GetCreateTime().AsTime(), LastUpdateAt: describeResponse.Info.GetUpdateTime().AsTime(), }, - Memo: describeResponse.Memo, - SearchAttributes: describeResponse.SearchAttributes, + Memo: describeResponse.Memo, + SearchAttributes: searchAttributes, + TypedSearchAttributes: typedSearchAttributes, }, nil } diff --git a/internal/schedule_client.go b/internal/schedule_client.go index e7c7c5a76..d95eebe21 100644 --- a/internal/schedule_client.go +++ b/internal/schedule_client.go @@ -416,12 +416,19 @@ type ( // Memo - Non-indexed user supplied information. Memo *commonpb.Memo - // SearchAttributes - Indexed info that can be used in query of List schedules APIs. The key and value type must be registered on Temporal server side. - // Use GetSearchAttributes API to get valid key and corresponding value type. + // SearchAttributes - Additional indexed information used for search and visibility. The key and its value type + // are registered on Temporal server side. // For supported operations on different server versions see [Visibility]. // // [Visibility]: https://docs.temporal.io/visibility SearchAttributes *commonpb.SearchAttributes + + // TypedSearchAttributes - Additional indexed information used for search and visibility. The key and its value + // type are registered on Temporal server side. + // For supported operations on different server versions see [Visibility]. + // + // [Visibility]: https://docs.temporal.io/visibility + TypedSearchAttributes SearchAttributes } // SchedulePolicies describes the current polcies of a schedule. @@ -476,6 +483,15 @@ type ( ScheduleUpdate struct { // Schedule - New schedule to replace the existing schedule with Schedule *Schedule + + // TypedSearchAttributes - Optional indexed info that can be used for querying via the List schedules APIs. + // The key and value type must be registered on Temporal server side. + // + // nil: leave any pre-existing assigned search attributes intact + // empty: remove any and all pre-existing assigned search attributes + // attributes present: replace any and all pre-existing assigned search attributes with the defined search + // attributes, i.e. upsert + TypedSearchAttributes *SearchAttributes } // ScheduleUpdateInput describes the current state of the schedule to be updated. diff --git a/test/integration_test.go b/test/integration_test.go index db41eba78..73da72101 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -4980,9 +4980,18 @@ func (ts *IntegrationTestSuite) TestScheduleUpdate() { err = handle.Delete(ctx) ts.NoError(err) }() + + stringKey := temporal.NewSearchAttributeKeyString("CustomStringField") + keywordKey := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField") + sa := temporal.NewSearchAttributes( + stringKey.ValueSet("CustomStringFieldValue"), + keywordKey.ValueSet("foo"), + ) + updateFunc := func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { return &client.ScheduleUpdate{ - Schedule: &input.Description.Schedule, + Schedule: &input.Description.Schedule, + TypedSearchAttributes: &sa, }, nil } description, err := handle.Describe(ctx) @@ -4993,9 +5002,119 @@ func (ts *IntegrationTestSuite) TestScheduleUpdate() { }) ts.NoError(err) - description2, err := handle.Describe(ctx) + ts.EventuallyWithT(func(c *assert.CollectT) { + d, err := handle.Describe(ctx) + assert.NoError(c, err) + assert.Equal(c, description.Schedule, d.Schedule) + assert.Equal(c, 2, d.TypedSearchAttributes.Size()) + returnedString, _ := d.TypedSearchAttributes.GetString(stringKey) + expectedString, _ := sa.GetString(stringKey) + assert.Equal(c, expectedString, returnedString) + returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey) + expectedKeyword, _ := sa.GetKeyword(keywordKey) + assert.Equal(c, expectedKeyword, returnedKeyword) + assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields)) + }, time.Second, 100*time.Millisecond) + + // nil search attributes should leave current search attributes untouched + updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + }, nil + } + + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: updateFunc, + }) + ts.NoError(err) + + ts.EventuallyWithT(func(c *assert.CollectT) { + d, err := handle.Describe(ctx) + assert.NoError(c, err) + assert.Equal(c, 2, d.TypedSearchAttributes.Size()) + returnedString, _ := d.TypedSearchAttributes.GetString(stringKey) + expectedString, _ := sa.GetString(stringKey) + assert.Equal(c, expectedString, returnedString) + returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey) + expectedKeyword, _ := sa.GetKeyword(keywordKey) + assert.Equal(c, expectedKeyword, returnedKeyword) + assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields)) + }, time.Second, 100*time.Millisecond) + + // Updating an attribute without affecting the others + updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + newSa := temporal.NewSearchAttributes( + input.Description.TypedSearchAttributes.Copy(), + stringKey.ValueSet("Changed"), + ) + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + TypedSearchAttributes: &newSa, + }, nil + } + + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: updateFunc, + }) + ts.NoError(err) + + ts.EventuallyWithT(func(c *assert.CollectT) { + d, err := handle.Describe(ctx) + assert.NoError(c, err) + assert.Equal(c, 2, d.TypedSearchAttributes.Size()) + returnedString, _ := d.TypedSearchAttributes.GetString(stringKey) + expectedString, _ := temporal.NewSearchAttributes(stringKey.ValueSet("Changed")).GetString(stringKey) + assert.Equal(c, expectedString, returnedString) + returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey) + expectedKeyword, _ := sa.GetKeyword(keywordKey) + assert.Equal(c, expectedKeyword, returnedKeyword) + assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields)) + }, time.Second, 100*time.Millisecond) + + // updating a single search attribute on an existing collection acts as an upsert on the entire collection + newSa := temporal.NewSearchAttributes(stringKey.ValueSet("Changed")) + updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + TypedSearchAttributes: &newSa, + }, nil + } + + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: updateFunc, + }) + ts.NoError(err) + + ts.EventuallyWithT(func(c *assert.CollectT) { + d, err := handle.Describe(ctx) + assert.NoError(c, err) + assert.Equal(c, 1, d.TypedSearchAttributes.Size()) + returnedString, _ := d.TypedSearchAttributes.GetString(stringKey) + expectedString, _ := newSa.GetString(stringKey) + assert.Equal(c, expectedString, returnedString) + assert.Equal(c, 1, len(d.SearchAttributes.IndexedFields)) + }, time.Second, 100*time.Millisecond) + + // empty search attributes should remove pre-existing search attributes + sa = temporal.NewSearchAttributes() + updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + TypedSearchAttributes: &sa, + }, nil + } + + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: updateFunc, + }) ts.NoError(err) - ts.Equal(description.Schedule, description2.Schedule) + + ts.EventuallyWithT(func(c *assert.CollectT) { + d, err := handle.Describe(ctx) + assert.NoError(c, err) + assert.Nil(c, d.SearchAttributes) + assert.Empty(c, d.TypedSearchAttributes) + }, time.Second, 100*time.Millisecond) } func (ts *IntegrationTestSuite) TestScheduleUpdateCancelUpdate() {