diff --git a/README.md b/README.md index 2e8341b..ec009d1 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,8 @@ Fake Amazon Simple Notification Service (SNS) for local development. Supports: - Get/Set subscription attribute endpoints - Publish message - Subscription persistence to file, including subscription attributes -- Integrations with (Fake-)SQS, File, HTTP, RabbitMQ, Slack, and Lambda +- Subscription filtering (currently under development with some alpha features) +- Integrations with SQS, File, HTTP, RabbitMQ, Slack, and Lambda ## Usage @@ -75,9 +76,40 @@ cd example docker-compose up ``` +## Features +### Subscriptions +#### Supported Subscription Attributes (See [SetSubscriptionAttributes](https://docs.aws.amazon.com/sns/latest/api/API_SetSubscriptionAttributes.html)) +* `RawMessageDelivery` - NOTE: Messages sent via a Slack endpoint are always sent raw. +* `FilterPolicyScope` - Both `MessageBody` and `MessageAttributes` are supported (`MessageAttributes` is the default behavior). +* `FilterPolicy` - Currently under development, supported for `MessageBody` and `MessageAttributes` with some limitations: + + | Feature | Supported | + |-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------| + | [Policy Complexity Constraints](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-common-constraints) | No | + | [Policy Constraints for MessageAttribute-based filtering](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) | Yes (only `String` and `Number`; `String.Array` is not currently supported) | + | [Nested Constraints](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) for payload-based filtering | No (local-sns only supports top-level attribute filtering for `MessageBody` filter policies) | + | [Exact String match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#string-exact-matching) | Yes | + | [String anything-but match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#string-anything-but-matching) | No | + | [String prefix match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#string-prefix-matching) | No | + | [String suffix match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#ip-suffix-matching) | No | + | [IP Address match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#ip-address-matching) | No | + | [Exact Number match](https://docs.aws.amazon.com/sns/latest/dg/numeric-value-matching.html#numeric-exact-matching) | Yes | + | [Numeric anything-but match](https://docs.aws.amazon.com/sns/latest/dg/numeric-value-matching.html#numeric-anything-but-matching) | No | + | [Numeric Value Range match](https://docs.aws.amazon.com/sns/latest/dg/numeric-value-matching.html#numeric-value-range-matching) | No | + | [And Logic](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) | Yes | + | [Or Logic](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) | Yes | + | [Or Operator](https://docs.aws.amazon.com/sns/latest/dg/and-or-logic.html#or-operator) | No | + | [Key Matching](https://docs.aws.amazon.com/sns/latest/dg/attribute-key-matching.html) | No | + ## Development This project uses Kotlin, [Vert.X](https://vertx.io), and [Apache Camel](https://camel.apache.org) for message routing. +Be sure to read the [SNS documentation](https://docs.aws.amazon.com/sns/latest/dg/welcome.html). The [API docs](https://docs.aws.amazon.com/sns/latest/api/API_Operations.html) in particular are useful. + +It's also useful to run the equivalent `aws sns` CLI command with the `--debug` flag to better understand what the request and response payloads look like. + +You can also execute an `aws sns --endpoint-url ` command to point the AWS CLI to the `local-sns` instance. + ### Unit and Integration tests `./gradlew test` diff --git a/example/config/db.json b/example/config/db.json index 2ac64de..77b09df 100644 --- a/example/config/db.json +++ b/example/config/db.json @@ -4,35 +4,35 @@ "subscriptions" : [ { "arn" : "e9126059-9eab-4b37-8194-e0d64dfb2045", "owner" : "", - "topicArn" : "arn:aws:sns:us-east-1:1465414804035:test1", + "topicArn" : "arn:aws:sns:us-east-1:0123456789012:test1", "protocol" : "sqs", "endpoint" : "aws2-sqs://queue1?accessKey=xxx&secretKey=xxx®ion=us-east-1&trustAllCertificates=true&overrideEndpoint=true&uriEndpointOverride=http://sqs:9324", "subscriptionAttributes" : { - "FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [10.5], \"sold\": [true] }", + "FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [{\"numeric\": [\"=\", 10.5]}], \"sold\": [true] }", "FilterPolicyScope" : "MessageBody" } }, { "arn" : "6df4ed2b-a650-4f7c-910a-1a89c7cae5a6", "owner" : "", - "topicArn" : "arn:aws:sns:us-east-1:1465414804035:test1", + "topicArn" : "arn:aws:sns:us-east-1:0123456789012:test1", "protocol" : "file", "endpoint" : "file://tmp/logs?fileName=messages.log&fileExist=Append&appendChars=\\n", "subscriptionAttributes" : { - "FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [10.5], \"sold\": [true] }", + "FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [{\"numeric\": [\"=\", 10.5]}], \"sold\": [true] }", "RawMessageDelivery" : "true" } }, { "arn" : "25da5e63-d5d3-469d-9e0c-e33539948bd1", "owner" : "", - "topicArn" : "arn:aws:sns:us-east-1:1465414804035:test2", + "topicArn" : "arn:aws:sns:us-east-1:0123456789012:test2", "protocol" : "file", "endpoint" : "file://tmp/logs?fileName=no-attributes.log&fileExist=Append&appendChars=\\n" } ], "topics" : [ { - "arn" : "arn:aws:sns:us-east-1:1465414804035:test1", + "arn" : "arn:aws:sns:us-east-1:0123456789012:test1", "name" : "test1" }, { - "arn" : "arn:aws:sns:us-east-1:1465414804035:test2", + "arn" : "arn:aws:sns:us-east-1:0123456789012:test2", "name" : "test2" } ] } \ No newline at end of file diff --git a/src/main/kotlin/com/jameskbride/localsns/routes/topics/publishRoute.kt b/src/main/kotlin/com/jameskbride/localsns/routes/topics/publishRoute.kt index 8e7458b..b4087d8 100644 --- a/src/main/kotlin/com/jameskbride/localsns/routes/topics/publishRoute.kt +++ b/src/main/kotlin/com/jameskbride/localsns/routes/topics/publishRoute.kt @@ -210,15 +210,10 @@ private fun matchesFilterPolicy( when (messageAttribute!!.dataType) { "Number" -> { val parsedAttribute = messageAttribute.value.toDouble() - permittedValues.contains(parsedAttribute) + attributeMatchesPolicy(permittedValues, parsedAttribute) } else -> { - permittedValues.any {permittedValue -> - when (permittedValue) { - (permittedValue is Boolean) -> permittedValue.toString() == messageAttribute.value - else -> permittedValue == messageAttribute.value - } - } + attributeMatchesPolicy(permittedValues, messageAttribute.value) } } } @@ -230,18 +225,74 @@ private fun matchesFilterPolicy(subscription: Subscription, message:String): Boo val filterPolicySubscriptionAttribute = subscription.subscriptionAttributes[FILTER_POLICY] val filterPolicy = JsonObject(filterPolicySubscriptionAttribute) val messageJson = JsonObject(message) - val matched = filterPolicy.map.all { - if (!messageJson.containsKey(it.key)) { + val matched = filterPolicy.map.all { filterPolicyAttribute -> + if (!messageJson.containsKey(filterPolicyAttribute.key)) { false } else { - val permittedValues = it.value as List<*> - val messageAttribute = messageJson.getValue(it.key) - permittedValues.contains(messageAttribute!!) + val attribute = messageJson.getValue(filterPolicyAttribute.key) + attributeMatchesPolicy(filterPolicyAttribute.value as List<*>, attribute) } } return matched } +private fun attributeMatchesPolicy( + attributeMatchPolicy: List<*>, + value: Any? +): Boolean { + return attributeMatchPolicy.any { + when (val permittedValue = attributeMatchPolicy.firstOrNull()) { + is String -> { + stringMatches(attributeMatchPolicy, value) + } + + is LinkedHashMap<*, *> -> { + if (permittedValue.containsKey("numeric")) { + numericMatches(permittedValue, value) + } else false + } + + is Boolean -> { + booleanMatches(permittedValue, value) + } + + else -> false + } + } +} + +private fun numericMatches(permittedValue: LinkedHashMap<*, *>, attribute: Any?): Boolean { + val matchParams = permittedValue["numeric"] as List<*> + return when (matchParams.size) { + 2 -> { + numberMatches(matchParams, attribute) + } + + 4 -> { + false + } + + else -> false + } +} + +private fun booleanMatches(permittedValue: Any?, attribute: Any?) = permittedValue.toString() == attribute.toString() + +private fun stringMatches(permittedValues: List<*>, attribute: Any?) = + permittedValues.map { it.toString() }.contains(attribute) + +private fun numberMatches(matchParams: List<*>, value: Any?): Boolean { + val operator = matchParams[0] + return when (operator) { + "=" -> numEquals(value as Double, matchParams[1] as Double) + else -> false + } +} + +private fun numEquals(messageAttribute: Double, filterPolicyValue: Double):Boolean { + return messageAttribute == filterPolicyValue +} + private fun publishToSqs( subscription: Subscription, message: String, diff --git a/src/test/kotlin/com/jameskbride/localsns/PublishRouteIntegrationTest.kt b/src/test/kotlin/com/jameskbride/localsns/PublishRouteIntegrationTest.kt index 36f0717..a7d53b4 100644 --- a/src/test/kotlin/com/jameskbride/localsns/PublishRouteIntegrationTest.kt +++ b/src/test/kotlin/com/jameskbride/localsns/PublishRouteIntegrationTest.kt @@ -9,6 +9,7 @@ import com.typesafe.config.ConfigFactory import io.vertx.core.Vertx import io.vertx.core.http.HttpServer import io.vertx.core.json.Json +import io.vertx.core.json.JsonArray import io.vertx.core.json.JsonObject import io.vertx.ext.web.Router import io.vertx.junit5.VertxExtension @@ -194,15 +195,53 @@ class PublishRouteIntegrationTest: BaseTest() { val topic = createTopicModel("topic1") val queueName = "filter-policy-multiple-queue" val endpoint = createQueue(queueName) - data class FilterPolicy(val status:List, val amount:List, val sold:List): Serializable - val filterPolicy = FilterPolicy(status=listOf("not_sent"), amount=listOf(10.5), sold=listOf(true)) - val gson = Gson() + data class FilterPolicy(val status:List, val amount:List, val sold:List): Serializable + val numericMatch = buildNumericPolicy(listOf("=", 10.5)) + val filterPolicy = JsonObject.mapFrom(FilterPolicy(status=listOf("not_sent"), amount=listOf(numericMatch), sold=listOf(true))) subscribe( topic.arn, endpoint, "sqs", mapOf( - "FilterPolicy" to gson.toJson(filterPolicy) + "FilterPolicy" to filterPolicy.toString() + ) + ) + val message = "Hello, SNS!" + + val request = publishRequest( + topic, + message, + messageAttributes = listOf( + MessageAttribute("status", "not_sent"), + MessageAttribute("amount", "10.5", dataType = "Number"), + MessageAttribute("sold", "true"), + ) + ) + snsClient.publish(request) + + val queueUrl = createQueueUrl(queueName) + startReceivingMessages(queueUrl, setOf("status", "amount", "sold")) { response -> + val messages = response.messages() + if (messages.isNotEmpty()) { + testContext.completeNow() + } + } + } + + @Test + fun `FilterPolicy MessageAttributes - it does publish with exact numeric match`(testContext: VertxTestContext) { + val topic = createTopicModel("topic1") + val queueName = "filter-policy-multiple-queue" + val endpoint = createQueue(queueName) + data class FilterPolicy(val amount:List): Serializable + val numericMatch = buildNumericPolicy(listOf("=", 10.5)) + val filterPolicy = JsonObject.mapFrom(FilterPolicy(amount=listOf(numericMatch))) + subscribe( + topic.arn, + endpoint, + "sqs", + mapOf( + "FilterPolicy" to filterPolicy.toString() ) ) val message = "Hello, SNS!" @@ -340,21 +379,22 @@ class PublishRouteIntegrationTest: BaseTest() { val topic = createTopicModel("topic1") val queueName = "filter-policy-messagebody-multiple-queue" val endpoint = createQueue(queueName) - data class FilterPolicy(val status:List, val amount:List, val sold:List): Serializable - val gson = Gson() - val filterPolicy = FilterPolicy(status=listOf("not_sent"), amount=listOf(5.0), sold=listOf(true)) + data class FilterPolicy(val status:List, val amount:List, val sold:List): Serializable + val numericMatch = buildNumericPolicy(listOf("=", 5.0)) + val filterPolicy = JsonObject.mapFrom(FilterPolicy(status=listOf("not_sent"), amount=listOf(numericMatch), sold=listOf(true))) subscribe( topic.arn, endpoint, "sqs", mapOf( - "FilterPolicy" to gson.toJson(filterPolicy), + "FilterPolicy" to filterPolicy.toString(), "FilterPolicyScope" to "MessageBody", ) ) data class Message(val status:String, val amount:Double, val sold:Boolean) val message = Message(status="not_sent", amount=5.0, sold=true) + val gson = Gson() val request = publishRequest( topic, gson.toJson(message), @@ -370,14 +410,60 @@ class PublishRouteIntegrationTest: BaseTest() { } } + private fun buildNumericPolicy(params: List): JsonObject { + val numericMatch = JsonObject() + val matchParams = JsonArray() + params.forEach { + matchParams.add(it) + } + numericMatch.put("numeric", matchParams) + return numericMatch + } + + @Test + fun `FilterPolicy MessageBody - it does publish with exact numeric match`(testContext: VertxTestContext) { + val topic = createTopicModel("topic1") + val queueName = "filter-policy-messagebody-numeric-exact-queue" + val endpoint = createQueue(queueName) + data class FilterPolicy(val amount:List): Serializable + val numericMatch = buildNumericPolicy(listOf("=", 5.0)) + val filterPolicy = JsonObject.mapFrom(FilterPolicy(amount=listOf(numericMatch))) + subscribe( + topic.arn, + endpoint, + "sqs", + mapOf( + "FilterPolicy" to filterPolicy.toString(), + "FilterPolicyScope" to "MessageBody", + ) + ) + data class Message(val status:String, val amount:Double, val sold:Boolean) + val message = Message(status="not_sent", amount=5.0, sold=true) + + val gson = Gson() + val request = publishRequest( + topic, + gson.toJson(message), + ) + snsClient.publish(request) + + val queueUrl = createQueueUrl(queueName) + startReceivingMessages(queueUrl) { response -> + val messages = response.messages() + if (messages.isNotEmpty()) { + testContext.completeNow() + } + } + } + @Test fun `FilterPolicy MessageBody - it does not publish when one or more message body attributes do not match`(testContext: VertxTestContext) { val topic = createTopicModel("topic1") val queueName = "filter-policy-messagebody-nomatch-queue" val endpoint = createQueue(queueName) data class FilterPolicy(val status:List, val amount:List, val sold:List): Serializable - val gson = Gson() val filterPolicy = FilterPolicy(status=listOf("not_sent"), amount=listOf(10.5), sold=listOf(true)) + val gson = Gson() subscribe( topic.arn, endpoint,