diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 726671a019f..f399f3b0c54 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -112,6 +112,7 @@ var ( errExecutionNotSet = &types.BadRequestError{Message: "Execution is not set on request."} errWorkflowIDNotSet = &types.BadRequestError{Message: "WorkflowId is not set on request."} errActivityIDNotSet = &types.BadRequestError{Message: "ActivityID is not set on request."} + errSignalNameNotSet = &types.BadRequestError{Message: "SignalName is not set on request."} errInvalidRunID = &types.BadRequestError{Message: "Invalid RunId."} errInvalidNextPageToken = &types.BadRequestError{Message: "Invalid NextPageToken."} errNextPageTokenRunIDMismatch = &types.BadRequestError{Message: "RunID in the request does not match the NextPageToken."} @@ -459,18 +460,20 @@ func (wh *WorkflowHandler) PollForActivityTask( return nil, wh.error(errRequestNotSet, scope) } + domainName := pollRequest.GetDomain() + tags := getDomainWfIDRunIDTags(domainName, nil) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) + } + wh.GetLogger().Debug("Received PollForActivityTask") if err := common.ValidateLongPollContextTimeout( ctx, "PollForActivityTask", wh.GetThrottledLogger(), ); err != nil { - return nil, wh.error(err, scope) - } - - domainName := pollRequest.GetDomain() - if domainName == "" { - return nil, wh.error(errDomainNotSet, scope) + return nil, wh.error(err, scope, tags...) } idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit() @@ -480,11 +483,11 @@ func (wh *WorkflowHandler) PollForActivityTask( idLengthWarnLimit, wh.config.DomainNameMaxLength(domainName), metrics.CadenceErrDomainNameExceededWarnLimit) { - return nil, wh.error(errDomainTooLong, scope) + return nil, wh.error(errDomainTooLong, scope, tags...) } if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil { - return nil, err + return nil, wh.error(err, scope, tags...) } if !common.ValidIDLength( @@ -493,12 +496,12 @@ func (wh *WorkflowHandler) PollForActivityTask( idLengthWarnLimit, wh.config.IdentityMaxLength(domainName), metrics.CadenceErrIdentityExceededWarnLimit) { - return nil, wh.error(errIdentityTooLong, scope) + return nil, wh.error(errIdentityTooLong, scope, tags...) } domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } pollerID := uuid.New() @@ -525,7 +528,7 @@ func (wh *WorkflowHandler) PollForActivityTask( tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()), tag.Value(ctxTimeout), tag.Error(err)) - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } } return resp, nil @@ -538,7 +541,6 @@ func (wh *WorkflowHandler) PollForDecisionTask( ) (resp *types.PollForDecisionTaskResponse, retError error) { defer log.CapturePanic(wh.GetLogger(), &retError) - tagsForErrorLog := []tag.Tag{tag.WorkflowDomainName(pollRequest.GetDomain())} callTime := time.Now() scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendPollForDecisionTaskScope, pollRequest) @@ -549,11 +551,18 @@ func (wh *WorkflowHandler) PollForDecisionTask( } if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return nil, wh.error(err, scope, tagsForErrorLog...) + return nil, wh.error(err, scope) } if pollRequest == nil { - return nil, wh.error(errRequestNotSet, scope, tagsForErrorLog...) + return nil, wh.error(errRequestNotSet, scope) + } + + domainName := pollRequest.GetDomain() + tags := getDomainWfIDRunIDTags(domainName, nil) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) } wh.GetLogger().Debug("Received PollForDecisionTask") @@ -562,12 +571,7 @@ func (wh *WorkflowHandler) PollForDecisionTask( "PollForDecisionTask", wh.GetThrottledLogger(), ); err != nil { - return nil, wh.error(err, scope, tagsForErrorLog...) - } - - domainName := pollRequest.GetDomain() - if domainName == "" { - return nil, wh.error(errDomainNotSet, scope, tagsForErrorLog...) + return nil, wh.error(err, scope, tags...) } idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit() @@ -577,7 +581,7 @@ func (wh *WorkflowHandler) PollForDecisionTask( idLengthWarnLimit, wh.config.DomainNameMaxLength(domainName), metrics.CadenceErrDomainNameExceededWarnLimit) { - return nil, wh.error(errDomainTooLong, scope, tagsForErrorLog...) + return nil, wh.error(errDomainTooLong, scope, tags...) } if !common.ValidIDLength( @@ -586,22 +590,22 @@ func (wh *WorkflowHandler) PollForDecisionTask( idLengthWarnLimit, wh.config.IdentityMaxLength(domainName), metrics.CadenceErrIdentityExceededWarnLimit) { - return nil, wh.error(errIdentityTooLong, scope, tagsForErrorLog...) + return nil, wh.error(errIdentityTooLong, scope, tags...) } if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil { - return nil, err + return nil, wh.error(err, scope, tags...) } domainEntry, err := wh.GetDomainCache().GetDomain(domainName) if err != nil { - return nil, wh.error(err, scope, tagsForErrorLog...) + return nil, wh.error(err, scope, tags...) } domainID := domainEntry.GetInfo().ID wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID)) if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil { - return nil, wh.error(err, scope, tagsForErrorLog...) + return nil, wh.error(err, scope, tags...) } pollerID := uuid.New() @@ -636,12 +640,12 @@ func (wh *WorkflowHandler) PollForDecisionTask( return nil, nil } - tagsForErrorLog = append(tagsForErrorLog, []tag.Tag{tag.WorkflowID( + tags = append(tags, []tag.Tag{tag.WorkflowID( matchingResp.GetWorkflowExecution().GetWorkflowID()), tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...) resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken()) if err != nil { - return nil, wh.error(err, scope, tagsForErrorLog...) + return nil, wh.error(err, scope, tags...) } return resp, nil } @@ -694,6 +698,10 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( scope := wh.getDefaultScope(ctx, metrics.FrontendRecordActivityTaskHeartbeatScope) + if wh.isShuttingDown() { + return nil, errShuttingDown + } + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, wh.error(err, scope) } @@ -702,9 +710,6 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( return nil, wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) - wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat") if heartbeatRequest.TaskToken == nil { return nil, wh.error(errTaskTokenNotSet, scope) @@ -722,18 +727,24 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( return nil, wh.error(err, scope) } + domainWrapper := domainWrapper{ + domain: domainName, + } scope, sw := wh.startRequestProfileWithDomain( ctx, metrics.FrontendRecordActivityTaskHeartbeatScope, - domainWrapper{ - domain: domainName, - }, + domainWrapper, ) defer sw.Stop() - if wh.isShuttingDown() { - return nil, errShuttingDown - } + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(domainWrapper) + + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: taskToken.WorkflowID, + RunID: taskToken.RunID, + }) sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -761,7 +772,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( FailedRequest: failRequest, }) if err != nil { - return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true} } else { @@ -770,7 +781,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat( HeartbeatRequest: heartbeatRequest, }) if err != nil { - return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } @@ -799,26 +810,37 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( return nil, wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) + domainName := heartbeatRequest.GetDomain() + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: heartbeatRequest.GetWorkflowID(), + RunID: heartbeatRequest.GetRunID(), + }) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) + } + + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(heartbeatRequest) wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID") - domainID, err := wh.GetDomainCache().GetDomainID(heartbeatRequest.GetDomain()) + domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } workflowID := heartbeatRequest.GetWorkflowID() runID := heartbeatRequest.GetRunID() // runID is optional so can be empty activityID := heartbeatRequest.GetActivityID() if domainID == "" { - return nil, wh.error(errDomainNotSet, scope) + return nil, wh.error(errDomainNotSet, scope, tags...) } if workflowID == "" { - return nil, wh.error(errWorkflowIDNotSet, scope) + return nil, wh.error(errWorkflowIDNotSet, scope, tags...) } if activityID == "" { - return nil, wh.error(errActivityIDNotSet, scope) + return nil, wh.error(errActivityIDNotSet, scope, tags...) } taskToken := &common.TaskToken{ @@ -830,17 +852,9 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( } token, err := wh.tokenSerializer.Serialize(taskToken) if err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } - domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID) - if err != nil { - return nil, wh.error(err, scope) - } - - // add domain tag to scope, so further metrics will have the domain tag - scope = scope.Tagged(metrics.DomainTag(domainName)) - sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -867,7 +881,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( FailedRequest: failRequest, }) if err != nil { - return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true} } else { @@ -882,7 +896,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID( HeartbeatRequest: req, }) if err != nil { - return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } @@ -897,6 +911,11 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskCompletedScope) + + if wh.isShuttingDown() { + return errShuttingDown + } + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return wh.error(err, scope) } @@ -905,9 +924,6 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) - if completeRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) } @@ -924,26 +940,32 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( return wh.error(err, scope) } - if !common.ValidIDLength( - completeRequest.GetIdentity(), - scope, - wh.config.MaxIDLengthWarnLimit(), - wh.config.IdentityMaxLength(domainName), - metrics.CadenceErrIdentityExceededWarnLimit) { - return wh.error(errIdentityTooLong, scope) + domainWrapper := domainWrapper{ + domain: domainName, } - scope, sw := wh.startRequestProfileWithDomain( ctx, metrics.FrontendRespondActivityTaskCompletedScope, - domainWrapper{ - domain: domainName, - }, + domainWrapper, ) defer sw.Stop() - if wh.isShuttingDown() { - return errShuttingDown + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(domainWrapper) + + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: taskToken.WorkflowID, + RunID: taskToken.RunID, + }) + + if !common.ValidIDLength( + completeRequest.GetIdentity(), + scope, + wh.config.MaxIDLengthWarnLimit(), + wh.config.IdentityMaxLength(domainName), + metrics.CadenceErrIdentityExceededWarnLimit) { + return wh.error(errIdentityTooLong, scope, tags...) } sizeLimitError := wh.config.BlobSizeLimitError(domainName) @@ -972,7 +994,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( FailedRequest: failRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } else { err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{ @@ -980,7 +1002,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted( CompleteRequest: completeRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } @@ -1009,32 +1031,43 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) + domainName := completeRequest.GetDomain() + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: completeRequest.GetWorkflowID(), + RunID: completeRequest.GetRunID(), + }) + + if domainName == "" { + return wh.error(errDomainNotSet, scope, tags...) + } + + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(completeRequest) - domainID, err := wh.GetDomainCache().GetDomainID(completeRequest.GetDomain()) + domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return wh.error(err, scope) + return wh.error(err, scope, tags...) } workflowID := completeRequest.GetWorkflowID() runID := completeRequest.GetRunID() // runID is optional so can be empty activityID := completeRequest.GetActivityID() if domainID == "" { - return wh.error(errDomainNotSet, scope) + return wh.error(errDomainNotSet, scope, tags...) } if workflowID == "" { - return wh.error(errWorkflowIDNotSet, scope) + return wh.error(errWorkflowIDNotSet, scope, tags...) } if activityID == "" { - return wh.error(errActivityIDNotSet, scope) + return wh.error(errActivityIDNotSet, scope, tags...) } if !common.ValidIDLength( completeRequest.GetIdentity(), scope, wh.config.MaxIDLengthWarnLimit(), - wh.config.IdentityMaxLength(completeRequest.GetDomain()), + wh.config.IdentityMaxLength(domainName), metrics.CadenceErrIdentityExceededWarnLimit) { return wh.error(errIdentityTooLong, scope) } @@ -1051,14 +1084,6 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( return wh.error(err, scope) } - domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID) - if err != nil { - return wh.error(err, scope) - } - - // add domain tag to scope, so further metrics will have the domain tag - scope = scope.Tagged(metrics.DomainTag(domainName)) - sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -1085,7 +1110,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( FailedRequest: failRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } else { req := &types.RespondActivityTaskCompletedRequest{ @@ -1099,7 +1124,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID( CompleteRequest: req, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } @@ -1114,6 +1139,11 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskFailedScope) + + if wh.isShuttingDown() { + return errShuttingDown + } + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return wh.error(err, scope) } @@ -1122,9 +1152,6 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) - if failedRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) } @@ -1141,18 +1168,24 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( return wh.error(err, scope) } + domainWrapper := domainWrapper{ + domain: domainName, + } scope, sw := wh.startRequestProfileWithDomain( ctx, metrics.FrontendRespondActivityTaskFailedScope, - domainWrapper{ - domain: domainName, - }, + domainWrapper, ) defer sw.Stop() - if wh.isShuttingDown() { - return errShuttingDown - } + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(domainWrapper) + + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: taskToken.WorkflowID, + RunID: taskToken.RunID, + }) if !common.ValidIDLength( failedRequest.GetIdentity(), @@ -1160,7 +1193,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( wh.config.MaxIDLengthWarnLimit(), wh.config.IdentityMaxLength(domainName), metrics.CadenceErrIdentityExceededWarnLimit) { - return wh.error(errIdentityTooLong, scope) + return wh.error(errIdentityTooLong, scope, tags...) } sizeLimitError := wh.config.BlobSizeLimitError(domainName) @@ -1187,7 +1220,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed( FailedRequest: failedRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } return nil } @@ -1214,25 +1247,36 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) + domainName := failedRequest.GetDomain() + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: failedRequest.GetWorkflowID(), + RunID: failedRequest.GetRunID(), + }) + + if domainName == "" { + return wh.error(errDomainNotSet, scope, tags...) + } + + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(failedRequest) - domainID, err := wh.GetDomainCache().GetDomainID(failedRequest.GetDomain()) + domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return wh.error(err, scope) + return wh.error(err, scope, tags...) } workflowID := failedRequest.GetWorkflowID() runID := failedRequest.GetRunID() // runID is optional so can be empty activityID := failedRequest.GetActivityID() if domainID == "" { - return wh.error(errDomainNotSet, scope) + return wh.error(errDomainNotSet, scope, tags...) } if workflowID == "" { - return wh.error(errWorkflowIDNotSet, scope) + return wh.error(errWorkflowIDNotSet, scope, tags...) } if activityID == "" { - return wh.error(errActivityIDNotSet, scope) + return wh.error(errActivityIDNotSet, scope, tags...) } if !common.ValidIDLength( @@ -1241,7 +1285,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( wh.config.MaxIDLengthWarnLimit(), wh.config.IdentityMaxLength(failedRequest.GetDomain()), metrics.CadenceErrIdentityExceededWarnLimit) { - return wh.error(errIdentityTooLong, scope) + return wh.error(errIdentityTooLong, scope, tags...) } taskToken := &common.TaskToken{ @@ -1253,17 +1297,9 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( } token, err := wh.tokenSerializer.Serialize(taskToken) if err != nil { - return wh.error(err, scope) - } - - domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID) - if err != nil { - return wh.error(err, scope) + return wh.error(err, scope, tags...) } - // add domain tag to scope, so further metrics will have the domain tag - scope = scope.Tagged(metrics.DomainTag(domainName)) - sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -1295,7 +1331,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID( FailedRequest: req, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } return nil } @@ -1308,6 +1344,11 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskCanceledScope) + + if wh.isShuttingDown() { + return errShuttingDown + } + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return wh.error(err, scope) } @@ -1316,16 +1357,15 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) - if cancelRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) } + taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken) if err != nil { return wh.error(err, scope) } + if taskToken.DomainID == "" { return wh.error(errDomainNotSet, scope) } @@ -1335,18 +1375,24 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( return wh.error(err, scope) } + domainWrapper := domainWrapper{ + domain: domainName, + } scope, sw := wh.startRequestProfileWithDomain( ctx, metrics.FrontendRespondActivityTaskCanceledScope, - domainWrapper{ - domain: domainName, - }, + domainWrapper, ) defer sw.Stop() - if wh.isShuttingDown() { - return errShuttingDown - } + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(domainWrapper) + + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: taskToken.WorkflowID, + RunID: taskToken.RunID, + }) if !common.ValidIDLength( cancelRequest.GetIdentity(), @@ -1354,7 +1400,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( wh.config.MaxIDLengthWarnLimit(), wh.config.IdentityMaxLength(domainName), metrics.CadenceErrIdentityExceededWarnLimit) { - return wh.error(errIdentityTooLong, scope) + return wh.error(errIdentityTooLong, scope, tags...) } sizeLimitError := wh.config.BlobSizeLimitError(domainName) @@ -1383,7 +1429,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( FailedRequest: failRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } else { err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{ @@ -1391,7 +1437,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled( CancelRequest: cancelRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } @@ -1420,25 +1466,36 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) + domainName := cancelRequest.GetDomain() + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: cancelRequest.GetWorkflowID(), + RunID: cancelRequest.GetRunID(), + }) - domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain()) + if domainName == "" { + return wh.error(errDomainNotSet, scope, tags...) + } + + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(cancelRequest) + + domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return wh.error(err, scope) + return wh.error(err, scope, tags...) } workflowID := cancelRequest.GetWorkflowID() runID := cancelRequest.GetRunID() // runID is optional so can be empty activityID := cancelRequest.GetActivityID() if domainID == "" { - return wh.error(errDomainNotSet, scope) + return wh.error(errDomainNotSet, scope, tags...) } if workflowID == "" { - return wh.error(errWorkflowIDNotSet, scope) + return wh.error(errWorkflowIDNotSet, scope, tags...) } if activityID == "" { - return wh.error(errActivityIDNotSet, scope) + return wh.error(errActivityIDNotSet, scope, tags...) } if !common.ValidIDLength( @@ -1447,7 +1504,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( wh.config.MaxIDLengthWarnLimit(), wh.config.IdentityMaxLength(cancelRequest.GetDomain()), metrics.CadenceErrIdentityExceededWarnLimit) { - return wh.error(errIdentityTooLong, scope) + return wh.error(errIdentityTooLong, scope, tags...) } taskToken := &common.TaskToken{ @@ -1459,17 +1516,9 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( } token, err := wh.tokenSerializer.Serialize(taskToken) if err != nil { - return wh.error(err, scope) + return wh.error(err, scope, tags...) } - domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID) - if err != nil { - return wh.error(err, scope) - } - - // add domain tag to scope, so further metrics will have the domain tag - scope = scope.Tagged(metrics.DomainTag(domainName)) - sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -1496,7 +1545,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( FailedRequest: failRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } else { req := &types.RespondActivityTaskCanceledRequest{ @@ -1510,7 +1559,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID( CancelRequest: req, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } } @@ -1525,6 +1574,11 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(ctx, metrics.FrontendRespondDecisionTaskCompletedScope) + + if wh.isShuttingDown() { + return nil, errShuttingDown + } + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return nil, wh.error(err, scope) } @@ -1533,9 +1587,6 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( return nil, wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) - if completeRequest.TaskToken == nil { return nil, wh.error(errTaskTokenNotSet, scope) } @@ -1552,18 +1603,24 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( return nil, wh.error(err, scope) } + domainWrapper := domainWrapper{ + domain: domainName, + } scope, sw := wh.startRequestProfileWithDomain( ctx, metrics.FrontendRespondDecisionTaskCompletedScope, - domainWrapper{ - domain: domainName, - }, + domainWrapper, ) defer sw.Stop() - if wh.isShuttingDown() { - return nil, errShuttingDown - } + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(domainWrapper) + + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: taskToken.WorkflowID, + RunID: taskToken.RunID, + }) if !common.ValidIDLength( completeRequest.GetIdentity(), @@ -1571,7 +1628,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( wh.config.MaxIDLengthWarnLimit(), wh.config.IdentityMaxLength(domainName), metrics.CadenceErrIdentityExceededWarnLimit) { - return nil, wh.error(errIdentityTooLong, scope) + return nil, wh.error(errIdentityTooLong, scope, tags...) } if err := common.CheckDecisionResultLimit( @@ -1586,7 +1643,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( CompleteRequest: completeRequest}, ) if err != nil { - return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } completedResp := &types.RespondDecisionTaskCompletedResponse{} @@ -1608,7 +1665,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken()) if err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } completedResp.DecisionTask = newDecisionTask } @@ -1624,6 +1681,11 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(ctx, metrics.FrontendRespondDecisionTaskFailedScope) + + if wh.isShuttingDown() { + return errShuttingDown + } + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return wh.error(err, scope) } @@ -1632,9 +1694,6 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) - if failedRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) } @@ -1651,18 +1710,24 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( return wh.error(err, scope) } + domainWrapper := domainWrapper{ + domain: domainName, + } scope, sw := wh.startRequestProfileWithDomain( ctx, metrics.FrontendRespondDecisionTaskFailedScope, - domainWrapper{ - domain: domainName, - }, + domainWrapper, ) defer sw.Stop() - if wh.isShuttingDown() { - return errShuttingDown - } + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(domainWrapper) + + tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{ + WorkflowID: taskToken.WorkflowID, + RunID: taskToken.RunID, + }) if !common.ValidIDLength( failedRequest.GetIdentity(), @@ -1670,7 +1735,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( wh.config.MaxIDLengthWarnLimit(), wh.config.IdentityMaxLength(domainName), metrics.CadenceErrIdentityExceededWarnLimit) { - return wh.error(errIdentityTooLong, scope) + return wh.error(errIdentityTooLong, scope, tags...) } sizeLimitError := wh.config.BlobSizeLimitError(domainName) @@ -1696,7 +1761,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed( FailedRequest: failedRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } return nil } @@ -1709,6 +1774,11 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( defer log.CapturePanic(wh.GetLogger(), &retError) scope := wh.getDefaultScope(ctx, metrics.FrontendRespondQueryTaskCompletedScope) + + if wh.isShuttingDown() { + return errShuttingDown + } + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { return wh.error(err, scope) } @@ -1717,9 +1787,6 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( return wh.error(errRequestNotSet, scope) } - // Count the request in the RPS, but we still accept it even if RPS is exceeded - wh.allow(nil) - if completeRequest.TaskToken == nil { return wh.error(errTaskTokenNotSet, scope) } @@ -1736,18 +1803,19 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( return wh.error(err, scope) } + domainWrapper := domainWrapper{ + domain: domainName, + } scope, sw := wh.startRequestProfileWithDomain( ctx, metrics.FrontendRespondQueryTaskCompletedScope, - domainWrapper{ - domain: domainName, - }, + domainWrapper, ) defer sw.Stop() - if wh.isShuttingDown() { - return errShuttingDown - } + // Count the request in the RPS towards the requested domain, + // but we still accept it even if RPS is exceeded + wh.allow(domainWrapper) sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) @@ -1813,13 +1881,18 @@ func (wh *WorkflowHandler) StartWorkflowExecution( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(startRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) + domainName := startRequest.GetDomain() + wfExecution := &types.WorkflowExecution{ + WorkflowID: startRequest.GetWorkflowID(), } + tags := getDomainWfIDRunIDTags(domainName, wfExecution) - domainName := startRequest.GetDomain() if domainName == "" { - return nil, wh.error(errDomainNotSet, scope) + return nil, wh.error(errDomainNotSet, scope, tags...) + } + + if ok := wh.allow(startRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope, tags...) } idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit() @@ -1829,11 +1902,11 @@ func (wh *WorkflowHandler) StartWorkflowExecution( idLengthWarnLimit, wh.config.DomainNameMaxLength(domainName), metrics.CadenceErrDomainNameExceededWarnLimit) { - return nil, wh.error(errDomainTooLong, scope) + return nil, wh.error(errDomainTooLong, scope, tags...) } if startRequest.GetWorkflowID() == "" { - return nil, wh.error(errWorkflowIDNotSet, scope) + return nil, wh.error(errWorkflowIDNotSet, scope, tags...) } if !common.ValidIDLength( @@ -1842,15 +1915,15 @@ func (wh *WorkflowHandler) StartWorkflowExecution( idLengthWarnLimit, wh.config.WorkflowIDMaxLength(domainName), metrics.CadenceErrWorkflowIDExceededWarnLimit) { - return nil, wh.error(errWorkflowIDTooLong, scope) + return nil, wh.error(errWorkflowIDTooLong, scope, tags...) } if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } if err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } wh.GetLogger().Debug( @@ -1858,7 +1931,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution( tag.WorkflowID(startRequest.GetWorkflowID())) if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" { - return nil, wh.error(errWorkflowTypeNotSet, scope) + return nil, wh.error(errWorkflowTypeNotSet, scope, tags...) } if !common.ValidIDLength( @@ -1867,27 +1940,27 @@ func (wh *WorkflowHandler) StartWorkflowExecution( idLengthWarnLimit, wh.config.WorkflowTypeMaxLength(domainName), metrics.CadenceErrWorkflowTypeExceededWarnLimit) { - return nil, wh.error(errWorkflowTypeTooLong, scope) + return nil, wh.error(errWorkflowTypeTooLong, scope, tags...) } if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil { - return nil, err + return nil, wh.error(err, scope, tags...) } if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 { - return nil, wh.error(errInvalidExecutionStartToCloseTimeoutSeconds, scope) + return nil, wh.error(errInvalidExecutionStartToCloseTimeoutSeconds, scope, tags...) } if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 { - return nil, wh.error(errInvalidTaskStartToCloseTimeoutSeconds, scope) + return nil, wh.error(errInvalidTaskStartToCloseTimeoutSeconds, scope, tags...) } if startRequest.GetDelayStartSeconds() < 0 { - return nil, wh.error(errInvalidDelayStartSeconds, scope) + return nil, wh.error(errInvalidDelayStartSeconds, scope, tags...) } if startRequest.GetRequestID() == "" { - return nil, wh.error(errRequestIDNotSet, scope) + return nil, wh.error(errRequestIDNotSet, scope, tags...) } if !common.ValidIDLength( @@ -1896,22 +1969,19 @@ func (wh *WorkflowHandler) StartWorkflowExecution( idLengthWarnLimit, wh.config.RequestIDMaxLength(domainName), metrics.CadenceErrRequestIDExceededWarnLimit) { - return nil, wh.error(errRequestIDTooLong, scope) + return nil, wh.error(errRequestIDTooLong, scope, tags...) } if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName)) domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } - // add domain tag to scope, so further metrics will have the domain tag - scope = scope.Tagged(metrics.DomainTag(domainName)) - sizeLimitError := wh.config.BlobSizeLimitError(domainName) sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) actualSize := len(startRequest.Input) @@ -1929,7 +1999,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution( wh.GetThrottledLogger(), tag.BlobSizeViolationOperation("StartWorkflowExecution"), ); err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID)) @@ -1938,7 +2008,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution( resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest) if err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } return resp, nil } @@ -1956,31 +2026,34 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( if wh.isShuttingDown() { return nil, errShuttingDown } - wfExecution := getRequest.GetExecution() if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope) } if getRequest == nil { - return nil, wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errRequestNotSet, scope) + } + + domainName := getRequest.GetDomain() + wfExecution := getRequest.GetExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) } if ok := wh.allow(getRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(createServiceBusyError(), scope, tags...) } - domainName := getRequest.GetDomain() - if domainName == "" { - return nil, wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + if err := validateExecution(wfExecution); err != nil { + return nil, wh.error(err, scope, tags...) } + domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) - } - - if err := wh.validateExecutionAndEmitMetrics(getRequest.Execution, scope); err != nil { - return nil, err + return nil, wh.error(err, scope, tags...) } if getRequest.GetMaximumPageSize() <= 0 { @@ -2000,7 +2073,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled() historyArchived := wh.historyArchived(ctx, getRequest, domainID) if enableArchivalRead && historyArchived { - return wh.getArchivedHistory(ctx, getRequest, domainID, scope) + return wh.getArchivedHistory(ctx, getRequest, domainID, scope, tags...) } } @@ -2052,10 +2125,10 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( if getRequest.NextPageToken != nil { token, err = deserializeHistoryToken(getRequest.NextPageToken) if err != nil { - return nil, wh.error(errInvalidNextPageToken, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errInvalidNextPageToken, scope, tags...) } if execution.RunID != "" && execution.GetRunID() != token.RunID { - return nil, wh.error(errNextPageTokenRunIDMismatch, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errNextPageTokenRunIDMismatch, scope, tags...) } execution.RunID = token.RunID @@ -2078,7 +2151,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err = queryHistory(domainID, execution, queryNextEventID, token.BranchToken) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } token.FirstEventID = token.NextEventID token.NextEventID = nextEventID @@ -2091,7 +2164,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err = queryHistory(domainID, execution, queryNextEventID, nil) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } execution.RunID = runID @@ -2151,7 +2224,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( if isCloseEventOnly { if !isWorkflowRunning { if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } if isRawHistoryEnabled { // since getHistory func will not return empty history, so the below is safe @@ -2177,7 +2250,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( } } else { if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } // here, for long pull on history events, we need to intercept the paging token from cassandra // and do something clever @@ -2190,7 +2263,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory( nextToken, err := serializeHistoryToken(token) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } return &types.GetWorkflowExecutionHistoryResponse{ History: history, @@ -2216,23 +2289,32 @@ func (wh *WorkflowHandler) SignalWorkflowExecution( scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendSignalWorkflowExecutionScope, signalRequest) defer sw.Stop() - wfExecution := signalRequest.GetWorkflowExecution() + if wh.isShuttingDown() { + return errShuttingDown + } if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(err, scope) } if signalRequest == nil { - return wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(errRequestNotSet, scope) + } + + domainName := signalRequest.GetDomain() + wfExecution := signalRequest.GetWorkflowExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) + + if domainName == "" { + return wh.error(errDomainNotSet, scope, tags...) } if ok := wh.allow(signalRequest); !ok { - return wh.error(createServiceBusyError(), scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(createServiceBusyError(), scope, tags...) } - domainName := signalRequest.GetDomain() - if domainName == "" { - return wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + if err := validateExecution(wfExecution); err != nil { + return wh.error(err, scope, tags...) } idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit() @@ -2242,16 +2324,11 @@ func (wh *WorkflowHandler) SignalWorkflowExecution( idLengthWarnLimit, wh.config.DomainNameMaxLength(domainName), metrics.CadenceErrDomainNameExceededWarnLimit) { - return wh.error(errDomainTooLong, scope, getWfIDRunIDTags(wfExecution)...) - } - - if err := wh.validateExecutionAndEmitMetrics(signalRequest.WorkflowExecution, scope); err != nil { - return err + return wh.error(errDomainTooLong, scope, tags...) } if signalRequest.GetSignalName() == "" { - return wh.error(&types.BadRequestError{Message: "SignalName is not set on request."}, - scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(errSignalNameNotSet, scope, tags...) } if !common.ValidIDLength( @@ -2260,7 +2337,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution( idLengthWarnLimit, wh.config.SignalNameMaxLength(domainName), metrics.CadenceErrSignalNameExceededWarnLimit) { - return wh.error(errSignalNameTooLong, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(errSignalNameTooLong, scope, tags...) } if !common.ValidIDLength( @@ -2269,12 +2346,12 @@ func (wh *WorkflowHandler) SignalWorkflowExecution( idLengthWarnLimit, wh.config.RequestIDMaxLength(domainName), metrics.CadenceErrRequestIDExceededWarnLimit) { - return wh.error(errRequestIDTooLong, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(errRequestIDTooLong, scope, tags...) } domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(err, scope, tags...) } sizeLimitError := wh.config.BlobSizeLimitError(domainName) @@ -2290,7 +2367,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution( wh.GetThrottledLogger(), tag.BlobSizeViolationOperation("SignalWorkflowExecution"), ); err != nil { - return wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(err, scope, tags...) } err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{ @@ -2298,7 +2375,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution( SignalRequest: signalRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } return nil @@ -2322,25 +2399,30 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( return nil, errShuttingDown } - wfExecution := &types.WorkflowExecution{ - WorkflowID: signalWithStartRequest.WorkflowID, - } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope) } if signalWithStartRequest == nil { - return nil, wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(signalWithStartRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope, getWfIDRunIDTags(wfExecution)...) + domainName := signalWithStartRequest.GetDomain() + wfExecution := &types.WorkflowExecution{ + WorkflowID: signalWithStartRequest.GetWorkflowID(), } + tags := getDomainWfIDRunIDTags(domainName, wfExecution) - domainName := signalWithStartRequest.GetDomain() if domainName == "" { - return nil, wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errDomainNotSet, scope, tags...) + } + + if ok := wh.allow(signalWithStartRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope, tags...) + } + + if signalWithStartRequest.GetWorkflowID() == "" { + return nil, wh.error(errWorkflowIDNotSet, scope, tags...) } idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit() @@ -2350,12 +2432,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( idLengthWarnLimit, wh.config.DomainNameMaxLength(domainName), metrics.CadenceErrDomainNameExceededWarnLimit) { - return nil, wh.error(errDomainTooLong, scope, getWfIDRunIDTags(wfExecution)...) - } - - if signalWithStartRequest.GetWorkflowID() == "" { - return nil, wh.error(&types.BadRequestError{Message: "WorkflowId is not set on request."}, - scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errDomainTooLong, scope, tags...) } if !common.ValidIDLength( @@ -2364,12 +2441,11 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( idLengthWarnLimit, wh.config.WorkflowIDMaxLength(domainName), metrics.CadenceErrWorkflowIDExceededWarnLimit) { - return nil, wh.error(errWorkflowIDTooLong, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errWorkflowIDTooLong, scope, tags...) } if signalWithStartRequest.GetSignalName() == "" { - return nil, wh.error(&types.BadRequestError{Message: "SignalName is not set on request."}, - scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errSignalNameNotSet, scope, tags...) } if !common.ValidIDLength( @@ -2378,12 +2454,11 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( idLengthWarnLimit, wh.config.SignalNameMaxLength(domainName), metrics.CadenceErrSignalNameExceededWarnLimit) { - return nil, wh.error(errSignalNameTooLong, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errSignalNameTooLong, scope, tags...) } if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" { - return nil, wh.error(&types.BadRequestError{Message: "WorkflowType is not set on request."}, - scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errWorkflowTypeNotSet, scope, tags...) } if !common.ValidIDLength( @@ -2392,11 +2467,11 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( idLengthWarnLimit, wh.config.WorkflowTypeMaxLength(domainName), metrics.CadenceErrWorkflowTypeExceededWarnLimit) { - return nil, wh.error(errWorkflowTypeTooLong, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errWorkflowTypeTooLong, scope, tags...) } if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil { - return nil, err + return nil, wh.error(err, scope, tags...) } if !common.ValidIDLength( @@ -2405,36 +2480,32 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( idLengthWarnLimit, wh.config.RequestIDMaxLength(domainName), metrics.CadenceErrRequestIDExceededWarnLimit) { - return nil, wh.error(errRequestIDTooLong, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errRequestIDTooLong, scope, tags...) } if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 { - return nil, wh.error(&types.BadRequestError{ - Message: "A valid ExecutionStartToCloseTimeoutSeconds is not set on request."}, - scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errInvalidExecutionStartToCloseTimeoutSeconds, scope, tags...) } if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 { - return nil, wh.error(&types.BadRequestError{ - Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}, - scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errInvalidTaskStartToCloseTimeoutSeconds, scope, tags...) } if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } if err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } sizeLimitError := wh.config.BlobSizeLimitError(domainName) @@ -2450,7 +2521,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( wh.GetThrottledLogger(), tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"), ); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields()) if err := common.CheckEventBlobSizeLimit( @@ -2464,7 +2535,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( wh.GetThrottledLogger(), tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"), ); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{ @@ -2472,7 +2543,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution( SignalWithStartRequest: signalWithStartRequest, }) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } return resp, nil @@ -2493,31 +2564,33 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution( return errShuttingDown } - wfExecution := terminateRequest.GetWorkflowExecution() - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(err, scope) } if terminateRequest == nil { - return wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(errRequestNotSet, scope) } - if ok := wh.allow(terminateRequest); !ok { - return wh.error(createServiceBusyError(), scope, getWfIDRunIDTags(wfExecution)...) - } + domainName := terminateRequest.GetDomain() + wfExecution := terminateRequest.GetWorkflowExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) if terminateRequest.GetDomain() == "" { - return wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(errDomainNotSet, scope, tags...) } - if err := wh.validateExecutionAndEmitMetrics(terminateRequest.WorkflowExecution, scope); err != nil { - return err + if ok := wh.allow(terminateRequest); !ok { + return wh.error(createServiceBusyError(), scope, tags...) + } + + if err := validateExecution(wfExecution); err != nil { + return wh.error(err, scope, tags...) } - domainID, err := wh.GetDomainCache().GetDomainID(terminateRequest.GetDomain()) + domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(err, scope, tags...) } err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{ @@ -2525,7 +2598,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution( TerminateRequest: terminateRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } return nil @@ -2542,35 +2615,37 @@ func (wh *WorkflowHandler) ResetWorkflowExecution( scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendResetWorkflowExecutionScope, resetRequest) defer sw.Stop() - wfExecution := resetRequest.GetWorkflowExecution() - if wh.isShuttingDown() { return nil, errShuttingDown } if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope) } if resetRequest == nil { - return nil, wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(resetRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope, getWfIDRunIDTags(wfExecution)...) + domainName := resetRequest.GetDomain() + wfExecution := resetRequest.GetWorkflowExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) } - if resetRequest.GetDomain() == "" { - return nil, wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + if ok := wh.allow(resetRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope, tags...) } - if err := wh.validateExecutionAndEmitMetrics(resetRequest.WorkflowExecution, scope); err != nil { - return nil, err + if err := validateExecution(wfExecution); err != nil { + return nil, wh.error(err, scope, tags...) } domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain()) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{ @@ -2578,7 +2653,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution( ResetRequest: resetRequest, }) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } return resp, nil @@ -2594,35 +2669,37 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution( scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRequestCancelWorkflowExecutionScope, cancelRequest) defer sw.Stop() - wfExecution := cancelRequest.GetWorkflowExecution() - if wh.isShuttingDown() { return errShuttingDown } if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(err, scope) } if cancelRequest == nil { - return wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(errRequestNotSet, scope) } - if ok := wh.allow(cancelRequest); !ok { - return wh.error(createServiceBusyError(), scope, getWfIDRunIDTags(wfExecution)...) + domainName := cancelRequest.GetDomain() + wfExecution := cancelRequest.GetWorkflowExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) + + if domainName == "" { + return wh.error(errDomainNotSet, scope, tags...) } - if cancelRequest.GetDomain() == "" { - return wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + if ok := wh.allow(cancelRequest); !ok { + return wh.error(createServiceBusyError(), scope, tags...) } - if err := wh.validateExecutionAndEmitMetrics(cancelRequest.WorkflowExecution, scope); err != nil { - return err + if err := validateExecution(wfExecution); err != nil { + return wh.error(err, scope, tags...) } domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain()) if err != nil { - return wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return wh.error(err, scope, tags...) } err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{ @@ -2630,7 +2707,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution( CancelRequest: cancelRequest, }) if err != nil { - return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...)) + return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } return nil @@ -2658,14 +2735,14 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(listRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if listRequest.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(listRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + if listRequest.StartTimeFilter == nil { return nil, wh.error(&types.BadRequestError{Message: "StartTimeFilter is required"}, scope) } @@ -2775,14 +2852,14 @@ func (wh *WorkflowHandler) ListArchivedWorkflowExecutions( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(listRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if listRequest.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(listRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + if listRequest.GetPageSize() <= 0 { listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())) } @@ -2867,14 +2944,14 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(listRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if listRequest.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(listRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + if listRequest.StartTimeFilter == nil { return nil, wh.error(&types.BadRequestError{Message: "StartTimeFilter is required"}, scope) } @@ -3007,14 +3084,14 @@ func (wh *WorkflowHandler) ListWorkflowExecutions( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(listRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if listRequest.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(listRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + if listRequest.GetPageSize() <= 0 { listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())) } @@ -3075,14 +3152,14 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(listRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if listRequest.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(listRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + if listRequest.GetPageSize() <= 0 { listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())) } @@ -3143,14 +3220,14 @@ func (wh *WorkflowHandler) CountWorkflowExecutions( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(countRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if countRequest.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(countRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery()) if err != nil { return nil, wh.error(err, scope) @@ -3222,17 +3299,21 @@ func (wh *WorkflowHandler) ResetStickyTaskList( return nil, wh.error(errRequestNotSet, scope) } - if resetRequest.GetDomain() == "" { - return nil, wh.error(errDomainNotSet, scope) + domainName := resetRequest.GetDomain() + wfExecution := resetRequest.GetExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) } - if err := wh.validateExecutionAndEmitMetrics(resetRequest.Execution, scope); err != nil { - return nil, err + if err := validateExecution(wfExecution); err != nil { + return nil, wh.error(err, scope, tags...) } domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain()) if err != nil { - return nil, wh.error(err, scope) + return nil, wh.error(err, scope, tags...) } _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{ @@ -3240,7 +3321,7 @@ func (wh *WorkflowHandler) ResetStickyTaskList( Execution: resetRequest.Execution, }) if err != nil { - return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope)) + return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...)) } return &types.ResetStickyTaskListResponse{}, nil } @@ -3255,50 +3336,53 @@ func (wh *WorkflowHandler) QueryWorkflow( scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendQueryWorkflowScope, queryRequest) defer sw.Stop() - wfExecution := queryRequest.GetExecution() - if wh.isShuttingDown() { return nil, errShuttingDown } - if wh.config.DisallowQuery(queryRequest.GetDomain()) { - return nil, wh.error(errQueryDisallowedForDomain, scope, getWfIDRunIDTags(wfExecution)...) + if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { + return nil, wh.error(err, scope) } - if ok := wh.allow(queryRequest); !ok { - return nil, wh.error(createServiceBusyError(), scope) + if queryRequest == nil { + return nil, wh.error(errRequestNotSet, scope) } - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + domainName := queryRequest.GetDomain() + wfExecution := queryRequest.GetExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) } - if queryRequest == nil { - return nil, wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + if ok := wh.allow(queryRequest); !ok { + return nil, wh.error(createServiceBusyError(), scope, tags...) } - if queryRequest.GetDomain() == "" { - return nil, wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + if err := validateExecution(wfExecution); err != nil { + return nil, wh.error(err, scope, tags...) } - if err := wh.validateExecutionAndEmitMetrics(queryRequest.Execution, scope); err != nil { - return nil, err + + if wh.config.DisallowQuery(domainName) { + return nil, wh.error(errQueryDisallowedForDomain, scope, tags...) } if queryRequest.Query == nil { - return nil, wh.error(errQueryNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errQueryNotSet, scope, tags...) } if queryRequest.Query.GetQueryType() == "" { - return nil, wh.error(errQueryTypeNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errQueryTypeNotSet, scope, tags...) } - domainID, err := wh.GetDomainCache().GetDomainID(queryRequest.GetDomain()) + domainID, err := wh.GetDomainCache().GetDomainID(domainName) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } - sizeLimitError := wh.config.BlobSizeLimitError(queryRequest.GetDomain()) - sizeLimitWarn := wh.config.BlobSizeLimitWarn(queryRequest.GetDomain()) + sizeLimitError := wh.config.BlobSizeLimitError(domainName) + sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName) if err := common.CheckEventBlobSizeLimit( len(queryRequest.GetQuery().GetQueryArgs()), @@ -3310,7 +3394,7 @@ func (wh *WorkflowHandler) QueryWorkflow( scope, wh.GetThrottledLogger(), tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } req := &types.HistoryQueryWorkflowRequest{ @@ -3319,7 +3403,7 @@ func (wh *WorkflowHandler) QueryWorkflow( } hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } return hResponse.GetResponse(), nil } @@ -3338,30 +3422,33 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution( return nil, errShuttingDown } - wfExecution := request.GetExecution() - if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope) } if request == nil { - return nil, wh.error(errRequestNotSet, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errRequestNotSet, scope) + } + + domainName := request.GetDomain() + wfExecution := request.GetExecution() + tags := getDomainWfIDRunIDTags(domainName, wfExecution) + + if domainName == "" { + return nil, wh.error(errDomainNotSet, scope, tags...) } if ok := wh.allow(request); !ok { - return nil, wh.error(createServiceBusyError(), scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(createServiceBusyError(), scope, tags...) } - if request.GetDomain() == "" { - return nil, wh.error(errDomainNotSet, scope, getWfIDRunIDTags(wfExecution)...) + if err := validateExecution(wfExecution); err != nil { + return nil, wh.error(err, scope, tags...) } + domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain()) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) - } - - if err := wh.validateExecutionAndEmitMetrics(request.Execution, scope); err != nil { - return nil, err + return nil, wh.error(err, scope, tags...) } response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{ @@ -3370,7 +3457,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution( }) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } return response, nil @@ -3400,24 +3487,25 @@ func (wh *WorkflowHandler) DescribeTaskList( return nil, wh.error(errRequestNotSet, scope) } + if request.GetDomain() == "" { + return nil, wh.error(errDomainNotSet, scope) + } + if ok := wh.allow(request); !ok { return nil, wh.error(createServiceBusyError(), scope) } - if request.GetDomain() == "" { - return nil, wh.error(errDomainNotSet, scope) - } domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain()) if err != nil { return nil, wh.error(err, scope) } if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil { - return nil, err + return nil, wh.error(err, scope) } - if err := wh.validateTaskListType(request.TaskListType, scope); err != nil { - return nil, err + if request.TaskListType == nil { + return nil, wh.error(errTaskListTypeNotSet, scope) } response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{ @@ -3449,16 +3537,16 @@ func (wh *WorkflowHandler) ListTaskListPartitions( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(request); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if request.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(request); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil { - return nil, err + return nil, wh.error(err, scope) } resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{ @@ -3486,14 +3574,14 @@ func (wh *WorkflowHandler) GetTaskListsByDomain( return nil, wh.error(errRequestNotSet, scope) } - if ok := wh.allow(request); !ok { - return nil, wh.error(createServiceBusyError(), scope) - } - if request.GetDomain() == "" { return nil, wh.error(errDomainNotSet, scope) } + if ok := wh.allow(request); !ok { + return nil, wh.error(createServiceBusyError(), scope) + } + resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{ Domain: request.Domain, }) @@ -3721,30 +3809,24 @@ func (wh *WorkflowHandler) error(err error, scope metrics.Scope, tagsForErrorLog return err case *yarpcerrors.Status: if err.Code() == yarpcerrors.CodeDeadlineExceeded { + wh.GetLogger().WithTags(tagsForErrorLog...).Error("Frontend request timedout", tag.Error(err)) scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) return err } } if errors.Is(err, context.DeadlineExceeded) { + wh.GetLogger().WithTags(tagsForErrorLog...).Error("Frontend request timedout", tag.Error(err)) scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) return err } - wh.GetLogger().WithTags(tagsForErrorLog...).Error("Uncategorized error", - tag.Error(err)) + wh.GetLogger().WithTags(tagsForErrorLog...).Error("Uncategorized error", tag.Error(err)) scope.IncCounter(metrics.CadenceFailures) return frontendInternalServiceError("cadence internal uncategorized error, msg: %v", err.Error()) } -func (wh *WorkflowHandler) validateTaskListType(t *types.TaskListType, scope metrics.Scope) error { - if t == nil { - return wh.error(errTaskListTypeNotSet, scope) - } - return nil -} - func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error { if t == nil || t.GetName() == "" { - return wh.error(errTaskListNotSet, scope) + return errTaskListNotSet } if !common.ValidIDLength( @@ -3753,15 +3835,7 @@ func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Sco wh.config.MaxIDLengthWarnLimit(), wh.config.TaskListNameMaxLength(domain), metrics.CadenceErrTaskListNameExceededWarnLimit) { - return wh.error(errTaskListTooLong, scope) - } - return nil -} - -func (wh *WorkflowHandler) validateExecutionAndEmitMetrics(w *types.WorkflowExecution, scope metrics.Scope) error { - err := validateExecution(w) - if err != nil { - return wh.error(err, scope) + return errTaskListTooLong } return nil } @@ -4050,8 +4124,8 @@ func (wh *WorkflowHandler) getArchivedHistory( request *types.GetWorkflowExecutionHistoryRequest, domainID string, scope metrics.Scope, + tags ...tag.Tag, ) (*types.GetWorkflowExecutionHistoryResponse, error) { - wfExecution := request.GetExecution() entry, err := wh.GetDomainCache().GetDomainByID(domainID) if err != nil { return nil, wh.error(err, scope) @@ -4062,17 +4136,17 @@ func (wh *WorkflowHandler) getArchivedHistory( // if URI is empty, it means the domain has never enabled for archival. // the error is not "workflow has passed retention period", because // we have no way to tell if the requested workflow exists or not. - return nil, wh.error(errHistoryNotFound, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(errHistoryNotFound, scope, tags...) } URI, err := archiver.NewURI(URIString) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), common.FrontendServiceName) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{ @@ -4083,7 +4157,7 @@ func (wh *WorkflowHandler) getArchivedHistory( PageSize: int(request.GetMaximumPageSize()), }) if err != nil { - return nil, wh.error(err, scope, getWfIDRunIDTags(wfExecution)...) + return nil, wh.error(err, scope, tags...) } history := &types.History{} @@ -4174,14 +4248,19 @@ func (hs HealthStatus) String() string { } } -func getWfIDRunIDTags(wf *types.WorkflowExecution) []tag.Tag { +func getDomainWfIDRunIDTags( + domainName string, + wf *types.WorkflowExecution, +) []tag.Tag { + tags := []tag.Tag{tag.WorkflowDomainName(domainName)} if wf == nil { - return nil + return tags } - return []tag.Tag{ + return append( + tags, tag.WorkflowID(wf.GetWorkflowID()), tag.WorkflowRunID(wf.GetRunID()), - } + ) } func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error { diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 9e6506701cd..8d79232054a 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -140,18 +140,16 @@ func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl } func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() { - domain := "test-domain" - domainID := uuid.New() config := s.newConfig(dc.NewInMemoryClient()) config.DisableListVisibilityByFilter = dc.GetBoolPropertyFnFilteredByDomain(true) wh := s.getWorkflowHandler(config) - s.mockDomainCache.EXPECT().GetDomainID(gomock.Any()).Return(domainID, nil).AnyTimes() + s.mockDomainCache.EXPECT().GetDomainID(gomock.Any()).Return(s.testDomainID, nil).AnyTimes() // test list open by wid listRequest := &types.ListOpenWorkflowExecutionsRequest{ - Domain: domain, + Domain: s.testDomain, StartTimeFilter: &types.StartTimeFilter{ EarliestTime: common.Int64Ptr(0), LatestTime: common.Int64Ptr(time.Now().UnixNano()), @@ -175,7 +173,7 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() { // test list close by wid listRequest2 := &types.ListClosedWorkflowExecutionsRequest{ - Domain: domain, + Domain: s.testDomain, StartTimeFilter: &types.StartTimeFilter{ EarliestTime: common.Int64Ptr(0), LatestTime: common.Int64Ptr(time.Now().UnixNano()), @@ -211,22 +209,30 @@ func (s *workflowHandlerSuite) TestPollForTask_Failed_ContextTimeoutTooShort() { wh := s.getWorkflowHandler(config) bgCtx := context.Background() - _, err := wh.PollForDecisionTask(bgCtx, &types.PollForDecisionTaskRequest{}) + _, err := wh.PollForDecisionTask(bgCtx, &types.PollForDecisionTaskRequest{ + Domain: s.testDomain, + }) s.Error(err) s.Equal(common.ErrContextTimeoutNotSet, err) - _, err = wh.PollForActivityTask(bgCtx, &types.PollForActivityTaskRequest{}) + _, err = wh.PollForActivityTask(bgCtx, &types.PollForActivityTaskRequest{ + Domain: s.testDomain, + }) s.Error(err) s.Equal(common.ErrContextTimeoutNotSet, err) shortCtx, cancel := context.WithTimeout(bgCtx, common.MinLongPollTimeout-time.Millisecond) defer cancel() - _, err = wh.PollForDecisionTask(shortCtx, &types.PollForDecisionTaskRequest{}) + _, err = wh.PollForDecisionTask(shortCtx, &types.PollForDecisionTaskRequest{ + Domain: s.testDomain, + }) s.Error(err) s.Equal(common.ErrContextTimeoutTooShort, err) - _, err = wh.PollForActivityTask(shortCtx, &types.PollForActivityTaskRequest{}) + _, err = wh.PollForActivityTask(shortCtx, &types.PollForActivityTaskRequest{ + Domain: s.testDomain, + }) s.Error(err) s.Equal(common.ErrContextTimeoutTooShort, err) } @@ -237,7 +243,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet wh := s.getWorkflowHandler(config) startWorkflowExecutionRequest := &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", + Domain: s.testDomain, WorkflowID: "workflow-id", WorkflowType: &types.WorkflowType{ Name: "workflow-type", @@ -266,7 +272,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_BadDelayStartSe wh := s.getWorkflowHandler(config) startWorkflowExecutionRequest := &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", + Domain: s.testDomain, WorkflowID: "workflow-id", WorkflowType: &types.WorkflowType{ Name: "workflow-type", @@ -336,7 +342,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowIdNotSe wh := s.getWorkflowHandler(config) startWorkflowExecutionRequest := &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", + Domain: s.testDomain, WorkflowType: &types.WorkflowType{ Name: "workflow-type", }, @@ -365,7 +371,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowTypeNot wh := s.getWorkflowHandler(config) startWorkflowExecutionRequest := &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", + Domain: s.testDomain, WorkflowID: "workflow-id", WorkflowType: &types.WorkflowType{ Name: "", @@ -395,7 +401,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_TaskListNotSet( wh := s.getWorkflowHandler(config) startWorkflowExecutionRequest := &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", + Domain: s.testDomain, WorkflowID: "workflow-id", WorkflowType: &types.WorkflowType{ Name: "workflow-type", @@ -425,7 +431,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidExecutio wh := s.getWorkflowHandler(config) startWorkflowExecutionRequest := &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", + Domain: s.testDomain, WorkflowID: "workflow-id", WorkflowType: &types.WorkflowType{ Name: "workflow-type", @@ -455,7 +461,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidTaskStar wh := s.getWorkflowHandler(config) startWorkflowExecutionRequest := &types.StartWorkflowExecutionRequest{ - Domain: "test-domain", + Domain: s.testDomain, WorkflowID: "workflow-id", WorkflowType: &types.WorkflowType{ Name: "workflow-type", @@ -619,7 +625,7 @@ func (s *workflowHandlerSuite) TestDescribeDomain_Success_ArchivalDisabled() { wh := s.getWorkflowHandler(s.newConfig(dc.NewInMemoryClient())) req := &types.DescribeDomainRequest{ - Name: common.StringPtr("test-domain"), + Name: common.StringPtr(s.testDomain), } result, err := wh.DescribeDomain(context.Background(), req) @@ -642,7 +648,7 @@ func (s *workflowHandlerSuite) TestDescribeDomain_Success_ArchivalEnabled() { wh := s.getWorkflowHandler(s.newConfig(dc.NewInMemoryClient())) req := &types.DescribeDomainRequest{ - Name: common.StringPtr("test-domain"), + Name: common.StringPtr(s.testDomain), } result, err := wh.DescribeDomain(context.Background(), req) @@ -884,12 +890,12 @@ func (s *workflowHandlerSuite) TestHistoryArchived() { wh := s.getWorkflowHandler(s.newConfig(dc.NewInMemoryClient())) getHistoryRequest := &types.GetWorkflowExecutionHistoryRequest{} - s.False(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain")) + s.False(wh.historyArchived(context.Background(), getHistoryRequest, s.testDomain)) getHistoryRequest = &types.GetWorkflowExecutionHistoryRequest{ Execution: &types.WorkflowExecution{}, } - s.False(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain")) + s.False(wh.historyArchived(context.Background(), getHistoryRequest, s.testDomain)) s.mockHistoryClient.EXPECT().GetMutableState(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1) getHistoryRequest = &types.GetWorkflowExecutionHistoryRequest{ @@ -898,7 +904,7 @@ func (s *workflowHandlerSuite) TestHistoryArchived() { RunID: testRunID, }, } - s.False(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain")) + s.False(wh.historyArchived(context.Background(), getHistoryRequest, s.testDomain)) s.mockHistoryClient.EXPECT().GetMutableState(gomock.Any(), gomock.Any()).Return(nil, &types.EntityNotExistsError{Message: "got archival indication error"}).Times(1) getHistoryRequest = &types.GetWorkflowExecutionHistoryRequest{ @@ -907,7 +913,7 @@ func (s *workflowHandlerSuite) TestHistoryArchived() { RunID: testRunID, }, } - s.True(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain")) + s.True(wh.historyArchived(context.Background(), getHistoryRequest, s.testDomain)) s.mockHistoryClient.EXPECT().GetMutableState(gomock.Any(), gomock.Any()).Return(nil, errors.New("got non-archival indication error")).Times(1) getHistoryRequest = &types.GetWorkflowExecutionHistoryRequest{ @@ -916,7 +922,7 @@ func (s *workflowHandlerSuite) TestHistoryArchived() { RunID: testRunID, }, } - s.False(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain")) + s.False(wh.historyArchived(context.Background(), getHistoryRequest, s.testDomain)) } func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_DomainCacheEntryError() { @@ -931,7 +937,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_DomainCacheEntryEr func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_ArchivalURIEmpty() { domainEntry := cache.NewLocalDomainCacheEntryForTest( - &persistence.DomainInfo{Name: "test-domain"}, + &persistence.DomainInfo{Name: s.testDomain}, &persistence.DomainConfig{ HistoryArchivalStatus: types.ArchivalStatusDisabled, HistoryArchivalURI: "", @@ -951,7 +957,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_ArchivalURIEmpty() func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidURI() { domainEntry := cache.NewLocalDomainCacheEntryForTest( - &persistence.DomainInfo{Name: "test-domain"}, + &persistence.DomainInfo{Name: s.testDomain}, &persistence.DomainConfig{ HistoryArchivalStatus: types.ArchivalStatusEnabled, HistoryArchivalURI: "uri without scheme", @@ -971,7 +977,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidURI() { func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetFirstPage() { domainEntry := cache.NewLocalDomainCacheEntryForTest( - &persistence.DomainInfo{Name: "test-domain"}, + &persistence.DomainInfo{Name: s.testDomain}, &persistence.DomainConfig{ HistoryArchivalStatus: types.ArchivalStatusEnabled, HistoryArchivalURI: testHistoryArchivalURI, @@ -1103,7 +1109,7 @@ func (s *workflowHandlerSuite) TestListArchivedVisibility_Failure_DomainNotConfi func (s *workflowHandlerSuite) TestListArchivedVisibility_Failure_InvalidURI() { s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewLocalDomainCacheEntryForTest( - &persistence.DomainInfo{Name: "test-domain"}, + &persistence.DomainInfo{Name: s.testDomain}, &persistence.DomainConfig{ VisibilityArchivalStatus: types.ArchivalStatusDisabled, VisibilityArchivalURI: "uri without scheme", @@ -1122,7 +1128,7 @@ func (s *workflowHandlerSuite) TestListArchivedVisibility_Failure_InvalidURI() { func (s *workflowHandlerSuite) TestListArchivedVisibility_Success() { s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.NewLocalDomainCacheEntryForTest( - &persistence.DomainInfo{Name: "test-domain"}, + &persistence.DomainInfo{Name: s.testDomain}, &persistence.DomainConfig{ VisibilityArchivalStatus: types.ArchivalStatusEnabled, VisibilityArchivalURI: testVisibilityArchivalURI,