From 31839b7f51af0369c4a1980c3cc608bfc05d63b3 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 6 Feb 2023 12:13:50 -0500 Subject: [PATCH 01/19] added metric for polling the approximate message count from sqs --- x-pack/filebeat/input/awss3/input.go | 14 +++++++++++ .../input/awss3/input_integration_test.go | 2 ++ x-pack/filebeat/input/awss3/interfaces.go | 24 +++++++++++++++++++ x-pack/filebeat/input/awss3/metrics.go | 2 ++ x-pack/filebeat/input/awss3/sqs.go | 15 +++++++++++- 5 files changed, 56 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 4178661c42b..0b8d3175535 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -9,6 +9,7 @@ import ( "fmt" "net/url" "strings" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -123,6 +124,8 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } defer receiver.metrics.Close() + go MetricPoll(ctx, receiver) + if err := receiver.Receive(ctx); err != nil { return err } @@ -376,5 +379,16 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { return "unknown" } +func MetricPoll(ctx context.Context, receiver *sqsReader) { + t := time.NewTicker(1 * time.Minute) + + for range t.C { + count := receiver.GetApproximateMessageCount(ctx) + if count > -1 { + receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) + } + } +} + // boolPtr returns a pointer to b. func boolPtr(b bool) *bool { return &b } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index d950e87ff69..29d43aca8e7 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -187,6 +187,7 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) @@ -424,6 +425,7 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 64e522168b6..1f1390c4f2f 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -41,6 +41,7 @@ type sqsAPI interface { sqsReceiver sqsDeleter sqsVisibilityChanger + sqsAttributeGetter } type sqsReceiver interface { @@ -55,6 +56,10 @@ type sqsVisibilityChanger interface { ChangeMessageVisibility(ctx context.Context, msg *types.Message, timeout time.Duration) error } +type sqsAttributeGetter interface { + GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) +} + type sqsProcessor interface { // ProcessSQS processes and SQS message. It takes fully ownership of the // given message and is responsible for updating the message's visibility @@ -197,6 +202,25 @@ func (a *awsSQSAPI) ChangeMessageVisibility(ctx context.Context, msg *types.Mess return nil } +func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { + ctx, cancel := context.WithTimeout(ctx, a.apiTimeout) + defer cancel() + + attributeOutput, err := a.client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ + AttributeNames: attr, + QueueUrl: awssdk.String(a.queueURL), + }) + + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + err = fmt.Errorf("api_timeout exceeded: %w", err) + } + return nil, fmt.Errorf("sqs GetQueueAttributes failed: %w", err) + } + + return attributeOutput.Attributes, nil +} + // ------ // AWS S3 implementation // ------ diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index a1e9f4d4423..585773af84a 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -22,6 +22,7 @@ type inputMetrics struct { sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. + sqsMessagesWaiting *monitoring.Uint // Number of SQS messages waiting in the SQS Queue (gauge). sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. @@ -50,6 +51,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), + sqsMessagesWaiting: monitoring.NewUint(reg, "sqs_messages_waiting_gauge"), sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"), diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index f5f175d4f6d..c07ae984e58 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -7,6 +7,7 @@ package awss3 import ( "context" "errors" + "strconv" "sync" "time" @@ -19,7 +20,8 @@ import ( ) const ( - sqsRetryDelay = 10 * time.Second + sqsRetryDelay = 10 * time.Second + sqsApproximateNumberOfMessages = "ApproximateNumberOfMessages" ) type sqsReader struct { @@ -107,3 +109,14 @@ func (r *sqsReader) Receive(ctx context.Context) error { } return ctx.Err() } + +func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) int { + if attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}); err == nil { + if c, found := attributes[sqsApproximateNumberOfMessages]; found { + if messagesCount, err := strconv.Atoi(c); err == nil { + return messagesCount + } + } + } + return -1 +} From 20c977d4e5890ce7ae4b7add1facc0953c7dadf9 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 6 Feb 2023 12:30:00 -0500 Subject: [PATCH 02/19] documentation and rename --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 1 + x-pack/filebeat/input/awss3/input.go | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index bc165ccba29..030e92b7fa1 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -732,6 +732,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge). | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. | `s3_objects_requested_total` | Number of S3 objects downloaded. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 0b8d3175535..1707b975c99 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -124,7 +124,8 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } defer receiver.metrics.Close() - go MetricPoll(ctx, receiver) + // Poll sqs waiting metric periodically in the background. + go PollSqsWaitingMetric(ctx, receiver) if err := receiver.Receive(ctx); err != nil { return err @@ -379,7 +380,7 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { return "unknown" } -func MetricPoll(ctx context.Context, receiver *sqsReader) { +func PollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { t := time.NewTicker(1 * time.Minute) for range t.C { From 4472dfed875292488d5c8fe2d84bc657700f10e3 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 6 Feb 2023 12:32:19 -0500 Subject: [PATCH 03/19] changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2728c93ebfd..ee3c0bb3139 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -184,6 +184,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add pagination support for Salesforce module. {issue}34057[34057] {pull}34065[34065] - Allow users to redact sensitive data from CEL input debug logs. {pull}34302[34302] - Added support for HTTP destination override to Google Cloud Storage input. {pull}34413[34413] +- Added metric `sqs_messages_waiting_gauge` for aws-s3 input. {pull}34488[34488] *Auditbeat* From f56a88e1ace54b1482806ade3f0800ed2ae6e15c Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 6 Feb 2023 12:46:11 -0500 Subject: [PATCH 04/19] initializing info for benchmarking --- x-pack/filebeat/input/awss3/input_benchmark_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index ee113c4c303..2199d2b4980 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -67,6 +67,10 @@ func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.M return nil } +func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) { + return map[string]string{}, nil +} + type s3PagerConstant struct { mutex *sync.Mutex objects []s3Types.Object From 4a379f03526e2fbbede51a66f602a42b79178d93 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 6 Feb 2023 13:00:47 -0500 Subject: [PATCH 05/19] more foundational test pieces --- .../filebeat/input/awss3/mock_interfaces_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 39889de990c..a6e5b596115 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -88,6 +88,21 @@ func (mr *MockSQSAPIMockRecorder) ReceiveMessage(ctx, maxMessages interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockSQSAPI)(nil).ReceiveMessage), ctx, maxMessages) } +// GetQueueAttributes mocks base method. +func (m *MockSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetQueueAttributes", ctx, attr) + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetQueueAttributes indicates an expected call of GetQueueAttributes. +func (mr *MockSQSAPIMockRecorder) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MockSQSAPI)(nil).GetQueueAttributes), ctx, attr) +} + // MocksqsReceiver is a mock of sqsReceiver interface. type MocksqsReceiver struct { ctrl *gomock.Controller From a428d70f6f251d22eef9c5c84a0696089bb5dbe9 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 7 Feb 2023 13:42:42 -0500 Subject: [PATCH 06/19] update ticker logic to stop on context cancelled --- x-pack/filebeat/input/awss3/input.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 1707b975c99..7dc8570eeb3 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -381,12 +381,17 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { } func PollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { - t := time.NewTicker(1 * time.Minute) - - for range t.C { - count := receiver.GetApproximateMessageCount(ctx) - if count > -1 { - receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) + t := time.NewTicker(time.Minute) + for { + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + count := receiver.GetApproximateMessageCount(ctx) + if count > -1 { + receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) + } } } } From 0856bf1d54775f1b0441ccdfdd9eebffad042f6c Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 7 Feb 2023 13:53:27 -0500 Subject: [PATCH 07/19] documentation --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 7 ++++++- x-pack/filebeat/input/awss3/metrics.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 030e92b7fa1..2489e79f391 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -497,6 +497,11 @@ In case `delete_after_backup` is set the following permission is required as wel s3:DeleteObject ---- +In case optional SQS metric `sqs_messages_waiting_gauge` is desired, the following permission is required: +---- +sqs:GetQueueAttributes +---- + [float] === S3 and SQS setup @@ -732,7 +737,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. -| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge). +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge) (optional). | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. | `s3_objects_requested_total` | Number of S3 objects downloaded. diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 585773af84a..53ead854744 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -22,7 +22,7 @@ type inputMetrics struct { sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. - sqsMessagesWaiting *monitoring.Uint // Number of SQS messages waiting in the SQS Queue (gauge). + sqsMessagesWaiting *monitoring.Uint // Number of SQS messages waiting in the SQS Queue (gauge) (optional). sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. From f786f5c5efde0c803281c281500bd08e7931669e Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 7 Feb 2023 15:01:48 -0500 Subject: [PATCH 08/19] mockgen generation --- .../input/awss3/mock_interfaces_test.go | 177 ++++++++++++++---- 1 file changed, 136 insertions(+), 41 deletions(-) diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index a6e5b596115..b976cf00ebc 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -73,34 +73,34 @@ func (mr *MockSQSAPIMockRecorder) DeleteMessage(ctx, msg interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMessage", reflect.TypeOf((*MockSQSAPI)(nil).DeleteMessage), ctx, msg) } -// ReceiveMessage mocks base method. -func (m *MockSQSAPI) ReceiveMessage(ctx context.Context, maxMessages int) ([]types.Message, error) { +// GetQueueAttributes mocks base method. +func (m *MockSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReceiveMessage", ctx, maxMessages) - ret0, _ := ret[0].([]types.Message) + ret := m.ctrl.Call(m, "GetQueueAttributes", ctx, attr) + ret0, _ := ret[0].(map[string]string) ret1, _ := ret[1].(error) return ret0, ret1 } -// ReceiveMessage indicates an expected call of ReceiveMessage. -func (mr *MockSQSAPIMockRecorder) ReceiveMessage(ctx, maxMessages interface{}) *gomock.Call { +// GetQueueAttributes indicates an expected call of GetQueueAttributes. +func (mr *MockSQSAPIMockRecorder) GetQueueAttributes(ctx, attr interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockSQSAPI)(nil).ReceiveMessage), ctx, maxMessages) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MockSQSAPI)(nil).GetQueueAttributes), ctx, attr) } -// GetQueueAttributes mocks base method. -func (m *MockSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { +// ReceiveMessage mocks base method. +func (m *MockSQSAPI) ReceiveMessage(ctx context.Context, maxMessages int) ([]types.Message, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetQueueAttributes", ctx, attr) - ret0, _ := ret[0].(map[string]string) + ret := m.ctrl.Call(m, "ReceiveMessage", ctx, maxMessages) + ret0, _ := ret[0].([]types.Message) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetQueueAttributes indicates an expected call of GetQueueAttributes. -func (mr *MockSQSAPIMockRecorder) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) *gomock.Call { +// ReceiveMessage indicates an expected call of ReceiveMessage. +func (mr *MockSQSAPIMockRecorder) ReceiveMessage(ctx, maxMessages interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MockSQSAPI)(nil).GetQueueAttributes), ctx, attr) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockSQSAPI)(nil).ReceiveMessage), ctx, maxMessages) } // MocksqsReceiver is a mock of sqsReceiver interface. @@ -215,6 +215,44 @@ func (mr *MocksqsVisibilityChangerMockRecorder) ChangeMessageVisibility(ctx, msg return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChangeMessageVisibility", reflect.TypeOf((*MocksqsVisibilityChanger)(nil).ChangeMessageVisibility), ctx, msg, timeout) } +// MocksqsAttributeGetter is a mock of sqsAttributeGetter interface. +type MocksqsAttributeGetter struct { + ctrl *gomock.Controller + recorder *MocksqsAttributeGetterMockRecorder +} + +// MocksqsAttributeGetterMockRecorder is the mock recorder for MocksqsAttributeGetter. +type MocksqsAttributeGetterMockRecorder struct { + mock *MocksqsAttributeGetter +} + +// NewMocksqsAttributeGetter creates a new mock instance. +func NewMocksqsAttributeGetter(ctrl *gomock.Controller) *MocksqsAttributeGetter { + mock := &MocksqsAttributeGetter{ctrl: ctrl} + mock.recorder = &MocksqsAttributeGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MocksqsAttributeGetter) EXPECT() *MocksqsAttributeGetterMockRecorder { + return m.recorder +} + +// GetQueueAttributes mocks base method. +func (m *MocksqsAttributeGetter) GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetQueueAttributes", ctx, attr) + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetQueueAttributes indicates an expected call of GetQueueAttributes. +func (mr *MocksqsAttributeGetterMockRecorder) GetQueueAttributes(ctx, attr interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueAttributes", reflect.TypeOf((*MocksqsAttributeGetter)(nil).GetQueueAttributes), ctx, attr) +} + // MockSQSProcessor is a mock of sqsProcessor interface. type MockSQSProcessor struct { ctrl *gomock.Controller @@ -275,21 +313,7 @@ func (m *MockS3API) EXPECT() *MockS3APIMockRecorder { return m.recorder } -// GetObject mocks base method. -func (m *MockS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetObject", ctx, bucket, key) - ret0, _ := ret[0].(*s3.GetObjectOutput) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetObject indicates an expected call of GetObject. -func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) -} - +// CopyObject mocks base method. func (m *MockS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CopyObject", ctx, from_bucket, to_bucket, from_key, to_key) @@ -298,11 +322,13 @@ func (m *MockS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from return ret0, ret1 } +// CopyObject indicates an expected call of CopyObject. func (mr *MockS3APIMockRecorder) CopyObject(ctx, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, from_bucket, to_bucket, from_key, to_key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockS3API)(nil).CopyObject), ctx, from_bucket, to_bucket, from_key, to_key) } +// DeleteObject mocks base method. func (m *MockS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteObject", ctx, bucket, key) @@ -311,9 +337,25 @@ func (m *MockS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.D return ret0, ret1 } +// DeleteObject indicates an expected call of DeleteObject. func (mr *MockS3APIMockRecorder) DeleteObject(ctx, bucket, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3API)(nil).DeleteObject), ctx, bucket, key) +} + +// GetObject mocks base method. +func (m *MockS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetObject", ctx, bucket, key) + ret0, _ := ret[0].(*s3.GetObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetObject indicates an expected call of GetObject. +func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) } // ListObjectsPaginator mocks base method. @@ -368,6 +410,59 @@ func (mr *Mocks3GetterMockRecorder) GetObject(ctx, bucket, key interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*Mocks3Getter)(nil).GetObject), ctx, bucket, key) } +// Mocks3Mover is a mock of s3Mover interface. +type Mocks3Mover struct { + ctrl *gomock.Controller + recorder *Mocks3MoverMockRecorder +} + +// Mocks3MoverMockRecorder is the mock recorder for Mocks3Mover. +type Mocks3MoverMockRecorder struct { + mock *Mocks3Mover +} + +// NewMocks3Mover creates a new mock instance. +func NewMocks3Mover(ctrl *gomock.Controller) *Mocks3Mover { + mock := &Mocks3Mover{ctrl: ctrl} + mock.recorder = &Mocks3MoverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mocks3Mover) EXPECT() *Mocks3MoverMockRecorder { + return m.recorder +} + +// CopyObject mocks base method. +func (m *Mocks3Mover) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CopyObject", ctx, from_bucket, to_bucket, from_key, to_key) + ret0, _ := ret[0].(*s3.CopyObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CopyObject indicates an expected call of CopyObject. +func (mr *Mocks3MoverMockRecorder) CopyObject(ctx, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*Mocks3Mover)(nil).CopyObject), ctx, from_bucket, to_bucket, from_key, to_key) +} + +// DeleteObject mocks base method. +func (m *Mocks3Mover) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", ctx, bucket, key) + ret0, _ := ret[0].(*s3.DeleteObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *Mocks3MoverMockRecorder) DeleteObject(ctx, bucket, key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*Mocks3Mover)(nil).DeleteObject), ctx, bucket, key) +} + // Mocks3Lister is a mock of s3Lister interface. type Mocks3Lister struct { ctrl *gomock.Controller @@ -522,32 +617,32 @@ func (m *MockS3ObjectHandler) EXPECT() *MockS3ObjectHandlerMockRecorder { return m.recorder } -// ProcessS3Object mocks base method. -func (m *MockS3ObjectHandler) ProcessS3Object() error { +// FinalizeS3Object mocks base method. +func (m *MockS3ObjectHandler) FinalizeS3Object() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessS3Object") + ret := m.ctrl.Call(m, "FinalizeS3Object") ret0, _ := ret[0].(error) return ret0 } -// ProcessS3Object indicates an expected call of ProcessS3Object. -func (mr *MockS3ObjectHandlerMockRecorder) ProcessS3Object() *gomock.Call { +// FinalizeS3Object indicates an expected call of FinalizeS3Object. +func (mr *MockS3ObjectHandlerMockRecorder) FinalizeS3Object() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).ProcessS3Object)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalizeS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).FinalizeS3Object)) } // ProcessS3Object mocks base method. -func (m *MockS3ObjectHandler) FinalizeS3Object() error { +func (m *MockS3ObjectHandler) ProcessS3Object() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FinalizeS3Object") + ret := m.ctrl.Call(m, "ProcessS3Object") ret0, _ := ret[0].(error) return ret0 } // ProcessS3Object indicates an expected call of ProcessS3Object. -func (mr *MockS3ObjectHandlerMockRecorder) FinalizeS3Object() *gomock.Call { +func (mr *MockS3ObjectHandlerMockRecorder) ProcessS3Object() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalizeS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).FinalizeS3Object)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessS3Object", reflect.TypeOf((*MockS3ObjectHandler)(nil).ProcessS3Object)) } // Wait mocks base method. From 2a7afd5479ffe2a37205ea45a7a5491223df4521 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 7 Feb 2023 15:14:46 -0500 Subject: [PATCH 09/19] stop polling on permissions error --- x-pack/filebeat/input/awss3/input.go | 10 ++++++---- x-pack/filebeat/input/awss3/sqs.go | 9 +++++---- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 7dc8570eeb3..50ecdaad0b4 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -382,16 +382,18 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { func PollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { t := time.NewTicker(time.Minute) + defer t.Stop() for { select { case <-ctx.Done(): - t.Stop() return case <-t.C: - count := receiver.GetApproximateMessageCount(ctx) - if count > -1 { - receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) + count, err := receiver.GetApproximateMessageCount(ctx) + if strings.Contains(err.Error(), "StatusCode: 403") { + // stop polling if permissions error is encountered + return } + receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) } } } diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index c07ae984e58..1a62d0a4976 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -110,13 +110,14 @@ func (r *sqsReader) Receive(ctx context.Context) error { return ctx.Err() } -func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) int { - if attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}); err == nil { +func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) (int, error) { + attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}) + if err == nil { if c, found := attributes[sqsApproximateNumberOfMessages]; found { if messagesCount, err := strconv.Atoi(c); err == nil { - return messagesCount + return messagesCount, nil } } } - return -1 + return -1, err } From ffa7bbf0a664bbf8a51290b227b3aaa22cdab20a Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 7 Feb 2023 17:08:40 -0500 Subject: [PATCH 10/19] back to just -1, simpler --- x-pack/filebeat/input/awss3/input.go | 6 +++--- x-pack/filebeat/input/awss3/sqs.go | 9 ++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 50ecdaad0b4..03db3e011bf 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -388,9 +388,9 @@ func PollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { case <-ctx.Done(): return case <-t.C: - count, err := receiver.GetApproximateMessageCount(ctx) - if strings.Contains(err.Error(), "StatusCode: 403") { - // stop polling if permissions error is encountered + count := receiver.GetApproximateMessageCount(ctx) + if count == -1 { + // stop polling if error is encountered return } receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 1a62d0a4976..c07ae984e58 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -110,14 +110,13 @@ func (r *sqsReader) Receive(ctx context.Context) error { return ctx.Err() } -func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) (int, error) { - attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}) - if err == nil { +func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) int { + if attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}); err == nil { if c, found := attributes[sqsApproximateNumberOfMessages]; found { if messagesCount, err := strconv.Atoi(c); err == nil { - return messagesCount, nil + return messagesCount } } } - return -1, err + return -1 } From e651cbf56ec1d444d9d07820672f4559d16ef64a Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 7 Feb 2023 17:08:52 -0500 Subject: [PATCH 11/19] tests --- x-pack/filebeat/input/awss3/sqs_test.go | 56 +++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index d798f6e782e..5bbfa156f1e 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -24,6 +24,7 @@ import ( const testTimeout = 10 * time.Second var errFakeConnectivityFailure = errors.New("fake connectivity failure") +var errFakeGetAttributeFailute = errors.New("something went wrong") func TestSQSReceiver(t *testing.T) { err := logp.TestingSetup() @@ -109,6 +110,61 @@ func TestSQSReceiver(t *testing.T) { }) } +func TestGetApproximateMessageCount(t *testing.T) { + err := logp.TestingSetup() + assert.Nil(t, err) + + const maxMessages = 5 + const count = 500 + attrName := []types.QueueAttributeName{sqsApproximateNumberOfMessages} + attr := map[string]string{"ApproximateNumberOfMessages": "500"} + + t.Run("GetApproximateMessageCount success", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + mockAPI := NewMockSQSAPI(ctrl) + mockMsgHandler := NewMockSQSProcessor(ctrl) + + gomock.InOrder( + mockAPI.EXPECT(). + GetQueueAttributes(gomock.Any(), gomock.Eq(attrName)). + Times(1). + DoAndReturn(func(_ context.Context, _ []types.QueueAttributeName) (map[string]string, error) { + return attr, nil + }), + ) + + receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) + assert.Equal(t, count, receiver.GetApproximateMessageCount(ctx)) + }) + + t.Run("GetApproximateMessageCount error", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + + mockAPI := NewMockSQSAPI(ctrl) + mockMsgHandler := NewMockSQSProcessor(ctrl) + + gomock.InOrder( + mockAPI.EXPECT(). + GetQueueAttributes(gomock.Any(), gomock.Eq(attrName)). + Times(1). + DoAndReturn(func(_ context.Context, _ []types.QueueAttributeName) (map[string]string, error) { + return nil, errFakeGetAttributeFailute + }), + ) + + receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) + assert.Equal(t, -1, receiver.GetApproximateMessageCount(ctx)) + }) +} + func newSQSMessage(events ...s3EventV2) types.Message { body, err := json.Marshal(s3EventsV2{Records: events}) if err != nil { From 7762df25e3315dea55e900812e960ae22865a9a0 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 9 Feb 2023 09:49:13 -0500 Subject: [PATCH 12/19] cr tweaks --- x-pack/filebeat/input/awss3/input.go | 4 ++-- x-pack/filebeat/input/awss3/sqs_test.go | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 03db3e011bf..3f51c04f82e 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -125,7 +125,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { defer receiver.metrics.Close() // Poll sqs waiting metric periodically in the background. - go PollSqsWaitingMetric(ctx, receiver) + go pollSqsWaitingMetric(ctx, receiver) if err := receiver.Receive(ctx); err != nil { return err @@ -380,7 +380,7 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { return "unknown" } -func PollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { +func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { t := time.NewTicker(time.Minute) defer t.Stop() for { diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 5bbfa156f1e..440fa6b6e96 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -23,8 +23,10 @@ import ( const testTimeout = 10 * time.Second -var errFakeConnectivityFailure = errors.New("fake connectivity failure") -var errFakeGetAttributeFailute = errors.New("something went wrong") +var ( + errFakeConnectivityFailure = errors.New("fake connectivity failure") + errFakeGetAttributeFailute = errors.New("something went wrong") +) func TestSQSReceiver(t *testing.T) { err := logp.TestingSetup() From 94393da751b6d1f0e35060d069ffc0c3188b1a32 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Thu, 9 Feb 2023 16:15:29 -0500 Subject: [PATCH 13/19] updated description --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 2 +- x-pack/filebeat/input/awss3/metrics.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 2489e79f391..ca85f5507a8 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -737,7 +737,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. -| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge) (optional). +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. | `s3_objects_requested_total` | Number of S3 objects downloaded. diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 53ead854744..4391ac55a3b 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -22,7 +22,7 @@ type inputMetrics struct { sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. - sqsMessagesWaiting *monitoring.Uint // Number of SQS messages waiting in the SQS Queue (gauge) (optional). + sqsMessagesWaiting *monitoring.Uint // Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. From b13ec823ab0a2711f979f8bcd7185433c49fa11a Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Fri, 10 Feb 2023 10:54:01 -0500 Subject: [PATCH 14/19] adding in flag for permissions --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 1 + x-pack/filebeat/input/awss3/input.go | 4 ++++ x-pack/filebeat/input/awss3/metrics.go | 2 ++ 3 files changed, 7 insertions(+) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index ca85f5507a8..45b41b29563 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -740,6 +740,7 @@ observe the activity of the input. | `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. +| `sqs_has_get_attributes_permission` | Boolean indicating if input has permissions for SQS GetQueueAttributes API call (gauge). | `s3_objects_requested_total` | Number of S3 objects downloaded. | `s3_objects_listed_total` | Number of S3 objects returned by list operations. | `s3_objects_processed_total` | Number of S3 objects that matched file_selectors rules. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 3f51c04f82e..e5322d06322 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -391,8 +391,12 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { count := receiver.GetApproximateMessageCount(ctx) if count == -1 { // stop polling if error is encountered + receiver.metrics.sqsHasGetAttributesPermission.Set(false) return } + if !receiver.metrics.sqsHasGetAttributesPermission.Get() { + receiver.metrics.sqsHasGetAttributesPermission.Set(true) + } receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) } } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 4391ac55a3b..7d14514f1af 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -25,6 +25,7 @@ type inputMetrics struct { sqsMessagesWaiting *monitoring.Uint // Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. + sqsHasGetAttributesPermission *monitoring.Bool // Boolean indicating if input has permissions for SQS GetQueueAttributes API call (gauge). s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded. s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed. @@ -54,6 +55,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri sqsMessagesWaiting: monitoring.NewUint(reg, "sqs_messages_waiting_gauge"), sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), + sqsHasGetAttributesPermission: monitoring.NewBool(reg, "sqs_has_get_attributes_permission"), s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"), s3ObjectsAckedTotal: monitoring.NewUint(reg, "s3_objects_acked_total"), s3ObjectsListedTotal: monitoring.NewUint(reg, "s3_objects_listed_total"), From 057982dd7f2d94ffe291558425c1b0586201dc74 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Tue, 21 Feb 2023 13:25:33 -0500 Subject: [PATCH 15/19] remove bool and update to initialize after success --- .../filebeat/docs/inputs/input-aws-s3.asciidoc | 1 - x-pack/filebeat/input/awss3/input.go | 8 +++----- x-pack/filebeat/input/awss3/metrics.go | 17 +++++++++++++---- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 45b41b29563..ca85f5507a8 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -740,7 +740,6 @@ observe the activity of the input. | `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. -| `sqs_has_get_attributes_permission` | Boolean indicating if input has permissions for SQS GetQueueAttributes API call (gauge). | `s3_objects_requested_total` | Number of S3 objects downloaded. | `s3_objects_listed_total` | Number of S3 objects returned by list operations. | `s3_objects_processed_total` | Number of S3 objects that matched file_selectors rules. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index e5322d06322..669bc63257d 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -391,13 +391,11 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { count := receiver.GetApproximateMessageCount(ctx) if count == -1 { // stop polling if error is encountered - receiver.metrics.sqsHasGetAttributesPermission.Set(false) + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) return } - if !receiver.metrics.sqsHasGetAttributesPermission.Get() { - receiver.metrics.sqsHasGetAttributesPermission.Set(true) - } - receiver.metrics.sqsMessagesWaiting.Set(uint64(count)) + receiver.metrics.Initialize() + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) } } } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 7d14514f1af..2d9bfe5286d 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -6,6 +6,7 @@ package awss3 import ( "io" + "sync" "github.com/rcrowley/go-metrics" @@ -14,7 +15,10 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) +var initWaitingMetricOnce sync.Once + type inputMetrics struct { + registry *monitoring.Registry unregister func() sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). @@ -22,10 +26,9 @@ type inputMetrics struct { sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. - sqsMessagesWaiting *monitoring.Uint // Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. + sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. - sqsHasGetAttributesPermission *monitoring.Bool // Boolean indicating if input has permissions for SQS GetQueueAttributes API call (gauge). s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded. s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed. @@ -42,20 +45,26 @@ func (m *inputMetrics) Close() { m.unregister() } +func (m *inputMetrics) Initialize() { + initWaitingMetricOnce.Do(func() { + m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") + }) +} + func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent) out := &inputMetrics{ + registry: reg, unregister: unreg, sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"), sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"), sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), - sqsMessagesWaiting: monitoring.NewUint(reg, "sqs_messages_waiting_gauge"), + sqsMessagesWaiting: nil, sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), - sqsHasGetAttributesPermission: monitoring.NewBool(reg, "sqs_has_get_attributes_permission"), s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"), s3ObjectsAckedTotal: monitoring.NewUint(reg, "s3_objects_acked_total"), s3ObjectsListedTotal: monitoring.NewUint(reg, "s3_objects_listed_total"), From 13ff657bacf35ba6fc8fd5f19eaff79bd6b2f01c Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Wed, 22 Feb 2023 15:20:30 -0500 Subject: [PATCH 16/19] assorted updates; checking for 403, review cleanup --- .../docs/inputs/input-aws-s3.asciidoc | 2 +- x-pack/filebeat/input/awss3/input.go | 11 +++++----- x-pack/filebeat/input/awss3/metrics.go | 20 +++++++++++-------- x-pack/filebeat/input/awss3/sqs.go | 9 +++++---- x-pack/filebeat/input/awss3/sqs_test.go | 8 ++++++-- 5 files changed, 29 insertions(+), 21 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index ca85f5507a8..40b412fab61 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -737,7 +737,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. -| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. | `s3_objects_requested_total` | Number of S3 objects downloaded. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 669bc63257d..ed507ba1fa9 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -388,14 +388,13 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { case <-ctx.Done(): return case <-t.C: - count := receiver.GetApproximateMessageCount(ctx) - if count == -1 { - // stop polling if error is encountered - receiver.metrics.sqsMessagesWaiting.Set(int64(count)) + count, err := receiver.GetApproximateMessageCount(ctx) + if count == -1 && strings.Contains(err.Error(), "StatusCode: 403") { + // stop polling if auth error is encountered + receiver.metrics.setSQSMessagesWaiting(int64(count)) return } - receiver.metrics.Initialize() - receiver.metrics.sqsMessagesWaiting.Set(int64(count)) + receiver.metrics.setSQSMessagesWaiting(int64(count)) } } } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 2d9bfe5286d..5ddba3610f1 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -15,18 +15,17 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) -var initWaitingMetricOnce sync.Once - type inputMetrics struct { - registry *monitoring.Registry - unregister func() + registry *monitoring.Registry + unregister func() + initSQSMessageWaitingOnce sync.Once sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge). sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted. - sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS Queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. + sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. @@ -45,10 +44,16 @@ func (m *inputMetrics) Close() { m.unregister() } -func (m *inputMetrics) Initialize() { - initWaitingMetricOnce.Do(func() { +func (m *inputMetrics) setSQSMessagesWaiting(count int64) { + if count == -1 && m.sqsMessagesWaiting == nil { + // if metric not initialized, and count is -1, do nothing + return + } + + m.initSQSMessageWaitingOnce.Do(func() { m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") }) + m.sqsMessagesWaiting.Set(count) } func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics { @@ -62,7 +67,6 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), - sqsMessagesWaiting: nil, sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"), diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index c07ae984e58..1a62d0a4976 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -110,13 +110,14 @@ func (r *sqsReader) Receive(ctx context.Context) error { return ctx.Err() } -func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) int { - if attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}); err == nil { +func (r *sqsReader) GetApproximateMessageCount(ctx context.Context) (int, error) { + attributes, err := r.sqs.GetQueueAttributes(ctx, []types.QueueAttributeName{sqsApproximateNumberOfMessages}) + if err == nil { if c, found := attributes[sqsApproximateNumberOfMessages]; found { if messagesCount, err := strconv.Atoi(c); err == nil { - return messagesCount + return messagesCount, nil } } } - return -1 + return -1, err } diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 440fa6b6e96..fe0c731da06 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -140,7 +140,9 @@ func TestGetApproximateMessageCount(t *testing.T) { ) receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) - assert.Equal(t, count, receiver.GetApproximateMessageCount(ctx)) + receivedCount, err := receiver.GetApproximateMessageCount(ctx) + assert.Equal(t, count, receivedCount) + assert.Nil(t, err) }) t.Run("GetApproximateMessageCount error", func(t *testing.T) { @@ -163,7 +165,9 @@ func TestGetApproximateMessageCount(t *testing.T) { ) receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler) - assert.Equal(t, -1, receiver.GetApproximateMessageCount(ctx)) + receivedCount, err := receiver.GetApproximateMessageCount(ctx) + assert.Equal(t, -1, receivedCount) + assert.NotNil(t, err) }) } From eb7dc99f63584cced687205c614510fd89bcb8d4 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Fri, 24 Feb 2023 16:03:02 -0500 Subject: [PATCH 17/19] fixing auth check --- x-pack/filebeat/input/awss3/input.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index ed507ba1fa9..669cf7f53e6 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -6,6 +6,7 @@ package awss3 import ( "context" + "errors" "fmt" "net/url" "strings" @@ -14,6 +15,7 @@ import ( awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/smithy-go" "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -25,7 +27,10 @@ import ( "github.com/elastic/go-concert/unison" ) -const inputName = "aws-s3" +const ( + inputName = "aws-s3" + sqsAccessDeniedErrorCode = "AccessDeniedException" +) func Plugin(store beater.StateStore) v2.Plugin { return v2.Plugin{ @@ -389,11 +394,17 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { return case <-t.C: count, err := receiver.GetApproximateMessageCount(ctx) - if count == -1 && strings.Contains(err.Error(), "StatusCode: 403") { - // stop polling if auth error is encountered - receiver.metrics.setSQSMessagesWaiting(int64(count)) - return + + var apiError smithy.APIError + if errors.As(err, &apiError) { + switch apiError.ErrorCode() { + case sqsAccessDeniedErrorCode: + // stop polling if auth error is encountered + receiver.metrics.setSQSMessagesWaiting(int64(count)) + return + } } + receiver.metrics.setSQSMessagesWaiting(int64(count)) } } From 343225562de4234e08736bee649a8b8798d596a0 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 27 Feb 2023 09:18:20 -0500 Subject: [PATCH 18/19] removing do once --- x-pack/filebeat/input/awss3/metrics.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 5ddba3610f1..9d42b8bbe69 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -6,7 +6,6 @@ package awss3 import ( "io" - "sync" "github.com/rcrowley/go-metrics" @@ -16,9 +15,8 @@ import ( ) type inputMetrics struct { - registry *monitoring.Registry - unregister func() - initSQSMessageWaitingOnce sync.Once + registry *monitoring.Registry + unregister func() sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully). sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions. @@ -50,9 +48,7 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) { return } - m.initSQSMessageWaitingOnce.Do(func() { - m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") - }) + m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") m.sqsMessagesWaiting.Set(count) } From 673f7a0b62f07ed8cc434c67e05276518069a338 Mon Sep 17 00:00:00 2001 From: Kylie Geller Date: Mon, 27 Feb 2023 16:35:52 -0500 Subject: [PATCH 19/19] reworked logic --- x-pack/filebeat/input/awss3/metrics.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 9d42b8bbe69..3dcce8eee14 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -43,12 +43,14 @@ func (m *inputMetrics) Close() { } func (m *inputMetrics) setSQSMessagesWaiting(count int64) { - if count == -1 && m.sqsMessagesWaiting == nil { + if m.sqsMessagesWaiting == nil { // if metric not initialized, and count is -1, do nothing - return + if count == -1 { + return + } + m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") } - m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") m.sqsMessagesWaiting.Set(count) }