Skip to content

Commit

Permalink
Add support for managing schedule search attributes when when updatin…
Browse files Browse the repository at this point in the history
…g a schedule (#1562)

Add support for defining search attributes when updating schedules
  • Loading branch information
justinp-tt authored Jul 25, 2024
1 parent 9c40461 commit bcc623d
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 13 deletions.
34 changes: 26 additions & 8 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,24 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
if err != nil {
return err
}

var newSA *commonpb.SearchAttributes
attributes := newSchedule.TypedSearchAttributes
if attributes != nil {
newSA, err = serializeTypedSearchAttributes(attributes.GetUntypedValues())
if err != nil {
return err
}
}

_, err = scheduleHandle.client.workflowService.UpdateSchedule(grpcCtx, &workflowservice.UpdateScheduleRequest{
Namespace: scheduleHandle.client.namespace,
ScheduleId: scheduleHandle.ID,
Schedule: newSchedulePB,
ConflictToken: nil,
Identity: scheduleHandle.client.identity,
RequestId: uuid.New(),
Namespace: scheduleHandle.client.namespace,
ScheduleId: scheduleHandle.ID,
Schedule: newSchedulePB,
ConflictToken: nil,
Identity: scheduleHandle.client.identity,
RequestId: uuid.New(),
SearchAttributes: newSA,
})
return err
}
Expand Down Expand Up @@ -484,6 +495,12 @@ func scheduleDescriptionFromPB(
return nil, err
}

var typedSearchAttributes SearchAttributes
searchAttributes := describeResponse.SearchAttributes
if searchAttributes != nil {
typedSearchAttributes = convertToTypedSearchAttributes(logger, searchAttributes.IndexedFields)
}

return &ScheduleDescription{
Schedule: Schedule{
Action: actionDescription,
Expand All @@ -510,8 +527,9 @@ func scheduleDescriptionFromPB(
CreatedAt: describeResponse.Info.GetCreateTime().AsTime(),
LastUpdateAt: describeResponse.Info.GetUpdateTime().AsTime(),
},
Memo: describeResponse.Memo,
SearchAttributes: describeResponse.SearchAttributes,
Memo: describeResponse.Memo,
SearchAttributes: searchAttributes,
TypedSearchAttributes: typedSearchAttributes,
}, nil
}

Expand Down
20 changes: 18 additions & 2 deletions internal/schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,19 @@ type (
// Memo - Non-indexed user supplied information.
Memo *commonpb.Memo

// SearchAttributes - Indexed info that can be used in query of List schedules APIs. The key and value type must be registered on Temporal server side.
// Use GetSearchAttributes API to get valid key and corresponding value type.
// SearchAttributes - Additional indexed information used for search and visibility. The key and its value type
// are registered on Temporal server side.
// For supported operations on different server versions see [Visibility].
//
// [Visibility]: https://docs.temporal.io/visibility
SearchAttributes *commonpb.SearchAttributes

// TypedSearchAttributes - Additional indexed information used for search and visibility. The key and its value
// type are registered on Temporal server side.
// For supported operations on different server versions see [Visibility].
//
// [Visibility]: https://docs.temporal.io/visibility
TypedSearchAttributes SearchAttributes
}

// SchedulePolicies describes the current polcies of a schedule.
Expand Down Expand Up @@ -476,6 +483,15 @@ type (
ScheduleUpdate struct {
// Schedule - New schedule to replace the existing schedule with
Schedule *Schedule

// TypedSearchAttributes - Optional indexed info that can be used for querying via the List schedules APIs.
// The key and value type must be registered on Temporal server side.
//
// nil: leave any pre-existing assigned search attributes intact
// empty: remove any and all pre-existing assigned search attributes
// attributes present: replace any and all pre-existing assigned search attributes with the defined search
// attributes, i.e. upsert
TypedSearchAttributes *SearchAttributes
}

// ScheduleUpdateInput describes the current state of the schedule to be updated.
Expand Down
125 changes: 122 additions & 3 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4980,9 +4980,18 @@ func (ts *IntegrationTestSuite) TestScheduleUpdate() {
err = handle.Delete(ctx)
ts.NoError(err)
}()

stringKey := temporal.NewSearchAttributeKeyString("CustomStringField")
keywordKey := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField")
sa := temporal.NewSearchAttributes(
stringKey.ValueSet("CustomStringFieldValue"),
keywordKey.ValueSet("foo"),
)

updateFunc := func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
Schedule: &input.Description.Schedule,
TypedSearchAttributes: &sa,
}, nil
}
description, err := handle.Describe(ctx)
Expand All @@ -4993,9 +5002,119 @@ func (ts *IntegrationTestSuite) TestScheduleUpdate() {
})
ts.NoError(err)

