Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correcting Exact Number Matching #18

Merged
merged 5 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Fake Amazon Simple Notification Service (SNS) for local development. Supports:
- Get/Set subscription attribute endpoints
- Publish message
- Subscription persistence to file, including subscription attributes
- Integrations with (Fake-)SQS, File, HTTP, RabbitMQ, Slack, and Lambda
- Subscription filtering (currently under development with some alpha features)
- Integrations with SQS, File, HTTP, RabbitMQ, Slack, and Lambda

## Usage

Expand Down Expand Up @@ -75,9 +76,40 @@ cd example
docker-compose up
```

## Features
### Subscriptions
#### Supported Subscription Attributes (See [SetSubscriptionAttributes](https://docs.aws.amazon.com/sns/latest/api/API_SetSubscriptionAttributes.html))
* `RawMessageDelivery` - NOTE: Messages sent via a Slack endpoint are always sent raw.
* `FilterPolicyScope` - Both `MessageBody` and `MessageAttributes` are supported (`MessageAttributes` is the default behavior).
* `FilterPolicy` - Currently under development, supported for `MessageBody` and `MessageAttributes` with some limitations:

| Feature | Supported |
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------|
| [Policy Complexity Constraints](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-common-constraints) | No |
| [Policy Constraints for MessageAttribute-based filtering](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) | Yes (only `String` and `Number`; `String.Array` is not currently supported) |
| [Nested Constraints](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) for payload-based filtering | No (local-sns only supports top-level attribute filtering for `MessageBody` filter policies) |
| [Exact String match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#string-exact-matching) | Yes |
| [String anything-but match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#string-anything-but-matching) | No |
| [String prefix match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#string-prefix-matching) | No |
| [String suffix match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#ip-suffix-matching) | No |
| [IP Address match](https://docs.aws.amazon.com/sns/latest/dg/string-value-matching.html#ip-address-matching) | No |
| [Exact Number match](https://docs.aws.amazon.com/sns/latest/dg/numeric-value-matching.html#numeric-exact-matching) | Yes |
| [Numeric anything-but match](https://docs.aws.amazon.com/sns/latest/dg/numeric-value-matching.html#numeric-anything-but-matching) | No |
| [Numeric Value Range match](https://docs.aws.amazon.com/sns/latest/dg/numeric-value-matching.html#numeric-value-range-matching) | No |
| [And Logic](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) | Yes |
| [Or Logic](https://docs.aws.amazon.com/sns/latest/dg/subscription-filter-policy-constraints.html#subscription-filter-policy-payload-constraints) | Yes |
| [Or Operator](https://docs.aws.amazon.com/sns/latest/dg/and-or-logic.html#or-operator) | No |
| [Key Matching](https://docs.aws.amazon.com/sns/latest/dg/attribute-key-matching.html) | No |

## Development
This project uses Kotlin, [Vert.X](https://vertx.io), and [Apache Camel](https://camel.apache.org) for message routing.

Be sure to read the [SNS documentation](https://docs.aws.amazon.com/sns/latest/dg/welcome.html). The [API docs](https://docs.aws.amazon.com/sns/latest/api/API_Operations.html) in particular are useful.

It's also useful to run the equivalent `aws sns` CLI command with the `--debug` flag to better understand what the request and response payloads look like.

You can also execute an `aws sns <command> --endpoint-url <local-sns url>` command to point the AWS CLI to the `local-sns` instance.

### Unit and Integration tests
`./gradlew test`

Expand Down
14 changes: 7 additions & 7 deletions example/config/db.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,35 @@
"subscriptions" : [ {
"arn" : "e9126059-9eab-4b37-8194-e0d64dfb2045",
"owner" : "",
"topicArn" : "arn:aws:sns:us-east-1:1465414804035:test1",
"topicArn" : "arn:aws:sns:us-east-1:0123456789012:test1",
"protocol" : "sqs",
"endpoint" : "aws2-sqs://queue1?accessKey=xxx&secretKey=xxx&region=us-east-1&trustAllCertificates=true&overrideEndpoint=true&uriEndpointOverride=http://sqs:9324",
"subscriptionAttributes" : {
"FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [10.5], \"sold\": [true] }",
"FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [{\"numeric\": [\"=\", 10.5]}], \"sold\": [true] }",
"FilterPolicyScope" : "MessageBody"
}
}, {
"arn" : "6df4ed2b-a650-4f7c-910a-1a89c7cae5a6",
"owner" : "",
"topicArn" : "arn:aws:sns:us-east-1:1465414804035:test1",
"topicArn" : "arn:aws:sns:us-east-1:0123456789012:test1",
"protocol" : "file",
"endpoint" : "file://tmp/logs?fileName=messages.log&fileExist=Append&appendChars=\\n",
"subscriptionAttributes" : {
"FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [10.5], \"sold\": [true] }",
"FilterPolicy" : "{\"status\": [\"not_sent\", \"resend\"], \"amount\": [{\"numeric\": [\"=\", 10.5]}], \"sold\": [true] }",
"RawMessageDelivery" : "true"
}
}, {
"arn" : "25da5e63-d5d3-469d-9e0c-e33539948bd1",
"owner" : "",
"topicArn" : "arn:aws:sns:us-east-1:1465414804035:test2",
"topicArn" : "arn:aws:sns:us-east-1:0123456789012:test2",
"protocol" : "file",
"endpoint" : "file://tmp/logs?fileName=no-attributes.log&fileExist=Append&appendChars=\\n"
} ],
"topics" : [ {
"arn" : "arn:aws:sns:us-east-1:1465414804035:test1",
"arn" : "arn:aws:sns:us-east-1:0123456789012:test1",
"name" : "test1"
}, {
"arn" : "arn:aws:sns:us-east-1:1465414804035:test2",
"arn" : "arn:aws:sns:us-east-1:0123456789012:test2",
"name" : "test2"
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,10 @@ private fun matchesFilterPolicy(
when (messageAttribute!!.dataType) {
"Number" -> {
val parsedAttribute = messageAttribute.value.toDouble()
permittedValues.contains(parsedAttribute)
attributeMatchesPolicy(permittedValues, parsedAttribute)
}
else -> {
permittedValues.any {permittedValue ->
when (permittedValue) {
(permittedValue is Boolean) -> permittedValue.toString() == messageAttribute.value
else -> permittedValue == messageAttribute.value
}
}
attributeMatchesPolicy(permittedValues, messageAttribute.value)
}
}
}
Expand All @@ -230,18 +225,74 @@ private fun matchesFilterPolicy(subscription: Subscription, message:String): Boo
val filterPolicySubscriptionAttribute = subscription.subscriptionAttributes[FILTER_POLICY]
val filterPolicy = JsonObject(filterPolicySubscriptionAttribute)
val messageJson = JsonObject(message)
val matched = filterPolicy.map.all {
if (!messageJson.containsKey(it.key)) {
val matched = filterPolicy.map.all { filterPolicyAttribute ->
if (!messageJson.containsKey(filterPolicyAttribute.key)) {
false
} else {
val permittedValues = it.value as List<*>
val messageAttribute = messageJson.getValue(it.key)
permittedValues.contains(messageAttribute!!)
val attribute = messageJson.getValue(filterPolicyAttribute.key)
attributeMatchesPolicy(filterPolicyAttribute.value as List<*>, attribute)
}
}
return matched
}

private fun attributeMatchesPolicy(
attributeMatchPolicy: List<*>,
value: Any?
): Boolean {
return attributeMatchPolicy.any {
when (val permittedValue = attributeMatchPolicy.firstOrNull()) {
is String -> {
stringMatches(attributeMatchPolicy, value)
}

is LinkedHashMap<*, *> -> {
if (permittedValue.containsKey("numeric")) {
numericMatches(permittedValue, value)
} else false
}

is Boolean -> {
booleanMatches(permittedValue, value)
}

else -> false
}
}
}

private fun numericMatches(permittedValue: LinkedHashMap<*, *>, attribute: Any?): Boolean {
val matchParams = permittedValue["numeric"] as List<*>
return when (matchParams.size) {
2 -> {
numberMatches(matchParams, attribute)
}

4 -> {
false
}

else -> false
}
}

private fun booleanMatches(permittedValue: Any?, attribute: Any?) = permittedValue.toString() == attribute.toString()

private fun stringMatches(permittedValues: List<*>, attribute: Any?) =
permittedValues.map { it.toString() }.contains(attribute)

private fun numberMatches(matchParams: List<*>, value: Any?): Boolean {
val operator = matchParams[0]
return when (operator) {
"=" -> numEquals(value as Double, matchParams[1] as Double)
else -> false
}
}

private fun numEquals(messageAttribute: Double, filterPolicyValue: Double):Boolean {
return messageAttribute == filterPolicyValue
}

private fun publishToSqs(
subscription: Subscription,
message: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.typesafe.config.ConfigFactory
import io.vertx.core.Vertx
import io.vertx.core.http.HttpServer
import io.vertx.core.json.Json
import io.vertx.core.json.JsonArray
import io.vertx.core.json.JsonObject
import io.vertx.ext.web.Router
import io.vertx.junit5.VertxExtension
Expand Down Expand Up @@ -194,15 +195,53 @@ class PublishRouteIntegrationTest: BaseTest() {
val topic = createTopicModel("topic1")
val queueName = "filter-policy-multiple-queue"
val endpoint = createQueue(queueName)
data class FilterPolicy(val status:List<String>, val amount:List<Double>, val sold:List<Boolean>): Serializable
val filterPolicy = FilterPolicy(status=listOf("not_sent"), amount=listOf(10.5), sold=listOf(true))
val gson = Gson()
data class FilterPolicy(val status:List<String>, val amount:List<JsonObject>, val sold:List<Boolean>): Serializable
val numericMatch = buildNumericPolicy(listOf("=", 10.5))
val filterPolicy = JsonObject.mapFrom(FilterPolicy(status=listOf("not_sent"), amount=listOf(numericMatch), sold=listOf(true)))
subscribe(
topic.arn,
endpoint,
"sqs",
mapOf(
"FilterPolicy" to gson.toJson(filterPolicy)
"FilterPolicy" to filterPolicy.toString()
)
)
val message = "Hello, SNS!"

val request = publishRequest(
topic,
message,
messageAttributes = listOf(
MessageAttribute("status", "not_sent"),
MessageAttribute("amount", "10.5", dataType = "Number"),
MessageAttribute("sold", "true"),
)
)
snsClient.publish(request)

val queueUrl = createQueueUrl(queueName)
startReceivingMessages(queueUrl, setOf("status", "amount", "sold")) { response ->
val messages = response.messages()
if (messages.isNotEmpty()) {
testContext.completeNow()
}
}
}

@Test
fun `FilterPolicy MessageAttributes - it does publish with exact numeric match`(testContext: VertxTestContext) {
val topic = createTopicModel("topic1")
val queueName = "filter-policy-multiple-queue"
val endpoint = createQueue(queueName)
data class FilterPolicy(val amount:List<JsonObject>): Serializable
val numericMatch = buildNumericPolicy(listOf("=", 10.5))
val filterPolicy = JsonObject.mapFrom(FilterPolicy(amount=listOf(numericMatch)))
subscribe(
topic.arn,
endpoint,
"sqs",
mapOf(
"FilterPolicy" to filterPolicy.toString()
)
)
val message = "Hello, SNS!"
Expand Down Expand Up @@ -340,21 +379,22 @@ class PublishRouteIntegrationTest: BaseTest() {
val topic = createTopicModel("topic1")
val queueName = "filter-policy-messagebody-multiple-queue"
val endpoint = createQueue(queueName)
data class FilterPolicy(val status:List<String>, val amount:List<Double>, val sold:List<Boolean>): Serializable
val gson = Gson()
val filterPolicy = FilterPolicy(status=listOf("not_sent"), amount=listOf(5.0), sold=listOf(true))
data class FilterPolicy(val status:List<String>, val amount:List<JsonObject>, val sold:List<Boolean>): Serializable
val numericMatch = buildNumericPolicy(listOf("=", 5.0))
val filterPolicy = JsonObject.mapFrom(FilterPolicy(status=listOf("not_sent"), amount=listOf(numericMatch), sold=listOf(true)))
subscribe(
topic.arn,
endpoint,
"sqs",
mapOf(
"FilterPolicy" to gson.toJson(filterPolicy),
"FilterPolicy" to filterPolicy.toString(),
"FilterPolicyScope" to "MessageBody",
)
)
data class Message(val status:String, val amount:Double, val sold:Boolean)
val message = Message(status="not_sent", amount=5.0, sold=true)

val gson = Gson()
val request = publishRequest(
topic,
gson.toJson(message),
Expand All @@ -370,14 +410,60 @@ class PublishRouteIntegrationTest: BaseTest() {
}
}

private fun buildNumericPolicy(params: List<Any>): JsonObject {
val numericMatch = JsonObject()
val matchParams = JsonArray()
params.forEach {
matchParams.add(it)
}
numericMatch.put("numeric", matchParams)
return numericMatch
}

@Test
fun `FilterPolicy MessageBody - it does publish with exact numeric match`(testContext: VertxTestContext) {
val topic = createTopicModel("topic1")
val queueName = "filter-policy-messagebody-numeric-exact-queue"
val endpoint = createQueue(queueName)
data class FilterPolicy(val amount:List<JsonObject>): Serializable
val numericMatch = buildNumericPolicy(listOf("=", 5.0))
val filterPolicy = JsonObject.mapFrom(FilterPolicy(amount=listOf(numericMatch)))
subscribe(
topic.arn,
endpoint,
"sqs",
mapOf(
"FilterPolicy" to filterPolicy.toString(),
"FilterPolicyScope" to "MessageBody",
)
)
data class Message(val status:String, val amount:Double, val sold:Boolean)
val message = Message(status="not_sent", amount=5.0, sold=true)

val gson = Gson()
val request = publishRequest(
topic,
gson.toJson(message),
)
snsClient.publish(request)

val queueUrl = createQueueUrl(queueName)
startReceivingMessages(queueUrl) { response ->
val messages = response.messages()
if (messages.isNotEmpty()) {
testContext.completeNow()
}
}
}

@Test
fun `FilterPolicy MessageBody - it does not publish when one or more message body attributes do not match`(testContext: VertxTestContext) {
val topic = createTopicModel("topic1")
val queueName = "filter-policy-messagebody-nomatch-queue"
val endpoint = createQueue(queueName)
data class FilterPolicy(val status:List<String>, val amount:List<Double>, val sold:List<Boolean>): Serializable
val gson = Gson()
val filterPolicy = FilterPolicy(status=listOf("not_sent"), amount=listOf(10.5), sold=listOf(true))
val gson = Gson()
subscribe(
topic.arn,
endpoint,
Expand Down
Loading