Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/aws_sns_topic_subscription: add filter_policy_scope attribute #28004

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/28004.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_sns_topic_subscription: Add `filter_policy_scope` argument in support of [SNS message filtering](https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html)
```
13 changes: 13 additions & 0 deletions internal/service/sns/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
SubscriptionAttributeNameDeliveryPolicy = "DeliveryPolicy"
SubscriptionAttributeNameEndpoint = "Endpoint"
SubscriptionAttributeNameFilterPolicy = "FilterPolicy"
SubscriptionAttributeNameFilterPolicyScope = "FilterPolicyScope"
SubscriptionAttributeNameOwner = "Owner"
SubscriptionAttributeNamePendingConfirmation = "PendingConfirmation"
SubscriptionAttributeNameProtocol = "Protocol"
Expand Down Expand Up @@ -97,3 +98,15 @@ const (
const (
propagationTimeout = 2 * time.Minute
)

const (
SubscriptionFilterPolicyScopeMessageAttributes = "MessageAttributes"
SubscriptionFilterPolicyScopeMessageBody = "MessageBody"
)

func SubscriptionFilterPolicyScope_Values() []string {
return []string{
SubscriptionFilterPolicyScopeMessageAttributes,
SubscriptionFilterPolicyScopeMessageBody,
}
}
35 changes: 35 additions & 0 deletions internal/service/sns/topic_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log"
"strings"
Expand Down Expand Up @@ -64,6 +65,12 @@ var (
return json
},
},
"filter_policy_scope": {
Type: schema.TypeString,
Optional: true,
Computed: true, // When filter_policy is set, this defaults to MessageAttributes.
ValidateFunc: validation.StringInSlice(SubscriptionFilterPolicyScope_Values(), false),
},
"owner_id": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -108,6 +115,7 @@ var (
"delivery_policy": SubscriptionAttributeNameDeliveryPolicy,
"endpoint": SubscriptionAttributeNameEndpoint,
"filter_policy": SubscriptionAttributeNameFilterPolicy,
"filter_policy_scope": SubscriptionAttributeNameFilterPolicyScope,
"owner_id": SubscriptionAttributeNameOwner,
"pending_confirmation": SubscriptionAttributeNamePendingConfirmation,
"protocol": SubscriptionAttributeNameProtocol,
Expand All @@ -129,6 +137,8 @@ func ResourceTopicSubscription() *schema.Resource {
State: schema.ImportStatePassthrough,
},

CustomizeDiff: resourceTopicSubscriptionCustomizeDiff,

Schema: subscriptionSchema,
}
}
Expand Down Expand Up @@ -392,3 +402,28 @@ func normalizeTopicSubscriptionDeliveryPolicy(policy string) ([]byte, error) {

return b.Bytes(), nil
}

func resourceTopicSubscriptionCustomizeDiff(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
hasPolicy := diff.Get("filter_policy").(string) != ""
hasScope := !diff.GetRawConfig().GetAttr("filter_policy_scope").IsNull()

if hasPolicy && !hasScope {
// When the scope is removed from configuration, the API will
// continue reading back the last value so long as the policy
// itself still exists. The expected result would be to revert
// to the default value of the attribute (MessageAttributes).
return diff.SetNew("filter_policy_scope", SubscriptionFilterPolicyScopeMessageAttributes)
}

if !hasPolicy && !hasScope {
// When the policy is not set, the API silently drops the scope.
return diff.Clear("filter_policy_scope")
}

if !hasPolicy && hasScope {
// Make it explicit that the scope doesn't exist without a policy.
return errors.New("filter_policy is required when filter_policy_scope is set")
}

return nil
}
184 changes: 184 additions & 0 deletions internal/service/sns/topic_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TestAccSNSTopicSubscription_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "delivery_policy", ""),
resource.TestCheckResourceAttrPair(resourceName, "endpoint", "aws_sqs_queue.test", "arn"),
resource.TestCheckResourceAttr(resourceName, "filter_policy", ""),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", ""),
resource.TestCheckResourceAttr(resourceName, "pending_confirmation", "false"),
resource.TestCheckResourceAttr(resourceName, "protocol", "sqs"),
resource.TestCheckResourceAttr(resourceName, "raw_message_delivery", "false"),
Expand Down Expand Up @@ -123,6 +124,7 @@ func TestAccSNSTopicSubscription_filterPolicy(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy", filterPolicy1),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", "MessageAttributes"),
),
},
{
Expand All @@ -140,6 +142,7 @@ func TestAccSNSTopicSubscription_filterPolicy(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy", filterPolicy2),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", "MessageAttributes"),
),
},
// Test attribute removal
Expand All @@ -148,12 +151,150 @@ func TestAccSNSTopicSubscription_filterPolicy(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy", ""),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", ""),
),
},
},
})
}

func TestAccSNSTopicSubscription_filterPolicyScope(t *testing.T) {
var attributes map[string]string
resourceName := "aws_sns_topic_subscription.test"
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, sns.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckTopicSubscriptionDestroy,
Steps: []resource.TestStep{
{
Config: testAccTopicSubscriptionConfig_filterPolicyScope(rName, strconv.Quote("MessageBody")),
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", "MessageBody"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"confirmation_timeout_in_minutes",
"endpoint_auto_confirms",
},
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"confirmation_timeout_in_minutes",
"endpoint_auto_confirms",
},
},
{
Config: testAccTopicSubscriptionConfig_filterPolicyScope(rName, strconv.Quote("MessageAttributes")),
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", "MessageAttributes"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"confirmation_timeout_in_minutes",
"endpoint_auto_confirms",
},
},
{
Config: testAccTopicSubscriptionConfig_filterPolicyScope(rName, strconv.Quote("MessageBody")),
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", "MessageBody"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"confirmation_timeout_in_minutes",
"endpoint_auto_confirms",
},
},
{
Config: testAccTopicSubscriptionConfig_filterPolicyScope(rName, "null"),
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", "MessageAttributes"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"confirmation_timeout_in_minutes",
"endpoint_auto_confirms",
},
},
{
Config: testAccTopicSubscriptionConfig_filterPolicyScope(rName, strconv.Quote("MessageBody")),
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", "MessageBody"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"confirmation_timeout_in_minutes",
"endpoint_auto_confirms",
},
},
{
Config: testAccTopicSubscriptionConfig_basic(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckTopicSubscriptionExists(resourceName, &attributes),
resource.TestCheckResourceAttr(resourceName, "filter_policy_scope", ""),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"confirmation_timeout_in_minutes",
"endpoint_auto_confirms",
},
},
},
})
}

func TestAccSNSTopicSubscription_filterPolicyScope_policyNotSet(t *testing.T) {
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, sns.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckTopicSubscriptionDestroy,
Steps: []resource.TestStep{
{
Config: testAccTopicSubscriptionConfig_filterPolicyScope_policyNotSet(rName),
ExpectError: regexp.MustCompile(`filter_policy is required when filter_policy_scope is set`),
},
},
})
}

func TestAccSNSTopicSubscription_deliveryPolicy(t *testing.T) {
var attributes map[string]string
resourceName := "aws_sns_topic_subscription.test"
Expand Down Expand Up @@ -629,6 +770,49 @@ resource "aws_sns_topic_subscription" "test" {
`, rName, policy)
}

