From d91dc326e6de68a4c76c9610f551ca58e8a3cc2c Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 9 Mar 2023 15:43:57 -0500 Subject: [PATCH 01/19] added sqs worker utilization metric --- .../docs/inputs/input-aws-s3.asciidoc | 1 + x-pack/filebeat/input/awss3/input.go | 33 +++++++++++++++++-- .../input/awss3/input_integration_test.go | 2 ++ x-pack/filebeat/input/awss3/metrics.go | 18 +++++----- x-pack/filebeat/input/awss3/sqs.go | 7 ++-- 5 files changed, 49 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 40b412fab61..50bc1fe7f99 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -738,6 +738,7 @@ observe the activity of the input. | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. | `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. +| `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. | `s3_objects_requested_total` | Number of S3 objects downloaded. 
diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 669cf7f53e6..cea22e3526b 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -129,10 +129,14 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } defer receiver.metrics.Close() - // Poll sqs waiting metric periodically in the background. + // Poll metrics periodically in the background go pollSqsWaitingMetric(ctx, receiver) - if err := receiver.Receive(ctx); err != nil { + metricReporterChan := make(chan int64) + defer close(metricReporterChan) + go pollSqsUtilizationMetric(ctx, receiver, metricReporterChan) + + if err := receiver.Receive(ctx, metricReporterChan); err != nil { return err } } @@ -410,5 +414,30 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { } } +func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricReporterChan chan int64) { + pollDur := 5 * time.Second + t := time.NewTicker(pollDur) + defer t.Stop() + + utilizedNanos := new(int64) + go func() { + for elem := range metricReporterChan { + *utilizedNanos += elem + } + }() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + denom := pollDur.Nanoseconds() * int64(receiver.maxMessagesInflight) + utilizedRate := float64(*utilizedNanos) / float64(denom) + receiver.metrics.sqsWorkerUtilization.Set(utilizedRate) + *utilizedNanos = 0 + } + } +} + // boolPtr returns a pointer to b. 
func boolPtr(b bool) *bool { return &b } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 29d43aca8e7..ad4fd7dc5ea 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -192,6 +192,7 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) + assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) } func TestInputRunS3(t *testing.T) { @@ -430,4 +431,5 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) + assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 3dcce8eee14..25c6cddc44b 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -18,14 +18,15 @@ type inputMetrics struct { registry *monitoring.Registry unregister func() - sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). - sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. - sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). - sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). - sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. - sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). 
The value is refreshed every minute via data from GetQueueAttributes. - sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). - sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. + sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). + sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. + sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). + sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). + sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. + sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. + sqsWorkerUtilization *monitoring.Float // Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized. + sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). + sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded. s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed. 
@@ -65,6 +66,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), + sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"), sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"), diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 1a62d0a4976..9230d91d6d7 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -47,7 +47,7 @@ func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessag } } -func (r *sqsReader) Receive(ctx context.Context) error { +func (r *sqsReader) Receive(ctx context.Context, metricReporterChan chan int64) error { // This loop tries to keep the workers busy as much as possible while // honoring the max message cap as opposed to a simpler loop that receives // N messages, waits for them all to finish, then requests N more messages. @@ -61,6 +61,7 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Receive (at most) as many SQS messages as there are workers. 
msgs, err := r.sqs.ReceiveMessage(ctx, workers) + r.log.Warnw("Messages received", "msg count", len(msgs)) if err != nil { r.workerSem.Release(workers) @@ -85,7 +86,9 @@ func (r *sqsReader) Receive(ctx context.Context) error { go func(msg types.Message, start time.Time) { defer func() { r.metrics.sqsMessagesInflight.Dec() - r.metrics.sqsMessageProcessingTime.Update(time.Since(start).Nanoseconds()) + processingTimeNanos := time.Since(start).Nanoseconds() + r.metrics.sqsMessageProcessingTime.Update(processingTimeNanos) + metricReporterChan <- processingTimeNanos workerWg.Done() r.workerSem.Release(1) }() From 8ac5ec20611d673416fdd4b93f683791aa4072a4 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 9 Mar 2023 15:50:01 -0500 Subject: [PATCH 02/19] little tweaks --- x-pack/filebeat/input/awss3/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index cea22e3526b..f7a90526e46 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -131,7 +131,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { // Poll metrics periodically in the background go pollSqsWaitingMetric(ctx, receiver) - metricReporterChan := make(chan int64) defer close(metricReporterChan) go pollSqsUtilizationMetric(ctx, receiver, metricReporterChan) @@ -434,6 +433,7 @@ func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricRe denom := pollDur.Nanoseconds() * int64(receiver.maxMessagesInflight) utilizedRate := float64(*utilizedNanos) / float64(denom) receiver.metrics.sqsWorkerUtilization.Set(utilizedRate) + // reset for the next polling duration *utilizedNanos = 0 } } From 998f69485176504671d7cc22e7495f47a510b79e Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 9 Mar 2023 15:51:28 -0500 Subject: [PATCH 03/19] changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git 
a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b4ed9e64ae6..719a3849f3c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -222,6 +222,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Fill okta.request.ip_chain.* as a flattened object in Okta module. {pull}34621[34621] - Fixed GCS log format issues. {pull}34659[34659] - Add nginx.ingress_controller.upstream.ip to related.ip {issue}34645[34645] {pull}34672[34672] +- Added metric `sqs_worker_utilization` for aws-s3 input. {pull}34793[34793] *Auditbeat* From d3658732b78923900ae9bf5cb1372d30c26e2cf6 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 9 Mar 2023 16:07:28 -0500 Subject: [PATCH 04/19] fixing args for test --- x-pack/filebeat/input/awss3/input_benchmark_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 2199d2b4980..bb0f63d53cd 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -233,7 +233,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR b.ResetTimer() start := time.Now() - if err := sqsReader.Receive(ctx); err != nil { + if err := sqsReader.Receive(ctx, nil); err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatal(err) } From 6a9f132b83902b7dea33bb648af6cf60dc4c12d1 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 9 Mar 2023 16:15:51 -0500 Subject: [PATCH 05/19] more test param fixes --- x-pack/filebeat/input/awss3/sqs_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index fe0c731da06..b7d429aadaf 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -74,7 +74,7 @@ func TestSQSReceiver(t *testing.T) { // Execute 
sqsReader and verify calls/state. receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) - require.NoError(t, receiver.Receive(ctx)) + require.NoError(t, receiver.Receive(ctx, nil)) assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) @@ -107,7 +107,7 @@ func TestSQSReceiver(t *testing.T) { // Execute SQSReceiver and verify calls/state. receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) - require.NoError(t, receiver.Receive(ctx)) + require.NoError(t, receiver.Receive(ctx, nil)) assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) } From a5d12aaaccd9986e7d179e3c8eabf86124284e09 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Fri, 10 Mar 2023 11:30:08 -0500 Subject: [PATCH 06/19] review feedback updates --- x-pack/filebeat/input/awss3/input.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index f7a90526e46..08d5792ed22 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -10,6 +10,7 @@ import ( "fmt" "net/url" "strings" + "sync/atomic" "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -132,7 +133,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { // Poll metrics periodically in the background go pollSqsWaitingMetric(ctx, receiver) metricReporterChan := make(chan int64) - defer close(metricReporterChan) go pollSqsUtilizationMetric(ctx, receiver, metricReporterChan) if err := receiver.Receive(ctx, metricReporterChan); err != nil { @@ -414,27 +414,33 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { } func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricReporterChan chan int64) { - pollDur := 5 * time.Second - t := time.NewTicker(pollDur) + defer close(metricReporterChan) + + t := time.NewTicker(5 * time.Second) defer 
t.Stop() - utilizedNanos := new(int64) + var utilizedNanos int64 go func() { for elem := range metricReporterChan { - *utilizedNanos += elem + atomic.AddInt64(&utilizedNanos, elem) } }() + lastTick := time.Now() for { select { case <-ctx.Done(): return - case <-t.C: - denom := pollDur.Nanoseconds() * int64(receiver.maxMessagesInflight) - utilizedRate := float64(*utilizedNanos) / float64(denom) + case tick := <-t.C: + denom := float64(tick.Sub(lastTick)) * float64(receiver.maxMessagesInflight) + + utilizedRate := float64(utilizedNanos) / denom receiver.metrics.sqsWorkerUtilization.Set(utilizedRate) + receiver.log.Warnw("util rate", "util", utilizedRate, "top", utilizedNanos, "bottom", denom, "time", tick.Sub(lastTick)) + // reset for the next polling duration - *utilizedNanos = 0 + utilizedNanos = 0 + lastTick = tick } } } From a3d06692fdcec8f3fdc09b357cdec57fd0af2552 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Fri, 10 Mar 2023 13:14:57 -0500 Subject: [PATCH 07/19] cleanup and fix --- x-pack/filebeat/input/awss3/input.go | 1 - x-pack/filebeat/input/awss3/input_benchmark_test.go | 3 ++- x-pack/filebeat/input/awss3/sqs.go | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 08d5792ed22..d9714e0e9dd 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -436,7 +436,6 @@ func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricRe utilizedRate := float64(utilizedNanos) / denom receiver.metrics.sqsWorkerUtilization.Set(utilizedRate) - receiver.log.Warnw("util rate", "util", utilizedRate, "top", utilizedNanos, "bottom", denom, "time", tick.Sub(lastTick)) // reset for the next polling duration utilizedNanos = 0 diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index bb0f63d53cd..9e1922b2a3b 100644 --- 
a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -216,6 +216,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR s3API := newConstantS3(t) pipeline := &fakePipeline{} conf := makeBenchmarkConfig(t) + metricChan := make(chan int64) s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}) sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory) @@ -233,7 +234,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR b.ResetTimer() start := time.Now() - if err := sqsReader.Receive(ctx, nil); err != nil { + if err := sqsReader.Receive(ctx, metricChan); err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatal(err) } diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 9230d91d6d7..261e94ea7be 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -61,7 +61,6 @@ func (r *sqsReader) Receive(ctx context.Context, metricReporterChan chan int64) // Receive (at most) as many SQS messages as there are workers. 
msgs, err := r.sqs.ReceiveMessage(ctx, workers) - r.log.Warnw("Messages received", "msg count", len(msgs)) if err != nil { r.workerSem.Release(workers) From 092065cd5dc6c8913c774885351c1ba496130061 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Fri, 10 Mar 2023 14:19:52 -0500 Subject: [PATCH 08/19] null check --- x-pack/filebeat/input/awss3/input_benchmark_test.go | 3 +-- x-pack/filebeat/input/awss3/sqs.go | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 9e1922b2a3b..bb0f63d53cd 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -216,7 +216,6 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR s3API := newConstantS3(t) pipeline := &fakePipeline{} conf := makeBenchmarkConfig(t) - metricChan := make(chan int64) s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}) sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory) @@ -234,7 +233,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR b.ResetTimer() start := time.Now() - if err := sqsReader.Receive(ctx, metricChan); err != nil { + if err := sqsReader.Receive(ctx, nil); err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatal(err) } diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 261e94ea7be..022f578ffb4 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -87,7 +87,9 @@ func (r *sqsReader) Receive(ctx context.Context, metricReporterChan chan int64) r.metrics.sqsMessagesInflight.Dec() processingTimeNanos := time.Since(start).Nanoseconds() r.metrics.sqsMessageProcessingTime.Update(processingTimeNanos) - 
metricReporterChan <- processingTimeNanos + if metricReporterChan != nil { + metricReporterChan <- processingTimeNanos + } workerWg.Done() r.workerSem.Release(1) }() From 223434b4b745a3bfced77ec3e825cf76ac7aa2f0 Mon Sep 17 00:00:00 2001 From: Kylie Meli Date: Mon, 20 Mar 2023 09:23:33 -0400 Subject: [PATCH 09/19] Update x-pack/filebeat/input/awss3/input.go Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com> --- x-pack/filebeat/input/awss3/input.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index d9714e0e9dd..368e7908a61 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -434,11 +434,10 @@ func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricRe case tick := <-t.C: denom := float64(tick.Sub(lastTick)) * float64(receiver.maxMessagesInflight) - utilizedRate := float64(utilizedNanos) / denom + utilizedRate := float64(atomic.SwapInt64(&utilizedNanos, 0)) / denom receiver.metrics.sqsWorkerUtilization.Set(utilizedRate) // reset for the next polling duration - utilizedNanos = 0 lastTick = tick } } From eba0271b3057039fed8865b9e55043a86dbe157c Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 20 Mar 2023 13:26:14 -0400 Subject: [PATCH 10/19] review cleanup: renaming and organizing better --- x-pack/filebeat/input/awss3/input.go | 15 +++++++-------- .../filebeat/input/awss3/input_benchmark_test.go | 2 +- x-pack/filebeat/input/awss3/metrics.go | 6 ++++-- x-pack/filebeat/input/awss3/sqs.go | 8 +++++--- x-pack/filebeat/input/awss3/sqs_test.go | 4 ++-- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 368e7908a61..da9607fc77c 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -132,10 +132,9 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline 
beat.Pipeline) error { // Poll metrics periodically in the background go pollSqsWaitingMetric(ctx, receiver) - metricReporterChan := make(chan int64) - go pollSqsUtilizationMetric(ctx, receiver, metricReporterChan) + go pollSqsUtilizationMetric(ctx, receiver) - if err := receiver.Receive(ctx, metricReporterChan); err != nil { + if err := receiver.Receive(ctx); err != nil { return err } } @@ -413,15 +412,15 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { } } -func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricReporterChan chan int64) { - defer close(metricReporterChan) +func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader) { + // defer close(metricReporterChan) t := time.NewTicker(5 * time.Second) defer t.Stop() var utilizedNanos int64 go func() { - for elem := range metricReporterChan { + for elem := range receiver.metrics.metricReporterChan { atomic.AddInt64(&utilizedNanos, elem) } }() @@ -432,9 +431,9 @@ func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricRe case <-ctx.Done(): return case tick := <-t.C: - denom := float64(tick.Sub(lastTick)) * float64(receiver.maxMessagesInflight) + maxUtilization := float64(tick.Sub(lastTick)) * float64(receiver.maxMessagesInflight) - utilizedRate := float64(atomic.SwapInt64(&utilizedNanos, 0)) / denom + utilizedRate := float64(atomic.SwapInt64(&utilizedNanos, 0)) / maxUtilization receiver.metrics.sqsWorkerUtilization.Set(utilizedRate) // reset for the next polling duration diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index bb0f63d53cd..2199d2b4980 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -233,7 +233,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR b.ResetTimer() start := time.Now() - if err := sqsReader.Receive(ctx, nil); err != nil { + if 
err := sqsReader.Receive(ctx); err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatal(err) } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 25c6cddc44b..7232839a536 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -15,8 +15,9 @@ import ( ) type inputMetrics struct { - registry *monitoring.Registry - unregister func() + registry *monitoring.Registry + unregister func() + metricReporterChan chan int64 sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. @@ -61,6 +62,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri out := &inputMetrics{ registry: reg, unregister: unreg, + metricReporterChan: make(chan int64), sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"), sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"), sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 022f578ffb4..b0b8a348f92 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -47,7 +47,7 @@ func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessag } } -func (r *sqsReader) Receive(ctx context.Context, metricReporterChan chan int64) error { +func (r *sqsReader) Receive(ctx context.Context) error { // This loop tries to keep the workers busy as much as possible while // honoring the max message cap as opposed to a simpler loop that receives // N messages, waits for them all to finish, then requests N more messages. 
@@ -87,8 +87,8 @@ func (r *sqsReader) Receive(ctx context.Context, metricReporterChan chan int64) r.metrics.sqsMessagesInflight.Dec() processingTimeNanos := time.Since(start).Nanoseconds() r.metrics.sqsMessageProcessingTime.Update(processingTimeNanos) - if metricReporterChan != nil { - metricReporterChan <- processingTimeNanos + if r.metrics.metricReporterChan != nil { + r.metrics.metricReporterChan <- processingTimeNanos } workerWg.Done() r.workerSem.Release(1) @@ -107,6 +107,8 @@ func (r *sqsReader) Receive(ctx context.Context, metricReporterChan chan int64) // Wait for all workers to finish. workerWg.Wait() + close(r.metrics.metricReporterChan) + if errors.Is(ctx.Err(), context.Canceled) { // A canceled context is a normal shutdown. return nil diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index b7d429aadaf..fe0c731da06 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -74,7 +74,7 @@ func TestSQSReceiver(t *testing.T) { // Execute sqsReader and verify calls/state. receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) - require.NoError(t, receiver.Receive(ctx, nil)) + require.NoError(t, receiver.Receive(ctx)) assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) @@ -107,7 +107,7 @@ func TestSQSReceiver(t *testing.T) { // Execute SQSReceiver and verify calls/state. 
receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) - require.NoError(t, receiver.Receive(ctx, nil)) + require.NoError(t, receiver.Receive(ctx)) assert.Equal(t, maxMessages, receiver.workerSem.Available()) }) } From 1bd00efd0a2182404824d2dafd465da019e5e21f Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 23 Mar 2023 14:32:38 -0400 Subject: [PATCH 11/19] adapting chan logic --- x-pack/filebeat/input/awss3/input.go | 2 -- x-pack/filebeat/input/awss3/metrics.go | 5 ++++- x-pack/filebeat/input/awss3/sqs.go | 6 +++++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index da9607fc77c..fb96659d19b 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -413,8 +413,6 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { } func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader) { - // defer close(metricReporterChan) - t := time.NewTicker(5 * time.Second) defer t.Stop() diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 7232839a536..53283472c6f 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -44,6 +44,10 @@ func (m *inputMetrics) Close() { m.unregister() } +func (m *inputMetrics) initializeReporterChan() { + m.metricReporterChan = make(chan int64) +} + func (m *inputMetrics) setSQSMessagesWaiting(count int64) { if m.sqsMessagesWaiting == nil { // if metric not initialized, and count is -1, do nothing @@ -62,7 +66,6 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri out := &inputMetrics{ registry: reg, unregister: unreg, - metricReporterChan: make(chan int64), sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"), sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, 
"sqs_visibility_timeout_extensions_total"), sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index b0b8a348f92..e20725d03cd 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -36,6 +36,8 @@ type sqsReader struct { func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessagesInflight int, msgHandler sqsProcessor) *sqsReader { if metrics == nil { metrics = newInputMetrics("", monitoring.NewRegistry()) + } else { + metrics.initializeReporterChan() } return &sqsReader{ maxMessagesInflight: maxMessagesInflight, @@ -107,7 +109,9 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Wait for all workers to finish. workerWg.Wait() - close(r.metrics.metricReporterChan) + if r.metrics.metricReporterChan != nil { + close(r.metrics.metricReporterChan) + } if errors.Is(ctx.Err(), context.Canceled) { // A canceled context is a normal shutdown. 
From 48a5edc84a2b321b8692c32cbe5f0ff0f8fe9840 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 27 Mar 2023 12:04:06 -0400 Subject: [PATCH 12/19] refactored to have polling encapsulated by metrics, and new test verifying calculation --- x-pack/filebeat/input/awss3/input.go | 40 ++------------ .../input/awss3/input_benchmark_test.go | 10 ++-- x-pack/filebeat/input/awss3/metrics.go | 53 ++++++++++++++++--- x-pack/filebeat/input/awss3/metrics_test.go | 15 +++++- x-pack/filebeat/input/awss3/s3.go | 2 +- x-pack/filebeat/input/awss3/s3_objects.go | 4 +- .../filebeat/input/awss3/s3_objects_test.go | 14 ++--- x-pack/filebeat/input/awss3/s3_test.go | 4 +- x-pack/filebeat/input/awss3/sqs.go | 14 +---- x-pack/filebeat/input/awss3/sqs_s3_event.go | 3 +- .../filebeat/input/awss3/sqs_s3_event_test.go | 16 +++--- 11 files changed, 94 insertions(+), 81 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index fb96659d19b..bc14a594a44 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -10,7 +10,6 @@ import ( "fmt" "net/url" "strings" - "sync/atomic" "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -132,7 +131,6 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { // Poll metrics periodically in the background go pollSqsWaitingMetric(ctx, receiver) - go pollSqsUtilizationMetric(ctx, receiver) if err := receiver.Receive(ctx); err != nil { return err @@ -210,9 +208,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s if err != nil { return nil, err } - in.metrics = newInputMetrics(ctx.ID, nil) - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig) - sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory) + 
in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory, in.config.MaxNumberOfMessages) sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler) return sqsReader, nil @@ -283,8 +281,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli if len(in.config.FileSelectors) == 0 { fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}} } - in.metrics = newInputMetrics(ctx.ID, nil) - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig) + in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages) s3Poller := newS3Poller(log.Named("s3_poller"), in.metrics, s3API, @@ -412,33 +410,5 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { } } -func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader) { - t := time.NewTicker(5 * time.Second) - defer t.Stop() - - var utilizedNanos int64 - go func() { - for elem := range receiver.metrics.metricReporterChan { - atomic.AddInt64(&utilizedNanos, elem) - } - }() - - lastTick := time.Now() - for { - select { - case <-ctx.Done(): - return - case tick := <-t.C: - maxUtilization := float64(tick.Sub(lastTick)) * float64(receiver.maxMessagesInflight) - - utilizedRate := float64(atomic.SwapInt64(&utilizedNanos, 0)) / maxUtilization - receiver.metrics.sqsWorkerUtilization.Set(utilizedRate) - - // 
reset for the next polling duration - lastTick = tick - } - } -} - // boolPtr returns a pointer to b. func boolPtr(b bool) *bool { return &b } diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 2199d2b4980..d0a25410638 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -211,14 +211,14 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR return testing.Benchmark(func(b *testing.B) { log := logp.NewLogger(inputName) metricRegistry := monitoring.NewRegistry() - metrics := newInputMetrics("test_id", metricRegistry) + metrics := newInputMetrics("test_id", metricRegistry, maxMessagesInflight) sqsAPI := newConstantSQS() s3API := newConstantS3(t) pipeline := &fakePipeline{} conf := makeBenchmarkConfig(t) - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}) - sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}, maxMessagesInflight) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory, maxMessagesInflight) sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler) ctx, cancel := context.WithCancel(context.Background()) @@ -302,7 +302,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult log.Infof("benchmark with %d number of workers", numberOfWorkers) metricRegistry := monitoring.NewRegistry() - metrics := newInputMetrics("test_id", metricRegistry) + metrics := newInputMetrics("test_id", metricRegistry, numberOfWorkers) client := 
pubtest.NewChanClientWithCallback(100, func(event beat.Event) { event.Private.(*awscommon.EventACKTracker).ACK() @@ -348,7 +348,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult return } - s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}) + s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}, numberOfWorkers) s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second) if err := s3Poller.Poll(ctx); err != nil { diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 53283472c6f..7b4568c877a 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -5,7 +5,10 @@ package awss3 import ( + "context" "io" + "sync/atomic" + "time" "github.com/rcrowley/go-metrics" @@ -15,9 +18,12 @@ import ( ) type inputMetrics struct { - registry *monitoring.Registry - unregister func() - metricReporterChan chan int64 + registry *monitoring.Registry + unregister func() + ctx context.Context + cancel context.CancelFunc + + utilizationNanos int64 sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. @@ -39,13 +45,15 @@ type inputMetrics struct { s3ObjectProcessingTime metrics.Sample // Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing). } -// Close removes the metrics from the registry. +// Close cancels the context and removes the metrics from the registry. 
func (m *inputMetrics) Close() { + m.cancel() m.unregister() } -func (m *inputMetrics) initializeReporterChan() { - m.metricReporterChan = make(chan int64) +func (m *inputMetrics) updateSQSProcessingTime(d time.Duration) { + m.sqsMessageProcessingTime.Update(d.Nanoseconds()) + atomic.AddInt64(&m.utilizationNanos, d.Nanoseconds()) } func (m *inputMetrics) setSQSMessagesWaiting(count int64) { @@ -60,12 +68,40 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) { m.sqsMessagesWaiting.Set(count) } -func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { +func calculateUtilization(d time.Duration, maxMessagesInflight int, m *inputMetrics) float64 { + maxUtilization := float64(d) * float64(maxMessagesInflight) + utilizedRate := float64(atomic.SwapInt64(&m.utilizationNanos, 0)) / maxUtilization + return utilizedRate +} + +func (m *inputMetrics) pollSQSUtilizationMetric(ctx context.Context, maxMessagesInflight int) { + t := time.NewTicker(5 * time.Second) + defer t.Stop() + + lastTick := time.Now() + for { + select { + case <-ctx.Done(): + return + case tick := <-t.C: + duration := tick.Sub(lastTick) + utilizedRate := calculateUtilization(duration, maxMessagesInflight, m) + m.sqsWorkerUtilization.Set(utilizedRate) + // reset for the next polling duration + lastTick = tick + } + } +} + +func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers int) *inputMetrics { reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent) + ctx, cancelInputCtx := context.WithCancel(context.Background()) out := &inputMetrics{ registry: reg, unregister: unreg, + ctx: ctx, + cancel: cancelInputCtx, sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"), sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"), sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), @@ -89,6 +125,9 @@ func newInputMetrics(id string, optionalParent 
*monitoring.Registry) *inputMetri Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. + + go out.pollSQSUtilizationMetric(ctx, maxWorkers) + return out } diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index c40a6dd4a9e..75c740ce41c 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -6,8 +6,10 @@ package awss3 import ( "testing" + "time" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/stretchr/testify/assert" ) // TestInputMetricsClose asserts that metrics registered by this input are @@ -17,10 +19,21 @@ import ( func TestInputMetricsClose(t *testing.T) { reg := monitoring.NewRegistry() - metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg) + metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg, defaultConfig().MaxNumberOfMessages) metrics.Close() reg.Do(monitoring.Full, func(s string, _ interface{}) { t.Errorf("registry should be empty, but found %v", s) }) } + +func TestInputMetricsSQSWorkerUtilization(t *testing.T) { + reg := monitoring.NewRegistry() + maxWorkers := 1 + metrics := newInputMetrics("test", reg, maxWorkers) + + metrics.utilizationNanos = time.Second.Nanoseconds() + + utilization := calculateUtilization(time.Second, maxWorkers, metrics) + assert.Greater(t, utilization, 0.9) +} diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index d07af133f0a..817255f3795 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -77,7 +77,7 @@ func newS3Poller(log 
*logp.Logger, bucketPollInterval time.Duration, ) *s3Poller { if metrics == nil { - metrics = newInputMetrics("", monitoring.NewRegistry()) + metrics = newInputMetrics("", monitoring.NewRegistry(), numberOfWorkers) } return &s3Poller{ numberOfWorkers: numberOfWorkers, diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index a8dd796c68b..ef66181e9a3 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -45,9 +45,9 @@ type s3ObjectProcessorFactory struct { backupConfig backupConfig } -func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory { +func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig, maxWorkers int) *s3ObjectProcessorFactory { if metrics == nil { - metrics = newInputMetrics("", monitoring.NewRegistry()) + metrics = newInputMetrics("", monitoring.NewRegistry(), maxWorkers) } if len(sel) == 0 { sel = []fileSelectorConfig{ diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index b85fe43eff0..61b6124cd9a 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -144,7 +144,7 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). 
Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() require.Error(t, err) @@ -166,7 +166,7 @@ func TestS3ObjectProcessor(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, nil) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() require.Error(t, err) @@ -193,7 +193,7 @@ func TestS3ObjectProcessor(t *testing.T) { Times(2), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() require.NoError(t, err) @@ -219,7 +219,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() require.NoError(t, err) @@ -249,7 +249,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := 
newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() require.NoError(t, err) @@ -276,7 +276,7 @@ func TestS3ObjectProcessor(t *testing.T) { Return(nil, nil), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object() require.NoError(t, err) @@ -322,7 +322,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, Times(numEvents), ) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, selectors, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, selectors, backupConfig{}, 1) ack := awscommon.NewEventACKTracker(ctx) err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 99c88a6f9c6..1872388c8ce 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -125,7 +125,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). 
Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}, numberOfWorkers) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) @@ -248,7 +248,7 @@ func TestS3Poller(t *testing.T) { GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). Return(nil, errFakeConnectivityFailure) - s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}) + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}, numberOfWorkers) receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", "provider", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.Available()) diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index e20725d03cd..e39f700b131 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -35,9 +35,7 @@ type sqsReader struct { func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessagesInflight int, msgHandler sqsProcessor) *sqsReader { if metrics == nil { - metrics = newInputMetrics("", monitoring.NewRegistry()) - } else { - metrics.initializeReporterChan() + metrics = newInputMetrics("", monitoring.NewRegistry(), maxMessagesInflight) } return &sqsReader{ maxMessagesInflight: maxMessagesInflight, @@ -87,11 +85,7 @@ func (r *sqsReader) Receive(ctx context.Context) 
error { go func(msg types.Message, start time.Time) { defer func() { r.metrics.sqsMessagesInflight.Dec() - processingTimeNanos := time.Since(start).Nanoseconds() - r.metrics.sqsMessageProcessingTime.Update(processingTimeNanos) - if r.metrics.metricReporterChan != nil { - r.metrics.metricReporterChan <- processingTimeNanos - } + r.metrics.updateSQSProcessingTime(time.Since(start)) workerWg.Done() r.workerSem.Release(1) }() @@ -109,10 +103,6 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Wait for all workers to finish. workerWg.Wait() - if r.metrics.metricReporterChan != nil { - close(r.metrics.metricReporterChan) - } - if errors.Is(ctx.Err(), context.Canceled) { // A canceled context is a normal shutdown. return nil diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index 0ab0e3376e5..08700351e99 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -105,9 +105,10 @@ func newSQSS3EventProcessor( maxReceiveCount int, pipeline beat.Pipeline, s3 s3ObjectHandlerFactory, + maxWorkers int, ) *sqsS3EventProcessor { if metrics == nil { - metrics = newInputMetrics("", monitoring.NewRegistry()) + metrics = newInputMetrics("", monitoring.NewRegistry(), maxWorkers) } return &sqsS3EventProcessor{ s3ObjectHandler: s3, diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index b1020cd4943..8209d298031 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -50,7 +50,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, 
mockS3HandlerFactory, 5) require.NoError(t, p.ProcessSQS(ctx, &msg)) }) @@ -73,7 +73,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&invalidBodyMsg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) err := p.ProcessSQS(ctx, &invalidBodyMsg) require.Error(t, err) t.Log(err) @@ -95,7 +95,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) require.NoError(t, p.ProcessSQS(ctx, &emptyRecordsMsg)) }) @@ -127,7 +127,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3Handler.EXPECT().FinalizeS3Object().Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory, 5) require.NoError(t, p.ProcessSQS(ctx, &msg)) }) @@ -150,7 +150,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockClient.EXPECT().Close(), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) err := p.ProcessSQS(ctx, &msg) t.Log(err) require.Error(t, err) @@ -181,7 +181,7 @@ func TestSQSS3EventProcessor(t *testing.T) { 
mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory, 5) err := p.ProcessSQS(ctx, &msg) t.Log(err) require.Error(t, err) @@ -227,7 +227,7 @@ func TestSqsProcessor_keepalive(t *testing.T) { mockAPI.EXPECT().ChangeMessageVisibility(gomock.Any(), gomock.Eq(&msg), gomock.Eq(visibilityTimeout)). Times(1).Return(tc.Err) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockBeatPipeline, mockS3HandlerFactory, 5) var wg sync.WaitGroup wg.Add(1) p.keepalive(ctx, p.log, &wg, &msg) @@ -239,7 +239,7 @@ func TestSqsProcessor_keepalive(t *testing.T) { func TestSqsProcessor_getS3Notifications(t *testing.T) { require.NoError(t, logp.TestingSetup()) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil, nil) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil, nil, 5) t.Run("s3 key is url unescaped", func(t *testing.T) { msg := newSQSMessage(newS3Event("Happy+Face.jpg")) From 4657e0a543be0b6099e4af6321de5efff3fd1199 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 27 Mar 2023 12:26:30 -0400 Subject: [PATCH 13/19] poke --- x-pack/filebeat/input/awss3/metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 75c740ce41c..479bacd3a5b 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -35,5 +35,5 @@ func TestInputMetricsSQSWorkerUtilization(t *testing.T) { 
metrics.utilizationNanos = time.Second.Nanoseconds() utilization := calculateUtilization(time.Second, maxWorkers, metrics) - assert.Greater(t, utilization, 0.9) + assert.Greater(t, utilization, 0.95) } From 84127c8df040a453b6e19d17032034fee310cf02 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 27 Mar 2023 12:28:10 -0400 Subject: [PATCH 14/19] formatting --- x-pack/filebeat/input/awss3/metrics_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 479bacd3a5b..35db41e2abc 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -8,8 +8,9 @@ import ( "testing" "time" - "github.com/elastic/elastic-agent-libs/monitoring" "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/monitoring" ) // TestInputMetricsClose asserts that metrics registered by this input are From a26f2750bb4984e637e356ba8081fad09d49f666 Mon Sep 17 00:00:00 2001 From: Kylie Meli Date: Tue, 28 Mar 2023 10:04:30 -0400 Subject: [PATCH 15/19] Update x-pack/filebeat/input/awss3/metrics_test.go Co-authored-by: Andrew Kroh --- x-pack/filebeat/input/awss3/metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 35db41e2abc..7329558de4d 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -20,7 +20,7 @@ import ( func TestInputMetricsClose(t *testing.T) { reg := monitoring.NewRegistry() - metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg, defaultConfig().MaxNumberOfMessages) + metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg, 1) metrics.Close() reg.Do(monitoring.Full, func(s string, _ interface{}) { From 
045277dc432a1f09ab80103872f5509c8e30b6f8 Mon Sep 17 00:00:00 2001 From: Kylie Meli Date: Tue, 28 Mar 2023 10:04:44 -0400 Subject: [PATCH 16/19] Update x-pack/filebeat/input/awss3/metrics_test.go Co-authored-by: Andrew Kroh --- x-pack/filebeat/input/awss3/metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 7329558de4d..46244a589c7 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -30,7 +30,7 @@ func TestInputMetricsClose(t *testing.T) { func TestInputMetricsSQSWorkerUtilization(t *testing.T) { reg := monitoring.NewRegistry() - maxWorkers := 1 + const maxWorkers = 1 metrics := newInputMetrics("test", reg, maxWorkers) metrics.utilizationNanos = time.Second.Nanoseconds() From 23368f3faf5231008258ab4712c13022e533ea9e Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 28 Mar 2023 10:07:46 -0400 Subject: [PATCH 17/19] tweak naming --- x-pack/filebeat/input/awss3/metrics.go | 4 ++-- x-pack/filebeat/input/awss3/metrics_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 7b4568c877a..d9bff4cab5a 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -68,7 +68,7 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) { m.sqsMessagesWaiting.Set(count) } -func calculateUtilization(d time.Duration, maxMessagesInflight int, m *inputMetrics) float64 { +func calculateUtilizationAndReset(d time.Duration, maxMessagesInflight int, m *inputMetrics) float64 { maxUtilization := float64(d) * float64(maxMessagesInflight) utilizedRate := float64(atomic.SwapInt64(&m.utilizationNanos, 0)) / maxUtilization return utilizedRate @@ -85,7 +85,7 @@ func (m *inputMetrics) pollSQSUtilizationMetric(ctx context.Context, maxMessages return case tick := <-t.C: 
duration := tick.Sub(lastTick) - utilizedRate := calculateUtilization(duration, maxMessagesInflight, m) + utilizedRate := calculateUtilizationAndReset(duration, maxMessagesInflight, m) m.sqsWorkerUtilization.Set(utilizedRate) // reset for the next polling duration lastTick = tick diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 35db41e2abc..110d7863e7d 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -35,6 +35,6 @@ func TestInputMetricsSQSWorkerUtilization(t *testing.T) { metrics.utilizationNanos = time.Second.Nanoseconds() - utilization := calculateUtilization(time.Second, maxWorkers, metrics) + utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics) assert.Greater(t, utilization, 0.95) } From fb6a17d3912f1f152f9fd61726f4c090204f9242 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 28 Mar 2023 10:41:53 -0400 Subject: [PATCH 18/19] handling of long running workers --- x-pack/filebeat/input/awss3/metrics.go | 12 +++++++++++ x-pack/filebeat/input/awss3/metrics_test.go | 23 +++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index d9bff4cab5a..8bd8e0e29eb 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -71,6 +71,18 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) { func calculateUtilizationAndReset(d time.Duration, maxMessagesInflight int, m *inputMetrics) float64 { maxUtilization := float64(d) * float64(maxMessagesInflight) utilizedRate := float64(atomic.SwapInt64(&m.utilizationNanos, 0)) / maxUtilization + + if utilizedRate == 0 { + // Falling back to inflight stats when the workers are long running + inflight := m.sqsMessagesInflight.Get() + if inflight > 0 { + return float64(inflight) / float64(maxMessagesInflight) + } + } else if utilizedRate > 1 { + // 
Normalizing the utilization after long running workers + return 1 + } + return utilizedRate } diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 45524184a21..0ba168dccc6 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -38,3 +38,26 @@ func TestInputMetricsSQSWorkerUtilization(t *testing.T) { utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics) assert.Greater(t, utilization, 0.95) } + +func TestInputMetricsSQSWorkerUtilization_LongRunningWorkers(t *testing.T) { + reg := monitoring.NewRegistry() + const maxWorkers = 1 + metrics := newInputMetrics("test", reg, maxWorkers) + + metrics.utilizationNanos = time.Minute.Nanoseconds() + + utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics) + assert.Equal(t, utilization, 1.0) +} + +func TestInputMetricsSQSWorkerUtilization_InFlightWorkers(t *testing.T) { + reg := monitoring.NewRegistry() + const maxWorkers = 1 + metrics := newInputMetrics("test", reg, maxWorkers) + + metrics.utilizationNanos = 0 + metrics.sqsMessagesInflight.Set(maxWorkers) + + utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics) + assert.Equal(t, utilization, 1.0) +} From 32034e17ed82dddbcd202f249dfe7bbe9ca4330d Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 30 Mar 2023 16:25:40 -0400 Subject: [PATCH 19/19] Track running time of SQS worker routines Track the running time of each SQS worker in order to accurately compute the utilization of the workers after each 5 second period. 
--- x-pack/filebeat/input/awss3/metrics.go | 125 +++++++++++++------- x-pack/filebeat/input/awss3/metrics_test.go | 86 ++++++++++---- x-pack/filebeat/input/awss3/sqs.go | 5 +- 3 files changed, 149 insertions(+), 67 deletions(-) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 8bd8e0e29eb..28819054d53 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -7,7 +7,8 @@ package awss3 import ( "context" "io" - "sync/atomic" + "math" + "sync" "time" "github.com/rcrowley/go-metrics" @@ -15,15 +16,25 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" + "github.com/elastic/go-concert/timed" ) +// currentTime returns the current time. This exists to allow unit tests +// simulate the passage of time. +var currentTime = time.Now + type inputMetrics struct { registry *monitoring.Registry unregister func() - ctx context.Context - cancel context.CancelFunc + ctx context.Context // ctx signals when to stop the sqs worker utilization goroutine. + cancel context.CancelFunc // cancel cancels the ctx context. - utilizationNanos int64 + sqsMaxMessagesInflight int // Maximum number of SQS workers allowed. + sqsWorkerUtilizationMutex sync.Mutex // Guards the sqs worker utilization fields. + sqsWorkerUtilizationLastUpdate time.Time // Time of the last SQS worker utilization calculation. + sqsWorkerUtilizationCurrentPeriod time.Duration // Elapsed execution duration of any SQS workers that completed during the current period. + sqsWorkerIDCounter uint64 // Counter used to assigned unique IDs to SQS workers. + sqsWorkerStartTimes map[uint64]time.Time // Map of SQS worker ID to the time at which the worker started. sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). 
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. @@ -51,11 +62,6 @@ func (m *inputMetrics) Close() { m.unregister() } -func (m *inputMetrics) updateSQSProcessingTime(d time.Duration) { - m.sqsMessageProcessingTime.Update(d.Nanoseconds()) - atomic.AddInt64(&m.utilizationNanos, d.Nanoseconds()) -} - func (m *inputMetrics) setSQSMessagesWaiting(count int64) { if m.sqsMessagesWaiting == nil { // if metric not initialized, and count is -1, do nothing @@ -68,52 +74,84 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) { m.sqsMessagesWaiting.Set(count) } -func calculateUtilizationAndReset(d time.Duration, maxMessagesInflight int, m *inputMetrics) float64 { - maxUtilization := float64(d) * float64(maxMessagesInflight) - utilizedRate := float64(atomic.SwapInt64(&m.utilizationNanos, 0)) / maxUtilization +// beginSQSWorker tracks the start of a new SQS worker. The returned ID +// must be used to call endSQSWorker when the worker finishes. It also +// increments the sqsMessagesInflight counter. +func (m *inputMetrics) beginSQSWorker() (id uint64) { + m.sqsMessagesInflight.Inc() + + m.sqsWorkerUtilizationMutex.Lock() + defer m.sqsWorkerUtilizationMutex.Unlock() + m.sqsWorkerIDCounter++ + m.sqsWorkerStartTimes[m.sqsWorkerIDCounter] = currentTime() + return m.sqsWorkerIDCounter +} - if utilizedRate == 0 { - // Falling back to inflight stats when the workers are long running - inflight := m.sqsMessagesInflight.Get() - if inflight > 0 { - return float64(inflight) / float64(maxMessagesInflight) - } - } else if utilizedRate > 1 { - // Normalizing the utilization after long running workers - return 1 +// endSQSWorker is used to signal that the specified worker has +// finished. This is used to update the SQS worker utilization metric. +// It also decrements the sqsMessagesInflight counter and +// updates the sqsMessageProcessingTime histogram. 
+func (m *inputMetrics) endSQSWorker(id uint64) { + m.sqsMessagesInflight.Dec() + + m.sqsWorkerUtilizationMutex.Lock() + defer m.sqsWorkerUtilizationMutex.Unlock() + now := currentTime() + start := m.sqsWorkerStartTimes[id] + delete(m.sqsWorkerStartTimes, id) + m.sqsMessageProcessingTime.Update(now.Sub(start).Nanoseconds()) + if start.Before(m.sqsWorkerUtilizationLastUpdate) { + m.sqsWorkerUtilizationCurrentPeriod += now.Sub(m.sqsWorkerUtilizationLastUpdate) + } else { + m.sqsWorkerUtilizationCurrentPeriod += now.Sub(start) } - - return utilizedRate } -func (m *inputMetrics) pollSQSUtilizationMetric(ctx context.Context, maxMessagesInflight int) { - t := time.NewTicker(5 * time.Second) - defer t.Stop() - - lastTick := time.Now() - for { - select { - case <-ctx.Done(): - return - case tick := <-t.C: - duration := tick.Sub(lastTick) - utilizedRate := calculateUtilizationAndReset(duration, maxMessagesInflight, m) - m.sqsWorkerUtilization.Set(utilizedRate) - // reset for the next polling duration - lastTick = tick +// updateSqsWorkerUtilization updates the sqsWorkerUtilization metric. +// This is invoked periodically to compute the utilization level +// of the SQS workers. 0 indicates no workers were utilized during +// the period. And 1 indicates that all workers fully utilized +// during the period. +func (m *inputMetrics) updateSqsWorkerUtilization() { + m.sqsWorkerUtilizationMutex.Lock() + defer m.sqsWorkerUtilizationMutex.Unlock() + + now := currentTime() + lastPeriodDuration := now.Sub(m.sqsWorkerUtilizationLastUpdate) + maxUtilization := float64(m.sqsMaxMessagesInflight) * lastPeriodDuration.Seconds() + + for _, startTime := range m.sqsWorkerStartTimes { + // If the worker started before the current period then only compute + // from elapsed time since the last update. Otherwise, it started + // during the current period so compute time elapsed since it started. 
+ if startTime.Before(m.sqsWorkerUtilizationLastUpdate) { + m.sqsWorkerUtilizationCurrentPeriod += lastPeriodDuration + } else { + m.sqsWorkerUtilizationCurrentPeriod += now.Sub(startTime) } } + + utilization := math.Round(m.sqsWorkerUtilizationCurrentPeriod.Seconds()/maxUtilization*1000) / 1000 + if utilization > 1 { + utilization = 1 + } + m.sqsWorkerUtilization.Set(utilization) + m.sqsWorkerUtilizationCurrentPeriod = 0 + m.sqsWorkerUtilizationLastUpdate = now } func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers int) *inputMetrics { reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent) - ctx, cancelInputCtx := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) out := &inputMetrics{ registry: reg, unregister: unreg, ctx: ctx, - cancel: cancelInputCtx, + cancel: cancel, + sqsMaxMessagesInflight: maxWorkers, + sqsWorkerStartTimes: map[uint64]time.Time{}, + sqsWorkerUtilizationLastUpdate: currentTime(), sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"), sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"), sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), @@ -138,7 +176,12 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. - go out.pollSQSUtilizationMetric(ctx, maxWorkers) + // Periodically update the sqs worker utilization metric. + //nolint:errcheck // This never returns an error. 
+ go timed.Periodic(ctx, 5*time.Second, func() error { + out.updateSqsWorkerUtilization() + return nil + }) return out } diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 0ba168dccc6..5d3132d5da9 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -29,35 +30,74 @@ func TestInputMetricsClose(t *testing.T) { } func TestInputMetricsSQSWorkerUtilization(t *testing.T) { - reg := monitoring.NewRegistry() - const maxWorkers = 1 - metrics := newInputMetrics("test", reg, maxWorkers) + const interval = 5000 - metrics.utilizationNanos = time.Second.Nanoseconds() + t.Run("worker ends before one interval", func(t *testing.T) { + fakeTimeMs.Store(0) + defer useFakeCurrentTimeThenReset()() - utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics) - assert.Greater(t, utilization, 0.95) -} + reg := monitoring.NewRegistry() + metrics := newInputMetrics("test", reg, 1) + metrics.Close() -func TestInputMetricsSQSWorkerUtilization_LongRunningWorkers(t *testing.T) { - reg := monitoring.NewRegistry() - const maxWorkers = 1 - metrics := newInputMetrics("test", reg, maxWorkers) + id := metrics.beginSQSWorker() + fakeTimeMs.Add(2500) + metrics.endSQSWorker(id) - metrics.utilizationNanos = time.Minute.Nanoseconds() + fakeTimeMs.Store(1 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 0.5, metrics.sqsWorkerUtilization.Get()) + }) + t.Run("worker ends mid interval", func(t *testing.T) { + fakeTimeMs.Store(0) + defer useFakeCurrentTimeThenReset()() - utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics) - assert.Equal(t, utilization, 1.0) -} + reg := monitoring.NewRegistry() + metrics := newInputMetrics("test", reg, 1) + metrics.Close() -func 
TestInputMetricsSQSWorkerUtilization_InFlightWorkers(t *testing.T) { - reg := monitoring.NewRegistry() - const maxWorkers = 1 - metrics := newInputMetrics("test", reg, maxWorkers) + fakeTimeMs.Add(4000) + id := metrics.beginSQSWorker() + + fakeTimeMs.Store(1 * interval) + metrics.updateSqsWorkerUtilization() + + fakeTimeMs.Add(1000) + metrics.endSQSWorker(id) + + fakeTimeMs.Store(2 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 0.2, metrics.sqsWorkerUtilization.Get()) + }) + t.Run("running worker goes longer than an interval", func(t *testing.T) { + fakeTimeMs.Store(0) + defer useFakeCurrentTimeThenReset()() + + reg := monitoring.NewRegistry() + metrics := newInputMetrics("test", reg, 1) + metrics.Close() + + id := metrics.beginSQSWorker() + + fakeTimeMs.Store(1 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 1.0, metrics.sqsWorkerUtilization.Get()) + + fakeTimeMs.Store(2 * interval) + metrics.updateSqsWorkerUtilization() + assert.Equal(t, 1.0, metrics.sqsWorkerUtilization.Get()) + + metrics.endSQSWorker(id) + }) +} - metrics.utilizationNanos = 0 - metrics.sqsMessagesInflight.Set(maxWorkers) +var fakeTimeMs = &atomic.Int64{} - utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics) - assert.Equal(t, utilization, 1.0) +func useFakeCurrentTimeThenReset() (reset func()) { + currentTime = func() time.Time { + return time.UnixMilli(fakeTimeMs.Load()) + } + return func() { + currentTime = time.Now + } } diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index e39f700b131..7b652e6c160 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -79,13 +79,12 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Process each SQS message asynchronously with a goroutine. 
r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) - r.metrics.sqsMessagesInflight.Add(uint64(len(msgs))) workerWg.Add(len(msgs)) for _, msg := range msgs { go func(msg types.Message, start time.Time) { + id := r.metrics.beginSQSWorker() defer func() { - r.metrics.sqsMessagesInflight.Dec() - r.metrics.updateSQSProcessingTime(time.Since(start)) + r.metrics.endSQSWorker(id) workerWg.Done() r.workerSem.Release(1) }()