Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding aws-s3 metric for approximate messages waiting #34488

Merged
merged 30 commits into from
Feb 28, 2023
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
31839b7
added metric for polling the approximate message count from sqs
kgeller Feb 6, 2023
20c977d
documentation and rename
kgeller Feb 6, 2023
4472dfe
changelog
kgeller Feb 6, 2023
b8e8eac
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 6, 2023
f56a88e
initializing info for benchmarking
kgeller Feb 6, 2023
4a379f0
more foundational test pieces
kgeller Feb 6, 2023
a428d70
update ticker logic to stop on context cancelled
kgeller Feb 7, 2023
0856bf1
documentation
kgeller Feb 7, 2023
f786f5c
mockgen generation
kgeller Feb 7, 2023
2a7afd5
stop polling on permissions error
kgeller Feb 7, 2023
4e4d0f1
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 7, 2023
ffa7bbf
back to just -1, simpler
kgeller Feb 7, 2023
e651cbf
tests
kgeller Feb 7, 2023
7762df2
cr tweaks
kgeller Feb 9, 2023
820a826
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 9, 2023
94393da
updated description
kgeller Feb 9, 2023
b13ec82
adding in flag for permissions
kgeller Feb 10, 2023
83d585a
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 10, 2023
afb9e89
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 21, 2023
057982d
remove bool and update to initialize after success
kgeller Feb 21, 2023
13ff657
assorted updates; checking for 403, review cleanup
kgeller Feb 22, 2023
69691be
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 22, 2023
9294ef8
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 23, 2023
eb7dc99
fixing auth check
kgeller Feb 24, 2023
e3eda2b
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 24, 2023
3432255
removing do once
kgeller Feb 27, 2023
8211510
Merge branch 'main' of https://github.com/elastic/beats into s3-messa…
kgeller Feb 27, 2023
1305077
Merge branch 's3-messages-waiting-metric' of github.com:kgeller/beats…
kgeller Feb 27, 2023
673f7a0
reworked logic
kgeller Feb 27, 2023
a98eb74
Merge branch 'main' into s3-messages-waiting-metric
kgeller Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions x-pack/filebeat/input/awss3/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const testTimeout = 10 * time.Second

var errFakeConnectivityFailure = errors.New("fake connectivity failure")
var errFakeGetAttributeFailute = errors.New("something went wrong")
kgeller marked this conversation as resolved.
Show resolved Hide resolved

func TestSQSReceiver(t *testing.T) {
err := logp.TestingSetup()
Expand Down Expand Up @@ -109,6 +110,61 @@ func TestSQSReceiver(t *testing.T) {
})
}

func TestGetApproximateMessageCount(t *testing.T) {
err := logp.TestingSetup()
assert.Nil(t, err)

const maxMessages = 5
const count = 500
attrName := []types.QueueAttributeName{sqsApproximateNumberOfMessages}
attr := map[string]string{"ApproximateNumberOfMessages": "500"}

t.Run("GetApproximateMessageCount success", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

ctrl, ctx := gomock.WithContext(ctx, t)
defer ctrl.Finish()
mockAPI := NewMockSQSAPI(ctrl)
mockMsgHandler := NewMockSQSProcessor(ctrl)

gomock.InOrder(
mockAPI.EXPECT().
GetQueueAttributes(gomock.Any(), gomock.Eq(attrName)).
Times(1).
DoAndReturn(func(_ context.Context, _ []types.QueueAttributeName) (map[string]string, error) {
return attr, nil
}),
)

receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler)
assert.Equal(t, count, receiver.GetApproximateMessageCount(ctx))
})

t.Run("GetApproximateMessageCount error", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

ctrl, ctx := gomock.WithContext(ctx, t)
defer ctrl.Finish()

mockAPI := NewMockSQSAPI(ctrl)
mockMsgHandler := NewMockSQSProcessor(ctrl)

gomock.InOrder(
mockAPI.EXPECT().
GetQueueAttributes(gomock.Any(), gomock.Eq(attrName)).
Times(1).
DoAndReturn(func(_ context.Context, _ []types.QueueAttributeName) (map[string]string, error) {
return nil, errFakeGetAttributeFailute
}),
)

receiver := newSQSReader(logp.NewLogger(inputName), nil, mockAPI, maxMessages, mockMsgHandler)
assert.Equal(t, -1, receiver.GetApproximateMessageCount(ctx))
})
}

func newSQSMessage(events ...s3EventV2) types.Message {
body, err := json.Marshal(s3EventsV2{Records: events})
if err != nil {
Expand Down