Skip to content

Commit

Permalink
Add num_pollers metric (#965)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Nov 28, 2022
1 parent 0635622 commit 86b0ef6
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 13 deletions.
9 changes: 7 additions & 2 deletions internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
WorkerStartCounter = TemporalMetricsPrefix + "worker_start"
WorkerTaskSlotsAvailable = TemporalMetricsPrefix + "worker_task_slots_available"
PollerStartCounter = TemporalMetricsPrefix + "poller_start"
NumPoller = TemporalMetricsPrefix + "num_pollers"

TemporalRequest = TemporalMetricsPrefix + "request"
TemporalRequestFailure = TemporalRequest + "_failure"
Expand All @@ -84,6 +85,7 @@ const (
const (
NamespaceTagName = "namespace"
ClientTagName = "client_name"
PollerTypeTagName = "poller_type"
WorkerTypeTagName = "worker_type"
WorkflowTypeNameTagName = "workflow_type"
ActivityTypeNameTagName = "activity_type"
Expand All @@ -94,6 +96,9 @@ const (

// Metric tag values
const (
NoneTagValue = "none"
ClientTagValue = "temporal_go"
NoneTagValue = "none"
ClientTagValue = "temporal_go"
PollerTypeWorkflowTask = "workflow_task"
PollerTypeWorkflowStickyTask = "workflow_sticky_task"
PollerTypeActivityTask = "activity_task"
)
7 changes: 7 additions & 0 deletions internal/common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,10 @@ func WorkerTags(workerType string) map[string]string {
WorkerTypeTagName: workerType,
}
}

// PollerTags returns a set of tags for pollers.
func PollerTags(pollerType string) map[string]string {
return map[string]string{
PollerTypeTagName: pollerType,
}
}
84 changes: 73 additions & 11 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ type (
stopC <-chan struct{}
}

// numPollerMetric tracks the number of active pollers and publishes a metric on it.
numPollerMetric struct {
lock sync.Mutex
numPollers int32
gauge metrics.Gauge
}

// workflowTaskPoller implements polling/processing a workflow task
workflowTaskPoller struct {
basePoller
Expand All @@ -94,6 +101,9 @@ type (
requestLock sync.Mutex
stickyCacheSize int
eagerActivityExecutor *eagerActivityExecutor

numNormalPollerMetric *numPollerMetric
numStickyPollerMetric *numPollerMetric
}

// activityTaskPoller implements polling/processing a workflow task
Expand All @@ -106,6 +116,7 @@ type (
taskHandler ActivityTaskHandler
logger log.Logger
activitiesPerSecond float64
numPollerMetric *numPollerMetric
}

historyIteratorImpl struct {
Expand Down Expand Up @@ -149,6 +160,26 @@ type (
}
)

func newNumPollerMetric(metricsHandler metrics.Handler, pollerType string) *numPollerMetric {
return &numPollerMetric{
gauge: metricsHandler.WithTags(metrics.PollerTags(pollerType)).Gauge(metrics.NumPoller),
}
}

func (npm *numPollerMetric) increment() {
npm.lock.Lock()
defer npm.lock.Unlock()
npm.numPollers += 1
npm.gauge.Update(float64(npm.numPollers))
}

func (npm *numPollerMetric) decrement() {
npm.lock.Lock()
defer npm.lock.Unlock()
npm.numPollers -= 1
npm.gauge.Update(float64(npm.numPollers))
}

func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
return &localActivityTunnel{
taskCh: make(chan *localActivityTask, 100000),
Expand Down Expand Up @@ -238,6 +269,8 @@ func newWorkflowTaskPoller(
StickyScheduleToStartTimeout: params.StickyScheduleToStartTimeout,
stickyCacheSize: params.cache.MaxWorkflowCacheSize(),
eagerActivityExecutor: params.eagerActivityExecutor,
numNormalPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeWorkflowTask),
numStickyPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeWorkflowStickyTask),
}
}

Expand Down Expand Up @@ -335,7 +368,6 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(
task *workflowservice.PollWorkflowTaskQueueResponse,
startTime time.Time,
) (response *workflowservice.RespondWorkflowTaskCompletedResponse, err error) {

metricsHandler := wtp.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName()))
if taskErr != nil {
metricsHandler.Counter(metrics.WorkflowTaskExecutionFailureCounter).Inc(1)
Expand Down Expand Up @@ -632,6 +664,19 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
}
}

// Poll the workflow task queue and update the num_poller metric
func (wtp *workflowTaskPoller) pollWorkflowTaskQueue(ctx context.Context, request *workflowservice.PollWorkflowTaskQueueRequest) (*workflowservice.PollWorkflowTaskQueueResponse, error) {
if request.TaskQueue.GetKind() == enumspb.TASK_QUEUE_KIND_NORMAL {
wtp.numNormalPollerMetric.increment()
defer wtp.numNormalPollerMetric.decrement()
} else {
wtp.numStickyPollerMetric.increment()
defer wtp.numStickyPollerMetric.decrement()
}

return wtp.service.PollWorkflowTaskQueue(ctx, request)
}

