From 86b0ef6559ed811e50a2c220bae5a25c408a33ca Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 28 Nov 2022 11:33:11 -0800 Subject: [PATCH] Add num_pollers metric (#965) --- internal/common/metrics/constants.go | 9 ++- internal/common/metrics/tags.go | 7 +++ internal/internal_task_pollers.go | 84 ++++++++++++++++++++++++---- test/integration_test.go | 30 ++++++++++ 4 files changed, 117 insertions(+), 13 deletions(-) diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index 04f3b3888..ff21902dd 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -62,6 +62,7 @@ const ( WorkerStartCounter = TemporalMetricsPrefix + "worker_start" WorkerTaskSlotsAvailable = TemporalMetricsPrefix + "worker_task_slots_available" PollerStartCounter = TemporalMetricsPrefix + "poller_start" + NumPoller = TemporalMetricsPrefix + "num_pollers" TemporalRequest = TemporalMetricsPrefix + "request" TemporalRequestFailure = TemporalRequest + "_failure" @@ -84,6 +85,7 @@ const ( const ( NamespaceTagName = "namespace" ClientTagName = "client_name" + PollerTypeTagName = "poller_type" WorkerTypeTagName = "worker_type" WorkflowTypeNameTagName = "workflow_type" ActivityTypeNameTagName = "activity_type" @@ -94,6 +96,9 @@ const ( // Metric tag values const ( - NoneTagValue = "none" - ClientTagValue = "temporal_go" + NoneTagValue = "none" + ClientTagValue = "temporal_go" + PollerTypeWorkflowTask = "workflow_task" + PollerTypeWorkflowStickyTask = "workflow_sticky_task" + PollerTypeActivityTask = "activity_task" ) diff --git a/internal/common/metrics/tags.go b/internal/common/metrics/tags.go index 3cc53358f..8e7e0a1de 100644 --- a/internal/common/metrics/tags.go +++ b/internal/common/metrics/tags.go @@ -80,3 +80,10 @@ func WorkerTags(workerType string) map[string]string { WorkerTypeTagName: workerType, } } + +// PollerTags returns a set of tags for pollers. +func PollerTags(pollerType string) map[string]string { + return map[string]string{ + PollerTypeTagName: pollerType, + } +} diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 848bc407e..544f6f31d 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -73,6 +73,13 @@ type ( stopC <-chan struct{} } + // numPollerMetric tracks the number of active pollers and publishes a metric on it. + numPollerMetric struct { + lock sync.Mutex + numPollers int32 + gauge metrics.Gauge + } + // workflowTaskPoller implements polling/processing a workflow task workflowTaskPoller struct { basePoller @@ -94,6 +101,9 @@ type ( requestLock sync.Mutex stickyCacheSize int eagerActivityExecutor *eagerActivityExecutor + + numNormalPollerMetric *numPollerMetric + numStickyPollerMetric *numPollerMetric } // activityTaskPoller implements polling/processing a workflow task @@ -106,6 +116,7 @@ type ( taskHandler ActivityTaskHandler logger log.Logger activitiesPerSecond float64 + numPollerMetric *numPollerMetric } historyIteratorImpl struct { @@ -149,6 +160,26 @@ type ( } ) +func newNumPollerMetric(metricsHandler metrics.Handler, pollerType string) *numPollerMetric { + return &numPollerMetric{ + gauge: metricsHandler.WithTags(metrics.PollerTags(pollerType)).Gauge(metrics.NumPoller), + } +} + +func (npm *numPollerMetric) increment() { + npm.lock.Lock() + defer npm.lock.Unlock() + npm.numPollers += 1 + npm.gauge.Update(float64(npm.numPollers)) +} + +func (npm *numPollerMetric) decrement() { + npm.lock.Lock() + defer npm.lock.Unlock() + npm.numPollers -= 1 + npm.gauge.Update(float64(npm.numPollers)) +} + func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel { return &localActivityTunnel{ taskCh: make(chan *localActivityTask, 100000), @@ -238,6 +269,8 @@ func newWorkflowTaskPoller( StickyScheduleToStartTimeout: params.StickyScheduleToStartTimeout, stickyCacheSize: params.cache.MaxWorkflowCacheSize(), eagerActivityExecutor: params.eagerActivityExecutor, + numNormalPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeWorkflowTask), + numStickyPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeWorkflowStickyTask), } } @@ -335,7 +368,6 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics( task *workflowservice.PollWorkflowTaskQueueResponse, startTime time.Time, ) (response *workflowservice.RespondWorkflowTaskCompletedResponse, err error) { - metricsHandler := wtp.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName())) if taskErr != nil { metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1) @@ -632,6 +664,19 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po } } +// Poll the workflow task queue and update the num_poller metric +func (wtp *workflowTaskPoller) pollWorkflowTaskQueue(ctx context.Context, request *workflowservice.PollWorkflowTaskQueueRequest) (*workflowservice.PollWorkflowTaskQueueResponse, error) { + if request.TaskQueue.GetKind() == enumspb.TASK_QUEUE_KIND_NORMAL { + wtp.numNormalPollerMetric.increment() + defer wtp.numNormalPollerMetric.decrement() + } else { + wtp.numStickyPollerMetric.increment() + defer wtp.numStickyPollerMetric.decrement() + } + + return wtp.service.PollWorkflowTaskQueue(ctx, request) +} + // Poll for a single workflow task from the service func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) { traceLog(func() { @@ -641,7 +686,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) { request := wtp.getNextPollRequest() defer wtp.release(request.TaskQueue.GetKind()) - response, err := wtp.service.PollWorkflowTaskQueue(ctx, request) + response, err := wtp.pollWorkflowTaskQueue(ctx, request) if err != nil { wtp.updateBacklog(request.TaskQueue.GetKind(), 0) return nil, err @@ -784,9 +829,18 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv identity: params.Identity, logger: params.Logger, activitiesPerSecond: params.TaskQueueActivitiesPerSecond, + numPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeActivityTask), } } +// Poll the activity task queue and update the num_poller metric +func (atp *activityTaskPoller) pollActivityTaskQueue(ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest) (*workflowservice.PollActivityTaskQueueResponse, error) { + atp.numPollerMetric.increment() + defer atp.numPollerMetric.decrement() + + return atp.service.PollActivityTaskQueue(ctx, request) +} + // Poll for a single activity task from the service func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) { traceLog(func() { @@ -799,7 +853,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) { TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}}, } - response, err := atp.service.PollActivityTaskQueue(ctx, request) + response, err := atp.pollActivityTaskQueue(ctx, request) if err != nil { return nil, err } @@ -971,7 +1025,8 @@ func convertActivityResultToRespondRequest( TaskToken: taskToken, Result: result, Identity: identity, - Namespace: namespace} + Namespace: namespace, + } } // Only respond with canceled if allowed @@ -982,13 +1037,15 @@ func convertActivityResultToRespondRequest( TaskToken: taskToken, Details: convertErrDetailsToPayloads(canceledErr.details, dataConverter), Identity: identity, - Namespace: namespace} + Namespace: namespace, + } } if errors.Is(err, context.Canceled) { return &workflowservice.RespondActivityTaskCanceledRequest{ TaskToken: taskToken, Identity: identity, - Namespace: namespace} + Namespace: namespace, + } } } @@ -1002,7 +1059,8 @@ func convertActivityResultToRespondRequest( TaskToken: taskToken, Failure: failureConverter.ErrorToFailure(err), Identity: identity, - Namespace: namespace} + Namespace: namespace, + } } func convertActivityResultToRespondRequestByID( @@ -1030,7 +1088,8 @@ func convertActivityResultToRespondRequestByID( RunId: runID, ActivityId: activityID, Result: result, - Identity: identity} + Identity: identity, + } } // Only respond with canceled if allowed @@ -1043,7 +1102,8 @@ func convertActivityResultToRespondRequestByID( RunId: runID, ActivityId: activityID, Details: convertErrDetailsToPayloads(canceledErr.details, dataConverter), - Identity: identity} + Identity: identity, + } } if errors.Is(err, context.Canceled) { return &workflowservice.RespondActivityTaskCanceledByIdRequest{ @@ -1051,7 +1111,8 @@ func convertActivityResultToRespondRequestByID( WorkflowId: workflowID, RunId: runID, ActivityId: activityID, - Identity: identity} + Identity: identity, + } } } @@ -1067,5 +1128,6 @@ func convertActivityResultToRespondRequestByID( RunId: runID, ActivityId: activityID, Failure: failureConverter.ErrorToFailure(err), - Identity: identity} + Identity: identity, + } } diff --git a/test/integration_test.go b/test/integration_test.go index 17a1db57a..f3023904b 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1772,6 +1772,36 @@ func (ts *IntegrationTestSuite) waitForQueryTrue(run client.WorkflowRun, query s ts.True(result, "query didn't return true in reasonable amount of time") } +func (ts *IntegrationTestSuite) TestNumPollersCounter() { + _, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + assertNumPollersEventually := func(expected float64, pollerType string, tags ...string) { + // Try for two seconds + var lastCount float64 + for start := time.Now(); time.Since(start) <= 10*time.Second; { + lastCount = ts.metricGauge( + metrics.NumPoller, + "poller_type", pollerType, + "task_queue", ts.taskQueueName, + ) + if lastCount == expected { + return + } + time.Sleep(50 * time.Millisecond) + } + // Will fail + ts.Equal(expected, lastCount) + } + if ts.config.maxWorkflowCacheSize == 0 { + assertNumPollersEventually(2, "workflow_task") + assertNumPollersEventually(0, "workflow_sticky_task") + } else { + assertNumPollersEventually(1, "workflow_task") + assertNumPollersEventually(1, "workflow_sticky_task") + } + assertNumPollersEventually(2, "activity_task") +} + func (ts *IntegrationTestSuite) TestSlotsAvailableCounter() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()