diff --git a/.github/workflows/test_sqs.yml b/.github/workflows/test_sqs.yml
new file mode 100644
index 000000000000..04e97bfa20c7
--- /dev/null
+++ b/.github/workflows/test_sqs.yml
@@ -0,0 +1,39 @@
+# Run separate test cases to verify SQS works with multiple botocore versions (/multiple response-types, QUERY and JSON)
+#
+name: "SQS Tests"
+
+on:
+ pull_request:
+ types: [ labeled ]
+
+jobs:
+ test:
+ if: ${{ github.event.label.name == 'service-sqs' }}
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ python-version: [ "3.11" ]
+ botocore-version: ["1.29.126", "1.29.127", "1.29.128"]
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Update pip
+ run: |
+ python -m pip install --upgrade pip
+
+ - name: Install project dependencies
+ run: |
+ pip install -r requirements-dev.txt
+ pip install botocore==${{ matrix.botocore-version }}
+
+ - name: Run tests
+ run: |
+ pytest -sv tests/test_sqs
diff --git a/moto/core/exceptions.py b/moto/core/exceptions.py
index 579024f7c279..5c0e306c2ada 100644
--- a/moto/core/exceptions.py
+++ b/moto/core/exceptions.py
@@ -84,6 +84,11 @@ def get_body(
) -> str:
return self.description # type: ignore[return-value]
+ def to_json(self) -> "JsonRESTError":
+ err = JsonRESTError(error_type=self.error_type, message=self.message)
+ err.code = self.code
+ return err
+
class DryRunClientError(RESTError):
code = 412
diff --git a/moto/sqs/models.py b/moto/sqs/models.py
index 7d775d274dbb..308afd4ee827 100644
--- a/moto/sqs/models.py
+++ b/moto/sqs/models.py
@@ -162,6 +162,10 @@ def utf8(value: Any) -> bytes: # type: ignore[misc]
def body(self) -> str:
return escape(self._body).replace('"', """).replace("\r", "
")
+ @property
+ def original_body(self) -> str:
+ return self._body
+
def mark_sent(self, delay_seconds: Optional[int] = None) -> None:
self.sent_timestamp = int(unix_time_millis()) # type: ignore
if delay_seconds:
@@ -1030,7 +1034,7 @@ def delete_message_batch(
errors.append(
{
"Id": receipt_and_id["msg_user_id"],
- "SenderFault": "true",
+ "SenderFault": True,
"Code": "ReceiptHandleIsInvalid",
"Message": f'The input receipt handle "{receipt_and_id["receipt_handle"]}" is not a valid receipt handle.',
}
diff --git a/moto/sqs/responses.py b/moto/sqs/responses.py
index 56d1d93e1987..32ed869d1d42 100644
--- a/moto/sqs/responses.py
+++ b/moto/sqs/responses.py
@@ -1,11 +1,18 @@
+import json
import re
-from typing import Any, Dict, Optional, Tuple, Union
+from functools import wraps
+from typing import Any, Callable, Dict, Optional, Union
from moto.core.common_types import TYPE_RESPONSE
-from moto.core.exceptions import RESTError
+from moto.core.exceptions import JsonRESTError
from moto.core.responses import BaseResponse
-from moto.core.utils import underscores_to_camelcase, camelcase_to_pascal
+from moto.core.utils import (
+ underscores_to_camelcase,
+ camelcase_to_pascal,
+ camelcase_to_underscores,
+)
from moto.utilities.aws_headers import amz_crc32, amzn_request_id
+from moto.utilities.constants import JSON_TYPES
from urllib.parse import urlparse
from .constants import (
@@ -14,12 +21,36 @@
MAXIMUM_VISIBILITY_TIMEOUT,
)
from .exceptions import (
+ RESTError,
EmptyBatchRequest,
InvalidAttributeName,
BatchEntryIdsNotDistinct,
)
from .models import sqs_backends, SQSBackend
-from .utils import parse_message_attributes, extract_input_message_attributes
+from .utils import (
+ parse_message_attributes,
+ extract_input_message_attributes,
+ validate_message_attributes,
+)
+
+
+def jsonify_error(
+ method: Callable[["SQSResponse"], Union[str, TYPE_RESPONSE]]
+) -> Callable[["SQSResponse"], Union[str, TYPE_RESPONSE]]:
+ """
+ The decorator to convert an RESTError to JSON, if necessary
+ """
+
+ @wraps(method)
+ def f(self: "SQSResponse") -> Union[str, TYPE_RESPONSE]:
+ try:
+ return method(self)
+ except RESTError as e:
+ if self.is_json():
+ raise e.to_json()
+ raise e
+
+ return f
class SQSResponse(BaseResponse):
@@ -29,27 +60,52 @@ class SQSResponse(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="sqs")
+ def is_json(self) -> bool:
+ """
+ botocore 1.29.127 changed the wire-format to SQS
+ This means three things:
+ - The Content-Type is set to JSON
+ - The input-parameters are in different formats
+ - The output is in a different format
+
+ The change has been reverted for now, but it will be re-introduced later:
+ https://github.com/boto/botocore/pull/2931
+ """
+ return self.headers.get("Content-Type") in JSON_TYPES
+
@property
def sqs_backend(self) -> SQSBackend:
return sqs_backends[self.current_account][self.region]
@property
def attribute(self) -> Any: # type: ignore[misc]
- if not hasattr(self, "_attribute"):
- self._attribute = self._get_map_prefix(
- "Attribute", key_end=".Name", value_end=".Value"
- )
- return self._attribute
+ try:
+ assert self.is_json()
+ return json.loads(self.body).get("Attributes", {})
+ except: # noqa: E722 Do not use bare except
+ if not hasattr(self, "_attribute"):
+ self._attribute = self._get_map_prefix(
+ "Attribute", key_end=".Name", value_end=".Value"
+ )
+ return self._attribute
@property
def tags(self) -> Dict[str, str]:
if not hasattr(self, "_tags"):
- self._tags = self._get_map_prefix("Tag", key_end=".Key", value_end=".Value")
+ if self.is_json():
+ self._tags = self._get_param("tags")
+ else:
+ self._tags = self._get_map_prefix(
+ "Tag", key_end=".Key", value_end=".Value"
+ )
return self._tags
def _get_queue_name(self) -> str:
try:
- queue_url = self.querystring.get("QueueUrl")[0] # type: ignore
+ if self.is_json():
+ queue_url = self._get_param("QueueUrl")
+ else:
+ queue_url = self.querystring.get("QueueUrl")[0] # type: ignore
if queue_url.startswith("http://") or queue_url.startswith("https://"):
return queue_url.split("/")[-1]
else:
@@ -67,7 +123,10 @@ def _get_validated_visibility_timeout(self, timeout: Optional[str] = None) -> in
if timeout is not None:
visibility_timeout = int(timeout)
else:
- visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0]) # type: ignore
+ if self.is_json():
+ visibility_timeout = self._get_param("VisibilityTimeout")
+ else:
+ visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0]) # type: ignore
if visibility_timeout > MAXIMUM_VISIBILITY_TIMEOUT:
raise ValueError
@@ -85,45 +144,66 @@ def call_action(self) -> TYPE_RESPONSE:
return 404, headers, response
return status_code, headers, body
- def _error(
- self, code: str, message: str, status: int = 400
- ) -> Tuple[str, Dict[str, int]]:
+ def _error(self, code: str, message: str, status: int = 400) -> TYPE_RESPONSE:
+ if self.is_json():
+ err = JsonRESTError(error_type=code, message=message)
+ err.code = status
+ raise err
template = self.response_template(ERROR_TEMPLATE)
- return template.render(code=code, message=message), dict(status=status)
+ return status, {"status": status}, template.render(code=code, message=message)
+ @jsonify_error
def create_queue(self) -> str:
request_url = urlparse(self.uri)
queue_name = self._get_param("QueueName")
queue = self.sqs_backend.create_queue(queue_name, self.tags, **self.attribute)
+ if self.is_json():
+ return json.dumps({"QueueUrl": queue.url(request_url)})
+
template = self.response_template(CREATE_QUEUE_RESPONSE)
return template.render(queue_url=queue.url(request_url))
+ @jsonify_error
def get_queue_url(self) -> str:
request_url = urlparse(self.uri)
queue_name = self._get_param("QueueName")
queue = self.sqs_backend.get_queue_url(queue_name)
+ if self.is_json():
+ return json.dumps({"QueueUrl": queue.url(request_url)})
+
template = self.response_template(GET_QUEUE_URL_RESPONSE)
return template.render(queue_url=queue.url(request_url))
+ @jsonify_error
def list_queues(self) -> str:
request_url = urlparse(self.uri)
queue_name_prefix = self._get_param("QueueNamePrefix")
queues = self.sqs_backend.list_queues(queue_name_prefix)
+
+ if self.is_json():
+ if queues:
+ return json.dumps(
+ {"QueueUrls": [queue.url(request_url) for queue in queues]}
+ )
+ else:
+ return "{}"
+
template = self.response_template(LIST_QUEUES_RESPONSE)
return template.render(queues=queues, request_url=request_url)
- def change_message_visibility(self) -> Union[str, Tuple[str, Dict[str, int]]]:
+ @jsonify_error
+ def change_message_visibility(self) -> Union[str, TYPE_RESPONSE]:
queue_name = self._get_queue_name()
receipt_handle = self._get_param("ReceiptHandle")
try:
visibility_timeout = self._get_validated_visibility_timeout()
except ValueError:
- return ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE, dict(status=400)
+ return 400, {}, ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE
self.sqs_backend.change_message_visibility(
queue_name=queue_name,
@@ -131,36 +211,70 @@ def change_message_visibility(self) -> Union[str, Tuple[str, Dict[str, int]]]:
visibility_timeout=visibility_timeout,
)
+ if self.is_json():
+ return "{}"
+
template = self.response_template(CHANGE_MESSAGE_VISIBILITY_RESPONSE)
return template.render()
+ @jsonify_error
def change_message_visibility_batch(self) -> str:
queue_name = self._get_queue_name()
- entries = self._get_list_prefix("ChangeMessageVisibilityBatchRequestEntry")
+ if self.is_json():
+ entries = [
+ {camelcase_to_underscores(key): value for key, value in entr.items()}
+ for entr in self._get_param("Entries")
+ ]
+ else:
+ entries = self._get_list_prefix("ChangeMessageVisibilityBatchRequestEntry")
success, error = self.sqs_backend.change_message_visibility_batch(
queue_name, entries
)
+ if self.is_json():
+ return json.dumps(
+ {"Successful": [{"Id": _id} for _id in success], "Failed": error}
+ )
template = self.response_template(CHANGE_MESSAGE_VISIBILITY_BATCH_RESPONSE)
return template.render(success=success, errors=error)
+ @jsonify_error
def get_queue_attributes(self) -> str:
queue_name = self._get_queue_name()
- if self.querystring.get("AttributeNames"):
+ if not self.is_json() and self.querystring.get("AttributeNames"):
raise InvalidAttributeName("")
- # if connecting to AWS via boto, then 'AttributeName' is just a normal parameter
- attribute_names = self._get_multi_param(
- "AttributeName"
- ) or self.querystring.get("AttributeName")
+ if self.is_json():
+ attribute_names = self._get_param("AttributeNames")
+ if attribute_names == [] or (attribute_names and "" in attribute_names):
+ raise InvalidAttributeName("")
+ else:
+ # if connecting to AWS via boto, then 'AttributeName' is just a normal parameter
+ attribute_names = self._get_multi_param(
+ "AttributeName"
+ ) or self.querystring.get("AttributeName")
attributes = self.sqs_backend.get_queue_attributes(queue_name, attribute_names) # type: ignore
+ if self.is_json():
+ if len(attributes) == 0:
+ return "{}"
+ return json.dumps(
+ {
+ "Attributes": {
+ key: str(value)
+ for key, value in attributes.items()
+ if value is not None
+ }
+ }
+ )
+
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
return template.render(attributes=attributes)
+ @jsonify_error
def set_queue_attributes(self) -> str:
# TODO validate self.get_param('QueueUrl')
attribute = self.attribute
@@ -177,6 +291,7 @@ def set_queue_attributes(self) -> str:
return SET_QUEUE_ATTRIBUTE_RESPONSE
+ @jsonify_error
def delete_queue(self) -> str:
# TODO validate self.get_param('QueueUrl')
queue_name = self._get_queue_name()
@@ -186,19 +301,32 @@ def delete_queue(self) -> str:
template = self.response_template(DELETE_QUEUE_RESPONSE)
return template.render()
- def send_message(self) -> Union[str, Tuple[str, Dict[str, int]]]:
+ @jsonify_error
+ def send_message(self) -> Union[str, TYPE_RESPONSE]:
message = self._get_param("MessageBody")
delay_seconds = int(self._get_param("DelaySeconds", 0))
message_group_id = self._get_param("MessageGroupId")
message_dedupe_id = self._get_param("MessageDeduplicationId")
if len(message) > MAXIMUM_MESSAGE_LENGTH:
- return ERROR_TOO_LONG_RESPONSE, dict(status=400)
+ return self._error(
+ "InvalidParameterValue",
+ message="One or more parameters are invalid. Reason: Message must be shorter than 262144 bytes.",
+ )
- message_attributes = parse_message_attributes(self.querystring)
- system_message_attributes = parse_message_attributes(
- self.querystring, key="MessageSystemAttribute"
- )
+ if self.is_json():
+ message_attributes = self._get_param("MessageAttributes")
+ self.normalize_json_msg_attributes(message_attributes)
+ else:
+ message_attributes = parse_message_attributes(self.querystring)
+
+ if self.is_json():
+ system_message_attributes = self._get_param("MessageSystemAttributes")
+ self.normalize_json_msg_attributes(system_message_attributes)
+ else:
+ system_message_attributes = parse_message_attributes(
+ self.querystring, key="MessageSystemAttribute"
+ )
queue_name = self._get_queue_name()
@@ -215,9 +343,30 @@ def send_message(self) -> Union[str, Tuple[str, Dict[str, int]]]:
except RESTError as err:
return self._error(err.error_type, err.message)
+ if self.is_json():
+ resp = {
+ "MD5OfMessageBody": message.body_md5,
+ "MessageId": message.id,
+ }
+ if len(message.message_attributes) > 0:
+ resp["MD5OfMessageAttributes"] = message.attribute_md5
+ return json.dumps(resp)
+
template = self.response_template(SEND_MESSAGE_RESPONSE)
return template.render(message=message, message_attributes=message_attributes)
+ def normalize_json_msg_attributes(self, message_attributes: Dict[str, Any]) -> None:
+ for key, value in (message_attributes or {}).items():
+ if "BinaryValue" in value:
+ message_attributes[key]["binary_value"] = value.pop("BinaryValue")
+ if "StringValue" in value:
+ message_attributes[key]["string_value"] = value.pop("StringValue")
+ if "DataType" in value:
+ message_attributes[key]["data_type"] = value.pop("DataType")
+
+ validate_message_attributes(message_attributes)
+
+ @jsonify_error
def send_message_batch(self) -> str:
"""
The querystring comes like this
@@ -234,39 +383,33 @@ def send_message_batch(self) -> str:
self.sqs_backend.get_queue(queue_name)
- if self.querystring.get("Entries"):
+ if not self.is_json() and self.querystring.get("Entries"):
raise EmptyBatchRequest()
-
- entries = {}
- for key, value in self.querystring.items():
- match = re.match(r"^SendMessageBatchRequestEntry\.(\d+)\.Id", key)
- if match:
- index = match.group(1)
-
- message_attributes = parse_message_attributes(
- self.querystring,
- base=f"SendMessageBatchRequestEntry.{index}.",
+ if self.is_json():
+ entries = {
+ str(idx): entry for idx, entry in enumerate(self._get_param("Entries"))
+ }
+ else:
+ entries = {
+ str(idx): entry
+ for idx, entry in enumerate(
+ self._get_multi_param("SendMessageBatchRequestEntry")
)
-
- entries[index] = {
- "Id": value[0],
- "MessageBody": self.querystring.get( # type: ignore
- f"SendMessageBatchRequestEntry.{index}.MessageBody"
- )[0],
- "DelaySeconds": self.querystring.get(
- f"SendMessageBatchRequestEntry.{index}.DelaySeconds",
- [None],
- )[0],
- "MessageAttributes": message_attributes,
- "MessageGroupId": self.querystring.get(
- f"SendMessageBatchRequestEntry.{index}.MessageGroupId",
- [None],
- )[0],
- "MessageDeduplicationId": self.querystring.get(
- f"SendMessageBatchRequestEntry.{index}.MessageDeduplicationId",
- [None],
- )[0],
- }
+ }
+ for entry in entries.values():
+ if "MessageAttribute" in entry:
+ entry["MessageAttributes"] = {
+ val["Name"]: val["Value"]
+ for val in entry.pop("MessageAttribute")
+ }
+
+ for entry in entries.values():
+ if "MessageAttributes" in entry:
+ self.normalize_json_msg_attributes(entry["MessageAttributes"])
+ else:
+ entry["MessageAttributes"] = {}
+ if "DelaySeconds" not in entry:
+ entry["DelaySeconds"] = None
if entries == {}:
raise EmptyBatchRequest()
@@ -286,16 +429,38 @@ def send_message_batch(self) -> str:
}
)
+ if self.is_json():
+ resp: Dict[str, Any] = {"Successful": [], "Failed": errors}
+ for msg in messages:
+ msg_dict = {
+ "Id": msg.user_id, # type: ignore
+ "MessageId": msg.id,
+ "MD5OfMessageBody": msg.body_md5,
+ }
+ if len(msg.message_attributes) > 0:
+ msg_dict["MD5OfMessageAttributes"] = msg.attribute_md5
+ resp["Successful"].append(msg_dict)
+ return json.dumps(resp)
+
template = self.response_template(SEND_MESSAGE_BATCH_RESPONSE)
return template.render(messages=messages, errors=errors)
+ @jsonify_error
def delete_message(self) -> str:
queue_name = self._get_queue_name()
- receipt_handle = self.querystring.get("ReceiptHandle")[0] # type: ignore
+ if self.is_json():
+ receipt_handle = self._get_param("ReceiptHandle")
+ else:
+ receipt_handle = self.querystring.get("ReceiptHandle")[0] # type: ignore
self.sqs_backend.delete_message(queue_name, receipt_handle)
+
+ if self.is_json():
+ return "{}"
+
template = self.response_template(DELETE_MESSAGE_RESPONSE)
return template.render()
+ @jsonify_error
def delete_message_batch(self) -> str:
"""
The querystring comes like this
@@ -308,21 +473,17 @@ def delete_message_batch(self) -> str:
"""
queue_name = self._get_queue_name()
- receipts = []
-
- for index in range(1, 11):
- # Loop through looking for messages
- receipt_key = f"DeleteMessageBatchRequestEntry.{index}.ReceiptHandle"
- receipt_handle = self.querystring.get(receipt_key)
- if not receipt_handle:
- # Found all messages
- break
+ if self.is_json():
+ receipts = self._get_param("Entries")
+ else:
+ receipts = self._get_multi_param("DeleteMessageBatchRequestEntry")
- message_user_id_key = f"DeleteMessageBatchRequestEntry.{index}.Id"
- message_user_id = self.querystring.get(message_user_id_key)[0] # type: ignore
- receipts.append(
- {"receipt_handle": receipt_handle[0], "msg_user_id": message_user_id}
- )
+ for r in receipts:
+ for key in list(r.keys()):
+ if key == "Id":
+ r["msg_user_id"] = r.pop(key)
+ else:
+ r[camelcase_to_underscores(key)] = r.pop(key)
receipt_seen = set()
for receipt_and_id in receipts:
@@ -333,27 +494,45 @@ def delete_message_batch(self) -> str:
success, errors = self.sqs_backend.delete_message_batch(queue_name, receipts)
+ if self.is_json():
+ return json.dumps(
+ {"Successful": [{"Id": _id} for _id in success], "Failed": errors}
+ )
+
template = self.response_template(DELETE_MESSAGE_BATCH_RESPONSE)
return template.render(success=success, errors=errors)
+ @jsonify_error
def purge_queue(self) -> str:
queue_name = self._get_queue_name()
self.sqs_backend.purge_queue(queue_name)
template = self.response_template(PURGE_QUEUE_RESPONSE)
return template.render()
- def receive_message(self) -> Union[str, Tuple[str, Dict[str, int]]]:
+ @jsonify_error
+ def receive_message(self) -> Union[str, TYPE_RESPONSE]:
queue_name = self._get_queue_name()
- message_attributes = self._get_multi_param("message_attributes")
+ if self.is_json():
+ message_attributes = self._get_param("MessageAttributeNames")
+ else:
+ message_attributes = self._get_multi_param("message_attributes")
if not message_attributes:
message_attributes = extract_input_message_attributes(self.querystring)
- attribute_names = self._get_multi_param("AttributeName")
+ if self.is_json():
+ attribute_names = self._get_param("AttributeNames", [])
+ else:
+ attribute_names = self._get_multi_param("AttributeName")
queue = self.sqs_backend.get_queue(queue_name)
try:
- message_count = int(self.querystring.get("MaxNumberOfMessages")[0]) # type: ignore
+ if self.is_json():
+ message_count = self._get_param(
+ "MaxNumberOfMessages", DEFAULT_RECEIVED_MESSAGES
+ )
+ else:
+ message_count = int(self.querystring.get("MaxNumberOfMessages")[0]) # type: ignore
except TypeError:
message_count = DEFAULT_RECEIVED_MESSAGES
@@ -367,7 +546,10 @@ def receive_message(self) -> Union[str, Tuple[str, Dict[str, int]]]:
)
try:
- wait_time = int(self.querystring.get("WaitTimeSeconds")[0]) # type: ignore
+ if self.is_json():
+ wait_time = int(self._get_param("WaitTimeSeconds"))
+ else:
+ wait_time = int(self.querystring.get("WaitTimeSeconds")[0]) # type: ignore
except TypeError:
wait_time = int(queue.receive_message_wait_time_seconds) # type: ignore
@@ -385,7 +567,7 @@ def receive_message(self) -> Union[str, Tuple[str, Dict[str, int]]]:
except TypeError:
visibility_timeout = queue.visibility_timeout # type: ignore
except ValueError:
- return ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE, dict(status=400)
+ return 400, {}, ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE
messages = self.sqs_backend.receive_message(
queue_name, message_count, wait_time, visibility_timeout, message_attributes
@@ -406,22 +588,98 @@ def receive_message(self) -> Union[str, Tuple[str, Dict[str, int]]]:
if any(x in ["All", pascalcase_name] for x in attribute_names):
attributes[attribute] = True
+ if self.is_json():
+ msgs = []
+ for message in messages:
+ msg: Dict[str, Any] = {
+ "MessageId": message.id,
+ "ReceiptHandle": message.receipt_handle,
+ "MD5OfBody": message.body_md5,
+ "Body": message.original_body,
+ "Attributes": {},
+ "MessageAttributes": {},
+ }
+ if len(message.message_attributes) > 0:
+ msg["MD5OfMessageAttributes"] = message.attribute_md5
+ if attributes["sender_id"]:
+ msg["Attributes"]["SenderId"] = message.sender_id
+ if attributes["sent_timestamp"]:
+ msg["Attributes"]["SentTimestamp"] = str(message.sent_timestamp)
+ if attributes["approximate_receive_count"]:
+ msg["Attributes"]["ApproximateReceiveCount"] = str(
+ message.approximate_receive_count
+ )
+ if attributes["approximate_first_receive_timestamp"]:
+ msg["Attributes"]["ApproximateFirstReceiveTimestamp"] = str(
+ message.approximate_first_receive_timestamp
+ )
+ if attributes["message_deduplication_id"]:
+ msg["Attributes"][
+ "MessageDeduplicationId"
+ ] = message.deduplication_id
+ if attributes["message_group_id"] and message.group_id is not None:
+ msg["Attributes"]["MessageGroupId"] = message.group_id
+ if message.system_attributes and message.system_attributes.get(
+ "AWSTraceHeader"
+ ):
+ msg["Attributes"]["AWSTraceHeader"] = message.system_attributes[
+ "AWSTraceHeader"
+ ].get("string_value")
+ if (
+ attributes["sequence_number"]
+ and message.sequence_number is not None
+ ):
+ msg["Attributes"]["SequenceNumber"] = message.sequence_number
+ for name, value in message.message_attributes.items():
+ msg["MessageAttributes"][name] = {"DataType": value["data_type"]}
+ if "Binary" in value["data_type"]:
+ msg["MessageAttributes"][name]["BinaryValue"] = value[
+ "binary_value"
+ ]
+ else:
+ msg["MessageAttributes"][name]["StringValue"] = value[
+ "string_value"
+ ]
+
+ if len(msg["Attributes"]) == 0:
+ msg.pop("Attributes")
+ if len(msg["MessageAttributes"]) == 0:
+ msg.pop("MessageAttributes")
+ msgs.append(msg)
+
+ return json.dumps({"Messages": msgs} if msgs else {})
+
template = self.response_template(RECEIVE_MESSAGE_RESPONSE)
return template.render(messages=messages, attributes=attributes)
+ @jsonify_error
def list_dead_letter_source_queues(self) -> str:
request_url = urlparse(self.uri)
queue_name = self._get_queue_name()
- source_queue_urls = self.sqs_backend.list_dead_letter_source_queues(queue_name)
+ queues = self.sqs_backend.list_dead_letter_source_queues(queue_name)
+
+ if self.is_json():
+ return json.dumps(
+ {"queueUrls": [queue.url(request_url) for queue in queues]}
+ )
template = self.response_template(LIST_DEAD_LETTER_SOURCE_QUEUES_RESPONSE)
- return template.render(queues=source_queue_urls, request_url=request_url)
+ return template.render(queues=queues, request_url=request_url)
+ @jsonify_error
def add_permission(self) -> str:
queue_name = self._get_queue_name()
- actions = self._get_multi_param("ActionName")
- account_ids = self._get_multi_param("AWSAccountId")
+ actions = (
+ self._get_param("Actions")
+ if self.is_json()
+ else self._get_multi_param("ActionName")
+ )
+ account_ids = (
+ self._get_param("AWSAccountIds")
+ if self.is_json()
+ else self._get_multi_param("AWSAccountId")
+ )
label = self._get_param("Label")
self.sqs_backend.add_permission(queue_name, actions, account_ids, label)
@@ -429,6 +687,7 @@ def add_permission(self) -> str:
template = self.response_template(ADD_PERMISSION_RESPONSE)
return template.render()
+ @jsonify_error
def remove_permission(self) -> str:
queue_name = self._get_queue_name()
label = self._get_param("Label")
@@ -438,21 +697,36 @@ def remove_permission(self) -> str:
template = self.response_template(REMOVE_PERMISSION_RESPONSE)
return template.render()
+ @jsonify_error
def tag_queue(self) -> str:
queue_name = self._get_queue_name()
- tags = self._get_map_prefix("Tag", key_end=".Key", value_end=".Value")
+ if self.is_json():
+ tags = self._get_param("Tags")
+ else:
+ tags = self._get_map_prefix("Tag", key_end=".Key", value_end=".Value")
self.sqs_backend.tag_queue(queue_name, tags)
+ if self.is_json():
+ return "{}"
+
template = self.response_template(TAG_QUEUE_RESPONSE)
return template.render()
+ @jsonify_error
def untag_queue(self) -> str:
queue_name = self._get_queue_name()
- tag_keys = self._get_multi_param("TagKey")
+ tag_keys = (
+ self._get_param("TagKeys")
+ if self.is_json()
+ else self._get_multi_param("TagKey")
+ )
self.sqs_backend.untag_queue(queue_name, tag_keys)
+ if self.is_json():
+ return "{}"
+
template = self.response_template(UNTAG_QUEUE_RESPONSE)
return template.render()
@@ -461,6 +735,9 @@ def list_queue_tags(self) -> str:
queue = self.sqs_backend.list_queue_tags(queue_name)
+ if self.is_json():
+ return json.dumps({"Tags": queue.tags})
+
template = self.response_template(LIST_QUEUE_TAGS_RESPONSE)
return template.render(tags=queue.tags)
@@ -665,7 +942,7 @@ def list_queue_tags(self) -> str:
{{ error_dict['Id'] }}
{{ error_dict['Code'] }}
{{ error_dict['Message'] }}
- {{ error_dict['SenderFault'] }}
+ {{ 'true' if error_dict['SenderFault'] else 'false' }}
{% endfor %}
@@ -756,16 +1033,6 @@ def list_queue_tags(self) -> str:
"""
-ERROR_TOO_LONG_RESPONSE = """
-
- Sender
- InvalidParameterValue
- One or more parameters are invalid. Reason: Message must be shorter than 262144 bytes.
-
-
- 6fde8d1e-52cd-4581-8cd9-c512f4c64223
-"""
-
ERROR_MAX_VISIBILITY_TIMEOUT_RESPONSE = (
f"Invalid request, maximum visibility timeout is {MAXIMUM_VISIBILITY_TIMEOUT}"
)
diff --git a/moto/sqs/utils.py b/moto/sqs/utils.py
index 33ba0ec9bf42..7f1d777d6699 100644
--- a/moto/sqs/utils.py
+++ b/moto/sqs/utils.py
@@ -43,38 +43,51 @@ def parse_message_attributes(
break
data_type_key = base + f"{key}.{index}.{value_namespace}DataType"
- data_type = querystring.get(data_type_key)
- if not data_type:
- raise MessageAttributesInvalid(
- f"The message attribute '{name[0]}' must contain non-empty message attribute value."
- )
+ data_type = querystring.get(data_type_key, [None])[0]
- data_type_parts = data_type[0].split(".")
- if data_type_parts[0] not in [
- "String",
- "Binary",
- "Number",
- ]:
- raise MessageAttributesInvalid(
- f"The message attribute '{name[0]}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String."
- )
+ data_type_parts = (data_type or "").split(".")[0]
type_prefix = "String"
- if data_type_parts[0] == "Binary":
+ if data_type_parts == "Binary":
type_prefix = "Binary"
value_key = base + f"{key}.{index}.{value_namespace}{type_prefix}Value"
- value = querystring.get(value_key)
- if not value:
- raise MessageAttributesInvalid(
- f"The message attribute '{name[0]}' must contain non-empty message attribute value for message attribute type '{data_type[0]}'."
- )
+ value = querystring.get(value_key, [None])[0]
message_attributes[name[0]] = {
- "data_type": data_type[0],
- type_prefix.lower() + "_value": value[0],
+ "data_type": data_type,
+ type_prefix.lower() + "_value": value,
}
index += 1
+ validate_message_attributes(message_attributes)
+
return message_attributes
+
+
+def validate_message_attributes(message_attributes: Dict[str, Any]) -> None:
+ for name, value in (message_attributes or {}).items():
+ data_type = value["data_type"]
+
+ if not data_type:
+ raise MessageAttributesInvalid(
+ f"The message attribute '{name}' must contain non-empty message attribute value."
+ )
+
+ data_type_parts = data_type.split(".")[0]
+ if data_type_parts not in [
+ "String",
+ "Binary",
+ "Number",
+ ]:
+ raise MessageAttributesInvalid(
+ f"The message attribute '{name}' has an invalid message attribute type, the set of supported type prefixes is Binary, Number, and String."
+ )
+
+ possible_value_fields = ["string_value", "binary_value"]
+ for field in possible_value_fields:
+ if field in value and value[field] is None:
+ raise MessageAttributesInvalid(
+ f"The message attribute '{name}' must contain non-empty message attribute value for message attribute type '{data_type}'."
+ )
diff --git a/moto/utilities/constants.py b/moto/utilities/constants.py
new file mode 100644
index 000000000000..cea2a1aeae06
--- /dev/null
+++ b/moto/utilities/constants.py
@@ -0,0 +1,5 @@
+APPLICATION_AMZ_JSON_1_0 = "application/x-amz-json-1.0"
+APPLICATION_AMZ_JSON_1_1 = "application/x-amz-json-1.1"
+APPLICATION_JSON = "application/json"
+
+JSON_TYPES = [APPLICATION_JSON, APPLICATION_AMZ_JSON_1_0, APPLICATION_AMZ_JSON_1_1]