description2, err := handle.Describe(ctx)
ts.EventuallyWithT(func(c *assert.CollectT) {
d, err := handle.Describe(ctx)
assert.NoError(c, err)
assert.Equal(c, description.Schedule, d.Schedule)
assert.Equal(c, 2, d.TypedSearchAttributes.Size())
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
expectedString, _ := sa.GetString(stringKey)
assert.Equal(c, expectedString, returnedString)
returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey)
expectedKeyword, _ := sa.GetKeyword(keywordKey)
assert.Equal(c, expectedKeyword, returnedKeyword)
assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields))
}, time.Second, 100*time.Millisecond)

// nil search attributes should leave current search attributes untouched
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
}, nil
}

err = handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: updateFunc,
})
ts.NoError(err)

ts.EventuallyWithT(func(c *assert.CollectT) {
d, err := handle.Describe(ctx)
assert.NoError(c, err)
assert.Equal(c, 2, d.TypedSearchAttributes.Size())
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
expectedString, _ := sa.GetString(stringKey)
assert.Equal(c, expectedString, returnedString)
returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey)
expectedKeyword, _ := sa.GetKeyword(keywordKey)
assert.Equal(c, expectedKeyword, returnedKeyword)
assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields))
}, time.Second, 100*time.Millisecond)

// Updating an attribute without affecting the others
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
newSa := temporal.NewSearchAttributes(
input.Description.TypedSearchAttributes.Copy(),
stringKey.ValueSet("Changed"),
)
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
TypedSearchAttributes: &newSa,
}, nil
}

err = handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: updateFunc,
})
ts.NoError(err)

ts.EventuallyWithT(func(c *assert.CollectT) {
d, err := handle.Describe(ctx)
assert.NoError(c, err)
assert.Equal(c, 2, d.TypedSearchAttributes.Size())
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
expectedString, _ := temporal.NewSearchAttributes(stringKey.ValueSet("Changed")).GetString(stringKey)
assert.Equal(c, expectedString, returnedString)
returnedKeyword, _ := d.TypedSearchAttributes.GetKeyword(keywordKey)
expectedKeyword, _ := sa.GetKeyword(keywordKey)
assert.Equal(c, expectedKeyword, returnedKeyword)
assert.Equal(c, 2, len(d.SearchAttributes.IndexedFields))
}, time.Second, 100*time.Millisecond)

// updating a single search attribute on an existing collection acts as an upsert on the entire collection
newSa := temporal.NewSearchAttributes(stringKey.ValueSet("Changed"))
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
TypedSearchAttributes: &newSa,
}, nil
}

err = handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: updateFunc,
})
ts.NoError(err)

ts.EventuallyWithT(func(c *assert.CollectT) {
d, err := handle.Describe(ctx)
assert.NoError(c, err)
assert.Equal(c, 1, d.TypedSearchAttributes.Size())
returnedString, _ := d.TypedSearchAttributes.GetString(stringKey)
expectedString, _ := newSa.GetString(stringKey)
assert.Equal(c, expectedString, returnedString)
assert.Equal(c, 1, len(d.SearchAttributes.IndexedFields))
}, time.Second, 100*time.Millisecond)

// empty search attributes should remove pre-existing search attributes
sa = temporal.NewSearchAttributes()
updateFunc = func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
TypedSearchAttributes: &sa,
}, nil
}

err = handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: updateFunc,
})
ts.NoError(err)
ts.Equal(description.Schedule, description2.Schedule)

ts.EventuallyWithT(func(c *assert.CollectT) {
d, err := handle.Describe(ctx)
assert.NoError(c, err)
assert.Nil(c, d.SearchAttributes)
assert.Empty(c, d.TypedSearchAttributes)
}, time.Second, 100*time.Millisecond)
}

func (ts *IntegrationTestSuite) TestScheduleUpdateCancelUpdate() {
Expand Down

0 comments on commit bcc623d

Please sign in to comment.