diff --git a/client/clientfactory.go b/client/clientfactory.go index 77a8323418c..d2e949a2973 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -30,6 +30,7 @@ import ( "time" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -147,7 +148,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) ( clientCache := common.NewClientCache(keyResolver, clientProvider) client := history.NewClient(cf.numberOfHistoryShards, timeout, clientCache, cf.logger) if cf.metricsClient != nil { - client = history.NewMetricClient(client, cf.metricsClient) + client = history.NewMetricClient(client, cf.metricsClient, cf.logger) } return client, nil } @@ -175,7 +176,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout( ) if cf.metricsClient != nil { - client = matching.NewMetricClient(client, cf.metricsClient) + client = matching.NewMetricClient(client, cf.metricsClient, cf.logger) } return client, nil diff --git a/client/history/metricClient.go b/client/history/metricClient.go index 9cc8ae429f1..30fc768073a 100644 --- a/client/history/metricClient.go +++ b/client/history/metricClient.go @@ -30,6 +30,8 @@ import ( "google.golang.org/grpc" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" ) @@ -38,37 +40,39 @@ var _ historyservice.HistoryServiceClient = (*metricClient)(nil) type metricClient struct { client historyservice.HistoryServiceClient metricsClient metrics.Client + logger log.Logger } // NewMetricClient creates a new instance of historyservice.HistoryServiceClient that emits metrics -func NewMetricClient(client historyservice.HistoryServiceClient, metricsClient metrics.Client) historyservice.HistoryServiceClient { +func NewMetricClient( + client historyservice.HistoryServiceClient, + metricsClient metrics.Client, + logger log.Logger, +) historyservice.HistoryServiceClient { return &metricClient{ client: client, metricsClient: metricsClient, + logger: logger, } } func (c *metricClient) StartWorkflowExecution( context context.Context, request *historyservice.StartWorkflowExecutionRequest, - opts ...grpc.CallOption) (*historyservice.StartWorkflowExecutionResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientStartWorkflowExecutionScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientStartWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.StartWorkflowExecution(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.StartWorkflowExecutionResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientStartWorkflowExecutionScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientStartWorkflowExecutionScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.StartWorkflowExecution(context, request, opts...) } func (c *metricClient) DescribeHistoryHost( context context.Context, request *historyservice.DescribeHistoryHostRequest, - opts ...grpc.CallOption) (*historyservice.DescribeHistoryHostResponse, error) { + opts ...grpc.CallOption, +) (_ *historyservice.DescribeHistoryHostResponse, retError error) { resp, err := c.client.DescribeHistoryHost(context, request, opts...) return resp, err @@ -77,7 +81,8 @@ func (c *metricClient) DescribeHistoryHost( func (c *metricClient) RemoveTask( context context.Context, request *historyservice.RemoveTaskRequest, - opts ...grpc.CallOption) (*historyservice.RemoveTaskResponse, error) { + opts ...grpc.CallOption, +) (_ *historyservice.RemoveTaskResponse, retError error) { resp, err := c.client.RemoveTask(context, request, opts...) return resp, err @@ -86,7 +91,8 @@ func (c *metricClient) RemoveTask( func (c *metricClient) CloseShard( context context.Context, request *historyservice.CloseShardRequest, - opts ...grpc.CallOption) (*historyservice.CloseShardResponse, error) { + opts ...grpc.CallOption, +) (_ *historyservice.CloseShardResponse, retError error) { resp, err := c.client.CloseShard(context, request, opts...) return resp, err @@ -95,7 +101,8 @@ func (c *metricClient) CloseShard( func (c *metricClient) GetShard( context context.Context, request *historyservice.GetShardRequest, - opts ...grpc.CallOption) (*historyservice.GetShardResponse, error) { + opts ...grpc.CallOption, +) (_ *historyservice.GetShardResponse, retError error) { resp, err := c.client.GetShard(context, request, opts...) return resp, err @@ -104,7 +111,8 @@ func (c *metricClient) GetShard( func (c *metricClient) DescribeMutableState( context context.Context, request *historyservice.DescribeMutableStateRequest, - opts ...grpc.CallOption) (*historyservice.DescribeMutableStateResponse, error) { + opts ...grpc.CallOption, +) (_ *historyservice.DescribeMutableStateResponse, retError error) { resp, err := c.client.DescribeMutableState(context, request, opts...) return resp, err @@ -113,557 +121,416 @@ func (c *metricClient) DescribeMutableState( func (c *metricClient) GetMutableState( context context.Context, request *historyservice.GetMutableStateRequest, - opts ...grpc.CallOption) (*historyservice.GetMutableStateResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientGetMutableStateScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientGetMutableStateScope, metrics.ClientLatency) - resp, err := c.client.GetMutableState(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.GetMutableStateResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientGetMutableStateScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientGetMutableStateScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.GetMutableState(context, request, opts...) } func (c *metricClient) PollMutableState( context context.Context, request *historyservice.PollMutableStateRequest, - opts ...grpc.CallOption) (*historyservice.PollMutableStateResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientPollMutableStateScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientPollMutableStateScope, metrics.ClientLatency) - resp, err := c.client.PollMutableState(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.PollMutableStateResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientPollMutableStateScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientPollMutableStateScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.PollMutableState(context, request, opts...) } func (c *metricClient) ResetStickyTaskQueue( context context.Context, request *historyservice.ResetStickyTaskQueueRequest, - opts ...grpc.CallOption) (*historyservice.ResetStickyTaskQueueResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientResetStickyTaskQueueScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientResetStickyTaskQueueScope, metrics.ClientLatency) - resp, err := c.client.ResetStickyTaskQueue(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.ResetStickyTaskQueueResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientResetStickyTaskQueueScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientResetStickyTaskQueueScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.ResetStickyTaskQueue(context, request, opts...) } func (c *metricClient) DescribeWorkflowExecution( context context.Context, request *historyservice.DescribeWorkflowExecutionRequest, - opts ...grpc.CallOption) (*historyservice.DescribeWorkflowExecutionResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientDescribeWorkflowExecutionScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.DescribeWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientDescribeWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.DescribeWorkflowExecution(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientDescribeWorkflowExecutionScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientDescribeWorkflowExecutionScope, metrics.ClientFailures) - } - - return resp, err + return c.client.DescribeWorkflowExecution(context, request, opts...) } func (c *metricClient) RecordWorkflowTaskStarted( context context.Context, request *historyservice.RecordWorkflowTaskStartedRequest, - opts ...grpc.CallOption) (*historyservice.RecordWorkflowTaskStartedResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRecordWorkflowTaskStartedScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.RecordWorkflowTaskStartedResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordWorkflowTaskStartedScope, metrics.ClientLatency) - resp, err := c.client.RecordWorkflowTaskStarted(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRecordWorkflowTaskStartedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRecordWorkflowTaskStartedScope, metrics.ClientFailures) - } - - return resp, err + return c.client.RecordWorkflowTaskStarted(context, request, opts...) } func (c *metricClient) RecordActivityTaskStarted( context context.Context, request *historyservice.RecordActivityTaskStartedRequest, - opts ...grpc.CallOption) (*historyservice.RecordActivityTaskStartedResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.ClientLatency) - resp, err := c.client.RecordActivityTaskStarted(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RecordActivityTaskStartedResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRecordActivityTaskStartedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RecordActivityTaskStarted(context, request, opts...) } func (c *metricClient) RespondWorkflowTaskCompleted( context context.Context, request *historyservice.RespondWorkflowTaskCompletedRequest, - opts ...grpc.CallOption) (*historyservice.RespondWorkflowTaskCompletedResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRespondWorkflowTaskCompletedScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.RespondWorkflowTaskCompletedResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondWorkflowTaskCompletedScope, metrics.ClientLatency) - response, err := c.client.RespondWorkflowTaskCompleted(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRespondWorkflowTaskCompletedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRespondWorkflowTaskCompletedScope, metrics.ClientFailures) - } - - return response, err + return c.client.RespondWorkflowTaskCompleted(context, request, opts...) } func (c *metricClient) RespondWorkflowTaskFailed( context context.Context, request *historyservice.RespondWorkflowTaskFailedRequest, - opts ...grpc.CallOption) (*historyservice.RespondWorkflowTaskFailedResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRespondWorkflowTaskFailedScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondWorkflowTaskFailedScope, metrics.ClientLatency) - resp, err := c.client.RespondWorkflowTaskFailed(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RespondWorkflowTaskFailedResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRespondWorkflowTaskFailedScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRespondWorkflowTaskFailedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RespondWorkflowTaskFailed(context, request, opts...) } func (c *metricClient) RespondActivityTaskCompleted( context context.Context, request *historyservice.RespondActivityTaskCompletedRequest, - opts ...grpc.CallOption) (*historyservice.RespondActivityTaskCompletedResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskCompleted(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RespondActivityTaskCompletedResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRespondActivityTaskCompletedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RespondActivityTaskCompleted(context, request, opts...) } func (c *metricClient) RespondActivityTaskFailed( context context.Context, request *historyservice.RespondActivityTaskFailedRequest, - opts ...grpc.CallOption) (*historyservice.RespondActivityTaskFailedResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskFailed(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RespondActivityTaskFailedResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRespondActivityTaskFailedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RespondActivityTaskFailed(context, request, opts...) } func (c *metricClient) RespondActivityTaskCanceled( context context.Context, request *historyservice.RespondActivityTaskCanceledRequest, - opts ...grpc.CallOption) (*historyservice.RespondActivityTaskCanceledResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskCanceled(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RespondActivityTaskCanceledResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRespondActivityTaskCanceledScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RespondActivityTaskCanceled(context, request, opts...) } func (c *metricClient) RecordActivityTaskHeartbeat( context context.Context, request *historyservice.RecordActivityTaskHeartbeatRequest, - opts ...grpc.CallOption) (*historyservice.RecordActivityTaskHeartbeatResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.ClientLatency) - resp, err := c.client.RecordActivityTaskHeartbeat(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RecordActivityTaskHeartbeatResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRecordActivityTaskHeartbeatScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RecordActivityTaskHeartbeat(context, request, opts...) } func (c *metricClient) RequestCancelWorkflowExecution( context context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest, - opts ...grpc.CallOption) (*historyservice.RequestCancelWorkflowExecutionResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRequestCancelWorkflowExecutionScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRequestCancelWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.RequestCancelWorkflowExecution(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RequestCancelWorkflowExecutionResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRequestCancelWorkflowExecutionScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRequestCancelWorkflowExecutionScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RequestCancelWorkflowExecution(context, request, opts...) } func (c *metricClient) SignalWorkflowExecution( context context.Context, request *historyservice.SignalWorkflowExecutionRequest, - opts ...grpc.CallOption) (*historyservice.SignalWorkflowExecutionResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientSignalWorkflowExecutionScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientSignalWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.SignalWorkflowExecution(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.SignalWorkflowExecutionResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientSignalWorkflowExecutionScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientSignalWorkflowExecutionScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.SignalWorkflowExecution(context, request, opts...) } func (c *metricClient) SignalWithStartWorkflowExecution( context context.Context, request *historyservice.SignalWithStartWorkflowExecutionRequest, - opts ...grpc.CallOption) (*historyservice.SignalWithStartWorkflowExecutionResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientSignalWithStartWorkflowExecutionScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientSignalWithStartWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.SignalWithStartWorkflowExecution(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.SignalWithStartWorkflowExecutionResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientSignalWithStartWorkflowExecutionScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientSignalWithStartWorkflowExecutionScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.SignalWithStartWorkflowExecution(context, request, opts...) } func (c *metricClient) RemoveSignalMutableState( context context.Context, request *historyservice.RemoveSignalMutableStateRequest, - opts ...grpc.CallOption) (*historyservice.RemoveSignalMutableStateResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRemoveSignalMutableStateScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientRemoveSignalMutableStateScope, metrics.ClientLatency) - resp, err := c.client.RemoveSignalMutableState(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.RemoveSignalMutableStateResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRemoveSignalMutableStateScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRemoveSignalMutableStateScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.RemoveSignalMutableState(context, request, opts...) } func (c *metricClient) TerminateWorkflowExecution( context context.Context, request *historyservice.TerminateWorkflowExecutionRequest, - opts ...grpc.CallOption) (*historyservice.TerminateWorkflowExecutionResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientTerminateWorkflowExecutionScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.TerminateWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientTerminateWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.TerminateWorkflowExecution(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientTerminateWorkflowExecutionScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientTerminateWorkflowExecutionScope, metrics.ClientFailures) - } - - return resp, err + return c.client.TerminateWorkflowExecution(context, request, opts...) } func (c *metricClient) ResetWorkflowExecution( context context.Context, request *historyservice.ResetWorkflowExecutionRequest, - opts ...grpc.CallOption) (*historyservice.ResetWorkflowExecutionResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientResetWorkflowExecutionScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.ResetWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientResetWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.ResetWorkflowExecution(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientResetWorkflowExecutionScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientResetWorkflowExecutionScope, metrics.ClientFailures) - } - - return resp, err + return c.client.ResetWorkflowExecution(context, request, opts...) } func (c *metricClient) ScheduleWorkflowTask( context context.Context, request *historyservice.ScheduleWorkflowTaskRequest, - opts ...grpc.CallOption) (*historyservice.ScheduleWorkflowTaskResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientScheduleWorkflowTaskScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.ScheduleWorkflowTaskResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientScheduleWorkflowTaskScope, metrics.ClientLatency) - resp, err := c.client.ScheduleWorkflowTask(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientScheduleWorkflowTaskScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientScheduleWorkflowTaskScope, metrics.ClientFailures) - } - - return resp, err + return c.client.ScheduleWorkflowTask(context, request, opts...) } func (c *metricClient) RecordChildExecutionCompleted( context context.Context, request *historyservice.RecordChildExecutionCompletedRequest, - opts ...grpc.CallOption) (*historyservice.RecordChildExecutionCompletedResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientRecordChildExecutionCompletedScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.RecordChildExecutionCompletedResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordChildExecutionCompletedScope, metrics.ClientLatency) - resp, err := c.client.RecordChildExecutionCompleted(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRecordChildExecutionCompletedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRecordChildExecutionCompletedScope, metrics.ClientFailures) - } - - return resp, err + return c.client.RecordChildExecutionCompleted(context, request, opts...) } func (c *metricClient) ReplicateEventsV2( context context.Context, request *historyservice.ReplicateEventsV2Request, - opts ...grpc.CallOption) (*historyservice.ReplicateEventsV2Response, error) { - c.metricsClient.IncCounter(metrics.HistoryClientReplicateEventsV2Scope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.ReplicateEventsV2Response, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientReplicateEventsV2Scope, metrics.ClientLatency) - resp, err := c.client.ReplicateEventsV2(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientReplicateEventsV2Scope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientReplicateEventsV2Scope, metrics.ClientFailures) - } - - return resp, err + return c.client.ReplicateEventsV2(context, request, opts...) } func (c *metricClient) SyncShardStatus( context context.Context, request *historyservice.SyncShardStatusRequest, - opts ...grpc.CallOption) (*historyservice.SyncShardStatusResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientSyncShardStatusScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.SyncShardStatusResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientSyncShardStatusScope, metrics.ClientLatency) - resp, err := c.client.SyncShardStatus(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientSyncShardStatusScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientSyncShardStatusScope, metrics.ClientFailures) - } - - return resp, err + return c.client.SyncShardStatus(context, request, opts...) } func (c *metricClient) SyncActivity( context context.Context, request *historyservice.SyncActivityRequest, - opts ...grpc.CallOption) (*historyservice.SyncActivityResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientSyncActivityScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *historyservice.SyncActivityResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.HistoryClientSyncActivityScope, metrics.ClientLatency) - resp, err := c.client.SyncActivity(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientSyncActivityScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientSyncActivityScope, metrics.ClientFailures) - } - - return resp, err + return c.client.SyncActivity(context, request, opts...) } func (c *metricClient) GetReplicationMessages( context context.Context, request *historyservice.GetReplicationMessagesRequest, - opts ...grpc.CallOption) (*historyservice.GetReplicationMessagesResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientGetReplicationTasksScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientGetReplicationTasksScope, metrics.ClientLatency) - resp, err := c.client.GetReplicationMessages(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.GetReplicationMessagesResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientGetReplicationTasksScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientGetReplicationTasksScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.GetReplicationMessages(context, request, opts...) } func (c *metricClient) GetDLQReplicationMessages( context context.Context, request *historyservice.GetDLQReplicationMessagesRequest, - opts ...grpc.CallOption) (*historyservice.GetDLQReplicationMessagesResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientGetDLQReplicationTasksScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientGetDLQReplicationTasksScope, metrics.ClientLatency) - resp, err := c.client.GetDLQReplicationMessages(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.GetDLQReplicationMessagesResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientGetDLQReplicationTasksScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientGetDLQReplicationTasksScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.GetDLQReplicationMessages(context, request, opts...) } func (c *metricClient) QueryWorkflow( context context.Context, request *historyservice.QueryWorkflowRequest, - opts ...grpc.CallOption) (*historyservice.QueryWorkflowResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientQueryWorkflowScope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.HistoryClientQueryWorkflowScope, metrics.ClientLatency) - resp, err := c.client.QueryWorkflow(context, request, opts...) - sw.Stop() + opts ...grpc.CallOption, +) (_ *historyservice.QueryWorkflowResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientQueryWorkflowScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientQueryWorkflowScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.QueryWorkflow(context, request, opts...) } func (c *metricClient) ReapplyEvents( context context.Context, request *historyservice.ReapplyEventsRequest, - opts ...grpc.CallOption) (*historyservice.ReapplyEventsResponse, error) { + opts ...grpc.CallOption, +) (_ *historyservice.ReapplyEventsResponse, retError error) { - c.metricsClient.IncCounter(metrics.HistoryClientReapplyEventsScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.HistoryClientReapplyEventsScope, metrics.ClientLatency) - resp, err := c.client.ReapplyEvents(context, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientReapplyEventsScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientReapplyEventsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ReapplyEvents(context, request, opts...) } func (c *metricClient) GetDLQMessages( ctx context.Context, request *historyservice.GetDLQMessagesRequest, opts ...grpc.CallOption, -) (*historyservice.GetDLQMessagesResponse, error) { +) (_ *historyservice.GetDLQMessagesResponse, retError error) { - c.metricsClient.IncCounter(metrics.HistoryClientGetDLQMessagesScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.HistoryClientGetDLQMessagesScope, metrics.ClientLatency) - resp, err := c.client.GetDLQMessages(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientGetDLQMessagesScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientGetDLQMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetDLQMessages(ctx, request, opts...) } func (c *metricClient) PurgeDLQMessages( ctx context.Context, request *historyservice.PurgeDLQMessagesRequest, opts ...grpc.CallOption, -) (*historyservice.PurgeDLQMessagesResponse, error) { +) (_ *historyservice.PurgeDLQMessagesResponse, retError error) { - c.metricsClient.IncCounter(metrics.HistoryClientPurgeDLQMessagesScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.HistoryClientPurgeDLQMessagesScope, metrics.ClientLatency) - resp, err := c.client.PurgeDLQMessages(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientPurgeDLQMessagesScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientPurgeDLQMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.PurgeDLQMessages(ctx, request, opts...) } func (c *metricClient) MergeDLQMessages( ctx context.Context, request *historyservice.MergeDLQMessagesRequest, opts ...grpc.CallOption, -) (*historyservice.MergeDLQMessagesResponse, error) { +) (_ *historyservice.MergeDLQMessagesResponse, retError error) { - c.metricsClient.IncCounter(metrics.HistoryClientMergeDLQMessagesScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.HistoryClientMergeDLQMessagesScope, metrics.ClientLatency) - resp, err := c.client.MergeDLQMessages(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientMergeDLQMessagesScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientMergeDLQMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.MergeDLQMessages(ctx, request, opts...) } func (c *metricClient) RefreshWorkflowTasks( ctx context.Context, request *historyservice.RefreshWorkflowTasksRequest, opts ...grpc.CallOption, -) (*historyservice.RefreshWorkflowTasksResponse, error) { +) (_ *historyservice.RefreshWorkflowTasksResponse, retError error) { - c.metricsClient.IncCounter(metrics.HistoryClientRefreshWorkflowTasksScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.HistoryClientRefreshWorkflowTasksScope, metrics.ClientLatency) - resp, err := c.client.RefreshWorkflowTasks(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientRefreshWorkflowTasksScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientRefreshWorkflowTasksScope, metrics.ClientFailures) - } - return resp, err + return c.client.RefreshWorkflowTasks(ctx, request, opts...) } func (c *metricClient) GenerateLastHistoryReplicationTasks( ctx context.Context, request *historyservice.GenerateLastHistoryReplicationTasksRequest, opts ...grpc.CallOption, -) (*historyservice.GenerateLastHistoryReplicationTasksResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientGenerateLastHistoryReplicationTasksScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.HistoryClientGenerateLastHistoryReplicationTasksScope, metrics.ClientLatency) - resp, err := c.client.GenerateLastHistoryReplicationTasks(ctx, request, opts...) - sw.Stop() +) (_ *historyservice.GenerateLastHistoryReplicationTasksResponse, retError error) { - if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientGenerateLastHistoryReplicationTasksScope, metrics.ClientFailures) - } - return resp, err + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientGenerateLastHistoryReplicationTasksScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) + + return c.client.GenerateLastHistoryReplicationTasks(ctx, request, opts...) } func (c *metricClient) GetReplicationStatus( ctx context.Context, request *historyservice.GetReplicationStatusRequest, opts ...grpc.CallOption, -) (*historyservice.GetReplicationStatusResponse, error) { - c.metricsClient.IncCounter(metrics.HistoryClientGetReplicationStatusScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.HistoryClientGetReplicationStatusScope, metrics.ClientLatency) - resp, err := c.client.GetReplicationStatus(ctx, request, opts...) - sw.Stop() +) (_ *historyservice.GetReplicationStatusResponse, retError error) { + + scope, stopwatch := c.startMetricsRecording(metrics.HistoryClientGetReplicationStatusScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) + return c.client.GetReplicationStatus(ctx, request, opts...) +} + +func (c *metricClient) startMetricsRecording( + metricScope int, +) (metrics.Scope, metrics.Stopwatch) { + scope := c.metricsClient.Scope(metricScope) + scope.IncCounter(metrics.ClientRequests) + stopwatch := scope.StartTimer(metrics.ClientLatency) + return scope, stopwatch +} + +func (c *metricClient) finishMetricsRecording( + scope metrics.Scope, + stopwatch metrics.Stopwatch, + err error, +) { if err != nil { - c.metricsClient.IncCounter(metrics.HistoryClientGetReplicationStatusScope, metrics.ClientFailures) + c.logger.Error("history client encountered error", tag.Error(err)) + scope.IncCounter(metrics.ClientFailures) } - return resp, err + stopwatch.Stop() } diff --git a/client/matching/metricClient.go b/client/matching/metricClient.go index 8fad682e79d..d9315db1475 100644 --- a/client/matching/metricClient.go +++ b/client/matching/metricClient.go @@ -32,6 +32,8 @@ import ( "google.golang.org/grpc" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" ) @@ -40,223 +42,201 @@ var _ matchingservice.MatchingServiceClient = (*metricClient)(nil) type metricClient struct { client matchingservice.MatchingServiceClient metricsClient metrics.Client + logger log.Logger } // NewMetricClient creates a new instance of matchingservice.MatchingServiceClient that emits metrics -func NewMetricClient(client matchingservice.MatchingServiceClient, metricsClient metrics.Client) matchingservice.MatchingServiceClient { +func NewMetricClient( + client matchingservice.MatchingServiceClient, + metricsClient metrics.Client, + logger log.Logger, +) matchingservice.MatchingServiceClient { return &metricClient{ client: client, metricsClient: metricsClient, + logger: logger, } } func (c *metricClient) AddActivityTask( ctx context.Context, request *matchingservice.AddActivityTaskRequest, - opts ...grpc.CallOption) (*matchingservice.AddActivityTaskResponse, error) { + opts ...grpc.CallOption, +) (_ *matchingservice.AddActivityTaskResponse, retError error) { - c.metricsClient.IncCounter(metrics.MatchingClientAddActivityTaskScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.MatchingClientAddActivityTaskScope, metrics.ClientLatency) + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientAddActivityTaskScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) c.emitForwardedSourceStats( - metrics.MatchingClientAddActivityTaskScope, + scope, request.GetForwardedSource(), request.TaskQueue, ) - resp, err := c.client.AddActivityTask(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientAddActivityTaskScope, metrics.ClientFailures) - } - - return resp, err + return c.client.AddActivityTask(ctx, request, opts...) } func (c *metricClient) AddWorkflowTask( ctx context.Context, request *matchingservice.AddWorkflowTaskRequest, - opts ...grpc.CallOption) (*matchingservice.AddWorkflowTaskResponse, error) { + opts ...grpc.CallOption, +) (_ *matchingservice.AddWorkflowTaskResponse, retError error) { - c.metricsClient.IncCounter(metrics.MatchingClientAddWorkflowTaskScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.MatchingClientAddWorkflowTaskScope, metrics.ClientLatency) + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientAddWorkflowTaskScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) c.emitForwardedSourceStats( - metrics.MatchingClientAddWorkflowTaskScope, + scope, request.GetForwardedSource(), request.TaskQueue, ) - resp, err := c.client.AddWorkflowTask(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientAddWorkflowTaskScope, metrics.ClientFailures) - } - - return resp, err + return c.client.AddWorkflowTask(ctx, request, opts...) } func (c *metricClient) PollActivityTaskQueue( ctx context.Context, request *matchingservice.PollActivityTaskQueueRequest, - opts ...grpc.CallOption) (*matchingservice.PollActivityTaskQueueResponse, error) { + opts ...grpc.CallOption, +) (_ *matchingservice.PollActivityTaskQueueResponse, retError error) { - c.metricsClient.IncCounter(metrics.MatchingClientPollActivityTaskQueueScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.MatchingClientPollActivityTaskQueueScope, metrics.ClientLatency) + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientPollActivityTaskQueueScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) if request.PollRequest != nil { c.emitForwardedSourceStats( - metrics.MatchingClientPollActivityTaskQueueScope, + scope, request.GetForwardedSource(), request.PollRequest.TaskQueue, ) } - resp, err := c.client.PollActivityTaskQueue(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientPollActivityTaskQueueScope, metrics.ClientFailures) - } - - return resp, err + return c.client.PollActivityTaskQueue(ctx, request, opts...) } func (c *metricClient) PollWorkflowTaskQueue( ctx context.Context, request *matchingservice.PollWorkflowTaskQueueRequest, - opts ...grpc.CallOption) (*matchingservice.PollWorkflowTaskQueueResponse, error) { + opts ...grpc.CallOption, +) (_ *matchingservice.PollWorkflowTaskQueueResponse, retError error) { - c.metricsClient.IncCounter(metrics.MatchingClientPollWorkflowTaskQueueScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.MatchingClientPollWorkflowTaskQueueScope, metrics.ClientLatency) + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientPollWorkflowTaskQueueScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) if request.PollRequest != nil { c.emitForwardedSourceStats( - metrics.MatchingClientPollWorkflowTaskQueueScope, + scope, request.GetForwardedSource(), request.PollRequest.TaskQueue, ) } - resp, err := c.client.PollWorkflowTaskQueue(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientPollWorkflowTaskQueueScope, metrics.ClientFailures) - } - - return resp, err + return c.client.PollWorkflowTaskQueue(ctx, request, opts...) } func (c *metricClient) QueryWorkflow( ctx context.Context, request *matchingservice.QueryWorkflowRequest, - opts ...grpc.CallOption) (*matchingservice.QueryWorkflowResponse, error) { + opts ...grpc.CallOption, +) (_ *matchingservice.QueryWorkflowResponse, retError error) { - c.metricsClient.IncCounter(metrics.MatchingClientQueryWorkflowScope, metrics.ClientRequests) - sw := c.metricsClient.StartTimer(metrics.MatchingClientQueryWorkflowScope, metrics.ClientLatency) + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientQueryWorkflowScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) c.emitForwardedSourceStats( - metrics.MatchingClientQueryWorkflowScope, + scope, request.GetForwardedSource(), request.TaskQueue, ) - resp, err := c.client.QueryWorkflow(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientQueryWorkflowScope, metrics.ClientFailures) - } - - return resp, err + return c.client.QueryWorkflow(ctx, request, opts...) } func (c *metricClient) RespondQueryTaskCompleted( ctx context.Context, request *matchingservice.RespondQueryTaskCompletedRequest, - opts ...grpc.CallOption) (*matchingservice.RespondQueryTaskCompletedResponse, error) { + opts ...grpc.CallOption, +) (_ *matchingservice.RespondQueryTaskCompletedResponse, retError error) { - c.metricsClient.IncCounter(metrics.MatchingClientRespondQueryTaskCompletedScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientRespondQueryTaskCompletedScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - sw := c.metricsClient.StartTimer(metrics.MatchingClientRespondQueryTaskCompletedScope, metrics.ClientLatency) - resp, err := c.client.RespondQueryTaskCompleted(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientRespondQueryTaskCompletedScope, metrics.ClientFailures) - } - - return resp, err + return c.client.RespondQueryTaskCompleted(ctx, request, opts...) } func (c *metricClient) CancelOutstandingPoll( ctx context.Context, request *matchingservice.CancelOutstandingPollRequest, - opts ...grpc.CallOption) (*matchingservice.CancelOutstandingPollResponse, error) { - - c.metricsClient.IncCounter(metrics.MatchingClientCancelOutstandingPollScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *matchingservice.CancelOutstandingPollResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.MatchingClientCancelOutstandingPollScope, metrics.ClientLatency) - resp, err := c.client.CancelOutstandingPoll(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientCancelOutstandingPollScope, metrics.ClientFailures) - } + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientCancelOutstandingPollScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - return resp, err + return c.client.CancelOutstandingPoll(ctx, request, opts...) } func (c *metricClient) DescribeTaskQueue( ctx context.Context, request *matchingservice.DescribeTaskQueueRequest, - opts ...grpc.CallOption) (*matchingservice.DescribeTaskQueueResponse, error) { + opts ...grpc.CallOption, +) (_ *matchingservice.DescribeTaskQueueResponse, retError error) { - c.metricsClient.IncCounter(metrics.MatchingClientDescribeTaskQueueScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientDescribeTaskQueueScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - sw := c.metricsClient.StartTimer(metrics.MatchingClientDescribeTaskQueueScope, metrics.ClientLatency) - resp, err := c.client.DescribeTaskQueue(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientDescribeTaskQueueScope, metrics.ClientFailures) - } - - return resp, err + return c.client.DescribeTaskQueue(ctx, request, opts...) } func (c *metricClient) ListTaskQueuePartitions( ctx context.Context, request *matchingservice.ListTaskQueuePartitionsRequest, - opts ...grpc.CallOption) (*matchingservice.ListTaskQueuePartitionsResponse, error) { - - c.metricsClient.IncCounter(metrics.MatchingClientListTaskQueuePartitionsScope, metrics.ClientRequests) + opts ...grpc.CallOption, +) (_ *matchingservice.ListTaskQueuePartitionsResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.MatchingClientListTaskQueuePartitionsScope, metrics.ClientLatency) - resp, err := c.client.ListTaskQueuePartitions(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.MatchingClientListTaskQueuePartitionsScope) + defer c.finishMetricsRecording(scope, stopwatch, retError) - if err != nil { - c.metricsClient.IncCounter(metrics.MatchingClientListTaskQueuePartitionsScope, metrics.ClientFailures) - } - - return resp, err + return c.client.ListTaskQueuePartitions(ctx, request, opts...) } -func (c *metricClient) emitForwardedSourceStats(scope int, forwardedFrom string, taskQueue *taskqueuepb.TaskQueue) { +func (c *metricClient) emitForwardedSourceStats( + scope metrics.Scope, + forwardedFrom string, + taskQueue *taskqueuepb.TaskQueue, +) { if taskQueue == nil { return } + isChildPartition := strings.HasPrefix(taskQueue.GetName(), taskQueuePartitionPrefix) switch { case forwardedFrom != "": - c.metricsClient.IncCounter(scope, metrics.MatchingClientForwardedCounter) + scope.IncCounter(metrics.MatchingClientForwardedCounter) default: if isChildPartition { - c.metricsClient.IncCounter(scope, metrics.MatchingClientInvalidTaskQueueName) + scope.IncCounter(metrics.MatchingClientInvalidTaskQueueName) } } } + +func (c *metricClient) startMetricsRecording( + metricScope int, +) (metrics.Scope, metrics.Stopwatch) { + scope := c.metricsClient.Scope(metricScope) + scope.IncCounter(metrics.ClientRequests) + stopwatch := scope.StartTimer(metrics.ClientLatency) + return scope, stopwatch +} + +func (c *metricClient) finishMetricsRecording( + scope metrics.Scope, + stopwatch metrics.Stopwatch, + err error, +) { + if err != nil { + c.logger.Error("matching client encountered error", tag.Error(err)) + scope.IncCounter(metrics.ClientFailures) + } + stopwatch.Stop() +} diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index 942d67ce695..b2a7ddc4b42 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -110,32 +110,32 @@ var ( ) type ( - NamesapceRateBurstImpl struct { + NamespaceRateBurstImpl struct { namespaceName string rateFn dynamicconfig.FloatPropertyFnWithNamespaceFilter burstFn dynamicconfig.IntPropertyFnWithNamespaceFilter } ) -var _ quotas.RateBurst = (*NamesapceRateBurstImpl)(nil) +var _ quotas.RateBurst = (*NamespaceRateBurstImpl)(nil) func NewNamespaceRateBurst( namespaceName string, rateFn dynamicconfig.FloatPropertyFnWithNamespaceFilter, burstFn dynamicconfig.IntPropertyFnWithNamespaceFilter, -) *NamesapceRateBurstImpl { - return &NamesapceRateBurstImpl{ +) *NamespaceRateBurstImpl { + return &NamespaceRateBurstImpl{ namespaceName: namespaceName, rateFn: rateFn, burstFn: burstFn, } } -func (c *NamesapceRateBurstImpl) Rate() float64 { +func (c *NamespaceRateBurstImpl) Rate() float64 { return c.rateFn(c.namespaceName) } -func (c *NamesapceRateBurstImpl) Burst() int { +func (c *NamespaceRateBurstImpl) Burst() int { return c.burstFn(c.namespaceName) }