Skip to content

Commit

Permalink
Support the new versioning API (#1494)
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal authored Jun 7, 2024
1 parent 9ecb2a4 commit c73a007
Show file tree
Hide file tree
Showing 16 changed files with 2,143 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ jobs:
if: ${{ matrix.testDockerCompose }}
run: go run . integration-test
working-directory: ./internal/cmd/build
env:
RUN_VERSIONING_V2_TESTS: "true"

cloud-test:
strategy:
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ frontend.enableUpdateWorkflowExecutionAsyncAccepted:
- value: true
system.enableEagerWorkflowStart:
- value: true
frontend.workerVersioningRuleAPIs:
- value: true
frontend.workerVersioningDataAPIs:
- value: true
frontend.workerVersioningWorkflowAPIs:
- value: true
worker.buildIdScavengerEnabled:
- value: true
worker.removableBuildIdDurationSinceDefault:
- value: 1
- value: 1
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ test.log
.DS_Store
.gobincache
go.work
go.work.sum
go.work.sum
*~
228 changes: 213 additions & 15 deletions client/client.go

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,27 @@ type (
// GetWorkerTaskReachability returns which versions are is still in use by open or closed workflows.
GetWorkerTaskReachability(ctx context.Context, options *GetWorkerTaskReachabilityOptions) (*WorkerTaskReachability, error)

// DescribeTaskQueueEnhanced returns information about the target task queue, broken down by Build Id:
// - List of pollers
// - Workflow Reachability status
// - Backlog info for Workflow and/or Activity tasks
// When not supported by the server, it returns an empty [TaskQueueDescription] if there is no information
// about the task queue, or an error when the response identifies an unsupported server.
// Note that using a sticky queue as target is not supported.
// WARNING: Worker versioning is currently experimental, and requires server 1.24+
DescribeTaskQueueEnhanced(ctx context.Context, options DescribeTaskQueueEnhancedOptions) (TaskQueueDescription, error)

// UpdateWorkerVersioningRules allows updating the worker-build-id based assignment and redirect rules for a given
// task queue. This is used in conjunction with workers who specify their build id and thus opt into the feature.
// The errors it can return:
// - serviceerror.FailedPrecondition when the conflict token is invalid
// WARNING: Worker versioning is currently experimental, and requires server 1.24+
UpdateWorkerVersioningRules(ctx context.Context, options UpdateWorkerVersioningRulesOptions) (*WorkerVersioningRules, error)

// GetWorkerVersioningRules returns the worker-build-id assignment and redirect rules for a task queue.
// WARNING: Worker versioning is currently experimental, and requires server 1.24+
GetWorkerVersioningRules(ctx context.Context, options GetWorkerVersioningOptions) (*WorkerVersioningRules, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (b *builder) integrationTest() error {
ExtraArgs: []string{
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecutionAsyncAccepted=true",
"--dynamic-config-value", "frontend.workerVersioningRuleAPIs=true",
"--dynamic-config-value", "frontend.workerVersioningDataAPIs=true",
"--dynamic-config-value", "frontend.workerVersioningWorkflowAPIs=true",
"--dynamic-config-value", "system.enableActivityEagerExecution=true",
Expand Down
321 changes: 321 additions & 0 deletions internal/internal_versioning_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"errors"
"time"

"go.temporal.io/api/common/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

enumspb "go.temporal.io/api/enums/v1"
)

// TaskQueueType specifies which category of tasks are associated with a queue.
type TaskQueueType int

const (
// TaskQueueTypeUnspecified indicates the task queue type was not specified.
TaskQueueTypeUnspecified = iota
// TaskQueueTypeWorkflow indicates the task queue is used for dispatching workflow tasks.
TaskQueueTypeWorkflow
// TaskQueueTypeActivity indicates the task queue is used for delivering activity tasks.
TaskQueueTypeActivity
// TaskQueueTypeNexus indicates the task queue is used for dispatching Nexus requests.
TaskQueueTypeNexus
)

// BuildIDTaskReachability specifies which category of tasks may reach a versioned worker of a certain Build ID.
//
// Note: future activities who inherit their workflow's Build ID but not its task queue will not be
// accounted for reachability as server cannot know if they'll happen as they do not use
// assignment rules of their task queue. Same goes for Child Workflows or Continue-As-New Workflows
// who inherit the parent/previous workflow's Build ID but not its task queue. In those cases, make
// sure to query reachability for the parent/previous workflow's task queue as well.
type BuildIDTaskReachability int

const (
// BuildIDTaskReachabilityUnspecified indicates that task reachability was not reported.
BuildIDTaskReachabilityUnspecified = iota
// BuildIDTaskReachabilityReachable indicates that this Build ID may be used by new workflows or activities
// (based on versioning rules), or there are open workflows or backlogged activities assigned to it.
BuildIDTaskReachabilityReachable
// BuildIDTaskReachabilityClosedWorkflowsOnly specifies that this Build ID does not have open workflows
// and is not reachable by new workflows, but MAY have closed workflows within the namespace retention period.
// Not applicable to activity-only task queues.
BuildIDTaskReachabilityClosedWorkflowsOnly
// BuildIDTaskReachabilityUnreachable indicates that this Build ID is not used for new executions, nor
// it has been used by any existing execution within the retention period.
BuildIDTaskReachabilityUnreachable
)

type (
// TaskQueueVersionSelection is a task queue filter based on versioning.
// It is an optional component of [DescribeTaskQueueEnhancedOptions].
TaskQueueVersionSelection struct {
// Include specific Build IDs.
BuildIDs []string
// Include the unversioned queue.
Unversioned bool
// Include all active versions. A version is active if it has had new
// tasks or polls recently.
AllActive bool
}

// DescribeTaskQueueEnhancedOptions is the input to [Client.DescribeTaskQueueEnhanced].
DescribeTaskQueueEnhancedOptions struct {
// Name of the task queue. Sticky queues are not supported.
TaskQueue string
// An optional queue selector based on versioning. If not provided,
// the result for the default Build ID will be returned. The default
// Build ID is the one mentioned in the first unconditional Assignment Rule.
// If there is no default Build ID, the result for the
// unversioned queue will be returned.
Versions *TaskQueueVersionSelection
// Task queue types to report info about. If not specified, all types are considered.
TaskQueueTypes []TaskQueueType
// Include list of pollers for requested task queue types and versions.
ReportPollers bool
// Include task reachability for the requested versions and all task types
// (task reachability is not reported per task type).
ReportTaskReachability bool
}

// WorkerVersionCapabilities includes a worker's build identifier
// and whether it is choosing to use the versioning feature.
// It is an optional component of [TaskQueuePollerInfo].
WorkerVersionCapabilities struct {
// Build ID of the worker.
BuildID string
// Whether the worker is using the versioning feature.
UseVersioning bool
}

// TaskQueuePollerInfo provides information about a worker/client polling a task queue.
// It is used by [TaskQueueTypeInfo].
TaskQueuePollerInfo struct {
// Time of the last poll. A value of zero means it was not set.
LastAccessTime time.Time
// The identity of the worker/client who is polling this task queue.
Identity string
// Polling rate. A value of zero means it was not set.
RatePerSecond float64
// Optional poller versioning capabilities. Available when a worker has opted into the worker versioning feature.
WorkerVersionCapabilities *WorkerVersionCapabilities
}

// TaskQueueTypeInfo specifies task queue information per task type and Build ID.
// It is included in [TaskQueueVersionInfo].
TaskQueueTypeInfo struct {
// Poller details for this task queue category.
Pollers []TaskQueuePollerInfo
}

// TaskQueueVersionInfo includes task queue information per Build ID.
// It is part of [TaskQueueDescription].
TaskQueueVersionInfo struct {
// Task queue info per task type.
TypesInfo map[TaskQueueType]TaskQueueTypeInfo
// The category of tasks that may reach a versioned worker of a certain Build ID.
TaskReachability BuildIDTaskReachability
}

// TaskQueueDescription is the response to [Client.DescribeTaskQueueEnhanced].
TaskQueueDescription struct {
// Task queue information for each Build ID. Empty string as key value means unversioned.
VersionsInfo map[string]TaskQueueVersionInfo
}
)

func (o *DescribeTaskQueueEnhancedOptions) validateAndConvertToProto(namespace string) (*workflowservice.DescribeTaskQueueRequest, error) {
if namespace == "" {
return nil, errors.New("missing namespace argument")
}

if o.TaskQueue == "" {
return nil, errors.New("missing task queue field")
}

taskQueueTypes := make([]enumspb.TaskQueueType, len(o.TaskQueueTypes))
for i, t := range o.TaskQueueTypes {
taskQueueTypes[i] = taskQueueTypeToProto(t)
}

opt := &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueuepb.TaskQueue{
// Sticky queues not supported
Name: o.TaskQueue,
},
ApiMode: enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
Versions: taskQueueVersionSelectionToProto(o.Versions),
TaskQueueTypes: taskQueueTypes,
ReportPollers: o.ReportPollers,
ReportTaskReachability: o.ReportTaskReachability,
}

return opt, nil
}

func workerVersionCapabilitiesFromResponse(response *common.WorkerVersionCapabilities) *WorkerVersionCapabilities {
if response == nil {
return nil
}

return &WorkerVersionCapabilities{
BuildID: response.GetBuildId(),
UseVersioning: response.GetUseVersioning(),
}
}

func pollerInfoFromResponse(response *taskqueuepb.PollerInfo) TaskQueuePollerInfo {
if response == nil {
return TaskQueuePollerInfo{}
}

lastAccessTime := time.Time{}
if response.GetLastAccessTime() != nil {
lastAccessTime = response.GetLastAccessTime().AsTime()
}

return TaskQueuePollerInfo{
LastAccessTime: lastAccessTime,
Identity: response.GetIdentity(),
RatePerSecond: response.GetRatePerSecond(),
WorkerVersionCapabilities: workerVersionCapabilitiesFromResponse(response.GetWorkerVersionCapabilities()),
}
}

func taskQueueTypeInfoFromResponse(response *taskqueuepb.TaskQueueTypeInfo) TaskQueueTypeInfo {
if response == nil {
return TaskQueueTypeInfo{}
}

pollers := make([]TaskQueuePollerInfo, len(response.GetPollers()))
for i, pInfo := range response.GetPollers() {
pollers[i] = pollerInfoFromResponse(pInfo)
}

return TaskQueueTypeInfo{
Pollers: pollers,
}
}

func taskQueueVersionInfoFromResponse(response *taskqueuepb.TaskQueueVersionInfo) TaskQueueVersionInfo {
if response == nil {
return TaskQueueVersionInfo{}
}

typesInfo := make(map[TaskQueueType]TaskQueueTypeInfo, len(response.GetTypesInfo()))
for taskType, tInfo := range response.GetTypesInfo() {
typesInfo[taskQueueTypeFromProto(enumspb.TaskQueueType(taskType))] = taskQueueTypeInfoFromResponse(tInfo)
}

return TaskQueueVersionInfo{
TypesInfo: typesInfo,
TaskReachability: buildIDTaskReachabilityFromProto(response.GetTaskReachability()),
}
}

func detectTaskQueueEnhancedNotSupported(response *workflowservice.DescribeTaskQueueResponse) error {
// A server before 1.24 returns a non-enhanced proto, which only fills `pollers` and `taskQueueStatus` fields
if len(response.GetVersionsInfo()) == 0 &&
(len(response.GetPollers()) > 0 || response.GetTaskQueueStatus() != nil) {
return errors.New("server does not support `DescribeTaskQueueEnhanced`")
}
return nil
}

func taskQueueDescriptionFromResponse(response *workflowservice.DescribeTaskQueueResponse) TaskQueueDescription {
if response == nil {
return TaskQueueDescription{}
}

versionsInfo := make(map[string]TaskQueueVersionInfo, len(response.GetVersionsInfo()))
for buildID, vInfo := range response.GetVersionsInfo() {
versionsInfo[buildID] = taskQueueVersionInfoFromResponse(vInfo)
}

return TaskQueueDescription{
VersionsInfo: versionsInfo,
}
}

func taskQueueVersionSelectionToProto(s *TaskQueueVersionSelection) *taskqueuepb.TaskQueueVersionSelection {
if s == nil {
return nil
}

return &taskqueuepb.TaskQueueVersionSelection{
BuildIds: s.BuildIDs,
Unversioned: s.Unversioned,
AllActive: s.AllActive,
}
}

func taskQueueTypeToProto(t TaskQueueType) enumspb.TaskQueueType {
switch t {
case TaskQueueTypeUnspecified:
return enumspb.TASK_QUEUE_TYPE_UNSPECIFIED
case TaskQueueTypeWorkflow:
return enumspb.TASK_QUEUE_TYPE_WORKFLOW
case TaskQueueTypeActivity:
return enumspb.TASK_QUEUE_TYPE_ACTIVITY
case TaskQueueTypeNexus:
return enumspb.TASK_QUEUE_TYPE_NEXUS
default:
panic("unknown task queue type")
}
}

func taskQueueTypeFromProto(t enumspb.TaskQueueType) TaskQueueType {
switch t {
case enumspb.TASK_QUEUE_TYPE_UNSPECIFIED:
return TaskQueueTypeUnspecified
case enumspb.TASK_QUEUE_TYPE_WORKFLOW:
return TaskQueueTypeWorkflow
case enumspb.TASK_QUEUE_TYPE_ACTIVITY:
return TaskQueueTypeActivity
case enumspb.TASK_QUEUE_TYPE_NEXUS:
return TaskQueueTypeNexus
default:
panic("unknown task queue type from proto")
}
}

func buildIDTaskReachabilityFromProto(r enumspb.BuildIdTaskReachability) BuildIDTaskReachability {
switch r {
case enumspb.BUILD_ID_TASK_REACHABILITY_UNSPECIFIED:
return BuildIDTaskReachabilityUnspecified
case enumspb.BUILD_ID_TASK_REACHABILITY_REACHABLE:
return BuildIDTaskReachabilityReachable
case enumspb.BUILD_ID_TASK_REACHABILITY_CLOSED_WORKFLOWS_ONLY:
return BuildIDTaskReachabilityClosedWorkflowsOnly
case enumspb.BUILD_ID_TASK_REACHABILITY_UNREACHABLE:
return BuildIDTaskReachabilityUnreachable
default:
panic("unknown task queue reachability")
}
}
Loading

0 comments on commit c73a007

Please sign in to comment.