Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding aws-s3 metric for sqs worker utilization #34793

Merged
merged 30 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d91dc32
added sqs worker utilization metric
kgeller Mar 9, 2023
8ac5ec2
little tweaks
kgeller Mar 9, 2023
998f694
changelog
kgeller Mar 9, 2023
0f1bda6
Merge branch 'main' into s3-metric-utilization
kgeller Mar 9, 2023
d365873
fixing args for test
kgeller Mar 9, 2023
f46a5c7
Merge branch 's3-metric-utilization' of github.com:kgeller/beats into…
kgeller Mar 9, 2023
6a9f132
more test param fixes
kgeller Mar 9, 2023
a5d12aa
review feedback updates
kgeller Mar 10, 2023
a3d0669
cleanup and fix
kgeller Mar 10, 2023
092065c
null check
kgeller Mar 10, 2023
223434b
Update x-pack/filebeat/input/awss3/input.go
kgeller Mar 20, 2023
68d507e
Merge branch 'main' into s3-metric-utilization
kgeller Mar 20, 2023
eba0271
review cleanup: renaming and organizing better
kgeller Mar 20, 2023
78a1c0e
Merge branch 'main' into s3-metric-utilization
kgeller Mar 20, 2023
1bd00ef
adapting chan logic
kgeller Mar 23, 2023
4f81bf2
Merge branch 'main' into s3-metric-utilization
kgeller Mar 23, 2023
48a5edc
refactored to have polling encapsulated by metrics, and new test veri…
kgeller Mar 27, 2023
bdd616b
Merge branch 'main' of https://github.com/elastic/beats into s3-metri…
kgeller Mar 27, 2023
4657e0a
poke
kgeller Mar 27, 2023
0e53261
Merge branch 'main' of https://github.com/elastic/beats into s3-metri…
kgeller Mar 27, 2023
84127c8
formatting
kgeller Mar 27, 2023
a26f275
Update x-pack/filebeat/input/awss3/metrics_test.go
kgeller Mar 28, 2023
045277d
Update x-pack/filebeat/input/awss3/metrics_test.go
kgeller Mar 28, 2023
23368f3
tweak naming
kgeller Mar 28, 2023
a8c6a97
Merge branch 's3-metric-utilization' of github.com:kgeller/beats into…
kgeller Mar 28, 2023
fb6a17d
handling of long running workers
kgeller Mar 28, 2023
0e41f19
Merge branch 'main' into s3-metric-utilization
kgeller Mar 28, 2023
ed4cfe8
Merge branch 'main' into s3-metric-utilization
kgeller Mar 29, 2023
32034e1
Track running time of SQS worker routines
andrewkroh Mar 30, 2023
99eb889
Merge branch 'main' into s3-metric-utilization
andrewkroh Mar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add Basic Authentication support on constructed requests to CEL input {issue}34609[34609] {pull}34689[34689]
- Add string manipulation extensions to CEL input {issue}34610[34610] {pull}34689[34689]
- Add unix socket log parsing for nginx ingress_controller {pull}34732[34732]
- Added metric `sqs_worker_utilization` for aws-s3 input. {pull}34793[34793]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ observe the activity of the input.
| `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
| `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
| `s3_objects_requested_total` | Number of S3 objects downloaded.
Expand Down
33 changes: 31 additions & 2 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
defer receiver.metrics.Close()

// Poll sqs waiting metric periodically in the background.
// Poll metrics periodically in the background
go pollSqsWaitingMetric(ctx, receiver)
metricReporterChan := make(chan int64)
defer close(metricReporterChan)
kgeller marked this conversation as resolved.
Show resolved Hide resolved
go pollSqsUtilizationMetric(ctx, receiver, metricReporterChan)
kgeller marked this conversation as resolved.
Show resolved Hide resolved

if err := receiver.Receive(ctx); err != nil {
if err := receiver.Receive(ctx, metricReporterChan); err != nil {
return err
}
}
Expand Down Expand Up @@ -410,5 +413,31 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) {
}
}

func pollSqsUtilizationMetric(ctx context.Context, receiver *sqsReader, metricReporterChan chan int64) {
pollDur := 5 * time.Second
t := time.NewTicker(pollDur)
defer t.Stop()

utilizedNanos := new(int64)
go func() {
for elem := range metricReporterChan {
*utilizedNanos += elem
kgeller marked this conversation as resolved.
Show resolved Hide resolved
}
}()

for {
select {
case <-ctx.Done():
return
case <-t.C:
denom := pollDur.Nanoseconds() * int64(receiver.maxMessagesInflight)
kgeller marked this conversation as resolved.
Show resolved Hide resolved
utilizedRate := float64(*utilizedNanos) / float64(denom)
receiver.metrics.sqsWorkerUtilization.Set(utilizedRate)
// reset for the next polling duration
*utilizedNanos = 0
}
}
}

// boolPtr returns a pointer to b.
func boolPtr(b bool) *bool { return &b }
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR

b.ResetTimer()
start := time.Now()
if err := sqsReader.Receive(ctx); err != nil {
if err := sqsReader.Receive(ctx, nil); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatal(err)
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0)
}

func TestInputRunS3(t *testing.T) {
Expand Down Expand Up @@ -430,4 +431,5 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0)
}
18 changes: 10 additions & 8 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ type inputMetrics struct {
registry *monitoring.Registry
unregister func()

sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsWorkerUtilization *monitoring.Float // Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.

s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded.
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed.
Expand Down Expand Up @@ -65,6 +66,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"),
sqsMessageProcessingTime: metrics.NewUniformSample(1024),
sqsLagTime: metrics.NewUniformSample(1024),
s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"),
Expand Down
7 changes: 5 additions & 2 deletions x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newSQSReader(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, maxMessag
}
}

func (r *sqsReader) Receive(ctx context.Context) error {
func (r *sqsReader) Receive(ctx context.Context, metricReporterChan chan int64) error {
// This loop tries to keep the workers busy as much as possible while
// honoring the max message cap as opposed to a simpler loop that receives
// N messages, waits for them all to finish, then requests N more messages.
Expand All @@ -61,6 +61,7 @@ func (r *sqsReader) Receive(ctx context.Context) error {

// Receive (at most) as many SQS messages as there are workers.
msgs, err := r.sqs.ReceiveMessage(ctx, workers)
r.log.Warnw("Messages received", "msg count", len(msgs))
if err != nil {
r.workerSem.Release(workers)

Expand All @@ -85,7 +86,9 @@ func (r *sqsReader) Receive(ctx context.Context) error {
go func(msg types.Message, start time.Time) {
defer func() {
r.metrics.sqsMessagesInflight.Dec()
r.metrics.sqsMessageProcessingTime.Update(time.Since(start).Nanoseconds())
processingTimeNanos := time.Since(start).Nanoseconds()
r.metrics.sqsMessageProcessingTime.Update(processingTimeNanos)
metricReporterChan <- processingTimeNanos
workerWg.Done()
r.workerSem.Release(1)
}()
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestSQSReceiver(t *testing.T) {

// Execute sqsReader and verify calls/state.
receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler)
require.NoError(t, receiver.Receive(ctx))
require.NoError(t, receiver.Receive(ctx, nil))
assert.Equal(t, maxMessages, receiver.workerSem.Available())
})

Expand Down Expand Up @@ -107,7 +107,7 @@ func TestSQSReceiver(t *testing.T) {

// Execute SQSReceiver and verify calls/state.
receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler)
require.NoError(t, receiver.Receive(ctx))
require.NoError(t, receiver.Receive(ctx, nil))
assert.Equal(t, maxMessages, receiver.workerSem.Available())
})
}
Expand Down