// Poll for a single workflow task from the service
func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
traceLog(func() {
Expand All @@ -641,7 +686,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
request := wtp.getNextPollRequest()
defer wtp.release(request.TaskQueue.GetKind())

response, err := wtp.service.PollWorkflowTaskQueue(ctx, request)
response, err := wtp.pollWorkflowTaskQueue(ctx, request)
if err != nil {
wtp.updateBacklog(request.TaskQueue.GetKind(), 0)
return nil, err
Expand Down Expand Up @@ -784,9 +829,18 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv
identity: params.Identity,
logger: params.Logger,
activitiesPerSecond: params.TaskQueueActivitiesPerSecond,
numPollerMetric: newNumPollerMetric(params.MetricsHandler, metrics.PollerTypeActivityTask),
}
}

// Poll the activity task queue and update the num_poller metric
func (atp *activityTaskPoller) pollActivityTaskQueue(ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest) (*workflowservice.PollActivityTaskQueueResponse, error) {
atp.numPollerMetric.increment()
defer atp.numPollerMetric.decrement()

return atp.service.PollActivityTaskQueue(ctx, request)
}

// Poll for a single activity task from the service
func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) {
traceLog(func() {
Expand All @@ -799,7 +853,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) {
TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}},
}

response, err := atp.service.PollActivityTaskQueue(ctx, request)
response, err := atp.pollActivityTaskQueue(ctx, request)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -971,7 +1025,8 @@ func convertActivityResultToRespondRequest(
TaskToken: taskToken,
Result: result,
Identity: identity,
Namespace: namespace}
Namespace: namespace,
}
}

// Only respond with canceled if allowed
Expand All @@ -982,13 +1037,15 @@ func convertActivityResultToRespondRequest(
TaskToken: taskToken,
Details: convertErrDetailsToPayloads(canceledErr.details, dataConverter),
Identity: identity,
Namespace: namespace}
Namespace: namespace,
}
}
if errors.Is(err, context.Canceled) {
return &workflowservice.RespondActivityTaskCanceledRequest{
TaskToken: taskToken,
Identity: identity,
Namespace: namespace}
Namespace: namespace,
}
}
}

Expand All @@ -1002,7 +1059,8 @@ func convertActivityResultToRespondRequest(
TaskToken: taskToken,
Failure: failureConverter.ErrorToFailure(err),
Identity: identity,
Namespace: namespace}
Namespace: namespace,
}
}

func convertActivityResultToRespondRequestByID(
Expand Down Expand Up @@ -1030,7 +1088,8 @@ func convertActivityResultToRespondRequestByID(
RunId: runID,
ActivityId: activityID,
Result: result,
Identity: identity}
Identity: identity,
}
}

// Only respond with canceled if allowed
Expand All @@ -1043,15 +1102,17 @@ func convertActivityResultToRespondRequestByID(
RunId: runID,
ActivityId: activityID,
Details: convertErrDetailsToPayloads(canceledErr.details, dataConverter),
Identity: identity}
Identity: identity,
}
}
if errors.Is(err, context.Canceled) {
return &workflowservice.RespondActivityTaskCanceledByIdRequest{
Namespace: namespace,
WorkflowId: workflowID,
RunId: runID,
ActivityId: activityID,
Identity: identity}
Identity: identity,
}
}
}

Expand All @@ -1067,5 +1128,6 @@ func convertActivityResultToRespondRequestByID(
RunId: runID,
ActivityId: activityID,
Failure: failureConverter.ErrorToFailure(err),
Identity: identity}
Identity: identity,
}
}
30 changes: 30 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,36 @@ func (ts *IntegrationTestSuite) waitForQueryTrue(run client.WorkflowRun, query s
ts.True(result, "query didn't return true in reasonable amount of time")
}

func (ts *IntegrationTestSuite) TestNumPollersCounter() {
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
assertNumPollersEventually := func(expected float64, pollerType string, tags ...string) {
// Try for two seconds
var lastCount float64
for start := time.Now(); time.Since(start) <= 10*time.Second; {
lastCount = ts.metricGauge(
metrics.NumPoller,
"poller_type", pollerType,
"task_queue", ts.taskQueueName,
)
if lastCount == expected {
return
}
time.Sleep(50 * time.Millisecond)
}
// Will fail
ts.Equal(expected, lastCount)
}
if ts.config.maxWorkflowCacheSize == 0 {
assertNumPollersEventually(2, "workflow_task")
assertNumPollersEventually(0, "workflow_sticky_task")
} else {
assertNumPollersEventually(1, "workflow_task")
assertNumPollersEventually(1, "workflow_sticky_task")
}
assertNumPollersEventually(2, "activity_task")
}

func (ts *IntegrationTestSuite) TestSlotsAvailableCounter() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down

0 comments on commit 86b0ef6

Please sign in to comment.