diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 80b1af5b1..3743e3a18 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -128,7 +128,7 @@ import socket import string import uuid -from datetime import datetime +from datetime import datetime, timedelta, timezone from queue import Empty from botocore.client import Config @@ -734,23 +734,23 @@ def sqs(self, queue=None): return c def _handle_sts_session(self, queue, q): - if not hasattr(self, 'sts_expiration'): # STS token - token init + datetime_now_utc = datetime.now(timezone.utc).replace( + tzinfo=None + ) + sts_token_timeout = self.transport_options.get( + 'sts_token_timeout', 900 + ) + # STS token is generated only if it is not present or + # the time reaches 80% of the token TTL + if (not hasattr(self, 'sts_expiration')) or ( + self.sts_expiration.replace(tzinfo=None) + - timedelta(seconds=int(sts_token_timeout * 0.2)) + < datetime_now_utc + ): sts_creds = self.generate_sts_session_token( self.transport_options.get('sts_role_arn'), - self.transport_options.get('sts_token_timeout', 900)) - self.sts_expiration = sts_creds['Expiration'] - c = self._predefined_queue_clients[queue] = self.new_sqs_client( - region=q.get('region', self.region), - access_key_id=sts_creds['AccessKeyId'], - secret_access_key=sts_creds['SecretAccessKey'], - session_token=sts_creds['SessionToken'], + sts_token_timeout, ) - return c - # STS token - refresh if expired - elif self.sts_expiration.replace(tzinfo=None) < datetime.utcnow(): - sts_creds = self.generate_sts_session_token( - self.transport_options.get('sts_role_arn'), - self.transport_options.get('sts_token_timeout', 900)) self.sts_expiration = sts_creds['Expiration'] c = self._predefined_queue_clients[queue] = self.new_sqs_client( region=q.get('region', self.region), @@ -759,7 +759,7 @@ def _handle_sts_session(self, queue, q): session_token=sts_creds['SessionToken'], ) return c - else: # STS token - ruse existing + else: # STS token still valid - reuse existing return self._predefined_queue_clients[queue] def generate_sts_session_token(self, role_arn, token_expiry_seconds):