Skip to content

Commit

Permalink
Adding UpdateActivityOptions, first iteration (#6657)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
First iteration for UpdateActivityOptions.
Contains most of the logic for UpdateActivityOptions, working with
proto, updating options, usage of updateMask, unit tests, etc.
Functional tests will come next.
To reviewers - 
* check the main logic in updateactivityoptions/api.go.
* check if anything fundamental is missing.

Known missing parts - reroute activity to the new TaskQueue is TaskQueue
name was changes, activity is scheduled but not started yet.
Knows issues with proto - I was not able to make protorefrect work with
"messages as a pointers", IsValid (should return false if field is a
pointer and nil) is just not working. As the result code is a bit less
general. Though more readable.
  • Loading branch information
ychebotarev authored Oct 22, 2024
1 parent f3dbfe5 commit c6d8378
Show file tree
Hide file tree
Showing 34 changed files with 3,551 additions and 2,359 deletions.
1,478 changes: 744 additions & 734 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2,131 changes: 1,076 additions & 1,055 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ func (s *TaskSerializer) timerActivityTaskToProto(
EventId: activityTimer.EventID,
TaskId: activityTimer.TaskID,
VisibilityTime: timestamppb.New(activityTimer.VisibilityTimestamp),
Stamp: activityTimer.Stamp,
}
}

Expand All @@ -749,6 +750,7 @@ func (s *TaskSerializer) timerActivityTaskFromProto(
EventID: activityTimer.EventId,
Attempt: activityTimer.ScheduleAttempt,
TimeoutType: activityTimer.TimeoutType,
Stamp: activityTimer.Stamp,
}
}

Expand All @@ -767,6 +769,7 @@ func (s *TaskSerializer) timerActivityRetryTaskToProto(
EventId: activityRetryTimer.EventID,
TaskId: activityRetryTimer.TaskID,
VisibilityTime: timestamppb.New(activityRetryTimer.VisibilityTimestamp),
Stamp: activityRetryTimer.Stamp,
}
}

Expand All @@ -784,6 +787,7 @@ func (s *TaskSerializer) timerActivityRetryTaskFromProto(
EventID: activityRetryTimer.EventId,
Version: activityRetryTimer.Version,
Attempt: activityRetryTimer.ScheduleAttempt,
Stamp: activityRetryTimer.Stamp,
}
}

Expand Down
64 changes: 64 additions & 0 deletions common/util/proto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package util

import (
"strings"

"golang.org/x/text/cases"
"golang.org/x/text/language"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func ConvertPathToCamel(input string) []string {
pathParts := strings.Split(input, ".")
for i, path := range pathParts {
// Split by "_" and convert each word to Title Case (CamelCase)
words := strings.Split(path, "_")
snakeCase := len(words) > 1
for j, word := range words {
if snakeCase && j > 0 {
words[j] = cases.Title(language.Und).String(strings.ToLower(word))
} else {
// lowercase the first letter
words[j] = strings.ToLower(word[:1]) + word[1:]
}
}
// Join the words into a CamelCase substring
pathParts[i] = strings.Join(words, "")
}
// Join all CamelCase substrings back with "."
return pathParts
}

func ParseFieldMask(mask *fieldmaskpb.FieldMask) map[string]struct{} {
updateFields := make(map[string]struct{})

for _, path := range mask.Paths {
pathParts := ConvertPathToCamel(path)
jsonPath := strings.Join(pathParts, ".")
updateFields[jsonPath] = struct{}{}
}

return updateFields
}
76 changes: 76 additions & 0 deletions common/util/proto_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestConvertPathToCamel(t *testing.T) {
testCases := []struct {
input string
expectedOutput []string
}{
{
input: "underscore",
expectedOutput: []string{"underscore"},
},
{
input: "CamelCase",
expectedOutput: []string{"camelCase"},
},
{
input: "UPPERCASE",
expectedOutput: []string{"uPPERCASE"},
},
{
input: "Dot.Separated",
expectedOutput: []string{"dot", "separated"},
},
{
input: "dash_separated",
expectedOutput: []string{"dashSeparated"},
},
{
input: "dash_separated.and_another",
expectedOutput: []string{"dashSeparated", "andAnother"},
},
{
input: "Already.CamelCase",
expectedOutput: []string{"already", "camelCase"},
},
{
input: "Mix.of.Snake_case.and.CamelCase",
expectedOutput: []string{"mix", "of", "snakeCase", "and", "camelCase"},
},
}

for _, tc := range testCases {
t.Run(tc.input, func(t *testing.T) {
actualOutput := ConvertPathToCamel(tc.input)
assert.Equal(t, tc.expectedOutput, actualOutput)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ message AddActivityTaskRequest {
// for TaskVersionDirective, which is unversioned.)
temporal.server.api.taskqueue.v1.TaskVersionDirective version_directive = 10;
temporal.server.api.taskqueue.v1.TaskForwardInfo forward_info = 11;

int32 stamp = 12;
}

message AddActivityTaskResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ message TimerTaskInfo {
// If specified, the task is a for a workflow chain instead of a specific workflow run.
// A workflow chain is identified by the run_id of the first workflow in the chain.
string first_run_id = 15;

// The stamp represents the version of the activity for which the timer task was created.
int32 stamp = 16;
}

message ArchivalTaskInfo {
Expand Down Expand Up @@ -463,6 +466,8 @@ message ActivityInfo {

// Last time an activity failure was recorded by the server.
google.protobuf.Timestamp last_attempt_complete_time = 40;
// stamp represent "version" of activity during its lifetime. It can be changed with Activity API
int32 stamp = 41;
}

// timer_map column
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5170,13 +5170,13 @@ func (wh *WorkflowHandler) UpdateActivityOptionsById(
return nil, errActivityIDNotSet
}

namespace_id, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
namespaceId, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
}

response, err := wh.historyClient.UpdateActivityOptions(ctx, &historyservice.UpdateActivityOptionsRequest{
NamespaceId: namespace_id.String(),
NamespaceId: namespaceId.String(),
UpdateRequest: request,
})

Expand Down
Loading

0 comments on commit c6d8378

Please sign in to comment.