Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding aws-s3 metric for sqs worker utilization #34793

Merged
merged 30 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d91dc32
added sqs worker utilization metric
kgeller Mar 9, 2023
8ac5ec2
little tweaks
kgeller Mar 9, 2023
998f694
changelog
kgeller Mar 9, 2023
0f1bda6
Merge branch 'main' into s3-metric-utilization
kgeller Mar 9, 2023
d365873
fixing args for test
kgeller Mar 9, 2023
f46a5c7
Merge branch 's3-metric-utilization' of github.com:kgeller/beats into…
kgeller Mar 9, 2023
6a9f132
more test param fixes
kgeller Mar 9, 2023
a5d12aa
review feedback updates
kgeller Mar 10, 2023
a3d0669
cleanup and fix
kgeller Mar 10, 2023
092065c
null check
kgeller Mar 10, 2023
223434b
Update x-pack/filebeat/input/awss3/input.go
kgeller Mar 20, 2023
68d507e
Merge branch 'main' into s3-metric-utilization
kgeller Mar 20, 2023
eba0271
review cleanup: renaming and organizing better
kgeller Mar 20, 2023
78a1c0e
Merge branch 'main' into s3-metric-utilization
kgeller Mar 20, 2023
1bd00ef
adapting chan logic
kgeller Mar 23, 2023
4f81bf2
Merge branch 'main' into s3-metric-utilization
kgeller Mar 23, 2023
48a5edc
refactored to have polling encapsulated by metrics, and new test veri…
kgeller Mar 27, 2023
bdd616b
Merge branch 'main' of https://github.com/elastic/beats into s3-metri…
kgeller Mar 27, 2023
4657e0a
poke
kgeller Mar 27, 2023
0e53261
Merge branch 'main' of https://github.com/elastic/beats into s3-metri…
kgeller Mar 27, 2023
84127c8
formatting
kgeller Mar 27, 2023
a26f275
Update x-pack/filebeat/input/awss3/metrics_test.go
kgeller Mar 28, 2023
045277d
Update x-pack/filebeat/input/awss3/metrics_test.go
kgeller Mar 28, 2023
23368f3
tweak naming
kgeller Mar 28, 2023
a8c6a97
Merge branch 's3-metric-utilization' of github.com:kgeller/beats into…
kgeller Mar 28, 2023
fb6a17d
handling of long running workers
kgeller Mar 28, 2023
0e41f19
Merge branch 'main' into s3-metric-utilization
kgeller Mar 28, 2023
ed4cfe8
Merge branch 'main' into s3-metric-utilization
kgeller Mar 29, 2023
32034e1
Track running time of SQS worker routines
andrewkroh Mar 30, 2023
99eb889
Merge branch 'main' into s3-metric-utilization
andrewkroh Mar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add Basic Authentication support on constructed requests to CEL input {issue}34609[34609] {pull}34689[34689]
- Add string manipulation extensions to CEL input {issue}34610[34610] {pull}34689[34689]
- Add unix socket log parsing for nginx ingress_controller {pull}34732[34732]
- Added metric `sqs_worker_utilization` for aws-s3 input. {pull}34793[34793]
- Improve CEL input documentation {pull}34831[34831]
- Add metrics documentation for CEL and AWS CloudWatch inputs. {issue}34887[34887] {pull}34889[34889]
- Register MIME handlers for CSV types in CEL input. {pull}34934[34934]
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ observe the activity of the input.
| `sqs_messages_returned_total` | Number of SQS messages returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
| `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
| `s3_objects_requested_total` | Number of S3 objects downloaded.
Expand Down
12 changes: 6 additions & 6 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
defer receiver.metrics.Close()

// Poll sqs waiting metric periodically in the background.
// Poll metrics periodically in the background
go pollSqsWaitingMetric(ctx, receiver)

if err := receiver.Receive(ctx); err != nil {
Expand Down Expand Up @@ -208,9 +208,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
if err != nil {
return nil, err
}
in.metrics = newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory)
in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory, in.config.MaxNumberOfMessages)
sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

return sqsReader, nil
Expand Down Expand Up @@ -281,8 +281,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
in.metrics = newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig)
in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages)
s3Poller := newS3Poller(log.Named("s3_poller"),
in.metrics,
s3API,
Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
return testing.Benchmark(func(b *testing.B) {
log := logp.NewLogger(inputName)
metricRegistry := monitoring.NewRegistry()
metrics := newInputMetrics("test_id", metricRegistry)
metrics := newInputMetrics("test_id", metricRegistry, maxMessagesInflight)
sqsAPI := newConstantSQS()
s3API := newConstantS3(t)
pipeline := &fakePipeline{}
conf := makeBenchmarkConfig(t)

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{})
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}, maxMessagesInflight)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory, maxMessagesInflight)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -302,7 +302,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
log.Infof("benchmark with %d number of workers", numberOfWorkers)

metricRegistry := monitoring.NewRegistry()
metrics := newInputMetrics("test_id", metricRegistry)
metrics := newInputMetrics("test_id", metricRegistry, numberOfWorkers)

client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) {
event.Private.(*awscommon.EventACKTracker).ACK()
Expand Down Expand Up @@ -348,7 +348,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
return
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{})
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}, numberOfWorkers)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0)
}

func TestInputRunS3(t *testing.T) {
Expand Down Expand Up @@ -430,4 +431,5 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0)
}
78 changes: 68 additions & 10 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
package awss3

import (
"context"
"io"
"sync/atomic"
"time"

"github.com/rcrowley/go-metrics"

Expand All @@ -17,15 +20,20 @@ import (
type inputMetrics struct {
registry *monitoring.Registry
unregister func()
ctx context.Context
cancel context.CancelFunc

sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
utilizationNanos int64

sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsWorkerUtilization *monitoring.Float // Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.

s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded.
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed.
Expand All @@ -37,11 +45,17 @@ type inputMetrics struct {
s3ObjectProcessingTime metrics.Sample // Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing).
}

// Close cancels the metrics' background context — stopping the SQS worker
// utilization polling goroutine started by newInputMetrics — and then
// removes the metrics from the monitoring registry.
func (m *inputMetrics) Close() {
	m.cancel()
	m.unregister()
}

// updateSQSProcessingTime records the elapsed processing time of one SQS
// message: it feeds the processing-time histogram and accumulates the same
// duration into the busy-time counter used for the worker-utilization gauge.
func (m *inputMetrics) updateSQSProcessingTime(d time.Duration) {
	nanos := d.Nanoseconds()
	m.sqsMessageProcessingTime.Update(nanos)
	atomic.AddInt64(&m.utilizationNanos, nanos)
}

func (m *inputMetrics) setSQSMessagesWaiting(count int64) {
if m.sqsMessagesWaiting == nil {
// if metric not initialized, and count is -1, do nothing
Expand All @@ -54,17 +68,58 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) {
m.sqsMessagesWaiting.Set(count)
}

func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
func calculateUtilizationAndReset(d time.Duration, maxMessagesInflight int, m *inputMetrics) float64 {
maxUtilization := float64(d) * float64(maxMessagesInflight)
utilizedRate := float64(atomic.SwapInt64(&m.utilizationNanos, 0)) / maxUtilization

if utilizedRate == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we have two workers operating? One is long running (taking more than 5 seconds) and one is operating on short jobs (like completing every 100ms).

// Falling back to inflight stats when the workers are long running
inflight := m.sqsMessagesInflight.Get()
if inflight > 0 {
return float64(inflight) / float64(maxMessagesInflight)
}
} else if utilizedRate > 1 {
// Normalizing the utilization after long running workers
return 1
}

return utilizedRate
}

// pollSQSUtilizationMetric refreshes the sqs_worker_utilization gauge every
// five seconds until ctx is cancelled. Each tick it measures the real elapsed
// time since the previous tick (rather than assuming exactly five seconds)
// and derives the utilization rate from the busy time accumulated since then.
func (m *inputMetrics) pollSQSUtilizationMetric(ctx context.Context, maxMessagesInflight int) {
	const interval = 5 * time.Second
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	previous := time.Now()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			elapsed := now.Sub(previous)
			m.sqsWorkerUtilization.Set(calculateUtilizationAndReset(elapsed, maxMessagesInflight, m))
			// The current tick becomes the start of the next window.
			previous = now
		}
	}
}

func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers int) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)
ctx, cancelInputCtx := context.WithCancel(context.Background())

out := &inputMetrics{
registry: reg,
unregister: unreg,
ctx: ctx,
cancel: cancelInputCtx,
sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"),
sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"),
sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"),
sqsMessageProcessingTime: metrics.NewUniformSample(1024),
sqsLagTime: metrics.NewUniformSample(1024),
s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"),
Expand All @@ -82,6 +137,9 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.

go out.pollSQSUtilizationMetric(ctx, maxWorkers)

return out
}

Expand Down
39 changes: 38 additions & 1 deletion x-pack/filebeat/input/awss3/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package awss3

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/monitoring"
)
Expand All @@ -17,10 +20,44 @@ import (
// TestInputMetricsClose verifies that Close unregisters every metric, leaving
// the parent registry empty.
func TestInputMetricsClose(t *testing.T) {
	reg := monitoring.NewRegistry()

	metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg, 1)
	metrics.Close()

	// Any name still registered after Close is a leak.
	reg.Do(monitoring.Full, func(s string, _ interface{}) {
		t.Errorf("registry should be empty, but found %v", s)
	})
}

// TestInputMetricsSQSWorkerUtilization verifies that a full window of recorded
// busy time for the single worker yields a utilization rate near 1.
func TestInputMetricsSQSWorkerUtilization(t *testing.T) {
	reg := monitoring.NewRegistry()
	const maxWorkers = 1
	metrics := newInputMetrics("test", reg, maxWorkers)
	// Close stops the background polling goroutine and unregisters metrics.
	defer metrics.Close()

	metrics.utilizationNanos = time.Second.Nanoseconds()

	utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics)
	assert.Greater(t, utilization, 0.95)
}

// TestInputMetricsSQSWorkerUtilization_LongRunningWorkers verifies that busy
// time exceeding the sampling window (a long-running worker completing) is
// clamped to a utilization rate of exactly 1.
func TestInputMetricsSQSWorkerUtilization_LongRunningWorkers(t *testing.T) {
	reg := monitoring.NewRegistry()
	const maxWorkers = 1
	metrics := newInputMetrics("test", reg, maxWorkers)
	// Close stops the background polling goroutine and unregisters metrics.
	defer metrics.Close()

	metrics.utilizationNanos = time.Minute.Nanoseconds()

	utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics)
	// testify convention: expected value first, actual second.
	assert.Equal(t, 1.0, utilization)
}

// TestInputMetricsSQSWorkerUtilization_InFlightWorkers verifies the fallback
// path: when no busy time was recorded but messages are still inflight, the
// inflight count alone drives the utilization rate.
func TestInputMetricsSQSWorkerUtilization_InFlightWorkers(t *testing.T) {
	reg := monitoring.NewRegistry()
	const maxWorkers = 1
	metrics := newInputMetrics("test", reg, maxWorkers)
	// Close stops the background polling goroutine and unregisters metrics.
	defer metrics.Close()

	metrics.utilizationNanos = 0
	metrics.sqsMessagesInflight.Set(maxWorkers)

	utilization := calculateUtilizationAndReset(time.Second, maxWorkers, metrics)
	// testify convention: expected value first, actual second.
	assert.Equal(t, 1.0, utilization)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newS3Poller(log *logp.Logger,
bucketPollInterval time.Duration,
) *s3Poller {
if metrics == nil {
metrics = newInputMetrics("", monitoring.NewRegistry())
metrics = newInputMetrics("", monitoring.NewRegistry(), numberOfWorkers)
}
return &s3Poller{
numberOfWorkers: numberOfWorkers,
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type s3ObjectProcessorFactory struct {
backupConfig backupConfig
}

func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory {
func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig, maxWorkers int) *s3ObjectProcessorFactory {
if metrics == nil {
metrics = newInputMetrics("", monitoring.NewRegistry())
metrics = newInputMetrics("", monitoring.NewRegistry(), maxWorkers)
}
if len(sel) == 0 {
sel = []fileSelectorConfig{
Expand Down
14 changes: 7 additions & 7 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestS3ObjectProcessor(t *testing.T) {
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1)
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object()
require.Error(t, err)
Expand All @@ -166,7 +166,7 @@ func TestS3ObjectProcessor(t *testing.T) {
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1)
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object()
require.Error(t, err)
Expand All @@ -193,7 +193,7 @@ func TestS3ObjectProcessor(t *testing.T) {
Times(2),
)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}, 1)
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object()
require.NoError(t, err)
Expand All @@ -219,7 +219,7 @@ func TestS3ObjectProcessor(t *testing.T) {
Return(nil, nil),
)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg)
s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1)
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object()
require.NoError(t, err)
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestS3ObjectProcessor(t *testing.T) {
Return(nil, nil),
)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg)
s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1)
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object()
require.NoError(t, err)
Expand All @@ -276,7 +276,7 @@ func TestS3ObjectProcessor(t *testing.T) {
Return(nil, nil),
)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg)
s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupCfg, 1)
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).FinalizeS3Object()
require.NoError(t, err)
Expand Down Expand Up @@ -322,7 +322,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int,
Times(numEvents),
)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, selectors, backupConfig{})
s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, selectors, backupConfig{}, 1)
ack := awscommon.NewEventACKTracker(ctx)
err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object()

Expand Down
Loading