diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index ff30fa37464..6af61ac7143 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -28,6 +28,7 @@ import ( "context" "encoding/binary" "fmt" + "strings" "sync/atomic" "time" "unicode/utf8" @@ -3012,8 +3013,9 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow return nil, errSchedulesNotAllowed } - // a schedule id is a workflow id so validate it the same way - if err := wh.validateWorkflowID(request.ScheduleId); err != nil { + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + + if err := wh.validateWorkflowID(workflowID); err != nil { return nil, err } @@ -3103,7 +3105,7 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow // Create StartWorkflowExecutionRequest startReq := &workflowservice.StartWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowId: request.ScheduleId, + WorkflowId: workflowID, WorkflowType: &commonpb.WorkflowType{Name: scheduler.WorkflowType}, TaskQueue: &taskqueuepb.TaskQueue{Name: workercommon.PerNSWorkerTaskQueue}, Input: inputPayload, @@ -3150,7 +3152,8 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl return nil, err } - execution := &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId} + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + execution := &commonpb.WorkflowExecution{WorkflowId: workflowID} // first describe to get memo and search attributes describeResponse, err := wh.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{ @@ -3314,6 +3317,8 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow return nil, errRequestIDTooLong } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3334,7 +3339,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow sizeLimitWarn, sizeLimitError, namespaceID.String(), - request.GetScheduleId(), + workflowID, "", // don't have runid yet wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, @@ -3347,7 +3352,7 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow NamespaceId: namespaceID.String(), SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, SignalName: scheduler.SignalNameUpdate, Input: inputPayloads, Identity: request.Identity, @@ -3385,6 +3390,8 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows return nil, errRequestIDTooLong } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3404,7 +3411,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows sizeLimitWarn, sizeLimitError, namespaceID.String(), - request.GetScheduleId(), + workflowID, "", // don't have runid yet wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, @@ -3417,7 +3424,7 @@ func (wh *WorkflowHandler) PatchSchedule(ctx context.Context, request *workflows NamespaceId: namespaceID.String(), SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, SignalName: scheduler.SignalNamePatch, Input: inputPayloads, Identity: request.Identity, @@ -3451,6 +3458,8 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques return nil, errSchedulesNotAllowed } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3468,7 +3477,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques sizeLimitWarn, sizeLimitError, namespaceID.String(), - request.ScheduleId, + workflowID, "", wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())), wh.throttledLogger, @@ -3480,7 +3489,7 @@ func (wh *WorkflowHandler) ListScheduleMatchingTimes(ctx context.Context, reques NamespaceId: namespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ Namespace: request.Namespace, - Execution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + Execution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, Query: &querypb.WorkflowQuery{ QueryType: scheduler.QueryNameListMatchingTimes, QueryArgs: queryPayload, @@ -3521,6 +3530,8 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow return nil, errSchedulesNotAllowed } + workflowID := scheduler.WorkflowIDPrefix + request.ScheduleId + namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) if err != nil { return nil, err @@ -3530,7 +3541,7 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow NamespaceId: namespaceID.String(), TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{ Namespace: request.Namespace, - WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: request.ScheduleId}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: workflowID}, Reason: "terminated by DeleteSchedule", Identity: request.Identity, }, @@ -3601,8 +3612,10 @@ func (wh *WorkflowHandler) ListSchedules(ctx context.Context, request *workflows info := wh.decodeScheduleListInfo(searchAttributes) searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes) memo := wh.cleanScheduleMemo(ex.GetMemo()) + workflowID := ex.GetExecution().GetWorkflowId() + scheduleID := strings.TrimPrefix(workflowID, scheduler.WorkflowIDPrefix) schedules[i] = &schedpb.ScheduleListEntry{ - ScheduleId: ex.GetExecution().GetWorkflowId(), + ScheduleId: scheduleID, Memo: memo, SearchAttributes: searchAttributes, Info: info, diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index 15c255df330..b6d928e7899 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -51,6 +51,9 @@ import ( ) const ( + // Schedules are implemented by a workflow whose ID is this string plus the schedule ID. + WorkflowIDPrefix = "temporal-sys-scheduler:" + // This is an example of a timestamp that's appended to the workflow // id, used for validation in the frontend. AppendedTimestampForValidation = "-2009-11-10T23:00:00Z"