Skip to content

Commit

Permalink
fix(subscriber): do not update on match (#303)
Browse files Browse the repository at this point in the history
This commit fixes a bug whereby the subscriber would run
`PutSubscriptionFilter` for every previously subscribed log group on
discovery.
  • Loading branch information
jta authored Jun 12, 2024
1 parent e950207 commit 9068f5f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 15 deletions.
3 changes: 2 additions & 1 deletion apps/logwriter/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ Resources:
Condition: HasDiscoveryRate
DependsOn: Subscriber
Properties:
Description: "Subscribe new log groups"
Description: >-
Subscribe new log groups. Requires CloudTrail in target region.
State: ENABLED
EventPattern:
source:
Expand Down
24 changes: 13 additions & 11 deletions pkg/handler/subscriber/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func subscriptionFilterEquals(a, b types.SubscriptionFilter) bool {
// cloudwatch API in order to converge to our intended configuration state.
func (h *Handler) SubscriptionFilterDiff(subscriptionFilters []types.SubscriptionFilter) (actions []any) {
var (
deleted, updated int
deleteOnly = aws.ToString(h.subscriptionFilter.DestinationArn) == ""
deleted, found int
deleteOnly = aws.ToString(h.subscriptionFilter.DestinationArn) == ""
)

for _, f := range subscriptionFilters {
Expand All @@ -117,18 +117,20 @@ func (h *Handler) SubscriptionFilterDiff(subscriptionFilters []types.Subscriptio
actions = append(actions, &cloudwatchlogs.DeleteSubscriptionFilterInput{
FilterName: f.FilterName,
})
} else if !subscriptionFilterEquals(h.subscriptionFilter, f) {
updated++
actions = append(actions, &cloudwatchlogs.PutSubscriptionFilterInput{
FilterName: h.subscriptionFilter.FilterName,
FilterPattern: h.subscriptionFilter.FilterPattern,
DestinationArn: h.subscriptionFilter.DestinationArn,
RoleArn: h.subscriptionFilter.RoleArn,
})
} else {
found++
if !subscriptionFilterEquals(h.subscriptionFilter, f) {
actions = append(actions, &cloudwatchlogs.PutSubscriptionFilterInput{
FilterName: h.subscriptionFilter.FilterName,
FilterPattern: h.subscriptionFilter.FilterPattern,
DestinationArn: h.subscriptionFilter.DestinationArn,
RoleArn: h.subscriptionFilter.RoleArn,
})
}
}
}

if !deleteOnly && updated == 0 && len(subscriptionFilters)-deleted < MaxSubscriptionFilterCount {
if !deleteOnly && found == 0 && len(subscriptionFilters)-deleted < MaxSubscriptionFilterCount {
actions = append(actions, &cloudwatchlogs.PutSubscriptionFilterInput{
FilterName: h.subscriptionFilter.FilterName,
FilterPattern: h.subscriptionFilter.FilterPattern,
Expand Down
24 changes: 21 additions & 3 deletions pkg/handler/subscriber/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ func TestSubscriptionFilterDiff(t *testing.T) {
},
},
{
/*
Do nothing if we exceed the two subscription filter limit
*/
Configure: types.SubscriptionFilter{
FilterName: aws.String("observe"),
DestinationArn: aws.String("arn:aws:lambda:us-west-2:123456789012:function:example"),
Expand All @@ -202,6 +199,9 @@ func TestSubscriptionFilterDiff(t *testing.T) {
},
},
{
/*
Do nothing if we exceed the two subscription filter limit
*/
Configure: types.SubscriptionFilter{
FilterName: aws.String("observe"),
DestinationArn: aws.String("arn:aws:lambda:us-west-2:123456789012:function:example"),
Expand All @@ -216,6 +216,24 @@ func TestSubscriptionFilterDiff(t *testing.T) {
},
// no expected actions
},
{
// Do nothing if filter already matches.
Configure: types.SubscriptionFilter{
FilterName: aws.String("observe"),
DestinationArn: aws.String("arn:aws:lambda:us-west-2:123456789012:function:example"),
},
Existing: []types.SubscriptionFilter{
{
FilterName: aws.String("observe"),
DestinationArn: aws.String("arn:aws:lambda:us-west-2:123456789012:function:example"),
},
{
FilterName: aws.String("something-else"),
DestinationArn: aws.String("arn:aws:lambda:us-west-2:123456789012:function:another"),
},
},
// no expected actions
},
}

for i, tt := range testcases {
Expand Down
2 changes: 2 additions & 0 deletions pkg/lambda/subscriber/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) {
logger := logging.New(cfg.Logging)
logger.V(4).Info("initialized", "config", cfg)

tracing.SetLogger(logger)

tracerProvider, err := tracing.NewTracerProvider(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize tracing: %w", err)
Expand Down

0 comments on commit 9068f5f

Please sign in to comment.