func testAccTopicSubscriptionConfig_filterPolicyScope(rName, scope string) string {
return fmt.Sprintf(`
resource "aws_sns_topic" "test" {
name = %[1]q
}

resource "aws_sqs_queue" "test" {
name = %[1]q

sqs_managed_sse_enabled = true
}

resource "aws_sns_topic_subscription" "test" {
topic_arn = aws_sns_topic.test.arn
protocol = "sqs"
endpoint = aws_sqs_queue.test.arn
filter_policy = jsonencode({ key1 = ["value1"] })
filter_policy_scope = %[2]s
}
`, rName, scope)
}

func testAccTopicSubscriptionConfig_filterPolicyScope_policyNotSet(rName string) string {
return fmt.Sprintf(`
resource "aws_sns_topic" "test" {
name = %[1]q
}

resource "aws_sqs_queue" "test" {
name = %[1]q

sqs_managed_sse_enabled = true
}

resource "aws_sns_topic_subscription" "test" {
topic_arn = aws_sns_topic.test.arn
protocol = "sqs"
endpoint = aws_sqs_queue.test.arn
filter_policy_scope = "MessageBody"
}
`, rName)
}

func testAccTopicSubscriptionConfig_deliveryPolicy(rName, policy string) string {
return fmt.Sprintf(`
resource "aws_sns_topic" "test" {
Expand Down
1 change: 1 addition & 0 deletions website/docs/r/sns_topic_subscription.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ The following arguments are optional:
* `delivery_policy` - (Optional) JSON String with the delivery policy (retries, backoff, etc.) that will be used in the subscription - this only applies to HTTP/S subscriptions. Refer to the [SNS docs](https://docs.aws.amazon.com/sns/latest/dg/DeliveryPolicies.html) for more details.
* `endpoint_auto_confirms` - (Optional) Whether the endpoint is capable of [auto confirming subscription](http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html#SendMessageToHttp.prepare) (e.g., PagerDuty). Default is `false`.
* `filter_policy` - (Optional) JSON String with the filter policy that will be used in the subscription to filter messages seen by the target resource. Refer to the [SNS docs](https://docs.aws.amazon.com/sns/latest/dg/message-filtering.html) for more details.
* `filter_policy_scope` - (Optional) Whether the `filter_policy` applies to `MessageAttributes` (default) or `MessageBody`.
* `raw_message_delivery` - (Optional) Whether to enable raw message delivery (the original message is directly passed, not wrapped in JSON with the original message in the message property). Default is `false`.
* `redrive_policy` - (Optional) JSON String with the redrive policy that will be used in the subscription. Refer to the [SNS docs](https://docs.aws.amazon.com/sns/latest/dg/sns-dead-letter-queues.html#how-messages-moved-into-dead-letter-queue) for more details.

Expand Down